Features::__construct()   F
last analyzed

Complexity

Conditions 14
Paths 1201

Size

Total Lines 50
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 14
eloc 31
nc 1201
nop 6
dl 0
loc 50
rs 2.1
c 0
b 0
f 0

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
/**
3
 * Clustered features
4
 * User: moyo
5
 * Date: 2018/7/9
6
 * Time: 11:26 AM
7
 */
8
9
namespace Carno\NSQ\Clustered;
10
11
use function Carno\Coroutine\all;
12
use Carno\DSN\DSN;
13
use Carno\Net\Endpoint;
14
use Carno\NSQ\Components\Lookupd;
15
use Carno\NSQ\Consumer;
16
use Carno\NSQ\Exception\ClusterEndpointInitException;
17
use Carno\NSQ\Producer;
18
use Carno\NSQ\Types\Consuming;
19
use Carno\Promise\Promise;
20
use Carno\Promise\Promised;
21
22
class Features
23
{
24
    /**
25
     * @var Producer
26
     */
27
    private $producer = null;
28
29
    /**
30
     * @var Consumer
31
     */
32
    private $consumer = null;
33
34
    /**
35
     * Features constructor.
36
     * @param string $topic
37
     * @param string $channel
38
     * @param Endpoint $endpoint
39
     * @param Promised $subscribed
40
     * @param int $producing
41
     * @param Consuming $consuming
42
     */
43
    public function __construct(
44
        string $topic,
45
        string $channel,
46
        Endpoint $endpoint,
47
        Promised $subscribed,
48
        int $producing = null,
49
        Consuming $consuming = null
50
    ) {
51
        $host = $endpoint->address()->host();
52
        $port = $endpoint->address()->port();
53
54
        if ($port <= 0) {
55
            $dsn = new DSN($host);
56
            $host = $dsn->host();
57
            $port = $dsn->port();
58
            switch ($dsn->scheme()) {
59
                case 'lookupd':
60
                    $lookupd = new Lookupd($host, $port);
61
                    break;
62
                case 'nsqd':
63
                    $nsqd = $endpoint;
64
                    break;
65
                default:
66
                    throw new ClusterEndpointInitException('Not supported scheme');
67
            }
68
        } else {
69
            $lookupd = new Lookupd($host, $port);
70
        }
71
72
        $producing === null || $this->producer = new Producer($producing);
73
        $consuming === null || $this->consumer = new Consumer($consuming);
74
75
        if (isset($lookupd)) {
76
            $this->producer && $this->producer->setLookupd($lookupd);
77
            $this->consumer && $this->consumer->setLookupd($lookupd);
78
        }
79
80
        if (isset($nsqd)) {
81
            $this->producer && $this->producer->addEndpoint($nsqd);
82
            $this->consumer && $this->consumer->addEndpoint($nsqd);
83
        }
84
85
        if ($this->producer) {
86
            $this->producer->setTopic($topic)->startup();
87
        }
88
89
        if ($this->consumer) {
90
            $this->consumer->setTopic($topic)->setChannel($channel);
91
            $subscribed->then(function () {
92
                $this->consumer->startup();
93
            });
94
        }
95
    }
96
97
    /**
98
     * @return Producer
99
     */
100
    public function producer() : ?Producer
101
    {
102
        return $this->producer;
103
    }
104
105
    /**
106
     * @return Consumer
107
     */
108
    public function consumer() : ?Consumer
109
    {
110
        return $this->consumer;
111
    }
112
113
    /**
114
     * @return Promised
115
     */
116
    public function shutdown() : Promised
117
    {
118
        return all(
119
            $this->producer ? $this->producer->shutdown() : Promise::resolved(),
120
            $this->consumer ? $this->consumer->shutdown() : Promise::resolved()
121
        );
122
    }
123
}
124