Background Jobs
Some tasks take time—like processing a 100-page PDF or analyzing a video. Background jobs let you start these tasks and check back later.
How It Works
Think of it like ordering food delivery: you place the order (start a job), get a tracking number (job ID), and check the status whenever you want. You don't have to wait around—just check back when convenient!
Supported Job Types
The type field on POST /v1/jobs must be one of:
rl_training
Start a reinforcement-learning training run on your memory store.
bulk_import
Bulk-ingest documents or memories from an external source URL into a collection.
data_export
Export memories / documents / collections into a downloadable archive.
backup
Snapshot your account's data to durable storage.
consolidation
Compress older memories while preserving important signal. Runs periodically.
cleanup
Purge soft-deleted memories and orphaned index entries.
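Each job type takes its own inputs inside the request's parameters object. As an illustration, here is how a data_export request body might be assembled; note that the parameter names used here (collection_ids, format) are assumptions for the sake of the sketch, not documented fields — check the job-type reference for the exact inputs each type accepts:

```python
# Sketch: build a POST /v1/jobs body for a data_export job.
# NOTE: "collection_ids" and "format" are illustrative parameter
# names, not confirmed fields of the data_export job type.
def build_export_body(collection_ids, export_format="jsonl"):
    """Return a request body that would enqueue a data_export job."""
    return {
        "type": "data_export",
        "parameters": {
            "collection_ids": collection_ids,
            "format": export_format,
        },
    }

body = build_export_body(["col_abc123"])
print(body["type"])  # data_export
```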
Job Statuses
Every job goes through these stages:
- queued: the job is waiting for a worker to pick it up.
- running: the job is currently being processed.
- completed: the job finished successfully. Results are available in result.
- failed: something went wrong. Check error_message and error_details.
- cancelled: the job was cancelled via POST /v1/jobs/{job_id}/cancel before it finished.
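Three of these statuses (completed, failed, cancelled) are terminal: once a job reaches one of them, its state never changes again. A tiny helper makes that distinction explicit before you build polling logic around it:

```python
# Statuses after which a job will never change state again.
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}

def is_terminal(status: str) -> bool:
    """Return True once a job has reached a final state."""
    return status in TERMINAL_STATUSES

print(is_terminal("queued"))     # False
print(is_terminal("completed"))  # True
```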
Endpoints
The examples on this page use four endpoints:
- POST /v1/jobs: enqueue a background job.
- GET /v1/jobs/{job_id}: fetch a job's status, progress, and result.
- POST /v1/jobs/{job_id}/cancel: cancel a queued or running job.
- DELETE /v1/jobs/{job_id}: delete the record of a finished job.
Code Examples
Start a Bulk Import Job
import os
import requests
BASE = "https://api.hebbrix.com/v1"
H = {"Authorization": f"Bearer {os.environ['HEBBRIX_API_KEY']}"}
# POST /v1/jobs — enqueue a long-running background job.
# Job-type-specific inputs go inside "parameters".
r = requests.post(
    f"{BASE}/jobs",
    headers=H,
    json={
        "type": "bulk_import",
        "parameters": {
            "source_url": "https://example.com/large-document.pdf",
            "collection_id": "col_abc123",
        },
        "metadata": {"triggered_by": "nightly-sync"},
    },
)
job = r.json()
print(f"Job enqueued — id={job['id']}, status={job['status']}")  # "queued" initially
job_id = job["id"]

Check Job Status
# GET /v1/jobs/{job_id}
r = requests.get(f"{BASE}/jobs/{job_id}", headers=H)
job = r.json()
# progress is a float (0.0 .. 100.0)
print(f"Status: {job['status']} progress: {job['progress']:.1f}%")
if job["status"] == "completed":
    print("Done!", job["result"])
elif job["status"] == "failed":
    print("Failed:", job["error_message"], job.get("error_details"))
elif job["status"] == "cancelled":
    print("Cancelled before completion")

Wait for Completion (Polling)
import time
TERMINAL = {"completed", "failed", "cancelled"}
def wait_for_job(job_id: str, max_wait: int = 300):
    """Poll GET /v1/jobs/{job_id} until terminal state, or timeout."""
    start = time.time()
    while True:
        r = requests.get(f"{BASE}/jobs/{job_id}", headers=H)
        job = r.json()
        if job["status"] == "completed":
            return job["result"]
        if job["status"] == "failed":
            raise RuntimeError(job["error_message"])
        if job["status"] == "cancelled":
            raise RuntimeError("Job was cancelled")
        if time.time() - start > max_wait:
            raise TimeoutError("Job did not finish in time")
        print(f"Status: {job['status']} ({job['progress']:.1f}%)")
        time.sleep(5)

result = wait_for_job(job_id)
print(f"Imported {result.get('memories_created', 0)} memories!")

Cancel a Job
# POST /v1/jobs/{job_id}/cancel — cancel a queued or running job.
# Returns the updated JobResponse with status = "cancelled".
r = requests.post(f"{BASE}/jobs/{job_id}/cancel", headers=H)
r.raise_for_status()
print("Job cancelled:", r.json()["status"])
# Once terminal, DELETE /v1/jobs/{job_id} removes the record:
requests.delete(f"{BASE}/jobs/{job_id}", headers=H)

Pro Tips
- Save job IDs! You need them to check status later
- Don't poll too frequently: checking every 5-10 seconds is enough
- Jobs older than 30 days are automatically deleted (but results are saved)
- You can have multiple jobs running at once
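For jobs that may run many minutes, the fixed 5-second sleep in the polling example can be swapped for a gentle exponential backoff, so short jobs are noticed quickly and long jobs aren't polled harder than necessary. This is a minimal sketch; the numbers are just reasonable defaults, not API requirements:

```python
def backoff_delays(initial=5.0, factor=1.5, cap=30.0, limit=10):
    """Yield polling delays: start at `initial` seconds, grow by
    `factor` each poll, and never exceed `cap`."""
    delay = initial
    for _ in range(limit):
        yield delay
        delay = min(delay * factor, cap)

# Use with time.sleep(d) inside a polling loop.
print(list(backoff_delays(limit=4)))  # [5.0, 7.5, 11.25, 16.875]
```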
