ProcessRegistry::removeExpiredPids()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 4
nc 3
nop 3
dl 0
loc 8
rs 9.4285
c 0
b 0
f 0
1
<?php
2
/**
3
 * Defines the ProcessRegistry class which uses MongoDB as a backend.
4
 */
5
6
namespace TraderInteractive\Cronus;
7
8
use MongoDB\BSON\ObjectID;
9
use MongoDB\Collection;
10
11
/**
12
 * Class that adds/removes from a process registry.
13
 */
14
final class ProcessRegistry implements ProcessRegistryInterface
{
    /*
     * Example registry document:
     * {
     *     '_id': 'a unique id',
     *     'hosts': {
     *         'a hostname': {
     *             'a pid': expire timestamp,
     *             ...
     *         },
     *         ...
     *     },
     *     'version': ObjectID(an id),
     * }
     */

    /**
     * Upper bound applied to every stored expire timestamp.
     *
     * 2147483648 can overflow in php mongo without using the MongoInt64.
     */
    const MONGO_INT32_MAX = 2147483647;

    /**
     * MongoDB collection containing the process information.
     *
     * @var Collection
     */
    private $collection;

    /**
     * Local host name with '.' and '$' replaced so it can be used as a MongoDB field name.
     *
     * @var string
     */
    private $encodedHostName;

    /**
     * Process id of the current PHP process, as reported by getmypid().
     *
     * @var int
     */
    private $pid;

    /**
     * Construct a new instance of the registry.
     *
     * @param Collection $collection The MongoDB collection containing the process information.
     */
    public function __construct(Collection $collection)
    {
        $this->collection = $collection;
        // '.' and '$' are replaced because the host name is used as a key inside the 'hosts' sub-document
        // and as a segment of dotted update paths in remove() and reset().
        $this->encodedHostName = str_replace(['.', '$'], ['_DOT_', '_DOLLAR_'], gethostname());
        $this->pid = getmypid();
    }

    /**
     * Add to process registry. Adds based on $maxGlobalProcesses and $maxHostProcesses after a process registry
     * cleaning.
     *
     * The registry document is read, cleaned of removable pids, extended with this process and written back
     * guarded by the 'version' field. The whole cycle is attempted at most 5 times.
     *
     * @param ProcessInterface $process The process to add.
     *
     * @return boolean true if the process was added, false if not or there is too much concurrency at the moment.
     */
    public function add(ProcessInterface $process) : bool
    {
        //loop in case the update fails its optimistic concurrency check
        for ($i = 0; $i < 5; ++$i) {
            $this->ensureProcessDocumentExists($process->getName());
            $existing = $this->getExistingProcessDocument($process->getName());
            $replacement = $this->generateReplacementDocument($existing);
            $totalPidCount = $this->getTotalPidCount($replacement);
            $thisHostPids = $replacement['hosts'][$this->encodedHostName] ?? [];

            // Limits are checked against the cleaned document, which no longer contains this process's own entry.
            if ($totalPidCount >= $process->getMaxGlobalProcesses()
                    || count($thisHostPids) >= $process->getMaxHostProcesses()) {
                return false;
            }

            // add our process
            $thisHostPids[$this->pid] = $this->getExpiresSeconds($process->getMinsBeforeExpire());
            $replacement['hosts'][$this->encodedHostName] = $thisHostPids;

            if ($this->updateExistingDocument($existing['_id'], $existing['version'], $replacement)) {
                return true;
            }
        }

        //too much concurrency at the moment, return false to signify not added.
        return false;
    }

    /**
     * Replace the registry document, but only if it still carries the version that was read.
     *
     * @param string   $id          The '_id' of the registry document.
     * @param ObjectID $version     The 'version' value the document had when it was read.
     * @param array    $replacement The full document to store in its place.
     *
     * @return boolean true if exactly one document matched the id/version filter.
     */
    private function updateExistingDocument(string $id, ObjectID $version, array $replacement) : bool
    {
        // WriteConcern arguments are (w, wtimeout, journal).
        $writeConcern = new \MongoDB\Driver\WriteConcern(1, 100, true);

        $status = $this->collection->replaceOne(
            ['_id' => $id, 'version' => $version],
            $replacement,
            ['writeConcern' => $writeConcern]
        );

        return $status->getMatchedCount() === 1;
    }

    /**
     * Upsert an empty registry document for the given id; an already existing document is left unchanged
     * because only $setOnInsert is used.
     *
     * NOTE(review): an empty PHP array is presumably stored as a BSON array rather than an embedded document;
     * confirm that the dotted '$set' in reset() behaves as intended on a freshly inserted document.
     *
     * @param string $id The '_id' of the registry document.
     *
     * @return void
     */
    private function ensureProcessDocumentExists(string $id)
    {
        $this->collection->findOneAndUpdate(
            ['_id' => $id],
            ['$setOnInsert' => ['hosts' => [], 'version' => new ObjectID()]],
            ['upsert' => true]
        );
    }

    /**
     * Build the document that should replace $existing: same content with a fresh version, all removable pids
     * dropped and hosts without any remaining pid removed.
     *
     * @param array $existing The registry document as read from the collection.
     *
     * @return array The cleaned document.
     */
    private function generateReplacementDocument(array $existing) : array
    {
        $replacement = $existing;
        $replacement['version'] = new ObjectID();

        //clean $replacement based on their pids and expire times
        foreach ($existing['hosts'] as $hostname => $pids) {
            $this->removeExpiredPids($pids, $hostname, $replacement);
            if (empty($replacement['hosts'][$hostname])) {
                unset($replacement['hosts'][$hostname]);
            }
        }

        return $replacement;
    }

    /**
     * Fetch the registry document for the given id as nested PHP arrays.
     *
     * NOTE(review): findOne() yields null when nothing matches, which would violate the array return type;
     * add() calls ensureProcessDocumentExists() first, so this presumably only happens if the document is
     * deleted in between.
     *
     * @param string $id The '_id' of the registry document.
     *
     * @return array The registry document.
     */
    private function getExistingProcessDocument(string $id) : array
    {
        return $this->collection->findOne(
            ['_id' => $id],
            ['typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array']]
        );
    }

    /**
     * Unset from $replacement every pid of the given host that canRemovePid() reports as removable.
     *
     * @param array  $pids        Map of pid => expire timestamp for the host.
     * @param string $hostname    The (encoded) host name the pids belong to.
     * @param array  $replacement The replacement document, modified in place.
     *
     * @return void
     */
    private function removeExpiredPids(array $pids, string $hostname, array &$replacement)
    {
        foreach ($pids as $pid => $expires) {
            if ($this->canRemovePid($hostname, $pid, $expires)) {
                unset($replacement['hosts'][$hostname][$pid]);
            }
        }
    }

    /**
     * Decide whether a registry entry may be dropped.
     *
     * An entry is removable when any of the following holds:
     *  - it belongs to this host and "/proc/{pid}" does not exist,
     *  - its expire timestamp lies in the past,
     *  - it is the entry of the current process on this host (add() writes a fresh entry afterwards).
     *
     * NOTE(review): the "/proc" probe assumes a procfs is mounted; on hosts without one every entry of the
     * local host would be reported as removable.
     *
     * @param string  $hostname The (encoded) host name the entry belongs to.
     * @param integer $pid      The process id of the entry.
     * @param integer $expires  The expire timestamp of the entry, in seconds.
     *
     * @return boolean true if the entry may be removed.
     */
    private function canRemovePid(string $hostname, int $pid, int $expires) : bool
    {
        return (($hostname === $this->encodedHostName && !file_exists("/proc/{$pid}"))
                || time() > $expires
                || ($hostname === $this->encodedHostName && $pid === $this->pid));
    }

    /**
     * Count the pids registered across all hosts of a registry document.
     *
     * @param array $document The registry document.
     *
     * @return integer The total number of pid entries.
     */
    private function getTotalPidCount(array $document) : int
    {
        $totalPidCount = 0;
        foreach ($document['hosts'] as $pids) {
            $totalPidCount += count($pids);
        }

        return $totalPidCount;
    }

    /**
     * Compute the expire timestamp for an entry created or refreshed now.
     *
     * @param integer $minsBeforeExpire Number of minutes from now until the entry expires.
     *
     * @return integer The expire timestamp in seconds, clamped to the range 0..self::MONGO_INT32_MAX.
     */
    private function getExpiresSeconds(int $minsBeforeExpire) : int
    {
        $expireSecs = time() + $minsBeforeExpire * 60;
        //ensure expireSecs is between 0 and self::MONGO_INT32_MAX
        return max(0, min($expireSecs, self::MONGO_INT32_MAX));
    }

    /**
     * Removes from process registry. Does not do anything needed for use of the add() method. Most will only use at the
     * end of their script so the mongo collection is up to date.
     *
     * @param ProcessInterface $process The process to remove.
     *
     * @return void
     */
    public function remove(ProcessInterface $process)
    {
        $this->collection->updateOne(
            ['_id' => $process->getName()],
            ['$unset' => ["hosts.{$this->encodedHostName}.{$this->pid}" => ''], '$set' => ['version' => new ObjectID()]]
        );
    }

    /**
     * Reset a process expire time in the registry.
     *
     * The new expire time is computed by the same helper add() uses, so both code paths apply identical
     * clamping.
     *
     * @param ProcessInterface $process The process to reset.
     *
     * @return void
     */
    public function reset(ProcessInterface $process)
    {
        $expireSecs = $this->getExpiresSeconds($process->getMinsBeforeExpire());

        $this->collection->updateOne(
            ['_id' => $process->getName()],
            [
                '$set' => [
                    "hosts.{$this->encodedHostName}.{$this->pid}" => $expireSecs,
                    'version' => new ObjectID(),
                ],
            ]
        );
    }
}
213