Autonomous AI Agent Background Jobs: A Self-Healing bgmon Workflow for Codex
Learn how to run autonomous AI agent workflows as durable background jobs that survive chat disconnects. Full code for bgmon, queue orchestration, auto-heal sub-agents, and operational safeguards.
Published: 2026-02-18 · Last updated: 2026-02-18
The exact problem this page solves is not model quality. It is workflow fragility.
Most teams lose progress when terminal sessions drop, browser tabs refresh, SSH gets interrupted, or long commands are tied to one chat context. The model may be fine. The logic may be fine. The plan may be fine. But execution still dies because work is coupled to a fragile interactive shell.
If you are building serious AI operations, that coupling is operational debt.
This article is a full implementation guide for moving from fragile chat-driven execution to resilient autonomous operations using:
- bgmon for detached process lifecycle
- a work-order queue for dependency-aware step execution
- a completion sub-agent for audit and auto-repair
- durable job state, durable logs, and durable heartbeat files
The core goal is simple:
chat can disconnect, and the work continues safely.
The second goal is equally important:
when you reconnect, truth is cheap.
No archaeology. No guessing. No “I think it was still running.” You can inspect status, logs, state, and repair history in seconds.
Why this title and URL slug
The slug follows the pattern of pages already earning impressions in Search Console: concise, intent-forward slugs on practical operational topics.
That tells us two things:
- Search visibility is strongest when the page name is short and concrete.
- The audience intent is practical operations, not generic AI commentary.
So this page is tuned around the keyword cluster:
- primary: autonomous ai agents
- primary: ai background jobs
- secondary: ai monitoring
- secondary: operational risk management tools
- secondary: system prompting tools
And the slug is intentionally concise and intent-matching:
/office/notes/autonomous-ai-agent-background-jobs/
That slug is human-readable, keyword-clear, and consistent with your current top performers.
What this page gives you
By the end of this guide, you will have:
- A detached background job manager (bgmon) with stale PID detection.
- A deterministic queue engine that advances steps only when dependencies pass.
- A sub-agent monitor that audits completed steps and triggers repair loops.
- A clean set of verify commands for fast operator confidence.
- A governance model so autonomy does not drift into chaos.
This is not a toy script collection. It is a reusable execution backbone for long AI workflows.
Architecture Overview
ta_forever_engine_01 (bgmon-managed supervisor)
-> tools/work_order_engine.py
-> ta_step_01 (oneshot strategy run)
-> ta_step_02 (oneshot strategy run)
-> ta_step_03 (oneshot strategy run)
completion_subagent_01 (bgmon-managed monitor)
-> checks RUNNING -> DEAD/STALE transitions
-> validates completion markers and artifacts
-> triggers repair command on failure
-> writes heartbeat and repair notes
state + truth layer
-> ~/.bgmon/jobs/*.json
-> ~/.bgmon/logs/*.log
-> memory/work_order_state_*.json
-> /tmp/*heartbeat*.json
This architecture separates control plane from workload plane.
Control plane:
- process lifecycle
- queue advancement
- health monitoring
- repair logic
Workload plane:
- crawlers
- doc stance runs
- optimizations
- report generation
When these are separated and persisted, chat cutoffs become a nuisance, not an outage.
1) Implement bgmon (detached lifecycle with stale PID protection)
Create tools/bgmon.py:
#!/usr/bin/env python3
import argparse
import json
import os
import signal
import subprocess
import time
from pathlib import Path
ROOT = Path(os.environ.get("BGMON_DIR", str(Path.home() / ".bgmon")))
JOBS = ROOT / "jobs"
LOGS = ROOT / "logs"
JOBS.mkdir(parents=True, exist_ok=True)
LOGS.mkdir(parents=True, exist_ok=True)
def jpath(name: str) -> Path:
return JOBS / f"{name}.json"
def now_ts() -> str:
return time.strftime("%Y%m%dT%H%M%SZ", time.gmtime())
def proc_starttime_jiffies(pid: int):
p = Path(f"/proc/{pid}/stat")
if not p.exists():
return None
try:
parts = p.read_text().strip().split()
if len(parts) < 22:
return None
return parts[21]
except Exception:
return None
def read_job(name: str):
p = jpath(name)
if not p.exists():
return None
return json.loads(p.read_text())
def classify(pid: int, expected_starttime: str):
if pid <= 0:
return "DEAD"
    try:
        os.kill(pid, 0)  # signal 0: existence check, delivers nothing
    except ProcessLookupError:
        return "DEAD"
    except PermissionError:
        pass  # process exists but is owned by another user
cur = proc_starttime_jiffies(pid)
if expected_starttime and cur and cur != expected_starttime:
return "STALE"
return "RUNNING"
def cmd_start(args):
if not args.cmd:
print("missing command")
return 2
cmd = args.cmd[1:] if args.cmd[:1] == ["--"] else args.cmd
name = args.name
log = LOGS / f"{name}.{now_ts()}.log"
with open(log, "ab", buffering=0) as f:
p = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL,
stdout=f,
stderr=subprocess.STDOUT,
start_new_session=True,
close_fds=True,
)
rec = {
"name": name,
"pid": p.pid,
"cmd": " ".join(cmd),
"log": str(log),
"started_at_unix": int(time.time()),
"pid_starttime_jiffies": proc_starttime_jiffies(p.pid),
}
jpath(name).write_text(json.dumps(rec, indent=2))
print(f"started name={name} pid={p.pid}")
print(f"log={log}")
return 0
def cmd_status(args):
names = [args.name] if args.name else sorted([p.stem for p in JOBS.glob("*.json")])
print(f"state_dir={ROOT}")
print("NAME STATUS PID CMD")
for n in names:
rec = read_job(n)
if not rec:
print(f"{n:<30} {'MISSING':<8} {'':<8} ")
continue
pid = int(rec.get("pid", 0) or 0)
st = classify(pid, rec.get("pid_starttime_jiffies"))
print(f"{n:<30} {st:<8} {pid:<8} {rec.get('cmd','')}")
return 0
def cmd_tail(args):
rec = read_job(args.name)
if not rec:
print(f"missing job: {args.name}")
return 2
subprocess.run(["tail", "-n", str(args.n), rec["log"]], check=False)
return 0
def cmd_stop(args):
rec = read_job(args.name)
if not rec:
print(f"missing job: {args.name}")
return 2
pid = int(rec.get("pid", 0) or 0)
if pid > 0:
try:
os.killpg(pid, signal.SIGTERM)
print(f"stopped name={args.name} pid={pid}")
return 0
except Exception as e:
print(f"stop error: {e}")
return 1
return 0
def main():
ap = argparse.ArgumentParser()
sp = ap.add_subparsers(dest="sub", required=True)
p = sp.add_parser("start")
p.add_argument("name")
p.add_argument("cmd", nargs=argparse.REMAINDER)
p = sp.add_parser("status")
p.add_argument("name", nargs="?")
p = sp.add_parser("tail")
p.add_argument("name")
p.add_argument("-n", type=int, default=120)
p = sp.add_parser("stop")
p.add_argument("name")
args = ap.parse_args()
if args.sub == "start":
return cmd_start(args)
if args.sub == "status":
return cmd_status(args)
if args.sub == "tail":
return cmd_tail(args)
if args.sub == "stop":
return cmd_stop(args)
return 1
if __name__ == "__main__":
raise SystemExit(main())
Why this implementation matters
start_new_session=True is the key to surviving terminal disconnects. Output redirection into durable log files is the key to observability after reconnect. PID starttime checks are the key to avoiding stale PID signaling mistakes.
Without all three, teams eventually hit silent failure states: orphaned work, lost output, or a reused PID masquerading as a live job.
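The stale-PID logic can be exercised on its own. The sketch below mirrors classify from tools/bgmon.py, with slightly more careful /proc parsing (process names may contain spaces, which breaks a naive whitespace split):

```python
import os
import subprocess
from pathlib import Path

def proc_starttime(pid: int):
    """Field 22 of /proc/<pid>/stat (starttime, in jiffies); Linux only."""
    p = Path(f"/proc/{pid}/stat")
    if not p.exists():
        return None
    raw = p.read_text()
    # comm (field 2) can contain spaces, so split after its closing paren
    return raw[raw.rindex(")") + 2:].split()[19]

def classify(pid: int, expected_starttime):
    if pid <= 0:
        return "DEAD"
    try:
        os.kill(pid, 0)  # signal 0: existence check, delivers nothing
    except ProcessLookupError:
        return "DEAD"
    except PermissionError:
        pass  # process exists but is owned by another user
    cur = proc_starttime(pid)
    if expected_starttime and cur and cur != expected_starttime:
        return "STALE"  # same PID number, different process: PID reuse
    return "RUNNING"

print(classify(os.getpid(), proc_starttime(os.getpid())))  # RUNNING

p = subprocess.Popen(["true"])
dead_pid = p.pid
p.wait()  # child is reaped; barring immediate PID reuse, it is gone
print(classify(dead_pid, None))  # DEAD
```

PID reuse is rare per process but near-certain over weeks of runtime, which is why the starttime comparison matters for long-lived supervisors.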
2) Implement a deterministic work-order engine
Create tools/work_order_engine.py:
#!/usr/bin/env python3
import argparse
import json
import subprocess
import time
from pathlib import Path
def run(cmd):
p = subprocess.run(cmd, text=True, capture_output=True)
return p.returncode, (p.stdout or "") + (p.stderr or "")
def bg_status(job):
rc, out = run(["python3", "tools/bgmon.py", "status", job])
if rc != 0:
return "MISSING"
for ln in out.splitlines():
if ln.startswith(job + " "):
return ln.split()[1]
return "MISSING"
def bg_start(job, shell_cmd):
return run(["python3", "tools/bgmon.py", "start", job, "--", "bash", "-lc", shell_cmd])
def bg_tail(job, n=300):
rc, out = run(["python3", "tools/bgmon.py", "tail", job, "-n", str(n)])
return out if rc == 0 else ""
def deps_ok(step, state):
for dep in step.get("depends_on", []):
if state["steps"].get(dep, {}).get("status") != "succeeded":
return False
return True
def check_markers(step):
text = bg_tail(step["job_name"], 320)
success = all(m in text for m in step.get("success_markers", []))
failed = any(m.lower() in text.lower() for m in step.get("failure_markers", []))
if success and not failed:
return "succeeded", "markers pass"
if failed:
return "failed", "failure marker found"
return "failed", "missing success marker"
def tick(order, state):
now = int(time.time())
for step in order["steps"]:
sid = step["id"]
rec = state["steps"].setdefault(sid, {"status": "blocked", "attempts": 0, "history": []})
if rec["status"] == "retry_wait" and now >= int(rec.get("retry_after", 0)):
rec["status"] = "blocked"
if rec["status"] in ("succeeded", "failed", "running", "retry_wait"):
continue
if not deps_ok(step, state):
rec["status"] = "blocked"
continue
st = bg_status(step["job_name"])
if st == "RUNNING":
rec["status"] = "running"
continue
if rec["attempts"] == 0:
rc, out = bg_start(step["job_name"], step["start_shell"])
rec["attempts"] += 1
rec["status"] = "running" if rc == 0 else "failed"
rec["history"].append({"ts": now, "event": "launch", "rc": rc, "out": out[-600:]})
continue
status, note = check_markers(step)
if status == "succeeded":
rec["status"] = "succeeded"
rec["history"].append({"ts": now, "event": "close", "note": note})
else:
if rec["attempts"] < int(step.get("max_retries", 1)):
rec["attempts"] += 1
rec["status"] = "retry_wait"
rec["retry_after"] = now + int(step.get("backoff_sec", 90))
rec["history"].append({"ts": now, "event": "retry_wait", "note": note})
else:
rec["status"] = "failed"
rec["history"].append({"ts": now, "event": "close", "note": note})
    # additionally activate at most one runnable blocked step per tick
runnable = []
for step in order["steps"]:
sid = step["id"]
rec = state["steps"].get(sid, {})
if rec.get("status") == "blocked" and deps_ok(step, state):
runnable.append((sid, step))
if runnable:
sid, step = runnable[0]
rec = state["steps"][sid]
rc, out = bg_start(step["job_name"], step["start_shell"])
rec["status"] = "running" if rc == 0 else "failed"
rec["attempts"] += 1
rec["history"].append({"ts": int(time.time()), "event": "launch", "rc": rc, "out": out[-600:]})
return state
def done(state):
vals = [s.get("status") for s in state["steps"].values()]
return bool(vals) and all(v in ("succeeded", "failed") for v in vals)
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--work-order", required=True)
ap.add_argument("--state", required=True)
ap.add_argument("--once", action="store_true")
ap.add_argument("--interval-sec", type=int, default=20)
args = ap.parse_args()
order = json.loads(Path(args.work_order).read_text())
state_path = Path(args.state)
if state_path.exists():
state = json.loads(state_path.read_text())
else:
state = {"work_order_id": order["work_order_id"], "steps": {}}
def flush():
state_path.write_text(json.dumps(state, indent=2))
state = tick(order, state)
flush()
print(json.dumps({"event": "work_order_tick", "work_order_id": order["work_order_id"], "done": done(state)}))
if args.once:
return 0
while True:
time.sleep(max(5, args.interval_sec))
state = tick(order, state)
flush()
print(json.dumps({"event": "work_order_tick", "work_order_id": order["work_order_id"], "done": done(state)}))
if __name__ == "__main__":
raise SystemExit(main())
Why this engine design works
The engine does not need a huge framework. It only needs deterministic behavior:
- explicit dependency checks
- explicit success/failure markers
- explicit retries with backoff
- explicit durable state updates
That is enough to eliminate most “silent fail” modes.
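The dependency gate is small enough to reason about in isolation. This is the same logic as deps_ok in tools/work_order_engine.py, condensed into a pure function:

```python
def deps_ok(step: dict, state: dict) -> bool:
    """A step is runnable only once every declared dependency has succeeded."""
    return all(
        state["steps"].get(dep, {}).get("status") == "succeeded"
        for dep in step.get("depends_on", [])
    )

step = {"id": "s2_cadjpy", "depends_on": ["s1_usdjpy"]}
state = {"steps": {"s1_usdjpy": {"status": "running"}}}
print(deps_ok(step, state))  # False: the dependency has not succeeded yet

state["steps"]["s1_usdjpy"]["status"] = "succeeded"
print(deps_ok(step, state))  # True: s2 is now runnable
```

Note that a failed dependency blocks downstream steps permanently; there is no "run anyway" path, which is deliberate.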
3) Implement the completion sub-agent (audit + repair)
Create tools/ta_completion_subagent.py:
#!/usr/bin/env python3
import argparse
import json
import subprocess
import time
from pathlib import Path
ENGINE_JOB = "ta_forever_engine_01"
ENGINE_LOOP_CMD = (
"cd /root/feb4; export PYTHONUNBUFFERED=1; "
"while true; do "
"python3 tools/work_order_engine.py "
"--work-order memory/work_orders/ta_forever_v1.json "
"--state memory/work_order_state_ta_forever_v1.json --once; "
"sleep 20; "
"done"
)
TRACKED_JOBS = ["ta_step_01", "ta_step_02", "ta_step_03"]
def run(cmd):
p = subprocess.run(cmd, text=True, capture_output=True)
return p.returncode, (p.stdout or "") + (p.stderr or "")
def bg_status(job):
rc, out = run(["python3", "tools/bgmon.py", "status", job])
if rc != 0:
return "MISSING"
for ln in out.splitlines():
if ln.startswith(job + " "):
return ln.split()[1]
return "MISSING"
def ensure_engine_running():
if bg_status(ENGINE_JOB) != "RUNNING":
run(["python3", "tools/bgmon.py", "start", ENGINE_JOB, "--", "bash", "-lc", ENGINE_LOOP_CMD])
def tail(job, n=320):
rc, out = run(["python3", "tools/bgmon.py", "tail", job, "-n", str(n)])
return out if rc == 0 else ""
def audit_job(job):
txt = tail(job)
marker_ok = '"phase":"ta_symbol_complete"' in txt
if marker_ok:
return True, "completion marker present"
return False, "completion marker missing"
def repair_tick():
return run([
"python3", "tools/work_order_engine.py",
"--work-order", "memory/work_orders/ta_forever_v1.json",
"--state", "memory/work_order_state_ta_forever_v1.json",
"--once",
])
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--interval-sec", type=int, default=1200)
ap.add_argument("--state-file", default="/tmp/ta_completion_subagent_state.json")
ap.add_argument("--heartbeat-file", default="/tmp/ta_completion_subagent_heartbeat.json")
args = ap.parse_args()
state_path = Path(args.state_file)
heartbeat_path = Path(args.heartbeat_file)
if state_path.exists():
state = json.loads(state_path.read_text())
else:
state = {"jobs": {}, "repairs": []}
while True:
ensure_engine_running()
beat = {"ts": int(time.time()), "jobs": {}, "repairs": []}
for job in TRACKED_JOBS:
cur = bg_status(job)
prev = state["jobs"].get(job, {}).get("status")
row = {"status": cur}
if prev == "RUNNING" and cur in ("DEAD", "STALE", "MISSING"):
ok, note = audit_job(job)
row["audit_ok"] = ok
row["audit_note"] = note
if not ok:
rc, out = repair_tick()
row["repair_triggered"] = True
row["repair_ok"] = (rc == 0)
row["repair_note"] = out[-500:]
beat["repairs"].append({"job": job, "rc": rc})
state["jobs"][job] = row
beat["jobs"][job] = row
state_path.write_text(json.dumps(state, indent=2))
heartbeat_path.write_text(json.dumps(beat, indent=2))
print(json.dumps({"phase": "completion_subagent_tick", "beat": beat}))
time.sleep(max(60, args.interval_sec))
if __name__ == "__main__":
raise SystemExit(main())
Why this is critical
The sub-agent is the bridge from passive monitoring to active reliability. It gives the system agency: detect failure, validate outputs, repair automatically, and document what happened.
Without that loop, you still have detached jobs, but you do not have durable operations.
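The trigger condition deserves emphasis: the sub-agent audits on the RUNNING-to-terminal transition, not on terminal state alone, so each finished job is audited once instead of on every tick. Extracted as a pure function (a condensed form of the check in the loop above):

```python
def should_audit(prev, cur):
    """Audit only when a job we previously saw running has since stopped."""
    return prev == "RUNNING" and cur in ("DEAD", "STALE", "MISSING")

assert should_audit("RUNNING", "DEAD")
assert should_audit("RUNNING", "STALE")
assert not should_audit("DEAD", "DEAD")   # already handled on a previous tick
assert not should_audit(None, "MISSING")  # never observed running
```

One consequence: a job that dies before the sub-agent's first tick is never audited. If that matters in your setup, seed the previous status from durable state instead of starting from None.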
4) Strategy step wrapper (explicit completion contract)
Create tools/ta_symbol_pipeline.sh:
#!/usr/bin/env bash
set -euo pipefail
SYMBOL="${1:?symbol required}"
RUN_DIR="${2:-/mnt/volume_sfo3_01/feb4_runs/c180_tuned_01}"
OUT_ROOT="/mnt/volume_sfo3_01/feb4_memory/ta_${SYMBOL,,}_$(date -u +%Y%m%d_%H%M%S)"
python3 tools/molleo_setup_opt.py \
--run-dir "${RUN_DIR}" \
--symbols "${SYMBOL}" \
--tf H4 \
--bars 2200 \
--trials 420 \
--seed 77 \
--objective-profile equity_driver \
--eval-mode engine \
--parallel-trials 4 \
--fixed-signal-source direct_docs \
--exit-governance ta_percent \
--regime-influence direction_bias \
--force-ta-entry-mode ma_sr_pivot \
--asset-leverage-map /root/feb4/config/leverage/fx_asset_leverage_v1.json \
--default-asset-leverage 30.0 \
--out-root "${OUT_ROOT}"
# explicit completion marker for queue audit
printf '{"phase":"ta_symbol_complete","symbol":"%s","out_root":"%s","status":"ok"}\n' "${SYMBOL}" "${OUT_ROOT}"
Make it executable:
chmod +x tools/ta_symbol_pipeline.sh
5) Work order spec (agent-readable, machine-executable)
Create memory/work_orders/ta_forever_v1.json:
{
"work_order_id": "ta_forever_v1",
"description": "Autonomous TA optimization chain with retries and marker audits",
"steps": [
{
"id": "s1_usdjpy",
"kind": "bgmon",
"mode": "oneshot",
"job_name": "ta_step_01",
"start_shell": "cd /root/feb4; bash tools/ta_symbol_pipeline.sh USDJPY",
"success_markers": ["\"phase\":\"ta_symbol_complete\"", "\"symbol\":\"USDJPY\""],
"failure_markers": ["traceback", "runtimeerror", "systemexit", "segmentation fault"],
"max_retries": 2,
"backoff_sec": 120
},
{
"id": "s2_cadjpy",
"kind": "bgmon",
"mode": "oneshot",
"depends_on": ["s1_usdjpy"],
"job_name": "ta_step_02",
"start_shell": "cd /root/feb4; bash tools/ta_symbol_pipeline.sh CADJPY",
"success_markers": ["\"phase\":\"ta_symbol_complete\"", "\"symbol\":\"CADJPY\""],
"failure_markers": ["traceback", "runtimeerror", "systemexit", "segmentation fault"],
"max_retries": 2,
"backoff_sec": 120
},
{
"id": "s3_usdchf",
"kind": "bgmon",
"mode": "oneshot",
"depends_on": ["s2_cadjpy"],
"job_name": "ta_step_03",
"start_shell": "cd /root/feb4; bash tools/ta_symbol_pipeline.sh USDCHF",
"success_markers": ["\"phase\":\"ta_symbol_complete\"", "\"symbol\":\"USDCHF\""],
"failure_markers": ["traceback", "runtimeerror", "systemexit", "segmentation fault"],
"max_retries": 2,
"backoff_sec": 120
}
]
}
This JSON is intentionally explicit so both humans and agents can reason about it.
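Because the spec is machine-executable, it is worth validating before launch. A minimal pre-flight check might look like the sketch below; the required-key set and the listed-in-dependency-order rule are assumptions drawn from the spec above, not an existing tool:

```python
REQUIRED_STEP_KEYS = {"id", "kind", "mode", "job_name", "start_shell"}

def validate_order(order: dict) -> list:
    """Return a list of problems; an empty list means the spec is launchable."""
    problems = []
    seen = set()
    for step in order.get("steps", []):
        sid = step.get("id", "?")
        missing = REQUIRED_STEP_KEYS - step.keys()
        if missing:
            problems.append(f"{sid}: missing keys {sorted(missing)}")
        if sid in seen:
            problems.append(f"duplicate step id: {sid}")
        seen.add(sid)
        for dep in step.get("depends_on", []):
            if dep not in seen:
                # steps must appear in dependency order; this also rejects cycles
                problems.append(f"{sid}: unknown or later dependency {dep}")
    return problems

order = {
    "work_order_id": "demo",
    "steps": [
        {"id": "s1", "kind": "bgmon", "mode": "oneshot",
         "job_name": "j1", "start_shell": "echo one"},
        {"id": "s2", "kind": "bgmon", "mode": "oneshot", "depends_on": ["s1"],
         "job_name": "j2", "start_shell": "echo two"},
    ],
}
print(validate_order(order))  # []
```

Running this in the engine before the first tick turns a malformed spec into an immediate, explicit failure rather than a mid-run surprise.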
6) Launch the supervisors
cd /root/feb4
python3 tools/bgmon.py start ta_forever_engine_01 -- \
bash -lc 'cd /root/feb4; export PYTHONUNBUFFERED=1; while true; do python3 tools/work_order_engine.py --work-order memory/work_orders/ta_forever_v1.json --state memory/work_order_state_ta_forever_v1.json --once; sleep 20; done'
python3 tools/bgmon.py start ta_completion_subagent_01 -- \
bash -lc 'cd /root/feb4; export PYTHONUNBUFFERED=1; python3 tools/ta_completion_subagent.py --interval-sec 1200'
7) Verify quickly (operator playbook)
python3 tools/bgmon.py status ta_forever_engine_01
python3 tools/bgmon.py status ta_completion_subagent_01
python3 tools/bgmon.py status ta_step_01
python3 tools/bgmon.py status ta_step_02
python3 tools/bgmon.py status ta_step_03
python3 tools/bgmon.py tail ta_forever_engine_01 -n 200
python3 tools/bgmon.py tail ta_completion_subagent_01 -n 200
cat /tmp/ta_completion_subagent_heartbeat.json
cat memory/work_order_state_ta_forever_v1.json
You should be able to answer these five questions in under one minute:
- What is currently running?
- What finished recently?
- What failed and why?
- What retried?
- What is scheduled next?
If you cannot, your control plane is incomplete.
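Most of those five questions can be answered from the queue state file alone. A small summarizer, assuming the state schema written by the engine above:

```python
import json
from collections import Counter

def summarize_state(state: dict) -> dict:
    """Collapse queue state into status counts plus total launch attempts."""
    counts = Counter(rec.get("status", "unknown") for rec in state["steps"].values())
    attempts = sum(int(rec.get("attempts", 0)) for rec in state["steps"].values())
    return {"counts": dict(counts), "total_attempts": attempts}

# In practice, load the file the engine writes:
#   state = json.loads(Path("memory/work_order_state_ta_forever_v1.json").read_text())
state = {
    "steps": {
        "s1_usdjpy": {"status": "succeeded", "attempts": 1},
        "s2_cadjpy": {"status": "running", "attempts": 2},
        "s3_usdchf": {"status": "blocked", "attempts": 0},
    }
}
print(json.dumps(summarize_state(state)))
```

A one-line status summary like this is what makes the under-one-minute bar realistic.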
8) Add a ledger so incidents are reviewable
Create tools/append_ledger.py:
#!/usr/bin/env python3
import json
import sys
import time
from pathlib import Path
if len(sys.argv) < 3:
raise SystemExit("usage: append_ledger.py <path> <json_event>")
path = Path(sys.argv[1])
evt = json.loads(sys.argv[2])
evt["ts"] = int(time.time())
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("a", encoding="utf-8") as f:
f.write(json.dumps(evt) + "\n")
print("ok")
Record at minimum:
- launch events
- close events
- repair events
- promote/reject decisions
This is the difference between “we think we fixed it” and “we can prove what happened.”
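The reading side matters as much as the appender. A sketch of a query helper over the same JSONL format, assuming callers supply an "event" field as in the queue history records above:

```python
import json
import tempfile
import time
from pathlib import Path

def read_events(path: Path, event=None):
    """Load JSONL ledger entries, optionally filtered by their 'event' field."""
    events = []
    with path.open(encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            rec = json.loads(line)
            if event is None or rec.get("event") == event:
                events.append(rec)
    return events

# Demo against a throwaway ledger file.
ledger = Path(tempfile.mkdtemp()) / "ledger.jsonl"
with ledger.open("a", encoding="utf-8") as f:
    for evt in ({"event": "launch", "job": "ta_step_01"},
                {"event": "repair", "job": "ta_step_01"},
                {"event": "close", "job": "ta_step_01"}):
        evt["ts"] = int(time.time())
        f.write(json.dumps(evt) + "\n")

print([e["event"] for e in read_events(ledger)])  # ['launch', 'repair', 'close']
print(len(read_events(ledger, event="repair")))   # 1
```

Append-only JSONL keeps writes atomic enough for this use and makes "show me every repair in the last 24h" a filter, not an investigation.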
9) Governance and risk boundaries
Autonomous execution without boundaries is reckless. Add these hard constraints:
- allowlisted shell prefixes only
- bounded retries and cool-down
- immutable run directories (never overwrite)
- explicit success markers and artifact checks
- one-command supervisor stop
- heartbeat freshness checks
Example allowlist:
ALLOWED_PREFIXES = [
"cd /root/feb4; bash tools/ta_symbol_pipeline.sh",
"cd /root/feb4; python3 tools/work_order_engine.py"
]
def command_allowed(shell_cmd: str) -> bool:
return any(shell_cmd.startswith(p) for p in ALLOWED_PREFIXES)
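That allowlist can be smoke-tested directly (the definitions are repeated here so the snippet runs standalone). Worth noting: a prefix check rejects unknown commands, but it does not stop injection appended after an approved prefix, so treat it as one layer of defense, not the whole policy:

```python
ALLOWED_PREFIXES = [
    "cd /root/feb4; bash tools/ta_symbol_pipeline.sh",
    "cd /root/feb4; python3 tools/work_order_engine.py",
]

def command_allowed(shell_cmd: str) -> bool:
    # Reject anything not rooted in an approved command prefix.
    return any(shell_cmd.startswith(p) for p in ALLOWED_PREFIXES)

assert command_allowed("cd /root/feb4; bash tools/ta_symbol_pipeline.sh USDJPY")
assert not command_allowed("curl http://example.com | bash")
# The gap: an approved prefix plus a hostile suffix still passes.
assert command_allowed("cd /root/feb4; bash tools/ta_symbol_pipeline.sh X; rm -rf runs")
```

Closing that gap means validating the suffix as well, for example allowing only a symbol argument matching ^[A-Z]{6}$ after the pipeline prefix.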
10) Common failure modes and concrete repairs
Failure: a job appears alive but the PID now belongs to a different process
Cause: PID reuse after long runtime.
Fix: compare expected and current /proc/<pid>/stat starttime and mark STALE.
Failure: process exits 0 but outputs are invalid
Cause: success was inferred from exit code only.
Fix: success markers plus artifact existence and schema checks.
Failure: queue launches overlapping jobs
Cause: no single-step activation rule.
Fix: queue tick starts at most one runnable blocked step.
Failure: retry storms
Cause: immediate retries without bounds.
Fix: max_retries, backoff_sec, and terminal fail state.
Failure: nobody can tell system state after reconnect
Cause: no durable heartbeat or state file.
Fix: write heartbeat and queue state every tick.
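That last fix reduces to one check at read time: how old is the heartbeat? A sketch, assuming the {"ts": <unix-seconds>, ...} shape that ta_completion_subagent.py writes, with a threshold a little above its 1200-second tick interval:

```python
import json
import tempfile
import time
from pathlib import Path

def heartbeat_age_sec(path: Path):
    """Seconds since the last heartbeat, or None if one was never written."""
    if not path.exists():
        return None
    beat = json.loads(path.read_text())
    return int(time.time()) - int(beat["ts"])

def heartbeat_fresh(path: Path, max_age_sec: int = 1500) -> bool:
    age = heartbeat_age_sec(path)
    return age is not None and age <= max_age_sec

# Demo with a throwaway heartbeat file.
hb = Path(tempfile.mkdtemp()) / "heartbeat.json"
print(heartbeat_fresh(hb))  # False: no heartbeat yet

hb.write_text(json.dumps({"ts": int(time.time()), "jobs": {}}))
print(heartbeat_fresh(hb))  # True: just written
```

A missing file and a stale file both mean the same operational thing, so both return not-fresh; never treat "no heartbeat" as healthy.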
11) Why this beats naive cron orchestration
Cron can launch commands, but it does not give you:
- dependency semantics
- step-level pass/fail markers
- transition-based audits
- recovery notes
- queue-state continuity
You can still use cron to ensure supervisors restart on host reboot. But pipeline semantics should live in your engine and sub-agent, not in scattered cron lines.
12) Operational runbook (daily, incident, promotion)
Daily
- check supervisor status
- check heartbeat timestamp
- review repairs in last 24h
- inspect queue state for blocked/failed
- verify latest output integrity before using results
Incident
- stop duplicate supervisors
- snapshot status and logs
- identify first failing step and marker
- run repair command
- relaunch from dependency boundary (not from scratch)
Promotion
- verify objective and risk guardrails
- verify artifact completeness
- verify output format normalization
- write promotion record to ledger
13) Lightweight ops snapshot command
Create tools/ops_snapshot.sh:
#!/usr/bin/env bash
set -euo pipefail
python3 tools/bgmon.py status
echo "--- engine tail ---"
python3 tools/bgmon.py tail ta_forever_engine_01 -n 60 || true
echo "--- subagent tail ---"
python3 tools/bgmon.py tail ta_completion_subagent_01 -n 60 || true
echo "--- heartbeat ---"
cat /tmp/ta_completion_subagent_heartbeat.json || true
echo "--- queue state ---"
cat memory/work_order_state_ta_forever_v1.json || true
This single command dramatically reduces ambiguity in multi-hour workflows.
14) How this pattern supports autonomous strategy research
Once the execution backbone is reliable, you can scale strategy exploration safely:
- queue hypothesis variants
- run one-at-a-time or controlled parallel lanes
- collect outcomes in a shared ledger
- feed winners into strategy catalogs
- trigger downstream reporting automatically
This is where agent autonomy becomes useful rather than noisy. The agent is not just running commands. It is running constrained experiments with durable accountability.
15) Design principles to keep as north star
- state over memory
- markers over assumptions
- deterministic logs over chat context
- bounded autonomy over unconstrained automation
- one-click status over manual investigation
If you keep these principles, your workflows improve continuously. If you violate them, workflows drift back into brittle, terminal-bound behavior.
16) What most teams miss
The technical parts are easy. The hard part is discipline.
Teams often implement detached jobs but skip:
- run naming conventions
- success marker contracts
- repair loops
- ledgering
- clear kill/restart procedures
Then they assume the system is autonomous when it is only asynchronous.
Asynchronous is not autonomous.
Autonomous means the system can detect and handle expected failure modes without hand-holding.
17) Security notes for production environments
If you move this beyond dev environments:
- run under least-privilege service users
- isolate secrets in env-managed stores, not JSON specs
- enforce command allowlists
- add log rotation and retention policy
- alert on stale heartbeat and repeated repair loops
- block raw user-provided shell in work order files
Reliability and safety must rise together.
18) Performance notes
For high-throughput pipelines:
- keep queue tick lightweight
- avoid expensive full-log scans; tail only last N lines
- store structured completion events if possible
- rotate logs by run and by day
- precompute key status summaries for dashboards
The goal is cheap observability. If status checks are expensive, teams stop running them.
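In the same spirit, marker checks should never read a whole log into memory. The scripts above shell out to tail(1); a pure-Python equivalent that streams the file once with bounded memory (a sketch, not a drop-in replacement for bgmon tail):

```python
import tempfile
from collections import deque
from pathlib import Path

def tail_lines(path: Path, n: int = 320):
    """Return the last n lines while holding at most n lines in memory."""
    with path.open(encoding="utf-8", errors="replace") as f:
        return list(deque(f, maxlen=n))

# Demo on a throwaway 10,000-line log.
log = Path(tempfile.mkdtemp()) / "job.log"
log.write_text("".join(f"line {i}\n" for i in range(10_000)))

last = tail_lines(log, 320)
print(len(last), last[-1].strip())  # 320 line 9999
```

This is still O(file size) in I/O; for multi-gigabyte logs, seek backwards from the end in fixed-size blocks instead.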
19) Integrating with strategy dashboards
Your strategy dashboard should consume this control plane, not bypass it.
At minimum expose:
- active supervisor states
- current queue step
- last completed step
- last repair event
- current heartbeat age
This prevents a common blind spot where the trading UI looks “active” while the pipeline logic behind it is dead.
20) Practical checklist before you trust it overnight
- kill your terminal and confirm jobs survive
- reboot your shell session and confirm state persists
- force a known failure marker and confirm repair trigger
- verify stale PID detection by restarting unrelated processes
- verify heartbeat freshness alerting
- verify queue does not launch duplicate steps
If all six pass, your automation is materially safer.
Closing: from fragile sessions to durable operations
A lot of AI teams are “one disconnect away” from losing execution continuity.
This guide replaces that fragility with an explicit, testable control loop:
- detached job manager
- dependency queue
- monitor sub-agent
- audit + repair
- persistent state and logs
That stack is enough to survive chat cutoffs, terminal drops, and overnight runtime surprises while keeping operators in control.
When this is in place, the conversation can shift from “is it still running?” to “what did we learn?”
That is the shift that unlocks real autonomous AI operations.
Internal links
- Index: /office/
- Sibling: How to keep Codex background jobs running without stale terminals
- Sibling: AI monitoring
- Sibling: System prompting tools
- Contact: /contact/