Claude
Skills
Sign in
Back

scheduler-and-jobs

Included with Lifetime
$97 forever

Production-grade Frappe background jobs and scheduler — queue selection, idempotency, retries, distributed locks, monitoring stuck jobs, debugging worker hangs, chunked long-running jobs. Use when designing or debugging frappe.enqueue / scheduler_events code, troubleshooting stuck jobs, or making jobs robust against retries and worker restarts. Complements server-scripts (which covers basic syntax) with the production reality.

General

What this skill does


# Scheduler & Background Jobs (Production)

Reference for making Frappe background jobs and scheduled tasks robust under real-world load. The `server-scripts` skill covers basic `frappe.enqueue` and `scheduler_events` syntax; this is its production-grade companion.

## Queue selection

Frappe ships three RQ queues with different timeout defaults:

| Queue | Default timeout | Use for |
|-------|-----------------|---------|
| `short` | 60 seconds | Quick fire-and-forget: send one email, update one cache key, recompute one denormalized field |
| `default` | 300 seconds (5 min) | Most normal jobs: process one document, sync a small batch |
| `long` | 1500 seconds (25 min) | Bulk imports, monthly reports, heavy data migrations |

```python
frappe.enqueue("my_app.tasks.send_one_email", queue="short", recipient="[email protected]")
frappe.enqueue("my_app.tasks.process_invoice", queue="default", invoice="SINV-001")
frappe.enqueue("my_app.tasks.monthly_aging_report", queue="long")
```

**Mismatch consequences:**
- Job on `short` that runs >60s: **killed mid-execution** with no rollback. Half-written state persists.
- Job on `long` that takes 5s: hogs a long-queue worker slot another truly-long job needs. Smaller blast radius.

When in doubt, set an explicit `timeout` and use `default`:

```python
frappe.enqueue(
    "my_app.tasks.process_invoice",
    queue="default",
    timeout=180,
    invoice="SINV-001",
)
```

## `now=True`, `enqueue_after_commit`, `is_async`

```python
frappe.enqueue(method, queue="default", now=False, enqueue_after_commit=False, is_async=True, **kwargs)
```

| Flag | What it does | When to use |
|------|--------------|-------------|
| `now=True` | Runs the function in the current process synchronously, no Redis | **Tests only.** In production it defeats the purpose of enqueueing — the request thread blocks. |
| `enqueue_after_commit=True` | Defers the enqueue until the current DB transaction commits | When you enqueue from a `validate` / `before_save` hook on a doc that's about to be saved. Without this flag, the worker may pick up the job and try to read the doc before the request thread commits — race condition. |
| `is_async=False` | Runs in the current process (similar to `now=True`) | Almost never. Use `now=True` for tests; otherwise let it queue. |

**The `enqueue_after_commit` rule:** if you call `frappe.enqueue` from any document lifecycle hook (validate, before_save, on_update, on_submit, on_cancel) and the job needs the doc to exist or have its latest state, set `enqueue_after_commit=True`. Otherwise the job races the commit.

```python
class SalesInvoice(Document):
    def on_submit(self):
        # Worker needs to see the submitted invoice — wait for commit
        frappe.enqueue(
            "my_app.tasks.send_invoice_email",
            queue="short",
            timeout=60,
            enqueue_after_commit=True,
            invoice_name=self.name,
        )
```

## Cron expressions in `scheduler_events`

The team convention is to prefer `cron:` over the named keys (`hourly`, `daily`, `weekly`, `monthly`) — explicit cron expressions are more obvious to read and grep for. The named keys are still fine for "I genuinely want this every hour" but `cron` is preferred when the cadence is anything else or when you want to see the schedule at a glance.

```python
# hooks.py — pattern from edu_quality
scheduler_events = {
    "all": [
        "edu_quality.api.student_application.get_and_schedule_pending_walkouts",
        "edu_quality.overrides_hooks.item.upload_all_imported_to_drive",
    ],
    "cron": {
        "0 * * * *":    ["edu_quality.tasks.cron"],
        "0 19 * * *":   ["edu_quality.tasks.send_bulk_notification_cmap_to_guardian"],
        "0 1 * * *":    ["edu_quality.edu_quality.scheduled_tasks.assessment_pdf_scheduler.attach_daily_assessment_pdfs"],
        "0 6 * * *":    [
            "edu_quality.tasks.schedule_birthday_greeting",
            "edu_quality.tasks.event_reminder",
        ],
        "* * * * *":    [
            "edu_quality.cmap_jobs.send_ptm_notifications_to_students",
            "edu_quality.cmap_jobs.notify_teacher_before_one_hour_job",
        ],
    },
}
```

