Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
f4e560d
Add GetAwaiter to YieldInstruction and StreamYieldInstruction
MaxHeimbrock May 21, 2026
5c4e1be
Make Stage 1 awaiter test deterministic instead of FFI-flaky
MaxHeimbrock Jun 9, 2026
503c4f5
Add optional UniTask surface behind a version-define-gated asmdef
MaxHeimbrock May 21, 2026
4f9d97f
Add IUniTaskAsyncEnumerable adapter for incremental byte/text streams
MaxHeimbrock Jun 9, 2026
632297b
Document async/await + optional UniTask integration in the README
MaxHeimbrock Jun 9, 2026
6c3dcea
Docs: check IsError after UniTask.WhenAll in the parallel example
MaxHeimbrock Jun 18, 2026
90fbb8e
Make the await/UniTask path throw on failure
MaxHeimbrock Jun 18, 2026
bb57212
Install UniTask in the Meet test project so CI exercises it
MaxHeimbrock Jun 18, 2026
78ad5c2
Revert "Make the await/UniTask path throw on failure"
MaxHeimbrock Jun 18, 2026
4149d02
Resume awaiter continuations on the Unity main thread
MaxHeimbrock Jun 18, 2026
c3af3ba
Implement ICriticalNotifyCompletion on the awaiters
MaxHeimbrock Jun 18, 2026
9cbcb21
Dispose AsUniTask cancellation registration on the cancel path
MaxHeimbrock Jun 18, 2026
8164681
Docs: clarify the await vs await-foreach error model
MaxHeimbrock Jun 18, 2026
a6bff73
Remove implementation-process references from code comments
MaxHeimbrock Jun 18, 2026
8f6d0fa
Fix stream-error swallow on inline completion; narrow LatestChunk
MaxHeimbrock Jun 18, 2026
4949383
Align UniTask asmdef rootNamespace with declared namespaces
MaxHeimbrock Jun 18, 2026
c515702
Pin UniTask to 2.5.11 in the Meet sample
MaxHeimbrock Jun 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,70 @@ Debug.Log("Connected to " + room.Name);



## Asynchronous programming: coroutines, async/await, and UniTask

The SDK exposes three interchangeable styles for awaiting asynchronous operations. Coroutines, async/await and UniTask.

**1. Coroutines (default, no dependency)** — shown throughout this README.

**2. async/await (no dependency)** — every operation returns an awaitable instruction (`ConnectInstruction`, `PublishTrackInstruction`, `PerformRpcInstruction`, the stream read instructions, …), so you can `await` it directly. As with coroutines, you inspect success/failure on the instruction (`IsError`) — `await` does not throw. Continuations resume on Unity's main thread.

```cs
async void Start()
{
var room = new Room();
var connect = room.Connect("ws://localhost:7880", "<join-token>", new RoomOptions());
await connect;
if (!connect.IsError)
Debug.Log("Connected to " + room.Name);
}
```

> Use `async void` only for top-level event handlers (e.g. button callbacks); its exceptions surface to Unity's log rather than to a caller. Prefer `async Task`/`async UniTaskVoid` elsewhere.

