Design Scheduler (Distributed)
Requirements
Users can submit jobs to be run
Users can view the status of a job
Users can view the output of a job
Each submitted job is a Python/JS script
More languages can be added later
Retry a job if it has failed
Triggers:
cron expression
On Demand
Can users set job priorities?
Are jobs dependent on each other?
Directed Acyclic Graph (DAG)
Use a topological sort to determine execution order
Jobs should start close to their scheduled time (a 2-4 s delay is acceptable)
Options: Two-Phase Commit (2PC) or Change Data Capture (CDC, e.g. with Kafka)
NFR
System is expected to be highly available
1 billion jobs submitted per day (TPS?)
Jobs take around 5 minutes to run
16 cores per machine
2 threads per core
Input and output files are ~5 MB each
Data retention: 1 day
In a distributed setting, the same job must not be executed by multiple nodes at once
Each job should run at least once
Observability
Logs
Monitoring Resources
graph LR
A[Users / Clients] --> B[Load Balancer]
B --> S1[App Server #1\n~2000-5000 QPS]
B --> S2[App Server #2\n~2000-5000 QPS]
B --> S3[App Server #3\n~2000-5000 QPS]
B --> S4[App Server #4\n~2000-5000 QPS]
S1 --> C[Cache]
S2 --> C
S3 --> C
S4 --> C
C --> D[Database]
S1 --> D
S2 --> D
S3 --> D
S4 --> D
%% Note: Scale when QPS per server > 2000–5000, CPU >70%, or latency >200ms
Back-of-the-Envelope Calculation
Throughput calculation
Expected: 1 billion jobs per day, i.e. $10^9$ jobs per day
1 day = 86,400 seconds $\approx 10^5$ seconds
Jobs per second $= 10^9 \text{ jobs} / 10^5 \text{ s}$
$= 10^4 = 10{,}000$ jobs per second
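The throughput estimate can be reproduced in a few lines of Python. The last step is not in the notes above: it applies Little's law to the NFR figures (5-minute jobs, 16 cores x 2 threads per machine) under the simplifying assumption that each hardware thread runs exactly one job at a time, with no headroom for failures or bursts.

```python
# Back-of-the-envelope sizing from the requirements above.
JOBS_PER_DAY = 10**9
SECONDS_PER_DAY = 86_400          # exact; the estimate rounds this to 10**5
JOB_DURATION_S = 5 * 60           # each job runs ~5 minutes
SLOTS_PER_MACHINE = 16 * 2        # 16 cores x 2 threads; assumes one job per thread

exact_tps = JOBS_PER_DAY / SECONDS_PER_DAY   # ~11,574 jobs/s
rounded_tps = JOBS_PER_DAY // 10**5          # 10,000 jobs/s

# Little's law: jobs in flight = arrival rate x time each job spends running.
concurrent_jobs = rounded_tps * JOB_DURATION_S          # 3,000,000
machines = -(-concurrent_jobs // SLOTS_PER_MACHINE)     # ceiling division

print(f"exact TPS ~ {exact_tps:,.0f}")
print(f"rounded TPS = {rounded_tps:,}")
print(f"concurrent jobs = {concurrent_jobs:,}")
print(f"worker machines = {machines:,}")
```

Using the exact 86,400 s/day, the rate is about 16% higher than the rounded estimate, and the machine count scales the same way.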
Blob Storage calculation
Assume each Python script is 200 lines long
Each line is 50 characters = 50 bytes
Size of each script $= 200 \times 50 = 10^4$ bytes $\approx 10$ KB
Per-day storage $= 10^9 \times$ size of each script
$= 10^9 \times 10^4$ bytes $= 10^{13}$ bytes
$= 10$ TB
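The storage arithmetic as a quick check, assuming one byte per (ASCII) character as the notes do:

```python
LINES_PER_SCRIPT = 200
BYTES_PER_LINE = 50        # assumes 1 byte per ASCII character
JOBS_PER_DAY = 10**9

script_bytes = LINES_PER_SCRIPT * BYTES_PER_LINE   # 10,000 bytes = 10 KB
daily_bytes = JOBS_PER_DAY * script_bytes          # 10**13 bytes = 10 TB

print(script_bytes / 10**3, "KB per script")
print(daily_bytes / 10**12, "TB per day")
```

This counts only the scripts. If every job also had the ~5 MB input and output files listed in the NFRs, each of those would add $10^9 \times 5 \times 10^6 = 5 \times 10^{15}$ bytes (5 PB) per day.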
Job Scheduler APIs
Enqueue job
POST /api/v1/jobs
Body:
task_id refers to the Python script stored in Blob Storage
{
"task_id" : "1234" ,
"trigger" : {
"kind" : "cron" ,
"schedule" : "20 * * * *"
},
"params" : {
"param_1" : "value_1" ,
"param_2" : "value_2" ,
...
}
}
Response:
job_id of the job created in the database
status initialized to scheduled
Update Job status
PUT /api/v1/jobs/{job_id}/status
{
"status" : "scheduled|pending|running|disabled"
}
Get Job status
GET /api/v1/jobs/{job_id}/status
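A minimal in-memory sketch of the three endpoints, assuming a Python dict stands in for the jobs database. The `JobStore` class, its method names, and the UUID-based `job_id` are hypothetical; no web framework is involved, and each method just mirrors one route.

```python
import uuid
from dataclasses import dataclass, field

VALID_STATUSES = {"scheduled", "pending", "running", "disabled"}

@dataclass
class Job:
    job_id: str
    task_id: str                     # points at the script in Blob Storage
    trigger: dict
    params: dict = field(default_factory=dict)
    status: str = "scheduled"

class JobStore:
    """Hypothetical in-memory stand-in for the jobs database."""

    def __init__(self):
        self._jobs = {}

    def enqueue(self, body: dict) -> dict:
        # POST /api/v1/jobs: create a job_id, status starts as "scheduled".
        job = Job(job_id=str(uuid.uuid4()), task_id=body["task_id"],
                  trigger=body["trigger"], params=body.get("params", {}))
        self._jobs[job.job_id] = job
        return {"job_id": job.job_id, "status": job.status}

    def update_status(self, job_id: str, status: str) -> dict:
        # PUT /api/v1/jobs/{job_id}/status
        if status not in VALID_STATUSES:
            raise ValueError(f"unknown status: {status}")
        self._jobs[job_id].status = status
        return {"job_id": job_id, "status": status}

    def get_status(self, job_id: str) -> dict:
        # GET /api/v1/jobs/{job_id}/status
        return {"job_id": job_id, "status": self._jobs[job_id].status}

store = JobStore()
created = store.enqueue({"task_id": "1234",
                         "trigger": {"kind": "cron", "schedule": "20 * * * *"},
                         "params": {"param_1": "value_1"}})
store.update_status(created["job_id"], "running")
print(store.get_status(created["job_id"]))
```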
Read/Write Ratio
Writes
Per the throughput estimate: 10,000 write operations per second
Update the job status after completion
Insert into the Run table for history
Reads
The watcher checks for scheduled jobs roughly every minute
With x watcher replicas, that is only x queries per minute, which is negligible
Users may also query the status of a job
Hence Writes >> Reads
Services
Job Scheduler Service
Schedule Jobs
See Job status
Database to store Job info
Blob Storage to store python script
Watcher Service
watches for database changes
sends them to the Message Queue
could use Change Data Capture (CDC)
Message Queue
Executor Service
Triggers
cron/time based
event based/manual
Job Execution Code
Binaries can be uploaded to Blob Storage
Tables
Job Scheduler: keeps the schedule of tasks to be run
Run: keeps the history of runs
Job Scheduler Table
job_id
s3_url
priority
frequency
run_timestamp
tells when to run the job next
If the job failed, should we reschedule it?
Push run_timestamp forward so that it is not picked up again and again?
status:
scheduled
running
completed
failed
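One possible answer to the reschedule question is exponential backoff: on failure, push run_timestamp forward by a delay that doubles with each attempt, and mark the job failed once a retry budget runs out. This sketch assumes a per-job attempt counter, which the table above does not have, and the delay constants are placeholders rather than recommended values.

```python
BASE_DELAY_S = 30     # placeholder: delay before the first retry
MAX_DELAY_S = 3600    # placeholder: never wait more than an hour
MAX_ATTEMPTS = 5      # placeholder: retry budget per run

def reschedule_after_failure(run_timestamp: int, attempt: int, now: int) -> tuple[int, str]:
    """Return (new run_timestamp, new status) after a failed run.

    `attempt` is the number of failures so far (1 after the first failure).
    The delay doubles on each failure, so a job that keeps failing is not
    picked up again by every poll.
    """
    if attempt >= MAX_ATTEMPTS:
        return run_timestamp, "failed"   # budget exhausted: leave it failed
    delay = min(BASE_DELAY_S * 2 ** (attempt - 1), MAX_DELAY_S)
    return now + delay, "scheduled"

now = 1_759_603_282
for attempt in range(1, MAX_ATTEMPTS + 1):
    print(attempt, reschedule_after_failure(now, attempt, now))
```

Adding random jitter to the delay is a common refinement, so that many jobs failing together do not all retry in the same second.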
Initially, insert the next five runs into the table in advance
When a run completes, push the following run
This keeps the next 5 runs always available
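A sketch of the "next five runs" idea, assuming a `deque` stands in for the job's rows in the table and supporting only the `M * * * *` cron form (minute M of every hour). The helper names are hypothetical; a real system would use a full cron parser.

```python
from collections import deque

RUNS_AHEAD = 5  # keep the next five runs in the table

def next_hourly_run(after: int, minute: int) -> int:
    """Next epoch second strictly after `after` that falls at HH:minute:00 UTC.

    Only handles the "M * * * *" cron form (minute M of every hour).
    """
    hour_start = after - after % 3600
    candidate = hour_start + minute * 60
    return candidate if candidate > after else candidate + 3600

def initial_runs(now: int, minute: int) -> deque:
    """Precompute the first RUNS_AHEAD run timestamps after `now`."""
    runs = deque()
    t = now
    for _ in range(RUNS_AHEAD):
        t = next_hourly_run(t, minute)
        runs.append(t)
    return runs

def on_run_completed(runs: deque, minute: int) -> int:
    """Pop the finished run and append one more so RUNS_AHEAD stay queued."""
    last = runs[-1]
    finished = runs.popleft()
    runs.append(next_hourly_run(last, minute))
    return finished

runs = initial_runs(1759603282, minute=20)   # "20 * * * *" starting at 18:41:22 UTC
finished = on_run_completed(runs, minute=20)
print(finished, list(runs))
```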
Querying jobs due in the next minute
Use FIFO: order by the scheduled run timestamp
SELECT *
FROM job_scheduler
WHERE run_timestamp <= current_timestamp + INTERVAL '1 minute'
AND status = 'scheduled'
AND priority = 'P1'
AND ....
ORDER BY run_timestamp
LIMIT 10 -- important: cap the rows fetched per poll
Hence we need a composite index on (status, run_timestamp)
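The query and its index can be tried end to end with Python's built-in `sqlite3`. This is a sketch: the table is trimmed (no `frequency`), `run_timestamp` is stored as epoch seconds rather than a SQL timestamp, and the sample rows and `s3://scripts/...` URLs are made up. The index lists the equality columns (`status`, `priority`) before the range column (`run_timestamp`) so one index range scan can serve both the filter and the ORDER BY.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE job_scheduler (
        job_id        TEXT PRIMARY KEY,
        s3_url        TEXT,
        priority      TEXT,
        run_timestamp INTEGER,  -- epoch seconds of the next run
        status        TEXT
    )""")
# Equality columns first, then the range/sort column.
conn.execute("CREATE INDEX idx_status_priority_run "
             "ON job_scheduler (status, priority, run_timestamp)")

now = 1_759_603_282
rows = [(f"job-{i}", f"s3://scripts/job-{i}.py", "P1" if i % 2 == 0 else "P2",
         now - 120 + i * 10, "scheduled") for i in range(20)]
conn.executemany("INSERT INTO job_scheduler VALUES (?, ?, ?, ?, ?)", rows)

# P1 jobs due within the next minute, oldest first, at most 10 per poll.
due = conn.execute("""
    SELECT job_id, run_timestamp
    FROM job_scheduler
    WHERE status = 'scheduled'
      AND priority = 'P1'
      AND run_timestamp <= ?
    ORDER BY run_timestamp
    LIMIT 10""", (now + 60,)).fetchall()
print(due)
```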
Partitioning Table
Partition the table so that each partition is read by exactly one worker node
Database Choice
Writes >> Reads
Indexing required on run_timestamp and priority
Read pattern is a range query
partition key + sort key
24 partition keys per day, one for each hour
partition key = get_floor_hour(run_timestamp)
get_floor_hour rounds down to the start of the hour:
floor(run_timestamp / 3600) * 3600
Example
input = 1759603282 = 2025-10-04 18:41:22
e.g. floor(1759603282 / 3600) * 3600
= 1759600800 = 2025-10-04 18:00:00
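All tasks scheduled within the same hour belong to the same partition, so a one-minute range query usually hits a single partition
sort key = run_timestamp + job_id
Appending job_id keeps keys unique when several jobs share a run_timestamp; DynamoDB requires the partition key + sort key combination to be unique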
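The key scheme in Python, reproducing the worked example. The `timestamp#job_id` string format for the sort key is a hypothetical choice: the timestamp is zero-padded so string order matches time order, and the job_id suffix breaks ties.

```python
from datetime import datetime, timezone

def get_floor_hour(run_timestamp: int) -> int:
    """Partition key: run time rounded down to the start of its hour.

    Integer floor division (//) equals floor(x / 3600) for epoch seconds.
    """
    return run_timestamp // 3600 * 3600

def sort_key(run_timestamp: int, job_id: str) -> str:
    """Hypothetical sort-key format: time-ordered, unique per job."""
    return f"{run_timestamp:010d}#{job_id}"

ts = 1759603282                                      # 2025-10-04 18:41:22 UTC
pk = get_floor_hour(ts)
print(pk, datetime.fromtimestamp(pk, tz=timezone.utc))  # 1759600800 2025-10-04 18:00:00+00:00
print(sort_key(ts, "1234"))                             # 1759603282#1234
```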
Consistency Model
Eventual Consistency
Job status can be slightly stale, and that is acceptable
Watcher Service
Polls the DB every minute for tasks due to run
Can be implemented as a cron job
Should the tasks found be validated first?
Pushes the tasks to the queue
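A sketch of one watcher tick, assuming `sqlite3` stands in for the database and `queue.Queue` for the message queue. It claims each due row with a conditional UPDATE from `scheduled` to `pending` (the `pending` status from the status API), which acts as a compare-and-set so that two watcher replicas cannot both claim the same row. The function name and trimmed schema are hypothetical.

```python
import queue
import sqlite3

def poll_once(conn: sqlite3.Connection, q: queue.Queue, now: int, limit: int = 10) -> int:
    """Claim jobs due within the next minute and push their ids to the queue."""
    with conn:  # commits on success, rolls back on exception
        due = conn.execute(
            "SELECT job_id FROM job_scheduler "
            "WHERE status = 'scheduled' AND run_timestamp <= ? "
            "ORDER BY run_timestamp LIMIT ?", (now + 60, limit)).fetchall()
        claimed = []
        for (job_id,) in due:
            cur = conn.execute(
                "UPDATE job_scheduler SET status = 'pending' "
                "WHERE job_id = ? AND status = 'scheduled'", (job_id,))
            if cur.rowcount == 1:        # only the replica that flipped it wins
                claimed.append(job_id)
    for job_id in claimed:               # publish after the claim is committed
        q.put(job_id)
    return len(claimed)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job_scheduler "
             "(job_id TEXT PRIMARY KEY, run_timestamp INTEGER, status TEXT)")
conn.executemany("INSERT INTO job_scheduler VALUES (?, ?, ?)",
                 [("a", 100, "scheduled"), ("b", 150, "scheduled"), ("c", 10_000, "scheduled")])
q = queue.Queue()
first = poll_once(conn, q, now=100)    # claims a and b (due by now + 60)
second = poll_once(conn, q, now=100)   # nothing left to claim
print(first, second)
```

If the process dies between the commit and `q.put`, the job stays `pending` without ever being queued. That DB-to-queue gap is what the 2PC or CDC options in the requirements are meant to close.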
Queueing Tasks
Make efficient use of all the workers
Use a Load Balancer
Hash job_id to distribute tasks across workers
Long-running tasks can block other tasks that hash to the same worker
The load balancer would also need to know which workers are idle to delegate tasks intelligently
Use Message Queues
Kafka
Ordered (within a partition)
A long-running task blocks the tasks queued behind it, so consumers can get stuck
In-memory message broker
Recommended: consumers pull tasks only when they are idle
For priorities, use a separate queue per priority level
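A sketch of per-priority queues with pull-based consumers, assuming in-memory deques stand in for broker queues and three hypothetical levels `P1` (highest) to `P3`. An idle worker calls `pull`, which serves the oldest task from the highest-priority non-empty queue.

```python
from collections import deque

PRIORITIES = ("P1", "P2", "P3")   # hypothetical levels, highest first

class PriorityQueues:
    """One FIFO queue per priority level (in-memory stand-in for a broker)."""

    def __init__(self):
        self.queues = {p: deque() for p in PRIORITIES}

    def publish(self, priority: str, job_id: str) -> None:
        self.queues[priority].append(job_id)

    def pull(self):
        """Called by a worker only when idle; returns None if all queues are empty."""
        for p in PRIORITIES:
            if self.queues[p]:
                return self.queues[p].popleft()
        return None

broker = PriorityQueues()
broker.publish("P3", "backfill-7")
broker.publish("P1", "billing-1")
broker.publish("P2", "report-3")
broker.publish("P1", "billing-2")
order = [broker.pull() for _ in range(5)]
print(order)  # ['billing-1', 'billing-2', 'report-3', 'backfill-7', None]
```

Strict priority like this can starve low-priority queues under sustained load; weighted polling across queues is a common alternative.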
Executor Service
Boots up a Docker container to run the code
The running instances are the Workers
Workers are separate from Executor Service, could be Elastic Container Service (ECS)
Prevent malicious code from running
Run in sandboxed environment for safety
Updates the running status of tasks
Avoid running the same job concurrently
Use ZooKeeper
Distributed locks with a TTL can be used, but if the TTL expires while the job is still running, another worker can take the job and run it a second time
Making jobs idempotent is a workaround
Retry mechanism
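A sketch of why a TTL lock alone is not enough and where idempotency comes in. `LeaseLock` is a hypothetical in-memory stand-in for a ZooKeeper-style lock with a TTL, driven by a fake clock so the expiry is deterministic; `execute_once` records completed runs keyed by `(job_id, run_timestamp)`.

```python
import time

class LeaseLock:
    """Hypothetical in-memory lease lock with a TTL."""

    def __init__(self, ttl_s: float, clock=time.monotonic):
        self.ttl_s = ttl_s
        self.clock = clock          # injectable so the demo can fake time
        self.holder = None
        self.expires_at = 0.0

    def acquire(self, worker: str) -> bool:
        now = self.clock()
        if self.holder is not None and now < self.expires_at:
            return False            # another worker holds a live lease
        self.holder, self.expires_at = worker, now + self.ttl_s
        return True

    def release(self, worker: str) -> None:
        if self.holder == worker:
            self.holder = None

completed_runs = set()  # idempotency record keyed by (job_id, run_timestamp)

def execute_once(job_id: str, run_timestamp: int, run) -> bool:
    """Run the job only if this exact run has not already completed."""
    key = (job_id, run_timestamp)
    if key in completed_runs:
        return False
    run()
    completed_runs.add(key)
    return True

# worker-A's job outlives its 30 s lease.
fake_now = [0.0]
lock = LeaseLock(ttl_s=30, clock=lambda: fake_now[0])
a_got = lock.acquire("worker-A")      # True
fake_now[0] = 10
b_early = lock.acquire("worker-B")    # False: A's lease is still live
fake_now[0] = 31
b_late = lock.acquire("worker-B")     # True, even though A may still be running

calls = []
execute_once("1234", 1759605600, lambda: calls.append("A"))
execute_once("1234", 1759605600, lambda: calls.append("B"))  # skipped: run already recorded
print(a_got, b_early, b_late, calls)  # True False True ['A']
```

The `completed_runs` check only helps if it is stored durably and the job's side effects are themselves keyed by the run, since two workers can both pass the check before either records completion.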
Job Dependencies
DAG Table
Schedule the roots of the DAG with epoch 1
Shard on DAG ID
MongoDB can be used for schema flexibility
When adding a job, it can include:
{
"job_id" : 123 ,
// ...
"dependencies" : [ 24 , 98 ] // dependent job_ids
}
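With each job storing its `dependencies`, an execution order falls out of a topological sort. This sketch uses Kahn's algorithm on a plain dict of `job_id -> dependencies` (an assumed in-memory shape of the documents above); the initial ready set is the DAG roots, and a leftover job means the graph has a cycle.

```python
from collections import deque

def topological_order(jobs: dict[int, list[int]]) -> list[int]:
    """Kahn's algorithm: every job appears after all of its dependencies.

    Raises ValueError if the dependencies contain a cycle (not a DAG).
    """
    indegree = {job: 0 for job in jobs}
    dependents = {job: [] for job in jobs}
    for job, deps in jobs.items():
        for dep in deps:
            indegree[job] += 1
            indegree.setdefault(dep, 0)
            dependents.setdefault(dep, []).append(job)
    # Roots of the DAG: jobs with no unmet dependencies.
    ready = deque(sorted(j for j, d in indegree.items() if d == 0))
    order = []
    while ready:
        job = ready.popleft()
        order.append(job)
        for child in dependents.get(job, []):
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(indegree):
        raise ValueError("dependency cycle detected")
    return order

jobs = {123: [24, 98], 24: [], 98: [24], 7: []}
print(topological_order(jobs))  # [7, 24, 98, 123]
```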