From 3f4d46d7b466533ca3c2a6843ae274f8c3cc74d0 Mon Sep 17 00:00:00 2001 From: YeonGyu-Kim Date: Wed, 22 Apr 2026 17:23:43 +0900 Subject: [PATCH] =?UTF-8?q?fix:=20#161=20=E2=80=94=20wall-clock=20timeout?= =?UTF-8?q?=20for=20run=5Fturn=5Floop;=20stalled=20turns=20now=20abort=20w?= =?UTF-8?q?ith=20stop=5Freason=3D'timeout'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, run_turn_loop was bounded only by max_turns (turn count). If engine.submit_message stalled — slow provider, hung network, infinite stream — the loop blocked indefinitely with no cancellation path. Claws calling run_turn_loop in CI or orchestration had no reliable way to enforce a deadline; the loop would hang until OS kill or human intervention. Fix: - Add timeout_seconds parameter to run_turn_loop (default None = legacy unbounded). - When set, each submit_message call runs inside a ThreadPoolExecutor and is bounded by the remaining wall-clock budget (total across all turns, not per-turn). - On timeout, synthesize a TurnResult with stop_reason='timeout' carrying the turn's prompt and routed matches so transcripts preserve orchestration context. - Exhausted/negative budget short-circuits before calling submit_message. - Legacy path (timeout_seconds=None) bypasses the executor entirely — zero overhead for callers that don't opt in. CLI: - Added --timeout-seconds flag to 'turn-loop' command. - Exit code 2 when the loop terminated on timeout (vs 0 for completed), so shell scripts can distinguish 'done' from 'budget exhausted'. Tests (tests/test_run_turn_loop_timeout.py, 6 tests): - Legacy unbounded path unchanged (timeout_seconds=None never emits 'timeout') - Hung submit_message aborted within budget (0.3s budget, 5s mock hang → exit <1.5s) - Budget is cumulative across turns (0.6s budget, 0.4s per turn, not per-turn) - timeout_seconds=0 short-circuits first turn without calling submit_message - Negative timeout treated as exhausted (guard against caller bugs) - Timeout TurnResult carries correct prompt, matches, UsageSummary shape Full suite: 49/49 passing, zero regression. Blocker: none. Closes ROADMAP #161. --- src/main.py | 18 ++- src/runtime.py | 97 ++++++++++++++-- tests/test_run_turn_loop_timeout.py | 168 ++++++++++++++++++++++++++++ 3 files changed, 274 insertions(+), 9 deletions(-) create mode 100644 tests/test_run_turn_loop_timeout.py diff --git a/src/main.py b/src/main.py index 56103c6..ba76566 100644 --- a/src/main.py +++ b/src/main.py @@ -65,6 +65,12 @@ def build_parser() -> argparse.ArgumentParser: loop_parser.add_argument('--limit', type=int, default=5) loop_parser.add_argument('--max-turns', type=int, default=3) loop_parser.add_argument('--structured-output', action='store_true') + loop_parser.add_argument( + '--timeout-seconds', + type=float, + default=None, + help='total wall-clock budget across all turns (#161). Default: unbounded.', + ) flush_parser = subparsers.add_parser('flush-transcript', help='persist and flush a temporary session transcript') flush_parser.add_argument('prompt') @@ -187,11 +193,21 @@ def main(argv: list[str] | None = None) -> int: print(PortRuntime().bootstrap_session(args.prompt, limit=args.limit).as_markdown()) return 0 if args.command == 'turn-loop': - results = PortRuntime().run_turn_loop(args.prompt, limit=args.limit, max_turns=args.max_turns, structured_output=args.structured_output) + results = PortRuntime().run_turn_loop( + args.prompt, + limit=args.limit, + max_turns=args.max_turns, + structured_output=args.structured_output, + timeout_seconds=args.timeout_seconds, + ) for idx, result in enumerate(results, start=1): print(f'## Turn {idx}') print(result.output) print(f'stop_reason={result.stop_reason}') + # Exit 2 when a timeout terminated the loop so claws can distinguish + # 'ran to completion' from 'hit wall-clock budget'. + if results and results[-1].stop_reason == 'timeout': + return 2 return 0 if args.command == 'flush-transcript': engine = QueryEnginePort.from_workspace() diff --git a/src/runtime.py b/src/runtime.py index c4116b7..0ec97ae 100644 --- a/src/runtime.py +++ b/src/runtime.py @@ -1,11 +1,13 @@ from __future__ import annotations +import time +from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError from dataclasses import dataclass from .commands import PORTED_COMMANDS from .context import PortContext, build_port_context, render_context from .history import HistoryLog -from .models import PermissionDenial, PortingModule +from .models import PermissionDenial, PortingModule, UsageSummary from .query_engine import QueryEngineConfig, QueryEnginePort, TurnResult from .setup import SetupReport, WorkspaceSetup, run_setup from .system_init import build_system_init_message @@ -151,21 +153,100 @@ class PortRuntime: persisted_session_path=persisted_session_path, ) - def run_turn_loop(self, prompt: str, limit: int = 5, max_turns: int = 3, structured_output: bool = False) -> list[TurnResult]: + def run_turn_loop( + self, + prompt: str, + limit: int = 5, + max_turns: int = 3, + structured_output: bool = False, + timeout_seconds: float | None = None, + ) -> list[TurnResult]: + """Run a multi-turn engine loop with optional wall-clock deadline. + + Args: + prompt: The initial prompt to submit. + limit: Match routing limit. + max_turns: Maximum number of turns before stopping. + structured_output: Whether to request structured output. + timeout_seconds: Total wall-clock budget across all turns. When the + budget is exhausted mid-turn, a synthetic TurnResult with + ``stop_reason='timeout'`` is appended and the loop exits. + ``None`` (default) preserves legacy unbounded behaviour. + + Returns: + A list of TurnResult objects. The final entry's ``stop_reason`` + distinguishes ``'completed'``, ``'max_turns_reached'``, + ``'max_budget_reached'``, or ``'timeout'``. + + #161: prior to this change a hung ``engine.submit_message`` call would + block the loop indefinitely with no cancellation path, forcing claws to + rely on external watchdogs or OS-level kills. Callers can now enforce a + deadline and receive a typed timeout signal instead. + """ engine = QueryEnginePort.from_workspace() engine.config = QueryEngineConfig(max_turns=max_turns, structured_output=structured_output) matches = self.route_prompt(prompt, limit=limit) command_names = tuple(match.name for match in matches if match.kind == 'command') tool_names = tuple(match.name for match in matches if match.kind == 'tool') results: list[TurnResult] = [] - for turn in range(max_turns): - turn_prompt = prompt if turn == 0 else f'{prompt} [turn {turn + 1}]' - result = engine.submit_message(turn_prompt, command_names, tool_names, ()) - results.append(result) - if result.stop_reason != 'completed': - break + deadline = time.monotonic() + timeout_seconds if timeout_seconds is not None else None + + # ThreadPoolExecutor is reused across turns so we cancel cleanly on exit. + executor = ThreadPoolExecutor(max_workers=1) if deadline is not None else None + try: + for turn in range(max_turns): + turn_prompt = prompt if turn == 0 else f'{prompt} [turn {turn + 1}]' + + if deadline is None: + # Legacy path: unbounded call, preserves existing behaviour exactly. + result = engine.submit_message(turn_prompt, command_names, tool_names, ()) + else: + remaining = deadline - time.monotonic() + if remaining <= 0: + results.append(self._build_timeout_result(turn_prompt, command_names, tool_names)) + break + assert executor is not None + future = executor.submit( + engine.submit_message, turn_prompt, command_names, tool_names, () + ) + try: + result = future.result(timeout=remaining) + except FuturesTimeoutError: + # Best-effort cancel; submit_message may still finish in background + # but we never read its output. The engine's own state mutation + # is owned by the engine and not our concern here. + future.cancel() + results.append(self._build_timeout_result(turn_prompt, command_names, tool_names)) + break + + results.append(result) + if result.stop_reason != 'completed': + break + finally: + if executor is not None: + # wait=False: don't let a hung thread block loop exit indefinitely. + # The thread will be reaped when the interpreter shuts down or when + # the engine call eventually returns. + executor.shutdown(wait=False) return results + @staticmethod + def _build_timeout_result( + prompt: str, + command_names: tuple[str, ...], + tool_names: tuple[str, ...], + ) -> TurnResult: + """Synthesize a TurnResult representing a wall-clock timeout (#161).""" + return TurnResult( + prompt=prompt, + output='Wall-clock timeout exceeded before turn completed.', + matched_commands=command_names, + matched_tools=tool_names, + permission_denials=(), + usage=UsageSummary(), + stop_reason='timeout', + ) + def _infer_permission_denials(self, matches: list[RoutedMatch]) -> list[PermissionDenial]: denials: list[PermissionDenial] = [] for match in matches: diff --git a/tests/test_run_turn_loop_timeout.py b/tests/test_run_turn_loop_timeout.py new file mode 100644 index 0000000..011f3f0 --- /dev/null +++ b/tests/test_run_turn_loop_timeout.py @@ -0,0 +1,168 @@ +"""Tests for run_turn_loop wall-clock timeout (ROADMAP #161). + +Covers: +- timeout_seconds=None preserves legacy unbounded behaviour +- timeout_seconds=X aborts a hung turn and emits stop_reason='timeout' +- Timeout budget is total wall-clock across all turns, not per-turn +- Already-exhausted budget short-circuits before the first turn runs +- Legacy path still runs without a ThreadPoolExecutor in the way +""" + +from __future__ import annotations + +import sys +import time +from pathlib import Path +from unittest.mock import patch + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from src.models import UsageSummary # noqa: E402 +from src.query_engine import TurnResult # noqa: E402 +from src.runtime import PortRuntime # noqa: E402 + + +def _completed_result(prompt: str) -> TurnResult: + return TurnResult( + prompt=prompt, + output='ok', + matched_commands=(), + matched_tools=(), + permission_denials=(), + usage=UsageSummary(), + stop_reason='completed', + ) + + +class TestLegacyUnboundedBehaviour: + def test_no_timeout_preserves_existing_behaviour(self) -> None: + """timeout_seconds=None must not change legacy path at all.""" + results = PortRuntime().run_turn_loop('review MCP tool', max_turns=2) + assert len(results) >= 1 + for r in results: + assert r.stop_reason in {'completed', 'max_turns_reached', 'max_budget_reached'} + assert r.stop_reason != 'timeout' + + +class TestTimeoutAbortsHungTurn: + def test_hung_submit_message_times_out(self) -> None: + """A stalled submit_message must be aborted and emit stop_reason='timeout'.""" + runtime = PortRuntime() + + def _hang(prompt, commands, tools, denials): + time.sleep(5.0) # would block the loop + return _completed_result(prompt) + + with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory: + engine = mock_factory.return_value + engine.config = None # attribute-assigned in run_turn_loop + engine.submit_message.side_effect = _hang + + start = time.monotonic() + results = runtime.run_turn_loop( + 'review MCP tool', max_turns=3, timeout_seconds=0.3 + ) + elapsed = time.monotonic() - start + + # Must exit well under the 5s hang + assert elapsed < 1.5, f'run_turn_loop did not honor timeout: {elapsed:.2f}s' + assert len(results) == 1 + assert results[-1].stop_reason == 'timeout' + + +class TestTimeoutBudgetIsTotal: + def test_budget_is_cumulative_across_turns(self) -> None: + """timeout_seconds is total wall-clock across all turns, not per-turn.""" + runtime = PortRuntime() + call_count = {'n': 0} + + def _slow(prompt, commands, tools, denials): + call_count['n'] += 1 + time.sleep(0.4) # each turn burns 0.4s + return _completed_result(prompt) + + with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory: + engine = mock_factory.return_value + engine.submit_message.side_effect = _slow + + start = time.monotonic() + # 0.6s budget, 0.4s per turn. First turn completes (~0.4s), + # second turn times out before finishing. + results = runtime.run_turn_loop( + 'review MCP tool', max_turns=5, timeout_seconds=0.6 + ) + elapsed = time.monotonic() - start + + # Should exit at around 0.6s, not 2.0s (5 turns * 0.4s) + assert elapsed < 1.5, f'cumulative budget not honored: {elapsed:.2f}s' + # Last result should be the timeout + assert results[-1].stop_reason == 'timeout' + + +class TestExhaustedBudget: + def test_zero_timeout_short_circuits_first_turn(self) -> None: + """timeout_seconds=0 emits timeout before the first submit_message call.""" + runtime = PortRuntime() + + with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory: + engine = mock_factory.return_value + # submit_message should never be called when budget is already 0 + engine.submit_message.side_effect = AssertionError( + 'submit_message should not run when budget is exhausted' + ) + + results = runtime.run_turn_loop( + 'review MCP tool', max_turns=3, timeout_seconds=0.0 + ) + + assert len(results) == 1 + assert results[0].stop_reason == 'timeout' + + +class TestTimeoutResultShape: + def test_timeout_result_has_correct_prompt_and_matches(self) -> None: + """Synthetic TurnResult on timeout must carry the turn's prompt + routed matches.""" + runtime = PortRuntime() + + def _hang(prompt, commands, tools, denials): + time.sleep(5.0) + return _completed_result(prompt) + + with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory: + engine = mock_factory.return_value + engine.submit_message.side_effect = _hang + + results = runtime.run_turn_loop( + 'review MCP tool', max_turns=2, timeout_seconds=0.2 + ) + + timeout_result = results[-1] + assert timeout_result.stop_reason == 'timeout' + assert timeout_result.prompt == 'review MCP tool' + # matched_commands / matched_tools should still be populated from routing, + # so downstream transcripts don't lose the routing context. + # These may be empty tuples depending on routing; they must be tuples. + assert isinstance(timeout_result.matched_commands, tuple) + assert isinstance(timeout_result.matched_tools, tuple) + assert isinstance(timeout_result.usage, UsageSummary) + + +class TestNegativeTimeoutTreatedAsExhausted: + def test_negative_timeout_short_circuits(self) -> None: + """A negative budget should behave identically to exhausted.""" + runtime = PortRuntime() + + with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory: + engine = mock_factory.return_value + engine.submit_message.side_effect = AssertionError( + 'submit_message should not run when budget is negative' + ) + + results = runtime.run_turn_loop( + 'review MCP tool', max_turns=3, timeout_seconds=-1.0 + ) + + assert len(results) == 1 + assert results[0].stop_reason == 'timeout'