**3. UniTask (optional)** — install [UniTask](https://github.com/Cysharp/UniTask) (`com.cysharp.unitask`). The SDK auto-detects it via the `LIVEKIT_UNITASK` scripting define and enables the `LiveKit.UniTask` assembly, which adds `CancellationToken` support, composition, and async streams.

Cancellation (abandon-awaiter semantics — the underlying request is not cancelled on the wire):

```cs
await room.Connect("ws://localhost:7880", "<join-token>", new RoomOptions())
.AsUniTask(cancellationToken);
```

Run operations in parallel. `AsUniTask` does not throw on failure (matching the
coroutine path), so keep the instructions and check `IsError` on each after the
`await` — otherwise a failed operation passes silently:

```cs
var publishCamera = room.LocalParticipant.PublishTrack(cameraTrack, cameraOptions);
var publishMicrophone = room.LocalParticipant.PublishTrack(microphoneTrack, microphoneOptions);

await UniTask.WhenAll(publishCamera.AsUniTask(ct), publishMicrophone.AsUniTask(ct));

if (publishCamera.IsError || publishMicrophone.IsError)
Debug.LogError("Failed to publish one or more tracks");
```

Consume an incremental stream with `await foreach`. The sequence ends at end-of-stream; if the stream ends with an error it throws a `StreamError`:

```cs
try
{
await foreach (var chunk in reader.ReadIncremental().AsAsyncEnumerable(ct))
Process(chunk);
}
catch (StreamError e)
{
Debug.LogError(e.Message);
}
```

> Error-handling differs by API: awaiting an instruction (and `AsUniTask`) never throws on a
> failed operation — you inspect `IsError` after the `await`. The stream enumerable is the
> exception: `await foreach` has no post-loop point to check `IsError`, so a mid-stream failure
> surfaces by throwing `StreamError`.


### Publishing microphone


Expand Down
2 changes: 2 additions & 0 deletions Runtime/Scripts/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@

[assembly: InternalsVisibleTo("EditModeTests")]
[assembly: InternalsVisibleTo("PlayModeTests")]
[assembly: InternalsVisibleTo("PlayModeTests.UniTask")]
[assembly: InternalsVisibleTo("LiveKit.UniTask")]
15 changes: 13 additions & 2 deletions Runtime/Scripts/DataStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,14 @@ public abstract class ReadIncrementalInstructionBase<TContent> : StreamYieldInst
/// </summary>
public bool IsError => Error != null;

protected TContent LatestChunk
/// <summary>
/// The chunk from the most recent completed read. Throws the captured
/// <see cref="StreamError"/> if the last read errored. Internal so the optional
/// UniTask async-enumerable adapter (which has InternalsVisibleTo access) can read
/// it generically; the typed <c>Bytes</c>/<c>Text</c> accessors on the concrete
/// readers delegate here.
/// </summary>
internal TContent LatestChunk
{
get
{
Expand Down Expand Up @@ -153,11 +160,15 @@ protected void OnEos(Proto.StreamError protoError)
{
lock (_gate)
{
IsEos = true;
// Assign Error before flipping IsEos. The IsEos setter fires the awaiter
// continuation, which inspects IsError/Error on resume; when completion runs
// inline on the main thread, setting IsEos first would let the continuation
// observe IsError == false and silently swallow the stream error.
if (protoError != null)
{
Error = new StreamError(protoError);
}
IsEos = true;
}
}
}
Expand Down
179 changes: 176 additions & 3 deletions Runtime/Scripts/Internal/YieldInstruction.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,30 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using LiveKit.Internal;
using UnityEngine;

namespace LiveKit
{
// Resumes awaiter continuations on Unity's main thread. Completion may be signalled on the
// FFI callback thread (operations registered dispatchToMainThread:false, and data-stream
// chunk events), but a custom awaiter otherwise resumes inline on the completing thread —
// leaving callers unable to touch Unity APIs after an await. Posting through the captured
// main-thread SynchronizationContext keeps the await path's threading identical to the
// coroutine path. When already on the main thread (e.g. Connect, which completes there) the
// continuation runs inline to avoid an extra frame of latency.
internal static class AwaiterScheduler
{
internal static void Resume(Action continuation)
{
var context = FfiClient.Instance._context;
if (context == null || SynchronizationContext.Current == context)
continuation();
else
context.Post(static state => ((Action)state)(), continuation);
}
}

public class YieldInstruction : CustomYieldInstruction
{
// Backing fields are volatile because completion may run on the FFI callback
Expand All @@ -13,10 +35,84 @@ public class YieldInstruction : CustomYieldInstruction
private volatile bool _isDone;
private volatile bool _isError;

public bool IsDone { get => _isDone; protected set => _isDone = value; }
// Sentinel published once completion has fired so any continuation registered
// afterwards runs inline instead of being silently dropped.
private static readonly Action s_completedSentinel = () => { };
private Action? _continuation;

public bool IsDone
{
get => _isDone;
protected set
{
_isDone = value;
if (value) InvokeContinuation();
}
}
public bool IsError { get => _isError; protected set => _isError = value; }

public override bool keepWaiting => !_isDone;

/// <summary>
/// Returns an awaiter so callers can <c>await</c> this instruction directly.
/// </summary>
/// <remarks>
/// The awaiter completes when <see cref="IsDone"/> becomes true. As with the
/// coroutine path, success vs. failure is inspected on the instruction itself
/// (<see cref="IsError"/> and any subclass-specific result fields); <c>GetResult</c>
/// does not throw.
/// </remarks>
public YieldInstructionAwaiter GetAwaiter() => new YieldInstructionAwaiter(this);

internal void RegisterContinuation(Action continuation)
{
// Race between completion-side (FFI thread writes sentinel) and await-side
// (registers continuation): CompareExchange decides who wrote first.
// null -> we won, completion will invoke our continuation later
// sentinel -> completion already fired; invoke inline
// other -> a second awaiter beat us here, which we don't support
var prev = Interlocked.CompareExchange(ref _continuation, continuation, null);
if (prev == null) return;
if (ReferenceEquals(prev, s_completedSentinel))
{
AwaiterScheduler.Resume(continuation);
return;
}
throw new InvalidOperationException(
"YieldInstruction does not support multiple awaiters; await it only once.");
}

private void InvokeContinuation()
{
var prev = Interlocked.Exchange(ref _continuation, s_completedSentinel);
if (prev != null && !ReferenceEquals(prev, s_completedSentinel))
{
AwaiterScheduler.Resume(prev);
}
}
}

public readonly struct YieldInstructionAwaiter : ICriticalNotifyCompletion
{
private readonly YieldInstruction _instruction;

internal YieldInstructionAwaiter(YieldInstruction instruction)
{
_instruction = instruction;
}

public bool IsCompleted => _instruction.IsDone;

public void OnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation);

// ICriticalNotifyCompletion lets the async state machine skip ExecutionContext capture
// on the hot path. We don't depend on the flowed context (AwaiterScheduler marshals to
// the main thread on its own), so this is safe and avoids a per-await allocation.
public void UnsafeOnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation);

// Intentionally a no-op. Parity with the coroutine path: callers inspect IsError
// and subclass-specific result fields on the instruction itself.
public void GetResult() { }
}

public class StreamYieldInstruction : CustomYieldInstruction
Expand All @@ -28,12 +124,37 @@ public class StreamYieldInstruction : CustomYieldInstruction
private volatile bool _isEos;
private volatile bool _isCurrentReadDone;

private static readonly Action s_completedSentinel = () => { };
private Action? _continuation;

/// <summary>
/// True if the stream has reached the end.
/// </summary>
public bool IsEos { get => _isEos; protected set => _isEos = value; }
public bool IsEos
{
get => _isEos;
protected set
{
_isEos = value;
if (value) InvokeContinuation();
}
}

internal bool IsCurrentReadDone { get => _isCurrentReadDone; set => _isCurrentReadDone = value; }
/// <summary>
/// True once a chunk is ready for the current read (before <see cref="Reset"/> is
/// called for the next one). Public getter mirrors the sibling
/// <c>DataTrack.ReadFrameInstruction.IsCurrentReadDone</c>; the setter stays internal
/// because only the SDK's stream readers advance this state.
/// </summary>
public bool IsCurrentReadDone
{
get => _isCurrentReadDone;
internal set
{
_isCurrentReadDone = value;
if (value) InvokeContinuation();
}
}

public override bool keepWaiting => !_isCurrentReadDone && !_isEos;

Expand All @@ -50,6 +171,58 @@ public override void Reset()
throw new InvalidOperationException("Cannot reset after end of stream");
}
_isCurrentReadDone = false;
// Drop the sentinel published by the previous completion so the next awaiter
// can install a fresh continuation. Safe because Reset is only called after the
// previous read's await has already resumed.
Volatile.Write(ref _continuation, null);
}

