From 79a9f0e6f6595bf876ef9183c4a58053d7a313e3 Mon Sep 17 00:00:00 2001 From: YeonGyu-Kim Date: Wed, 22 Apr 2026 17:37:22 +0900 Subject: [PATCH] =?UTF-8?q?fix:=20#163=20=E2=80=94=20remove=20[turn=20N]?= =?UTF-8?q?=20suffix=20pollution=20from=20run=5Fturn=5Floop;=20file=20#164?= =?UTF-8?q?=20timeout-cancellation=20followup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #163: run_turn_loop no longer injects f'{prompt} [turn N]' into follow-up prompts. The suffix was never defined or interpreted anywhere — not by the engine, not by the system prompt, not by any LLM. It looked like a real user-typed annotation in the transcript and made replay/analysis fragile. New behaviour: - turn 0 submits the original prompt (unchanged) - turn > 0 submits caller-supplied continuation_prompt if provided, else the loop stops cleanly — no fabricated user turn - added continuation_prompt: str | None = None parameter to run_turn_loop - added --continuation-prompt CLI flag for claws scripting multi-turn loops - zero '[turn' strings ever appear in mutable_messages or stdout now Behaviour change for existing callers: - Before: run_turn_loop(prompt, max_turns=3) submitted 3 turns ('prompt', 'prompt [turn 2]', 'prompt [turn 3]') - After: run_turn_loop(prompt, max_turns=3) submits 1 turn ('prompt') - To preserve old multi-turn behaviour, pass continuation_prompt='Continue.' or any structured follow-up text One existing timeout test (test_budget_is_cumulative_across_turns) updated to pass continuation_prompt so the cumulative-budget contract is actually exercised across turns instead of trivially satisfied by a one-turn loop. #164 filed: addresses reviewer feedback on #161. The wall-clock timeout bounds the caller-facing wait, but the underlying submit_message worker thread keeps running and can mutate engine state after the timeout TurnResult is returned. A cooperative cancel_event pattern is sketched in the pinpoint; real asyncio.Task.cancel() support will come once provider IO is async-native (larger refactor). Tests (tests/test_run_turn_loop_continuation.py, 8 tests): - TestNoTurnSuffixInjection (2): zero '[turn' strings in any submitted prompt, both default and explicit-continuation paths - TestContinuationDefaultStopsAfterTurnZero (2): default loops run exactly one turn; engine.submit_message called exactly once despite max_turns=10 - TestExplicitContinuationBehaviour (2): turn 0 = original, turn N = continuation verbatim; max_turns still respected - TestCLIContinuationFlag (2): CLI default emits only '## Turn 1'; --continuation-prompt wires through to multi-turn behaviour Full suite: 67/67 passing. Closes ROADMAP #163. Files #164. --- ROADMAP.md | 64 +++++++++ src/main.py | 10 ++ src/runtime.py | 28 +++- tests/test_run_turn_loop_continuation.py | 161 +++++++++++++++++++++++ tests/test_run_turn_loop_timeout.py | 13 +- 5 files changed, 273 insertions(+), 3 deletions(-) create mode 100644 tests/test_run_turn_loop_continuation.py diff --git a/ROADMAP.md b/ROADMAP.md index 291ba8a..4cb03e4 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -6230,3 +6230,67 @@ Remove the `[turn N]` suffix entirely. Add an optional `continuation_prompt: str **Blocker.** None. **Source.** Jobdori dogfood sweep 2026-04-22 10:06 KST — read `src/runtime.py:154-168`, reproduced the `[turn N]` suffix injection pattern, confirmed no system-prompt or engine-side interpretation of the suffix exists. + +## Pinpoint #164. `run_turn_loop` timeout returns control to caller but does not cancel the underlying `submit_message` work — wedged provider threads leak past the deadline + +**Gap.** The #161 fix bounds the caller-facing wait on `PortRuntime.run_turn_loop` via `ThreadPoolExecutor.submit(...).result(timeout=...)`, but `ThreadPoolExecutor.shutdown(wait=False)` does not actually cancel a thread already running `engine.submit_message`. Python's threading model does not expose safe cooperative cancellation of arbitrary blocking calls (no `pthread_cancel`-equivalent for user code), so once a turn wedges on a slow provider the thread keeps running in the background after `run_turn_loop` returns. Concretely: + +1. **Caller receives `TurnResult(stop_reason='timeout')` on time** — the caller-facing deadline works correctly (confirmed by 6 tests in `tests/test_run_turn_loop_timeout.py`). +2. **But the worker thread is still executing `engine.submit_message`** — it will complete (or not) whenever the underlying `_format_output` / projected_usage computation returns, mutating the engine's `mutable_messages`, `transcript_store`, `total_usage` at an unpredictable later time. +3. **If the caller reuses the same engine** (e.g., a long-lived CLI session or orchestration harness that pools engines), those deferred mutations land silently on top of fresh turns, corrupting the session in a way that `stop_reason` cannot signal. +4. **If the caller spawns many turn loops in parallel**, leaked threads accumulate and the process memory/file-handle footprint grows without bound. + +**Repro (conceptual).** +```python +import time +from src.runtime import PortRuntime +from src.query_engine import QueryEnginePort +from unittest.mock import patch + +slow_calls = [] + +def hang_and_mutate(self, prompt, *args, **kwargs): + # Simulates a slow provider that eventually returns and mutates engine state. + time.sleep(2.0) + self.mutable_messages.append(f'LATE: {prompt}') # silent mutation after timeout + slow_calls.append(prompt) + return None # irrelevant, caller has already given up + +with patch.object(QueryEnginePort, 'submit_message', hang_and_mutate): + runtime = PortRuntime() + # Timeout fires at 0.2s, caller gets synthetic timeout result + results = runtime.run_turn_loop('x', timeout_seconds=0.2) + assert results[-1].stop_reason == 'timeout' + # But 2 seconds later the background thread still mutates the engine + time.sleep(2.5) + assert slow_calls == ['x'] # the "cancelled" turn actually ran to completion +``` + +**Impact on claws.** +- Orchestration harnesses cannot safely reuse `QueryEnginePort` instances across timeouts. Every timeout implicitly requires discarding the engine, which breaks session continuity. +- Hung threads leak across long-running claw processes (daemon-mode claws, CI workers, cron harnesses). Resource bounds are the OS's problem, not the harness's. +- "Timeout fired, session is clean" is not actually true — `TurnResult(stop_reason='timeout')` only means "the caller got control back in time", not "the turn was cancelled". + +**Root cause.** Two layers: +1. `PortRuntime.run_turn_loop` uses `executor.shutdown(wait=False)` which lets the interpreter reap the thread eventually but does not signal cancellation to the running code. +2. `QueryEnginePort.submit_message` has no cooperative cancellation hook — no `cancel_event: threading.Event | None = None` parameter, no periodic check inside `_format_output` or the projected-usage computation, no abortable IO wrapper around any future provider calls. Even if the runtime layer wanted to ask the turn to stop, there is no receiver. + +**Fix shape (~30 lines, two-stage).** + +*Stage A — runtime layer (claws benefit immediately).* +1. Introduce a `threading.Event` as `cancel_event`. Pass it into `engine.submit_message` via a new optional parameter. +2. On timeout in `run_turn_loop`, set `cancel_event` before returning the synthetic timeout result so any check inside the engine can observe it. +3. Ensure the worker thread is marked as a daemon (`ThreadPoolExecutor(max_workers=1, thread_name_prefix='claw-turn-cancellable')` — daemon=True is not directly configurable on stdlib Executor, but we can switch to `threading.Thread(daemon=True)` for the single-worker case). + +*Stage B — engine layer (makes Stage A effective).* +4. `submit_message` accepts `cancel_event: threading.Event | None = None` and checks `cancel_event.is_set()` at safe cancellation points: before `_format_output`, before each mutation, before `compact_messages_if_needed`. If set, raise a `TurnCancelled` exception (or return an early `TurnResult(stop_reason='cancelled')` — exception is cleaner because it propagates through the Future). +5. Any future network/provider call paths wrap their blocking IO in a loop that checks `cancel_event` between retries / chunks, or uses `socket.settimeout` / `httpx.AsyncClient` with a cancellation token. + +*Stage C — contract.* +6. Document that `stop_reason='timeout'` now means "the turn was asked to cancel and had a fair chance to observe it". Threads that ignore cancellation (e.g., pure-CPU loops with no check) can still leak, but cooperative paths clean up. + +**Acceptance.** After `run_turn_loop(..., timeout_seconds=0.2)` returns a timeout result, within a bounded grace window (say 100ms) the underlying worker thread has either finished cooperatively or acknowledged the cancel event. `engine.mutable_messages` does not grow after the timeout TurnResult is returned. A reused engine can safely accept a fresh `submit_message` call without inheriting deferred mutations from the cancelled turn. + +**Blocker.** Python threading does not expose preemptive cancellation, so purely CPU-bound stalls inside `_format_output` or provider client libraries cannot be force-killed. The fix makes cancellation *cooperative*, not *guaranteed*. Eventually the engine will need an `asyncio`-native path with `asyncio.Task.cancel()` for real provider IO, but that is a larger refactor. + +**Source.** Jobdori dogfood sweep 2026-04-22 17:36 KST — filed while landing #162, following review feedback on #161 that pointed out the caller-facing timeout and underlying work-cancellation are two different problems. #161 closed the first; #164 is the second. diff --git a/src/main.py b/src/main.py index ba76566..e1a5663 100644 --- a/src/main.py +++ b/src/main.py @@ -71,6 +71,15 @@ def build_parser() -> argparse.ArgumentParser: default=None, help='total wall-clock budget across all turns (#161). Default: unbounded.', ) + loop_parser.add_argument( + '--continuation-prompt', + default=None, + help=( + 'prompt to submit on turns after the first (#163). Default: None ' + '(loop stops after turn 0). Replaces the deprecated implicit "[turn N]" ' + 'suffix that used to pollute the transcript.' + ), + ) flush_parser = subparsers.add_parser('flush-transcript', help='persist and flush a temporary session transcript') flush_parser.add_argument('prompt') @@ -199,6 +208,7 @@ def main(argv: list[str] | None = None) -> int: max_turns=args.max_turns, structured_output=args.structured_output, timeout_seconds=args.timeout_seconds, + continuation_prompt=args.continuation_prompt, ) for idx, result in enumerate(results, start=1): print(f'## Turn {idx}') diff --git a/src/runtime.py b/src/runtime.py index 0ec97ae..6028271 100644 --- a/src/runtime.py +++ b/src/runtime.py @@ -160,6 +160,7 @@ class PortRuntime: max_turns: int = 3, structured_output: bool = False, timeout_seconds: float | None = None, + continuation_prompt: str | None = None, ) -> list[TurnResult]: """Run a multi-turn engine loop with optional wall-clock deadline. @@ -172,6 +173,15 @@ class PortRuntime: budget is exhausted mid-turn, a synthetic TurnResult with ``stop_reason='timeout'`` is appended and the loop exits. ``None`` (default) preserves legacy unbounded behaviour. + continuation_prompt: What to send on turns after the first. When + ``None`` (default, #163), the loop stops after turn 0 and the + caller decides how to continue. When set, the same text is + submitted for every turn after the first, giving claws a clean + hook for structured follow-ups (e.g. ``"Continue."``, a + routing-planner instruction, or a tool-output cue). Previously + the loop silently appended ``" [turn N]"`` to the original + prompt, polluting the transcript with harness-generated + annotation the model had no way to interpret. Returns: A list of TurnResult objects. The final entry's ``stop_reason`` @@ -182,6 +192,12 @@ class PortRuntime: block the loop indefinitely with no cancellation path, forcing claws to rely on external watchdogs or OS-level kills. Callers can now enforce a deadline and receive a typed timeout signal instead. + + #163: the old ``f'{prompt} [turn {turn + 1}]'`` suffix was never + interpreted by the engine or any system prompt. It looked like a real + user turn in ``mutable_messages`` and the transcript, making replay and + analysis fragile. Removed entirely; callers supply ``continuation_prompt`` + for meaningful follow-ups or let the loop stop after turn 0. """ engine = QueryEnginePort.from_workspace() engine.config = QueryEngineConfig(max_turns=max_turns, structured_output=structured_output) @@ -195,7 +211,17 @@ class PortRuntime: executor = ThreadPoolExecutor(max_workers=1) if deadline is not None else None try: for turn in range(max_turns): - turn_prompt = prompt if turn == 0 else f'{prompt} [turn {turn + 1}]' + # #163: no more f'{prompt} [turn N]' suffix injection. + # On turn 0 submit the original prompt. + # On turn > 0, submit the caller-supplied continuation prompt; + # if the caller did not supply one, stop the loop cleanly instead + # of fabricating a fake user turn. + if turn == 0: + turn_prompt = prompt + elif continuation_prompt is not None: + turn_prompt = continuation_prompt + else: + break if deadline is None: # Legacy path: unbounded call, preserves existing behaviour exactly. diff --git a/tests/test_run_turn_loop_continuation.py b/tests/test_run_turn_loop_continuation.py new file mode 100644 index 0000000..43f6bda --- /dev/null +++ b/tests/test_run_turn_loop_continuation.py @@ -0,0 +1,161 @@ +"""Tests for run_turn_loop continuation contract (ROADMAP #163). + +The deprecated ``f'{prompt} [turn N]'`` suffix injection is gone. Verifies: +- No ``[turn N]`` string ever lands in a submitted prompt +- Default (``continuation_prompt=None``) stops the loop after turn 0 +- Explicit ``continuation_prompt`` is submitted verbatim on subsequent turns +- The first turn always gets the original prompt, not the continuation +""" + +from __future__ import annotations + +import subprocess +import sys +from pathlib import Path +from unittest.mock import patch + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from src.models import UsageSummary # noqa: E402 +from src.query_engine import TurnResult # noqa: E402 +from src.runtime import PortRuntime # noqa: E402 + + +def _completed_result(prompt: str) -> TurnResult: + return TurnResult( + prompt=prompt, + output='ok', + matched_commands=(), + matched_tools=(), + permission_denials=(), + usage=UsageSummary(), + stop_reason='completed', + ) + + +class TestNoTurnSuffixInjection: + """Core acceptance: no prompt submitted to the engine ever contains '[turn N]'.""" + + def test_default_path_submits_original_prompt_only(self) -> None: + runtime = PortRuntime() + submitted: list[str] = [] + + def _capture(prompt, commands, tools, denials): + submitted.append(prompt) + return _completed_result(prompt) + + with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory: + engine = mock_factory.return_value + engine.submit_message.side_effect = _capture + + runtime.run_turn_loop('investigate this bug', max_turns=3) + + # Without continuation_prompt, only turn 0 should run + assert submitted == ['investigate this bug'] + # And no '[turn N]' suffix anywhere + for p in submitted: + assert '[turn' not in p, f'found [turn suffix in submitted prompt: {p!r}' + + def test_with_continuation_prompt_no_turn_suffix(self) -> None: + runtime = PortRuntime() + submitted: list[str] = [] + + def _capture(prompt, commands, tools, denials): + submitted.append(prompt) + return _completed_result(prompt) + + with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory: + engine = mock_factory.return_value + engine.submit_message.side_effect = _capture + + runtime.run_turn_loop( + 'investigate this bug', + max_turns=3, + continuation_prompt='Continue.', + ) + + # Turn 0 = original, turns 1-2 = continuation, verbatim + assert submitted == ['investigate this bug', 'Continue.', 'Continue.'] + # No harness-injected suffix anywhere + for p in submitted: + assert '[turn' not in p + assert not p.endswith(']') + + +class TestContinuationDefaultStopsAfterTurnZero: + def test_default_continuation_returns_one_result(self) -> None: + runtime = PortRuntime() + with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory: + engine = mock_factory.return_value + engine.submit_message.side_effect = lambda p, *_: _completed_result(p) + + results = runtime.run_turn_loop('x', max_turns=5) + assert len(results) == 1 + assert results[0].prompt == 'x' + + def test_default_continuation_does_not_call_engine_twice(self) -> None: + runtime = PortRuntime() + with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory: + engine = mock_factory.return_value + engine.submit_message.side_effect = lambda p, *_: _completed_result(p) + + runtime.run_turn_loop('x', max_turns=10) + # Exactly one submit_message call despite max_turns=10 + assert engine.submit_message.call_count == 1 + + +class TestExplicitContinuationBehaviour: + def test_first_turn_always_uses_original_prompt(self) -> None: + runtime = PortRuntime() + captured: list[str] = [] + + def _capture(prompt, *_): + captured.append(prompt) + return _completed_result(prompt) + + with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory: + engine = mock_factory.return_value + engine.submit_message.side_effect = _capture + + runtime.run_turn_loop( + 'original task', max_turns=2, continuation_prompt='keep going' + ) + + assert captured[0] == 'original task' + assert captured[1] == 'keep going' + + def test_continuation_respects_max_turns(self) -> None: + runtime = PortRuntime() + with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory: + engine = mock_factory.return_value + engine.submit_message.side_effect = lambda p, *_: _completed_result(p) + + runtime.run_turn_loop('x', max_turns=3, continuation_prompt='go') + assert engine.submit_message.call_count == 3 + + +class TestCLIContinuationFlag: + def test_cli_default_runs_one_turn(self) -> None: + """Without --continuation-prompt, CLI should emit exactly '## Turn 1'.""" + result = subprocess.run( + [sys.executable, '-m', 'src.main', 'turn-loop', 'review MCP tool', + '--max-turns', '3', '--structured-output'], + check=True, capture_output=True, text=True, + ) + assert '## Turn 1' in result.stdout + assert '## Turn 2' not in result.stdout + assert '[turn' not in result.stdout + + def test_cli_with_continuation_runs_multiple_turns(self) -> None: + """With --continuation-prompt, CLI should run up to max_turns.""" + result = subprocess.run( + [sys.executable, '-m', 'src.main', 'turn-loop', 'review MCP tool', + '--max-turns', '2', '--structured-output', + '--continuation-prompt', 'continue'], + check=True, capture_output=True, text=True, + ) + assert '## Turn 1' in result.stdout + assert '## Turn 2' in result.stdout + # The continuation text is visible (it's submitted as the turn prompt) + # but no harness-injected [turn N] suffix + assert '[turn' not in result.stdout diff --git a/tests/test_run_turn_loop_timeout.py b/tests/test_run_turn_loop_timeout.py index 011f3f0..8a24dae 100644 --- a/tests/test_run_turn_loop_timeout.py +++ b/tests/test_run_turn_loop_timeout.py @@ -74,7 +74,13 @@ class TestTimeoutAbortsHungTurn: class TestTimeoutBudgetIsTotal: def test_budget_is_cumulative_across_turns(self) -> None: - """timeout_seconds is total wall-clock across all turns, not per-turn.""" + """timeout_seconds is total wall-clock across all turns, not per-turn. + + #163 interaction: multi-turn behaviour now requires an explicit + ``continuation_prompt``; otherwise the loop stops after turn 0 and + the cumulative-budget contract is trivially satisfied. We supply one + here so the test actually exercises the cross-turn deadline. + """ runtime = PortRuntime() call_count = {'n': 0} @@ -91,7 +97,10 @@ class TestTimeoutBudgetIsTotal: # 0.6s budget, 0.4s per turn. First turn completes (~0.4s), # second turn times out before finishing. results = runtime.run_turn_loop( - 'review MCP tool', max_turns=5, timeout_seconds=0.6 + 'review MCP tool', + max_turns=5, + timeout_seconds=0.6, + continuation_prompt='continue', ) elapsed = time.monotonic() - start