1 | <?php |
||
2 | |||
3 | namespace OldSound\RabbitMqBundle\RabbitMq; |
||
4 | |||
5 | use PhpAmqpLib\Message\AMQPMessage; |
||
6 | |||
/**
 * Common plumbing for queue consumers: registers the consumer on the channel,
 * runs the blocking wait loop and decides when consuming has to stop.
 *
 * Concrete consumers are expected to override processMessage().
 */
abstract class BaseConsumer extends BaseAmqp implements DequeuerInterface
{
    /** @var int Number of messages after which consuming stops; 0 means "no limit". */
    protected $target;

    /** @var int Running count compared against $target; this class only ever resets it. */
    protected $consumed = 0;

    /** @var callable User callback; stored and returned here, never invoked by this class. */
    protected $callback;

    /** @var bool When true, the next maybeStopConsumer() call cancels the consumer. */
    protected $forceStop = false;

    /** @var int Idle timeout; stored and returned here, not applied by start(). */
    protected $idleTimeout = 0;

    /** @var int|null Exit code to report on an idle timeout; null until explicitly set. */
    protected $idleTimeoutExitCode;

    /**
     * @param callable $callback
     */
    public function setCallback($callback)
    {
        $this->callback = $callback;
    }

    /**
     * @return callable
     */
    public function getCallback()
    {
        return $this->callback;
    }

    /**
     * Registers the consumer and blocks until the channel stops consuming.
     *
     * @param int $msgAmount Message limit for this run; 0 consumes without a limit.
     * @throws \ErrorException
     */
    public function start($msgAmount = 0)
    {
        $this->target = $msgAmount;

        $this->setupConsumer();

        $channel = $this->getChannel();
        while ($channel->is_consuming()) {
            $channel->wait();
        }
    }

    /**
     * Tell the server you are going to stop consuming.
     *
     * It will finish up the last message and not send you any more.
     */
    public function stopConsuming()
    {
        // This gets stuck and will not exit without the last two parameters set.
        $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
    }

    /**
     * Optionally declares the fabric, then subscribes processMessage() to the
     * configured queue under the current consumer tag.
     */
    protected function setupConsumer()
    {
        if ($this->autoSetupFabric) {
            $this->setupFabric();
        }

        $this->getChannel()->basic_consume(
            $this->queueOptions['name'],
            $this->getConsumerTag(),
            false,
            $this->consumerOptions['no_ack'],
            false,
            false,
            [$this, 'processMessage']
        );
    }

    /**
     * Delivery callback registered by setupConsumer().
     *
     * Intentionally a no-op here (so $msg is unused): descendant classes
     * provide the real handling.
     *
     * @param AMQPMessage $msg
     */
    public function processMessage(AMQPMessage $msg)
    {
        //To be implemented by descendant classes
    }

    /**
     * Dispatches pending POSIX signals (when available and not disabled) and
     * cancels the consumer if a stop was requested or the message limit is hit.
     *
     * @throws \BadFunctionCallException When pcntl is loaded but pcntl_signal_dispatch() is unavailable.
     */
    protected function maybeStopConsumer()
    {
        // Signal handling can be switched off by defining AMQP_WITHOUT_SIGNALS as truthy.
        $signalsEnabled = !defined('AMQP_WITHOUT_SIGNALS') || !AMQP_WITHOUT_SIGNALS;

        if (extension_loaded('pcntl') && $signalsEnabled) {
            if (!function_exists('pcntl_signal_dispatch')) {
                throw new \BadFunctionCallException("Function 'pcntl_signal_dispatch' is referenced in the php.ini 'disable_functions' and can't be called.");
            }

            pcntl_signal_dispatch();
        }

        // ">=" rather than "==": if the counter has already passed the target
        // (e.g. a second start() without resetConsumed()), an equality check
        // would never match and the limit would silently be ignored.
        // Comparison is deliberately loose because nothing here casts the
        // target, so it may be a numeric string.
        $limitReached = $this->target > 0 && $this->consumed >= $this->target;

        if ($this->forceStop || $limitReached) {
            $this->stopConsuming();
        }
    }

    /**
     * @param string $tag Consumer tag used when subscribing and cancelling.
     */
    public function setConsumerTag($tag)
    {
        $this->consumerTag = $tag;
    }

    /**
     * @return string
     */
    public function getConsumerTag()
    {
        return $this->consumerTag;
    }

    /**
     * Requests a stop; it takes effect on the next maybeStopConsumer() call.
     */
    public function forceStopConsumer()
    {
        $this->forceStop = true;
    }

    /**
     * Sets the qos settings for the current channel
     * Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0
     *
     * @param int $prefetchSize
     * @param int $prefetchCount
     * @param bool $global
     */
    public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false)
    {
        $this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
    }

    /**
     * @param int $idleTimeout
     */
    public function setIdleTimeout($idleTimeout)
    {
        $this->idleTimeout = $idleTimeout;
    }

    /**
     * Set exit code to be returned when there is a timeout exception
     *
     * @param int|null $idleTimeoutExitCode
     */
    public function setIdleTimeoutExitCode($idleTimeoutExitCode)
    {
        $this->idleTimeoutExitCode = $idleTimeoutExitCode;
    }

    /**
     * @return int
     */
    public function getIdleTimeout()
    {
        return $this->idleTimeout;
    }

    /**
     * Get exit code to be returned when there is a timeout exception
     *
     * @return int|null
     */
    public function getIdleTimeoutExitCode()
    {
        return $this->idleTimeoutExitCode;
    }

    /**
     * Resets the consumed property.
     * Use when you want to call start() or consume() multiple times.
     */
    public function resetConsumed()
    {
        $this->consumed = 0;
    }
}
||
161 |
This check looks for parameters that have been defined for a function or method, but which are not used in the method body.