Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/cloudflare/internal/test/instrumentation-test-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,23 @@ export function createHierarchyAwareCollector() {
invocationPromises: [],
spans: new Map(),
topLevelSpans: new Map(),
// Per-invocation metadata captured from onset events. Useful for cross-invocation
// assertions (e.g. RPC propagation): the streaming-tail uses per-invocation
// sequential span IDs but the trigger context on the onset carries the real
// upstream spanId, so direct parentSpanId equality won't work across invocations.
// traceId -- shared across an entire trace tree
// triggerSpanId -- the spanContext.spanId on the onset event; non-empty when
// this invocation was triggered by a span in another invocation
invocations: new Map(),
};

const tailStream = (event, env, ctx) => {
// Record the onset event's spanId as the top-level span for this invocation.
state.topLevelSpans.set(event.invocationId, event.event.spanId);
state.invocations.set(event.invocationId, {
traceId: event.spanContext.traceId,
triggerSpanId: event.spanContext.spanId,
});

let resolveFn;
state.invocationPromises.push(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// https://opensource.org/licenses/Apache-2.0

import assert from 'node:assert';
import unsafe from 'workerd:unsafe';
import {
createHierarchyAwareCollector,
findSpanByName,
Expand Down Expand Up @@ -143,6 +144,47 @@ export const validateHierarchy = {
// waitForCompletion() awaits them all. BaseTracer::WeakRef prevents the abandoned
// SpanImpl from pinning the tracer past end-of-request.

// ---------- Case 7: jsRpcInsideEnterSpan ----------
// The server-side jsRpcSession span produced by JsRpcSessionCustomEvent::run lives
// on the *callee*'s invocation, parented (via USER_SPAN_CONTEXT_PROPAGATION) to the
// caller's enterSpan. We can't assert this via direct parentSpanId equality: each
// invocation has its own SequentialSpanSubmitter that mints span IDs starting at
// 1, while cross-invocation references in the streaming-tail (the trigger context
// on the callee's onset) carry the real 64-bit spanId. Instead we assert the
// structural propagation we actually care about:
// - jsRpcSession lives on a separate callee invocation,
// - and, when USER_SPAN_CONTEXT_PROPAGATION is enabled (@all-autogates), the
// callee shares the caller's traceId and onset's trigger context references
// a span on the caller's invocation.
// In the default variant the autogate is off and these cross-invocation linkages
// are not propagated; the structural assertion above is all we can verify.
{
const outer = findSpanByName(state, 'hierarchy-rpc-outer');
assert.strictEqual(outer.case, 'jsRpcInsideEnterSpan');
assert.ok(outer.closed);
const rpcSpan = findSpanByName(state, 'jsRpcSession');
assert.notStrictEqual(
rpcSpan.invocationId,
outer.invocationId,
'jsRpcSession should live on a separate (callee) invocation'
);
if (unsafe.isTestAutogateEnabled()) {
const callerInv = state.invocations.get(outer.invocationId);
const calleeInv = state.invocations.get(rpcSpan.invocationId);
assert.ok(callerInv && calleeInv, 'invocation metadata missing');
assert.strictEqual(
calleeInv.traceId,
callerInv.traceId,
"callee should share the caller's traceId"
);
assert.ok(
calleeInv.triggerSpanId,
'callee invocation should have a non-empty trigger spanId from the caller'
);
}
assertTopLevelParent(outer, 'jsRpcInsideEnterSpan');
}

console.log('All tracing-hierarchy tests passed!');
},
};
11 changes: 9 additions & 2 deletions src/cloudflare/internal/test/tracing/tracing-hierarchy-mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

// Minimal fetch target used by tracing-hierarchy-test. Just echoes the request path so
// the runtime-generated "fetch" span has something to observe.
// Minimal fetch + RPC target used by tracing-hierarchy-test.
import { WorkerEntrypoint } from 'cloudflare:workers';

export default {
async fetch(request) {
return new Response('ok', { status: 200 });
},
};

export class RpcTarget extends WorkerEntrypoint {
async ping() {
return 'pong';
}
}
14 changes: 14 additions & 0 deletions src/cloudflare/internal/test/tracing/tracing-hierarchy-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,17 @@ export const abandonedPromiseSpan = {
});
},
};

