Commit 2f90289

Improve and add cooperative thread built-ins
1 parent ad9f471 commit 2f90289

3 files changed

Lines changed: 316 additions & 60 deletions

design/mvp/CanonicalABI.md

Lines changed: 63 additions & 15 deletions
@@ -627,7 +627,9 @@ A `Thread` can be in one of the following 3 states:
 component instance
 
 The `waiting` state has additional `ready` and non-`ready` sub-states such that
-a `waiting` thread may only be resumed once it's `ready`.
+a `waiting` thread may only be resumed once it's `ready`. Because all threads
+are contained by a single component instance, each component instance has an
+associated (potentially-empty) list of threads that are `ready` to be resumed.
 ```python
   def running(self):
     return self.parent_lock is not None
@@ -640,6 +642,9 @@ a `waiting` thread may only be resumed once it's `ready`.
 
   def ready(self):
     return self.waiting() and self.ready_func()
+
+  def ready_list(inst: ComponentInstance) -> list[Thread]:
+    return [t for t in inst.threads.array if t and t.ready()]
 ```
 
 When a `Thread` is created, an internal `threading.Thread` is started and
@@ -664,32 +669,75 @@ Thus, threads start blocked and have to be explicitly unblocked by wasm code.
       self.fiber_lock.acquire()
       assert(self.running() and not cancelled(self.resume_arg))
       self.resume_arg = None
-      thread_func(self)
-      assert(self.running())
-      self.task.thread_stop(self)
-      if self.index is not None:
-        self.task.inst.threads.remove(self.index)
-      self.parent_lock.release()
+      try:
+        thread_func(self)
+        self.exit()
+      except ThreadExit:
+        return
+      assert(False)
     self.fiber = threading.Thread(target = fiber_func)
     self.fiber.start()
     assert(self.suspended())
 ```
 `Thread`s register themselves with their containing `Task` via
-`Task.thread_start` and unregister themselves via `Task.thread_stop` when they
-exit. This registration is used for delivering cancellation requests sent to the
-`Task` by the caller (via `Task.request_cancellation`) as well as enforcing
-Canonical ABI rules when the last `Thread` in a `Task` exits.
+`Task.thread_start` and unregister themselves via `Task.thread_stop` in
+`Thread.exit` (defined next).
 
-If a `Thread` was not cancelled while waiting for backpressure, it will be
-allocated an `index` in its containing component instance's `threads` table and,
-when the `Thread`'s root function returns, this `index` is deallocated by the
-code above.
+A thread always exits by throwing a `ThreadExit` exception that is thrown by
+`Thread.exit`. The call to `exit()` can either happen implicitly, when the core
+wasm thread function returns, or explicitly, if core wasm calls the
+`thread.exit` built-in (defined below in `canon_thread_exit`). In either case,
+the following things happen on the thread before it is released:
+```python
+  def exit(self):
+    assert(self.running() and self.task.may_block())
+    if self.index is not None:
+      self.task.inst.threads.remove(self.index)
+    self.task.thread_stop(self)
+    if self.task.is_sync_and_not_returned():
+      self.release_some_ready_sibling()
+    else:
+      self.parent_lock.release()
+    raise ThreadExit()
+```
+First, if a `Thread` was not cancelled while waiting for backpressure, it will
+have been allocated an `index` in its containing component instance's `threads`
+table which is then deallocated and recycled when the thread exits.
 
+Next, `Task.thread_stop` is called to pair with the `Task.thread_start` call in
+thread initialization above which, among other things, will trap if this was the
+last thread in a task that has not yet returned its value.
+
+Lastly, before throwing the `ThreadExit` exception that actually exits the
+Python thread, control flow is transferred to some other thread by releasing its
+lock. *Which* thread depends on the state of the `Task` that this thread is
+executing on behalf of:
+* If the current task was spawned for a non-`async`-typed export and the task
+  has not yet returned a value, then, by the calling contract, it must not
+  block. So instead, control flow is transferred to some other thread *in the
+  same component instance*
+
+
+...
 While in the `running` state, threads have a `parent_lock` which points to a
 locked mutex that some parent thread is currently waiting to `acquire()`. When a
 thread exits, it transfers control flow to this parent thread by `release()`ing
 the `parent_lock` as its final action.
 
+
+```python
+  def release_some_ready_sibling(self):
+    other = random.choice(Thread.ready_list(self.task.inst))
+    assert(self is not other)
+    other.stop_waiting()
+    self.release_sibling(other)
+
+  def stop_waiting(self):
+    assert(self.waiting())
+    self.ready_func = None
+    self.task.inst.store.waiting.remove(self)
+```
+
 While in the `waiting` state, threads have an associated `ready_func` callback
 which is continuously polled by `Store.tick` (semantically, an optimized
 implementation would use triggers instead) such that once `ready_func` returns

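The exit path described in the `CanonicalABI.md` changes above can be tried in isolation. The following is a minimal sketch, not the spec's code: `SketchThread`, its `log` list and the two example thread functions are hypothetical stand-ins, and the bookkeeping that the real `exit()` performs (freeing the table index, calling `Task.thread_stop`, releasing a lock) is reduced to a single log entry. It only aims to show that an implicit return and an explicit `exit()` call converge on the same `except ThreadExit` handler, and that a broad `except Exception` in the thread body does not intercept the exit because `ThreadExit` derives from `BaseException`.

```python
class ThreadExit(BaseException):
    """Signals that the current thread is done; same shape as in the diff."""


class SketchThread:
    # Hypothetical stand-in for the spec's Thread: it models only the exit
    # path and replaces table/lock bookkeeping with a simple log.
    def __init__(self, thread_func):
        self.thread_func = thread_func
        self.log = []

    def exit(self):
        # The real exit() frees the table index, calls Task.thread_stop and
        # hands control to another thread before raising.
        self.log.append("cleanup")
        raise ThreadExit()

    def run(self):
        # Mirrors the try/except added to fiber_func in the diff.
        try:
            self.thread_func(self)
            self.exit()  # implicit exit: the thread function returned
        except ThreadExit:
            return "exited"
        raise AssertionError("unreachable: exit() always raises")


def returns_normally(thread):
    thread.log.append("body")


def exits_early(thread):
    try:
        thread.exit()  # explicit exit, as the thread.exit built-in would do
        thread.log.append("never reached")
    except Exception:
        # Not taken: ThreadExit derives from BaseException, not Exception.
        thread.log.append("swallowed")


implicit = SketchThread(returns_normally)
explicit = SketchThread(exits_early)
implicit_result = implicit.run()
explicit_result = explicit.run()
```

In both cases the cleanup step runs exactly once, which is the property the refactoring from inline teardown code to a shared `exit()` method appears to be after.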
design/mvp/canonical-abi/definitions.py

Lines changed: 108 additions & 30 deletions
@@ -16,6 +16,7 @@
 
 class Trap(BaseException): pass
 class CoreWebAssemblyException(BaseException): pass
+class ThreadExit(BaseException): pass
 
 def trap():
   raise Trap()
@@ -400,6 +401,9 @@ def waiting(self):
   def ready(self):
     return self.waiting() and self.ready_func()
 
+  def ready_list(inst: ComponentInstance) -> list[Thread]:
+    return [t for t in inst.threads.array if t and t.ready()]
+
   def __init__(self, task, thread_func):
     self.task = task
     self.fiber_lock = threading.Lock()
@@ -417,17 +421,47 @@ def fiber_func():
       self.fiber_lock.acquire()
       assert(self.running() and not cancelled(self.resume_arg))
       self.resume_arg = None
-      thread_func(self)
-      assert(self.running())
-      self.task.thread_stop(self)
-      if self.index is not None:
-        self.task.inst.threads.remove(self.index)
-      self.parent_lock.release()
+      try:
+        thread_func(self)
+        self.exit()
+      except ThreadExit:
+        return
+      assert(False)
     self.fiber = threading.Thread(target = fiber_func)
     self.fiber.start()
     assert(self.suspended())
 
-  def resume_later(self):
+  def exit(self):
+    assert(self.running() and self.task.may_block())
+    if self.index is not None:
+      self.task.inst.threads.remove(self.index)
+    self.task.thread_stop(self)
+    if self.task.is_sync_and_not_returned():
+      self.release_some_ready_sibling()
+    else:
+      self.parent_lock.release()
+    raise ThreadExit()
+
+  def release_some_ready_sibling(self):
+    other = random.choice(Thread.ready_list(self.task.inst))
+    assert(self is not other)
+    other.stop_waiting()
+    self.release_sibling(other)
+
+  def stop_waiting(self):
+    assert(self.waiting())
+    self.ready_func = None
+    self.task.inst.store.waiting.remove(self)
+
+  def release_sibling(self, other: Thread):
+    assert(other.resume_arg is None)
+    other.resume_arg = ResumeArg.NOT_CANCELLED
+    assert(self.parent_lock and not other.parent_lock)
+    other.parent_lock = self.parent_lock
+    self.parent_lock = None
+    other.fiber_lock.release()
+
+  def unsuspend(self):
     assert(self.suspended())
     self.ready_func = lambda: True
     self.task.inst.store.waiting.append(self)
@@ -437,8 +471,7 @@ def resume(self, resume_arg):
     assert(not self.running())
     if self.waiting():
       assert(cancelled(resume_arg) or self.ready())
-      self.ready_func = None
-      self.task.inst.store.waiting.remove(self)
+      self.stop_waiting()
     assert(self.cancellable or not cancelled(resume_arg))
     assert(self.resume_arg is None)
     self.resume_arg = resume_arg
@@ -453,7 +486,10 @@ def suspend(self, cancellable) -> ResumeArg:
     assert(self.running() and self.task.may_block())
     assert(not self.cancellable and self.resume_arg is None)
     self.cancellable = cancellable
-    self.parent_lock.release()
+    if self.task.is_sync_and_not_returned():
+      self.release_some_ready_sibling()
+    else:
+      self.parent_lock.release()
     self.fiber_lock.acquire()
     assert(self.running())
     self.cancellable = False
@@ -482,16 +518,11 @@ def yield_until(self, ready_func, cancellable) -> ResumeArg:
   def yield_(self, cancellable) -> ResumeArg:
     return self.yield_until(lambda: True, cancellable)
 
-  def switch_to(self, cancellable, other: Thread) -> ResumeArg:
+  def suspend_to_suspended(self, cancellable, other: Thread) -> ResumeArg:
     assert(self.running() and other.suspended())
-    assert(not self.cancellable and self.resume_arg is None and other.resume_arg is None)
+    assert(not self.cancellable and self.resume_arg is None)
     self.cancellable = cancellable
-    other.resume_arg = ResumeArg.NOT_CANCELLED
-    assert(self.parent_lock and not other.parent_lock)
-    other.parent_lock = self.parent_lock
-    self.parent_lock = None
-    assert(not self.running() and other.running())
-    other.fiber_lock.release()
+    self.release_sibling(other)
     self.fiber_lock.acquire()
     assert(self.running())
     self.cancellable = False
@@ -501,11 +532,21 @@ def switch_to(self, cancellable, other: Thread) -> ResumeArg:
     assert(cancellable or not cancelled(resume_arg))
     return resume_arg
 
-  def yield_to(self, cancellable, other: Thread) -> ResumeArg:
+  def yield_to_suspended(self, cancellable, other: Thread) -> ResumeArg:
     assert(self.running() and other.suspended())
     self.ready_func = lambda: True
     self.task.inst.store.waiting.append(self)
-    return self.switch_to(cancellable, other)
+    return self.suspend_to_suspended(cancellable, other)
+
+  def suspend_to_ready(self, cancellable, other: Thread) -> ResumeArg:
+    assert(self.running() and other.ready())
+    other.stop_waiting()
+    return self.suspend_to_suspended(cancellable, other)
+
+  def yield_to_ready(self, cancellable, other: Thread) -> ResumeArg:
+    assert(self.running() and other.ready())
+    other.stop_waiting()
+    return self.yield_to_suspended(cancellable, other)
 
 #### Waitable State
 
@@ -629,8 +670,11 @@ def thread_stop(self, thread):
   def needs_exclusive(self):
     return not self.opts.async_ or self.opts.callback
 
+  def is_sync_and_not_returned(self):
+    return not self.ft.async_ and self.state != Task.State.RESOLVED
+
   def may_block(self):
-    return self.ft.async_ or self.state == Task.State.RESOLVED
+    return not self.is_sync_and_not_returned() or bool(Thread.ready_list(self.inst))
 
   def enter(self, thread):
     assert(thread in self.threads and thread.task is self)
@@ -2518,15 +2562,22 @@ def thread_func(thread):
   new_thread.index = thread.task.inst.threads.add(new_thread)
   return [new_thread.index]
 
-### 🧵 `canon thread.resume-later`
+### 🧵 `canon thread.unsuspend`
 
-def canon_thread_resume_later(thread, i):
+def canon_thread_unsuspend(thread, i):
   trap_if(not thread.task.inst.may_leave)
   other_thread = thread.task.inst.threads.get(i)
   trap_if(not other_thread.suspended())
-  other_thread.resume_later()
+  other_thread.unsuspend()
   return []
 
+### 🧵 `canon thread.exit`
+
+def canon_thread_exit(thread):
+  trap_if(not thread.task.inst.may_leave)
+  thread.exit()
+  assert(False)
+
 ### 🧵 `canon thread.suspend`
 
 def canon_thread_suspend(cancellable, thread):
@@ -2548,28 +2599,55 @@ def canon_thread_yield(cancellable, thread):
   resume_arg = thread.yield_(cancellable)
   return [resume_arg]
 
-### 🧵 `canon thread.switch-to`
+### 🧵 `canon thread.suspend-to-suspended`
 
-def canon_thread_switch_to(cancellable, thread, i):
+def canon_thread_suspend_to_suspended(cancellable, thread, i):
   trap_if(not thread.task.inst.may_leave)
   other_thread = thread.task.inst.threads.get(i)
   trap_if(not other_thread.suspended())
   if thread.task.deliver_pending_cancel(cancellable):
     resume_arg = ResumeArg.CANCELLED
   else:
-    resume_arg = thread.switch_to(cancellable, other_thread)
+    resume_arg = thread.suspend_to_suspended(cancellable, other_thread)
   return [resume_arg]
 
-### 🧵 `canon thread.yield-to`
+### 🧵 `canon thread.yield-to-suspended`
 
-def canon_thread_yield_to(cancellable, thread, i):
+def canon_thread_yield_to_suspended(cancellable, thread, i):
   trap_if(not thread.task.inst.may_leave)
   other_thread = thread.task.inst.threads.get(i)
   trap_if(not other_thread.suspended())
   if thread.task.deliver_pending_cancel(cancellable):
     resume_arg = ResumeArg.CANCELLED
   else:
-    resume_arg = thread.yield_to(cancellable, other_thread)
+    resume_arg = thread.yield_to_suspended(cancellable, other_thread)
+  return [resume_arg]
+
+### 🧵 `canon thread.suspend-then-promote`
+
+def canon_thread_suspend_then_promote(cancellable, thread, i):
+  trap_if(not thread.task.inst.may_leave)
+  trap_if(not thread.task.may_block())
+  other_thread = thread.task.inst.threads.get(i)
+  if thread.task.deliver_pending_cancel(cancellable):
+    resume_arg = ResumeArg.CANCELLED
+  elif other_thread.ready():
+    resume_arg = thread.suspend_to_ready(cancellable, other_thread)
+  else:
+    resume_arg = thread.suspend(cancellable)
+  return [resume_arg]
+
+### 🧵 `canon thread.yield-then-promote`
+
+def canon_thread_yield_then_promote(cancellable, thread, i):
+  trap_if(not thread.task.inst.may_leave)
+  other_thread = thread.task.inst.threads.get(i)
+  if thread.task.deliver_pending_cancel(cancellable):
+    resume_arg = ResumeArg.CANCELLED
+  elif other_thread.ready():
+    resume_arg = thread.yield_to_ready(cancellable, other_thread)
+  else:
+    resume_arg = thread.yield_(cancellable)
   return [resume_arg]
 
 ### 📝 `canon error-context.new`

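The revised `Task.may_block` in the `definitions.py` diff above now depends on whether the component instance has a `ready` thread to hand control to. The sketch below models just that predicate so its four cases can be checked side by side. `SketchThread`, `SketchInstance` and `SketchTask` are hypothetical stand-ins; `is_async` and `resolved` replace `ft.async_` and `state == Task.State.RESOLVED`. The definition of `waiting()` is an assumption (the diff does not show it), chosen to be consistent with `stop_waiting` resetting `ready_func` to `None`.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class SketchThread:
    # Assumption: a thread is "waiting" exactly when it has a ready_func.
    ready_func: Optional[Callable[[], bool]] = None

    def waiting(self) -> bool:
        return self.ready_func is not None

    def ready(self) -> bool:
        return self.waiting() and self.ready_func()


@dataclass
class SketchInstance:
    # Stand-in for inst.threads.array; None marks a free table slot.
    threads: List[Optional[SketchThread]] = field(default_factory=list)


def ready_list(inst: SketchInstance) -> List[SketchThread]:
    # Same comprehension as Thread.ready_list in the diff.
    return [t for t in inst.threads if t and t.ready()]


@dataclass
class SketchTask:
    inst: SketchInstance
    is_async: bool   # stands in for ft.async_
    resolved: bool   # stands in for state == Task.State.RESOLVED

    def is_sync_and_not_returned(self) -> bool:
        return not self.is_async and not self.resolved

    def may_block(self) -> bool:
        return not self.is_sync_and_not_returned() or bool(ready_list(self.inst))


idle = SketchInstance(threads=[None, SketchThread()])
busy = SketchInstance(threads=[None, SketchThread(ready_func=lambda: True)])

results = {
    "sync, unreturned, no ready sibling": SketchTask(idle, False, False).may_block(),
    "sync, unreturned, ready sibling": SketchTask(busy, False, False).may_block(),
    "sync, already returned": SketchTask(idle, False, True).may_block(),
    "async": SketchTask(idle, True, False).may_block(),
}
```

Under these assumptions only the first case yields `False`: a synchronous task that has not yet returned may block only when a `ready` sibling thread exists to take over, which matches the choice between `release_some_ready_sibling()` and `parent_lock.release()` in `exit` and `suspend`.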