Issues (8)

src/Consumer/BasicConsumer.php (1 issue)

1
<?php
2
3
/**
4
 * This file is part of amqp
5
 *
6
 * For the full copyright and license information, please view the LICENSE
7
 * file that was distributed with this source code.
8
 */
9
 
10
declare(strict_types=1);
11
12
namespace Slick\Amqp\Consumer;
13
14
use Exception;
15
use PhpAmqpLib\Channel\AMQPChannel;
16
use PhpAmqpLib\Connection\AMQPStreamConnection;
17
use PhpAmqpLib\Message\AMQPMessage;
18
use Slick\Amqp\Consumer;
19
use Slick\Amqp\Message;
20
21
/**
22
 * BasicConsumer
23
 *
24
 * @package Slick\Amqp\Consumer
25
 */
26
abstract class BasicConsumer implements Consumer
27
{
28
    use ConsumerMethods;
29
30
    /**
31
     * @var string
32
     */
33
    protected string $queue = '';
34
35
    /**
36
     * @var string
37
     */
38
    protected string $exchange = '';
39
40
    /**
41
     * @var bool
42
     */
43
    protected bool $declared = false;
44
45
    /**
46
     * @var AMQPStreamConnection
47
     */
48
    protected AMQPStreamConnection $connection;
49
50
    /**
51
     * @var AMQPChannel
52
     */
53
    private AMQPChannel $channel;
54
55
    /**
56
     * Creates a BasicProducer
57
     *
58
     * @param AMQPStreamConnection $connection
59
     */
60
    public function __construct(AMQPStreamConnection $connection)
61
    {
62
        $this->mergeOptions();
63
        $this->mergeExchangeOptions();
64
        $this->connection = $connection;
65
        $this->channel = $this->connection->channel();
66
    }
67
68
    /**
69
     * @inheritDoc
70
     */
71
    public function bind(?string $routingKey = ''): mixed
72
    {
73
        if (!$this->isDeclared()) {
74
            $this->declareQueue();
75
        }
76
77
        return $this->channel()->queue_bind($this->queue, $this->exchange, $routingKey ?? "");
78
    }
79
80
    /**
81
     * @inheritDoc
82
     * @param array<string, mixed> $options
83
     */
84
    public function consume(callable $callable, array $options = []): void
85
    {
86
        if (!$this->isDeclared()) {
87
            $this->declareQueue();
88
        }
89
90
        $callback = function (AMQPMessage $message) use ($callable) {
91
            return $callable(Message::fromAMQPMessage($message));
92
        };
93
94
        $this->mergeConsumeOptions(array_merge($options, ['callback' => $callback]));
95
        $args = $this->consumeOptions;
96
        call_user_func_array([$this->channel(), 'basic_consume'], $args);
97
98
        while ($this->channel()->is_consuming()) {
99
            $this->channel()->wait();
100
        }
101
    }
102
103
    /**
104
     * @inheritDoc
105
     */
106
    public function acknowledge(Message $message): void
107
    {
108
        if ($message->channel()) {
109
            $message->channel()->basic_ack($message->deliveryTag());
110
        }
111
    }
112
113
    /**
114
     * AMQP channel (Session)
115
     *
116
     * @return AMQPChannel
117
     */
118
    protected function channel(): AMQPChannel
119
    {
120
        return $this->channel;
121
    }
122
123
    /**
124
     * Check if this producer has a declared exchange
125
     *
126
     * @return bool
127
     */
128
    protected function isDeclared(): bool
129
    {
130
        return $this->declared;
131
    }
132
133
    /**
134
     * Declares the exchange to be used
135
     *
136
     * This method SHOULD set up de the exchange and MUST set the declared bit accordingly
137
     */
138
    protected function declareQueue(): void
139
    {
140
        $args = array_values($this->options());
141
        array_unshift($args, $this->queue);
142
        list($queueName, ,) = call_user_func_array([$this->channel(), 'queue_declare'], $args);
143
        $this->queue = $queueName;
144
        $this->declared = true;
145
    }
146
147
    public function __destruct()
148
    {
149
        try {
150
            $this->channel->close();
151
            $this->connection->close();
152
        } catch (Exception) {
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
153
        }
154
    }
155
}
156