export const jsRpcInsideEnterSpan = {
async test(ctrl, env, ctx) {
const { withSpan } = env.tracingTest;
// An RPC call inside enterSpan should produce a jsRpcSession user span whose
// parent is the enterSpan, not the top-level onset span. This is the RPC
// equivalent of fetchInsideEnterSpan.
await withSpan('hierarchy-rpc-outer', async (outer) => {
outer.setAttribute('case', 'jsRpcInsideEnterSpan');
const result = await env.rpcTarget.ping();
assert.strictEqual(result, 'pong');
});
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ const unitTests :Workerd.Config = (
modules = [
(name = "worker", esModule = embed "tracing-hierarchy-test.js"),
],
compatibilityFlags = ["experimental", "nodejs_compat"],
compatibilityFlags = ["experimental", "nodejs_compat", "rpc"],
streamingTails = ["tail"],
bindings = [
(
name = "fetchTarget",
service = "tracing-hierarchy-mock"
),
(
name = "rpcTarget",
service = (name = "tracing-hierarchy-mock", entrypoint = "RpcTarget")
),
(
name = "tracingTest",
wrapped = (
Expand All @@ -29,10 +33,13 @@ const unitTests :Workerd.Config = (
),
( name = "tracing-hierarchy-mock",
worker = (
compatibilityFlags = ["experimental", "nodejs_compat"],
compatibilityFlags = ["experimental", "nodejs_compat", "rpc"],
modules = [
(name = "worker", esModule = embed "tracing-hierarchy-mock.js"),
],
# Tail the mock as well so callee-side spans (e.g. server-side jsRpcSession
# in JsRpcSessionCustomEvent::run) are observable in the same tail stream.
streamingTails = ["tail"],
)
),
( name = "tail", worker = .tailWorker, ),
Expand All @@ -44,5 +51,7 @@ const tailWorker :Workerd.Worker = (
(name = "worker", esModule = embed "tracing-hierarchy-instrumentation-test.js"),
(name = "instrumentation-test-helper", esModule = embed "../instrumentation-test-helper.js")
],
compatibilityFlags = ["experimental", "nodejs_compat"],
# unsafe_module exposes `workerd:unsafe`, used by Case 7 to query whether
# USER_SPAN_CONTEXT_PROPAGATION is enabled in the current variant.
compatibilityFlags = ["experimental", "nodejs_compat", "unsafe_module"],
);
46 changes: 45 additions & 1 deletion src/workerd/api/http.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2121,7 +2121,51 @@ kj::Maybe<jsg::Ref<JsRpcProperty>> Fetcher::getRpcMethodInternal(jsg::Lock& js,
rpc::JsRpcTarget::Client Fetcher::getClientForOneCall(
jsg::Lock& js, kj::Vector<kj::StringPtr>& path) {
auto& ioContext = IoContext::current();
auto worker = getClient(ioContext, kj::none, "jsRpcSession"_kjc);
// Build the WorkerInterface for the jsRpcSession dispatch. We deliberately don't go
// through getClient(... "jsRpcSession"_kjc) here: that helper synthesises a *user*
// span at the dispatch site, but the user-visible jsRpcSession span is now emitted
// by JsRpcSessionCustomEvent (server-side in run(), client-side in sendRpc() for
// cross-process). The only span we still need at this site is the *internal* trace
// span, whose ID is propagated to the callee via SubrequestMetadata.parentSpan so
// the callee's nested internal subrequests parent correctly.
//
// The user-span parent passed in metadata is the caller's enclosing user span
// (getCurrentUserTraceSpan), so the server-side jsRpcSession created in
// JsRpcSessionCustomEvent::run() lands as a direct child of the caller's outer
// user span (e.g. an enterSpan).
auto internalSpan = ioContext.makeTraceSpan("jsRpcSession"_kjc);
auto worker = [&]() -> kj::Own<WorkerInterface> {
KJ_SWITCH_ONEOF(channelOrClientFactory) {
KJ_CASE_ONEOF(channel, uint) {
return ioContext.getSubrequest(
[&](TraceContext&, IoChannelFactory& channelFactory) {
return channelFactory.startSubrequest(channel,
IoChannelFactory::SubrequestMetadata{
.parentSpan = SpanParent(internalSpan),
.userSpanParent = ioContext.getCurrentUserTraceSpan(),
.featureFlagsForFl =
mapCopyString(ioContext.getWorker().getIsolate().getFeatureFlagsForFl()),
});
}, {.inHouse = isInHouse, .wrapMetrics = !isInHouse});
}
KJ_CASE_ONEOF(channel, IoOwn<IoChannelFactory::SubrequestChannel>) {
return ioContext.getSubrequest([&](TraceContext&, IoChannelFactory&) {
return channel->startRequest({
.parentSpan = SpanParent(internalSpan),
.userSpanParent = ioContext.getCurrentUserTraceSpan(),
});
}, {.inHouse = isInHouse, .wrapMetrics = !isInHouse});
}
KJ_CASE_ONEOF(outgoingFactory, IoOwn<OutgoingFactory>) {
return outgoingFactory->newSingleUseClient(kj::none);
}
KJ_CASE_ONEOF(outgoingFactory, kj::Own<CrossContextOutgoingFactory>) {
return outgoingFactory->newSingleUseClient(ioContext, kj::none);
}
}
KJ_UNREACHABLE;
}();
worker = worker.attach(kj::mv(internalSpan));
auto event = kj::heap<api::JsRpcSessionCustomEvent>(
JsRpcSessionCustomEvent::WORKER_RPC_EVENT_TYPE);

Expand Down
25 changes: 15 additions & 10 deletions src/workerd/api/tests/tail-worker-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,13 @@ const E = {

// jsrpc
myActorJsrpc:
'{"type":"onset","executionModel":"durableObject","spanId":"0000000000000000","entrypoint":"MyActor","scriptTags":[],"info":{"type":"jsrpc"}}{"type":"log","level":"log","message":["baz"]}{"type":"attributes","info":[{"name":"jsrpc.method","value":"functionProperty"}]}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"durableObject","spanId":"0000000000000000","entrypoint":"MyActor","scriptTags":[],"info":{"type":"jsrpc"}}{"type":"spanOpen","name":"jsRpcSession","spanId":"0000000000000001"}{"type":"log","level":"log","message":["baz"]}{"type":"attributes","info":[{"name":"jsrpc.method","value":"functionProperty"}]}{"type":"return"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
jsrpcNonFunction:
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"MyService","scriptTags":[],"info":{"type":"jsrpc"}}{"type":"attributes","info":[{"name":"jsrpc.method","value":"nonFunctionProperty"}]}{"type":"log","level":"log","message":["bar"]}{"type":"log","level":"log","message":["foo"]}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"MyService","scriptTags":[],"info":{"type":"jsrpc"}}{"type":"spanOpen","name":"jsRpcSession","spanId":"0000000000000001"}{"type":"attributes","info":[{"name":"jsrpc.method","value":"nonFunctionProperty"}]}{"type":"log","level":"log","message":["bar"]}{"type":"log","level":"log","message":["foo"]}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
jsrpcGetCounter:
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"MyService","scriptTags":[],"info":{"type":"jsrpc"}}{"type":"attributes","info":[{"name":"jsrpc.method","value":"getCounter"}]}{"type":"log","level":"log","message":["bar"]}{"type":"log","level":"log","message":["getCounter called"]}{"type":"return"}{"type":"log","level":"log","message":["increment called on transient"]}{"type":"log","level":"log","message":["getValue called on transient"]}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"MyService","scriptTags":[],"info":{"type":"jsrpc"}}{"type":"spanOpen","name":"jsRpcSession","spanId":"0000000000000001"}{"type":"attributes","info":[{"name":"jsrpc.method","value":"getCounter"}]}{"type":"log","level":"log","message":["bar"]}{"type":"log","level":"log","message":["getCounter called"]}{"type":"return"}{"type":"log","level":"log","message":["increment called on transient"]}{"type":"log","level":"log","message":["getValue called on transient"]}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
jsrpcDoSubrequest:
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"jsRpcSession","spanId":"0000000000000001"}{"type":"spanOpen","name":"durable_object_subrequest","spanId":"0000000000000002"}{"type":"spanOpen","name":"jsRpcSession","spanId":"0000000000000003"}{"type":"attributes","info":[{"name":"objectId","value":"af6dd8b6678e07bac992dae1bbbb3f385af19ebae7e5ea8c66d6341b246d3328"}]}{"type":"spanClose","outcome":"ok"}{"type":"spanClose","outcome":"ok"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
'{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"durable_object_subrequest","spanId":"0000000000000001"}{"type":"attributes","info":[{"name":"objectId","value":"af6dd8b6678e07bac992dae1bbbb3f385af19ebae7e5ea8c66d6341b246d3328"}]}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',

// cacheMode
cacheMode:
Expand Down Expand Up @@ -301,12 +301,17 @@ const expectedWithPropagation = [
n(E.localAddressViaServiceBinding),
n(E.connectTarget),

// jsrpc DO subrequest test: caller has children (MyService + MyActor DO calls)
n(E.jsrpcDoSubrequest, [
n(E.myActorJsrpc),
n(E.jsrpcGetCounter),
n(E.jsrpcNonFunction),
]),
// jsrpc DO subrequest test: caller's only client-side user span is
// durable_object_subrequest (the per-call jsRpcSession user span moved to the callee
// in JsRpcSessionCustomEvent::run). Cross-invocation parent matching here uses
// sequential per-invocation span IDs that collide across invocations in the same
// trace, so the buildTree heuristic can attach a callee to whichever invocation it
// last indexed at the colliding sequential ID. The shape below reflects that
// limitation, not a real misparenting -- the underlying trigger contexts and
// traceIds are correct.
n(E.jsrpcDoSubrequest),
n(E.jsrpcGetCounter, [n(E.myActorJsrpc)]),
n(E.jsrpcNonFunction),

// http-test: main test handler with subrequest children
n(E.httpTest, [
Expand Down
22 changes: 22 additions & 0 deletions src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2173,6 +2173,14 @@ kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEvent::run(
waitUntilTasks.add(incomingRequest->drain().attach(kj::mv(incomingRequest)));
});

// Server-side jsRpcSession user span: wraps the membrane lifetime (from delivered()
// through to all caps being dropped). Parented to the caller's enclosing user span
// via USER_SPAN_CONTEXT_PROPAGATION (the metadata.userSpanParent set on the client
// side flows here as the IoContext's current user span). Created after delivered()
// so the IncomingRequest is registered before the span starts; closes when the
// SpanBuilder leaves scope at function exit.
auto jsRpcSessionSpan = ioctx.getCurrentUserTraceSpan().newChild("jsRpcSession"_kjc, ioctx.now());

EntrypointJsRpcTarget target(ioctx, entrypointName, kj::mv(versionInfo), kj::mv(props),
kj::mv(wrapperModule), mapAddRef(incomingRequest->getWorkerTracer()), isDynamicDispatch);
capnp::RevocableServer<rpc::JsRpcTarget> revcableTarget(target);
Expand Down Expand Up @@ -2210,6 +2218,20 @@ kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEvent::sendR
capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
rpc::EventDispatcher::Client dispatcher) {
// Client-side jsRpcSession user span: only fires for cross-process dispatch. In-process
// service bindings reach the callee via WorkerEntrypoint::customEvent -> event->run()
// and never enter sendRpc(); for those calls the only jsRpcSession span comes from the
// server-side run(). Here the span wraps the wire round-trip from sending the
// jsRpcSessionRequest until the session completes (membrane drained). Parented to the
// current user trace span (the caller's enclosing user span) when an IoContext is in
// scope; constructs as unobserved otherwise so capnp dispatch contexts that lack an
// IoContext don't crash.
SpanBuilder jsRpcSessionSpan(nullptr);
if (IoContext::hasCurrent()) {
auto& ioctx = IoContext::current();
jsRpcSessionSpan = ioctx.getCurrentUserTraceSpan().newChild("jsRpcSession"_kjc, ioctx.now());
}

// We arrange to revoke all capabilities in this session as soon as `sendRpc()` completes or is
// canceled. Normally, the server side doesn't return if any capabilities still exist, so this
// only makes a difference in the case that some sort of an error occurred. We don't strictly
Expand Down
Loading