Celery là gì
Sau lúc được gợi ý về bài toán chuyển sang sử dụng hàng đợi thay bởi để các service thao tác trực tiếp với database, mình tất cả dành thời gian bài viết liên quan về phong cách thiết kế Queue. Do dự án chạy đa số bằng python yêu cầu tech lead lưu ý sử dụng Celery, một hệ thống cai quản queue phổ biến.
Bạn đang xem: Celery là gì
Kiến trúc sau khi chuyển sang thực hiện queue trong màng lưới hệ thống của bản thân mình sẽ như sau. Một nội dung bài viết khá cụ thể cụ thể về một dạng họa tiết thiết kế queue là message queue đa số người trọn vẹn có thể đọc thêm ở toidicodedao

Application
Một instance được khởi chế tạo ra từ tủ sách Celery được điện thoại tư vấn là applicationNhiều Celery application hoàn toàn có thể cùng sống sót trong một processKhởi tạo nên một celery application :from celery import Celeryapp = Celery ( ) Khi gởi một message cho tới queue, message đó sẽ chỉ cất tên của task cần thực thi .Các celery worker sẽ bản đồ giữa thương hiệu của task với hàm tiến hành task đó, việc mapping bởi vậy được điện thoại tư vấn là task registryapp.taskdef showroom ( x, y ) : return x + y
Tasks
Task vào Celery gồm hai trọng trách chính : định nghĩa mọi gì sẽ xảy ra sau thời điểm một task được điện thoại tư vấn ( giữ hộ đi message ) định nghĩa những gì sẽ xảy ra khi một worker nhận thấy message đóMỗi task gồm một thương hiệu riêng không trùng lặp, tên này sẽ được refer vào message để worker trả toàn có thể tìm được đúng hàm nhằm thực thi. Nếu không định nghĩa tên cho task thì task đó sẽ tiến hành tự để tên phụ thuộc module nhưng task được khái niệm và thương hiệu function của task. Các message của task sẽ không biến thành xóa ngoài queue chừng nào message đó không được một worker giải quyết và xử lý. Một worker hoàn toàn rất có thể giải quyết với xử lý những message, trường hợp worker bị crash nhưng mà chưa giải quyết và xử lý và xử trí hết đầy đủ message kia thì bọn chúng vẫn trả toàn rất có thể được gửi lại cho tới một worker khácCác function của task đề nghị ở tâm lý idempotent : function không khiến ra tác động ảnh hưởng gì bao gồm cả khi gồm bị gọi những lần với cùng một tham số => một task đã thực hiện sẽ bảo đảm an toàn không bị chạy lại đợt tiếp nhữa .
Tạo task
Để sinh sản task họ dùng decorator app.task(name=”create_new_user”)def create_user(username, password):User.objects.create(username=username, password=password)Để task có thể retry chúng ta cũng có thể bound task vào chủ yếu instance của nó
task ( bind = True ) def địa chỉ ( self, x, y ) : logger.info ( self.request.id ) Task cũng hoàn toàn có thể thừa kế
import celeryclass MyTask(celery.Task):def on_failure(self, exc, task_id, args, kwargs, einfo): print(“0!r failed: 1!r”.format(task_id, exc))task(base=MyTask)def add(x, y):raise KeyError()Để biết thêm thông tin và tâm lý của task bạn cũng có thể sử dụng Task.request
Gọi task
Celery cung ứng những API để hotline task sau khi đã định nghĩa bọn chúng ở trên .
Xem thêm: " Vận Động Là Gì ? Nguồn Gốc Và Các Hình Thức Của Vận Động (Câu 5)
apply_async : giữ hộ task message.delay : giữ hộ task messagecalling : task message sẽ không được gửi đi tới worker nhưng mà task sẽ tiến hành thực thi luôn luôn bởi process lúc này .Có một task như sau :app.taskdef showroom ( x, y ) : return x + yĐể điện thoại tư vấn task này tất cả họ sẽ thử dùng 2 method là apply_async cùng delayVới delay vớ cả chúng ta sẽ viết như sau :# task.delay ( arg1, arg2, kwarg1 = ” x ”, kwarg2 = ” y ” ) add.delay ( 10, 5 ) add.delay ( a = 10, b = 5 ) dùng apply_async thì đề xuất viết phức tạp hơn một chút ít ít # task. Apply_async ( args =, kwargs = ” kwarg1 ″ : “ x ”, “ kwarg2 ” : “ y ” ) add. Apply_async ( queue = ” low_priority ”, args = ( 10, 5 ) ) add. Apply_async ( queue = ” high_priority ”, kwargs = ” a ” : 10, “ b ” : 5 ) Về thực tế delay và apply_async là đồng nhất nhưng delay đã gồm sẵn những tùy chỉnh mặc định và tất cả bọn họ chỉ trả toàn rất có thể truyền vào hầu hết tham số đề xuất đã quan niệm trong function của task, còn cùng với apply_async vớ cả bọn họ hoàn toàn rất có thể truyền thêm phần nhiều tham số khác như queue vớ cả họ muốn gửi message vào, …. Best practice là nên thực hiện apply_async để tiện việc config chạy task tùy theo yêu cầu sử dụng .Celery tương trợ việc gọi task theo hình thức chaining, kết quả của task này trả toàn hoàn toàn có thể được truyền vào task tiếp theo
add.apply_async((2, 2), link=add.s(16)) # 20Nhờ vào phương pháp này chúng ta có thể thiết kế callback mang lại task như sauapp.taskdef error_handler(uuid):result = AsyncResult(uuid)exc = result.get(propagate=False)print(“Task 0 raised exception: 1!r2!r”.format( uuid, exc, result.traceback))add.apply_async((2, 2), link_error=error_handler.s())Sử dụng Celery
Cài đặt
pip install – U Celery
Sử dụng
Lựa chọn một số loại message broker tương xứng với dự án Bất Động Sản. Như đã nói trên Celery tương trợ 3 các loại message broker là RabbitMQ, Redis, SQS. Mình sẽ đi sâu vào nghiên cứu và đối chiếu từng các loại message broker trong phần sau về Celery .Tạo một celery worker cùng với task add
from celery Import Celeryapp = Celery(“name of module”, broker=”url_of_broker”)app.taskdef add(x, y):return x + yChạy worker
USD celery – A tasks worker – loglevel = infoGọi task
Xem thêm: Tạo 1 Số Uuid Là Gì ? Nghĩa Của Từ Uuid Vài Thứ Về Uuid
Lưu tác dụng
Cấu hình Celery
Cấu hình khoác định cơ phiên bản của celery :# # Broker settings. Broker_url = “ redis : / / localhost : 6379 / 0 ″ # danh mục of modules khổng lồ import when the Celery worker starts.imports = ( “ myapp.tasks ”, ) # # Using the database khổng lồ store task state and results. Result_backend = “ db + sqlite : / / / results.db ” task_annotations = “ tasks.add ” : “ rate_limit ” : “ 10 / s ” Best practice : sinh sản một tệp tin config riêng mang lại celery celeryconfig.pybroker_url = “ redis : / / localhost : 6379 / 0 : / / ” result_backend = “ rpc : / / ” task_serializer = “ json ” result_serializer = “ json ” accept_content = timezone = “ Europe / Oslo ” enable_utc = Truetask_routes = “ tasks.add ” : “ low-priority ”, # routing một task tới queue ý muốn muốnNgoài biện pháp tạo file config bên trên ra tất cả họ cũng trả toàn rất có thể config trực tiếp bởi application của Celery app.confapp.conf.update ( enable_utc = True, timezone = ” Europe / London ”, ) Tổng kếtCelery không nhất thiết phải config những mà chỉ việc import trường đoản cú module thực hiện trực tiếp như saufrom celery Import Celeryapp = Celery ( “ name of module ”, broker = ” url_of_broker ” ) Worker với client của Celery trả toàn hoàn toàn có thể tự retryMột process của Celery hoàn toàn rất có thể giải quyết và cách xử trí hàng triệu task vào một phút cùng với độ trễ chỉ vài miligiây
Celery hỗ trợ:
Message brokers : RabbitMQRedisSQSXử lý concurrencymultiprocessingmultithreadsingle threadeventlet, geventLưu trữ chức năng trên đầy đủ mạng lưới khối hệ thống : AmqpRedisMemcachedSQLAlchemyAmazon S3File systemSerializationjsonyamlỞ phần sau nội dung bài viết mình đang đi sâu hơn về worker vào Celery và hai một số loại message broker mà Celery tương trợ : SQS – Redis, bên cạnh đó dựng một ứng dụng cơ bạn dạng sử dụng mạng lưới khối hệ thống này .Chuyên mục : Hỏi Đáp