Consumer   A
last analyzed

Complexity

Total Complexity 9

Size/Duplication

Total Lines 117
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 3

Test Coverage

Coverage 100%

Importance

Changes 8
Bugs 2 Features 4
Metric Value
wmc 9
c 8
b 2
f 4
lcom 1
cbo 3
dl 0
loc 117
ccs 30
cts 30
cp 1
rs 10

7 Methods

Rating   Name   Duplication   Size   Complexity  
A getJobs() 0 14 1
A getJob() 0 9 2
A ack() 0 5 1
A fastAck() 0 5 1
A show() 0 10 2
A nack() 0 4 1
A working() 0 4 1
1
<?php
2
namespace Phloppy\Client;
3
4
use Phloppy\Exception\CommandException;
5
use Phloppy\Job;
6
use Phloppy\RespUtils;
7
8
/**
9
 * Consumer client implementation.
10
 */
11
class Consumer extends AbstractClient {
12
13
    /**
14
     * @param string|string[] $queues
15
     * @param int $count
16
     * @param int $timeoutMs
17
     * @return Job[]
18
     */
19 7
    public function getJobs($queues, $count = 1, $timeoutMs = 200)
20
    {
21 7
        return $this->mapJobs((array) $this->send(array_merge(
22
            [
23 7
                'GETJOB',
24 7
                'TIMEOUT',
25 7
                $timeoutMs,
26 7
                'COUNT',
27 7
                (int) $count,
28 7
                'FROM',
29 7
            ],
30
            (array) $queues
31 7
        )));
32
    }
33
34
35
    /**
36
     * Retrieve a single job from the given queues
37
     *
38
     * @param string|string[] $queues
39
     * @param int $timeoutMs How long to block client when waiting for new jobs.
40
     * @return Job|null A Job if found, null otherwise.
41
     */
42 7
    public function getJob($queues, $timeoutMs = 200) {
43 7
        $jobs = $this->getJobs($queues, 1, $timeoutMs);
44
45 7
        if (empty($jobs)) {
46 2
            return null;
47
        }
48
49 6
        return $jobs[0];
50
    }
51
52
53
    /**
54
     * Acknowledge a job execution.
55
     *
56
     * @param Job $job
57
     *
58
     * @return int Number of Jobs acknowledged.
59
     */
60 4
    public function ack(Job $job)
61
    {
62 4
        assert($job->getId() != null);
63 4
        return (int) $this->send(['ACKJOB', $job->getId()]);
64
    }
65
66
67
    /**
68
     * Fast Acknowledge a job execution.
69
     *
70
     * @param Job $job
71
     *
72
     * @return int Number of Jobs acknowledged.
73
     * @see https://github.com/antirez/disque#fast-acknowledges
74
     */
75 1
    public function fastAck(Job $job)
76
    {
77 1
        assert($job->getId() != null);
78 1
        return (int) $this->send(['FASTACK', $job->getId()]);
79
    }
80
81
82
    /**
83
     * Return the job with the given jobId.
84
     *
85
     * @param string $jobId
86
     *
87
     * @return Job|null
88
     * @throws CommandException
89
     */
90 2
    public function show($jobId)
91
    {
92 2
        $result = $this->send(['SHOW', (string) $jobId]);
93
94 1
        if (!$result) {
95 1
            return null;
96
        }
97
98 1
        return Job::create(RespUtils::toAssoc($result));
99
    }
100
101
102
    /**
103
     * NACK the given job ids.
104
     *
105
     * @param string[] $jobIds
106
     * @return int Number of NACK'd jobs
107
     */
108 1
    public function nack(array $jobIds)
109
    {
110 1
        return (int) $this->send(array_merge(['NACK'], $jobIds));
111
    }
112
113
114
    /**
115
     * Send a notice to the server that this client is still processing the job.
116
     *
117
     * @param Job $job
118
     *
119
     * @return int The current retry time for the job
120
     *
121
     * @see https://github.com/antirez/disque#working-jobid
122
     */
123 1
    public function working(Job $job)
124
    {
125 1
        return (int) $this->send(['WORKING', $job->getId()]);
126
    }
127
}
128