# Copyright 2019, 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for the db application."""
import copy
import logging
import secrets
from django.core.validators import (
MaxLengthValidator,
MinLengthValidator,
)
from django.db import IntegrityError, models, transaction
from django.db.models import Count, JSONField, Q
from django.utils import timezone
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
from debusine.server import notifications
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class TokenManager(models.Manager):
    """Custom manager offering lookup helpers for the Token model."""

    def get_tokens(self, owner=None, key=None):
        """
        Return the tokens matching the given owner and/or key.

        Each criterion is only applied when its value is truthy; pass None
        (the default) for owner or key to skip that filter.
        """
        criteria = {}
        if owner:
            criteria['owner'] = owner
        if key:
            criteria['key'] = key
        return self.all().filter(**criteria)

    def get_token_or_none(self, token_key):
        """Return the token whose key is token_key, or None if not found."""
        # The related worker is fetched in the same query.
        candidates = self.select_related('worker')
        try:
            found = candidates.get(key=token_key)
        except Token.DoesNotExist:
            found = None
        return found
class Token(models.Model):
    """
    Database model of a token.

    A token holds a key plus some related data. The key is shared between
    the debusine server and its clients (workers).

    The model closely resembles rest_framework.authtoken.models.Token.
    The main difference is the owner: here it is a plain CharField, whereas
    rest_framework uses a OneToOne foreign key to a user.
    """

    key = models.CharField(
        max_length=64,
        unique=True,
        verbose_name='Hexadecimal key, length is 64 chars',
        validators=[MaxLengthValidator(64), MinLengthValidator(64)],
    )
    owner = models.CharField(
        max_length=150,
        verbose_name='Owner, for traceability',
    )
    comment = models.CharField(
        max_length=100,
        default='',
        verbose_name='Reason that this token was created',
    )
    created_at = models.DateTimeField(auto_now_add=True)
    # Tokens start disabled; see enable() / disable().
    enabled = models.BooleanField(default=False)

    objects = TokenManager()

    def __str__(self):
        """Return the key of the Token."""
        return self.key

    @classmethod
    def _generate_key(cls):
        """Return a new random key: 32 random bytes as 64 hex characters."""
        return secrets.token_hex(32)

    def save(self, *args, **kwargs):
        """Save the token, generating a key first when none is set."""
        key_is_missing = not self.key
        if key_is_missing:
            self.key = self._generate_key()
        super().save(*args, **kwargs)

    def enable(self):
        """Set the token as enabled and save it."""
        self.enabled = True
        self.save()

    def disable(self):
        """Set the token as disabled, save it and send a notification."""
        self.enabled = False
        self.save()
        # The notification is sent only after the change has been saved.
        notifications.notify_worker_token_disabled(self)
class WorkerManager(models.Manager):
    """Manager for Worker model."""

    def connected(self):
        """Return connected workers, ordered by connection time."""
        return self.filter(connected_at__isnull=False).order_by(
            'connected_at'
        )

    def waiting_for_work_request(self):
        """
        Return workers that can be assigned a new work request.

        The workers without any associated pending or running work request
        don't have anything to run right now and are thus waiting for a work
        request.

        Only connected workers whose token is enabled are returned.
        """
        running_work_request_count = Count(
            'assigned_work_requests',
            filter=Q(
                assigned_work_requests__status__in=[
                    WorkRequest.Statuses.RUNNING,
                    WorkRequest.Statuses.PENDING,
                ]
            ),
        )
        return (
            self.connected()
            .annotate(count_running=running_work_request_count)
            .filter(count_running=0)
            .filter(token__enabled=True)
        )

    @staticmethod
    def _generate_unique_name(name, counter):
        """Return name slugified adding "-counter" if counter != 1."""
        # Dots are turned into dashes first so that they are kept as
        # separators in the slug.
        new_name = slugify(name.replace('.', '-'))
        if counter != 1:
            new_name += f'-{counter}'
        return new_name

    @classmethod
    def create_with_fqdn(cls, fqdn, token):
        """
        Return a new Worker with its name based on fqdn, with token.

        If the name derived from fqdn is already taken, "-2", "-3", ... is
        appended until a free name is found.

        :raises IntegrityError: if the worker cannot be created for a reason
          other than a name clash (for example, token is already associated
          with another worker).
        """
        counter = 1
        while True:
            name = cls._generate_unique_name(fqdn, counter)
            try:
                # The atomic block keeps a failed INSERT from breaking an
                # enclosing transaction, so the lookup below can still run.
                with transaction.atomic():
                    return Worker.objects.create(
                        name=name, token=token, registered_at=timezone.now()
                    )
            except IntegrityError:
                if not Worker.objects.filter(name=name).exists():
                    # The name was free, so the conflict comes from something
                    # else. Trying another name would loop forever.
                    raise
                counter += 1

    def get_worker_by_token_key_or_none(self, token_key):
        """Return a Worker identified by its associated secret token."""
        try:
            return self.get(token__key=token_key)
        except Worker.DoesNotExist:
            return None

    def get_worker_or_none(self, worker_name):
        """Return the worker with worker_name or None."""
        try:
            return self.get(name=worker_name)
        except Worker.DoesNotExist:
            return None
