Producer::background()   A
last analyzed

Complexity

Conditions 3
Paths 2

Size

Total Lines 22
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 12
nc 2
nop 0
dl 0
loc 22
rs 9.8666
c 0
b 0
f 0
1
<?php
2
/**
3
 * NSQ producer
4
 * User: moyo
5
 * Date: 15/11/2017
6
 * Time: 3:40 PM
7
 */
8
9
namespace Carno\NSQ;
10
11
use Carno\Channel\Worker;
12
use function Carno\Coroutine\async;
13
use Carno\NSQ\Chips\Daemon;
14
use Carno\NSQ\Chips\EndpointsPersistent;
15
use Carno\NSQ\Chips\EndpointsRefresher;
16
use Carno\NSQ\Chips\INSLinker;
17
use Carno\NSQ\Chips\InteractiveChan;
18
use Carno\NSQ\Chips\LookupAssigned;
19
use Carno\NSQ\Chips\N2Daemon;
20
use Carno\NSQ\Chips\TCAssigned;
21
use Carno\NSQ\Connector\Nsqd;
22
use Carno\NSQ\Contracts\Daemonize;
23
use Carno\NSQ\Types\Message;
24
use Carno\Promise\Promise;
25
use Carno\Promise\Promised;
26
use Throwable;
27
28
/**
 * NSQ message producer.
 *
 * Publishes messages either directly (one coroutine per publish() call) or,
 * when queued() evaluates truthy, through the channel returned by chan(),
 * which is handled by a background Worker.
 */
class Producer extends Daemon implements Daemonize
{
    use N2Daemon, LookupAssigned, TCAssigned, INSLinker;
    use EndpointsRefresher, EndpointsPersistent;
    use InteractiveChan;

    /**
     * Producer constructor.
     *
     * @param int $queued forwarded unchanged to init(); presumably the queue
     *                    capacity, with 0 meaning "not queued" — TODO confirm
     *                    against the trait that defines init()/queued()
     */
    public function __construct(int $queued = 0)
    {
        $this->init($queued);
    }

    /**
     * Starts the background part of the producer.
     *
     * In non-queued mode there is nothing to run and an already resolved
     * promise is returned. In queued mode the channel is closed once exited()
     * resolves, a Worker is attached to the channel, and the channel's
     * closed() promise is returned.
     *
     * @return Promised
     */
    protected function background() : Promised
    {
        // Guard clause: no channel and no worker are needed without queueing.
        if (!$this->queued()) {
            return Promise::resolved();
        }

        // Close the channel as soon as the exit promise resolves.
        $this->exited()->then(function () {
            $this->chan()->close();
        });

        // Handler receives the array passed to chan()->send() in publish();
        // presumably invoked once per queued batch — confirm against Worker.
        new Worker($this->chan(), function (array $messages) {
            try {
                yield $this->sending(...$messages);
            } catch (Throwable $failure) {
                // Failures are only logged here; nothing is rethrown from the handler.
                $log = logger('nsq');
                $log->info(
                    'Queued publish failure',
                    ['error' => sprintf('%s::%s', get_class($failure), $failure->getMessage())]
                );
            }
        });

        return $this->chan()->closed();
    }

    /**
     * Publishes one or more messages.
     *
     * Queued mode hands the whole batch to the channel; otherwise the
     * sending() generator is started right away via async().
     *
     * @param Message ...$messages
     * @return Promised
     */
    public function publish(Message ...$messages) : Promised
    {
        if ($this->queued()) {
            return $this->chan()->send($messages);
        }

        return async($this->sending(...$messages));
    }

    /**
     * Coroutine body that delivers the given messages to an nsqd endpoint.
     *
     * The endpoint comes from lookupEndpoint() when hasLookupd() is truthy,
     * otherwise from staticEndpoint().
     *
     * @param Message ...$messages
     * @return \Generator generator whose return value is whatever the yielded
     *                    pub() call resumes with
     */
    private function sending(Message ...$messages)
    {
        if ($this->hasLookupd()) {
            $endpoint = yield $this->lookupEndpoint($this->getLookupd(), $this->getTopic());
        } else {
            $endpoint = $this->staticEndpoint();
        }

        /**
         * @var Nsqd $client
         */
        $client = $this->linking($endpoint);

        $published = yield $client->pub($this->getTopic(), ...$messages);

        return $published;
    }
}
99