Passed
Push — 8.x-2.x ( ac762f...1a4044 )
by Zach
15:04
created

rabbitmq.drush.inc (1 issue)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
/**
4
 * @file
5
 * Drush plugin for RabbitMQ.
6
 */
7
8
use Drupal\rabbitmq\ConnectionFactory;
9
use Drupal\rabbitmq\Consumer;
10
// Avoid possible conflict with the builtin \[InvalidArgument]Exception.
11
use Drupal\rabbitmq\Exception\Exception as RabbitMqException;
12
use Drupal\rabbitmq\Exception\InvalidArgumentException as RabbitMqInvalidArgumentException;
13
use Drupal\rabbitmq\Exception\InvalidWorkerException;
14
use PhpAmqpLib\Connection\AMQPStreamConnection;
15
use PhpAmqpLib\Message\AMQPMessage;
16
use Symfony\Component\Yaml\Yaml;
17
18
/**
19
 * Implements hook_drush_command().
20
 */
21
function rabbitmq_drush_command() {
22
  $file = preg_replace('/(inc|php)$/', 'yml', __FILE__);
23
  $config = (new Yaml())->parse(file_get_contents($file));
24
  $items = $config['commands'];
25
  return $items;
26
}
27
28
/**
29
 * Command callback for rabbitmq-queue-info.
30
 *
31
 * @param string $queueName
0 ignored issues
show
Should the type for parameter $queueName not be string|null?

This check looks for @param annotations where the type inferred by our type inference engine differs from the declared type.

It makes a suggestion as to what type it considers more descriptive.

Most often this is a case of a parameter that can be null in addition to its declared types.

Loading history...
32
 *   The name of the requested queue.
33
 */
34
function drush_rabbitmq_queue_info($queueName = NULL) {
35
  /** @var \Drupal\rabbitmq\Service\QueueInfo $info */
36
  $info = (new Drupal())->service('rabbitmq.queue_info');
37
  $count = $info->count($queueName);
38
  echo (new Yaml())->dump([$queueName => $count]);
39
}
40
41
/**
42
 * Command callback for rabbitmq-test-producer.
43
 */
44 View Code Duplication
function drush_rabbitmq_test_producer() {
45
  $connection = new AMQPStreamConnection(
46
    ConnectionFactory::DEFAULT_HOST,
47
    ConnectionFactory::DEFAULT_PORT,
48
    ConnectionFactory::DEFAULT_USER,
49
    ConnectionFactory::DEFAULT_PASS
50
  );
51
  $channel = $connection->channel();
52
  $routingKey = $queueName = 'hello';
53
  $channel->queue_declare($queueName, FALSE, FALSE, FALSE, FALSE);
54
  $message = new AMQPMessage('Hello World!');
55
  $channel->basic_publish($message, '', $routingKey);
56
  echo " [x] Sent 'Hello World!'\n";
57
  $channel->close();
58
  $connection->close();
59
}
60
61
/**
62
 * Command callback for rabbitmq-test-consumer.
63
 */
64
function drush_rabbitmq_test_consumer() {
65
  $connection = new AMQPStreamConnection(
66
    ConnectionFactory::DEFAULT_HOST,
67
    ConnectionFactory::DEFAULT_PORT,
68
    ConnectionFactory::DEFAULT_USER,
69
    ConnectionFactory::DEFAULT_PASS
70
  );
71
  $channel = $connection->channel();
72
  $queueName = 'hello';
73
  $channel->queue_declare($queueName, FALSE, FALSE, FALSE, FALSE);
74
  echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
75
76
  $callback = function ($msg) {
77
    echo " [x] Received ", $msg->body, "\n";
78
  };
79
80
  $channel->basic_consume($queueName, '', FALSE, TRUE, FALSE, FALSE, $callback);
81
82
  while (count($channel->callbacks)) {
83
    $channel->wait();
84
  }
85
  $channel->close();
86
  $connection->close();
87
}
88
89
/**
90
 * Command callback: Create a queue worker.
91
 *
92
 * @param string $queueName
93
 *   The name of the queue/ID of the queue worker plugin to wait on.
94
 *
95
 * @return bool
96
 *   Did the command succeed ?
97
 */
98
function drush_rabbitmq_worker($queueName) {
99
  /** @var \Drupal\rabbitmq\Consumer $consumer */
100
  $consumer = (new Drupal())->service('rabbitmq.consumer');
101
102
  // Service might be called from a non-Drush environment, so drush_get_option()
103
  // may not be available to it.
104
  $consumer->setOptionGetter(function (string $name) {
105
    return (int) drush_get_option($name, Consumer::OPTIONS[$name]);
106
  });
107
108
  $queueArgs = ['@name' => $queueName];
109
110
  drupal_register_shutdown_function(function () use ($consumer, $queueName) {
111
    $consumer->shutdownQueue($queueName);
112
  });
113
114
  try {
115
    $consumer->logStart();
116
    $consumer->consumeQueueApi($queueName);
117
  }
118
  catch (InvalidWorkerException $e) {
119
    return drush_set_error(dt("Worker for queue @name does not implement the worker interface.", $queueArgs));
120
  }
121
  catch (RabbitMqInvalidArgumentException $e) {
122
    return drush_set_error($e->getMessage());
123
  }
124
  catch (RabbitMqException $e) {
125
    return drush_set_error(dt("Could not obtain channel for queue.", $queueArgs));
126
  }
127
128
  return TRUE;
129
}
130