Source code for debusine.db.models

# Copyright 2019, 2021-2022 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for the db application."""
import copy
import logging
import secrets

from django.core.validators import (
    MaxLengthValidator,
    MinLengthValidator,
)
from django.db import IntegrityError, models, transaction
from django.db.models import Count, JSONField, Q
from django.utils import timezone
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _

from debusine.server import notifications

# Module-level logger named after this module (via __name__); used by the
# WorkRequest state-transition methods below for their debug messages.
logger = logging.getLogger(__name__)


class TokenManager(models.Manager):
    """Manager for Token model."""

    def get_tokens(self, owner=None, key=None):
        """
        Return the tokens, optionally narrowed down by owner and/or key.

        A falsy ``owner`` or ``key`` (``None``, the default, or an empty
        string) means "do not filter on this field". With both left unset
        every token is returned.
        """
        queryset = self.all()

        # Apply one filter per criterion that was actually supplied.
        for field_name, wanted in (("owner", owner), ("key", key)):
            if wanted:
                queryset = queryset.filter(**{field_name: wanted})

        return queryset

    def get_token_or_none(self, token_key):
        """
        Return the token whose key is ``token_key``, or None if there is none.

        The related worker is fetched in the same query (``select_related``).
        """
        tokens_with_worker = self.select_related("worker")

        try:
            found = tokens_with_worker.get(key=token_key)
        except Token.DoesNotExist:
            found = None

        return found
class Token(models.Model):
    """
    Database model of a token.

    A token contains a key and other related data. It's used as a shared
    key between debusine server and clients (workers).

    This token model is very similar to
    rest_framework.authtoken.models.Token. The biggest difference is that
    debusine's token's owner is a CharField, whereas the rest_framework
    owner is a OneToOne foreign key to a user.
    """

    objects = TokenManager()

    # The key must be exactly 64 characters long: both validators use 64.
    key = models.CharField(
        max_length=64,
        unique=True,
        verbose_name="Hexadecimal key, length is 64 chars",
        validators=[MaxLengthValidator(64), MinLengthValidator(64)],
    )
    owner = models.CharField(
        max_length=150, verbose_name="Owner, for traceability"
    )
    comment = models.CharField(
        max_length=100,
        default="",
        verbose_name="Reason that this token was created",
    )
    created_at = models.DateTimeField(auto_now_add=True)
    # Tokens are created disabled; enable() must be called explicitly.
    enabled = models.BooleanField(default=False)

    @classmethod
    def _generate_key(cls):
        """Create and return a key: 32 random bytes as 64 hex characters."""
        return secrets.token_hex(nbytes=32)

    def _set_enabled(self, value):
        """Store ``value`` in the ``enabled`` field and save the token."""
        self.enabled = value
        self.save()

    def save(self, *args, **kwargs):
        """Save the token, generating a key first if the key is empty."""
        self.key = self.key or self._generate_key()
        super().save(*args, **kwargs)

    def enable(self):
        """Enable the token and save it."""
        self._set_enabled(True)

    def disable(self):
        """Disable the token, save it, then send the disabled notification."""
        self._set_enabled(False)
        # Notify only after the change has been saved.
        notifications.notify_worker_token_disabled(self)

    def __str__(self):
        """Return the key of the Token."""
        return self.key
