Files
2026-06-02 18:59:31 +02:00

95 lines
2.9 KiB
Python

import threading
import uuid
from collections import deque
from datetime import datetime, timezone
def _now() -> str:
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix.

    The value has one-second resolution, e.g. ``2026-06-02T16:59:31Z``.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp instead, then drop the tzinfo so isoformat() does not append
    # "+00:00" in front of the explicit "Z" marker.
    current = datetime.now(timezone.utc).replace(tzinfo=None)
    return current.isoformat(timespec="seconds") + "Z"
class JobManager:
def __init__(self) -> None:
self._jobs: dict[str, dict] = {}
self._lock = threading.Lock()
def create_job(self, kind: str) -> dict:
job_id = uuid.uuid4().hex
job = {
"id": job_id,
"kind": kind,
"status": "queued",
"progress": 0.0,
"message": None,
"details": {},
"logs": [],
"result": None,
"created_at": _now(),
"updated_at": _now(),
}
with self._lock:
self._jobs[job_id] = job
return job
def get_job(self, job_id: str) -> dict:
with self._lock:
job = self._jobs.get(job_id)
if not job:
raise KeyError("Job not found")
return dict(job)
def update_job(
self,
job_id: str,
status: str | None = None,
progress: float | None = None,
message: str | None = None,
details: dict | None = None,
log: str | None = None,
result: dict | None = None,
) -> dict:
with self._lock:
job = self._jobs.get(job_id)
if not job:
raise KeyError("Job not found")
if status is not None:
job["status"] = status
if progress is not None:
job["progress"] = max(0.0, min(1.0, float(progress)))
if message is not None:
job["message"] = message
if details is not None:
job["details"] = details
if log is not None:
logs = deque(job.get("logs", []), maxlen=200)
logs.append({"at": _now(), "message": log})
job["logs"] = list(logs)
if result is not None:
job["result"] = result
job["updated_at"] = _now()
return dict(job)
def set_progress(
self,
job_id: str,
progress: float,
message: str | None = None,
details: dict | None = None,
) -> dict:
return self.update_job(job_id, progress=progress, message=message, details=details)
def log(self, job_id: str, message: str) -> dict:
return self.update_job(job_id, log=message)
def run_in_thread(self, job_id: str, fn) -> None:
def runner():
try:
self.update_job(job_id, status="running")
result = fn()
self.update_job(job_id, status="completed", progress=1.0, result=result)
except Exception as exc: # pragma: no cover - used for runtime visibility
self.update_job(job_id, status="failed", message=str(exc))
thread = threading.Thread(target=runner, daemon=True)
thread.start()