1
|
|
|
#!/usr/bin/env python |
2
|
|
|
|
3
|
|
|
import pika |
4
|
|
|
import uuid |
5
|
|
|
|
6
|
|
|
class RPCServer:
    """Blocking RabbitMQ RPC server that answers requests from one queue.

    Each incoming message body is handed to a user-supplied callable; the
    stringified result is published to the queue named in the request's
    ``reply_to`` property, tagged with the request's ``correlation_id``.

    NOTE(review): the ``basic_consume`` call below uses the positional
    ``(callback, queue)`` form -- this looks like the pre-1.0 pika API;
    confirm against the installed pika version.
    """

    def __init__(self, config):
        """Connect to the broker and declare the request queue.

        Args:
            config: Mapping read as ``config['RabbitMQ']['host']`` (broker
                host) and ``config['queue']`` (name of the request queue).
        """
        self.config = config
        # Parenthesised so the line is valid on both Python 2 and Python 3
        # (the original used a Python 2-only print statement).
        print(self.config['RabbitMQ']['host'])
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=self.config['RabbitMQ']['host'],
        ))

        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.config['queue'])
        self.channel.basic_qos(prefetch_count=1)

    def consume(self, response):
        """Register ``response`` as the request handler and start consuming.

        Args:
            response: Callable invoked with each request body; its return
                value is passed through ``str()`` and sent back as the reply.

        This call hands control to ``channel.start_consuming()``.
        """
        self.response = response
        self.channel.basic_consume(self.on_request, self.config['queue'])

        print(" [x] Awaiting RPC requests")
        self.channel.start_consuming()

    def on_request(self, ch, method, props, body):
        """Handle one delivery: compute the reply, publish it, then ack.

        ``self`` was missing from the original signature even though the
        method is registered as a bound method and reads ``self.response``,
        so every delivery would have failed with a TypeError.

        Args:
            ch: Channel the request arrived on; used to publish and ack.
            method: Delivery metadata; supplies ``delivery_tag`` for the ack.
            props: Request properties; supplies ``reply_to`` and
                ``correlation_id``.
            body: Request payload passed to the handler.
        """
        reply = self.response(body)

        ch.basic_publish(
            exchange='',
            routing_key=props.reply_to,
            properties=pika.BasicProperties(
                correlation_id=props.correlation_id,
            ),
            body=str(reply),
        )
        # Ack only after the reply has been published.
        ch.basic_ack(delivery_tag=method.delivery_tag)
37
|
|
|
|
38
|
|
|
class RPCClient:
    """Blocking RabbitMQ RPC client that sends a query and waits for a reply.

    A private reply queue is declared at construction time; each request
    carries a fresh correlation id and the name of that reply queue.

    NOTE(review): ``basic_consume(callback, no_ack=..., queue=...)`` and
    ``queue_declare(exclusive=True)`` without a queue name look like the
    pre-1.0 pika API; confirm against the installed pika version.
    """

    def __init__(self, config):
        """Connect to the broker and start listening on a private reply queue.

        Args:
            config: Mapping read as ``config['RabbitMQ']['host']`` (broker
                host) and ``config['queue']`` (queue that requests are
                published to).
        """
        # The original never stored ``config`` yet read ``self.config`` on
        # the very next statement (and again in ``request``), which raised
        # AttributeError on construction.
        self.config = config
        # Defined up front so ``on_response`` never sees a missing attribute.
        self.reply = None
        self.corr_id = None

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=self.config['RabbitMQ']['host'],
        ))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.reply_queue = result.method.queue

        self.channel.basic_consume(
            self.on_response,
            no_ack=True,
            queue=self.reply_queue
        )

    def on_response(self, ch, method, props, body):
        """Store ``body`` as the reply when it matches the pending request.

        Messages whose ``correlation_id`` differs from ``self.corr_id`` are
        ignored.
        """
        if self.corr_id == props.correlation_id:
            self.reply = body

    def request(self, query):
        """Publish ``query`` and block until the matching reply arrives.

        Args:
            query: Request payload; sent as ``str(query)``.

        Returns:
            The body of the reply whose correlation id matches this request.

        NOTE(review): there is no timeout -- the loop below runs until a
        matching reply is received.
        """
        self.reply = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key=self.config['queue'],
            properties=pika.BasicProperties(
                reply_to=self.reply_queue,
                correlation_id=self.corr_id,
            ),
            body=str(query)
        )
        # Pump the connection until on_response stores the matching reply.
        while self.reply is None:
            self.connection.process_data_events()
        return self.reply
73
|
|
|
|
74
|
|
|
|