-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhs_client.py
More file actions
147 lines (124 loc) · 5.31 KB
/
hs_client.py
File metadata and controls
147 lines (124 loc) · 5.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from collections.abc import Generator
from contextlib import contextmanager
import datetime
import re
import threading
import xmlrpc.client
from xmlrpc.client import Fault
import requests
import yaml
from cachetools import cached, Cache
from fastapi import HTTPException, Request
CAS_URL = "https://login.hostsharing.net/cas/v1/tickets"
SERVICE = "https://config.hostsharing.net:443/hsar/backend"
BACKEND = "https://config.hostsharing.net:443/hsar/xmlrpc/hsadmin"
class GrantPools:
"""
A grant can only have one active ticket at a time. If you fetch a new ticket old tickets become invalid. So if we want to handle multiple requests concurently we need to reserve the grant for the duration of the request.
To help, this class handles one pool of grants for each username/password combination. The acquire Method provides an exclusive grant.
"""
def __init__(self) -> None:
self.pools = dict[(str, str), list[tuple[str, datetime.datetime]]]()
self.lock = threading.Lock()
@contextmanager
def acquire(self, username: str, password: str) -> Generator[str]:
"""
Use with a with statement to temporarily acquire a grant. The grant will be automatically returned on leaving the block. E.g. after the request
"""
key = (username, password)
(grant, validity) = self._try_get_grant(key)
if grant is None:
grant = get_ticket_grant(username, password)
validity = datetime.datetime.now() + datetime.timedelta(seconds=3000)
try:
yield grant
finally:
self._put_grant(key, (grant, validity))
def _try_get_grant(self, key) -> tuple[str, datetime.datetime] | tuple[None, None]:
with self.lock:
if key not in self.pools:
self.pools[key] = []
grants = self.pools[key]
while len(grants) > 0:
(grant, validity) = grants.pop(0)
if validity > datetime.datetime.now():
return grant, validity
return None, None
def _put_grant(self, key, grant):
with self.lock:
if key not in self.pools:
self.pools[key] = []
self.pools[key].append(grant)
grantPools = GrantPools()
@cached(Cache(maxsize=float("inf")))
def get_credentials(api_key : str) -> dict[str,str]:
with open("env.yaml", "r") as file:
data = yaml.safe_load(file)
api_entries = list(api for api in data['api'] if api['key'] == api_key)
if len(api_entries) != 1:
raise HTTPException(500, "API Key not found")
allowed_pacs = api_entries[0]["pacs"]
if isinstance(allowed_pacs, str):
allowed_pacs = allowed_pacs.split(',')
filtered_creds = {pac: data["pacs"][pac] for pac in allowed_pacs if pac in data["pacs"]}
return filtered_creds
def get_ticket_grant(username: str, password: str) -> str:
# Ticket-Granting Ticket (TGT) holen
resp = requests.post(
CAS_URL,
data={"username": username, "password": password},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
tgt_match = re.search(r'action="([^"]+)"', resp.text)
if not tgt_match:
raise RuntimeError("TGT nicht gefunden")
tgt_url = tgt_match.group(1)
return tgt_url
# ---------- Step 1+2: CAS Authentication ----------
def get_service_ticket(grant: str) -> str:
# Service-Ticket holen
resp = requests.post(
grant,
data={"service": SERVICE},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
return resp.text.strip()
# ---------- Step 3: XML-RPC Call ----------
def hs_call(request: Request, method: str, param1, param2=None) -> list:
headers = request.headers
api_key = headers.get("Authorization")
credentials = get_credentials(api_key)
if len(credentials) == 1:
# pac not given, but only one pac configured
username = list(credentials.keys())[0]
elif headers.get("PAC") in credentials:
username = headers.get("PAC")
else:
pac = headers.get("PAC")
raise HTTPException(400, f"PAC {pac} is not configured in this API, please check your credentials.yaml file")
with grantPools.acquire(username, credentials[username]) as grant:
service_ticket = get_service_ticket(grant)
server = xmlrpc.client.ServerProxy(BACKEND)
remote = getattr(server, method)
if param2:
return remote(username, service_ticket, param1, param2)
else:
return remote(username, service_ticket, param1)
def hs_search(request: Request, module: str, where : dict) -> list:
method = module + ".search"
return hs_call(request, method, where)
def hs_update(request: Request, module: str, where : dict, set: dict):
method = module + ".update"
return hs_call(request, method, set, where)
def hs_delete(request: Request, module: str, where : dict) -> list:
method = module + ".delete"
return hs_call(request, method, where)
def hs_add(request: Request, module: str, set : dict) -> list:
try:
method = module + ".add"
return hs_call(request, method, set)
except Fault as e:
print(e)
raise HTTPException(status_code=400, detail="Fehlerhafte Eingaben")
def hs_api(request: Request):
return hs_call(request, 'property.search', {})