Passed
Push — 8.x-2.x ( fa02d2...12a886 )
by Zach
13:19
created

rabbitmq.drush.inc (3 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
/**
4
 * @file
5
 * Drush plugin for RabbitMQ.
6
 */
7
8
use Drupal\rabbitmq\ConnectionFactory;
9
use Drupal\rabbitmq\Consumer;
10
// Avoid possible conflict with the builtin \[InvalidArgument]Exception.
11
use Drupal\rabbitmq\Exception\Exception as RabbitMqException;
12
use Drupal\rabbitmq\Exception\InvalidArgumentException as RabbitMqInvalidArgumentException;
13
use Drupal\rabbitmq\Exception\InvalidWorkerException;
14
use PhpAmqpLib\Connection\AMQPStreamConnection;
15
use PhpAmqpLib\Message\AMQPMessage;
16
use Symfony\Component\Yaml\Yaml;
17
18
/**
19
 * Implements hook_drush_command().
20
 */
21
function rabbitmq_drush_command() {
0 ignored issues
show
The return type could not be reliably inferred; please add a @return annotation.

Our type inference engine in quite powerful, but sometimes the code does not provide enough clues to go by. In these cases we request you to add a @return annotation as described here.

Loading history...
22
  $file = preg_replace('/(inc|php)$/', 'yml', __FILE__);
23
  $config = (new Yaml())->parse(file_get_contents($file));
24
  $items = $config['commands'];
25
  return $items;
26
}
27
28
/**
29
 * Command callback for rabbitmq-queue-info.
30
 *
31
 * @param string $queueName
0 ignored issues
show
Should the type for parameter $queueName not be string|null?

This check looks for @param annotations where the type inferred by our type inference engine differs from the declared type.

It makes a suggestion as to what type it considers more descriptive.

Most often this is a case of a parameter that can be null in addition to its declared types.

Loading history...
32
 *   The name of the requested queue.
33
 */
34
function drush_rabbitmq_queue_info($queueName = NULL) {
35
  /** @var \Drupal\rabbitmq\Service\QueueInfo $info */
36
  $info = (new Drupal())->service('rabbitmq.queue_info');
37
  $count = $info->count($queueName);
38
  echo (new Yaml())->dump([$queueName => $count]);
39
}
40
41
/**
42
 * Command callback for rabbitmq-test-producer.
43
 */
44 View Code Duplication
function drush_rabbitmq_test_producer() {
1 ignored issue
show
This function seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
45
  $connection = new AMQPStreamConnection(
46
    ConnectionFactory::DEFAULT_HOST,
47
    ConnectionFactory::DEFAULT_PORT,
48
    ConnectionFactory::DEFAULT_USER,
49
    ConnectionFactory::DEFAULT_PASS
50
  );
51
  $channel = $connection->channel();
52
  $routingKey = $queueName = 'hello';
53
  $channel->queue_declare($queueName, FALSE, FALSE, FALSE, FALSE);
54
  $message = new AMQPMessage('Hello World!');
55
  $channel->basic_publish($message, '', $routingKey);
56
  echo " [x] Sent 'Hello World!'\n";
57
  $channel->close();
58
  $connection->close();
59
}
60
61
/**
62
 * Command callback for rabbitmq-test-consumer.
63
 */
64
function drush_rabbitmq_test_consumer() {
65
  $connection = new AMQPStreamConnection(
66
    ConnectionFactory::DEFAULT_HOST,
67
    ConnectionFactory::DEFAULT_PORT,
68
    ConnectionFactory::DEFAULT_USER,
69
    ConnectionFactory::DEFAULT_PASS
70
  );
71
  $channel = $connection->channel();
72
  $queueName = 'hello';
73
  $channel->queue_declare($queueName, FALSE, FALSE, FALSE, FALSE);
74
  echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
75
76
  $callback = function ($msg) {
77
    echo " [x] Received ", $msg->body, "\n";
78
  };
79
80
  $channel->basic_consume($queueName, '', FALSE, TRUE, FALSE, FALSE, $callback);
81
82
  while (count($channel->callbacks)) {
83
    $channel->wait();
84
  }
85
  $channel->close();
86
  $connection->close();
87
}
88
89
/**
90
 * Command callback: Create a queue worker.
91
 *
92
 * @param string $queueName
93
 *   The name of the queue/ID of the queue worker plugin to wait on.
94
 *
95
 * @return bool
96
 *   Did the command succeed ?
97
 */
98
function drush_rabbitmq_worker($queueName) {
99
  /** @var \Drupal\rabbitmq\Consumer $consumer */
100
  $consumer = (new Drupal())->service('rabbitmq.consumer');
101
102
  // Service might be called from a non-Drush environment, so drush_get_option()
103
  // may not be available to it.
104
  $consumer->setOptionGetter(function (string $name) {
105
    return (int) drush_get_option($name, Consumer::OPTIONS[$name]);
106
  });
107
108
  $queueArgs = ['@name' => $queueName];
109
110
  drupal_register_shutdown_function(function () use ($consumer, $queueName) {
111
    $consumer->shutdownQueue($queueName);
112
  });
113
114
  try {
115
    $consumer->logStart();
116
    $consumer->consumeQueueApi($queueName);
117
  }
118
  catch (InvalidWorkerException $e) {
119
    return drush_set_error(dt("Worker for queue @name does not implement the worker interface.", $queueArgs));
120
  }
121
  catch (RabbitMqInvalidArgumentException $e) {
122
    return drush_set_error($e->getMessage());
123
  }
124
  catch (RabbitMqException $e) {
125
    return drush_set_error(dt("Could not obtain channel for queue.", $queueArgs));
126
  }
127
128
  return TRUE;
129
}
130