Config   A
last analyzed

Complexity

Total Complexity 10

Size/Duplication

Total Lines 91
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 26
dl 0
loc 91
rs 10
c 0
b 0
f 0
wmc 10

5 Methods

Rating   Name   Duplication   Size   Complexity  
A endpoints() 0 23 3
A named() 0 3 1
A attach() 0 14 4
A __construct() 0 3 1
A detach() 0 3 1
1
<?php
2
/**
3
 * Discovering via config
4
 * User: moyo
5
 * Date: 2018/8/13
6
 * Time: 11:59 AM
7
 */
8
9
namespace Carno\Cluster\Discovery\Adaptors;
10
11
use Carno\Cluster\Contracts\Tags;
12
use Carno\Cluster\Discovery\Discovered;
13
use Carno\Cluster\Managed;
14
use Carno\Config\Config as Source;
15
use Carno\Net\Address;
16
use Carno\Net\Endpoint;
17
use Carno\Net\Routing\Table;
18
use Carno\Promise\Promised;
19
20
class Config implements Discovered
21
{
22
    /**
23
     * @var Source
24
     */
25
    private $source = null;
26
27
    /**
28
     * @var Managed[]
29
     */
30
    private $managed = [];
31
32
    /**
33
     * Config constructor.
34
     * @param Source $config
35
     */
36
    public function __construct(Source $config)
37
    {
38
        $this->source = $config;
39
    }
40
41
    /**
42
     * @param string $group
43
     * @param string $server
44
     * @param Managed $managed
45
     * @return Promised
46
     */
47
    public function attach(string $group, string $server, Managed $managed) : Promised
48
    {
49
        $table = new Table;
50
51
        $this->managed[$named = $this->named($group, $server)] = $managed;
52
53
        $this->source->watching($named, static function (string $dsn = null) use ($table, $named, $managed) {
54
            foreach ($table->reset(...self::endpoints($dsn ?? '', $named)) as $op) {
55
                $op->joined() && $managed->routing()->join($op->target());
56
                $op->leaved() && $managed->routing()->leave($op->target());
57
            }
58
        });
59
60
        return $managed->joined();
61
    }
62
63
    /**
64
     * @param string $group
65
     * @param string $server
66
     * @return Promised
67
     */
68
    public function detach(string $group, string $server) : Promised
69
    {
70
        return $this->managed[$this->named($group, $server)]->shutdown();
71
    }
72
73
    /**
74
     * @param string $group
75
     * @param string $server
76
     * @return string
77
     */
78
    private function named(string $group, string $server) : string
79
    {
80
        return sprintf('%s:%s', $group, $server);
81
    }
82
83
    /**
84
     * @param string $conf
85
     * @param string $named
86
     * @return Endpoint[]
87
     */
88
    private static function endpoints(string $conf, string $named) : array
89
    {
90
        $endpoints = [];
91
92
        foreach (explode("\n", $conf) as $expr) {
93
            $expr = preg_replace('!\s+!', ' ', $expr);
94
95
            $tags = Tags::DEFAULT;
96
            $dsn = $expr;
97
98
            if ($tsp = strpos($expr, ' ')) {
99
                $tags = explode(',', substr($expr, $tsp - strlen($expr) + 1));
100
                $dsn = substr($expr, 0, $tsp);
101
            }
102
103
            $endpoints[] = (new Endpoint(new Address($dsn)))
104
                ->relatedService($named)
105
                ->assignID(md5($expr))
106
                ->setTags(...$tags);
107
            ;
108
        }
109
110
        return $endpoints;
111
    }
112
}
113