class WorkerManager(models.Manager):
    """Manager for Worker model."""

    def connected(self):
        """Return connected workers, ordered by connection time."""
        return Worker.objects.filter(connected_at__isnull=False).order_by(
            'connected_at'
        )

    def waiting_for_work_request(self):
        """
        Return workers that can be assigned a new work request.

        The workers without any associated pending or running work request
        don't have anything to run right now and are thus waiting for a work
        request.

        Only connected workers are considered, and the worker's token must
        be enabled. The result is ordered by ``connected_at``.
        """
        # Number of work requests per worker that are pending or running.
        running_work_request_count = Count(
            'assigned_work_requests',
            filter=Q(
                assigned_work_requests__status__in=[
                    WorkRequest.Statuses.RUNNING,
                    WorkRequest.Statuses.PENDING,
                ]
            ),
        )

        # Start from connected() rather than duplicating its filter/ordering.
        workers = (
            self.connected()
            .annotate(count_running=running_work_request_count)
            .filter(count_running=0)
            .filter(token__enabled=True)
        )

        return workers

    @staticmethod
    def _generate_unique_name(name, counter):
        """
        Return name slugified adding "-counter" if counter != 1.

        Dots are replaced by dashes before slugifying, so that the parts of
        a FQDN stay separated in the resulting slug.
        """
        new_name = slugify(name.replace('.', '-'))

        if counter != 1:
            new_name += f'-{counter}'

        return new_name

    @classmethod
    def create_with_fqdn(cls, fqdn, token):
        """
        Return a new Worker with its name based on fqdn, with token.

        If the name derived from ``fqdn`` is already taken, "-2", "-3", ...
        is appended until a free name is found.

        :raises IntegrityError: if the worker cannot be created for a reason
          other than a name clash (e.g. ``token`` is already associated
          with another worker). Trying another name cannot fix that.
        """
        counter = 1

        while True:
            name = cls._generate_unique_name(fqdn, counter)
            try:
                # The atomic block keeps a failed INSERT from breaking any
                # enclosing transaction, so we can query and retry below.
                with transaction.atomic():
                    return Worker.objects.create(
                        name=name, token=token, registered_at=timezone.now()
                    )
            except IntegrityError:
                # Only a name clash is fixed by trying the next counter.
                # Anything else would fail again on every retry and loop
                # forever, so re-raise it.
                if not Worker.objects.filter(name=name).exists():
                    raise
                counter += 1

    def get_worker_by_token_key_or_none(self, token_key):
        """Return the Worker whose token has the key token_key, or None."""
        try:
            return Worker.objects.get(token__key=token_key)
        except Worker.DoesNotExist:
            return None

    def get_worker_or_none(self, worker_name):
        """Return the worker with worker_name or None."""
        try:
            return self.get(name=worker_name)
        except Worker.DoesNotExist:
            return None
class Worker(models.Model):
    """Database model of a worker."""

    objects = WorkerManager()

    name = models.SlugField(
        unique=True,
        help_text="Human readable name of the worker based on the FQDN",
    )
    registered_at = models.DateTimeField()
    # None while the worker is not connected.
    connected_at = models.DateTimeField(blank=True, null=True)

    # This is the token used by the Worker to authenticate
    # Users have their own tokens - this is specific to a single worker.
    token = models.OneToOneField(
        Token, on_delete=models.PROTECT, related_name="worker"
    )

    static_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata_updated_at = models.DateTimeField(blank=True, null=True)

    def __str__(self):
        """Return the name of the Worker."""
        return self.name

    def mark_connected(self):
        """Record the current time as connected_at and save the Worker."""
        self.connected_at = timezone.now()
        self.save()

    def mark_disconnected(self):
        """Clear connected_at and save the Worker."""
        self.connected_at = None
        self.save()

    def connected(self):
        """Return True if the Worker is connected (connected_at is set)."""
        never_connected = self.connected_at is None
        return not never_connected

    def metadata(self) -> dict:
        """
        Return static_metadata and dynamic_metadata merged into a new dict.

        When a key is present in both, the value from static_metadata wins.
        Both sources are deep-copied, so changing the returned dict does not
        change the metadata stored on the Worker.
        """
        dynamic = copy.deepcopy(self.dynamic_metadata)
        static = copy.deepcopy(self.static_metadata)

        # static is unpacked last so that its keys override dynamic's.
        return {**dynamic, **static}

    def set_dynamic_metadata(self, metadata: dict):
        """Store metadata, set dynamic_metadata_updated_at to now and save."""
        self.dynamic_metadata = metadata
        self.dynamic_metadata_updated_at = timezone.now()
        self.save()
