1
|
|
|
<?php /** MicroQueue */ |
2
|
|
|
|
3
|
|
|
namespace Micro\Queue\Drivers; |
4
|
|
|
|
5
|
|
|
use Micro\Base\Exception; |
6
|
|
|
|
7
|
|
|
/** |
8
|
|
|
* Queue class file. |
9
|
|
|
* |
10
|
|
|
* @author Oleg Lunegov <[email protected]> |
11
|
|
|
* @link https://github.com/linpax/microphp-framework |
12
|
|
|
* @copyright Copyright (c) 2013 Oleg Lunegov |
13
|
|
|
* @license https://github.com/linpax/microphp-framework/blob/master/LICENSE |
14
|
|
|
* @package Micro |
15
|
|
|
* @subpackage Queue |
16
|
|
|
* @version 1.0 |
17
|
|
|
* @since 1.0 |
18
|
|
|
*/ |
19
|
|
|
class Queue |
20
|
|
|
{ |
21
|
|
|
/** @var array $servers Configuration servers */ |
22
|
|
|
protected $servers = []; |
23
|
|
|
/** @var array $routes Configuration routes */ |
24
|
|
|
protected $routes = []; |
25
|
|
|
/** @var array $brokers Started servers */ |
26
|
|
|
protected $brokers = []; |
27
|
|
|
|
28
|
|
|
|
29
|
|
|
/** |
30
|
|
|
* Initialize Queue manager |
31
|
|
|
* |
32
|
|
|
* @access public |
33
|
|
|
* |
34
|
|
|
* @param array $servers |
35
|
|
|
* @param array $routes |
36
|
|
|
* |
37
|
|
|
* @result void |
38
|
|
|
*/ |
39
|
|
|
public function __construct(array $servers = [], array $routes = []) |
40
|
|
|
{ |
41
|
|
|
$this->servers = $servers; |
42
|
|
|
$this->routes = $routes; |
43
|
|
|
} |
44
|
|
|
|
45
|
|
|
/** |
46
|
|
|
* Send message into service on selected server |
47
|
|
|
* |
48
|
|
|
* @access public |
49
|
|
|
* |
50
|
|
|
* @param string $route |
51
|
|
|
* @param array $data |
52
|
|
|
* @param string $type |
53
|
|
|
* @param int $retry |
54
|
|
|
* |
55
|
|
|
* @return mixed |
56
|
|
|
* @throws Exception |
57
|
|
|
*/ |
58
|
|
|
public function send($route, array $data = [], $type = 'sync', $retry = 5) |
59
|
|
|
{ |
60
|
|
|
$broker = $this->getBroker($route, $type, $retry); |
61
|
|
|
|
62
|
|
|
switch ($type) { |
63
|
|
|
case 'sync': |
64
|
|
|
case 'async': |
65
|
|
|
case 'stream': |
66
|
|
|
break; |
67
|
|
|
|
68
|
|
|
default: |
69
|
|
|
throw new Exception('Service type `'.$type.'` wrong name.'); |
70
|
|
|
} |
71
|
|
|
|
72
|
|
|
return $broker->{$type}($route, $data); |
73
|
|
|
} |
74
|
|
|
|
75
|
|
|
/** |
76
|
|
|
* @param string $uri |
77
|
|
|
* @param string $type |
78
|
|
|
* @param string $retry |
79
|
|
|
* |
80
|
|
|
* @return \Micro\queue\Adapter |
81
|
|
|
* @throws Exception |
82
|
|
|
*/ |
83
|
|
|
private function getBroker($uri, $type, $retry) |
84
|
|
|
{ |
85
|
|
|
$servers = $this->getServersFromRoute($this->getRoute($uri), $type); |
86
|
|
|
$server = null; |
87
|
|
|
|
88
|
|
|
for ($counter = 0; $counter < $retry; $counter++) { |
89
|
|
|
$random = mt_rand(0, count($servers) - 1); |
90
|
|
|
|
91
|
|
|
if (!array_key_exists($servers[$random], $this->brokers)) { |
92
|
|
|
$cls = $this->servers[$servers[$random]]; |
93
|
|
|
$this->brokers[$servers[$random]] = new $cls['class']($cls); |
94
|
|
|
} |
95
|
|
|
/** @noinspection PhpUndefinedMethodInspection */ |
96
|
|
|
if ($this->brokers[$servers[$random]]->test()) { |
97
|
|
|
$server = $servers[$random]; |
98
|
|
|
} |
99
|
|
|
} |
100
|
|
|
if (!$server) { |
101
|
|
|
throw new Exception('Message not send, random servers is down into `'.$uri.'`'); |
102
|
|
|
} |
103
|
|
|
|
104
|
|
|
return $this->brokers[$server]; |
105
|
|
|
} |
106
|
|
|
|
107
|
|
|
/** |
108
|
|
|
* Get servers list from routing rule |
109
|
|
|
* |
110
|
|
|
* @access protected |
111
|
|
|
* |
112
|
|
|
* @param array $route Routing rule |
113
|
|
|
* @param string $type Sending type |
114
|
|
|
* |
115
|
|
|
* @return array |
116
|
|
|
* @throws Exception |
117
|
|
|
*/ |
118
|
|
|
protected function getServersFromRoute(array $route, $type = '*') |
119
|
|
|
{ |
120
|
|
|
$servers = []; |
121
|
|
|
|
122
|
|
|
foreach ($route AS $key => $val) { |
123
|
|
|
if (is_string($val)) { |
124
|
|
|
$route['*'] = [$val]; |
125
|
|
|
unset($route[$key]); |
126
|
|
|
} |
127
|
|
|
} |
128
|
|
|
if (array_key_exists($type, $route)) { |
129
|
|
|
$servers += $route[$type]; |
130
|
|
|
} |
131
|
|
|
if (array_key_exists('*', $route)) { |
132
|
|
|
$servers += $route['*']; |
133
|
|
|
} |
134
|
|
|
if (!$servers) { |
|
|
|
|
135
|
|
|
throw new Exception('Type `'.$type.'` not found into route'); |
136
|
|
|
} |
137
|
|
|
|
138
|
|
|
return $servers; |
139
|
|
|
} |
140
|
|
|
|
141
|
|
|
/** |
142
|
|
|
* Get rules from route by pattern |
143
|
|
|
* |
144
|
|
|
* @access protected |
145
|
|
|
* |
146
|
|
|
* @param string $uri URI for match |
147
|
|
|
* |
148
|
|
|
* @return array Rules for URI |
149
|
|
|
* @throws Exception |
150
|
|
|
*/ |
151
|
|
|
protected function getRoute($uri) |
152
|
|
|
{ |
153
|
|
|
$keys = array_keys($this->routes); |
154
|
|
|
|
155
|
|
|
foreach (range(0, count($keys) - 1) AS $i) { |
156
|
|
|
if (preg_match('/'.$keys[$i].'/', $uri)) { |
157
|
|
|
if (!is_array($this->routes[$keys[$i]])) { |
158
|
|
|
$this->routes[$keys[$i]] = ['*' => $this->routes[$keys[$i]]]; |
159
|
|
|
} |
160
|
|
|
|
161
|
|
|
return $this->routes[$keys[$i]]; |
162
|
|
|
} |
163
|
|
|
} |
164
|
|
|
throw new Exception('Route `'.$uri.'` not found'); |
165
|
|
|
} |
166
|
|
|
} |
167
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
empty(..)
or! empty(...)
instead.