/// <summary>
/// Returns an awaiter that completes when the next chunk is ready or the stream ends.
/// Call <see cref="Reset"/> between iterations to await the following chunk.
/// </summary>
public StreamYieldInstructionAwaiter GetAwaiter() => new StreamYieldInstructionAwaiter(this);

internal void RegisterContinuation(Action continuation)
{
var prev = Interlocked.CompareExchange(ref _continuation, continuation, null);
if (prev == null) return;
if (ReferenceEquals(prev, s_completedSentinel))
{
AwaiterScheduler.Resume(continuation);
return;
}
throw new InvalidOperationException(
"StreamYieldInstruction does not support multiple concurrent awaiters; await it once per chunk.");
}

private void InvokeContinuation()
{
var prev = Interlocked.Exchange(ref _continuation, s_completedSentinel);
if (prev != null && !ReferenceEquals(prev, s_completedSentinel))
{
AwaiterScheduler.Resume(prev);
}
}
}

public readonly struct StreamYieldInstructionAwaiter : ICriticalNotifyCompletion
{
private readonly StreamYieldInstruction _instruction;

internal StreamYieldInstructionAwaiter(StreamYieldInstruction instruction)
{
_instruction = instruction;
}

public bool IsCompleted => _instruction.IsCurrentReadDone || _instruction.IsEos;

public void OnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation);

// See YieldInstructionAwaiter.UnsafeOnCompleted — skips ExecutionContext capture; the
// continuation is marshalled to the main thread by AwaiterScheduler regardless.
public void UnsafeOnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation);

public void GetResult() { }
}
}
8 changes: 8 additions & 0 deletions Runtime/Scripts/UniTask.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading