1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
/** |
4
|
|
|
* @file |
5
|
|
|
* Drush plugin for RabbitMQ. |
6
|
|
|
*/ |
7
|
|
|
|
8
|
|
|
use Drupal\rabbitmq\ConnectionFactory; |
9
|
|
|
use Drupal\rabbitmq\Consumer; |
10
|
|
|
// Avoid possible conflict with the builtin \[InvalidArgument]Exception. |
11
|
|
|
use Drupal\rabbitmq\Exception\Exception as RabbitMqException; |
12
|
|
|
use Drupal\rabbitmq\Exception\InvalidArgumentException as RabbitMqInvalidArgumentException; |
13
|
|
|
use Drupal\rabbitmq\Exception\InvalidWorkerException; |
14
|
|
|
use PhpAmqpLib\Connection\AMQPStreamConnection; |
15
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
16
|
|
|
use Symfony\Component\Yaml\Yaml; |
17
|
|
|
|
18
|
|
|
/**
 * Implements hook_drush_command().
 *
 * Command definitions are read from the "commands" key of a YAML file whose
 * path is this file's path with the trailing "inc" or "php" replaced by "yml".
 *
 * @return array
 *   The command definitions, or an empty array when the YAML file cannot be
 *   read or does not contain a "commands" list.
 */
function rabbitmq_drush_command() {
  $file = preg_replace('/(inc|php)$/', 'yml', __FILE__);

  // Do not hand a FALSE read result to the parser: a missing definition file
  // simply means there are no commands to declare.
  $contents = is_readable($file) ? file_get_contents($file) : FALSE;
  if ($contents === FALSE) {
    return [];
  }

  $config = Yaml::parse($contents);

  // An empty or malformed file parses to a non-array or lacks the key.
  if (!is_array($config) || !isset($config['commands']) || !is_array($config['commands'])) {
    return [];
  }

  return $config['commands'];
}
27
|
|
|
|
28
|
|
|
/**
 * Command callback for rabbitmq-queue-info.
 *
 * Asks the 'rabbitmq.queue_info' service for the count of the given queue and
 * prints it as a one-entry YAML mapping keyed by the queue name.
 *
 * @param string|null $queueName
 *   The name of the requested queue.
 */
function drush_rabbitmq_queue_info($queueName = NULL) {
  /** @var \Drupal\rabbitmq\Service\QueueInfo $queueInfo */
  $queueInfo = Drupal::service('rabbitmq.queue_info');

  $report = [$queueName => $queueInfo->count($queueName)];
  echo Yaml::dump($report);
}
40
|
|
|
|
41
|
|
|
/**
 * Command callback for rabbitmq-test-producer.
 *
 * Opens a connection with the ConnectionFactory default settings, declares
 * the 'hello' queue and publishes a single 'Hello World!' message to it using
 * an empty exchange name and the queue name as routing key.
 *
 * The channel and the connection are closed even when declaring the queue or
 * publishing the message throws.
 */
function drush_rabbitmq_test_producer() {
  $connection = new AMQPStreamConnection(
    ConnectionFactory::DEFAULT_HOST,
    ConnectionFactory::DEFAULT_PORT,
    ConnectionFactory::DEFAULT_USER,
    ConnectionFactory::DEFAULT_PASS
  );

  try {
    $channel = $connection->channel();
    try {
      $routingKey = $queueName = 'hello';
      $channel->queue_declare($queueName, FALSE, FALSE, FALSE, FALSE);
      $message = new AMQPMessage('Hello World!');
      $channel->basic_publish($message, '', $routingKey);
      echo " [x] Sent 'Hello World!'\n";
    }
    finally {
      // Release the channel before its parent connection.
      $channel->close();
    }
  }
  finally {
    $connection->close();
  }
}
60
|
|
|
|
61
|
|
|
/**
 * Command callback for rabbitmq-test-consumer.
 *
 * Declares the 'hello' queue on a connection built from the ConnectionFactory
 * defaults, registers a callback printing the body of each received message,
 * then waits on the channel for as long as it has registered callbacks.
 */
function drush_rabbitmq_test_consumer() {
  $connection = new AMQPStreamConnection(
    ConnectionFactory::DEFAULT_HOST,
    ConnectionFactory::DEFAULT_PORT,
    ConnectionFactory::DEFAULT_USER,
    ConnectionFactory::DEFAULT_PASS
  );
  $channel = $connection->channel();

  $queue = 'hello';
  $channel->queue_declare($queue, FALSE, FALSE, FALSE, FALSE);
  echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

  // Print every message body as it arrives.
  $printBody = function ($received) {
    echo " [x] Received ", $received->body, "\n";
  };
  $channel->basic_consume($queue, '', FALSE, TRUE, FALSE, FALSE, $printBody);

  // Keep dispatching until the channel no longer has any callback registered.
  while (count($channel->callbacks) > 0) {
    $channel->wait();
  }

  $channel->close();
  $connection->close();
}
88
|
|
|
|
89
|
|
|
/**
 * Command callback: Create a queue worker.
 *
 * Fetches the 'rabbitmq.consumer' service, gives it a Drush-backed option
 * getter, registers a shutdown function for the queue, then logs the start
 * and consumes the queue.
 *
 * @param string $queueName
 *   The name of the queue/ID of the queue worker plugin to wait on.
 *
 * @return bool
 *   TRUE when consumption returned without one of the caught exceptions,
 *   otherwise the result of drush_set_error().
 */
function drush_rabbitmq_worker($queueName) {
  /** @var \Drupal\rabbitmq\Consumer $consumer */
  $consumer = Drupal::service('rabbitmq.consumer');

  // Service might be called from a non-Drush environment, so drush_get_option()
  // may not be available to it: inject the lookup instead.
  $consumer->setOptionGetter(function (string $name) {
    return (int) drush_get_option($name, Consumer::OPTIONS[$name]);
  });

  $queueArgs = ['@name' => $queueName];

  drupal_register_shutdown_function(function () use ($consumer, $queueName) {
    $consumer->shutdownQueue($queueName);
  });

  try {
    $consumer->logStart();
    $consumer->consumeQueueApi($queueName);
  }
  catch (InvalidWorkerException $e) {
    return drush_set_error(dt("Worker for queue @name does not implement the worker interface.", $queueArgs));
  }
  catch (RabbitMqInvalidArgumentException $e) {
    return drush_set_error($e->getMessage());
  }
  catch (RabbitMqException $e) {
    // The message must carry the @name placeholder, otherwise the queue name
    // passed in $queueArgs never reaches the user.
    return drush_set_error(dt("Could not obtain channel for queue @name.", $queueArgs));
  }

  return TRUE;
}
130
|
|
|
|
Our type inference engine is quite powerful, but sometimes the code does not provide enough clues to go by. In these cases we request you to add a
@return
annotation as described here.