Passed
Push — 8.x-2.x ( 1a4044...c40811 )
by Zach
14:39 queued 12s
created

rabbitmq.drush.inc (1 issue)

Severity

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
/**
4
 * @file
5
 * Drush plugin for RabbitMQ.
6
 */
7
8
use Drupal\rabbitmq\ConnectionFactory;
9
use Drupal\rabbitmq\Consumer;
10
// Avoid possible conflict with the builtin \[InvalidArgument]Exception.
11
use Drupal\rabbitmq\Exception\Exception as RabbitMqException;
12
use Drupal\rabbitmq\Exception\InvalidArgumentException as RabbitMqInvalidArgumentException;
13
use Drupal\rabbitmq\Exception\InvalidWorkerException;
14
use PhpAmqpLib\Connection\AMQPStreamConnection;
15
use PhpAmqpLib\Message\AMQPMessage;
16
use Symfony\Component\Yaml\Yaml;
17
18
/**
 * Implements hook_drush_command().
 *
 * The command definitions are read from the YAML file sitting next to this
 * file: same path, with the trailing "inc" or "php" replaced by "yml".
 *
 * @return array
 *   The content of the "commands" key of that YAML file, or an empty array
 *   when the file cannot be read or does not hold an array under that key.
 */
function rabbitmq_drush_command() {
  $file = preg_replace('/(inc|php)$/', 'yml', __FILE__);

  // file_get_contents() returns FALSE on failure: do not feed that to the
  // YAML parser, just report that no command is available.
  $contents = is_readable($file) ? file_get_contents($file) : FALSE;
  if ($contents === FALSE) {
    return [];
  }

  // Yaml::parse() is a static method: no instance is needed to call it.
  $config = Yaml::parse($contents);

  // An empty document parses to NULL and the "commands" key may be missing.
  $items = $config['commands'] ?? [];
  return is_array($items) ? $items : [];
}
27
28
/**
 * Command callback for rabbitmq-queue-info.
 *
 * Asks the "rabbitmq.queue_info" service for the count of the queue and
 * prints it as a YAML map keyed by the queue name.
 *
 * @param string|null $queueName
 *   The name of the requested queue.
 */
function drush_rabbitmq_queue_info($queueName = NULL) {
  // \Drupal::service() and Yaml::dump() are static: call them on the class
  // instead of building throwaway instances.
  /** @var \Drupal\rabbitmq\Service\QueueInfo $info */
  $info = \Drupal::service('rabbitmq.queue_info');
  $count = $info->count($queueName);
  echo Yaml::dump([$queueName => $count]);
}
40
41
/**
 * Command callback for rabbitmq-test-producer.
 *
 * Connects with the ConnectionFactory default host, port, user and password,
 * declares the "hello" queue and publishes one "Hello World!" message to it,
 * using an empty exchange name and the queue name as the routing key.
 */
function drush_rabbitmq_test_producer() {
  $connection = new AMQPStreamConnection(
    ConnectionFactory::DEFAULT_HOST,
    ConnectionFactory::DEFAULT_PORT,
    ConnectionFactory::DEFAULT_USER,
    ConnectionFactory::DEFAULT_PASS
  );

  // The finally blocks make sure the channel and the connection are closed
  // even when declaring the queue or publishing throws.
  try {
    $channel = $connection->channel();
    try {
      $routingKey = $queueName = 'hello';
      // Flags are: passive, durable, exclusive, auto_delete.
      $channel->queue_declare($queueName, FALSE, FALSE, FALSE, FALSE);
      $message = new AMQPMessage('Hello World!');
      $channel->basic_publish($message, '', $routingKey);
      echo " [x] Sent 'Hello World!'\n";
    }
    finally {
      $channel->close();
    }
  }
  finally {
    $connection->close();
  }
}
60
61
/**
 * Command callback for rabbitmq-test-consumer.
 *
 * Connects with the ConnectionFactory default host, port, user and password,
 * declares the "hello" queue and prints the body of every message received
 * on it, for as long as the channel has callbacks registered.
 */
function drush_rabbitmq_test_consumer() {
  $connection = new AMQPStreamConnection(
    ConnectionFactory::DEFAULT_HOST,
    ConnectionFactory::DEFAULT_PORT,
    ConnectionFactory::DEFAULT_USER,
    ConnectionFactory::DEFAULT_PASS
  );

  // The finally blocks make sure the channel and the connection are closed
  // even when declaring, subscribing or waiting throws.
  try {
    $channel = $connection->channel();
    try {
      $queueName = 'hello';
      // Flags are: passive, durable, exclusive, auto_delete.
      $channel->queue_declare($queueName, FALSE, FALSE, FALSE, FALSE);
      echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

      $callback = function ($msg) {
        echo " [x] Received ", $msg->body, "\n";
      };

      // Arguments after the queue name are: consumer tag, no_local, no_ack,
      // exclusive, nowait, callback.
      $channel->basic_consume($queueName, '', FALSE, TRUE, FALSE, FALSE, $callback);

      while (count($channel->callbacks)) {
        $channel->wait();
      }
    }
    finally {
      $channel->close();
    }
  }
  finally {
    $connection->close();
  }
}
88
89
/**
 * Command callback: Create a queue worker.
 *
 * Hands the queue over to the "rabbitmq.consumer" service and registers a
 * shutdown function asking that service to shut the queue down.
 *
 * @param string $queueName
 *   The name of the queue/ID of the queue worker plugin to wait on.
 *
 * @return bool
 *   TRUE when consuming ended without one of the handled exceptions,
 *   otherwise the value returned by drush_set_error().
 */
function drush_rabbitmq_worker($queueName) {
  // \Drupal::service() is static: no Drupal instance is needed to call it.
  /** @var \Drupal\rabbitmq\Consumer $consumer */
  $consumer = \Drupal::service('rabbitmq.consumer');

  // Service might be called from a non-Drush environment, so drush_get_option()
  // may not be available to it.
  $consumer->setOptionGetter(function (string $name) {
    // Falls back to the Consumer::OPTIONS entry when the option is not set.
    return (int) drush_get_option($name, Consumer::OPTIONS[$name]);
  });

  $queueArgs = ['@name' => $queueName];

  drupal_register_shutdown_function(function () use ($consumer, $queueName) {
    $consumer->shutdownQueue($queueName);
  });

  try {
    $consumer->logStart();
    $consumer->consumeQueueApi($queueName);
  }
  catch (InvalidWorkerException $e) {
    return drush_set_error(dt("Worker for queue @name does not implement the worker interface.", $queueArgs));
  }
  catch (RabbitMqInvalidArgumentException $e) {
    return drush_set_error($e->getMessage());
  }
  catch (RabbitMqException $e) {
    return drush_set_error(dt("Could not obtain channel for queue.", $queueArgs));
  }

  return TRUE;
}
130