1
|
|
|
<?php |
2
|
|
|
namespace Disque\Connection\Node; |
3
|
|
|
|
4
|
|
|
use InvalidArgumentException; |
5
|
|
|
|
6
|
|
|
/** |
7
|
|
|
* A prioritizer switching nodes if they have more jobs by a given margin |
8
|
|
|
* |
9
|
|
|
* This class prioritizes nodes by job count and its Disque priority. Because |
10
|
|
|
* there is a cost to switch, it doesn't switch from the current node unless |
11
|
|
|
* the new candidate has a safe margin over the current node. |
12
|
|
|
* |
13
|
|
|
* This margin can be set manually and defaults to 5%, ie. the new candidate |
14
|
|
|
* must have 5% more jobs than the current node. |
15
|
|
|
* |
16
|
|
|
* This parameter makes the prioritizer behave conservatively - it prefers |
17
|
|
|
* the status quo and won't switch immediately if the difference is small. |
18
|
|
|
* |
19
|
|
|
* You can make the prioritizer eager by setting the margin to 0, or more |
20
|
|
|
* conservative by setting it higher. Setting the margin to negative values |
21
|
|
|
* is not allowed. |
22
|
|
|
*/ |
23
|
|
|
class ConservativeJobCountPrioritizer implements NodePrioritizerInterface |
24
|
|
|
{ |
25
|
|
|
/** |
26
|
|
|
* @var float A margin to switch from the current node |
27
|
|
|
* |
28
|
|
|
* 0.05 means the new node must have 5% more jobs than the current node |
29
|
|
|
* in order to recommend switching over. |
30
|
|
|
*/ |
31
|
|
|
private $marginToSwitch = 0.05; |
32
|
|
|
|
33
|
|
|
/** |
34
|
|
|
* Get the margin to switch |
35
|
|
|
* |
36
|
|
|
* @return float |
37
|
|
|
*/ |
38
|
|
|
public function getMarginToSwitch() |
39
|
|
|
{ |
40
|
|
|
return $this->marginToSwitch; |
41
|
|
|
} |
42
|
|
|
|
43
|
|
|
/** |
44
|
|
|
* Set the margin to switch |
45
|
|
|
* |
46
|
|
|
* @param float $marginToSwitch A positive float or 0 |
47
|
|
|
* |
48
|
|
|
* @throws InvalidArgumentException |
49
|
|
|
*/ |
50
|
|
|
public function setMarginToSwitch($marginToSwitch) |
51
|
|
|
{ |
52
|
|
|
if ($marginToSwitch < 0) { |
53
|
|
|
throw new InvalidArgumentException('Margin to switch must not be negative'); |
54
|
|
|
} |
55
|
|
|
|
56
|
|
|
$this->marginToSwitch = $marginToSwitch; |
57
|
|
|
} |
58
|
|
|
|
59
|
|
|
/** |
60
|
|
|
* @inheritdoc |
61
|
|
|
*/ |
62
|
|
|
public function sort(array $nodes, $currentNodeId) |
63
|
|
|
{ |
64
|
|
|
// Optimize for a "cluster" consisting of just 1 node - skip everything |
65
|
|
|
if (count($nodes) === 1) { |
66
|
|
|
return $nodes; |
67
|
|
|
} |
68
|
|
|
|
69
|
|
|
uasort($nodes, function(Node $nodeA, Node $nodeB) use ($currentNodeId) { |
70
|
|
|
$priorityA = $this->calculateNodePriority($nodeA, $currentNodeId); |
71
|
|
|
$priorityB = $this->calculateNodePriority($nodeB, $currentNodeId); |
72
|
|
|
|
73
|
|
|
if ($priorityA === $priorityB) { |
74
|
|
|
return 0; |
75
|
|
|
} |
76
|
|
|
|
77
|
|
|
// Nodes with a higher priority should go first |
78
|
|
|
return ($priorityA < $priorityB) ? 1 : -1; |
79
|
|
|
}); |
80
|
|
|
|
81
|
|
|
return $nodes; |
82
|
|
|
} |
83
|
|
|
|
84
|
|
|
/** |
85
|
|
|
* Calculate the node priority from its job count, stick to the current node |
86
|
|
|
* |
87
|
|
|
* As the priority is based on the number of jobs, higher is better. |
88
|
|
|
* |
89
|
|
|
* @param Node $node |
90
|
|
|
* @param string $currentNodeId |
91
|
|
|
* |
92
|
|
|
* @return float Node priority |
93
|
|
|
*/ |
94
|
|
|
private function calculateNodePriority(Node $node, $currentNodeId) |
95
|
|
|
{ |
96
|
|
|
$priority = $node->getJobCount(); |
97
|
|
|
|
98
|
|
|
if ($node->getId() === $currentNodeId) { |
99
|
|
|
$margin = 1 + $this->marginToSwitch; |
100
|
|
|
$priority = $priority * $margin; |
101
|
|
|
} |
102
|
|
|
|
103
|
|
|
// Apply a weight determined by the node priority as assigned by Disque. |
104
|
|
|
// Priority 1 means the node is healthy. |
105
|
|
|
// Priority 10 to 100 means the node is probably failing, or has failed |
106
|
|
|
$disquePriority = $node->getPriority(); |
107
|
|
|
|
108
|
|
|
// Disque node priority should never be lower than 1, but let's be sure |
109
|
|
|
if ($disquePriority < Node::PRIORITY_OK) { |
110
|
|
|
$disquePriorityWeight = 1; |
111
|
|
|
} elseif (Node::PRIORITY_OK <= $disquePriority && $disquePriority < Node::PRIORITY_POSSIBLE_FAILURE) { |
112
|
|
|
// Node is OK, but Disque may have assigned a lower priority to it |
113
|
|
|
// We use the base-10 logarithm in the formula, so priorities |
114
|
|
|
// 1 to 10 transform into a weight of 1 to 0.5. When Disque starts |
115
|
|
|
// using more priority steps, priority 9 will discount about a half |
116
|
|
|
// of the job count. |
117
|
|
|
$disquePriorityWeight = 1 / (1 + log10($disquePriority)); |
118
|
|
|
} else { |
119
|
|
|
// Node is failing, or it has failed |
120
|
|
|
$disquePriorityWeight = 0; |
121
|
|
|
} |
122
|
|
|
|
123
|
|
|
$priority = $priority * $disquePriorityWeight; |
124
|
|
|
return (float) $priority; |
125
|
|
|
} |
126
|
|
|
} |