class Worker(models.Model):
    """Database model of a worker."""

    name = models.SlugField(
        unique=True,
        help_text='Human readable name of the worker based on the FQDN',
    )
    registered_at = models.DateTimeField()
    # A null connected_at means that the worker is not connected.
    connected_at = models.DateTimeField(blank=True, null=True)

    # Token that the Worker authenticates with. It belongs to exactly one
    # worker; users have tokens of their own.
    token = models.OneToOneField(
        Token,
        on_delete=models.PROTECT,
        related_name="worker",
    )

    static_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata_updated_at = models.DateTimeField(blank=True, null=True)

    objects = WorkerManager()

    def __str__(self):
        """Return the name of the Worker."""
        return self.name

    def connected(self):
        """Return True if the Worker is connected."""
        has_connection_time = self.connected_at is not None
        return has_connection_time

    def mark_connected(self):
        """Record the current time as connection time and save."""
        self.connected_at = timezone.now()
        self.save()

    def mark_disconnected(self):
        """Clear the connection time and save."""
        self.connected_at = None
        self.save()
class WorkRequestManager(models.Manager):
    """Manager providing status-based queries for the WorkRequest model."""

    def pending(self, exclude_assigned=False, worker=None):
        """
        Return a QuerySet of the WorkRequests in PENDING status.

        The result is ordered by created_at. With exclude_assigned=True the
        work requests that already have a worker are left out; when worker
        is given only the work requests of that worker are returned. The two
        options cannot be combined: doing so raises ValueError.

        PENDING is the default status of a WorkRequest on creation.
        """
        if exclude_assigned and worker is not None:
            raise ValueError("Cannot exclude_assigned and filter by worker")

        pending_requests = WorkRequest.objects.filter(
            status=WorkRequest.Statuses.PENDING
        )

        # At this point at most one of the two options is in effect.
        if exclude_assigned:
            pending_requests = pending_requests.exclude(worker__isnull=False)
        elif worker is not None:
            pending_requests = pending_requests.filter(worker=worker)

        return pending_requests.order_by('created_at')

    def running(self, worker=None):
        """Return a QuerySet of the WorkRequests in RUNNING status."""
        running_requests = WorkRequest.objects.filter(
            status=WorkRequest.Statuses.RUNNING
        )
        if worker is None:
            return running_requests
        return running_requests.filter(worker=worker)

    def running_or_pending_exists(self, worker) -> bool:
        """Return True if worker has a running or pending work request."""
        running_requests = self.running(worker=worker)
        pending_requests = self.pending(worker=worker)
        return (running_requests | pending_requests).exists()

    def completed(self):
        """Return a QuerySet of the WorkRequests in COMPLETED status."""
        wanted_status = WorkRequest.Statuses.COMPLETED
        return WorkRequest.objects.filter(status=wanted_status)

    def aborted(self):
        """Return a QuerySet of the WorkRequests in ABORTED status."""
        wanted_status = WorkRequest.Statuses.ABORTED
        return WorkRequest.objects.filter(status=wanted_status)
