|
1
|
|
|
#!/usr/bin/env python |
|
2
|
|
|
|
|
3
|
|
|
import pika |
|
4
|
|
|
import uuid |
|
5
|
|
|
|
|
6
|
|
|
class RPCServer: |
|
7
|
|
|
def __init__(self, config): |
|
8
|
|
|
self.config = config |
|
9
|
|
|
print self.config['RabbitMQ']['host'] |
|
10
|
|
|
self.connection = pika.BlockingConnection(pika.ConnectionParameters( |
|
11
|
|
|
host=self.config['RabbitMQ']['host'], |
|
12
|
|
|
)) |
|
13
|
|
|
|
|
14
|
|
|
self.channel = self.connection.channel() |
|
15
|
|
|
self.channel.queue_declare(queue=self.config['queue']) |
|
16
|
|
|
self.channel.basic_qos(prefetch_count=1) |
|
17
|
|
|
|
|
18
|
|
|
def consume(self, response): |
|
19
|
|
|
self.response = response |
|
20
|
|
|
self.channel.basic_consume(self.on_request, self.config['queue']) |
|
21
|
|
|
|
|
22
|
|
|
print(" [x] Awaiting RPC requests") |
|
23
|
|
|
self.channel.start_consuming() |
|
24
|
|
|
|
|
25
|
|
|
def on_request(ch, method, props, body): |
|
26
|
|
|
reply = self.response(body) |
|
27
|
|
|
|
|
28
|
|
|
ch.basic_publish( |
|
29
|
|
|
exchange='', |
|
30
|
|
|
routing_key=props.reply_to, |
|
31
|
|
|
properties=pika.BasicProperties( |
|
32
|
|
|
correlation_id = props.correlation_id, |
|
33
|
|
|
), |
|
34
|
|
|
body=str(reply), |
|
35
|
|
|
) |
|
36
|
|
|
ch.basic_ack(delivery_tag = method.delivery_tag) |
|
37
|
|
|
|
|
38
|
|
|
class RPCClient: |
|
39
|
|
|
def __init__(self, config): |
|
40
|
|
|
self.connection = pika.BlockingConnection(pika.ConnectionParameters( |
|
41
|
|
|
host=self.config['RabbitMQ']['host'], |
|
42
|
|
|
)) |
|
43
|
|
|
|
|
44
|
|
|
self.channel = self.connection.channel() |
|
45
|
|
|
|
|
46
|
|
|
result = self.channel.queue_declare(exclusive=True) |
|
47
|
|
|
self.reply_queue = result.method.queue |
|
48
|
|
|
|
|
49
|
|
|
self.channel.basic_consume( |
|
50
|
|
|
self.on_response, |
|
51
|
|
|
no_ack=True, |
|
52
|
|
|
queue=self.reply_queue |
|
53
|
|
|
) |
|
54
|
|
|
|
|
55
|
|
|
def on_response(self, ch, method, props, body): |
|
56
|
|
|
if self.corr_id == props.correlation_id: |
|
57
|
|
|
self.reply = body |
|
58
|
|
|
|
|
59
|
|
|
def request(self, query): |
|
60
|
|
|
self.reply = None |
|
61
|
|
|
self.corr_id = str(uuid.uuid4()) |
|
62
|
|
|
self.channel.basic_publish(exchange='', |
|
63
|
|
|
routing_key=self.config['queue'], |
|
64
|
|
|
properties=pika.BasicProperties( |
|
65
|
|
|
reply_to = self.reply_queue, |
|
66
|
|
|
correlation_id = self.corr_id, |
|
67
|
|
|
), |
|
68
|
|
|
body=str(query) |
|
69
|
|
|
) |
|
70
|
|
|
while self.reply is None: |
|
71
|
|
|
self.connection.process_data_events() |
|
72
|
|
|
return self.reply |
|
73
|
|
|
|
|
74
|
|
|
|