Skip to content

Add schema for multiple task attempts #175

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,25 +197,33 @@ assert result.return_value == 42
If a result has been updated in the background, you can call `refresh` on it to update its values. Results obtained using `get_result` will always be up-to-date.

```python
assert result.status == ResultStatus.NEW
assert result.status == ResultStatus.READY
result.refresh()
assert result.status == ResultStatus.SUCCEEDED
```

#### Exceptions
#### Errors

If a task raised an exception, its `.exception_class` will be the exception class raised:
If a task raised an exception, its `.errors` contains information about the error:

```python
assert result.exception_class == ValueError
assert result.errors[0].exception_class == ValueError
```

Note that this is just the type of exception, and contains no other values. The traceback information is reduced to a string that you can print to help debugging:

```python
assert isinstance(result.traceback, str)
assert isinstance(result.errors[0].traceback, str)
```

Note that currently, whilst `.errors` is a list, it will only ever contain a single element.

#### Attempts

The number of times a task has been run is stored as the `.attempts` attribute. This will currently only ever be 0 or 1.

The date of the last attempt is stored as `.last_attempted_at`.

### Backend introspecting

Because `django-tasks` enables support for multiple different backends, those backends may not support all features, and it can be useful to determine this at runtime to ensure the chosen task queue meets the requirements, or to gracefully degrade functionality if it doesn't.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Generated by Django 5.2.2 on 2025-06-13 14:56

from django.db import migrations, models
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
from django.db.migrations.state import StateApps


def update_status(apps: StateApps, schema_editor: BaseDatabaseSchemaEditor) -> None:
DBTaskResult = apps.get_model("django_tasks_database", "DBTaskResult")

DBTaskResult.objects.filter(status="NEW").update(status="READY")


def revert_status_rename(
apps: StateApps, schema_editor: BaseDatabaseSchemaEditor
) -> None:
DBTaskResult = apps.get_model("django_tasks_database", "DBTaskResult")

DBTaskResult.objects.filter(status="READY").update(status="NEW")


class Migration(migrations.Migration):
dependencies = [
("django_tasks_database", "0016_alter_dbtaskresult_options_and_more"),
]

operations = [
migrations.RemoveIndex(
model_name="dbtaskresult",
name="django_task_new_ordering_idx",
),
migrations.AlterField(
model_name="dbtaskresult",
name="status",
field=models.CharField(
choices=[
("READY", "Ready"),
("RUNNING", "Running"),
("FAILED", "Failed"),
("SUCCEEDED", "Succeeded"),
],
default="READY",
max_length=9,
verbose_name="status",
),
),
migrations.RunPython(update_status, revert_status_rename),
migrations.AddIndex(
model_name="dbtaskresult",
index=models.Index(
models.F("status"),
models.OrderBy(models.F("priority"), descending=True),
models.OrderBy(models.F("run_after")),
condition=models.Q(("status", "READY")),
name="django_task_new_ordering_idx",
),
),
]
24 changes: 14 additions & 10 deletions django_tasks/backends/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
MIN_PRIORITY,
ResultStatus,
Task,
TaskError,
)
from django_tasks.utils import get_exception_traceback, get_module_path, retry

