Skip to content

Commit 28e4e30

Browse files
committed
Improve and add cooperative thread built-ins
1 parent d1088c8 commit 28e4e30

2 files changed

Lines changed: 253 additions & 45 deletions

File tree

design/mvp/canonical-abi/definitions.py

Lines changed: 108 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
class Trap(BaseException): pass
1818
class CoreWebAssemblyException(BaseException): pass
19+
class ThreadExit(BaseException): pass
1920

2021
def trap():
2122
raise Trap()
@@ -400,6 +401,9 @@ def waiting(self):
400401
def ready(self):
401402
return self.waiting() and self.ready_func()
402403

404+
def ready_list(inst: ComponentInstance) -> list[Thread]:
405+
return [t for t in inst.threads.array if t and t.ready()]
406+
403407
def __init__(self, task, thread_func):
404408
self.task = task
405409
self.fiber_lock = threading.Lock()
@@ -415,18 +419,48 @@ def fiber_func():
415419
self.fiber_lock.acquire()
416420
assert(self.running() and not cancelled(self.resume_arg))
417421
self.resume_arg = None
418-
thread_func(self)
419-
assert(self.running())
420-
self.task.thread_stop(self)
421-
if self.index is not None:
422-
self.task.inst.threads.remove(self.index)
423-
self.parent_lock.release()
422+
try:
423+
thread_func(self)
424+
self.exit()
425+
except ThreadExit:
426+
return
427+
assert(False)
424428
self.fiber = threading.Thread(target = fiber_func)
425429
self.fiber.start()
426430
self.task.thread_start(self)
427431
assert(self.suspended())
428432

429-
def resume_later(self):
433+
def exit(self):
434+
assert(self.running() and self.task.may_block())
435+
if self.index is not None:
436+
self.task.inst.threads.remove(self.index)
437+
self.task.thread_stop(self)
438+
if self.task.is_sync_before_return():
439+
self.release_some_ready_sibling()
440+
else:
441+
self.parent_lock.release()
442+
raise ThreadExit()
443+
444+
def release_some_ready_sibling(self):
445+
other = random.choice(Thread.ready_list(self.task.inst))
446+
assert(self is not other)
447+
other.stop_waiting()
448+
self.release_sibling(other)
449+
450+
def stop_waiting(self):
451+
assert(self.waiting())
452+
self.ready_func = None
453+
self.task.inst.store.waiting.remove(self)
454+
455+
def release_sibling(self, other: Thread):
456+
assert(other.resume_arg is None)
457+
other.resume_arg = ResumeArg.NOT_CANCELLED
458+
assert(self.parent_lock and not other.parent_lock)
459+
other.parent_lock = self.parent_lock
460+
self.parent_lock = None
461+
other.fiber_lock.release()
462+
463+
def unsuspend(self):
430464
assert(self.suspended())
431465
self.ready_func = lambda: True
432466
self.task.inst.store.waiting.append(self)
@@ -435,8 +469,7 @@ def resume(self, resume_arg):
435469
assert(not self.running())
436470
if self.waiting():
437471
assert(cancelled(resume_arg) or self.ready())
438-
self.ready_func = None
439-
self.task.inst.store.waiting.remove(self)
472+
self.stop_waiting()
440473
assert(self.cancellable or not cancelled(resume_arg))
441474
assert(self.resume_arg is None)
442475
self.resume_arg = resume_arg
@@ -451,7 +484,10 @@ def suspend(self, cancellable) -> ResumeArg:
451484
assert(self.running() and self.task.may_block())
452485
assert(not self.cancellable and self.resume_arg is None)
453486
self.cancellable = cancellable
454-
self.parent_lock.release()
487+
if self.task.is_sync_before_return():
488+
self.release_some_ready_sibling()
489+
else:
490+
self.parent_lock.release()
455491
self.fiber_lock.acquire()
456492
assert(self.running())
457493
self.cancellable = False
@@ -479,16 +515,11 @@ def yield_until(self, ready_func, cancellable) -> ResumeArg:
479515
def yield_(self, cancellable) -> ResumeArg:
480516
return self.yield_until(lambda: True, cancellable)
481517

482-
def switch_to(self, cancellable, other: Thread) -> ResumeArg:
518+
def suspend_to_suspended(self, cancellable, other: Thread) -> ResumeArg:
483519
assert(self.running() and other.suspended())
484-
assert(not self.cancellable and self.resume_arg is None and other.resume_arg is None)
520+
assert(not self.cancellable and self.resume_arg is None)
485521
self.cancellable = cancellable
486-
other.resume_arg = ResumeArg.NOT_CANCELLED
487-
assert(self.parent_lock and not other.parent_lock)
488-
other.parent_lock = self.parent_lock
489-
self.parent_lock = None
490-
assert(not self.running() and other.running())
491-
other.fiber_lock.release()
522+
self.release_sibling(other)
492523
self.fiber_lock.acquire()
493524
assert(self.running())
494525
self.cancellable = False
@@ -498,11 +529,21 @@ def switch_to(self, cancellable, other: Thread) -> ResumeArg:
498529
assert(cancellable or not cancelled(resume_arg))
499530
return resume_arg
500531