class WorkRequestManager(models.Manager):
    """Manager for WorkRequest model."""

    def pending(self, exclude_assigned=False, worker=None):
        """
        Return a QuerySet of the work requests in PENDING status.

        The QuerySet is ordered by created_at.

        With exclude_assigned=True the pending work requests that already
        have a worker are left out. With a worker, only the pending work
        requests assigned to that worker are returned. The two options
        cannot be combined: doing so raises ValueError.

        PENDING is the default status of a work request on creation.
        """
        filter_by_worker = worker is not None

        if exclude_assigned and filter_by_worker:
            raise ValueError("Cannot exclude_assigned and filter by worker")

        pending_requests = WorkRequest.objects.filter(
            status=WorkRequest.Statuses.PENDING
        )

        # At most one of the two branches applies (see the check above).
        if exclude_assigned:
            pending_requests = pending_requests.exclude(worker__isnull=False)
        elif filter_by_worker:
            pending_requests = pending_requests.filter(worker=worker)

        return pending_requests.order_by("created_at")

    def running(self, worker=None):
        """
        Return a QuerySet of the work requests in RUNNING status.

        If worker is given, only its work requests are returned.
        """
        running_requests = WorkRequest.objects.filter(
            status=WorkRequest.Statuses.RUNNING
        )

        if worker is None:
            return running_requests

        return running_requests.filter(worker=worker)

    def running_or_pending_exists(self, worker) -> bool:
        """Return True if worker has any running or pending work request."""
        running_requests = self.running(worker=worker)
        pending_requests = self.pending(worker=worker)

        return (running_requests | pending_requests).exists()

    def completed(self):
        """Return a QuerySet of the work requests in COMPLETED status."""
        wanted_status = WorkRequest.Statuses.COMPLETED
        return WorkRequest.objects.filter(status=wanted_status)

    def aborted(self):
        """Return a QuerySet of the work requests in ABORTED status."""
        wanted_status = WorkRequest.Statuses.ABORTED
        return WorkRequest.objects.filter(status=wanted_status)
