© 2020, Developed by Hieu Dev

Tự động thực thi tác vụ theo lịch trong Django Rest Framework với APScheduler

Trong một dự án phần mềm thực tế, bạn sẽ không thể không tránh khỏi việc gặp các task liên quan đến các tác vụ chạy ngầm, hay một công việc sẽ tự động thực thi theo lịch trình. 

Tự động thực thi tác vụ theo lịch trong Django Rest Framework với APScheduler

Trong Django, khi bạn tìm hiểu chức năng này, bạn cũng dễ dàng tìm thấy django-crontab chẳng hạn, nhưng có một điểm là django-crontab không hỗ trợ trên Windows. Vì thế, mình giới thiệu bạn APSchedule để thực hiện thực thi tác vụ theo lịch trong DRF và tất nhiên chúng cũng được sử dụng windows.

APScheduler

APScheduler là một thư viện Python cho phép bạn tạo các lập lịch theo chỉ một lần chỉ định hoặc định kỳ. Bạn có thể thêm công việc mới hoặc xóa nhanh công việc cũ tùy ý. 

Nếu bạn lưu trữ các công việc của mình trong cơ sở dữ liệu, chúng cũng sẽ tồn tại khi trình lập lịch khởi động lại và duy trì trạng thái của chúng. Khi bộ lập lịch được khởi động lại, nó sẽ chạy tất cả các công việc mà lẽ ra nó phải chạy.

Ví dụ như trong các sản phẩm (product) sẽ có ngày hết hạn (expire), bạn muốn khi các sản phẩm hết hạn sẽ tự động xoá hoặc là ẩn các sản phẩm đó đi. Hoặc là bạn cũng có thể tạo một tác vụ trước 1 tháng trước ngày hết hạn, sẽ thông báo để admin. APScheduler có lẽ là những gì bạn đang tìm kiếm.

Thiết lập

Trước tiên để cài đặt APScheduler, bạn cài đặt bằng pip command sau:

$ pip install apscheduler


Tiếp theo, bạn tạo file task.py trong app và thiết lập code sau:

from apscheduler.schedulers.background import BackgroundScheduler
from pytz import utc
from products.views import ListProductViewSet

scheduler = BackgroundScheduler(timezone=utc)

def start():
    productVietSet = ListProductViewSet()
    scheduler.add_job(productVietSet.say_hello, 'interval', minutes=1/2)
    print("Schedule called")
    scheduler.start()


Như code trên, ta gọi đến ListProductViewSet từ view.py để gọi đến phương thức say_hello() sau mỗi nửa phút. Vì vậy, bây giờ bạn cần định nghĩa phương thức say_hello() trong view.py đơn giản như sau:

class ListProductViewSet(mixins.ListModelMixin, viewsets.GenericViewSet):
    ...
    
    def say_hello(self):
        print("Hello Hieu.Tech")


Sau khi đã định nghĩa phương thức, bạn cần update code trong app.py như sau:

 
from django.apps import AppConfig


class ProductsConfig(AppConfig):

    def ready(self):
        print("Starting Scheduler ...")
        from . import task
        task.start()

Như vậy, sau cứ mỗi nửa phút phương thức say_hello() sẽ được chạy, và kết quả sẽ như sau:


Lời kết

Các bước thực hiện rất dễ dàng, và bạn cũng thể áp dụng phù hợp với dự án của mình. Các bạn có thể xem nhiều hơn về thư viện như link bên dưới:


Chúc các bạn thành công!

Hieu Ho.

2 Nhận xét

  1. Có ga vào nhìn chất quá Hiếu, site này view tốt ko e?

    Trả lờiXóa
  2. Vẫn chưa tốt lắm anh, vẫn đang học hỏi để seo lên

    Trả lờiXóa
Mới hơn Cũ hơn