Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,23 @@ export const validateHierarchy = {
// waitForCompletion() awaits them all. BaseTracer::WeakRef prevents the abandoned
// SpanImpl from pinning the tracer past end-of-request.

// ---------- Case 7: jsRpcInsideEnterSpan ----------
// The jsRpcSession span created by an RPC binding call must be a child of the
// enterSpan it was called from, not the top-level onset span. This is the RPC
// equivalent of case 3 (fetchInsideEnterSpan).
{
const outer = findSpanByName(state, 'hierarchy-rpc-outer');
assert.strictEqual(outer.case, 'jsRpcInsideEnterSpan');
assert.ok(outer.closed);
const rpcSpan = findSpanByName(
state,
'jsRpcSession',
(s) => s.invocationId === outer.invocationId
);
assertParent(rpcSpan, outer, 'jsRpcInsideEnterSpan');
assertTopLevelParent(outer, 'jsRpcInsideEnterSpan');
}

console.log('All tracing-hierarchy tests passed!');
},
};
11 changes: 9 additions & 2 deletions src/cloudflare/internal/test/tracing/tracing-hierarchy-mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

// Minimal fetch target used by tracing-hierarchy-test. Just echoes the request path so
// the runtime-generated "fetch" span has something to observe.
// Minimal fetch + RPC target used by tracing-hierarchy-test.
import { WorkerEntrypoint } from 'cloudflare:workers';

export default {
async fetch(request) {
return new Response('ok', { status: 200 });
},
};

export class RpcTarget extends WorkerEntrypoint {
async ping() {
return 'pong';
}
}
14 changes: 14 additions & 0 deletions src/cloudflare/internal/test/tracing/tracing-hierarchy-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,17 @@ export const abandonedPromiseSpan = {
});
},
};