501-
def yield_to(self, cancellable, other: Thread) -> ResumeArg:
532+
def yield_to_suspended(self, cancellable, other: Thread) -> ResumeArg:
502533
assert(self.running() and other.suspended())
503534
self.ready_func = lambda: True
504535
self.task.inst.store.waiting.append(self)
505-
return self.switch_to(cancellable, other)
536+
return self.suspend_to_suspended(cancellable, other)
537+
538+
def suspend_to_ready(self, cancellable, other: Thread) -> ResumeArg:
539+
assert(self.running() and other.ready())
540+
other.stop_waiting()
541+
return self.suspend_to_suspended(cancellable, other)
542+
543+
def yield_to_ready(self, cancellable, other: Thread) -> ResumeArg:
544+
assert(self.running() and other.ready())
545+
other.stop_waiting()
546+
return self.yield_to_suspended(cancellable, other)
506547

507548
#### Waitable State
508549

@@ -626,8 +667,11 @@ def thread_stop(self, thread):
626667
def needs_exclusive(self):
627668
return not self.opts.async_ or self.opts.callback
628669

670+
def is_sync_before_return(self):
671+
return not self.ft.async_ and self.state != Task.State.RESOLVED
672+
629673
def may_block(self):
630-
return self.ft.async_ or self.state == Task.State.RESOLVED
674+
return not self.is_sync_before_return() or bool(Thread.ready_list(self.inst))
631675

632676
def enter(self, thread):
633677
assert(thread in self.threads and thread.task is self)
@@ -2515,15 +2559,22 @@ def thread_func(thread):
25152559
new_thread.index = thread.task.inst.threads.add(new_thread)
25162560
return [new_thread.index]
25172561

2518-
### 🧵 `canon thread.resume-later`
2562+
### 🧵 `canon thread.unsuspend`
25192563

2520-
def canon_thread_resume_later(thread, i):
2564+
def canon_thread_unsuspend(thread, i):
25212565
trap_if(not thread.task.inst.may_leave)
25222566
other_thread = thread.task.inst.threads.get(i)
25232567
trap_if(not other_thread.suspended())
2524-
other_thread.resume_later()
2568+
other_thread.unsuspend()
25252569
return []
25262570

2571+
### 🧵 `canon thread.exit`
2572+
2573+
def canon_thread_exit(thread):
2574+
trap_if(not thread.task.inst.may_leave)
2575+
thread.exit()
2576+
assert(False)
2577+
25272578
### 🧵 `canon thread.suspend`
25282579

25292580
def canon_thread_suspend(cancellable, thread):
@@ -2545,28 +2596,55 @@ def canon_thread_yield(cancellable, thread):
25452596
resume_arg = thread.yield_(cancellable)
25462597
return [resume_arg]
25472598

2548-
### 🧵 `canon thread.switch-to`
2599+
### 🧵 `canon thread.suspend-to-suspended`
25492600

2550-
def canon_thread_switch_to(cancellable, thread, i):
2601+
def canon_thread_suspend_to_suspended(cancellable, thread, i):
25512602
trap_if(not thread.task.inst.may_leave)
25522603
other_thread = thread.task.inst.threads.get(i)
25532604
trap_if(not other_thread.suspended())
25542605
if thread.task.deliver_pending_cancel(cancellable):
25552606
resume_arg = ResumeArg.CANCELLED
25562607
else:
2557-
resume_arg = thread.switch_to(cancellable, other_thread)
2608+
resume_arg = thread.suspend_to_suspended(cancellable, other_thread)
25582609
return [resume_arg]
25592610

2560-
### 🧵 `canon thread.yield-to`
2611+
### 🧵 `canon thread.yield-to-suspended`
25612612

2562-
def canon_thread_yield_to(cancellable, thread, i):
2613+
def canon_thread_yield_to_suspended(cancellable, thread, i):
25632614
trap_if(not thread.task.inst.may_leave)
25642615
other_thread = thread.task.inst.threads.get(i)
25652616
trap_if(not other_thread.suspended())
25662617
if thread.task.deliver_pending_cancel(cancellable):
25672618
resume_arg = ResumeArg.CANCELLED
25682619
else:
2569-
resume_arg = thread.yield_to(cancellable, other_thread)
2620+
resume_arg = thread.yield_to_suspended(cancellable, other_thread)
2621+
return [resume_arg]
2622+
2623+
### 🧵 `canon thread.suspend-then-promote`
2624+
2625+
def canon_thread_suspend_then_promote(cancellable, thread, i):
2626+
trap_if(not thread.task.inst.may_leave)
2627+
trap_if(not thread.task.may_block())
2628+
other_thread = thread.task.inst.threads.get(i)
2629+
if thread.task.deliver_pending_cancel(cancellable):
2630+
resume_arg = ResumeArg.CANCELLED
2631+
elif other_thread.ready():
2632+
resume_arg = thread.suspend_to_ready(cancellable, other_thread)
2633+
else:
2634+
resume_arg = thread.suspend(cancellable)
2635+
return [resume_arg]
2636+
2637+
### 🧵 `canon thread.yield-then-promote`
2638+
2639+
def canon_thread_yield_then_promote(cancellable, thread, i):
2640+
trap_if(not thread.task.inst.may_leave)
2641+
other_thread = thread.task.inst.threads.get(i)
2642+
if thread.task.deliver_pending_cancel(cancellable):
2643+
resume_arg = ResumeArg.CANCELLED
2644+
elif other_thread.ready():
2645+
resume_arg = thread.yield_to_ready(cancellable, other_thread)
2646+
else:
2647+
resume_arg = thread.yield_(cancellable)
25702648
return [resume_arg]
25712649

25722650
### 📝 `canon error-context.new`

0 commit comments

Comments
 (0)