diff --git a/crates/trusted-server-adapter-fastly/src/main.rs b/crates/trusted-server-adapter-fastly/src/main.rs index 72cae33e..a4566ad3 100644 --- a/crates/trusted-server-adapter-fastly/src/main.rs +++ b/crates/trusted-server-adapter-fastly/src/main.rs @@ -161,16 +161,20 @@ async fn route_request( } // tsjs endpoints - (Method::GET, "/first-party/proxy") => handle_first_party_proxy(settings, req).await, - (Method::GET, "/first-party/click") => handle_first_party_click(settings, req).await, + (Method::GET, "/first-party/proxy") => { + handle_first_party_proxy(settings, runtime_services, req).await + } + (Method::GET, "/first-party/click") => { + handle_first_party_click(settings, runtime_services, req).await + } (Method::GET, "/first-party/sign") | (Method::POST, "/first-party/sign") => { - handle_first_party_proxy_sign(settings, req).await + handle_first_party_proxy_sign(settings, runtime_services, req).await } (Method::POST, "/first-party/proxy-rebuild") => { - handle_first_party_proxy_rebuild(settings, req).await + handle_first_party_proxy_rebuild(settings, runtime_services, req).await } (m, path) if integration_registry.has_route(&m, path) => integration_registry - .handle_proxy(&m, path, settings, req) + .handle_proxy(&m, path, settings, runtime_services, req) .await .unwrap_or_else(|| { Err(Report::new(TrustedServerError::BadRequest { diff --git a/crates/trusted-server-adapter-fastly/src/platform.rs b/crates/trusted-server-adapter-fastly/src/platform.rs index 0b75f3ab..f696e058 100644 --- a/crates/trusted-server-adapter-fastly/src/platform.rs +++ b/crates/trusted-server-adapter-fastly/src/platform.rs @@ -187,45 +187,147 @@ impl PlatformBackend for FastlyPlatformBackend { } // --------------------------------------------------------------------------- -// FastlyPlatformHttpClient +// FastlyPlatformHttpClient — helpers // --------------------------------------------------------------------------- -/// Placeholder Fastly implementation of [`PlatformHttpClient`]. +/// Convert a platform-neutral [`edgezero_core::http::Request`] to a [`fastly::Request`]. +/// +/// Only buffered `Body::Once` bodies are supported on this path. /// -/// The Fastly-backed `send` / `send_async` / `select` behavior lands in a -/// follow-up PR once the orchestrator migration is complete. Until then all -/// methods return [`PlatformError::Unsupported`]. +/// # Errors /// -/// Implementation lands in #487 (PR 6: Backend + HTTP client traits). +/// Returns [`PlatformError::HttpClient`] when the request body is streaming. +fn edge_request_to_fastly( + request: edgezero_core::http::Request, +) -> Result> { + let (parts, body) = request.into_parts(); + let mut fastly_req = fastly::Request::new(parts.method, parts.uri.to_string()); + for (name, value) in parts.headers.iter() { + fastly_req.append_header(name.as_str(), value.as_bytes()); + } + match body { + edgezero_core::body::Body::Once(bytes) => { + if !bytes.is_empty() { + fastly_req.set_body(bytes.to_vec()); + } + } + edgezero_core::body::Body::Stream(_) => { + return Err(Report::new(PlatformError::HttpClient) + .attach("streaming request body is not supported by Fastly request conversion")); + } + } + Ok(fastly_req) +} + +/// Convert a [`fastly::Response`] to a [`PlatformResponse`] with the given backend name. +fn fastly_response_to_platform( + mut resp: fastly::Response, + backend_name: impl Into, +) -> Result> { + let status = resp.get_status(); + let mut builder = edgezero_core::http::response_builder().status(status); + for (name, value) in resp.get_headers() { + builder = builder.header(name.as_str(), value.as_bytes()); + } + let body_bytes = resp.take_body_bytes(); + let edge_response = builder + .body(edgezero_core::body::Body::from(body_bytes)) + .change_context(PlatformError::HttpClient)?; + Ok(PlatformResponse::new(edge_response).with_backend_name(backend_name)) +} + +// --------------------------------------------------------------------------- +// FastlyPlatformHttpClient +// --------------------------------------------------------------------------- + +/// Fastly implementation of [`PlatformHttpClient`]. +/// +/// - [`send`](PlatformHttpClient::send) — converts the platform request to a +/// `fastly::Request`, calls `.send()`, and wraps the response. +/// - [`send_async`](PlatformHttpClient::send_async) — same conversion but +/// calls `.send_async()` and wraps the `fastly::PendingRequest`. +/// - [`select`](PlatformHttpClient::select) — downcasts each +/// [`PlatformPendingRequest`] back to `fastly::PendingRequest` and calls +/// `fastly::http::request::select()`. pub struct FastlyPlatformHttpClient; #[async_trait::async_trait(?Send)] impl PlatformHttpClient for FastlyPlatformHttpClient { async fn send( &self, - _request: PlatformHttpRequest, + request: PlatformHttpRequest, ) -> Result> { - log::warn!("FastlyPlatformHttpClient::send called before #487 lands"); - Err(Report::new(PlatformError::Unsupported) - .attach("FastlyPlatformHttpClient::send is not yet implemented")) + let backend_name = request.backend_name.clone(); + let fastly_req = edge_request_to_fastly(request.request)?; + let fastly_resp = fastly_req + .send(&backend_name) + .change_context(PlatformError::HttpClient)?; + fastly_response_to_platform(fastly_resp, backend_name) } async fn send_async( &self, - _request: PlatformHttpRequest, + request: PlatformHttpRequest, ) -> Result> { - log::warn!("FastlyPlatformHttpClient::send_async called before #487 lands"); - Err(Report::new(PlatformError::Unsupported) - .attach("FastlyPlatformHttpClient::send_async is not yet implemented")) + let backend_name = request.backend_name.clone(); + let fastly_req = edge_request_to_fastly(request.request)?; + let pending = fastly_req + .send_async(&backend_name) + .change_context(PlatformError::HttpClient)?; + Ok(PlatformPendingRequest::new(pending).with_backend_name(backend_name)) } async fn select( &self, - _pending_requests: Vec, + pending_requests: Vec, ) -> Result> { - log::warn!("FastlyPlatformHttpClient::select called before #487 lands"); - Err(Report::new(PlatformError::Unsupported) - .attach("FastlyPlatformHttpClient::select is not yet implemented")) + use fastly::http::request::{select, PendingRequest}; + + if pending_requests.is_empty() { + return Err(Report::new(PlatformError::HttpClient) + .attach("select called with an empty pending_requests list")); + } + + let mut fastly_pending: Vec = Vec::with_capacity(pending_requests.len()); + + for platform_req in pending_requests { + let inner = platform_req.downcast::().map_err(|platform_req| { + let backend_name = platform_req.backend_name().unwrap_or(""); + Report::new(PlatformError::HttpClient).attach(format!( + "PlatformPendingRequest inner type is not fastly::PendingRequest for backend '{backend_name}'" + )) + })?; + fastly_pending.push(inner); + } + + let (result, remaining_fastly) = select(fastly_pending); + + // Fastly's select() does not preserve input order for remaining requests, + // so positional backend-name re-association is unreliable. Backend names + // are re-derived from get_backend_name() when each remaining request completes. + let remaining: Vec = remaining_fastly + .into_iter() + .map(PlatformPendingRequest::new) + .collect(); + + let ready = match result { + Ok(fastly_resp) => { + let backend_name = fastly_resp + .get_backend_name() + .unwrap_or_else(|| { + log::warn!("select: response has no backend name, correlation will fail"); + "" + }) + .to_string(); + fastly_response_to_platform(fastly_resp, backend_name) + } + Err(e) => { + Err(Report::new(PlatformError::HttpClient) + .attach(format!("fastly select error: {e}"))) + } + }; + + Ok(PlatformSelectResult { ready, remaining }) } } @@ -296,9 +398,12 @@ pub fn open_kv_store(store_name: &str) -> Result, KvErr #[cfg(test)] mod tests { + use std::io; use std::sync::Arc; use std::time::Duration; + use edgezero_core::body::Body; + use edgezero_core::http::request_builder; use edgezero_core::key_value_store::NoopKvStore; use super::*; @@ -417,4 +522,132 @@ mod tests { "should preserve client_ip through clone" ); } + + // --- FastlyPlatformHttpClient ------------------------------------------- + + #[test] + fn fastly_platform_http_client_send_returns_error_for_unregistered_backend() { + let client = FastlyPlatformHttpClient; + let request = request_builder() + .method("GET") + .uri("https://example.com/") + .body(Body::empty()) + .expect("should build test request"); + let err = futures::executor::block_on( + client.send(PlatformHttpRequest::new(request, "nonexistent-backend")), + ) + .expect_err("should return error for unregistered backend"); + + assert!( + matches!(err.current_context(), &PlatformError::HttpClient), + "should be HttpClient error, got: {:?}", + err.current_context() + ); + } + + #[test] + fn fastly_platform_http_client_send_async_returns_error_for_unregistered_backend() { + let client = FastlyPlatformHttpClient; + let request = request_builder() + .method("GET") + .uri("https://example.com/") + .body(Body::empty()) + .expect("should build test request"); + let err = futures::executor::block_on( + client.send_async(PlatformHttpRequest::new(request, "nonexistent-backend")), + ) + .expect_err("should return error for unregistered backend"); + + assert!( + matches!(err.current_context(), &PlatformError::HttpClient), + "should be HttpClient error, got: {:?}", + err.current_context() + ); + } + + #[test] + fn fastly_platform_http_client_select_returns_error_for_empty_list() { + let client = FastlyPlatformHttpClient; + let err = futures::executor::block_on(client.select(vec![])) + .expect_err("should return error for empty pending list"); + + assert!( + matches!(err.current_context(), &PlatformError::HttpClient), + "should be HttpClient error, got: {:?}", + err.current_context() + ); + } + + #[test] + fn fastly_platform_http_client_select_returns_error_for_wrong_inner_type() { + let client = FastlyPlatformHttpClient; + // Wrap a non-PendingRequest type to trigger the downcast failure. + let wrong = PlatformPendingRequest::new(42u32).with_backend_name("origin-a"); + let err = futures::executor::block_on(client.select(vec![wrong])) + .expect_err("should return error for wrong inner type"); + + assert!( + matches!(err.current_context(), &PlatformError::HttpClient), + "should be HttpClient error, got: {:?}", + err.current_context() + ); + assert!( + format!("{err:?}").contains("origin-a"), + "should include backend name in error report: {err:?}" + ); + } + + #[test] + fn fastly_platform_http_client_send_returns_error_for_streaming_body() { + let client = FastlyPlatformHttpClient; + let request = request_builder() + .method("POST") + .uri("https://example.com/") + .body(Body::from_stream(futures::stream::empty::< + Result<_, io::Error>, + >())) + .expect("should build streaming test request"); + + let err = futures::executor::block_on( + client.send(PlatformHttpRequest::new(request, "nonexistent-backend")), + ) + .expect_err("should reject streaming request bodies before sending"); + + assert!( + matches!(err.current_context(), &PlatformError::HttpClient), + "should be HttpClient error, got: {:?}", + err.current_context() + ); + assert!( + format!("{err:?}").contains("streaming request body"), + "should describe the unsupported streaming body: {err:?}" + ); + } + + #[test] + fn fastly_platform_http_client_send_async_returns_error_for_streaming_body() { + let client = FastlyPlatformHttpClient; + let request = request_builder() + .method("POST") + .uri("https://example.com/") + .body(Body::from_stream(futures::stream::empty::< + Result<_, io::Error>, + >())) + .expect("should build streaming test request"); + + let err = futures::executor::block_on( + client.send_async(PlatformHttpRequest::new(request, "nonexistent-backend")), + ) + .expect_err("should reject streaming request bodies before launching async send"); + + assert!( + matches!(err.current_context(), &PlatformError::HttpClient), + "should be HttpClient error, got: {:?}", + err.current_context() + ); + assert!( + format!("{err:?}").contains("streaming request body"), + "should describe the unsupported streaming body: {err:?}" + ); + } } diff --git a/crates/trusted-server-core/src/auction/README.md b/crates/trusted-server-core/src/auction/README.md index d6f4483a..bac2f3c3 100644 --- a/crates/trusted-server-core/src/auction/README.md +++ b/crates/trusted-server-core/src/auction/README.md @@ -257,18 +257,18 @@ The trusted-server handles several types of routes defined in `crates/trusted-se | Route | Method | Handler | Purpose | Line | |---------------------------|--------|--------------------------------|--------------------------------------------------|------| -| `/auction` | POST | `handle_auction()` | Main auction endpoint (Prebid.js/tsjs format) | 84 | -| `/first-party/proxy` | GET | `handle_first_party_proxy()` | Proxy creatives through first-party domain | 84 | -| `/first-party/click` | GET | `handle_first_party_click()` | Track clicks on ads | 85 | -| `/first-party/sign` | GET/POST | `handle_first_party_proxy_sign()` | Generate signed URLs for creatives | 86 | -| `/first-party/proxy-rebuild` | POST | `handle_first_party_proxy_rebuild()` | Rebuild creative HTML with new settings | 89 | -| `/static/tsjs=*` | GET | `handle_tsjs_dynamic()` | Serve tsjs library (Prebid.js alternative) | 66 | -| `/.well-known/ts.jwks.json` | GET | `handle_jwks_endpoint()` | Public key distribution for request signing | 71 | -| `/verify-signature` | POST | `handle_verify_signature()` | Verify signed requests | 74 | -| `/admin/keys/rotate` | POST | `handle_rotate_key()` | Rotate signing keys (admin only) | 77 | -| `/admin/keys/deactivate` | POST | `handle_deactivate_key()` | Deactivate signing keys (admin only) | 78 | -| `/integrations/*` | * | Integration Registry | Provider-specific endpoints (Prebid, etc.) | 92 | -| `*` (fallback) | * | `handle_publisher_request()` | Proxy to publisher origin | 108 | +| `/auction` | POST | `handle_auction()` | Main auction endpoint (Prebid.js/tsjs format) | 162 | +| `/first-party/proxy` | GET | `handle_first_party_proxy()` | Proxy creatives through first-party domain | 167 | +| `/first-party/click` | GET | `handle_first_party_click()` | Track clicks on ads | 170 | +| `/first-party/sign` | GET/POST | `handle_first_party_proxy_sign()` | Generate signed URLs for creatives | 173 | +| `/first-party/proxy-rebuild` | POST | `handle_first_party_proxy_rebuild()` | Rebuild creative HTML with new settings | 176 | +| `/static/tsjs=*` | GET | `handle_tsjs_dynamic()` | Serve tsjs library (Prebid.js alternative) | 145 | +| `/.well-known/trusted-server.json` | GET | `handle_trusted_server_discovery()` | Public key distribution for request signing | 149 | +| `/verify-signature` | POST | `handle_verify_signature()` | Verify signed requests | 154 | +| `/admin/keys/rotate` | POST | `handle_rotate_key()` | Rotate signing keys (admin only) | 158 | +| `/admin/keys/deactivate` | POST | `handle_deactivate_key()` | Deactivate signing keys (admin only) | 159 | +| `/integrations/*` | * | Integration Registry | Provider-specific endpoints (Prebid, etc.) | 179 | +| `*` (fallback) | * | `handle_publisher_request()` | Proxy to publisher origin | 195 | ### How Routing Works @@ -277,22 +277,50 @@ The Fastly Compute entrypoint uses pattern matching on `(Method, path)` tuples: ```rust let result = match (method, path.as_str()) { - // Auction endpoint + (Method::GET, path) if path.starts_with("/static/tsjs=") => { + handle_tsjs_dynamic(&req, integration_registry) + } + (Method::GET, "/.well-known/trusted-server.json") => { + handle_trusted_server_discovery(settings, runtime_services, req) + } + (Method::POST, "/verify-signature") => handle_verify_signature(settings, req), + (Method::POST, "/admin/keys/rotate") => handle_rotate_key(settings, req), + (Method::POST, "/admin/keys/deactivate") => handle_deactivate_key(settings, req), (Method::POST, "/auction") => { - handle_auction(&settings, &orchestrator, &runtime_services, req).await - }, - - // First-party endpoints - (Method::GET, "/first-party/proxy") => handle_first_party_proxy(&settings, req).await, - - // Integration registry (dynamic routes) - (m, path) if integration_registry.has_route(&m, path) => { - integration_registry.handle_proxy(&m, path, &settings, req).await + match runtime_services_for_consent_route(settings, runtime_services) { + Ok(auction_services) => { + handle_auction(settings, orchestrator, &auction_services, req).await + } + Err(e) => Err(e), + } + } + (Method::GET, "/first-party/proxy") => { + handle_first_party_proxy(settings, runtime_services, req).await + } + (Method::GET, "/first-party/click") => { + handle_first_party_click(settings, runtime_services, req).await + } + (Method::GET, "/first-party/sign") | (Method::POST, "/first-party/sign") => { + handle_first_party_proxy_sign(settings, runtime_services, req).await + } + (Method::POST, "/first-party/proxy-rebuild") => { + handle_first_party_proxy_rebuild(settings, runtime_services, req).await + } + (m, path) if integration_registry.has_route(&m, path) => integration_registry + .handle_proxy(&m, path, settings, runtime_services, req) + .await + .unwrap_or_else(|| { + Err(Report::new(TrustedServerError::BadRequest { + message: format!("Unknown integration route: {path}"), + })) + }), + _ => match runtime_services_for_consent_route(settings, runtime_services) { + Ok(publisher_services) => { + handle_publisher_request(settings, integration_registry, &publisher_services, req) + } + Err(e) => Err(e), }, - - // Fallback to publisher origin - _ => handle_publisher_request(&settings, &integration_registry, &runtime_services, req), -} +}; ``` #### 2. Integration Registry (Dynamic Routes) diff --git a/crates/trusted-server-core/src/auction/endpoints.rs b/crates/trusted-server-core/src/auction/endpoints.rs index 018b2b38..4ce4440e 100644 --- a/crates/trusted-server-core/src/auction/endpoints.rs +++ b/crates/trusted-server-core/src/auction/endpoints.rs @@ -8,7 +8,6 @@ use crate::consent; use crate::cookies::handle_request_cookies; use crate::edge_cookie::get_or_generate_ec_id; use crate::error::TrustedServerError; -use crate::geo::GeoInfo; use crate::platform::RuntimeServices; use crate::settings::Settings; @@ -32,7 +31,7 @@ use super::AuctionOrchestrator; pub async fn handle_auction( settings: &Settings, orchestrator: &AuctionOrchestrator, - runtime_services: &RuntimeServices, + services: &RuntimeServices, mut req: Request, ) -> Result> { // Parse request body @@ -49,15 +48,21 @@ pub async fn handle_auction( // Generate EC ID early so the consent pipeline can use it for // KV Store fallback/write operations. - let ec_id = - get_or_generate_ec_id(settings, &req).change_context(TrustedServerError::Auction { + let ec_id = get_or_generate_ec_id(settings, services, &req).change_context( + TrustedServerError::Auction { message: "Failed to generate EC ID".to_string(), - })?; + }, + )?; // Extract consent from request cookies, headers, and geo. let cookie_jar = handle_request_cookies(&req)?; - #[allow(deprecated)] - let geo = GeoInfo::from_request(&req); + let geo = services + .geo() + .lookup(services.client_info.client_ip) + .unwrap_or_else(|e| { + log::warn!("geo lookup failed: {e}"); + None + }); let consent_context = consent::build_consent_context(&consent::ConsentPipelineInput { jar: cookie_jar.as_ref(), req: &req, @@ -68,24 +73,32 @@ pub async fn handle_auction( .consent .consent_store .as_deref() - .map(|_| runtime_services.kv_store()), + .map(|_| services.kv_store()), }); // Convert tsjs request format to auction request - let auction_request = - convert_tsjs_to_auction_request(&body, settings, &req, consent_context, &ec_id)?; + let auction_request = convert_tsjs_to_auction_request( + &body, + settings, + services, + &req, + consent_context, + &ec_id, + geo, + )?; // Create auction context let context = AuctionContext { settings, request: &req, + client_info: &services.client_info, timeout_ms: settings.auction.timeout_ms, provider_responses: None, }; // Run the auction let result = orchestrator - .run_auction(&auction_request, &context) + .run_auction(&auction_request, &context, services) .await .change_context(TrustedServerError::Auction { message: "Auction orchestration failed".to_string(), diff --git a/crates/trusted-server-core/src/auction/formats.rs b/crates/trusted-server-core/src/auction/formats.rs index 1f557a17..5237921a 100644 --- a/crates/trusted-server-core/src/auction/formats.rs +++ b/crates/trusted-server-core/src/auction/formats.rs @@ -18,8 +18,8 @@ use crate::constants::{HEADER_X_TS_EC, HEADER_X_TS_EC_FRESH}; use crate::creative; use crate::edge_cookie::generate_ec_id; use crate::error::TrustedServerError; -use crate::geo::GeoInfo; use crate::openrtb::{to_openrtb_i32, OpenRtbBid, OpenRtbResponse, ResponseExt, SeatBid, ToExt}; +use crate::platform::{GeoInfo, RuntimeServices}; use crate::settings::Settings; use super::orchestrator::OrchestrationResult; @@ -83,14 +83,17 @@ pub struct BannerUnit { pub fn convert_tsjs_to_auction_request( body: &AdRequest, settings: &Settings, + services: &RuntimeServices, req: &Request, consent: ConsentContext, ec_id: &str, + geo: Option, ) -> Result> { let ec_id = ec_id.to_owned(); - let fresh_id = generate_ec_id(settings, req).change_context(TrustedServerError::Auction { - message: "Failed to generate fresh EC ID".to_string(), - })?; + let fresh_id = + generate_ec_id(settings, services).change_context(TrustedServerError::Auction { + message: "Failed to generate fresh EC ID".to_string(), + })?; // Convert ad units to slots let mut slots = Vec::new(); @@ -137,9 +140,8 @@ pub fn convert_tsjs_to_auction_request( user_agent: req .get_header_str("user-agent") .map(std::string::ToString::to_string), - ip: req.get_client_ip_addr().map(|ip| ip.to_string()), - #[allow(deprecated)] - geo: GeoInfo::from_request(req), + ip: services.client_info.client_ip.map(|ip| ip.to_string()), + geo, }); // Forward allowed config entries from the JS request into the context map. diff --git a/crates/trusted-server-core/src/auction/orchestrator.rs b/crates/trusted-server-core/src/auction/orchestrator.rs index 0fc6ee9f..3bee3117 100644 --- a/crates/trusted-server-core/src/auction/orchestrator.rs +++ b/crates/trusted-server-core/src/auction/orchestrator.rs @@ -1,12 +1,13 @@ //! Auction orchestrator for managing multi-provider auctions. use error_stack::{Report, ResultExt}; -use fastly::http::request::{select, PendingRequest}; use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; use crate::error::TrustedServerError; +use crate::platform::{PlatformPendingRequest, RuntimeServices}; +use crate::proxy::platform_response_to_fastly; use super::config::AuctionConfig; use super::provider::AuctionProvider; @@ -65,6 +66,7 @@ impl AuctionOrchestrator { &self, request: &AuctionRequest, context: &AuctionContext<'_>, + services: &RuntimeServices, ) -> Result> { let start_time = Instant::now(); @@ -72,12 +74,13 @@ impl AuctionOrchestrator { let (strategy_name, result) = if self.config.has_mediator() { ( "parallel_mediation", - self.run_parallel_mediation(request, context).await?, + self.run_parallel_mediation(request, context, services) + .await?, ) } else { ( "parallel_only", - self.run_parallel_only(request, context).await?, + self.run_parallel_only(request, context, services).await?, ) }; @@ -102,9 +105,12 @@ impl AuctionOrchestrator { &self, request: &AuctionRequest, context: &AuctionContext<'_>, + services: &RuntimeServices, ) -> Result> { let mediation_start = Instant::now(); - let provider_responses = self.run_providers_parallel(request, context).await?; + let provider_responses = self + .run_providers_parallel(request, context, services) + .await?; let floor_prices = self.floor_prices_by_slot(request); let (mediator_response, winning_bids) = if let Some(mediator_name) = &self.config.mediator { @@ -139,6 +145,7 @@ impl AuctionOrchestrator { let mediator_context = AuctionContext { settings: context.settings, request: context.request, + client_info: context.client_info, timeout_ms: remaining_ms, provider_responses: Some(&provider_responses), }; @@ -150,9 +157,21 @@ impl AuctionOrchestrator { message: format!("Mediator {} failed to launch", mediator.provider_name()), })?; - let backend_response = pending.wait().change_context(TrustedServerError::Auction { - message: format!("Mediator {} request failed", mediator.provider_name()), - })?; + let platform_resp = services + .http_client() + .wait(PlatformPendingRequest::new(pending)) + .await + .change_context(TrustedServerError::Auction { + message: format!("Mediator {} request failed", mediator.provider_name()), + })?; + let backend_response = platform_response_to_fastly(platform_resp).change_context( + TrustedServerError::Auction { + message: format!( + "Mediator {} returned an unsupported response body", + mediator.provider_name() + ), + }, + )?; let response_time_ms = start_time.elapsed().as_millis() as u64; let mediator_resp = mediator @@ -205,8 +224,11 @@ impl AuctionOrchestrator { &self, request: &AuctionRequest, context: &AuctionContext<'_>, + services: &RuntimeServices, ) -> Result> { - let provider_responses = self.run_providers_parallel(request, context).await?; + let provider_responses = self + .run_providers_parallel(request, context, services) + .await?; let floor_prices = self.floor_prices_by_slot(request); let winning_bids = self.select_winning_bids(&provider_responses, &floor_prices); @@ -221,12 +243,14 @@ impl AuctionOrchestrator { /// Run all providers in parallel and collect responses. /// - /// Uses `fastly::http::request::select()` to process responses as they - /// become ready, rather than waiting for each response sequentially. + /// Uses [`RuntimeServices::http_client`] and + /// [`crate::platform::PlatformHttpClient::select`] to process responses as + /// they become ready, rather than waiting for each response sequentially. async fn run_providers_parallel( &self, request: &AuctionRequest, context: &AuctionContext<'_>, + services: &RuntimeServices, ) -> Result, Report> { let provider_names = self.config.provider_names(); @@ -248,7 +272,7 @@ impl AuctionOrchestrator { // Maps backend_name -> (provider_name, start_time, provider) let mut backend_to_provider: HashMap = HashMap::new(); - let mut pending_requests: Vec = Vec::new(); + let mut pending_requests: Vec = Vec::new(); for provider_name in provider_names { let provider = match self.providers.get(provider_name) { @@ -300,6 +324,7 @@ impl AuctionOrchestrator { let provider_context = AuctionContext { settings: context.settings, request: context.request, + client_info: context.client_info, timeout_ms: effective_timeout, provider_responses: context.provider_responses, }; @@ -315,10 +340,11 @@ impl AuctionOrchestrator { match provider.request_bids(request, &provider_context) { Ok(pending) => { backend_to_provider.insert( - backend_name, + backend_name.clone(), (provider.provider_name(), start_time, provider.as_ref()), ); - pending_requests.push(pending); + pending_requests + .push(PlatformPendingRequest::new(pending).with_backend_name(backend_name)); log::debug!( "Request to '{}' launched successfully", provider.provider_name() @@ -353,33 +379,54 @@ impl AuctionOrchestrator { let mut remaining = pending_requests; while !remaining.is_empty() { - let (result, rest) = select(remaining); - remaining = rest; + let select_result = services + .http_client() + .select(remaining) + .await + .change_context(TrustedServerError::Auction { + message: "HTTP select failed".to_string(), + })?; + remaining = select_result.remaining; - match result { - Ok(response) => { + match select_result.ready { + Ok(platform_response) => { // Identify the provider from the backend name - let backend_name = response.get_backend_name().unwrap_or_default().to_string(); + let backend_name = platform_response.backend_name.clone().unwrap_or_default(); if let Some((provider_name, start_time, provider)) = backend_to_provider.remove(&backend_name) { let response_time_ms = start_time.elapsed().as_millis() as u64; - match provider.parse_response(response, response_time_ms) { - Ok(auction_response) => { - log::info!( - "Provider '{}' returned {} bids (status: {:?}, time: {}ms)", - auction_response.provider, - auction_response.bids.len(), - auction_response.status, - auction_response.response_time_ms - ); - responses.push(auction_response); + match platform_response_to_fastly(platform_response) { + Ok(response) => { + match provider.parse_response(response, response_time_ms) { + Ok(auction_response) => { + log::info!( + "Provider '{}' returned {} bids (status: {:?}, time: {}ms)", + auction_response.provider, + auction_response.bids.len(), + auction_response.status, + auction_response.response_time_ms + ); + responses.push(auction_response); + } + Err(e) => { + log::warn!( + "Provider '{}' failed to parse response: {:?}", + provider_name, + e + ); + responses.push(AuctionResponse::error( + provider_name, + response_time_ms, + )); + } + } } Err(e) => { log::warn!( - "Provider '{}' failed to parse response: {:?}", + "Provider '{}' returned an unsupported response body: {:?}", provider_name, e ); @@ -587,6 +634,16 @@ mod tests { use crate::auction::types::{ AdFormat, AdSlot, AuctionContext, AuctionRequest, Bid, MediaType, PublisherInfo, UserInfo, }; + + // All-None ClientInfo used across tests that don't need real IP/TLS data. + // Defined as a const so &EMPTY_CLIENT_INFO has 'static lifetime, avoiding + // the temporary-lifetime issue that arises with &ClientInfo::default(). + const EMPTY_CLIENT_INFO: crate::platform::ClientInfo = crate::platform::ClientInfo { + client_ip: None, + tls_protocol: None, + tls_cipher: None, + }; + use crate::platform::test_support::noop_services; use crate::test_support::tests::crate_test_settings_str; use fastly::Request; use std::collections::{HashMap, HashSet}; @@ -643,10 +700,12 @@ mod tests { fn create_test_context<'a>( settings: &'a crate::settings::Settings, req: &'a Request, + client_info: &'a crate::platform::ClientInfo, ) -> AuctionContext<'a> { AuctionContext { settings, request: req, + client_info, timeout_ms: 2000, provider_responses: None, } @@ -739,9 +798,11 @@ mod tests { let request = create_test_auction_request(); let settings = create_test_settings(); let req = Request::get("https://test.com/test"); - let context = create_test_context(&settings, &req); + let context = create_test_context(&settings, &req, &EMPTY_CLIENT_INFO); - let result = orchestrator.run_auction(&request, &context).await; + let result = orchestrator + .run_auction(&request, &context, &noop_services()) + .await; assert!(result.is_err()); let err = result.unwrap_err(); diff --git a/crates/trusted-server-core/src/auction/types.rs b/crates/trusted-server-core/src/auction/types.rs index aa863d61..82538206 100644 --- a/crates/trusted-server-core/src/auction/types.rs +++ b/crates/trusted-server-core/src/auction/types.rs @@ -6,6 +6,7 @@ use std::collections::HashMap; use crate::auction::context::ContextValue; use crate::geo::GeoInfo; +use crate::platform::ClientInfo; use crate::settings::Settings; /// Represents a unified auction request across all providers. @@ -102,6 +103,7 @@ pub struct SiteInfo { pub struct AuctionContext<'a> { pub settings: &'a Settings, pub request: &'a Request, + pub client_info: &'a ClientInfo, pub timeout_ms: u32, /// Provider responses from the bidding phase, used by mediators. /// This is `None` for regular bidders and `Some` when calling a mediator. diff --git a/crates/trusted-server-core/src/backend.rs b/crates/trusted-server-core/src/backend.rs index 3df84a7a..468a3f83 100644 --- a/crates/trusted-server-core/src/backend.rs +++ b/crates/trusted-server-core/src/backend.rs @@ -34,7 +34,7 @@ fn compute_host_header(scheme: &str, host: &str, port: u16) -> String { } /// Default first-byte timeout for backends (15 seconds). -const DEFAULT_FIRST_BYTE_TIMEOUT: Duration = Duration::from_secs(15); +pub(crate) const DEFAULT_FIRST_BYTE_TIMEOUT: Duration = Duration::from_secs(15); /// Configuration for creating a dynamic Fastly backend. /// diff --git a/crates/trusted-server-core/src/edge_cookie.rs b/crates/trusted-server-core/src/edge_cookie.rs index 063c3fdd..7d2094e3 100644 --- a/crates/trusted-server-core/src/edge_cookie.rs +++ b/crates/trusted-server-core/src/edge_cookie.rs @@ -14,6 +14,7 @@ use sha2::Sha256; use crate::constants::{COOKIE_TS_EC, HEADER_X_TS_EC}; use crate::cookies::{ec_id_has_only_allowed_chars, handle_request_cookies}; use crate::error::TrustedServerError; +use crate::platform::RuntimeServices; use crate::settings::Settings; type HmacSha256 = Hmac; @@ -67,12 +68,13 @@ fn generate_random_suffix(length: usize) -> String { /// - [`TrustedServerError::Ec`] if HMAC generation fails pub fn generate_ec_id( settings: &Settings, - req: &Request, + services: &RuntimeServices, ) -> Result> { // Fallback to "unknown" when client IP is unavailable (e.g., local testing). // All such requests share the same HMAC base; the random suffix provides uniqueness. - let client_ip = req - .get_client_ip_addr() + let client_ip = services + .client_info + .client_ip .map(normalize_ip) .unwrap_or_else(|| "unknown".to_string()); @@ -146,6 +148,7 @@ pub fn get_ec_id(req: &Request) -> Result, Report Result> { if let Some(id) = get_ec_id(req)? { @@ -153,7 +156,7 @@ pub fn get_or_generate_ec_id( } // If no existing EC ID found, generate a fresh one - let ec_id = generate_ec_id(settings, req)?; + let ec_id = generate_ec_id(settings, services)?; log::trace!("No existing EC ID, generated: {}", ec_id); Ok(ec_id) } @@ -164,6 +167,7 @@ mod tests { use fastly::http::{HeaderName, HeaderValue}; use std::net::{Ipv4Addr, Ipv6Addr}; + use crate::platform::test_support::{noop_services, noop_services_with_client_ip}; use crate::test_support::tests::create_test_settings; #[test] @@ -236,9 +240,8 @@ mod tests { #[test] fn test_generate_ec_id() { let settings: Settings = create_test_settings(); - let req = create_test_request(vec![]); - let ec_id = generate_ec_id(&settings, &req).expect("should generate EC ID"); + let ec_id = generate_ec_id(&settings, &noop_services()).expect("should generate EC ID"); log::debug!("Generated EC ID: {}", ec_id); assert!( is_ec_id_format(&ec_id), @@ -246,6 +249,25 @@ mod tests { ); } + #[test] + fn test_generate_ec_id_uses_client_ip() { + let settings = create_test_settings(); + let ip = IpAddr::V4(Ipv4Addr::new(203, 0, 113, 1)); + + let id_with_ip = generate_ec_id(&settings, &noop_services_with_client_ip(ip)) + .expect("should generate EC ID with client IP"); + let id_without_ip = generate_ec_id(&settings, &noop_services()) + .expect("should generate EC ID without client IP"); + + let hmac_with_ip = id_with_ip.split_once('.').expect("should contain dot").0; + let hmac_without_ip = id_without_ip.split_once('.').expect("should contain dot").0; + + assert_ne!( + hmac_with_ip, hmac_without_ip, + "should produce different HMAC when client IP differs" + ); + } + #[test] fn test_is_ec_id_format_accepts_valid_value() { let value = format!("{}.{}", "a".repeat(64), "Ab12z9"); @@ -290,7 +312,8 @@ mod tests { let ec_id = get_ec_id(&req).expect("should get EC ID"); assert_eq!(ec_id, Some("existing_ec_id".to_string())); - let ec_id = get_or_generate_ec_id(&settings, &req).expect("should reuse header EC ID"); + let ec_id = get_or_generate_ec_id(&settings, &noop_services(), &req) + .expect("should reuse header EC ID"); assert_eq!(ec_id, "existing_ec_id"); } @@ -305,7 +328,8 @@ mod tests { let ec_id = get_ec_id(&req).expect("should get EC ID"); assert_eq!(ec_id, Some("existing_cookie_id".to_string())); - let ec_id = get_or_generate_ec_id(&settings, &req).expect("should reuse cookie EC ID"); + let ec_id = get_or_generate_ec_id(&settings, &noop_services(), &req) + .expect("should reuse cookie EC ID"); assert_eq!(ec_id, "existing_cookie_id"); } @@ -321,7 +345,8 @@ mod tests { let settings = create_test_settings(); let req = create_test_request(vec![]); - let ec_id = get_or_generate_ec_id(&settings, &req).expect("should get or generate EC ID"); + let ec_id = get_or_generate_ec_id(&settings, &noop_services(), &req) + .expect("should get or generate EC ID"); assert!(!ec_id.is_empty()); } @@ -348,7 +373,7 @@ mod tests { let settings = create_test_settings(); let req = create_test_request(vec![(HEADER_X_TS_EC, "evil;injected")]); - let ec_id = get_or_generate_ec_id(&settings, &req) + let ec_id = get_or_generate_ec_id(&settings, &noop_services(), &req) .expect("should generate fresh ID on invalid header"); assert_ne!( ec_id, "evil;injected", diff --git a/crates/trusted-server-core/src/html_processor.rs b/crates/trusted-server-core/src/html_processor.rs index 540ab29d..ab5ff72a 100644 --- a/crates/trusted-server-core/src/html_processor.rs +++ b/crates/trusted-server-core/src/html_processor.rs @@ -1,6 +1,22 @@ -//! Simplified HTML processor that combines URL replacement and integration injection +//! Simplified HTML processor that combines URL replacement and integration injection. //! -//! This module provides a `StreamProcessor` implementation for HTML content. +//! This module provides a [`StreamProcessor`] implementation for HTML content. +//! It handles `