Issues (8)

src/Producer/BasicProducer.php (1 issue)

1
<?php
2
3
/**
4
 * This file is part of amqp
5
 *
6
 * For the full copyright and license information, please view the LICENSE
7
 * file that was distributed with this source code.
8
 */
9
 
10
declare(strict_types=1);
11
12
namespace Slick\Amqp\Producer;
13
14
use Exception;
15
use PhpAmqpLib\Channel\AMQPChannel;
16
use PhpAmqpLib\Connection\AMQPStreamConnection;
17
use Slick\Amqp\Message;
18
use Slick\Amqp\Producer;
19
20
/**
21
 * BasicProducer
22
 *
23
 * @package Slick\Amqp\Producer
24
 */
25
abstract class BasicProducer implements Producer
26
{
27
    use ProducerMethods;
28
29
    /**
30
     * @var string
31
     */
32
    protected string $exchange = '';
33
34
    /**
35
     * @var bool
36
     */
37
    protected bool $declared = false;
38
39
    /**
40
     * @var AMQPStreamConnection
41
     */
42
    protected AMQPStreamConnection $connection;
43
44
    /**
45
     * @var AMQPChannel
46
     */
47
    private AMQPChannel $channel;
48
49
    /**
50
     * Creates a BasicProducer
51
     *
52
     * @param AMQPStreamConnection $connection
53
     */
54
    public function __construct(AMQPStreamConnection $connection)
55
    {
56
        $this->mergeOptions();
57
        $this->connection = $connection;
58
        $this->channel = $this->connection->channel();
59
    }
60
61
    /**
62
     * @inheritDoc
63
     */
64
    public function publish(Message $message, ?string $routingKey = ""): void
65
    {
66
        if (!$this->isDeclared()) {
67
            $this->declareExchange();
68
        }
69
70
        $this->channel()->basic_publish(
71
            $message->sourceMessage(),
72
            $this->exchange,
73
            $routingKey ?? ""
74
        );
75
    }
76
77
    /**
78
     * Exchange default options
79
     *
80
     * @return array<string, mixed>
81
     */
82
    public static function exchangeDefaultOptions(): array
83
    {
84
        return self::$defaultOptions;
85
    }
86
87
    /**
88
     * Check if this producer has a declared exchange
89
     *
90
     * @return bool
91
     */
92
    protected function isDeclared(): bool
93
    {
94
        return $this->declared;
95
    }
96
97
    /**
98
     * AMQP channel (Session)
99
     *
100
     * @return AMQPChannel
101
     */
102
    protected function channel(): AMQPChannel
103
    {
104
        return $this->channel;
105
    }
106
107
    /**
108
     * Declares the exchange to be used
109
     *
110
     * This method SHOULD set up de the exchange and MUST set the declared bit accordingly
111
     */
112
    protected function declareExchange(): void
113
    {
114
        $this->declared = true;
115
    }
116
117
    public function __destruct()
118
    {
119
        try {
120
            $this->channel->close();
121
            $this->connection->close();
122
        } catch (Exception) {
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
123
        }
124
    }
125
}
126