Common patterns:

| Cron | Meaning |
|------|---------|
| `*/15 * * * *` | Every 15 minutes |
| `0 * * * *` | Top of every hour |
| `0 9 * * 1-5` | 9 AM Monday–Friday |
| `0 0 * * 0` | Midnight Sunday |
| `0 0 1 * *` | Midnight on the 1st of every month |
| `*/30 9-17 * * 1-5` | Every 30 minutes, 9 AM–5 PM, weekdays |

Scheduler runs in the `default` queue. **For long scheduled jobs, the scheduler entry point should `frappe.enqueue` to the `long` queue rather than blocking the scheduler thread** — this is the team's standard two-step pattern (see `assessment_pdf_scheduler.py`):

```python
def attach_daily_assessment_pdfs():
    """Scheduler entry point — fast handoff."""
    frappe.enqueue(
        _process_daily_assessment_pdfs,
        queue="long",
        timeout=3600,  # 1 hour
        job_name="daily_assessment_pdf_attachment",
    )
    return {"success": True, "message": "Daily Assessment PDF attachment job has been queued"}


def _process_daily_assessment_pdfs():
    # The actual work
    ...
```

Note the deterministic `job_name` — running the scheduler twice in the same window is a no-op because `frappe.enqueue` dedups jobs with the same name in the queue.

Common patterns:

| Cron | Meaning |
|------|---------|
| `*/15 * * * *` | Every 15 minutes |
| `0 * * * *` | Top of every hour |
| `0 9 * * 1-5` | 9 AM Monday–Friday |
| `0 0 * * 0` | Midnight Sunday |
| `0 0 1 * *` | Midnight on the 1st of every month |
| `*/30 9-17 * * 1-5` | Every 30 minutes, 9 AM–5 PM, weekdays |

Scheduler runs in the `default` queue. For long scheduled jobs, the scheduler invocation should `frappe.enqueue` to the `long` queue rather than blocking the scheduler thread:

```python
def daily_cleanup():
    """Scheduler entry point — fast: just enqueue the real work."""
    frappe.enqueue("my_app.tasks._do_cleanup", queue="long", timeout=1800)

def _do_cleanup():
    # The actual work
    ...
```

## Idempotency

A job is idempotent if running it twice produces the same end state as running it once. Workers crash, retries happen, scheduler windows can overlap — assume your job WILL run twice.

### Pattern 1: check before mutate

```python
def sync_customer_to_external(customer_id):
    # Has this customer already been synced today?
    today = frappe.utils.nowdate()
    if frappe.db.exists("External Sync Log", {
        "customer": customer_id,
        "sync_date": today,
        "status": "Success",
    }):
        return  # Already done — exit cleanly

    # ... do the sync ...

    frappe.get_doc({
        "doctype": "External Sync Log",
        "customer": customer_id,
        "sync_date": today,
        "status": "Success",
    }).insert(ignore_permissions=True)
    frappe.db.commit()
```

### Pattern 2: dedup via `job_name`

`frappe.enqueue` accepts a `job_name`. If a job with the same name is already in the queue or running, the new enqueue is dropped (returns `None`):

```python
frappe.enqueue(
    "my_app.tasks.recompute_customer_balance",
    queue="short",
    job_name=f"recompute-balance-{customer_id}",
    customer=customer_id,
)
```

Use stable, deterministic job names. Random suffixes defeat the dedup.

### Pattern 3: optimistic update with version check

```python
def update_inventory(item_code, expected_qty, new_qty):
    rows = frappe.db.sql(
        """UPDATE `tabBin`
           SET actual_qty = %s
           WHERE item_code = %s AND actual_qty = %s""",
        (new_qty, item_code, expected_qty),
    )
    if not rows:
        # Someone else updated it first; this run is stale
        return
```

## Distributed locks

When two workers might run the same critical section concurrently (e.g. two webhooks triggering the same recalculation), use Frappe's cache as a lock:

```python
def with_lock(key, ttl=300):
    """Poor man's lock via Redis cache

Related in General