class WorkRequest(models.Model):
    """
    Database model of a request to execute a task.

    Time-consuming operations offloaded to Workers and using
    Artifacts (and associated Files) as input and output.

    Submission API needs to check if the request is valid using
    ontological rules - e.g. whether the specified distribution for
    a build task exists.

    Avoid exposing the status of tasks to the admin interface to avoid
    runaway changes whilst the scheduler process is running.

    The WorkRequest uses the non-Django tasks module to do the checks on
    whether a task can run on a particular worker.

    WorkRequest State Machine
    =========================

    New WorkRequest database entries default to
    ``WorkRequest.Statuses.PENDING``.

    Once the WorkRequest is assigned to a worker and starts running,
    the status is changed to ``WorkRequest.Statuses.RUNNING``.

    If the WorkRequest is aborted, the WorkRequest status is
    ``WorkRequest.Statuses.ABORTED``.

    If the task finishes on the Worker the WorkRequest status will be
    ``WorkRequest.Statuses.COMPLETED`` and a WorkRequest.Results value is
    then set: ``WorkRequest.Results.SUCCESS``,
    ``WorkRequest.Results.FAILURE`` or ``WorkRequest.Results.ERROR``.

    .. graphviz::

       digraph {
          Statuses_PENDING -> Statuses_RUNNING -> Statuses_COMPLETED;
          Statuses_PENDING -> Statuses_ABORTED;
          Statuses_PENDING -> Statuses_RUNNING -> Statuses_ABORTED;
       }

    ``WorkRequest.started_at`` is set when the WorkRequest moves from
    ``WorkRequest.Statuses.PENDING`` to ``WorkRequest.Statuses.RUNNING``.
    ``WorkRequest.completed_at`` is set when the WorkRequest moves to
    ``WorkRequest.Statuses.COMPLETED`` or ``WorkRequest.Statuses.ABORTED``.
    """

    objects = WorkRequestManager()

    class Statuses(models.TextChoices):
        """Possible values of ``WorkRequest.status``."""

        PENDING = "pending", _("Pending")
        RUNNING = "running", _("Running")
        COMPLETED = "completed", _("Completed")
        ABORTED = "aborted", _("Aborted")

    class Results(models.TextChoices):
        """Possible values of ``WorkRequest.result`` (NONE: not set yet)."""

        NONE = "", ""
        SUCCESS = "success", _("Success")
        FAILURE = "failure", _("Failure")
        ERROR = "error", _("Error")

    created_at = models.DateTimeField(auto_now_add=True)
    started_at = models.DateTimeField(blank=True, null=True)
    completed_at = models.DateTimeField(blank=True, null=True)
    # max_length=9 fits the longest status value ("completed").
    status = models.CharField(
        max_length=9,
        choices=Statuses.choices,
        default=Statuses.PENDING,
        editable=False,
    )
    # max_length=7 fits the longest result values ("success", "failure").
    result = models.CharField(
        max_length=7,
        choices=Results.choices,
        default=Results.NONE,
        editable=False,
    )
    # any one work request can only be on one worker
    # even if the worker can handle multiple work request.
    worker = models.ForeignKey(
        Worker,
        null=True,
        on_delete=models.CASCADE,
        related_name="assigned_work_requests",
    )
    task_name = models.CharField(
        max_length=100, verbose_name='Name of the task to execute'
    )
    task_data = JSONField(default=dict, blank=True)

    def __str__(self):
        """Return the name of the WorkRequest."""
        return self.task_name

    def mark_running(self):
        """
        Worker has begun executing the task.

        Set the status to RUNNING and started_at to now, then save.

        :return: True if the WorkRequest is running after the call (it was
          marked as running now, or it already was running). False if it
          could not be marked as running: no worker is assigned, the
          status is not PENDING, or the assigned worker is already running
          another WorkRequest.
        """
        if self.worker is None:
            logger.debug(
                "Cannot mark WorkRequest %s as running: it does not have "
                "an assigned worker",
                self.pk,
            )
            return False

        if self.status == self.Statuses.RUNNING:
            # It was already running - nothing to do
            return True

        if self.status != self.Statuses.PENDING:
            logger.debug(
                "Cannot mark as running - current status %s", self.status
            )
            return False

        work_request_running_for_worker = WorkRequest.objects.running(
            worker=self.worker
        ).first()

        # There is a possible race condition here. This check (and other
        # checks in this class) currently help to avoid development mistakes
        # not database full integrity
        if work_request_running_for_worker is not None:
            logger.debug(
                "Cannot mark WorkRequest %s as running - the assigned worker "
                "%s is running another WorkRequest %s",
                self.pk,
                self.worker,
                work_request_running_for_worker,
            )
            return False

        self.started_at = timezone.now()
        self.status = self.Statuses.RUNNING
        self.save()

        logger.debug("Marked WorkRequest %s as running", self.pk)
        return True

    def mark_completed(self, result):
        """
        Worker has finished executing the task.

        Store result, set completed_at to now and the status to COMPLETED,
        then save.

        :param result: value to store in the ``result`` field.
        :return: True if the WorkRequest was marked as completed, False if
          its status was not RUNNING (nothing is changed in that case).
        """
        if self.status != self.Statuses.RUNNING:
            logger.debug(
                "Cannot mark WorkRequest %s as completed: current status is %s",
                self.pk,
                self.status,
            )
            return False

        self.result = result
        self.completed_at = timezone.now()
        self.status = self.Statuses.COMPLETED
        self.save()

        logger.debug("Marked WorkRequest %s as completed", self.pk)
        return True

    def mark_aborted(self):
        """
        Worker has aborted the task after request from UI.

        Task will typically be in PENDING or RUNNING status, but the
        current status is not checked: the WorkRequest is always marked as
        aborted, with completed_at set to now.

        :return: always True.
        """
        # Remember the status being left: it is overwritten below and is
        # needed for the log message.
        previous_status = self.status

        self.completed_at = timezone.now()
        self.status = self.Statuses.ABORTED
        self.save()

        logger.debug(
            "Marked WorkRequest %s as aborted (from status %s)",
            self.pk,
            previous_status,
        )
        return True

    def assign_worker(self, worker):
        """Assign worker, save, then send the assignment notification."""
        self.worker = worker
        self.save()

        # Notify only after the assignment has been saved.
        notifications.notify_work_request_assigned(self)

    @property
    def duration(self):
        """
        Return duration in seconds between started_at and completed_at.

        Return None if either of the two timestamps is not set.
        """
        if self.started_at and self.completed_at:
            return (self.completed_at - self.started_at).total_seconds()
        else:
            return None