LoopedProcessRunner::hasExceededTimeLimit()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 0
cts 4
cp 0
rs 9.9666
c 0
b 0
f 0
cc 2
nc 2
nop 0
crap 6
1
<?php
2
3
namespace Wems\LoopRunner;
4
5
use React\EventLoop\LoopInterface;
6
7
/**
 * Repeatedly invokes a callable on an event loop timer, optionally stopping the loop
 * once a time limit (in seconds) has been reached.
 */
class LoopedProcessRunner
{
    /** @var LoopInterface event loop the periodic timer is registered on */
    private $loop;

    /** @var callable returns a boolean to decide whether to go again immediately */
    private $process;

    /** @var int whole seconds between the start of run() and the last completed process call */
    private $elapsedTime = 0;

    /** @var int unix timestamp taken when run() was called */
    private $startTime;

    /** @var int|null time limit in seconds, null means no limit */
    private $timeLimit;

    /** @var int seconds between timer ticks */
    private $interval = 1;

    /**
     * @param LoopInterface $loop
     * @param callable      $process function that returns a boolean to decide whether to go again immediately
     */
    public function __construct(LoopInterface $loop, callable $process)
    {
        $this->loop = $loop;
        $this->process = $process;
    }

    /**
     * Replace the process; it is read at the start of every timer tick.
     *
     * @param callable $process function that returns a boolean to decide whether to go again immediately
     *
     * @return self
     */
    public function setProcess(callable $process): self
    {
        $this->process = $process;

        return $this;
    }

    /**
     * @param int $timeLimit in seconds
     *
     * @return self
     */
    public function setTimeLimit(int $timeLimit): self
    {
        $this->timeLimit = $timeLimit;

        return $this;
    }

    /**
     * The interval is read when run() registers its timer, so set it before calling run().
     *
     * @param int $interval seconds to wait between runs that didn't process anything
     *
     * @return self
     */
    public function setInterval(int $interval): self
    {
        $this->interval = $interval;

        return $this;
    }

    /**
     * Run the callable process repeatedly, once per interval unless it returns true, in which case it is
     * called again immediately within the same tick.
     *
     * In the context of pulling from a Redis queue, the process returns true if it processed an item because
     * that suggests there may be more items to process, so we try again immediately instead of waiting for
     * the next tick. This allows many items to be processed very quickly whilst retaining low overheads.
     *
     * The loop is kept active until the time limit is reached; the timer is then cancelled and the loop
     * stopped. The limit is only checked after a process call returns, so the process always runs at least
     * once per tick and a long-running call can overshoot the limit. Without a time limit this method only
     * returns when the loop is stopped elsewhere.
     */
    public function run()
    {
        $this->startTime = time();
        $this->elapsedTime = 0;

        // $timer is captured by reference because it is only assigned once addPeriodicTimer() returns;
        // the callback cannot fire before the loop is running, so it is always set by then
        $timer = $this->loop->addPeriodicTimer($this->interval, function () use (&$timer) {
            $process = $this->process;

            do {
                $didSomethingThisRun = $process();
                $this->updateElapsedTime();
            } while ($didSomethingThisRun && $this->notExceededTimeLimit());

            if ($this->hasExceededTimeLimit()) {
                // cancel our timer before stopping, otherwise it stays registered on the loop: a later
                // run() would stack a second timer on top of it and any later $loop->run() would keep
                // invoking the expired process
                $this->loop->cancelTimer($timer);
                $this->loop->stop();
            }
        });

        $this->loop->run();
    }

    /**
     * Refresh the elapsed time from the wall clock (whole seconds).
     */
    private function updateElapsedTime()
    {
        $this->elapsedTime = time() - $this->startTime;
    }

    /**
     * @return bool true once the elapsed time has reached the time limit, always false without a limit
     */
    private function hasExceededTimeLimit(): bool
    {
        // if there is no time limit then we have nothing to exceed
        if (is_null($this->timeLimit)) {
            return false;
        }

        return $this->elapsedTime >= $this->timeLimit;
    }

    /**
     * @return bool negation of hasExceededTimeLimit(), kept for readability of the loop condition
     */
    private function notExceededTimeLimit(): bool
    {
        return !$this->hasExceededTimeLimit();
    }
}
121