95 lines
2.9 KiB
Python
95 lines
2.9 KiB
Python
import threading
|
|
import uuid
|
|
from datetime import datetime
|
|
from collections import deque
|
|
|
|
|
|
def _now() -> str:
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix.

    The value has one-second resolution, e.g. ``2024-05-01T12:30:00Z``.

    Returns:
        The formatted timestamp.
    """
    # Imported here because the module-level import only brings in ``datetime``.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated since Python 3.12; ask for an aware
    # UTC datetime instead.
    current = datetime.now(timezone.utc)
    # Strip tzinfo before formatting so isoformat() does not append "+00:00";
    # the literal "Z" suffix is what marks the value as UTC.
    return current.replace(tzinfo=None).isoformat(timespec="seconds") + "Z"
|
|
|
|
|
|
class JobManager:
|
|
def __init__(self) -> None:
|
|
self._jobs: dict[str, dict] = {}
|
|
self._lock = threading.Lock()
|
|
|
|
def create_job(self, kind: str) -> dict:
|
|
job_id = uuid.uuid4().hex
|
|
job = {
|
|
"id": job_id,
|
|
"kind": kind,
|
|
"status": "queued",
|
|
"progress": 0.0,
|
|
"message": None,
|
|
"details": {},
|
|
"logs": [],
|
|
"result": None,
|
|
"created_at": _now(),
|
|
"updated_at": _now(),
|
|
}
|
|
with self._lock:
|
|
self._jobs[job_id] = job
|
|
return job
|
|
|
|
def get_job(self, job_id: str) -> dict:
|
|
with self._lock:
|
|
job = self._jobs.get(job_id)
|
|
if not job:
|
|
raise KeyError("Job not found")
|
|
return dict(job)
|
|
|
|
def update_job(
|
|
self,
|
|
job_id: str,
|
|
status: str | None = None,
|
|
progress: float | None = None,
|
|
message: str | None = None,
|
|
details: dict | None = None,
|
|
log: str | None = None,
|
|
result: dict | None = None,
|
|
) -> dict:
|
|
with self._lock:
|
|
job = self._jobs.get(job_id)
|
|
if not job:
|
|
raise KeyError("Job not found")
|
|
if status is not None:
|
|
job["status"] = status
|
|
if progress is not None:
|
|
job["progress"] = max(0.0, min(1.0, float(progress)))
|
|
if message is not None:
|
|
job["message"] = message
|
|
if details is not None:
|
|
job["details"] = details
|
|
if log is not None:
|
|
logs = deque(job.get("logs", []), maxlen=200)
|
|
logs.append({"at": _now(), "message": log})
|
|
job["logs"] = list(logs)
|
|
if result is not None:
|
|
job["result"] = result
|
|
job["updated_at"] = _now()
|
|
return dict(job)
|
|
|
|
def set_progress(
|
|
self,
|
|
job_id: str,
|
|
progress: float,
|
|
message: str | None = None,
|
|
details: dict | None = None,
|
|
) -> dict:
|
|
return self.update_job(job_id, progress=progress, message=message, details=details)
|
|
|
|
def log(self, job_id: str, message: str) -> dict:
|
|
return self.update_job(job_id, log=message)
|
|
|
|
def run_in_thread(self, job_id: str, fn) -> None:
|
|
def runner():
|
|
try:
|
|
self.update_job(job_id, status="running")
|
|
result = fn()
|
|
self.update_job(job_id, status="completed", progress=1.0, result=result)
|
|
except Exception as exc: # pragma: no cover - used for runtime visibility
|
|
self.update_job(job_id, status="failed", message=str(exc))
|
|
|
|
thread = threading.Thread(target=runner, daemon=True)
|
|
thread.start()
|