mirror of
https://github.com/ultraworkers/claw-code.git
synced 2026-04-24 05:00:25 +08:00
fix: #163 — remove [turn N] suffix pollution from run_turn_loop; file #164 timeout-cancellation followup
#163: run_turn_loop no longer injects f'{prompt} [turn N]' into follow-up prompts. The suffix was never defined or interpreted anywhere — not by the engine, not by the system prompt, not by any LLM. It looked like a real user-typed annotation in the transcript and made replay/analysis fragile. New behaviour: - turn 0 submits the original prompt (unchanged) - turn > 0 submits caller-supplied continuation_prompt if provided, else the loop stops cleanly — no fabricated user turn - added continuation_prompt: str | None = None parameter to run_turn_loop - added --continuation-prompt CLI flag for claws scripting multi-turn loops - zero '[turn' strings ever appear in mutable_messages or stdout now Behaviour change for existing callers: - Before: run_turn_loop(prompt, max_turns=3) submitted 3 turns ('prompt', 'prompt [turn 2]', 'prompt [turn 3]') - After: run_turn_loop(prompt, max_turns=3) submits 1 turn ('prompt') - To preserve old multi-turn behaviour, pass continuation_prompt='Continue.' or any structured follow-up text One existing timeout test (test_budget_is_cumulative_across_turns) updated to pass continuation_prompt so the cumulative-budget contract is actually exercised across turns instead of trivially satisfied by a one-turn loop. #164 filed: addresses reviewer feedback on #161. The wall-clock timeout bounds the caller-facing wait, but the underlying submit_message worker thread keeps running and can mutate engine state after the timeout TurnResult is returned. A cooperative cancel_event pattern is sketched in the pinpoint; real asyncio.Task.cancel() support will come once provider IO is async-native (larger refactor). Tests (tests/test_run_turn_loop_continuation.py, 8 tests): - TestNoTurnSuffixInjection (2): zero '[turn' strings in any submitted prompt, both default and explicit-continuation paths - TestContinuationDefaultStopsAfterTurnZero (2): default loops run exactly one turn; engine.submit_message called exactly once despite max_turns=10 - TestExplicitContinuationBehaviour (2): turn 0 = original, turn N = continuation verbatim; max_turns still respected - TestCLIContinuationFlag (2): CLI default emits only '## Turn 1'; --continuation-prompt wires through to multi-turn behaviour Full suite: 67/67 passing. Closes ROADMAP #163. Files #164.
This commit is contained in:
parent
4813a2b351
commit
79a9f0e6f6
64
ROADMAP.md
64
ROADMAP.md
@ -6230,3 +6230,67 @@ Remove the `[turn N]` suffix entirely. Add an optional `continuation_prompt: str
|
||||
**Blocker.** None.
|
||||
|
||||
**Source.** Jobdori dogfood sweep 2026-04-22 10:06 KST — read `src/runtime.py:154-168`, reproduced the `[turn N]` suffix injection pattern, confirmed no system-prompt or engine-side interpretation of the suffix exists.
|
||||
|
||||
## Pinpoint #164. `run_turn_loop` timeout returns control to caller but does not cancel the underlying `submit_message` work — wedged provider threads leak past the deadline
|
||||
|
||||
**Gap.** The #161 fix bounds the caller-facing wait on `PortRuntime.run_turn_loop` via `ThreadPoolExecutor.submit(...).result(timeout=...)`, but `ThreadPoolExecutor.shutdown(wait=False)` does not actually cancel a thread already running `engine.submit_message`. Python's threading model does not expose safe cooperative cancellation of arbitrary blocking calls (no `pthread_cancel`-equivalent for user code), so once a turn wedges on a slow provider the thread keeps running in the background after `run_turn_loop` returns. Concretely:
|
||||
|
||||
1. **Caller receives `TurnResult(stop_reason='timeout')` on time** — the caller-facing deadline works correctly (confirmed by 6 tests in `tests/test_run_turn_loop_timeout.py`).
|
||||
2. **But the worker thread is still executing `engine.submit_message`** — it will complete (or not) whenever the underlying `_format_output` / projected_usage computation returns, mutating the engine's `mutable_messages`, `transcript_store`, `total_usage` at an unpredictable later time.
|
||||
3. **If the caller reuses the same engine** (e.g., a long-lived CLI session or orchestration harness that pools engines), those deferred mutations land silently on top of fresh turns, corrupting the session in a way that `stop_reason` cannot signal.
|
||||
4. **If the caller spawns many turn loops in parallel**, leaked threads accumulate and the process memory/file-handle footprint grows without bound.
|
||||
|
||||
**Repro (conceptual).**
|
||||
```python
|
||||
import time
|
||||
from src.runtime import PortRuntime
|
||||
from src.query_engine import QueryEnginePort
|
||||
from unittest.mock import patch
|
||||
|
||||
slow_calls = []
|
||||
|
||||
def hang_and_mutate(self, prompt, *args, **kwargs):
|
||||
# Simulates a slow provider that eventually returns and mutates engine state.
|
||||
time.sleep(2.0)
|
||||
self.mutable_messages.append(f'LATE: {prompt}') # silent mutation after timeout
|
||||
slow_calls.append(prompt)
|
||||
return None # irrelevant, caller has already given up
|
||||
|
||||
with patch.object(QueryEnginePort, 'submit_message', hang_and_mutate):
|
||||
runtime = PortRuntime()
|
||||
# Timeout fires at 0.2s, caller gets synthetic timeout result
|
||||
results = runtime.run_turn_loop('x', timeout_seconds=0.2)
|
||||
assert results[-1].stop_reason == 'timeout'
|
||||
# But 2 seconds later the background thread still mutates the engine
|
||||
time.sleep(2.5)
|
||||
assert slow_calls == ['x'] # the "cancelled" turn actually ran to completion
|
||||
```
|
||||
|
||||
**Impact on claws.**
|
||||
- Orchestration harnesses cannot safely reuse `QueryEnginePort` instances across timeouts. Every timeout implicitly requires discarding the engine, which breaks session continuity.
|
||||
- Hung threads leak across long-running claw processes (daemon-mode claws, CI workers, cron harnesses). Resource bounds are the OS's problem, not the harness's.
|
||||
- "Timeout fired, session is clean" is not actually true — `TurnResult(stop_reason='timeout')` only means "the caller got control back in time", not "the turn was cancelled".
|
||||
|
||||
**Root cause.** Two layers:
|
||||
1. `PortRuntime.run_turn_loop` uses `executor.shutdown(wait=False)` which lets the interpreter reap the thread eventually but does not signal cancellation to the running code.
|
||||
2. `QueryEnginePort.submit_message` has no cooperative cancellation hook — no `cancel_event: threading.Event | None = None` parameter, no periodic check inside `_format_output` or the projected-usage computation, no abortable IO wrapper around any future provider calls. Even if the runtime layer wanted to ask the turn to stop, there is no receiver.
|
||||
|
||||
**Fix shape (~30 lines, two-stage).**
|
||||
|
||||
*Stage A — runtime layer (claws benefit immediately).*
|
||||
1. Introduce a `threading.Event` as `cancel_event`. Pass it into `engine.submit_message` via a new optional parameter.
|
||||
2. On timeout in `run_turn_loop`, set `cancel_event` before returning the synthetic timeout result so any check inside the engine can observe it.
|
||||
3. Ensure the worker thread is marked as a daemon (`ThreadPoolExecutor(max_workers=1, thread_name_prefix='claw-turn-cancellable')` — daemon=True is not directly configurable on stdlib Executor, but we can switch to `threading.Thread(daemon=True)` for the single-worker case).
|
||||
|
||||
*Stage B — engine layer (makes Stage A effective).*
|
||||
4. `submit_message` accepts `cancel_event: threading.Event | None = None` and checks `cancel_event.is_set()` at safe cancellation points: before `_format_output`, before each mutation, before `compact_messages_if_needed`. If set, raise a `TurnCancelled` exception (or return an early `TurnResult(stop_reason='cancelled')` — exception is cleaner because it propagates through the Future).
|
||||
5. Any future network/provider call paths wrap their blocking IO in a loop that checks `cancel_event` between retries / chunks, or uses `socket.settimeout` / `httpx.AsyncClient` with a cancellation token.
|
||||
|
||||
*Stage C — contract.*
|
||||
6. Document that `stop_reason='timeout'` now means "the turn was asked to cancel and had a fair chance to observe it". Threads that ignore cancellation (e.g., pure-CPU loops with no check) can still leak, but cooperative paths clean up.
|
||||
|
||||
**Acceptance.** After `run_turn_loop(..., timeout_seconds=0.2)` returns a timeout result, within a bounded grace window (say 100ms) the underlying worker thread has either finished cooperatively or acknowledged the cancel event. `engine.mutable_messages` does not grow after the timeout TurnResult is returned. A reused engine can safely accept a fresh `submit_message` call without inheriting deferred mutations from the cancelled turn.
|
||||
|
||||
**Blocker.** Python threading does not expose preemptive cancellation, so purely CPU-bound stalls inside `_format_output` or provider client libraries cannot be force-killed. The fix makes cancellation *cooperative*, not *guaranteed*. Eventually the engine will need an `asyncio`-native path with `asyncio.Task.cancel()` for real provider IO, but that is a larger refactor.
|
||||
|
||||
**Source.** Jobdori dogfood sweep 2026-04-22 17:36 KST — filed while landing #162, following review feedback on #161 that pointed out the caller-facing timeout and underlying work-cancellation are two different problems. #161 closed the first; #164 is the second.
|
||||
|
||||
10
src/main.py
10
src/main.py
@ -71,6 +71,15 @@ def build_parser() -> argparse.ArgumentParser:
|
||||
default=None,
|
||||
help='total wall-clock budget across all turns (#161). Default: unbounded.',
|
||||
)
|
||||
loop_parser.add_argument(
|
||||
'--continuation-prompt',
|
||||
default=None,
|
||||
help=(
|
||||
'prompt to submit on turns after the first (#163). Default: None '
|
||||
'(loop stops after turn 0). Replaces the deprecated implicit "[turn N]" '
|
||||
'suffix that used to pollute the transcript.'
|
||||
),
|
||||
)
|
||||
|
||||
flush_parser = subparsers.add_parser('flush-transcript', help='persist and flush a temporary session transcript')
|
||||
flush_parser.add_argument('prompt')
|
||||
@ -199,6 +208,7 @@ def main(argv: list[str] | None = None) -> int:
|
||||
max_turns=args.max_turns,
|
||||
structured_output=args.structured_output,
|
||||
timeout_seconds=args.timeout_seconds,
|
||||
continuation_prompt=args.continuation_prompt,
|
||||
)
|
||||
for idx, result in enumerate(results, start=1):
|
||||
print(f'## Turn {idx}')
|
||||
|
||||
@ -160,6 +160,7 @@ class PortRuntime:
|
||||
max_turns: int = 3,
|
||||
structured_output: bool = False,
|
||||
timeout_seconds: float | None = None,
|
||||
continuation_prompt: str | None = None,
|
||||
) -> list[TurnResult]:
|
||||
"""Run a multi-turn engine loop with optional wall-clock deadline.
|
||||
|
||||
@ -172,6 +173,15 @@ class PortRuntime:
|
||||
budget is exhausted mid-turn, a synthetic TurnResult with
|
||||
``stop_reason='timeout'`` is appended and the loop exits.
|
||||
``None`` (default) preserves legacy unbounded behaviour.
|
||||
continuation_prompt: What to send on turns after the first. When
|
||||
``None`` (default, #163), the loop stops after turn 0 and the
|
||||
caller decides how to continue. When set, the same text is
|
||||
submitted for every turn after the first, giving claws a clean
|
||||
hook for structured follow-ups (e.g. ``"Continue."``, a
|
||||
routing-planner instruction, or a tool-output cue). Previously
|
||||
the loop silently appended ``" [turn N]"`` to the original
|
||||
prompt, polluting the transcript with harness-generated
|
||||
annotation the model had no way to interpret.
|
||||
|
||||
Returns:
|
||||
A list of TurnResult objects. The final entry's ``stop_reason``
|
||||
@ -182,6 +192,12 @@ class PortRuntime:
|
||||
block the loop indefinitely with no cancellation path, forcing claws to
|
||||
rely on external watchdogs or OS-level kills. Callers can now enforce a
|
||||
deadline and receive a typed timeout signal instead.
|
||||
|
||||
#163: the old ``f'{prompt} [turn {turn + 1}]'`` suffix was never
|
||||
interpreted by the engine or any system prompt. It looked like a real
|
||||
user turn in ``mutable_messages`` and the transcript, making replay and
|
||||
analysis fragile. Removed entirely; callers supply ``continuation_prompt``
|
||||
for meaningful follow-ups or let the loop stop after turn 0.
|
||||
"""
|
||||
engine = QueryEnginePort.from_workspace()
|
||||
engine.config = QueryEngineConfig(max_turns=max_turns, structured_output=structured_output)
|
||||
@ -195,7 +211,17 @@ class PortRuntime:
|
||||
executor = ThreadPoolExecutor(max_workers=1) if deadline is not None else None
|
||||
try:
|
||||
for turn in range(max_turns):
|
||||
turn_prompt = prompt if turn == 0 else f'{prompt} [turn {turn + 1}]'
|
||||
# #163: no more f'{prompt} [turn N]' suffix injection.
|
||||
# On turn 0 submit the original prompt.
|
||||
# On turn > 0, submit the caller-supplied continuation prompt;
|
||||
# if the caller did not supply one, stop the loop cleanly instead
|
||||
# of fabricating a fake user turn.
|
||||
if turn == 0:
|
||||
turn_prompt = prompt
|
||||
elif continuation_prompt is not None:
|
||||
turn_prompt = continuation_prompt
|
||||
else:
|
||||
break
|
||||
|
||||
if deadline is None:
|
||||
# Legacy path: unbounded call, preserves existing behaviour exactly.
|
||||
|
||||
161
tests/test_run_turn_loop_continuation.py
Normal file
161
tests/test_run_turn_loop_continuation.py
Normal file
@ -0,0 +1,161 @@
|
||||
"""Tests for run_turn_loop continuation contract (ROADMAP #163).
|
||||
|
||||
The deprecated ``f'{prompt} [turn N]'`` suffix injection is gone. Verifies:
|
||||
- No ``[turn N]`` string ever lands in a submitted prompt
|
||||
- Default (``continuation_prompt=None``) stops the loop after turn 0
|
||||
- Explicit ``continuation_prompt`` is submitted verbatim on subsequent turns
|
||||
- The first turn always gets the original prompt, not the continuation
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
|
||||
from src.models import UsageSummary # noqa: E402
|
||||
from src.query_engine import TurnResult # noqa: E402
|
||||
from src.runtime import PortRuntime # noqa: E402
|
||||
|
||||
|
||||
def _completed_result(prompt: str) -> TurnResult:
|
||||
return TurnResult(
|
||||
prompt=prompt,
|
||||
output='ok',
|
||||
matched_commands=(),
|
||||
matched_tools=(),
|
||||
permission_denials=(),
|
||||
usage=UsageSummary(),
|
||||
stop_reason='completed',
|
||||
)
|
||||
|
||||
|
||||
class TestNoTurnSuffixInjection:
|
||||
"""Core acceptance: no prompt submitted to the engine ever contains '[turn N]'."""
|
||||
|
||||
def test_default_path_submits_original_prompt_only(self) -> None:
|
||||
runtime = PortRuntime()
|
||||
submitted: list[str] = []
|
||||
|
||||
def _capture(prompt, commands, tools, denials):
|
||||
submitted.append(prompt)
|
||||
return _completed_result(prompt)
|
||||
|
||||
with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory:
|
||||
engine = mock_factory.return_value
|
||||
engine.submit_message.side_effect = _capture
|
||||
|
||||
runtime.run_turn_loop('investigate this bug', max_turns=3)
|
||||
|
||||
# Without continuation_prompt, only turn 0 should run
|
||||
assert submitted == ['investigate this bug']
|
||||
# And no '[turn N]' suffix anywhere
|
||||
for p in submitted:
|
||||
assert '[turn' not in p, f'found [turn suffix in submitted prompt: {p!r}'
|
||||
|
||||
def test_with_continuation_prompt_no_turn_suffix(self) -> None:
|
||||
runtime = PortRuntime()
|
||||
submitted: list[str] = []
|
||||
|
||||
def _capture(prompt, commands, tools, denials):
|
||||
submitted.append(prompt)
|
||||
return _completed_result(prompt)
|
||||
|
||||
with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory:
|
||||
engine = mock_factory.return_value
|
||||
engine.submit_message.side_effect = _capture
|
||||
|
||||
runtime.run_turn_loop(
|
||||
'investigate this bug',
|
||||
max_turns=3,
|
||||
continuation_prompt='Continue.',
|
||||
)
|
||||
|
||||
# Turn 0 = original, turns 1-2 = continuation, verbatim
|
||||
assert submitted == ['investigate this bug', 'Continue.', 'Continue.']
|
||||
# No harness-injected suffix anywhere
|
||||
for p in submitted:
|
||||
assert '[turn' not in p
|
||||
assert not p.endswith(']')
|
||||
|
||||
|
||||
class TestContinuationDefaultStopsAfterTurnZero:
|
||||
def test_default_continuation_returns_one_result(self) -> None:
|
||||
runtime = PortRuntime()
|
||||
with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory:
|
||||
engine = mock_factory.return_value
|
||||
engine.submit_message.side_effect = lambda p, *_: _completed_result(p)
|
||||
|
||||
results = runtime.run_turn_loop('x', max_turns=5)
|
||||
assert len(results) == 1
|
||||
assert results[0].prompt == 'x'
|
||||
|
||||
def test_default_continuation_does_not_call_engine_twice(self) -> None:
|
||||
runtime = PortRuntime()
|
||||
with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory:
|
||||
engine = mock_factory.return_value
|
||||
engine.submit_message.side_effect = lambda p, *_: _completed_result(p)
|
||||
|
||||
runtime.run_turn_loop('x', max_turns=10)
|
||||
# Exactly one submit_message call despite max_turns=10
|
||||
assert engine.submit_message.call_count == 1
|
||||
|
||||
|
||||
class TestExplicitContinuationBehaviour:
|
||||
def test_first_turn_always_uses_original_prompt(self) -> None:
|
||||
runtime = PortRuntime()
|
||||
captured: list[str] = []
|
||||
|
||||
def _capture(prompt, *_):
|
||||
captured.append(prompt)
|
||||
return _completed_result(prompt)
|
||||
|
||||
with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory:
|
||||
engine = mock_factory.return_value
|
||||
engine.submit_message.side_effect = _capture
|
||||
|
||||
runtime.run_turn_loop(
|
||||
'original task', max_turns=2, continuation_prompt='keep going'
|
||||
)
|
||||
|
||||
assert captured[0] == 'original task'
|
||||
assert captured[1] == 'keep going'
|
||||
|
||||
def test_continuation_respects_max_turns(self) -> None:
|
||||
runtime = PortRuntime()
|
||||
with patch('src.runtime.QueryEnginePort.from_workspace') as mock_factory:
|
||||
engine = mock_factory.return_value
|
||||
engine.submit_message.side_effect = lambda p, *_: _completed_result(p)
|
||||
|
||||
runtime.run_turn_loop('x', max_turns=3, continuation_prompt='go')
|
||||
assert engine.submit_message.call_count == 3
|
||||
|
||||
|
||||
class TestCLIContinuationFlag:
|
||||
def test_cli_default_runs_one_turn(self) -> None:
|
||||
"""Without --continuation-prompt, CLI should emit exactly '## Turn 1'."""
|
||||
result = subprocess.run(
|
||||
[sys.executable, '-m', 'src.main', 'turn-loop', 'review MCP tool',
|
||||
'--max-turns', '3', '--structured-output'],
|
||||
check=True, capture_output=True, text=True,
|
||||
)
|
||||
assert '## Turn 1' in result.stdout
|
||||
assert '## Turn 2' not in result.stdout
|
||||
assert '[turn' not in result.stdout
|
||||
|
||||
def test_cli_with_continuation_runs_multiple_turns(self) -> None:
|
||||
"""With --continuation-prompt, CLI should run up to max_turns."""
|
||||
result = subprocess.run(
|
||||
[sys.executable, '-m', 'src.main', 'turn-loop', 'review MCP tool',
|
||||
'--max-turns', '2', '--structured-output',
|
||||
'--continuation-prompt', 'continue'],
|
||||
check=True, capture_output=True, text=True,
|
||||
)
|
||||
assert '## Turn 1' in result.stdout
|
||||
assert '## Turn 2' in result.stdout
|
||||
# The continuation text is visible (it's submitted as the turn prompt)
|
||||
# but no harness-injected [turn N] suffix
|
||||
assert '[turn' not in result.stdout
|
||||
@ -74,7 +74,13 @@ class TestTimeoutAbortsHungTurn:
|
||||
|
||||
class TestTimeoutBudgetIsTotal:
|
||||
def test_budget_is_cumulative_across_turns(self) -> None:
|
||||
"""timeout_seconds is total wall-clock across all turns, not per-turn."""
|
||||
"""timeout_seconds is total wall-clock across all turns, not per-turn.
|
||||
|
||||
#163 interaction: multi-turn behaviour now requires an explicit
|
||||
``continuation_prompt``; otherwise the loop stops after turn 0 and
|
||||
the cumulative-budget contract is trivially satisfied. We supply one
|
||||
here so the test actually exercises the cross-turn deadline.
|
||||
"""
|
||||
runtime = PortRuntime()
|
||||
call_count = {'n': 0}
|
||||
|
||||
@ -91,7 +97,10 @@ class TestTimeoutBudgetIsTotal:
|
||||
# 0.6s budget, 0.4s per turn. First turn completes (~0.4s),
|
||||
# second turn times out before finishing.
|
||||
results = runtime.run_turn_loop(
|
||||
'review MCP tool', max_turns=5, timeout_seconds=0.6
|
||||
'review MCP tool',
|
||||
max_turns=5,
|
||||
timeout_seconds=0.6,
|
||||
continuation_prompt='continue',
|
||||
)
|
||||
elapsed = time.monotonic() - start
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user