Cluster::connecting()   A
last analyzed

Complexity

Conditions 5
Paths 1

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 7
nc 1
nop 1
dl 0
loc 9
rs 9.6111
c 0
b 0
f 0
1
<?php
2
/**
3
 * NSQ via cluster
4
 * User: moyo
5
 * Date: 2018/7/6
6
 * Time: 6:22 PM
7
 */
8
9
namespace Carno\NSQ;
10
11
use Carno\Cluster\Classify\Scenes;
12
use Carno\Cluster\Managed;
13
use Carno\Cluster\Resources;
14
use Carno\Net\Endpoint;
15
use Carno\NSQ\Clustered\Features;
16
use Carno\NSQ\Types\Consuming;
17
use Carno\NSQ\Types\Message;
18
use Carno\Promise\Promise;
19
use Carno\Promise\Promised;
20
21
/**
 * Base class for an NSQ client that is driven by cluster resource discovery.
 *
 * Concrete subclasses supply the producer queue cap via producing() and the
 * message processor via consuming(); this class wires both into a Features
 * instance for every endpoint it is asked to connect to.
 */
abstract class Cluster extends Managed
{
    /** Endpoint tag marking a node that accepts publishing. */
    protected const PUB = 'publish';

    /** Endpoint tag marking a node that can be subscribed to. */
    protected const SUB = 'subscribe';

    /**
     * Tags this cluster is interested in.
     * @var array
     */
    protected $tags = [self::PUB, self::SUB];

    /**
     * Resource type passed to Resources::initialize().
     * @var string
     */
    protected $type = 'nsq';

    /**
     * Default port (4161 is presumably the nsqlookupd HTTP port -- confirm against deployment).
     * @var int
     */
    protected $port = 4161;

    /**
     * Topic name handed to every Features instance.
     * @var string
     */
    protected $topic = 'topic';

    /**
     * Channel name handed to every Features instance.
     * @var string
     */
    protected $channel = 'channel';

    /**
     * Promise shared by all connections; created lazily in connecting() and
     * resolved (or created already-resolved) by subscribe().
     * @var Promised|null
     */
    private $subscribed = null;

    /**
     * Cluster constructor.
     *
     * Registers this instance with the given resources manager under the
     * RESOURCE scene, using the configured type and server.
     *
     * @param Resources $resources
     */
    public function __construct(Resources $resources)
    {
        $resources->initialize(Scenes::RESOURCE, $this->type, $this->server, $this);
    }

    /**
     * get producer queued cap
     * @return int
     */
    abstract protected function producing() : int;

    /**
     * get consumer processor
     * @return Consuming
     */
    abstract protected function consuming() : Consuming;

    /**
     * Called for a discovered node; nodes tagged for subscribing are picked
     * immediately, all other nodes are ignored here.
     *
     * @param Endpoint $node
     */
    protected function discovered(Endpoint $node) : void
    {
        // strict comparison: a non-string tag must never loosely equal the tag name
        if (in_array(self::SUB, $node->getTags(), true)) {
            $this->picked($node);
        }
    }

    /**
     * Build the Features wrapper for an endpoint.
     *
     * The producer cap is only supplied when the endpoint carries the PUB tag
     * and no "withoutProducer" property is set on this instance; likewise the
     * consumer is only supplied for the SUB tag when "withoutConsumer" is unset.
     * In every other case null is passed for that role.
     *
     * @param Endpoint $endpoint
     * @return Features
     */
    protected function connecting(Endpoint $endpoint) : Features
    {
        $tags = $endpoint->getTags();

        // all connections share one promise; create it on first use
        $subscribed = $this->subscribed ?? ($this->subscribed = Promise::deferred());

        $producing = null;
        if (!isset($this->withoutProducer) && in_array(self::PUB, $tags, true)) {
            $producing = $this->producing();
        }

        $consuming = null;
        if (!isset($this->withoutConsumer) && in_array(self::SUB, $tags, true)) {
            $consuming = $this->consuming();
        }

        return new Features(
            $this->topic,
            $this->channel,
            $endpoint,
            $subscribed,
            $producing,
            $consuming
        );
    }

    /**
     * Shut down a previously connected Features instance.
     *
     * @param Features $connected
     * @return Promised
     */
    protected function disconnecting($connected) : Promised
    {
        return $connected->shutdown();
    }

    /**
     * Publish one or more messages through a connection picked by the PUB tag.
     *
     * This method contains a yield, so calling it returns a generator; it is
     * presumably meant to be driven by the framework's coroutine runner -- confirm
     * against callers.
     *
     * @param Message ...$messages
     * @return mixed|bool
     */
    final public function publish(Message ...$messages)
    {
        /**
         * @var Features $features
         */

        $features = yield $this->picking(self::PUB);

        return $features->producer()->publish(...$messages);
    }

    /**
     * Signal that subscribing may begin.
     *
     * Resolves the shared promise if a connection already created it; otherwise
     * stores an already-resolved promise for later connections to receive.
     */
    final public function subscribe() : void
    {
        if (null === $this->subscribed) {
            $this->subscribed = Promise::resolved();
            return;
        }

        $this->subscribed->resolve();
    }
}
132