Design Scheduler (Distributed)

Requirements

  • Users can submit jobs to be run
  • Users can view the status of a job
  • Users can view the output of a job
  • Every submitted job will be a Python/JS script
    • More languages can be added later
  • Retrigger a job if it has failed
  • Triggers:
    • cron expression
      • Allow disabling jobs
    • On Demand
  • Can we set the priority of jobs?
  • Are jobs dependent on each other?
    • Model dependencies as a Directed Acyclic Graph (DAG)
    • Use topological sort to order execution
  • System should start each job close to its scheduled time (a 2-4 s delay is acceptable)
  • 2 Phase Commit (2PC) or Change Data Capture (CDC - Kafka)

NFR

  • System is expected to be highly available
  • 1 Billion jobs submitted per day (TPS?)
  • Each job takes around 5 minutes to run
  • 16 cores per machine
  • 2 threads per core
  • Input and Output file ~ 5MB
  • Data retention for 1 day
  • In a distributed setup, the same job must not be executed by more than one node at a time
  • System should run each job at least once
  • Observability
    • Logs
    • Monitoring Resources
graph LR
    A[Users / Clients] --> B[Load Balancer]
    B --> S1[App Server #1\n~2000-5000 QPS]
    B --> S2[App Server #2\n~2000-5000 QPS]
    B --> S3[App Server #3\n~2000-5000 QPS]
    B --> S4[App Server #4\n~2000-5000 QPS]

    S1 --> C[Cache]
    S2 --> C
    S3 --> C
    S4 --> C

    C --> D[Database]
    S1 --> D
    S2 --> D
    S3 --> D
    S4 --> D

    %% Note: Scale when QPS per server > 2000–5000, CPU >70%, or latency >200ms

Back-of-the-Envelope Calculation

  • Throughput calculation
    • Expected: 1 Billion jobs per day
    • same as 10^9 jobs per day
    • 1 day = 86,400 seconds ~ 10^5 seconds
    • Number of jobs per second = jobs / seconds
    • = 10^9 / 10^5 = 10,000 jobs per second
  • Blob Storage calculation
    • Assuming each Python script is 200 lines long
    • Each line is 50 characters = 50 bytes
    • Total size of each script = 200 * 50 bytes = ~ 10 KB
    • Per day size requirements = 1 Billion * Size of Each script
    • = 10^9 * 10^4 bytes = 10^13 bytes
    • = 10 TB
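
The arithmetic above can be checked with a few lines of Python. This is a rough sketch that also estimates worker capacity from the NFR numbers (5-minute jobs, 16 cores, 2 threads per core). It assumes, and this is only an assumption, that one job occupies one hardware thread for its whole run.

```python
# Back-of-the-envelope numbers taken from the requirements above.
JOBS_PER_DAY = 1_000_000_000        # 1 billion jobs submitted per day
SECONDS_PER_DAY = 86_400            # exact length of a day
ROUNDED_SECONDS_PER_DAY = 10**5     # rounded up for easy mental division
JOB_DURATION_S = 5 * 60             # each job runs for about 5 minutes
THREADS_PER_MACHINE = 16 * 2        # 16 cores x 2 threads per core
SCRIPT_BYTES = 200 * 50             # 200 lines x 50 bytes per line

# Throughput with the rounded day length, and with the exact one for comparison.
jobs_per_second = JOBS_PER_DAY // ROUNDED_SECONDS_PER_DAY
exact_jobs_per_second = JOBS_PER_DAY / SECONDS_PER_DAY

# Little's law: jobs in flight = arrival rate x time each job spends running.
concurrent_jobs = jobs_per_second * JOB_DURATION_S

# ASSUMPTION: one job per hardware thread. Ceiling division gives whole machines.
machines = -(-concurrent_jobs // THREADS_PER_MACHINE)

# Script storage per day, in decimal terabytes (10**12 bytes).
script_storage_tb = JOBS_PER_DAY * SCRIPT_BYTES / 10**12

print(jobs_per_second, concurrent_jobs, machines, script_storage_tb)
```

Rounding the day to 10^5 seconds understates the rate by roughly 15%, so the machine count is a lower bound under the stated assumption.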

Job Scheduler APIs

  • Enqueue job
    • POST /api/v1/jobs
    • Body:
      • task_id represents the python code stored in Blob Storage
    {
        "task_id": "1234",
        "trigger": {
            "kind": "cron",
            "schedule": "20 * * * *"
        },
        "params": {
            "param_1": "value_1",
            "param_2": "value_2",
            ...
        }
    }
    • Response:
      • job_id created in the database
      • status initialized with scheduled
  • Update Job status
    • PUT /api/v1/jobs/{job_id}/status
      {
          "status": "scheduled|pending|running|disabled"
      }
  • Get Job status
    • GET /api/v1/jobs/{job_id}/status
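
To make the three endpoints concrete, here is a minimal in-memory sketch of the behaviour they describe. `Job`, `JobStore`, and the method names are hypothetical stand-ins, not part of the design above. A real service would sit behind an HTTP framework and persist to the database.

```python
import itertools
from dataclasses import dataclass, field

# Status values accepted by PUT /api/v1/jobs/{job_id}/status in the notes above.
VALID_STATUSES = {"scheduled", "pending", "running", "disabled"}


@dataclass
class Job:
    job_id: str
    task_id: str
    trigger: dict
    params: dict = field(default_factory=dict)
    status: str = "scheduled"  # new jobs start as scheduled


class JobStore:
    """Hypothetical in-memory stand-in for the service behind /api/v1/jobs."""

    def __init__(self):
        self._jobs = {}
        self._ids = itertools.count(1)

    def enqueue(self, body):
        """POST /api/v1/jobs: create a job and return its id and status."""
        if "task_id" not in body or "trigger" not in body:
            raise ValueError("task_id and trigger are required")
        job = Job(
            job_id=str(next(self._ids)),
            task_id=body["task_id"],
            trigger=body["trigger"],
            params=body.get("params", {}),
        )
        self._jobs[job.job_id] = job
        return {"job_id": job.job_id, "status": job.status}

    def update_status(self, job_id, status):
        """PUT /api/v1/jobs/{job_id}/status: reject unknown status values."""
        if status not in VALID_STATUSES:
            raise ValueError(f"unknown status: {status}")
        self._jobs[job_id].status = status
        return {"job_id": job_id, "status": status}

    def get_status(self, job_id):
        """GET /api/v1/jobs/{job_id}/status."""
        return {"job_id": job_id, "status": self._jobs[job_id].status}


store = JobStore()
created = store.enqueue({"task_id": "1234", "trigger": {"kind": "on_demand"}})
store.update_status(created["job_id"], "disabled")
```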

Read/Write Ratio

  • Writes
    • As per throughput: 10,000 write operations per second
    • Update status of job after completion
    • Insert into Run table for history
  • Reads
    • A poller looks for jobs in the scheduled status, probably every minute
      • Assuming we have x replicas, they issue x queries per minute in total, which is very few
    • Users querying the status of a job
      • Comparatively rare
  • It is clear that Writes >> Reads

Services

  • Job Scheduler Service
    • Schedule Jobs
    • See Job status
  • Database to store Job info
  • Blob Storage to store python script
  • Watcher Service
    • watches the database changes
    • send to Message Queue
    • probably can use Change Data Capture (CDC)
  • Message Queue
  • Executor Service
    • Workers

Triggers

  • cron/time based
  • event based/manual

Job Execution Code

  • Binaries can be uploaded to Blob Storage

Tables

  • Job Scheduler: keeps the schedule of tasks to be run
  • Run: keeps the history of runs

Job Scheduler Table

  • job_id
  • s3_url
  • priority
    • P1
    • P2
    • P3
  • frequency
    • cron expression
  • run_timestamp
    • tells when to run it next
    • If the job failed, can we reschedule it?
    • Push the timestamp forward so that it is not picked up again and again?
  • status:
    • scheduled
    • running
    • completed
    • failed
  • Initially, insert the next five runs into the table in advance
    • When a run completes, push the next run
    • This keeps the next 5 runs always available
  • Querying jobs that are due (polled every minute)
    • Use FIFO: order by the timestamp of the scheduled run
SELECT *
FROM job_scheduler
WHERE current_timestamp > run_timestamp
AND status = 'scheduled'
AND priority = 'P1'
AND ....
ORDER BY run_timestamp
LIMIT 10 -- important to add limit
  • Hence we need a composite index on status first and then run_timestamp
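
The polling query and its index can be tried out with SQLite from the Python standard library. This is only a sketch: the column types, the index name `idx_due`, the helper `fetch_due_jobs`, and the sample `s3://` URLs are hypothetical. The production database would likely be something other than SQLite.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE job_scheduler (
        job_id        INTEGER PRIMARY KEY,
        s3_url        TEXT NOT NULL,
        priority      TEXT NOT NULL,      -- P1, P2 or P3
        run_timestamp INTEGER NOT NULL,   -- epoch seconds of the next run
        status        TEXT NOT NULL       -- scheduled, running, completed, failed
    )
    """
)
# Equality column (status) first, range column (run_timestamp) second.
conn.execute("CREATE INDEX idx_due ON job_scheduler (status, run_timestamp)")

conn.executemany(
    "INSERT INTO job_scheduler VALUES (?, ?, ?, ?, ?)",
    [
        (1, "s3://example-bucket/a.py", "P1", 100, "scheduled"),
        (2, "s3://example-bucket/b.py", "P1", 50, "scheduled"),
        (3, "s3://example-bucket/c.py", "P1", 500, "scheduled"),  # not due yet
        (4, "s3://example-bucket/d.py", "P1", 10, "running"),     # already picked up
        (5, "s3://example-bucket/e.py", "P2", 20, "scheduled"),   # other priority
    ],
)


def fetch_due_jobs(conn, now, priority="P1", limit=10):
    """Return (job_id, run_timestamp) for due jobs, oldest first."""
    return conn.execute(
        """
        SELECT job_id, run_timestamp
        FROM job_scheduler
        WHERE run_timestamp < ? AND status = 'scheduled' AND priority = ?
        ORDER BY run_timestamp
        LIMIT ?
        """,
        (now, priority, limit),
    ).fetchall()


due = fetch_due_jobs(conn, now=200)
```

Putting `status` ahead of `run_timestamp` in the index lets the database jump to the scheduled rows and then scan them in timestamp order, which matches the ORDER BY.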

Partitioning Table

  • Partition Table
  • Each partition will be read by one worker node

Database Choice

  • Writes >> Reads
  • Indexing required on run_timestamp and priority
  • Read pattern is a range query
  • partition key + sort key
  • 24 partition keys per day, one for each hour
  • partition key = get_floor_hour(run_timestamp)
    • get_floor_hour rounds the timestamp down to the start of its hour:
      • floor(run_timestamp / 3600) * 3600
    • Example
      • input = 1759603282 = 2025-10-04 18:41:22 (UTC)
      • e.g. floor(1759603282 / 3600) * 3600
      • = 1759600800 = 2025-10-04 18:00:00 (UTC)
  • All tasks scheduled within the same hour will belong to the same partition
  • sort key = run_timestamp+job_id, maybe only needed for DynamoDB?
  • partition key + sort key needs to be unique in DynamoDB
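
The key derivation can be sketched as below. `get_floor_hour` follows the formula in the notes. `sort_key` and its `#` separator are assumptions about how run_timestamp and job_id might be combined, not a format the notes specify.

```python
from datetime import datetime, timezone

SECONDS_PER_HOUR = 3600


def get_floor_hour(run_timestamp: int) -> int:
    """Round an epoch timestamp (seconds) down to the start of its hour."""
    return (run_timestamp // SECONDS_PER_HOUR) * SECONDS_PER_HOUR


def sort_key(run_timestamp: int, job_id: str) -> str:
    """ASSUMED format: zero-padded timestamp, then job_id.

    Padding keeps string ordering equal to numeric ordering, and appending
    job_id keeps the key unique when two jobs share a timestamp.
    """
    return f"{run_timestamp:010d}#{job_id}"


partition_key = get_floor_hour(1759603282)
partition_start = datetime.fromtimestamp(partition_key, tz=timezone.utc)
```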

Consistency Model

  • Eventual Consistency
  • Job status can be slightly stale, which is acceptable

Watcher Service

  • Polls the DB every minute for tasks that are due to run
    • can be implemented using cron
  • Any tasks found will be validated?
  • Tasks are pushed to the queue
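
One polling pass of the watcher might look like the sketch below. The job dictionaries and `poll_once` are hypothetical, and marking a job `pending` once it is queued is an assumption borrowed from the status values in the API section. A scheduler (cron, for example) would call this once per minute.

```python
import queue


def poll_once(jobs, task_queue, now):
    """Push every due, scheduled job onto the queue, oldest first.

    Returns the number of jobs enqueued in this pass.
    """
    due = sorted(
        (j for j in jobs if j["status"] == "scheduled" and j["run_timestamp"] <= now),
        key=lambda j: j["run_timestamp"],
    )
    for job in due:
        task_queue.put(job["job_id"])
        # ASSUMPTION: 'pending' means queued but not yet running, so the
        # next pass does not enqueue the same job again.
        job["status"] = "pending"
    return len(due)


jobs = [
    {"job_id": "a", "run_timestamp": 120, "status": "scheduled"},
    {"job_id": "b", "run_timestamp": 60, "status": "scheduled"},
    {"job_id": "c", "run_timestamp": 600, "status": "scheduled"},  # not due yet
    {"job_id": "d", "run_timestamp": 30, "status": "running"},     # already running
]
task_queue = queue.Queue()
first_pass = poll_once(jobs, task_queue, now=180)
second_pass = poll_once(jobs, task_queue, now=180)
```

With several watcher replicas, the read-then-update step would need to be atomic in the database. This single-process sketch does not cover that.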

Queueing Tasks

  • Make use of all the workers efficiently
  • Use Load Balancer
    • Using hashing on job_id, we can distribute tasks to workers
    • A long-running task blocks other tasks whose job_id hashes to the same worker
    • The load balancer would also need to know which workers are idle to delegate tasks intelligently
  • Use Message Queues
    • Kafka
      • Messages are ordered within a partition
      • A long-running task can hold up the tasks queued behind it in the same partition
    • In-memory message broker
      • Recommended: consumers pull tasks when they are idle
  • For priority we can have different queues for each priority level
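
A minimal sketch of one queue per priority level, where an idle worker pulls from the highest-priority queue that has work. `PriorityQueues` and `get_next` are hypothetical names. The standard-library `queue.Queue` stands in for whatever broker is chosen.

```python
import queue

PRIORITIES = ("P1", "P2", "P3")  # highest priority first


class PriorityQueues:
    """One FIFO queue per priority level."""

    def __init__(self):
        self._queues = {p: queue.Queue() for p in PRIORITIES}

    def put(self, priority, job_id):
        self._queues[priority].put(job_id)

    def get_next(self):
        """Called by an idle worker; returns None when nothing is waiting."""
        for priority in PRIORITIES:
            try:
                return self._queues[priority].get_nowait()
            except queue.Empty:
                continue
        return None


queues = PriorityQueues()
queues.put("P3", "c")
queues.put("P1", "a")
queues.put("P2", "b")
queues.put("P1", "d")
order = [queues.get_next() for _ in range(5)]
```

Strict priority like this can starve P3 when P1 never drains. Whether that is acceptable is a design decision the notes leave open.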

Executor Service

  • Boots up a Docker container to run the code
    • Running containers are the Workers
    • Workers are separate from the Executor Service and could run on Elastic Container Service (ECS)
  • Prevent malicious code from running
  • Run in a sandboxed environment for safety
  • Update the running status of tasks

Avoid running the same job concurrently

  • Use Zookeeper
  • Distributed locks can be used with a TTL, but if the TTL expires while the job is still running, another worker can take the job and cause issues
  • Making jobs idempotent could be a workaround
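
The TTL problem and the idempotency workaround can be illustrated with a small in-memory simulation. `TTLLockTable` and `ResultStore` are hypothetical stand-ins written for this sketch. They do not model ZooKeeper's API, and time is passed in explicitly so the example is deterministic.

```python
class TTLLockTable:
    """In-memory stand-in for a lock service with expiring locks."""

    def __init__(self):
        self._locks = {}  # job_id -> (owner, expires_at)

    def acquire(self, job_id, owner, now, ttl):
        holder = self._locks.get(job_id)
        if holder is not None:
            held_by, expires_at = holder
            if held_by != owner and expires_at > now:
                return False  # someone else holds an unexpired lock
        self._locks[job_id] = (owner, now + ttl)
        return True


class ResultStore:
    """Idempotent sink: only the first result for a run_id is kept."""

    def __init__(self):
        self._results = {}

    def record(self, run_id, output):
        if run_id in self._results:
            return False  # duplicate execution, ignored
        self._results[run_id] = output
        return True


locks = TTLLockTable()
results = ResultStore()

# worker-a takes the lock with a 30 s TTL, but the job runs longer than that.
a_got_lock = locks.acquire("job-1", "worker-a", now=0, ttl=30)
b_blocked = locks.acquire("job-1", "worker-b", now=10, ttl=30)
# After the TTL expires, worker-b also gets the lock: the job now runs twice.
b_got_lock_after_expiry = locks.acquire("job-1", "worker-b", now=31, ttl=30)

# Both workers finish; the idempotent store keeps a single result.
first_write = results.record("job-1@run-100", "output from worker-b")
second_write = results.record("job-1@run-100", "output from worker-a")
```

The lock reduces duplicate runs, and the idempotent write makes the remaining duplicates harmless, which fits the at-least-once requirement.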

Retry mechanism

  • ?

Job Dependencies

  • DAG Table
  • schedule roots of DAG with epoch 1
  • shard on DAG ID
  • MongoDB can be used for data flexibility
  • When you add a job, you can have:
{
  "job_id": 123,
  // ...
  "dependencies": [ 24, 98 ] // dependent job_ids
}
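
Given job documents shaped like the one above, an execution order can be derived with a topological sort. This sketch uses `graphlib.TopologicalSorter` from the Python standard library (3.9+). It assumes that `dependencies` lists the job_ids that must finish before the job itself runs, and `execution_order` is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(jobs):
    """Return job_ids ordered so that every job comes after its dependencies.

    Raises graphlib.CycleError if the dependencies do not form a DAG.
    """
    # TopologicalSorter expects a mapping of node -> set of predecessors.
    graph = {job["job_id"]: set(job.get("dependencies", [])) for job in jobs}
    return list(TopologicalSorter(graph).static_order())


jobs = [
    {"job_id": 123, "dependencies": [24, 98]},
    {"job_id": 98, "dependencies": [24]},
    {"job_id": 24},  # a root of the DAG: no dependencies
]
order = execution_order(jobs)

# A cycle is rejected, which is a useful validation step when a job is added.
try:
    execution_order([{"job_id": 1, "dependencies": [2]},
                     {"job_id": 2, "dependencies": [1]}])
    cycle_detected = False
except CycleError:
    cycle_detected = True
```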