Completed
Push — master ( 673fc6...a88edb )
by Oleg
03:40
created

Queue::getBroker()   B

Complexity

Conditions 5
Paths 10

Size

Total Lines 23
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 23
rs 8.5906
cc 5
eloc 13
nc 10
nop 3
1
<?php /** MicroQueue */
2
3
namespace Micro\Queue\Drivers;
4
5
use Micro\Base\Exception;
6
7
/**
8
 * Queue class file.
9
 *
10
 * @author Oleg Lunegov <[email protected]>
11
 * @link https://github.com/linpax/microphp-framework
12
 * @copyright Copyright (c) 2013 Oleg Lunegov
13
 * @license https://github.com/linpax/microphp-framework/blob/master/LICENSE
14
 * @package Micro
15
 * @subpackage Queue
16
 * @version 1.0
17
 * @since 1.0
18
 */
19
class Queue
20
{
21
    /** @var array $servers Configuration servers */
22
    protected $servers = [];
23
    /** @var array $routes Configuration routes */
24
    protected $routes = [];
25
    /** @var array $brokers Started servers */
26
    protected $brokers = [];
27
28
29
    /**
30
     * Initialize Queue manager
31
     *
32
     * @access public
33
     *
34
     * @param array $servers
35
     * @param array $routes
36
     *
37
     * @result void
38
     */
39
    public function __construct(array $servers = [], array $routes = [])
40
    {
41
        $this->servers = $servers;
42
        $this->routes = $routes;
43
    }
44
45
    /**
46
     * Send message into service on selected server
47
     *
48
     * @access public
49
     *
50
     * @param string $route
51
     * @param array $data
52
     * @param string $type
53
     * @param int $retry
54
     *
55
     * @return mixed
56
     * @throws Exception
57
     */
58
    public function send($route, array $data = [], $type = 'sync', $retry = 5)
59
    {
60
        $broker = $this->getBroker($route, $type, $retry);
61
62
        switch ($type) {
63
            case 'sync':
64
            case 'async':
65
            case 'stream':
66
                break;
67
68
            default:
69
                throw new Exception('Service type `'.$type.'` wrong name.');
70
        }
71
72
        return $broker->{$type}($route, $data);
73
    }
74
75
    /**
76
     * @param string $uri
77
     * @param string $type
78
     * @param string $retry
79
     *
80
     * @return \Micro\queue\Adapter
81
     * @throws Exception
82
     */
83
    private function getBroker($uri, $type, $retry)
84
    {
85
        $servers = $this->getServersFromRoute($this->getRoute($uri), $type);
86
        $server = null;
87
88
        for ($counter = 0; $counter < $retry; $counter++) {
89
            $random = mt_rand(0, count($servers) - 1);
90
91
            if (!array_key_exists($servers[$random], $this->brokers)) {
92
                $cls = $this->servers[$servers[$random]];
93
                $this->brokers[$servers[$random]] = new $cls['class']($cls);
94
            }
95
            /** @noinspection PhpUndefinedMethodInspection */
96
            if ($this->brokers[$servers[$random]]->test()) {
97
                $server = $servers[$random];
98
            }
99
        }
100
        if (!$server) {
101
            throw new Exception('Message not send, random servers is down into `'.$uri.'`');
102
        }
103
104
        return $this->brokers[$server];
105
    }
106
107
    /**
108
     * Get servers list from routing rule
109
     *
110
     * @access protected
111
     *
112
     * @param array $route Routing rule
113
     * @param string $type Sending type
114
     *
115
     * @return array
116
     * @throws Exception
117
     */
118
    protected function getServersFromRoute(array $route, $type = '*')
119
    {
120
        $servers = [];
121
122
        foreach ($route AS $key => $val) {
123
            if (is_string($val)) {
124
                $route['*'] = [$val];
125
                unset($route[$key]);
126
            }
127
        }
128
        if (array_key_exists($type, $route)) {
129
            $servers += $route[$type];
130
        }
131
        if (array_key_exists('*', $route)) {
132
            $servers += $route['*'];
133
        }
134
        if (!$servers) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $servers of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
135
            throw new Exception('Type `'.$type.'` not found into route');
136
        }
137
138
        return $servers;
139
    }
140
141
    /**
142
     * Get rules from route by pattern
143
     *
144
     * @access protected
145
     *
146
     * @param string $uri URI for match
147
     *
148
     * @return array Rules for URI
149
     * @throws Exception
150
     */
151
    protected function getRoute($uri)
152
    {
153
        $keys = array_keys($this->routes);
154
155
        foreach (range(0, count($keys) - 1) AS $i) {
156
            if (preg_match('/'.$keys[$i].'/', $uri)) {
157
                if (!is_array($this->routes[$keys[$i]])) {
158
                    $this->routes[$keys[$i]] = ['*' => $this->routes[$keys[$i]]];
159
                }
160
161
                return $this->routes[$keys[$i]];
162
            }
163
        }
164
        throw new Exception('Route `'.$uri.'` not found');
165
    }
166
}
167