class WorkRequest(models.Model):
    """
    Database model of a request to execute a task.

    Time-consuming operations offloaded to Workers and using
    Artifacts (and associated Files) as input and output.

    Submission API needs to check if the request is valid using
    ontological rules - e.g. whether the specified distribution for
    a build task exists.

    Avoid exposing the status of tasks to the admin interface to avoid
    runaway changes whilst the scheduler process is running.

    The WorkRequest uses the non-Django tasks module to do the checks on
    whether a task can run on a particular worker.

    WorkRequest State Machine
    =========================

    New WorkRequest database entries default to
    ``WorkRequest.Statuses.PENDING``.

    Once the WorkRequest is assigned to a worker and starts running
    the status is changed to ``WorkRequest.Statuses.RUNNING``.

    If the WorkRequest is aborted, its status is
    ``WorkRequest.Statuses.ABORTED``.

    If the task finishes on the Worker the WorkRequest status will be
    ``WorkRequest.Statuses.COMPLETED`` and a result is then set:
    ``WorkRequest.Results.SUCCESS``, ``WorkRequest.Results.FAILURE`` or
    ``WorkRequest.Results.ERROR``.

    .. graphviz::

       digraph {
          Statuses_PENDING -> Statuses_RUNNING -> Statuses_COMPLETED;
          Statuses_PENDING -> Statuses_ABORTED;
          Statuses_PENDING -> Statuses_RUNNING -> Statuses_ABORTED;
       }

    ``WorkRequest.started_at`` is set when the WorkRequest moves from
    ``WorkRequest.Statuses.PENDING`` to ``WorkRequest.Statuses.RUNNING``.
    ``WorkRequest.completed_at`` is set when the WorkRequest moves from
    ``WorkRequest.Statuses.RUNNING`` to ``WorkRequest.Statuses.COMPLETED``,
    and also when it is marked as aborted.
    """

    objects = WorkRequestManager()

    # Life-cycle states of a WorkRequest (see the state machine above).
    class Statuses(models.TextChoices):
        PENDING = "pending", _("Pending")
        RUNNING = "running", _("Running")
        COMPLETED = "completed", _("Completed")
        ABORTED = "aborted", _("Aborted")

    # Outcome of a WorkRequest; NONE (empty string) is the default.
    class Results(models.TextChoices):
        NONE = "", ""
        SUCCESS = "success", _("Success")
        FAILURE = "failure", _("Failure")
        ERROR = "error", _("Error")

    created_at = models.DateTimeField(auto_now_add=True)
    started_at = models.DateTimeField(blank=True, null=True)
    completed_at = models.DateTimeField(blank=True, null=True)
    status = models.CharField(
        max_length=9,
        choices=Statuses.choices,
        default=Statuses.PENDING,
        editable=False,
    )
    result = models.CharField(
        max_length=7,
        choices=Results.choices,
        default=Results.NONE,
        editable=False,
    )
    # any one work request can only be on one worker
    # even if the worker can handle multiple work request.
    worker = models.ForeignKey(
        Worker,
        null=True,
        on_delete=models.CASCADE,
        related_name="assigned_work_requests",
    )
    task_name = models.CharField(
        max_length=100, verbose_name='Name of the task to execute'
    )
    task_data = JSONField(default=dict, blank=True)

    def __str__(self):
        """Return the name of the WorkRequest."""
        return self.task_name

    def mark_running(self):
        """
        Worker has begun executing the task.

        Return True if the WorkRequest is running after the call (including
        when it was already running). Return False, without modifying
        anything, if it has no assigned worker, if it is not pending, or if
        its worker is already running another WorkRequest.
        """
        if self.worker is None:
            logger.debug(
                "Cannot mark WorkRequest %s as running: it does not have "
                "an assigned worker",
                self.pk,
            )
            return False

        if self.status == self.Statuses.RUNNING:
            # It was already running - nothing to do
            return True

        if self.status != self.Statuses.PENDING:
            logger.debug(
                "Cannot mark as running - current status %s", self.status
            )
            return False

        work_request_running_for_worker = WorkRequest.objects.running(
            worker=self.worker
        ).first()

        # There is a possible race condition here. This check (and other
        # checks in this class) currently help to avoid development mistakes
        # not database full integrity
        if work_request_running_for_worker is not None:
            logger.debug(
                "Cannot mark WorkRequest %s as running - the assigned worker "
                "%s is running another WorkRequest %s",
                self.pk,
                self.worker,
                work_request_running_for_worker,
            )
            return False

        self.started_at = timezone.now()
        self.status = self.Statuses.RUNNING
        self.save()

        logger.debug("Marked WorkRequest %s as running", self.pk)
        return True

    def mark_completed(self, result):
        """
        Worker has finished executing the task.

        Store result, set completed_at and the COMPLETED status, then save.
        Return True on success; return False, without modifying anything,
        if the WorkRequest is not currently running.
        """
        if self.status != self.Statuses.RUNNING:
            logger.debug(
                "Cannot mark WorkRequest %s as completed: current status is %s",
                self.pk,
                self.status,
            )
            return False

        self.result = result
        self.completed_at = timezone.now()
        self.status = self.Statuses.COMPLETED
        self.save()

        logger.debug("Marked WorkRequest %s as completed", self.pk)
        return True

    def mark_aborted(self):
        """
        Worker has aborted the task after request from UI.

        Task will typically be in PENDING or RUNNING status. The current
        status is not checked: the WorkRequest is always marked as aborted
        and True is returned.
        """
        # Remember the status before overwriting it, so that the log message
        # reports the status that the WorkRequest was aborted from.
        previous_status = self.status

        self.completed_at = timezone.now()
        self.status = self.Statuses.ABORTED
        self.save()

        logger.debug(
            "Marked WorkRequest %s as aborted (from status %s)",
            self.pk,
            previous_status,
        )
        return True

    def assign_worker(self, worker):
        """Assign worker, save the WorkRequest and send a notification."""
        self.worker = worker
        self.save()
        notifications.notify_work_request_assigned(self)

    @property
    def duration(self):
        """
        Return duration in seconds between started_at and completed_at.

        Return None if either of the two timestamps is not set.
        """
        if self.started_at and self.completed_at:
            return (self.completed_at - self.started_at).total_seconds()
        return None