Skip to content

Commit b93a625

Browse files
authored
Merge pull request #750 from lupko/flight-fixes
RELATED: CQ-555 - small improvements in flex funs implementation
2 parents ba9e2fa + a31fcf9 commit b93a625

4 files changed

Lines changed: 117 additions & 64 deletions

File tree

gooddata-flight-server/gooddata_flight_server/flexfun/flex_fun_registry.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,16 @@ class FlexFunRegistry:
1717
def __init__(self) -> None:
1818
self._logger = structlog.get_logger("gooddata_flexfun.registry")
1919
self._fun_by_name: Dict[str, Type[FlexFun]] = {}
20+
self._fun_names: tuple[str, ...] = ()
2021
self._loaded_modules: List[str] = []
2122

23+
@property
24+
def flex_funs_names(self) -> tuple[str, ...]:
25+
"""
26+
:return: names of available functions
27+
"""
28+
return self._fun_names
29+
2230
@property
2331
def flex_funs(self) -> Dict[str, Type[FlexFun]]:
2432
"""
@@ -56,6 +64,7 @@ def _initialize_and_register(self, ctx: ServerContext, fun: Type[FlexFun]) -> No
5664

5765
fun.on_load(ctx)
5866
self._fun_by_name[fun.Name] = fun
67+
self._fun_names = tuple(self._fun_by_name.keys())
5968

6069
def register(self, ctx: ServerContext, *funs: Type[FlexFun]) -> "FlexFunRegistry":
6170
"""

gooddata-flight-server/gooddata_flight_server/flexfun/flex_fun_task.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from typing import Dict, Iterable, List, Optional, Tuple, Union
33

44
import pyarrow
5+
import structlog
56

67
from gooddata_flight_server.flexfun.flex_fun import FlexFun
78
from gooddata_flight_server.tasks.base import ArrowData
@@ -12,6 +13,8 @@
1213
TaskResult,
1314
)
1415

16+
_LOGGER = structlog.get_logger("gooddata_flexfun.task")
17+
1518

