Client::push()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 4

Duplication

Lines 7
Ratio 100 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 4
c 1
b 0
f 0
nc 1
nop 4
dl 7
loc 7
ccs 0
cts 6
cp 0
crap 2
rs 9.4285
1
<?php
2
3
namespace SidekiqJob;
4
5
/**
 * Class Client
 *
 * Pushes Sidekiq jobs to redis, either onto a queue for immediate processing
 * or into the schedule set for execution at a later time.
 *
 * @package SidekiqJob
 */
class Client
{
    /** Default queue name */
    const QUEUE = 'default';

    /** @var string Prefix for every redis key; empty string when no namespace is set */
    protected $namespace;

    /** @var \Predis\Client */
    public $redis;

    /** @var Serializer */
    protected $serializer;

    /** @var IdGenerator */
    protected $idGenerator;

    /**
     * Sidekiq job pusher init
     *
     * @param \Predis\Client   $redis
     * @param string|null      $namespace   Key prefix; null means no prefix
     * @param Serializer|null  $serializer  Defaults to a new Serializer
     * @param IdGenerator|null $idGenerator Defaults to a new IdGenerator
     */
    public function __construct(\Predis\Client $redis, $namespace = null, $serializer = null, $idGenerator = null)
    {
        $this->redis = $redis;
        $this->namespace = ($namespace === null) ? '' : (string) $namespace;
        $this->serializer = ($serializer === null) ? new Serializer() : $serializer;
        $this->idGenerator = ($idGenerator === null) ? new IdGenerator() : $idGenerator;
    }

    /**
     * Push a job for immediate processing
     *
     * @param string $class
     * @param array  $args  Must be a list (sequential integer keys starting at 0)
     * @param bool   $retry
     * @param string $queue
     * @return string Id of the pushed job
     * @throws Exception When $args is an associative array
     */
    public function push($class, $args = [], $retry = true, $queue = self::QUEUE)
    {
        $jobId = $this->idGenerator->generate();
        $this->atomicPush($jobId, $class, $args, $queue, $retry);

        return $jobId;
    }

    /**
     * Schedule a job at a certain time
     *
     * @param float|int $doAt  Unix epoch timestamp, e.g. microtime(true) + 60
     * @param string    $class
     * @param array     $args  Must be a list (sequential integer keys starting at 0)
     * @param bool      $retry
     * @param string    $queue
     * @return string Id of the scheduled job
     * @throws Exception When $args is an associative array or $doAt is not numeric
     */
    public function schedule($doAt, $class, $args = [], $retry = true, $queue = self::QUEUE)
    {
        $jobId = $this->idGenerator->generate();
        $this->atomicPush($jobId, $class, $args, $queue, $retry, $doAt);

        return $jobId;
    }

    /**
     * Push multiple jobs to queue
     *
     * Format:
     * $jobs = [
     *  [
     *      'class' => 'SomeClass',
     *      'args' => array(),
     *      'retry' => false,
     *      'at' => microtime(true)
     *  ]
     * ];
     *
     * 'retry' defaults to true; a missing 'at' means the job is pushed
     * immediately instead of being scheduled.
     *
     * The whole batch is validated before anything is written to redis, so an
     * invalid entry never leaves the batch partially pushed.
     *
     * @param array  $jobs
     * @param string $queue
     * @return string[] Ids of the pushed jobs, in the order of $jobs
     * @throws Exception When any job is missing 'class' or 'args', or has invalid 'args' or 'at'
     */
    public function pushBulk($jobs = [], $queue = self::QUEUE)
    {
        // First pass: validate only. The normalized entries are buffered so that
        // $jobs is walked exactly once.
        $pending = [];
        foreach ($jobs as $job) {
            if (!isset($job['class'])) {
                throw new Exception('pushBulk: each job needs a job class');
            }
            if (!isset($job['args']) || !is_array($job['args'])) {
                throw new Exception('pushBulk: each job needs args');
            }

            $doAt = isset($job['at']) ? $job['at'] : null;
            $this->assertListArgs($job['args']);
            $this->assertValidDoAt($doAt);

            $pending[] = [
                'class' => $job['class'],
                'args'  => $job['args'],
                'retry' => isset($job['retry']) ? $job['retry'] : true,
                'at'    => $doAt,
            ];
        }

        // Second pass: every entry is known to be valid, so pushing can start.
        $ids = [];
        foreach ($pending as $entry) {
            $jobId = $this->idGenerator->generate();
            $ids[] = $jobId;
            $this->atomicPush($jobId, $entry['class'], $entry['args'], $queue, $entry['retry'], $entry['at']);
        }

        return $ids;
    }

    /**
     * Push job to redis
     *
     * Without $doAt the queue name is added to the "queues" set and the
     * serialized job is pushed onto the queue's list. With $doAt the serialized
     * job is added to the "schedule" sorted set with $doAt as its score.
     *
     * NOTE(review): despite the method name, the immediate branch issues sadd
     * and lpush as two separate calls on the client; nothing in this method
     * wraps them in a transaction.
     *
     * @param string         $jobId
     * @param string         $class
     * @param array          $args
     * @param string         $queue
     * @param bool           $retry
     * @param float|int|null $doAt
     * @throws Exception When $args is an associative array or $doAt is not numeric
     */
    private function atomicPush($jobId, $class, $args = [], $queue = self::QUEUE, $retry = true, $doAt = null)
    {
        $this->assertListArgs($args);
        $this->assertValidDoAt($doAt);

        $job = $this->serializer->serialize($jobId, $class, $args, $retry, $queue);

        if ($doAt === null) {
            $this->redis->sadd($this->name('queues'), $queue);
            $this->redis->lpush($this->name('queue', $queue), $job);
        } else {
            $this->redis->zadd($this->name('schedule'), $doAt, $job);
        }
    }

    /**
     * Ensure job args form a list rather than an associative array
     *
     * @param array $args
     * @throws Exception When the keys of $args are not 0..n-1 in order
     */
    private function assertListArgs($args)
    {
        // Re-indexing a list yields an identical array; any other key layout differs.
        if (array_values($args) !== $args) {
            throw new Exception('Associative arrays in job args are not allowed');
        }
    }

    /**
     * Ensure a schedule time is either absent or a numeric timestamp
     *
     * @param mixed $doAt
     * @throws Exception When $doAt is neither null, an int nor a float
     */
    private function assertValidDoAt($doAt)
    {
        // Only a real number may be used as a sorted-set score; strings, booleans,
        // arrays and objects are all rejected.
        if ($doAt !== null && !is_float($doAt) && !is_int($doAt)) {
            throw new Exception('at argument needs to be in a unix epoch format. Use microtime(true).');
        }
    }

    /**
     * Build a redis key by joining the namespace and the given parts with ':'
     *
     * Empty parts (including an empty namespace) are dropped.
     *
     * @param string ...$key
     * @return string
     */
    private function name()
    {
        return implode(':', array_filter(array_merge([$this->namespace], func_get_args()), 'strlen'));
    }

    /**
     * @return \Predis\Client
     */
    public function getRedis()
    {
        return $this->redis;
    }
}
165