Publisher::publish()   A
last analyzed

Complexity

Conditions 5
Paths 8

Size

Total Lines 34
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
eloc 21
c 3
b 0
f 0
dl 0
loc 34
rs 9.2728
cc 5
nc 8
nop 4
1
<?php
2
3
declare(strict_types=1);
4
5
namespace BinaryCube\CarrotMQ;
6
7
use Throwable;
8
use Interop\Amqp;
9
use Psr\Log\LoggerInterface;
10
use BinaryCube\CarrotMQ\Entity\Topic;
11
use BinaryCube\CarrotMQ\Support\Collection;
12
use BinaryCube\CarrotMQ\Support\AutoWireAwareTrait;
13
14
use function max;
15
use function usleep;
16
use function is_callable;
17
use function call_user_func_array;
18
19
/**
20
 * Class Publisher
21
 */
22
class Publisher extends Core implements PublisherInterface
23
{
24
    use AutoWireAwareTrait;
25
26
    /**
27
     * @const array Default queue parameters
28
     */
29
    const DEFAULTS = [];
30
31
    /**
32
     * @var Topic
33
     */
34
    protected $topic;
35
36
    /**
37
     * @var Amqp\AmqpProducer
38
     */
39
    protected $producer;
40
41
    /**
42
     * @var array
43
     */
44
    protected $config;
45
46
    /**
47
     * @var boolean
48
     */
49
    protected $producerWasWired = false;
50
51
    /**
52
     * Constructor.
53
     *
54
     * @param string               $id
55
     * @param Topic                $topic
56
     * @param Container            $container
57
     * @param array                $config
58
     * @param LoggerInterface|null $logger
59
     */
60
    public function __construct(
61
        string $id,
62
        Topic $topic,
63
        Container $container,
64
        array $config = [],
65
        ?LoggerInterface $logger = null
66
    ) {
67
        parent::__construct($id, $container, $logger);
68
69
        $this->id        = $id;
70
        $this->topic     = $topic;
71
        $this->container = $container;
72
        $this->config    = Collection::make(static::DEFAULTS)->merge($config)->all();
73
74
        $this->topic->setLogger($this->logger);
75
    }
76
77
    /**
78
     * @return Connection
79
     */
80
    public function connection()
81
    {
82
        return $this->topic->connection();
83
    }
84
85
    /**
86
     * @return Topic
87
     */
88
    public function topic(): Topic
89
    {
90
        return $this->topic;
91
    }
92
93
    /**
94
     * @param Amqp\AmqpMessage $message
95
     * @param int              $retry   How many time should it be retried, default is 1
96
     * @param callable|null    $onRetry A callable that is called on retries, the signature must be `function (Publisher $publisher, $exception) { ... }`
97
     * @param float            $delay   In seconds
98
     *
99
     * @return $this
100
     *
101
     * @throws Throwable
102
     */
103
    public function publish(Amqp\AmqpMessage $message, int $retry = 0, ?callable $onRetry = null, float $delay = 0.5)
104
    {
105
        $retry = max(0, $retry);
106
        $error = null;
0 ignored issues
show
Unused Code introduced by
The assignment to $error is dead and can be removed.
Loading history...
107
108
        $this->autoWire($this->container);
109
110
        if (false === $this->producerWasWired) {
111
            $this->wireProducer();
112
        }
113
114
        do {
115
            try {
116
                // Reset error.
117
                $error = null;
118
119
                $this
120
                    ->producer
121
                    ->setPriority($message->getPriority())
122
                    ->setTimeToLive($message->getTimestamp())
123
                    ->send($this->topic->model(), $message);
124
125
                break;
126
            } catch (Throwable $exception) {
127
                $error = $exception;
128
                $this->onRetry($error, $onRetry, $delay);
129
            }//end try
130
        } while (--$retry > 0);
131
132
        if (isset($error)) {
133
            throw $error;
134
        }
135
136
        return $this;
137
    }
138
139
    /**
140
     * @return $this
141
     */
142
    public function reconnect()
143
    {
144
        $this->topic->reconnect();
145
        $this->wireProducer(true);
146
147
        return $this;
148
    }
149
150
    /**
151
     * @param bool $refresh
152
     *
153
     * @return Amqp\AmqpProducer
154
     */
155
    protected function wireProducer(bool $refresh = false): Amqp\AmqpProducer
156
    {
157
        if ($refresh || ! isset($this->producer)) {
158
            $context        = $this->topic->connection()->context(true);
159
            $this->producer = $context->createProducer();
160
161
            $this->producerWasWired = true;
162
        }
163
164
        return $this->producer;
165
    }
166
167
    /**
168
     * @param Throwable     $error
169
     * @param callable|null $onRetry
170
     * @param float         $delay
171
     *
172
     * @return void
173
     */
174
    protected function onRetry(Throwable $error, ?callable $onRetry = null, float $delay = 0.5): void
175
    {
176
        try {
177
            $this->reconnect();
178
        } catch (Throwable $exception) {
179
            //
180
        }
181
182
        if (isset($onRetry) && is_callable($onRetry)) {
183
            call_user_func_array($onRetry, [$this, $error]);
184
        }
185
186
        usleep((int) (1e6 * $delay));
187
    }
188
189
}
190