eMAGTechLabs /
RabbitMqBundle
| 1 | <?php |
||
| 2 | |||
| 3 | namespace OldSound\RabbitMqBundle\RabbitMq; |
||
| 4 | |||
| 5 | use PhpAmqpLib\Message\AMQPMessage; |
||
| 6 | |||
/**
 * Base class for AMQP consumers.
 *
 * Registers {@see processMessage()} as the delivery callback on the configured
 * queue and keeps waiting on the channel for as long as the channel still has
 * callbacks registered. Concrete consumers are expected to override
 * {@see processMessage()}.
 */
abstract class BaseConsumer extends BaseAmqp implements DequeuerInterface
{
    /**
     * Message limit set by start(); a value of 0 (or less) means "no limit".
     *
     * @var int
     */
    protected $target;

    /**
     * Counter compared against $target in maybeStopConsumer().
     *
     * NOTE(review): nothing in this class increments it - presumably
     * descendant classes do so after handling a message; confirm.
     *
     * @var int
     */
    protected $consumed = 0;

    /**
     * User callback stored by setCallback(); it is not invoked by this class.
     *
     * @var callable
     */
    protected $callback;

    /**
     * When true, the next maybeStopConsumer() call cancels the consumer.
     *
     * @var bool
     */
    protected $forceStop = false;

    /**
     * Idle timeout value; only stored and returned by this class.
     *
     * @var int
     */
    protected $idleTimeout = 0;

    /**
     * Exit code to be returned when there is a timeout exception
     * (null until setIdleTimeoutExitCode() is called).
     *
     * @var int|null
     */
    protected $idleTimeoutExitCode;

    /**
     * Stores the callback associated with this consumer.
     *
     * @param callable $callback
     */
    public function setCallback($callback)
    {
        $this->callback = $callback;
    }

    /**
     * @return callable
     */
    public function getCallback()
    {
        return $this->callback;
    }

    /**
     * Sets the message limit, registers the consumer and blocks while the
     * channel still has callbacks registered.
     *
     * @param int $msgAmount Message limit; 0 disables the limit check.
     * @throws \ErrorException
     */
    public function start($msgAmount = 0)
    {
        $this->target = $msgAmount;

        $this->setupConsumer();

        // The loop ends once no callback is left on the channel,
        // e.g. after stopConsuming() cancelled this consumer.
        while (count($this->getChannel()->callbacks)) {
            $this->getChannel()->wait();
        }
    }

    /**
     * Tell the server you are going to stop consuming.
     *
     * It will finish up the last message and not send you any more.
     */
    public function stopConsuming()
    {
        // This gets stuck and will not exit without the last two parameters set.
        $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
    }

    /**
     * Optionally declares the fabric, then subscribes processMessage() to the
     * configured queue.
     *
     * NOTE(review): $autoSetupFabric, $queueOptions and setupFabric() are not
     * declared in this class - presumably inherited from BaseAmqp.
     */
    protected function setupConsumer()
    {
        if ($this->autoSetupFabric) {
            $this->setupFabric();
        }

        // Positional flags are no_local, no_ack, exclusive and nowait
        // (per the php-amqplib basic_consume signature), all disabled.
        $this->getChannel()->basic_consume(
            $this->queueOptions['name'],
            $this->getConsumerTag(),
            false,
            false,
            false,
            false,
            array($this, 'processMessage')
        );
    }

    /**
     * Delivery callback registered by setupConsumer().
     *
     * Intentionally empty here: the parameter is unused because the actual
     * handling is left to descendant classes.
     *
     * @param AMQPMessage $msg
     */
    public function processMessage(AMQPMessage $msg)
    {
        //To be implemented by descendant classes
    }

    /**
     * Dispatches pending process signals (when pcntl signal handling is
     * active) and cancels the consumer if a stop was forced or the message
     * limit has been reached.
     *
     * @throws \BadFunctionCallException When the pcntl extension is loaded but
     *                                   pcntl_signal_dispatch() is unavailable.
     */
    protected function maybeStopConsumer()
    {
        // Signals are handled unless AMQP_WITHOUT_SIGNALS is defined and truthy.
        $signalsEnabled = !defined('AMQP_WITHOUT_SIGNALS') || !AMQP_WITHOUT_SIGNALS;

        if (extension_loaded('pcntl') && $signalsEnabled) {
            if (!function_exists('pcntl_signal_dispatch')) {
                throw new \BadFunctionCallException("Function 'pcntl_signal_dispatch' is referenced in the php.ini 'disable_functions' and can't be called.");
            }

            pcntl_signal_dispatch();
        }

        // Loose comparison kept on purpose: start() stores $msgAmount as given,
        // so $target is not guaranteed to be a strict int.
        $limitReached = $this->target > 0 && $this->consumed == $this->target;

        if ($this->forceStop || $limitReached) {
            $this->stopConsuming();
        }
    }

    /**
     * @param string $tag Consumer tag passed to basic_consume()/basic_cancel().
     */
    public function setConsumerTag($tag)
    {
        $this->consumerTag = $tag;
    }

    /**
     * @return string Consumer tag previously assigned to this consumer.
     */
    public function getConsumerTag()
    {
        return $this->consumerTag;
    }

    /**
     * Flags the consumer so that the next maybeStopConsumer() call stops it.
     */
    public function forceStopConsumer()
    {
        $this->forceStop = true;
    }

    /**
     * Sets the qos settings for the current channel
     * Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0
     *
     * @param int $prefetchSize
     * @param int $prefetchCount
     * @param bool $global
     */
    public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false)
    {
        $this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
    }

    /**
     * @param int $idleTimeout
     */
    public function setIdleTimeout($idleTimeout)
    {
        $this->idleTimeout = $idleTimeout;
    }

    /**
     * Set exit code to be returned when there is a timeout exception
     *
     * @param int|null $idleTimeoutExitCode
     */
    public function setIdleTimeoutExitCode($idleTimeoutExitCode)
    {
        $this->idleTimeoutExitCode = $idleTimeoutExitCode;
    }

    /**
     * @return int
     */
    public function getIdleTimeout()
    {
        return $this->idleTimeout;
    }

    /**
     * Get exit code to be returned when there is a timeout exception
     *
     * @return int|null
     */
    public function getIdleTimeoutExitCode()
    {
        return $this->idleTimeoutExitCode;
    }

    /**
     * Resets the consumed property.
     * Use when you want to call start() or consume() multiple times.
     */
    public function resetConsumed()
    {
        $this->consumed = 0;
    }
}
||
| 161 |
This check looks for parameters that have been defined for a function or method, but which are not used in the method body.