Passed
Pull Request — master (#2)
by Tomáš
10:59
created

AbstractConsumer   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 136
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 3
Bugs 0 Features 2
Metric Value
wmc 21
eloc 51
c 3
b 0
f 2
dl 0
loc 136
ccs 0
cts 48
cp 0
rs 10

12 Methods

Rating   Name   Duplication   Size   Complexity  
A getPrefetchSize() 0 3 1
A beforeProcess() 0 2 1
A isExclusive() 0 3 1
A isNoAck() 0 3 1
A getConsumerTag() 0 3 1
A getArguments() 0 3 1
A isNoWait() 0 3 1
A errorProcess() 0 2 1
A afterProcess() 0 2 1
A isNoLocal() 0 3 1
A getPrefetchCount() 0 3 1
B consume() 0 47 10
1
<?php declare(strict_types = 1);
2
3
namespace Portiny\RabbitMQ\Consumer;
4
5
use Bunny\Async\Client as AsyncClient;
6
use Bunny\Channel;
7
use Bunny\Client;
8
use Bunny\Message;
9
use Bunny\Protocol\MethodBasicConsumeOkFrame;
10
use InvalidArgumentException;
11
use React\Promise\PromiseInterface;
12
use Throwable;
13
14
abstract class AbstractConsumer
15
{
16
	public const MESSAGE_ACK = 1;
17
18
	public const MESSAGE_NACK = 2;
19
20
	public const MESSAGE_REJECT = 3;
21
22
	public const MESSAGE_REJECT_REQUEUE = 4;
23
24
	/**
25
	 * @var int
26
	 */
27
	private $consumedMessages = 0;
28
29
30
	/**
31
	 * @return MethodBasicConsumeOkFrame|PromiseInterface
32
	 */
33
	final public function consume(Channel $channel, ?int $numberOfMessages = null)
34
	{
35
		$channel->qos($this->getPrefetchSize(), $this->getPrefetchCount());
36
37
		return $channel->consume(
38
			function (Message $message, Channel $channel, $client) use ($numberOfMessages): void {
39
				$this->beforeProcess($message);
40
				try {
41
					$result = $this->process($message);
42
					$this->afterProcess($message);
43
				} catch (Throwable $throwable) {
44
					$this->errorProcess($message, $throwable);
45
					$channel->reject($message);
46
					throw $throwable;
47
				}
48
49
				switch ($result) {
50
					case self::MESSAGE_ACK:
51
						$channel->ack($message);
52
						break;
53
					case self::MESSAGE_NACK:
54
						$channel->nack($message);
55
						break;
56
					case self::MESSAGE_REJECT:
57
						$channel->reject($message, false);
58
						break;
59
					case self::MESSAGE_REJECT_REQUEUE:
60
						$channel->reject($message);
61
						break;
62
					default:
63
						$channel->reject($message);
64
						throw new InvalidArgumentException('Unknown return value of consumer');
65
				}
66
67
				if ($numberOfMessages !== null && ++$this->consumedMessages >= $numberOfMessages) {
68
					if ($client instanceof Client || $client instanceof AsyncClient) {
69
						$client->stop();
70
					}
71
				}
72
			},
73
			$this->getQueueName(),
74
			$this->getConsumerTag(),
75
			$this->isNoLocal(),
76
			$this->isNoAck(),
77
			$this->isExclusive(),
78
			$this->isNoWait(),
79
			$this->getArguments()
80
		);
81
	}
82
83
84
	abstract protected function process(Message $message): int;
85
86
87
	abstract protected function getQueueName(): string;
88
89
90
	protected function getConsumerTag(): string
91
	{
92
		return '';
93
	}
94
95
96
	protected function isNoLocal(): bool
97
	{
98
		return false;
99
	}
100
101
102
	protected function isNoAck(): bool
103
	{
104
		return false;
105
	}
106
107
108
	protected function isExclusive(): bool
109
	{
110
		return false;
111
	}
112
113
114
	protected function isNoWait(): bool
115
	{
116
		return false;
117
	}
118
119
120
	protected function getArguments(): array
121
	{
122
		return [];
123
	}
124
125
126
	protected function getPrefetchSize(): int
127
	{
128
		return 0;
129
	}
130
131
132
	protected function getPrefetchCount(): int
133
	{
134
		return 1;
135
	}
136
137
138
	protected function beforeProcess(Message $message): void
0 ignored issues
show
Unused Code introduced by
The parameter $message is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

138
	protected function beforeProcess(/** @scrutinizer ignore-unused */ Message $message): void

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
139
	{
140
	}
141
142
143
	protected function afterProcess(Message $message): void
0 ignored issues
show
Unused Code introduced by
The parameter $message is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

143
	protected function afterProcess(/** @scrutinizer ignore-unused */ Message $message): void

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
144
	{
145
	}
146
147
148
	protected function errorProcess(Message $message, Throwable $throwable): void
0 ignored issues
show
Unused Code introduced by
The parameter $throwable is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

148
	protected function errorProcess(Message $message, /** @scrutinizer ignore-unused */ Throwable $throwable): void

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $message is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

148
	protected function errorProcess(/** @scrutinizer ignore-unused */ Message $message, Throwable $throwable): void

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
149
	{
150
	}
151
152
}
153