Replies: 2 comments 2 replies
-
After much discussion, we are converging on the following approach:
-
The cascade delete is nice; this design will eliminate the possibility of "orphaned" jobs and remove the need for a separate cleanup step. It also means users with delete/drop privilege on the Autopopulate tables must also have delete/drop privilege on the corresponding jobs table.
-
**Problem Statement**

The current DataJoint-python approach for jobs reservation, orchestration, and execution (i.e., `autopopulate`) faces scalability limitations. While its original design effectively handled job reservation/distribution for parallelization, it falls short when building a comprehensive data platform.

**Limitations of the `jobs` table**

The existing `jobs` table functions more as an error/reserve table than a true jobs queue. It only tracks the `error` (failed jobs) and `reserved` (jobs in progress) states. It lacks crucial statuses such as:
- `pending`/`scheduled` (jobs not yet started)
- `success` (record of successfully completed jobs and their duration)

Workers must query the `key_source` to get a list of jobs, which, while ensuring up-to-date information, strains the database.
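For context, a minimal sketch of how the current jobs table is typically inspected (the schema name is a placeholder for an existing pipeline):

```python
import datajoint as dj

schema = dj.schema('my_pipeline')  # placeholder schema name

# Today's jobs table only distinguishes two states:
failed = (schema.jobs & 'status = "error"').fetch(as_dict=True)       # failed jobs
reserved = (schema.jobs & 'status = "reserved"').fetch(as_dict=True)  # jobs in progress

# There is no record of pending/scheduled work or of successful runs, so
# "what remains to be done" must be recomputed from key_source every time.
```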
**Limitations of `key_source` Behavior/Usage**

The default `key_source` (an inner-join of parent tables) is intended to represent all possible jobs for a given table. Users often customize the `key_source` (e.g., restricting by `paramset` or other tables). Customized `key_source` settings are only visible to the local code executing the pipeline, not globally at the database level. This leads to different workers potentially operating with inconsistent `key_source` definitions.

`(Table.key_source - Table).fetch('KEY')` is DataJoint's method for retrieving the job queue and can be an expensive operation, especially when called frequently by multiple workers. This significantly strains the database server, as observed by other users.
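To make the cost concrete, this is roughly what every worker does today when it needs its to-do list (`MyTable` stands in for any Autopopulate table):

```python
# Re-evaluated on every polling cycle by every worker: the server must
# recompute the inner-join of all parent tables, then antijoin the target table.
remaining_keys = (MyTable.key_source - MyTable).fetch('KEY')
```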
**Proposed Solution: New Jobs Table**

**Step 1: A New `JOB2` Table (Name TBD)**

A new table, tentatively named `JOB2`, would be introduced with the following schema:
- `table_name` : `varchar(255)` - the `className` of the table
- `key_hash` : `char(32)` - a hash of the job's key
- `status` : `enum('reserved','error','ignore','scheduled','success')` - the current status of the job
- `key` : `json` - a JSON structure containing the job's key
- `status_message` : `varchar(2000)` - e.g., error message if failed
- `error_stack` : `mediumblob` - the error stack if the job failed
- `timestamp` : `timestamp` - the scheduled time (UTC) for the job to run
- `run_duration` : `float` - the run duration in seconds
- `run_version` : `json` - representation of the code/environment version of the run (e.g., git commit hash)
- `user` : `varchar(255)` - the database user
- `host` : `varchar(255)` - the system hostname
- `pid` : `int unsigned` - the system process ID
- `connection_id` : `bigint unsigned` - the database connection ID
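As a rough illustration, this schema could translate into a DataJoint definition along the following lines. The table tier, the choice of `table_name` + `key_hash` as the primary key, and the nullable/default values are my assumptions, not part of the proposal:

```python
import datajoint as dj

schema = dj.schema('my_pipeline')  # placeholder schema

@schema
class Job2(dj.Manual):  # tier is an assumption; the real table may be declared internally
    definition = """
    # Unified jobs queue (name TBD)
    table_name        : varchar(255)   # className of the table
    key_hash          : char(32)       # hash of the job's key
    ---
    status            : enum('reserved','error','ignore','scheduled','success')
    key               : json           # JSON structure containing the job's key
    status_message="" : varchar(2000)  # e.g., error message if failed
    error_stack=null  : mediumblob     # error stack if the job failed
    timestamp=CURRENT_TIMESTAMP : timestamp  # scheduled time (UTC) for the job to run
    run_duration=null : float          # run duration in seconds
    run_version=null  : json           # code/environment version, e.g. git commit hash
    user=""           : varchar(255)   # database user
    host=""           : varchar(255)   # system hostname
    pid=0             : int unsigned   # system process ID
    connection_id=0   : bigint unsigned # database connection ID
    """
```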
**Step 2: Mechanism to "Hydrate"/"Refresh" the `JOB2` Table**

A new class method, `refresh_jobs()`, would be introduced for every Autopopulate table. This method would:
1. Compute the `key_source` of the table.
2. Insert any new jobs into `JOB2`.
3. Remove obsolete jobs from `JOB2` due to upstream record deletions.

The key challenge here is how and when to trigger `refresh_jobs()`. If triggered by every `populate(reserved_jobs=True)` call, it could become a bottleneck due to read/write operations to `JOB2` and potential race conditions/deadlocks.
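A minimal sketch of what `refresh_jobs()` might do, assuming the `Job2` table sketched above and an md5-based key hash (the actual hashing scheme, locking, batching, and transaction handling are not specified in the proposal):

```python
import hashlib
import json


def _hash_key(key):
    # Illustrative hash only; the real key-hash scheme is an open detail.
    return hashlib.md5(json.dumps(key, sort_keys=True, default=str).encode()).hexdigest()


class RefreshJobsMixin:
    """Hypothetical mixin; in practice this would likely live on dj.AutoPopulate."""

    @classmethod
    def refresh_jobs(cls):
        all_keys = cls.key_source.fetch('KEY')      # 1. compute the table's key_source
        valid_hashes = {_hash_key(k) for k in all_keys}

        # 2. Insert jobs not yet present in JOB2 as 'scheduled'.
        Job2.insert(
            (dict(table_name=cls.__name__, key_hash=_hash_key(k),
                  status='scheduled', key=k) for k in all_keys),
            skip_duplicates=True)

        # 3. Remove obsolete jobs whose upstream records were deleted
        #    (a step that cascading deletes would make unnecessary).
        existing = (Job2 & {'table_name': cls.__name__}).fetch('key_hash')
        obsolete = [dict(table_name=cls.__name__, key_hash=h)
                    for h in existing if h not in valid_hashes]
        if obsolete:
            (Job2 & obsolete).delete_quick()
```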
**Step 3: New/Updated `populate()` Function**

The `populate()` function would be updated to:
1. Query `JOB2` for a list of "scheduled" jobs.
2. Call `populate1(key)` as usual for each job.
3. Update the job's status in `JOB2` to `success` and add additional information (e.g., run duration, code version).
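Correspondingly, the updated `populate()` loop could look roughly like this; reservation, error handling, and version capture are omitted, and `populate1()` is the per-key method referenced in the proposal:

```python
import time


def populate_from_jobs(table):
    """Sketch of the updated populate() flow; `table` is an Autopopulate instance."""
    # 1. Query JOB2 for this table's "scheduled" jobs instead of scanning key_source.
    scheduled = (Job2 & {'table_name': type(table).__name__,
                         'status': 'scheduled'}).fetch(as_dict=True)

    for job in scheduled:
        start = time.time()
        table.populate1(job['key'])        # 2. run the per-key computation as usual
        # 3. Mark the job successful and record bookkeeping details.
        Job2.update1(dict(
            table_name=job['table_name'],
            key_hash=job['key_hash'],
            status='success',
            run_duration=time.time() - start))
```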
**Considerations**

- `refresh_jobs()` Frequency and Staleness: How often should `refresh_jobs()` be called, and what level of staleness in `JOB2` is acceptable? A centralized process could refresh jobs for each research project on a schedule (e.g., every 10, 15, or 30 minutes), similar to current worker-manager cron jobs. This would address the performance issues related to `key_source` that many users have experienced.
- `refresh_jobs()` without Pipeline Code: Should `refresh_jobs()` be callable without the pipeline code installed (i.e., from a "virtual module")? Yes, to avoid the complexity and expense of requiring full code installation.
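For the centralized-refresh and virtual-module points, a scheduled script along these lines could run without the pipeline package installed; the schema and table names are placeholders, and `refresh_jobs()` is the proposed method from Step 2:

```python
import datajoint as dj

# Executed by a cron job (e.g., every 10-30 minutes) on a central worker-manager host.
pipeline = dj.create_virtual_module('pipeline', 'my_project_schema')

# Note: only the default key_source is available from a virtual module; customized
# key_source logic defined in pipeline code would not be visible here.
for table in (pipeline.Activity, pipeline.CuratedSpikes):  # placeholder table names
    table.refresh_jobs()
```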
**Notes**

We have considered adopting and integrating with other industry standards for workflow orchestration, such as Airflow, Flyte, or Prefect, and have produced and evaluated multiple working prototypes. However, we think that the additional burden of deploying and maintaining those tools is too much for an open-source Python project such as DataJoint: the enhanced features come with significant DevOps requirements and burden.