export const jsRpcInsideEnterSpan = {
async test(ctrl, env, ctx) {
const { withSpan } = env.tracingTest;
// An RPC call inside enterSpan should produce a jsRpcSession user span whose
// parent is the enterSpan, not the top-level onset span. This is the RPC
// equivalent of fetchInsideEnterSpan.
await withSpan('hierarchy-rpc-outer', async (outer) => {
outer.setAttribute('case', 'jsRpcInsideEnterSpan');
const result = await env.rpcTarget.ping();
assert.strictEqual(result, 'pong');
});
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ const unitTests :Workerd.Config = (
modules = [
(name = "worker", esModule = embed "tracing-hierarchy-test.js"),
],
compatibilityFlags = ["experimental", "nodejs_compat"],
compatibilityFlags = ["experimental", "nodejs_compat", "rpc"],
streamingTails = ["tail"],
bindings = [
(
name = "fetchTarget",
service = "tracing-hierarchy-mock"
),
(
name = "rpcTarget",
service = (name = "tracing-hierarchy-mock", entrypoint = "RpcTarget")
),
(
name = "tracingTest",
wrapped = (
Expand All @@ -29,7 +33,7 @@ const unitTests :Workerd.Config = (
),
( name = "tracing-hierarchy-mock",
worker = (
compatibilityFlags = ["experimental", "nodejs_compat"],
compatibilityFlags = ["experimental", "nodejs_compat", "rpc"],
modules = [
(name = "worker", esModule = embed "tracing-hierarchy-mock.js"),
],
Expand Down
49 changes: 47 additions & 2 deletions src/workerd/api/http.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2121,9 +2121,9 @@ kj::Maybe<jsg::Ref<JsRpcProperty>> Fetcher::getRpcMethodInternal(jsg::Lock& js,
rpc::JsRpcTarget::Client Fetcher::getClientForOneCall(
jsg::Lock& js, kj::Vector<kj::StringPtr>& path) {
auto& ioContext = IoContext::current();
auto worker = getClient(ioContext, kj::none, "jsRpcSession"_kjc);
auto [worker, sessionSpan] = getJsRpcClient(ioContext);
auto event = kj::heap<api::JsRpcSessionCustomEvent>(
JsRpcSessionCustomEvent::WORKER_RPC_EVENT_TYPE);
JsRpcSessionCustomEvent::WORKER_RPC_EVENT_TYPE, kj::mv(sessionSpan));

auto result = event->getCap();

Expand Down Expand Up @@ -2407,6 +2407,51 @@ kj::Own<WorkerInterface> Fetcher::getClient(
return clientWithTracing.client.attach(kj::mv(clientWithTracing.traceContext));
}

Fetcher::JsRpcClient Fetcher::getJsRpcClient(IoContext& ioContext) {
// The jsRpcSession span is owned by JsRpcSessionCustomEvent and lives for the session.
// OutgoingFactory variants create their own outer span, so we skip jsRpcSession for them.
auto withSessionSpan = [&](auto startRequest) -> JsRpcClient {
auto sessionSpan = ioContext.getCurrentUserTraceSpan().newChild(
"jsRpcSession"_kjc, ioContext.now());
auto sessionSpanParent = SpanParent(sessionSpan);
auto worker = ioContext.getSubrequest(
[&](TraceContext& tracing, IoChannelFactory& channelFactory) {
return startRequest(channelFactory,
IoChannelFactory::SubrequestMetadata{
.parentSpan = tracing.getInternalSpanParent(),
.userSpanParent = sessionSpanParent.addRef(),
});
}, {.inHouse = isInHouse, .wrapMetrics = !isInHouse});
return {kj::mv(worker), kj::mv(sessionSpan)};
};

KJ_SWITCH_ONEOF(channelOrClientFactory) {
// Service binding (e.g. env.MyService) — create jsRpcSession span.
KJ_CASE_ONEOF(channel, uint) {
return withSessionSpan([&](IoChannelFactory& channelFactory,
IoChannelFactory::SubrequestMetadata metadata) {
return channelFactory.startSubrequest(channel, kj::mv(metadata));
});
}
// Direct in-process channel handle — create jsRpcSession span.
KJ_CASE_ONEOF(channel, IoOwn<IoChannelFactory::SubrequestChannel>) {
return withSessionSpan([&](IoChannelFactory&,
IoChannelFactory::SubrequestMetadata metadata) {
return channel->startRequest(kj::mv(metadata));
});
}
// DurableObject stub (env.MyActor.get(id)) — factory creates durable_object_subrequest, skip.
KJ_CASE_ONEOF(outgoingFactory, IoOwn<OutgoingFactory>) {
return {outgoingFactory->newSingleUseClient(kj::none), SpanBuilder(nullptr)};
}
// Cross-process actor — factory creates its own outer span, skip.
KJ_CASE_ONEOF(outgoingFactory, kj::Own<CrossContextOutgoingFactory>) {
return {outgoingFactory->newSingleUseClient(ioContext, kj::none), SpanBuilder(nullptr)};
}
}
KJ_UNREACHABLE;
}

Fetcher::ClientWithTracing Fetcher::getClientWithTracing(
IoContext& ioContext, kj::Maybe<kj::String> cfStr, kj::ConstString operationName) {
KJ_SWITCH_ONEOF(channelOrClientFactory) {
Expand Down
12 changes: 12 additions & 0 deletions src/workerd/api/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,18 @@ class Fetcher: public JsRpcClientProvider {
kj::Own<WorkerInterface> getClient(
IoContext& ioContext, kj::Maybe<kj::String> cfStr, kj::ConstString operationName);

// Worker interface plus the user span representing this jsRpc session, if any. The span is
// created for direct channel variants but not for OutgoingFactory variants (which create their
// own outer span). The span is intended to be transferred to JsRpcSessionCustomEvent.
struct JsRpcClient {
kj::Own<WorkerInterface> worker;
SpanBuilder sessionSpan;
};

// Get a worker interface for a jsRpc session call, along with the jsRpcSession span (if one
// should be created for this Fetcher variant).
JsRpcClient getJsRpcClient(IoContext& ioContext);

// Result of getClient call that includes optional trace context
struct ClientWithTracing {
kj::Own<WorkerInterface> client;
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/api/worker-rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,12 +467,14 @@ class RpcStubDisposalGroup {
class JsRpcSessionCustomEvent final: public WorkerInterface::CustomEvent {
public:
JsRpcSessionCustomEvent(uint16_t typeId,
SpanBuilder jsRpcSessionSpan = SpanBuilder(nullptr),
kj::Maybe<kj::String> wrapperModule = kj::none,
kj::PromiseFulfillerPair<rpc::JsRpcTarget::Client> paf =
kj::newPromiseAndFulfiller<rpc::JsRpcTarget::Client>())
: capFulfiller(kj::mv(paf.fulfiller)),
clientCap(kj::mv(paf.promise)),
typeId(typeId),
jsRpcSessionSpan(kj::mv(jsRpcSessionSpan)),
wrapperModule(kj::mv(wrapperModule)) {}

~JsRpcSessionCustomEvent() noexcept(false) {
Expand Down Expand Up @@ -529,6 +531,10 @@ class JsRpcSessionCustomEvent final: public WorkerInterface::CustomEvent {
// limited return type.
kj::Maybe<rpc::JsRpcTarget::Client> clientCap;
uint16_t typeId;
// Span representing this jsRpc session. Created before startRequest() so the callee can
// reference its ID for trace context propagation. Lives until this event is destroyed
// (i.e., until the session ends), which gives the correct span lifetime.
SpanBuilder jsRpcSessionSpan;

kj::Maybe<kj::String> wrapperModule;

Expand Down
Loading