Total Complexity | 2 |
Total Lines | 39 |
Duplicated Lines | 0 % |
Changes | 0 |
1 | '''http://emmanuel-klinger.net/distributed-task-queues-for-machine-learning-in-python-celery-rabbitmq-redis.html''' |
||
2 | |||
3 | import time |
||
4 | import pika |
||
5 | from backend.fcmapp import ApplicationService |
||
6 | |||
# Both queues used by this script are declared with this durability flag.
DURABLE = False

# The application service supplies the message-bus connection.
APP = ApplicationService()
CONNECTION = APP.bus.connection()

# Channel used to publish tasks onto 'task_queue'.
CHANNEL = CONNECTION.channel()
CHANNEL.queue_declare(queue='task_queue', durable=DURABLE)
CHANNEL.basic_qos(prefetch_count=1)
||
14 | |||
# Number of task messages published (and results expected back).
N = 100000

# Send phase: publish N tasks through the default exchange ('') straight to
# 'task_queue'.  START and SEND_FINISH bracket the whole phase.
START = time.time()
for task_id in range(N):
    CHANNEL.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=str((1, task_id)),
        # delivery_mode=2 marks the message as persistent.
        properties=pika.BasicProperties(delivery_mode=2),
    )
SEND_FINISH = time.time()
||
23 | |||
# Second channel on the same connection, dedicated to 'result_queue'.
CHANNELR = CONNECTION.channel()
CHANNELR.queue_declare(queue='result_queue', durable=DURABLE)
||
26 | |||
# Number of result messages acknowledged so far; updated by callback().
k = 0


def callback(chan, method, properties, body):
    '''Acknowledge one result message; report timings after the N-th one.

    Each call acknowledges the delivery identified by method.delivery_tag
    and bumps the module-level counter ``k``.  When the N-th result has
    been acknowledged, the send duration (SEND_FINISH - START) and the
    total elapsed time (now - START) are printed, and consuming is stopped
    so that the benchmark terminates instead of blocking forever.

    chan: channel the message arrived on; used to ack and to stop consuming.
    method: delivery metadata; only its ``delivery_tag`` is read.
    properties, body: accepted to match the consumer signature; unused.
    '''
    global k
    chan.basic_ack(delivery_tag=method.delivery_tag)
    k += 1
    if k == N:
        end = time.time()
        print("rabbit", SEND_FINISH - START, end - START)
        # Every expected result is in: leave the consume loop.
        chan.stop_consuming()
||
35 | |||
# Receive phase: take results from 'result_queue' one unacknowledged
# message at a time and hand each delivery to callback().
CHANNELR.basic_qos(prefetch_count=1)
# NOTE(review): callback-first positional form; newer pika releases use a
# different basic_consume signature -- confirm against the installed version.
CHANNELR.basic_consume(callback, queue='result_queue')
CHANNELR.start_consuming()
||
39 |