Completed
Push — master ( 987339...3717b7 )
by Mike
02:45
created

SchedulerWorker::handleReceivedCommand()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 8
cts 8
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 7
nc 1
nop 1
crap 1
1
<?php
2
3
namespace MGDigital\BusQue;
4
5
class SchedulerWorker
6
{
7
8
    const DEFAULT_THROTTLE = 100;
9
10
    private $implementation;
11
12 2
    public function __construct(Implementation $implementation)
13
    {
14 2
        $this->implementation = $implementation;
15 2
    }
16
17
    /**
18
     * @param int|null $limit The maximum number of scheduled commands to queue.
19
     * @param int $throttle The maximum number of scheduled commands to receive at a time.
20
     * @param int|null $time The maximum amount of time in seconds to work.
21
     * @param int $uSleepTime The number of microseconds to usleep between each query to the scheduler.
22
     * @param \DateInterval|null $expiry The expiry interval for an overdue unqueued command.
23
     */
24 1
    public function work(
25
        int $limit = null,
26
        int $throttle = self::DEFAULT_THROTTLE,
27
        int $time = null,
28
        int $uSleepTime = 5000000,
29
        \DateInterval $expiry = null
30
    ) {
31 1
        $stopwatchStart = time();
32 1
        while ($limit === null || $limit > 0) {
33 1
            $queuedCount = $this->iterate($throttle, $expiry);
34 1
            if ($limit !== null) {
35 1
                $limit -= $queuedCount;
36 1
                if ($limit <= 0) {
37 1
                    break;
38
                }
39
            }
40
            if ($time !== null && (time() - $stopwatchStart >= $time)) {
41
                break;
42
            }
43
            usleep($uSleepTime);
44
        }
45 1
    }
46
47 1
    private function iterate(int $throttle, \DateInterval $expiry = null): int
48
    {
49 1
        $count = 0;
50 1
        $now = $this->implementation->getClock()->getTime();
51 1
        if ($expiry === null) {
52 1
            $start = null;
53
        } else {
54
            $start = clone $now;
55
            $start = $start->sub($expiry);
56
        }
57 1
        $receivedCommands = $this->implementation->getSchedulerDriver()
58 1
            ->receiveDueCommands($now, $throttle, $start);
59 1
        foreach ($receivedCommands as $command) {
60 1
            $this->handleReceivedCommand($command);
61 1
            $count++;
62
        }
63 1
        return $count;
64
    }
65
66 1
    private function handleReceivedCommand(ReceivedScheduledCommand $command)
67
    {
68 1
        $this->implementation->getQueueDriver()
69 1
            ->queueCommand(
70 1
                $command->getQueueName(),
71 1
                $command->getId(),
72 1
                $command->getSerialized()
73
            );
74 1
        $this->implementation->getLogger()->debug('Scheduled command queued', compact('command'));
75 1
    }
76
}
77