Skip to content

Periodic Tasks

This document outlines the automated periodic tasks that run in the background.

Mark Expired Offers

This task identifies and marks offers as "EXPIRED" when their expiration date has passed.

  • Task: offers.tasks.mark_offers_as_expired
  • Schedule: This task is configured to run every hour. The schedule is managed in the database via the Django admin panel under "Periodic Tasks".
  • Description: The task queries for offers that are currently in a "SENT" status and have an expires_at timestamp that is in the past. It then updates the status of these offers to "EXPIRED".
  • Code References:

    Periodic task to mark offers as expired.

    Source code in backend/offers/tasks.py
    @shared_task
    def mark_offers_as_expired() -> None:
        """Periodic task to mark offers as expired."""
        offers_services.mark_offers_as_expired()
    

    The task calls the mark_offers_as_expired service function, which contains the business logic.

    Mark offers as expired if their expiration date has passed.

    Source code in backend/offers/services.py
    def mark_offers_as_expired() -> None:
        """Mark offers as expired if their expiration date has passed."""
        Offer = apps.get_model("offers", "Offer")
        expired_offers = Offer.objects.filter(status=OfferStatus.SENT, expires_at__lt=timezone.now())
    
        expired_offer_ids = list(expired_offers.values_list("id", flat=True))
    
        count = expired_offers.update(status=OfferStatus.EXPIRED)
        logger.info(f"Marked {count} offers as expired.")
    
        for offer in Offer.objects.filter(id__in=expired_offer_ids):
            try:
                send_offer_expired_notification(offer)
            except Exception as exc:
                logger.exception(f"Failed to send expiration notification for offer {offer.pk}: {exc}")
    

    The task operates on the Offer model.

    Bases: LifecycleModelMixin, Model

    Source code in backend/offers/models.py
    class Offer(LifecycleModelMixin, models.Model):
        salesforce_ref = models.CharField(max_length=255, blank=True)
    
        job_application = models.OneToOneField(
            JobApplication,
            on_delete=models.CASCADE,
            related_name="offer",
        )
    
        message = models.TextField(blank=True)
        start_date = models.DateField()
        end_date = models.DateField(null=True, blank=True)
        expiration_days = models.PositiveIntegerField()
        expires_at = models.DateTimeField(null=True, blank=True, editable=False)
    
        status = models.CharField(
            max_length=255,
            choices=OfferStatus.choices,
            default=OfferStatus.DRAFT,
        )
    
        created_at = models.DateTimeField(auto_now_add=True)
        updated_at = models.DateTimeField(auto_now=True)
        accepted_at = models.DateTimeField(null=True, blank=True)
        rejected_at = models.DateTimeField(null=True, blank=True)
        rejection_reason = models.CharField(
            max_length=255,
            choices=OfferRejectionReason.choices,
            blank=True,
        )
        rejection_reason_description = models.CharField(max_length=200, blank=True)
        withdrawn_by = models.ForeignKey(
            "authentication.User",
            on_delete=models.SET_NULL,
            null=True,
            blank=True,
            related_name="withdrawn_offers",
        )
        withdrawn_at = models.DateTimeField(null=True, blank=True)
        withdrawal_reason = models.CharField(max_length=1500, blank=True)
        is_read = models.BooleanField(default=False)
    
        objects = OfferManager()
    
        if TYPE_CHECKING:
            daily_schedules: models.QuerySet["OfferDailySchedule"]
    
        class Meta:
            ordering = ["-pk"]
            verbose_name = "Offer"
            verbose_name_plural = "Offers"
            db_table = "offers"
            indexes = [
                models.Index(fields=["salesforce_ref"]),
            ]
            constraints = [
                models.UniqueConstraint(
                    fields=["job_application"],
                    name="unique_job_application_offer",
                )
            ]
    
        def __str__(self):
            return f"Offer {self.pk} for {self.job_application}"
    
        @property
        def is_expired(self) -> bool:
            """Check if the offer is expired."""
            return self.expires_at is not None and timezone.now() >= self.expires_at
    
        @property
        def company(self):
            return self.job_application.job.company
    
        @property
        def worker(self):
            return self.job_application.contact
    
        @hook(AFTER_CREATE)
        def create_notification(self):
            title = "New Job Offer"
            company = self.job_application.job.company
            job = self.job_application.job
            job_title = job.required_position.title if job.required_position else f"Job {job.job_id}"
            body = f"New job offer from: {company.name} for position: {job_title}."
            body += " Please review the offer details."
            worker = self.job_application.contact
            if hasattr(worker, "user") and worker.user is not None:
                push_enabled, email_enabled, preference_pk = get_notification_preferences(
                    worker.user, NotificationTypes.NEW_JOB_OFFER
                )
    
                AppNotification.objects.create(
                    user=worker.user,
                    title=title,
                    body=body,
                    type=NotificationTypes.NEW_JOB_OFFER,
                    data={
                        "title": title,
                        "body": body,
                        "job_id": str(self.job_application.job.id),
                        "offer_id": str(self.id),
                    },
                    should_send_fcm=push_enabled,
                )
    
                if email_enabled:
                    job_title = (
                        self.job_application.job.required_position.title
                        if self.job_application.job.required_position
                        else f"Job {self.job_application.job.job_id}"
                    )
    
                    send_notification_email(
                        user=worker.user,
                        subject=f"New job opportunity from {company.name}",
                        template_name="new_job_offer.html",
                        context={
                            "preference_pk": preference_pk,
                            "company_name": company.name if company else "Company",
                            "position_name": job_title,
                            "start_date": self.start_date,
                        },
                    )
    
        @hook(AFTER_CREATE)
        def invalidate_tags_on_offer_creation(self):
            """Broadcast cache invalidation when a new offer is created."""
            UserEventsResolver().resolve(self)
    
        @hook(AFTER_UPDATE)
        def invalidate_tags_on_offer_update(self):
            """Broadcast cache invalidation when offer is updated."""
            UserEventsResolver().resolve(self)
    
        @hook(AFTER_UPDATE)
        def mark_as_unread_on_update(self):
            # Skip if is_read is the field being changed
            if self.has_changed("is_read"):
                return
    
            # Skip if worker initiated a status change (accept/reject)
            if self.has_changed("status") and self.status in [OfferStatus.ACCEPTED, OfferStatus.REJECTED]:
                return
    
            # Metadata to ignore
            ignored = {"is_read", "updated_at"}
    
            # Check if any OTHER field changed.
            has_content_changes = any(self.has_changed(f.name) for f in self._meta.fields if f.name not in ignored)
    
            # Only unread if content actually changed AND it is currently read
            if self.is_read and has_content_changes:
                self.is_read = False
                self.save(skip_hooks=True)
    
        @hook(
            AFTER_UPDATE,
            condition=WhenFieldValueIs("status", OfferStatus.ACCEPTED) & WhenFieldHasChanged("status", has_changed=True),
        )
        def create_placement(self):
            Placement.objects.create(
                worker=self.job_application.contact,
                company=self.job_application.job.company,
                job=self.job_application.job,
                start_date=self.start_date,
                end_date=self.end_date,
                offer=self,
            )
    
        @hook(AFTER_UPDATE)
        def send_offer_updated_notification(self):
            """Send notification when offer is updated."""
            # Skip notification for automated or worker-initiated status changes
            if self.has_changed("status") and self.status in [
                OfferStatus.EXPIRED,
                OfferStatus.ACCEPTED,
                OfferStatus.REJECTED,
            ]:
                return
    
            # Skip if is_read field is changing (worker is marking as read)
            if self.has_changed("is_read"):
                return
    
            title = "🔄 Your Offer is Renewed!"
            company = self.job_application.job.company
            position_name = (
                self.job_application.job.required_position.title
                if self.job_application.job.required_position
                else "Job position"
            )
            body = f"{company.name} has edited your offer for {position_name}. Check it out!"
            worker = self.job_application.contact
    
            if hasattr(worker, "user") and worker.user is not None:
                push_enabled, email_enabled, preference_pk = get_notification_preferences(
                    worker.user, NotificationTypes.OFFER_RENEWED
                )
    
                AppNotification.objects.create(
                    user=worker.user,
                    title=title,
                    body=body,
                    type=NotificationTypes.OFFER_RENEWED,
                    data={
                        "title": title,
                        "body": body,
                        "job_id": str(self.job_application.job.pk),
                        "offer_id": str(self.pk),
                    },
                    should_send_fcm=push_enabled,
                )
    
                if email_enabled:
                    offer_url = f"{settings.SITE_URL}/workers/work/offers/{self.pk}"
    
                    send_notification_email(
                        user=worker.user,
                        subject=f"Renewed offer from {company.name}",
                        template_name="offer_renewed.html",
                        context={
                            "preference_pk": preference_pk,
                            "company_name": company.name if company else "Company",
                            "position_name": position_name,
                            "start_date": self.start_date,
                            "offer_url": offer_url,
                        },
                    )
    
        @hook(
            AFTER_UPDATE,
            condition=WhenFieldValueIs("status", OfferStatus.ACCEPTED) & WhenFieldHasChanged("status", has_changed=True),
        )
        def notify_client_on_offer_accepted(self):
            """Notify company employers when a candidate accepts an offer."""
            candidate = self.job_application.contact
            candidate_name = candidate.full_name
            job_id = self.job_application.job.job_id
            title = "Offer Accepted"
            body = (
                f"Good news! {candidate_name} has accepted your offer for "
                f"{job_id} job and Awaiting Placement has been created"
            )
            for user in self.job_application.job.company.user_set.all():
                AppNotification.objects.create(
                    user=user,
                    title=title,
                    body=body,
                    type=ClientNotificationTypes.CLIENT_OFFER_ACCEPTED,
                    data={
                        "title": title,
                        "body": body,
                        "job_id": str(self.job_application.job.pk),
                        "offer_id": str(self.pk),
                        "candidate_name": candidate_name,
                    },
                    should_send_fcm=False,
                )
    
        @hook(
            AFTER_UPDATE,
            condition=WhenFieldValueIs("status", OfferStatus.REJECTED) & WhenFieldHasChanged("status", has_changed=True),
        )
        def notify_client_on_offer_declined(self):
            """Notify company employers when a candidate declines an offer."""
            candidate = self.job_application.contact
            candidate_name = candidate.full_name
            job_id = self.job_application.job.job_id
            title = "Offer Declined"
            body = f"Unfortunately, {candidate_name} has declined your offer for {job_id} job"
            for user in self.job_application.job.company.user_set.all():
                AppNotification.objects.create(
                    user=user,
                    title=title,
                    body=body,
                    type=ClientNotificationTypes.CLIENT_OFFER_DECLINED,
                    data={
                        "title": title,
                        "body": body,
                        "job_id": str(self.job_application.job.pk),
                        "offer_id": str(self.pk),
                        "candidate_name": candidate_name,
                    },
                    should_send_fcm=False,
                )
    

    is_expired property

    Check if the offer is expired.

    invalidate_tags_on_offer_creation()

    Broadcast cache invalidation when a new offer is created.

    Source code in backend/offers/models.py
    @hook(AFTER_CREATE)
    def invalidate_tags_on_offer_creation(self):
        """Broadcast cache invalidation when a new offer is created."""
        UserEventsResolver().resolve(self)
    

    invalidate_tags_on_offer_update()

    Broadcast cache invalidation when offer is updated.

    Source code in backend/offers/models.py
    @hook(AFTER_UPDATE)
    def invalidate_tags_on_offer_update(self):
        """Broadcast cache invalidation when offer is updated."""
        UserEventsResolver().resolve(self)
    

    notify_client_on_offer_accepted()

    Notify company employers when a candidate accepts an offer.

    Source code in backend/offers/models.py
    @hook(
        AFTER_UPDATE,
        condition=WhenFieldValueIs("status", OfferStatus.ACCEPTED) & WhenFieldHasChanged("status", has_changed=True),
    )
    def notify_client_on_offer_accepted(self):
        """Notify company employers when a candidate accepts an offer."""
        candidate = self.job_application.contact
        candidate_name = candidate.full_name
        job_id = self.job_application.job.job_id
        title = "Offer Accepted"
        body = (
            f"Good news! {candidate_name} has accepted your offer for "
            f"{job_id} job and Awaiting Placement has been created"
        )
        for user in self.job_application.job.company.user_set.all():
            AppNotification.objects.create(
                user=user,
                title=title,
                body=body,
                type=ClientNotificationTypes.CLIENT_OFFER_ACCEPTED,
                data={
                    "title": title,
                    "body": body,
                    "job_id": str(self.job_application.job.pk),
                    "offer_id": str(self.pk),
                    "candidate_name": candidate_name,
                },
                should_send_fcm=False,
            )
    

    notify_client_on_offer_declined()

    Notify company employers when a candidate declines an offer.

    Source code in backend/offers/models.py
    @hook(
        AFTER_UPDATE,
        condition=WhenFieldValueIs("status", OfferStatus.REJECTED) & WhenFieldHasChanged("status", has_changed=True),
    )
    def notify_client_on_offer_declined(self):
        """Notify company employers when a candidate declines an offer."""
        candidate = self.job_application.contact
        candidate_name = candidate.full_name
        job_id = self.job_application.job.job_id
        title = "Offer Declined"
        body = f"Unfortunately, {candidate_name} has declined your offer for {job_id} job"
        for user in self.job_application.job.company.user_set.all():
            AppNotification.objects.create(
                user=user,
                title=title,
                body=body,
                type=ClientNotificationTypes.CLIENT_OFFER_DECLINED,
                data={
                    "title": title,
                    "body": body,
                    "job_id": str(self.job_application.job.pk),
                    "offer_id": str(self.pk),
                    "candidate_name": candidate_name,
                },
                should_send_fcm=False,
            )
    

    send_offer_updated_notification()

    Send notification when offer is updated.

    Source code in backend/offers/models.py
    @hook(AFTER_UPDATE)
    def send_offer_updated_notification(self):
        """Send notification when offer is updated."""
        # Skip notification for automated or worker-initiated status changes
        if self.has_changed("status") and self.status in [
            OfferStatus.EXPIRED,
            OfferStatus.ACCEPTED,
            OfferStatus.REJECTED,
        ]:
            return
    
        # Skip if is_read field is changing (worker is marking as read)
        if self.has_changed("is_read"):
            return
    
        title = "🔄 Your Offer is Renewed!"
        company = self.job_application.job.company
        position_name = (
            self.job_application.job.required_position.title
            if self.job_application.job.required_position
            else "Job position"
        )
        body = f"{company.name} has edited your offer for {position_name}. Check it out!"
        worker = self.job_application.contact
    
        if hasattr(worker, "user") and worker.user is not None:
            push_enabled, email_enabled, preference_pk = get_notification_preferences(
                worker.user, NotificationTypes.OFFER_RENEWED
            )
    
            AppNotification.objects.create(
                user=worker.user,
                title=title,
                body=body,
                type=NotificationTypes.OFFER_RENEWED,
                data={
                    "title": title,
                    "body": body,
                    "job_id": str(self.job_application.job.pk),
                    "offer_id": str(self.pk),
                },
                should_send_fcm=push_enabled,
            )
    
            if email_enabled:
                offer_url = f"{settings.SITE_URL}/workers/work/offers/{self.pk}"
    
                send_notification_email(
                    user=worker.user,
                    subject=f"Renewed offer from {company.name}",
                    template_name="offer_renewed.html",
                    context={
                        "preference_pk": preference_pk,
                        "company_name": company.name if company else "Company",
                        "position_name": position_name,
                        "start_date": self.start_date,
                        "offer_url": offer_url,
                    },
                )