Expand Down Expand Up @@ -56,7 +57,7 @@ def ready(self) -> "DBTaskResultQuerySet":
Return tasks which are ready to be processed.
"""
return self.filter(
status=ResultStatus.NEW,
status=ResultStatus.READY,
).filter(models.Q(run_after=DATE_MAX) | models.Q(run_after__lte=timezone.now()))

def succeeded(self) -> "DBTaskResultQuerySet":
Expand Down Expand Up @@ -85,7 +86,7 @@ class DBTaskResult(GenericBase[P, T], models.Model):
status = models.CharField(
_("status"),
choices=ResultStatus.choices,
default=ResultStatus.NEW,
default=ResultStatus.READY,
max_length=max(len(value) for value in ResultStatus.values),
)

Expand Down Expand Up @@ -122,7 +123,7 @@ class Meta:
"status",
*ordering,
name="django_task_new_ordering_idx",
condition=Q(status=ResultStatus.NEW),
condition=Q(status=ResultStatus.READY),
),
models.Index(fields=["queue_name"]),
models.Index(fields=["backend_name"]),
Expand Down Expand Up @@ -163,26 +164,29 @@ def task(self) -> Task[P, T]:
def task_result(self) -> "TaskResult[T]":
from .backend import TaskResult

try:
exception_class = import_string(self.exception_class_path)
except ImportError:
exception_class = None

task_result = TaskResult[T](
db_result=self,
task=self.task,
id=normalize_uuid(self.id),
status=ResultStatus[self.status],
enqueued_at=self.enqueued_at,
started_at=self.started_at,
last_attempted_at=self.started_at,
finished_at=self.finished_at,
args=self.args_kwargs["args"],
kwargs=self.args_kwargs["kwargs"],
backend=self.backend_name,
errors=[],
)

object.__setattr__(task_result, "_exception_class", exception_class)
object.__setattr__(task_result, "_traceback", self.traceback or None)
if self.status == ResultStatus.FAILED:
task_result.errors.append(
TaskError(
exception_class_path=self.exception_class_path,
traceback=self.traceback,
)
)

object.__setattr__(task_result, "_return_value", self.return_value)

return task_result
Expand Down
4 changes: 3 additions & 1 deletion django_tasks/backends/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ def enqueue(
result = TaskResult[T](
task=task,
id=get_random_id(),
status=ResultStatus.NEW,
status=ResultStatus.READY,
enqueued_at=None,
started_at=None,
last_attempted_at=None,
finished_at=None,
args=args,
kwargs=kwargs,
backend=self.alias,
errors=[],
)

if self._get_enqueue_on_commit_for_task(task) is not False:
Expand Down
22 changes: 17 additions & 5 deletions django_tasks/backends/immediate.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@
from typing_extensions import ParamSpec

from django_tasks.signals import task_enqueued, task_finished, task_started
from django_tasks.task import ResultStatus, Task, TaskResult
from django_tasks.utils import get_exception_traceback, get_random_id, json_normalize
from django_tasks.task import ResultStatus, Task, TaskError, TaskResult
from django_tasks.utils import (
get_exception_traceback,
get_module_path,
get_random_id,
json_normalize,
)

from .base import BaseTaskBackend

Expand Down Expand Up @@ -39,6 +44,7 @@ def _execute_task(self, task_result: TaskResult) -> None:

object.__setattr__(task_result, "status", ResultStatus.RUNNING)
object.__setattr__(task_result, "started_at", timezone.now())
object.__setattr__(task_result, "last_attempted_at", timezone.now())
task_started.send(sender=type(self), task_result=task_result)

try:
Expand All @@ -56,8 +62,12 @@ def _execute_task(self, task_result: TaskResult) -> None:

object.__setattr__(task_result, "finished_at", timezone.now())

object.__setattr__(task_result, "_traceback", get_exception_traceback(e))
object.__setattr__(task_result, "_exception_class", type(e))
task_result.errors.append(
TaskError(
exception_class_path=get_module_path(type(e)),
traceback=get_exception_traceback(e),
)
)

object.__setattr__(task_result, "status", ResultStatus.FAILED)

Expand All @@ -79,13 +89,15 @@ def enqueue(
task_result = TaskResult[T](
task=task,
id=get_random_id(),
status=ResultStatus.NEW,
status=ResultStatus.READY,
enqueued_at=None,
started_at=None,
last_attempted_at=None,
finished_at=None,
args=args,
kwargs=kwargs,
backend=self.alias,
errors=[],
)

if self._get_enqueue_on_commit_for_task(task) is not False:
Expand Down
53 changes: 36 additions & 17 deletions django_tasks/backends/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,39 @@
from django.core.checks import messages
from django.core.exceptions import SuspiciousOperation
from django.db import transaction
from django.utils.module_loading import import_string
from redis.client import Redis
from rq.job import Callback, JobStatus
from rq.job import Job as BaseJob
from rq.registry import ScheduledJobRegistry
from rq.results import Result
from typing_extensions import ParamSpec

from django_tasks.backends.base import BaseTaskBackend
from django_tasks.exceptions import ResultDoesNotExist
from django_tasks.signals import task_enqueued, task_finished, task_started
from django_tasks.task import DEFAULT_PRIORITY, MAX_PRIORITY, ResultStatus, Task
from django_tasks.task import (
DEFAULT_PRIORITY,
MAX_PRIORITY,
ResultStatus,
Task,
TaskError,
)
from django_tasks.task import TaskResult as BaseTaskResult
from django_tasks.utils import get_module_path, get_random_id

T = TypeVar("T")
P = ParamSpec("P")

RQ_STATUS_TO_RESULT_STATUS = {
JobStatus.QUEUED: ResultStatus.NEW,
JobStatus.QUEUED: ResultStatus.READY,
JobStatus.FINISHED: ResultStatus.SUCCEEDED,
JobStatus.FAILED: ResultStatus.FAILED,
JobStatus.STARTED: ResultStatus.RUNNING,
JobStatus.DEFERRED: ResultStatus.NEW,
JobStatus.SCHEDULED: ResultStatus.NEW,
JobStatus.DEFERRED: ResultStatus.READY,
JobStatus.SCHEDULED: ResultStatus.READY,
JobStatus.STOPPED: ResultStatus.FAILED,
JobStatus.CANCELED: ResultStatus.FAILED,
None: ResultStatus.NEW,
None: ResultStatus.READY,
}


Expand Down Expand Up @@ -91,23 +97,32 @@ def into_task_result(self) -> TaskResult:
status=RQ_STATUS_TO_RESULT_STATUS[self.get_status()],
enqueued_at=self.enqueued_at,
started_at=self.started_at,
last_attempted_at=self.started_at,
finished_at=self.ended_at,
args=list(self.args),
kwargs=self.kwargs,
backend=self.meta["backend_name"],
errors=[],
)

latest_result = self.latest_result()
exception_classes = self.meta.get("_django_tasks_exceptions", []).copy()

if latest_result is not None:
if "exception_class" in self.meta:
object.__setattr__(
task_result,
"_exception_class",
import_string(self.meta["exception_class"]),
rq_results = self.results()

for rq_result in rq_results:
if rq_result.type == Result.Type.FAILED:
task_result.errors.append(
TaskError(
exception_class_path=exception_classes.pop(),
traceback=rq_result.exc_string, # type: ignore[arg-type]
)
)
object.__setattr__(task_result, "_traceback", latest_result.exc_string)
object.__setattr__(task_result, "_return_value", latest_result.return_value)

if rq_results:
object.__setattr__(task_result, "_return_value", rq_results[0].return_value)
object.__setattr__(
task_result, "last_attempted_at", rq_results[0].created_at
)

return task_result

Expand All @@ -120,7 +135,9 @@ def failed_callback(
traceback: TracebackType,
) -> None:
# Smuggle the exception class through meta
job.meta["exception_class"] = get_module_path(exception_class)
job.meta.setdefault("_django_tasks_exceptions", []).append(
get_module_path(exception_class)
)
job.save_meta() # type: ignore[no-untyped-call]

task_result = job.into_task_result()
Expand Down Expand Up @@ -162,13 +179,15 @@ def enqueue(
task_result = TaskResult[T](
task=task,
id=get_random_id(),
status=ResultStatus.NEW,
status=ResultStatus.READY,
enqueued_at=None,
started_at=None,
last_attempted_at=None,
finished_at=None,
args=args,
kwargs=kwargs,
backend=self.alias,
errors=[],
)

job = queue.create_job(
Expand Down
Loading