diff --git a/packages/optimization/src/ldai_optimization/client.py b/packages/optimization/src/ldai_optimization/client.py index 6d74481..9f51e5c 100644 --- a/packages/optimization/src/ldai_optimization/client.py +++ b/packages/optimization/src/ldai_optimization/client.py @@ -33,6 +33,7 @@ OptimizationResultPayload, ) from ldai_optimization.prompts import ( + _acceptance_criteria_implies_duration_optimization, build_message_history_text, build_new_variation_prompt, build_reasoning_history, @@ -80,6 +81,12 @@ def _compute_validation_count(pool_size: int) -> int: # the variation step is treated as a failure. _MAX_VARIATION_RETRIES = 3 +# Duration gate: a candidate must be at least this much faster than the baseline +# (history[0].duration_ms) to pass the duration check when acceptance criteria +# imply a latency optimization goal. 0.80 means the candidate must clock in at +# under 80% of the baseline — i.e. at least 20% improvement. +_DURATION_TOLERANCE = 0.80 + # Maps SDK status strings to the API status/activity values expected by # agent_optimization_result records. Defined at module level to avoid # allocating the dict on every on_status_update invocation. @@ -328,6 +335,7 @@ async def _call_judges( variables: Optional[Dict[str, Any]] = None, agent_tools: Optional[List[ToolDefinition]] = None, expected_response: Optional[str] = None, + agent_duration_ms: Optional[float] = None, ) -> Dict[str, JudgeResult]: """ Call all judges in parallel (auto-path). @@ -344,6 +352,9 @@ async def _call_judges( :param agent_tools: Normalised list of tool dicts that were available to the agent :param expected_response: Optional ground truth expected response. When provided, judges are instructed to factor it into their scoring alongside acceptance criteria. + :param agent_duration_ms: Wall-clock duration of the agent call in milliseconds. + Forwarded to acceptance judges whose statement implies a latency goal so they + can mention the duration change in their rationale. 
:return: Dictionary of judge results (score and rationale) """ if not self._options.judges: @@ -396,6 +407,7 @@ async def _call_judges( variables=resolved_variables, agent_tools=resolved_agent_tools, expected_response=expected_response, + agent_duration_ms=agent_duration_ms, ) judge_results[judge_key] = result @@ -613,6 +625,7 @@ async def _evaluate_acceptance_judge( variables: Optional[Dict[str, Any]] = None, agent_tools: Optional[List[ToolDefinition]] = None, expected_response: Optional[str] = None, + agent_duration_ms: Optional[float] = None, ) -> JudgeResult: """ Evaluate using an acceptance statement judge. @@ -627,6 +640,9 @@ async def _evaluate_acceptance_judge( :param agent_tools: Normalised list of tool dicts that were available to the agent :param expected_response: Optional ground truth expected response. When provided, injected into instructions and judge message so the judge can score actual vs. expected. + :param agent_duration_ms: Wall-clock duration of the agent call in milliseconds. + When the acceptance statement implies a latency goal, the judge is instructed + to mention the duration change in its rationale. :return: The judge result with score and rationale """ if not optimization_judge.acceptance_statement: @@ -662,6 +678,32 @@ async def _evaluate_acceptance_judge( 'Example: {"score": 0.8, "rationale": "The response matches the acceptance statement well."}' ) + if ( + agent_duration_ms is not None + and _acceptance_criteria_implies_duration_optimization( + {judge_key: optimization_judge} + ) + ): + baseline_ms = ( + self._history[0].duration_ms + if self._history and self._history[0].duration_ms is not None + else None + ) + instructions += ( + f"\n\nThe acceptance criteria for this judge includes a latency/duration goal. " + f"The agent's response took {agent_duration_ms:.0f}ms to generate. 
" + ) + if baseline_ms is not None: + delta_ms = agent_duration_ms - baseline_ms + direction = "faster" if delta_ms < 0 else "slower" + instructions += ( + f"The baseline duration (first iteration) was {baseline_ms:.0f}ms. " + f"This response was {abs(delta_ms):.0f}ms {direction} than the baseline. " + ) + instructions += ( + "Please mention the duration and any change from baseline in your rationale." + ) + if resolved_variables: instructions += f"\n\nThe following variables were available to the agent: {json.dumps(resolved_variables)}" @@ -911,6 +953,11 @@ async def _run_ground_truth_optimization( else: sample_passed = self._evaluate_response(optimize_context) + if sample_passed and _acceptance_criteria_implies_duration_optimization( + self._options.judges + ): + sample_passed = self._evaluate_duration(optimize_context) + if not sample_passed: logger.info( "[GT Attempt %d] -> Sample %d/%d FAILED", @@ -1147,6 +1194,9 @@ async def _generate_new_variation( ) self._safe_status_update("generating variation", status_ctx, iteration) + optimize_for_duration = _acceptance_criteria_implies_duration_optimization( + self._options.judges + ) instructions = build_new_variation_prompt( self._history, self._options.judges, @@ -1156,6 +1206,7 @@ async def _generate_new_variation( self._options.model_choices, self._options.variable_choices, self._initial_instructions, + optimize_for_duration=optimize_for_duration, ) # Create a flat history list (without nested history) to avoid exponential growth @@ -1486,6 +1537,7 @@ async def _execute_agent_turn( variables=optimize_context.current_variables, agent_tools=agent_tools, expected_response=expected_response, + agent_duration_ms=agent_duration_ms, ) return dataclasses.replace( @@ -1523,6 +1575,38 @@ def _evaluate_response(self, optimize_context: OptimizationContext) -> bool: return True + def _evaluate_duration(self, optimize_context: OptimizationContext) -> bool: + """ + Check whether the candidate's duration meets the improvement 
target vs. the baseline. + + The baseline is history[0].duration_ms — the very first completed iteration, + representing the original unoptimized configuration's latency. The candidate + must be at least _DURATION_TOLERANCE faster (default: 20% improvement). + + Returns True without blocking when no baseline is available (empty history or + history[0].duration_ms is None), or when the candidate's duration_ms was not + captured. This avoids penalising configurations when timing data is missing. + + :param optimize_context: The completed turn context containing duration_ms + :return: True if the duration requirement is met or cannot be checked + """ + if not self._history or self._history[0].duration_ms is None: + return True + if optimize_context.duration_ms is None: + return True + baseline = self._history[0].duration_ms + passed = optimize_context.duration_ms < baseline * _DURATION_TOLERANCE + if not passed: + logger.warning( + "[Iteration %d] -> Duration check failed: %.0fms >= baseline %.0fms * %.0f%% (%.0fms)", + optimize_context.iteration, + optimize_context.duration_ms, + baseline, + _DURATION_TOLERANCE * 100, + baseline * _DURATION_TOLERANCE, + ) + return passed + def _handle_success( self, optimize_context: OptimizationContext, iteration: int ) -> Any: @@ -1691,6 +1775,11 @@ async def _run_validation_phase( else: sample_passed = self._evaluate_response(val_ctx) + if sample_passed and _acceptance_criteria_implies_duration_optimization( + self._options.judges + ): + sample_passed = self._evaluate_duration(val_ctx) + last_ctx = val_ctx if not sample_passed: @@ -1798,6 +1887,11 @@ async def _run_optimization( iteration, ) + if initial_passed and _acceptance_criteria_implies_duration_optimization( + self._options.judges + ): + initial_passed = self._evaluate_duration(optimize_context) + if initial_passed: all_valid, last_ctx = await self._run_validation_phase( optimize_context, iteration diff --git a/packages/optimization/src/ldai_optimization/prompts.py 
b/packages/optimization/src/ldai_optimization/prompts.py index c8631c5..bc10d81 100644 --- a/packages/optimization/src/ldai_optimization/prompts.py +++ b/packages/optimization/src/ldai_optimization/prompts.py @@ -1,5 +1,6 @@ """Prompt-building functions for LaunchDarkly AI optimization.""" +import re from typing import Any, Dict, List, Optional from ldai_optimization.dataclasses import ( @@ -7,6 +8,35 @@ OptimizationJudge, ) +_DURATION_KEYWORDS = re.compile( + r"\b(fast|faster|quickly|quick|latency|low-latency|duration|response\s+time|" + r"time\s+to\s+respond|milliseconds|performant|snappy|efficient|seconds)\b|" + r"(? bool: + """Return True if any judge acceptance statement implies a latency optimization goal. + + Scans each judge's acceptance_statement for latency-related keywords. The + check is case-insensitive. Returns False when judges is None or no judge + carries an acceptance statement. + + :param judges: Judge configuration dict from OptimizationOptions, or None. + :return: True if duration optimization should be applied. + """ + if not judges: + return False + for judge in judges.values(): + if judge.acceptance_statement and _DURATION_KEYWORDS.search( + judge.acceptance_statement + ): + return True + return False + def build_message_history_text( history: List[OptimizationContext], @@ -82,6 +112,7 @@ def build_new_variation_prompt( model_choices: List[str], variable_choices: List[Dict[str, Any]], initial_instructions: str, + optimize_for_duration: bool = False, ) -> str: """ Build the LLM prompt for generating an improved agent configuration. 
def variation_prompt_duration_optimization(model_choices: "List[str]") -> str:
    """
    Duration optimization section of the variation prompt.

    Included when acceptance criteria imply a latency reduction goal. Instructs
    the LLM to treat response speed as a secondary objective — quality criteria
    must still be met first — and provides concrete guidance on how to reduce
    latency through model selection and instruction simplification.

    :param model_choices: List of model IDs the LLM may select from, so it can
        apply its own knowledge of which models tend to be faster.
    :return: The duration optimization prompt block.
    """
    # Static framing lines around a single dynamic line listing the models.
    prompt_lines = [
        "## Duration Optimization:",
        "The acceptance criteria for this optimization implies that response latency should be reduced.",
        "In addition to improving quality, generate a variation that aims to reduce the agent's response time.",
        "You may:",
        "- Select a faster model from the available choices if quality requirements can still be met.",
    ]
    prompt_lines.append(f"  Available models: {model_choices}")
    prompt_lines += [
        "  Use your knowledge of these models to prefer those that are known to respond more quickly.",
        "- Simplify or shorten the instructions where this does not compromise the acceptance criteria.",
        "  Shorter prompts reduce input token counts and typically yield faster responses.",
        "- Avoid increasing max_tokens or other parameters that extend generation time.",
        "Quality criteria remain the primary objective — do not sacrifice passing scores to achieve lower latency.",
    ]
    return "\n".join(prompt_lines)
must be fast.", + ) + await self.client._evaluate_acceptance_judge( + judge_key="speed", + optimization_judge=judge, + completion_response="Here is the answer.", + iteration=2, + reasoning_history="", + user_input="Tell me something.", + agent_duration_ms=1500.0, + ) + _, config, _ = self.handle_judge_call.call_args.args + assert "1500ms" in config.instructions + assert "mention the duration" in config.instructions + + async def test_duration_context_includes_baseline_comparison_when_history_present(self): + """When history[0] has a duration, the judge instructions include a baseline comparison.""" + self.client._history = [ + OptimizationContext( + scores={}, + completion_response="old response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=1, + duration_ms=2000.0, + ) + ] + judge = OptimizationJudge( + threshold=0.8, + acceptance_statement="Responses should have low latency.", + ) + await self.client._evaluate_acceptance_judge( + judge_key="latency", + optimization_judge=judge, + completion_response="Here is the answer.", + iteration=2, + reasoning_history="", + user_input="Tell me something.", + agent_duration_ms=1500.0, + ) + _, config, _ = self.handle_judge_call.call_args.args + assert "1500ms" in config.instructions + assert "2000ms" in config.instructions + assert "faster" in config.instructions + + async def test_duration_context_says_slower_when_candidate_is_slower(self): + """When the candidate is slower than baseline, the instructions say 'slower'.""" + self.client._history = [ + OptimizationContext( + scores={}, + completion_response="old response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=1, + duration_ms=1000.0, + ) + ] + judge = OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be fast.", + ) + await self.client._evaluate_acceptance_judge( + judge_key="speed", + optimization_judge=judge, + completion_response="Here is the 
answer.", + iteration=2, + reasoning_history="", + user_input="Tell me something.", + agent_duration_ms=1800.0, + ) + _, config, _ = self.handle_judge_call.call_args.args + assert "slower" in config.instructions + + async def test_duration_context_not_added_when_no_latency_keyword(self): + """When acceptance statement has no latency keyword, duration is not injected.""" + judge = OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be accurate.", + ) + await self.client._evaluate_acceptance_judge( + judge_key="accuracy", + optimization_judge=judge, + completion_response="Paris.", + iteration=1, + reasoning_history="", + user_input="Capital of France?", + agent_duration_ms=2000.0, + ) + _, config, _ = self.handle_judge_call.call_args.args + assert "2000ms" not in config.instructions + assert "duration" not in config.instructions.lower() or "acceptance" in config.instructions.lower() + + async def test_duration_context_not_added_when_agent_duration_ms_is_none(self): + """When agent_duration_ms is None, no duration block is added even if keyword matches.""" + judge = OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be fast.", + ) + await self.client._evaluate_acceptance_judge( + judge_key="speed", + optimization_judge=judge, + completion_response="Here is the answer.", + iteration=1, + reasoning_history="", + user_input="Tell me something.", + agent_duration_ms=None, + ) + _, config, _ = self.handle_judge_call.call_args.args + assert "mention the duration" not in config.instructions + async def test_returns_zero_score_on_missing_acceptance_statement(self): judge = OptimizationJudge(threshold=0.8, acceptance_statement=None) result = await self.client._evaluate_acceptance_judge( @@ -2391,3 +2509,454 @@ async def test_optimize_from_config_dispatches_to_gt_run(self): assert isinstance(result, list) assert len(result) == 2 + + +# --------------------------------------------------------------------------- +# 
_acceptance_criteria_implies_duration_optimization +# --------------------------------------------------------------------------- + + +class TestAcceptanceCriteriaImpliesDurationOptimization: + def test_returns_false_when_judges_is_none(self): + assert _acceptance_criteria_implies_duration_optimization(None) is False + + def test_returns_false_when_judges_is_empty(self): + assert _acceptance_criteria_implies_duration_optimization({}) is False + + def test_returns_false_when_no_acceptance_statements(self): + judges = {"quality": OptimizationJudge(threshold=0.8, judge_key="judge-1")} + assert _acceptance_criteria_implies_duration_optimization(judges) is False + + def test_returns_false_when_acceptance_statement_has_no_latency_keywords(self): + judges = { + "accuracy": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be accurate and complete.", + ) + } + assert _acceptance_criteria_implies_duration_optimization(judges) is False + + def test_detects_fast_keyword(self): + judges = { + "speed": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be fast.", + ) + } + assert _acceptance_criteria_implies_duration_optimization(judges) is True + + def test_detects_faster_keyword(self): + judges = { + "speed": OptimizationJudge( + threshold=0.8, + acceptance_statement="The agent should respond faster.", + ) + } + assert _acceptance_criteria_implies_duration_optimization(judges) is True + + def test_detects_latency_keyword(self): + judges = { + "perf": OptimizationJudge( + threshold=0.8, + acceptance_statement="The agent must have low latency.", + ) + } + assert _acceptance_criteria_implies_duration_optimization(judges) is True + + def test_detects_duration_keyword(self): + judges = { + "perf": OptimizationJudge( + threshold=0.8, + acceptance_statement="Minimize the duration of each response.", + ) + } + assert _acceptance_criteria_implies_duration_optimization(judges) is True + + def test_detects_ms_keyword(self): + judges 
= { + "perf": OptimizationJudge( + threshold=0.8, + acceptance_statement="Responses should complete in under 500ms.", + ) + } + assert _acceptance_criteria_implies_duration_optimization(judges) is True + + def test_detects_response_time_phrase(self): + judges = { + "perf": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response time should be minimized.", + ) + } + assert _acceptance_criteria_implies_duration_optimization(judges) is True + + def test_detects_efficient_keyword(self): + judges = { + "perf": OptimizationJudge( + threshold=0.8, + acceptance_statement="The model must be efficient.", + ) + } + assert _acceptance_criteria_implies_duration_optimization(judges) is True + + def test_detects_snappy_keyword(self): + judges = { + "perf": OptimizationJudge( + threshold=0.8, + acceptance_statement="Responses should feel snappy.", + ) + } + assert _acceptance_criteria_implies_duration_optimization(judges) is True + + def test_case_insensitive_match(self): + judges = { + "perf": OptimizationJudge( + threshold=0.8, + acceptance_statement="The model must be EFFICIENT and FAST.", + ) + } + assert _acceptance_criteria_implies_duration_optimization(judges) is True + + def test_returns_true_when_any_judge_matches(self): + judges = { + "accuracy": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be accurate.", + ), + "speed": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be fast.", + ), + } + assert _acceptance_criteria_implies_duration_optimization(judges) is True + + def test_returns_false_when_acceptance_statement_is_none(self): + judges = { + "quality": OptimizationJudge(threshold=0.8, acceptance_statement=None) + } + assert _acceptance_criteria_implies_duration_optimization(judges) is False + + +# --------------------------------------------------------------------------- +# _evaluate_duration +# --------------------------------------------------------------------------- + + +class 
TestEvaluateDuration: + def setup_method(self): + self.client = _make_client() + self.client._options = _make_options() + self.client._agent_config = _make_agent_config() + self.client._initialize_class_members_from_config(_make_agent_config()) + + def _ctx(self, duration_ms, iteration=1): + return OptimizationContext( + scores={}, + completion_response="response", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=iteration, + duration_ms=duration_ms, + ) + + def test_returns_true_when_history_is_empty(self): + self.client._history = [] + assert self.client._evaluate_duration(self._ctx(5000)) is True + + def test_returns_true_when_baseline_duration_is_none(self): + self.client._history = [self._ctx(None, iteration=1)] + assert self.client._evaluate_duration(self._ctx(5000, iteration=2)) is True + + def test_returns_true_when_candidate_duration_is_none(self): + self.client._history = [self._ctx(2000, iteration=1)] + assert self.client._evaluate_duration(self._ctx(None, iteration=2)) is True + + def test_passes_when_candidate_is_more_than_20_percent_faster(self): + # baseline=2000ms, threshold=1600ms, candidate=1500ms → 1500 < 1600 → pass + self.client._history = [self._ctx(2000, iteration=1)] + assert self.client._evaluate_duration(self._ctx(1500, iteration=2)) is True + + def test_fails_when_candidate_is_exactly_at_threshold(self): + # baseline=2000ms, threshold=1600ms, candidate=1600ms → not strictly less → fail + self.client._history = [self._ctx(2000, iteration=1)] + assert self.client._evaluate_duration(self._ctx(1600, iteration=2)) is False + + def test_fails_when_improvement_is_less_than_20_percent(self): + # baseline=2000ms, threshold=1600ms, candidate=1800ms → 1800 >= 1600 → fail + self.client._history = [self._ctx(2000, iteration=1)] + assert self.client._evaluate_duration(self._ctx(1800, iteration=2)) is False + + def test_fails_when_candidate_matches_baseline(self): + self.client._history = [self._ctx(2000, 
iteration=1)] + assert self.client._evaluate_duration(self._ctx(2000, iteration=2)) is False + + def test_fails_when_candidate_is_slower_than_baseline(self): + self.client._history = [self._ctx(2000, iteration=1)] + assert self.client._evaluate_duration(self._ctx(2500, iteration=2)) is False + + def test_uses_history_index_zero_as_baseline_not_last(self): + # history[0] is 2000ms (baseline), history[-1] is 500ms (fast, but not the baseline) + first = self._ctx(2000, iteration=1) + later = self._ctx(500, iteration=2) + self.client._history = [first, later] + # candidate=1500ms < 2000 * 0.80 = 1600ms → pass (uses history[0], not history[-1]) + assert self.client._evaluate_duration(self._ctx(1500, iteration=3)) is True + + def test_pass_boundary_just_below_threshold(self): + # baseline=1000ms, threshold=800ms, candidate=799ms → pass + self.client._history = [self._ctx(1000, iteration=1)] + assert self.client._evaluate_duration(self._ctx(799, iteration=2)) is True + + +# --------------------------------------------------------------------------- +# Duration optimization — chaos mode wiring +# --------------------------------------------------------------------------- + + +class TestDurationOptimizationChaosMode: + def setup_method(self): + self.mock_ldai = _make_ldai_client() + + def _duration_judges(self, statement="The response must be fast."): + return { + "speed": OptimizationJudge( + threshold=0.8, + acceptance_statement=statement, + ) + } + + def _ctx_with(self, duration_ms, score=1.0, iteration=1): + return OptimizationContext( + scores={"speed": JudgeResult(score=score)}, + completion_response="answer", + current_instructions="Do X.", + current_parameters={}, + current_variables={"language": "English"}, + iteration=iteration, + duration_ms=duration_ms, + ) + + async def test_duration_gate_triggers_variation_when_not_fast_enough(self): + """Judge passes but duration fails threshold → variation generated → second attempt succeeds.""" + client = 
_make_client(self.mock_ldai) + + # Iter 1: judge fails → history[0].duration_ms = 2000 + # Iter 2: judge passes, duration 1800ms ≥ 2000 * 0.80 = 1600ms → duration fails → variation + # Iter 3: judge passes, duration 1500ms < 1600ms → passes → validation → success + execute_side_effects = [ + self._ctx_with(duration_ms=2000, score=0.2, iteration=1), # iter 1: judge fails + self._ctx_with(duration_ms=1800, score=1.0, iteration=2), # iter 2: judge passes, duration fails + self._ctx_with(duration_ms=1500, score=1.0, iteration=3), # iter 3: both pass + self._ctx_with(duration_ms=1500, score=1.0, iteration=4), # validation + ] + + handle_agent_call = AsyncMock(return_value=OptimizationResponse(output=VARIATION_RESPONSE)) + opts = _make_options( + handle_agent_call=handle_agent_call, + judges=self._duration_judges(), + max_attempts=5, + ) + + with patch.object(client, "_execute_agent_turn", new_callable=AsyncMock) as mock_execute: + mock_execute.side_effect = execute_side_effects + result = await client.optimize_from_options("test-agent", opts) + + assert result.duration_ms == 1500 + # 2 variations generated (after iter 1 judge fail, after iter 2 duration fail) + assert handle_agent_call.call_count == 2 + assert mock_execute.call_count == 4 + + async def test_duration_check_skipped_on_first_iteration_no_baseline(self): + """First iteration has no history → duration check always skipped → succeeds even if slow.""" + client = _make_client(self.mock_ldai) + + # Iter 1 (no history): judge passes, duration check skipped → validation + # Validation: judge passes, duration check still uses history[0] = None since nothing appended yet + execute_side_effects = [ + self._ctx_with(duration_ms=9999, score=1.0, iteration=1), # iter 1: would fail if checked + self._ctx_with(duration_ms=9999, score=1.0, iteration=2), # validation + ] + + opts = _make_options( + handle_agent_call=AsyncMock(return_value=OptimizationResponse(output="answer")), + judges=self._duration_judges(), + 
max_attempts=3, + ) + + with patch.object(client, "_execute_agent_turn", new_callable=AsyncMock) as mock_execute: + mock_execute.side_effect = execute_side_effects + result = await client.optimize_from_options("test-agent", opts) + + # Succeeds because history is empty and duration check is skipped + assert result.duration_ms == 9999 + + async def test_no_duration_gate_when_acceptance_criteria_has_no_latency_keywords(self): + """Acceptance statement with no latency keywords → duration gate never applied.""" + client = _make_client(self.mock_ldai) + + # Judge passes on first try; duration would fail if gate were applied (same as baseline) + # but since acceptance criteria has no latency keywords, it should succeed anyway + execute_side_effects = [ + self._ctx_with(duration_ms=2000, score=1.0, iteration=1), + self._ctx_with(duration_ms=2000, score=1.0, iteration=2), # validation + ] + + non_latency_judges = { + "accuracy": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be accurate and complete.", + ) + } + opts = _make_options( + handle_agent_call=AsyncMock(return_value=OptimizationResponse(output="answer")), + judges=non_latency_judges, + max_attempts=3, + ) + + with patch.object(client, "_execute_agent_turn", new_callable=AsyncMock) as mock_execute: + mock_execute.side_effect = execute_side_effects + # Manually seed history so _evaluate_duration would fire if incorrectly triggered + client._history = [self._ctx_with(duration_ms=2000, iteration=0)] + result = await client.optimize_from_options("test-agent", opts) + + assert result is not None + + async def test_evaluate_duration_called_in_validation_phase(self): + """Duration gate also runs on validation samples, not just the primary turn.""" + client = _make_client(self.mock_ldai) + + # Iter 1: judge fails → history[0].duration_ms = 2000 + # Iter 2: judge passes, duration 1500ms → primary passes + # Validation sample: judge passes, duration 1800ms ≥ 1600ms → validation fails → 
variation + # Iter 3: judge passes, duration 1500ms → primary passes + # Validation: judge passes, duration 1500ms → validation passes → success + execute_side_effects = [ + self._ctx_with(duration_ms=2000, score=0.2, iteration=1), # iter 1: judge fails + self._ctx_with(duration_ms=1500, score=1.0, iteration=2), # iter 2: passes + self._ctx_with(duration_ms=1800, score=1.0, iteration=3), # validation: duration fails + self._ctx_with(duration_ms=1500, score=1.0, iteration=4), # iter 3: passes + self._ctx_with(duration_ms=1500, score=1.0, iteration=5), # validation: passes + ] + + handle_agent_call = AsyncMock(return_value=OptimizationResponse(output=VARIATION_RESPONSE)) + opts = _make_options( + handle_agent_call=handle_agent_call, + judges=self._duration_judges(), + max_attempts=5, + ) + + with patch.object(client, "_execute_agent_turn", new_callable=AsyncMock) as mock_execute: + mock_execute.side_effect = execute_side_effects + result = await client.optimize_from_options("test-agent", opts) + + assert result.duration_ms == 1500 + assert mock_execute.call_count == 5 + + +# --------------------------------------------------------------------------- +# Duration optimization — ground truth mode wiring +# --------------------------------------------------------------------------- + + +class TestDurationOptimizationGroundTruthMode: + def setup_method(self): + self.mock_ldai = _make_ldai_client() + + def _duration_judges(self): + return { + "speed": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be fast.", + ) + } + + def _gt_ctx(self, duration_ms, score=1.0, iteration=1, user_input="q"): + return OptimizationContext( + scores={"speed": JudgeResult(score=score)}, + completion_response="answer", + current_instructions="Do X.", + current_parameters={}, + current_variables={}, + iteration=iteration, + duration_ms=duration_ms, + user_input=user_input, + ) + + async def test_duration_gate_applied_per_sample_in_ground_truth_mode(self): + """In GT 
mode, the duration check fires per sample, not just once per attempt.""" + client = _make_client(self.mock_ldai) + + # Attempt 1: + # Sample 1: judge fails (score 0.2) → all_passed = False + # Sample 2: judge passes → duration skipped (history empty for sample 2) + # → history extended with attempt 1 results → variation generated + # Attempt 2: + # Sample 1: judge passes, duration 1800ms vs baseline history[0].duration_ms = 2000ms + # → 1800 >= 1600 → duration fails → sample_passed = False → all_passed = False + # (attempt 2 fails due to duration on sample 1) + # → variation generated + # Attempt 3: + # Sample 1: judge passes, duration 1500ms < 1600ms → passes + # Sample 2: judge passes, duration 1500ms (history[0] still 2000ms) → passes + # → all_passed = True → success + execute_side_effects = [ + # Attempt 1 + self._gt_ctx(duration_ms=2000, score=0.2, iteration=1, user_input="q1"), + self._gt_ctx(duration_ms=2000, score=1.0, iteration=2, user_input="q2"), + # Variation (not from _execute_agent_turn, from handle_agent_call) + # Attempt 2 + self._gt_ctx(duration_ms=1800, score=1.0, iteration=3, user_input="q1"), + self._gt_ctx(duration_ms=1800, score=1.0, iteration=4, user_input="q2"), + # Variation + # Attempt 3 + self._gt_ctx(duration_ms=1500, score=1.0, iteration=5, user_input="q1"), + self._gt_ctx(duration_ms=1500, score=1.0, iteration=6, user_input="q2"), + ] + + handle_agent_call = AsyncMock(return_value=OptimizationResponse(output=VARIATION_RESPONSE)) + opts = _make_gt_options( + handle_agent_call=handle_agent_call, + judges=self._duration_judges(), + max_attempts=5, + ) + + with patch.object(client, "_execute_agent_turn", new_callable=AsyncMock) as mock_execute: + mock_execute.side_effect = execute_side_effects + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + assert isinstance(results, list) + for ctx in results: + assert ctx.duration_ms == 1500 + # 2 variations generated + assert handle_agent_call.call_count == 2 + 
assert mock_execute.call_count == 6 + + async def test_no_duration_gate_in_gt_mode_when_no_latency_keywords(self): + """In GT mode, duration gate is not applied when acceptance criteria has no latency keywords.""" + client = _make_client(self.mock_ldai) + + execute_side_effects = [ + self._gt_ctx(duration_ms=5000, score=1.0, iteration=1, user_input="q1"), + self._gt_ctx(duration_ms=5000, score=1.0, iteration=2, user_input="q2"), + ] + + non_latency_judges = { + "accuracy": OptimizationJudge( + threshold=0.8, + acceptance_statement="The response must be accurate.", + ) + } + opts = _make_gt_options( + handle_agent_call=AsyncMock(return_value=OptimizationResponse(output="answer")), + judges=non_latency_judges, + max_attempts=3, + ) + + with patch.object(client, "_execute_agent_turn", new_callable=AsyncMock) as mock_execute: + mock_execute.side_effect = execute_side_effects + results = await client.optimize_from_ground_truth_options("test-agent", opts) + + # Succeeds on first attempt even with slow duration (no latency keyword → no gate) + assert isinstance(results, list) + assert mock_execute.call_count == 2