Skip to content

Commit 41f9860

Browse files
committed
Make async pool handle graceful degradation
When asyncio event loop fails to start (e.g., on free-threaded Python), the async pool now starts in degraded mode instead of crashing. All async requests return {error, async_not_supported} in this mode. This mirrors the behavior of py_subinterp_pool for unsupported sub-interpreters.
1 parent 2e76d00 commit 41f9860

2 files changed

Lines changed: 66 additions & 19 deletions

File tree

src/py_async_pool.erl

Lines changed: 62 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@
3636
]).
3737

3838
-record(state, {
39-
workers :: queue:queue(pid()),
40-
num_workers :: pos_integer(),
39+
workers :: queue:queue(pid()) | undefined,
40+
num_workers :: non_neg_integer(),
4141
pending :: non_neg_integer(),
42-
worker_sup :: pid()
42+
worker_sup :: pid() | undefined,
43+
supported :: boolean() %% whether async workers are supported
4344
}).
4445

4546
%%% ============================================================================
@@ -70,27 +71,49 @@ init([NumWorkers]) ->
7071
%% Start worker supervisor
7172
{ok, WorkerSup} = py_async_worker_sup:start_link(),
7273

73-
%% Start workers
74-
Workers = start_workers(WorkerSup, NumWorkers),
75-
76-
{ok, #state{
77-
workers = queue:from_list(Workers),
78-
num_workers = NumWorkers,
79-
pending = 0,
80-
worker_sup = WorkerSup
81-
}}.
74+
%% Try to start workers - may fail on free-threaded Python
75+
case start_workers(WorkerSup, NumWorkers) of
76+
{ok, Workers} ->
77+
{ok, #state{
78+
workers = queue:from_list(Workers),
79+
num_workers = NumWorkers,
80+
pending = 0,
81+
worker_sup = WorkerSup,
82+
supported = true
83+
}};
84+
{error, _Reason} ->
85+
%% Async workers not supported (e.g., free-threaded Python)
86+
%% Pool starts but all requests will return an error
87+
{ok, #state{
88+
workers = undefined,
89+
num_workers = 0,
90+
pending = 0,
91+
worker_sup = WorkerSup,
92+
supported = false
93+
}}
94+
end.
8295

8396
handle_call(get_stats, _From, State) ->
97+
AvailWorkers = case State#state.workers of
98+
undefined -> 0;
99+
Q -> queue:len(Q)
100+
end,
84101
Stats = #{
85102
num_workers => State#state.num_workers,
86103
pending_requests => State#state.pending,
87-
available_workers => queue:len(State#state.workers)
104+
available_workers => AvailWorkers,
105+
supported => State#state.supported
88106
},
89107
{reply, Stats, State};
90108

91109
handle_call(_Request, _From, State) ->
92110
{reply, {error, unknown_request}, State}.
93111

112+
handle_cast({request, Request}, #state{supported = false} = State) ->
113+
{Ref, Caller, _} = extract_ref_caller(Request),
114+
Caller ! {py_error, Ref, async_not_supported},
115+
{noreply, State};
116+
94117
handle_cast({request, Request}, State) ->
95118
case queue:out(State#state.workers) of
96119
{{value, Worker}, Rest} ->
@@ -115,17 +138,27 @@ handle_cast(_Msg, State) ->
115138
handle_info({worker_done, _WorkerPid}, State) ->
116139
{noreply, State#state{pending = max(0, State#state.pending - 1)}};
117140

141+
handle_info({'EXIT', _Pid, _Reason}, #state{supported = false} = State) ->
142+
{noreply, State};
143+
118144
handle_info({'EXIT', Pid, Reason}, State) ->
119145
error_logger:error_msg("py_async_pool: worker ~p died: ~p~n", [Pid, Reason]),
120146
%% Remove dead worker from queue and start a new one
121147
Workers = queue:filter(fun(W) -> W =/= Pid end, State#state.workers),
122-
NewWorker = py_async_worker_sup:start_worker(State#state.worker_sup),
123-
NewWorkers = queue:in(NewWorker, Workers),
124-
{noreply, State#state{workers = NewWorkers}};
148+
case py_async_worker_sup:start_worker(State#state.worker_sup) of
149+
{ok, NewWorker} ->
150+
NewWorkers = queue:in(NewWorker, Workers),
151+
{noreply, State#state{workers = NewWorkers}};
152+
{error, _} ->
153+
%% Can't restart worker, continue with remaining workers
154+
{noreply, State#state{workers = Workers}}
155+
end;
125156

126157
handle_info(_Info, State) ->
127158
{noreply, State}.
128159

160+
terminate(_Reason, #state{workers = undefined}) ->
161+
ok;
129162
terminate(_Reason, State) ->
130163
%% Shutdown all workers
131164
Workers = queue:to_list(State#state.workers),
@@ -137,7 +170,19 @@ terminate(_Reason, State) ->
137170
%%% ============================================================================
138171

139172
start_workers(Sup, N) ->
140-
[py_async_worker_sup:start_worker(Sup) || _ <- lists:seq(1, N)].
173+
start_workers(Sup, N, []).
174+
175+
start_workers(_Sup, 0, Acc) ->
176+
{ok, lists:reverse(Acc)};
177+
start_workers(Sup, N, Acc) ->
178+
case py_async_worker_sup:start_worker(Sup) of
179+
{ok, Pid} ->
180+
start_workers(Sup, N - 1, [Pid | Acc]);
181+
{error, Reason} ->
182+
%% Failed to start worker, shutdown any already started
183+
lists:foreach(fun(W) -> W ! shutdown end, Acc),
184+
{error, Reason}
185+
end.
141186

142187
extract_ref_caller({async_call, Ref, Caller, _, _, _, _}) -> {Ref, Caller, async_call};
143188
extract_ref_caller({async_gather, Ref, Caller, _}) -> {Ref, Caller, async_gather};

src/py_async_worker_sup.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ start_link() ->
2828
supervisor:start_link(?MODULE, []).
2929

3030
start_worker(Sup) ->
31-
{ok, Pid} = supervisor:start_child(Sup, []),
32-
Pid.
31+
case supervisor:start_child(Sup, []) of
32+
{ok, Pid} -> {ok, Pid};
33+
{error, Reason} -> {error, Reason}
34+
end.
3335

3436
init([]) ->
3537
WorkerSpec = #{

0 commit comments

Comments
 (0)