1619
class _FlexFunResult(FlightDataTaskResult):
1720
__slots__ = ("_result",)
@@ -59,7 +62,16 @@ def __init__(
5962
self._columns = columns
6063
self._headers = headers
6164

65+
_LOGGER.info("flexfun_task_created", fun=fun.Name, task_id=self._task_id)
66+
67+
@property
68+
def fun_name(self) -> Optional[str]:
69+
return self._fun.Name
70+
6271
def run(self) -> Union[TaskResult, TaskError]:
72+
structlog.contextvars.bind_contextvars(fun=self._fun.Name, task_id=self._task_id)
73+
_LOGGER.info("flexfun_task_run")
74+
6375
result = self._fun.call(
6476
parameters=self._parameters,
6577
columns=self._columns,
@@ -69,4 +81,6 @@ def run(self) -> Union[TaskResult, TaskError]:
6981
return _FlexFunResult(result)
7082

7183
def on_task_cancel(self) -> None:
84+
_LOGGER.info("flexfun_task_cancel", fun=self._fun.Name, task_id=self._task_id)
85+
7286
self._fun.cancel()

gooddata-flight-server/gooddata_flight_server/flexfun/flight_methods.py

Lines changed: 93 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -126,87 +126,117 @@ def _prepare_flight_info(self, task_result: TaskExecutionResult) -> pyarrow.flig
126126
def list_flights(
127127
self, context: pyarrow.flight.ServerCallContext, criteria: bytes
128128
) -> Generator[pyarrow.flight.FlightInfo, None, None]:
129+
structlog.contextvars.bind_contextvars(peer=context.peer())
130+
_LOGGER.info("list_flights", available_funs=self._registry.flex_funs_names)
131+
129132
return (self._create_fun_info(fun) for fun in self._registry.flex_funs.values())
130133

131134
def get_flight_info(
132135
self,
133136
context: pyarrow.flight.ServerCallContext,
134137
descriptor: pyarrow.flight.FlightDescriptor,
135138
) -> pyarrow.flight.FlightInfo:
136-
task = self._prepare_task(context, descriptor)
137-
138-
self._ctx.task_executor.submit(task)
139+
structlog.contextvars.bind_contextvars(peer=context.peer())
140+
task: Optional[FlexFunTask] = None
139141

140142
try:
141-
# XXX: this should be enhanced to implement polling
142-
task_result = self._ctx.task_executor.wait_for_result(task.task_id, _DEFAULT_TASK_WAIT)
143-
except TaskWaitTimeoutError:
144-
raise ErrorInfo.for_reason(
145-
ErrorCode.TIMEOUT, f"GetFlightInfo timed out while waiting for task {task.task_id}."
146-
).to_timeout_error()
147-
148-
# if this bombs then there must be something really wrong because the task
149-
# was clearly submitted and code was waiting for its completion. this invariant
150-
# should not happen in this particular code path. The None return value may
151-
# be applicable one day when polling is in use and a request comes to check whether
152-
# particular task id finished
153-
assert task_result is not None
143+
task = self._prepare_task(context, descriptor)
144+
self._ctx.task_executor.submit(task)
145+
146+
try:
147+
# XXX: this should be enhanced to implement polling
148+
task_result = self._ctx.task_executor.wait_for_result(task.task_id, _DEFAULT_TASK_WAIT)
149+
except TaskWaitTimeoutError:
150+
raise ErrorInfo.for_reason(
151+
ErrorCode.TIMEOUT, f"GetFlightInfo timed out while waiting for task {task.task_id}."
152+
).to_timeout_error()
153+
154+
# if this bombs then there must be something really wrong because the task
155+
# was clearly submitted and code was waiting for its completion. this invariant
156+
# should not happen in this particular code path. The None return value may
157+
# be applicable one day when polling is in use and a request comes to check whether
158+
# particular task id finished
159+
assert task_result is not None
160+
161+
return self._prepare_flight_info(task_result)
162+
except Exception:
163+
if task is not None:
164+
_LOGGER.error("get_flight_info_failed", task_id=task.task_id, fun=task.fun_name, exc_info=True)
165+
else:
166+
_LOGGER.error("flexfun_submit_failed", exc_info=True)
154167

155-
return self._prepare_flight_info(task_result)
168+
raise
156169

157170
def do_get(
158171
self,
159172
context: pyarrow.flight.ServerCallContext,
160173
ticket: pyarrow.flight.Ticket,
161174
) -> pyarrow.flight.FlightDataStream:
175+
structlog.contextvars.bind_contextvars(peer=context.peer())
176+
162177
try:
163-
ticket_payload = orjson.loads(ticket.ticket)
178+
try:
179+
ticket_payload = orjson.loads(ticket.ticket)
180+
except Exception:
181+
raise ErrorInfo.bad_argument("Incorrect ticket payload. The ticket payload is not a valid JSON.")
182+
183+
task_id = ticket_payload.get("task_id")
184+
if task_id is None or not len(task_id):
185+
raise ErrorInfo.bad_argument("Incorrect ticket payload. The ticket payload does not specify 'task_id'.")
186+
187+
task_result = self._ctx.task_executor.wait_for_result(task_id)
188+
if task_result is None:
189+
raise ErrorInfo.for_reason(
190+
ErrorCode.INVALID_TICKET,
191+
f"Unable to serve data for task '{task_id}'. The task result is not present.",
192+
).to_user_error()
193+
194+
result = task_result.result
195+
if not isinstance(result, FlightDataTaskResult):
196+
raise ErrorInfo.for_reason(
197+
ErrorCode.INTERNAL_ERROR,
198+
f"An internal error has occurred while attempting read result for '{task_id}'."
199+
f"While the result exists, it is of an unexpected type. "
200+
f"This is a bug in FlexFun server implementation.",
201+
).to_internal_error()
202+
203+
rlock, data = result.acquire_data()
204+
205+
def _on_end(_: Optional[pyarrow.ArrowException]) -> None:
206+
"""
207+
Once the request that streams the data out is done, make sure
208+
to release the read-lock. Single-use results are closed at
209+
this point because the data cannot be read again anyway.
210+
"""
211+
rlock.release()
212+
213+
if result.single_use_data:
214+
# note: results with single-use data can only ever have one active
215+
# reader (e.g. this one). since the rlock is now released the
216+
# close will proceed without chance of being blocked
217+
try:
218+
result.close()
219+
except Exception:
220+
# log and sink these Exceptions - not much to do
221+
_LOGGER.error("do_get_close_failed", exc_info=True)
222+
223+
finalizer = self.call_finalizer_middleware(context)
224+
finalizer.register_on_end(_on_end)
225+
226+
if isinstance(data, pyarrow.Table):
227+
_LOGGER.info("do_get_table", task_id=task_id, num_rows=data.num_rows)
228+
229+
return pyarrow.flight.RecordBatchStream(data)
230+
elif isinstance(data, pyarrow.RecordBatchReader):
231+
_LOGGER.info("do_get_reader", task_id=task_id)
232+
233+
return pyarrow.flight.RecordBatchStream(data)
234+
235+
_LOGGER.info("do_get_generator", task_id=task_id)
236+
return pyarrow.flight.GeneratorStream(data)
164237
except Exception:
165-
raise ErrorInfo.bad_argument("Incorrect ticket payload. The ticket payload is not a valid JSON.")
166-
167-
task_id = ticket_payload.get("task_id")
168-
if task_id is None or not len(task_id):
169-
raise ErrorInfo.bad_argument("Incorrect ticket payload. The ticket payload does not specify 'task_id'.")
170-
171-
task_result = self._ctx.task_executor.wait_for_result(task_id)
172-
if task_result is None:
173-
raise ErrorInfo.for_reason(
174-
ErrorCode.INVALID_TICKET,
175-
f"Unable to serve data for task '{task_id}'. The task result is not present.",
176-
).to_user_error()
177-
178-
result = task_result.result
179-
if not isinstance(result, FlightDataTaskResult):
180-
raise ErrorInfo.for_reason(
181-
ErrorCode.INTERNAL_ERROR,
182-
f"An internal error has occurred while attempting read result for '{task_id}'."
183-
f"While the result exists, it is of an unexpected type. This is a bug in FlexFun server implementation.",
184-
).to_internal_error()
185-
186-
rlock, data = result.acquire_data()
187-
188-
def _on_end(_: Optional[pyarrow.ArrowException]) -> None:
189-
"""
190-
Once the request that streams the data out is done, make sure
191-
to release the read-lock. Single-use results are closed at
192-
this point because the data cannot be read again anyway.
193-
"""
194-
rlock.release()
195-
196-
if result.single_use_data:
197-
# note: results with single-use data can only ever have one active
198-
# reader (e.g. this one). since the rlock is now released the
199-
# close will proceed without chance of being blocked
200-
try:
201-
result.close()
202-
except Exception:
203-
# log and sink these Exceptions - not much to do
204-
_LOGGER.error("do_get_close_failed", exc_info=True)
205-
206-
finalizer = self.call_finalizer_middleware(context)
207-
finalizer.register_on_end(_on_end)
208-
209-
return pyarrow.flight.RecordBatchStream(data)
238+
_LOGGER.error("do_get_failed", exc_info=True)
239+
raise
210240

211241

212242
_FLEXFUN_CONFIG_SECTION = "flexfun"

gooddata-flight-server/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
description="Flight RPC server to host custom functions for GoodData Cloud",
2323
long_description=long_description,
2424
long_description_content_type="text/markdown",
25-
version="1.21.0",
25+
version="1.23.0",
2626
author="GoodData",
2727
author_email="support@gooddata.com",
2828
license="MIT",

0 commit comments

Comments
 (0)