AbstractConsumer::isExclusive()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 1
c 1
b 0
f 1
dl 0
loc 3
rs 10
ccs 0
cts 2
cp 0
cc 1
nc 1
nop 0
crap 2
1
<?php declare(strict_types = 1);
2
3
namespace Portiny\RabbitMQ\Consumer;
4
5
use Bunny\Async\Client as AsyncClient;
6
use Bunny\Channel;
7
use Bunny\Client;
8
use Bunny\Message;
9
use Bunny\Protocol\MethodBasicConsumeOkFrame;
10
use InvalidArgumentException;
11
use React\Promise\PromiseInterface;
12
use Throwable;
13
14
abstract class AbstractConsumer
15
{
16
	public const MESSAGE_ACK = 1;
17
18
	public const MESSAGE_NACK = 2;
19
20
	public const MESSAGE_REJECT = 3;
21
22
	public const MESSAGE_REJECT_REQUEUE = 4;
23
24
	/**
25
	 * @var int
26
	 */
27
	private $consumedMessages = 0;
28
29
30
	/**
31
	 * @return MethodBasicConsumeOkFrame|PromiseInterface
32
	 */
33
	final public function consume(Channel $channel, ?int $numberOfMessages = null)
34
	{
35
		$channel->qos($this->getPrefetchSize(), $this->getPrefetchCount());
36
37
		return $channel->consume(
38
			function (Message $message, Channel $channel, $client) use ($numberOfMessages): void {
39
				try {
40
					$result = $this->process($message);
41
				} catch (Throwable $throwable) {
42
					$channel->reject($message);
43
					throw $throwable;
44
				}
45
46
				switch ($result) {
47
					case self::MESSAGE_ACK:
48
						$channel->ack($message);
49
						break;
50
					case self::MESSAGE_NACK:
51
						$channel->nack($message);
52
						break;
53
					case self::MESSAGE_REJECT:
54
						$channel->reject($message, false);
55
						break;
56
					case self::MESSAGE_REJECT_REQUEUE:
57
						$channel->reject($message);
58
						break;
59
					default:
60
						$channel->reject($message);
61
						throw new InvalidArgumentException('Unknown return value of consumer');
62
				}
63
64
				if ($numberOfMessages !== null && ++$this->consumedMessages >= $numberOfMessages) {
65
					if ($client instanceof Client || $client instanceof AsyncClient) {
66
						$client->stop();
67
					}
68
				}
69
			},
70
			$this->getQueueName(),
71
			$this->getConsumerTag(),
72
			$this->isNoLocal(),
73
			$this->isNoAck(),
74
			$this->isExclusive(),
75
			$this->isNoWait(),
76
			$this->getArguments()
77
		);
78
	}
79
80
81
	abstract protected function process(Message $message): int;
82
83
84
	abstract protected function getQueueName(): string;
85
86
87
	protected function getConsumerTag(): string
88
	{
89
		return '';
90
	}
91
92
93
	protected function isNoLocal(): bool
94
	{
95
		return false;
96
	}
97
98
99
	protected function isNoAck(): bool
100
	{
101
		return false;
102
	}
103
104
105
	protected function isExclusive(): bool
106
	{
107
		return false;
108
	}
109
110
111
	protected function isNoWait(): bool
112
	{
113
		return false;
114
	}
115
116
117
	protected function getArguments(): array
118
	{
119
		return [];
120
	}
121
122
123
	protected function getPrefetchSize(): int
124
	{
125
		return 0;
126
	}
127
128
129
	protected function getPrefetchCount(): int
130
	{
131
		return 1;
132
	}
133
134
}
135