Completed
Push — master ( 0fd676...83be6d )
by Alexey
37:37
created

JobQueue.php (1 issue)

Labels
Severity

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
namespace yiicod\jobqueue;
4
5
use Illuminate\Queue\Capsule\Manager;
6
use Illuminate\Queue\Connectors\ConnectorInterface;
7
use yii\base\BootstrapInterface;
8
use yii\base\Component;
9
use yiicod\jobqueue\connectors\MongoThreadConnector;
10
11
/**
12
 * Yii component for laravel 5 queues to work with mongodb
13
 *
14
 * @author Orlov Alexey <[email protected]>
15
 * @author Virchenko Maksim <[email protected]>
16
 */
17
class JobQueue extends Component implements BootstrapInterface
18
{
19
    /**
20
     * Available connections
21
     *
22
     * @var array
23
     */
24
    public $connections = [
25
        'thread' => [
26
            'driver' => 'mongo-thread',
27
            'table' => 'yii_jobs_thread',
28
            'queue' => 'default',
29
            'expire' => 60,
30
            'limit' => 15, // Or 1
31
            'connection' => 'mongodb',
32
        ],
33
    ];
34
35
    /**
36
     * Encryption key
37
     *
38
     * @var string
39
     */
40
    public $privateKey = 'rc5lgpue80sr17nx';
41
42
    /**
43
     * Manager instance
44
     *
45
     * @var Manager
46
     */
47
    private $manager = null;
48
49
    /**
50
     * Initialize
51
     *
52
     * @param \yii\base\Application $app
53
     */
54
    public function bootstrap($app)
55
    {
56
        $this->connect();
57
    }
58
59
    /**
60
     * Connect queue manager for mongo database
61
     *
62
     * @return Manager
63
     */
64
    protected function connect()
65
    {
66
        /* @var  Manager manager */
67
        $this->manager = new Manager();
68
69
        $this->addConnector('mongo-thread', new MongoThreadConnector());
70
71
        foreach ($this->connections as $name => $params) {
72
            $this->manager->addConnection($params, $name);
73
        }
74
75
        //Set as global to access
76
        $this->manager->setAsGlobal();
77
    }
78
79
    /**
80
     * Add connector
81
     *
82
     * @param string $name
83
     * @param Closure $resolver
0 ignored issues
show
There is no parameter named $resolver. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
84
     */
85
    public function addConnector(string $name, ConnectorInterface $connector)
86
    {
87
        $this->manager->addConnector($name, function () use ($connector) {
88
            return $connector;
89
        });
90
    }
91
92
    /**
93
     * Get queue manager instance
94
     *
95
     * @return mixed
96
     */
97
    public function getQueueManager()
98
    {
99
        return $this->manager->getQueueManager();
100
    }
101
102
    /**
103
     * Push new job to queue
104
     *
105
     * @param mixed $job
106
     * @param array $data
107
     * @param string $queue
108
     * @param string $connection
109
     *
110
     * @return mixed
111
     */
112
    public static function push($job, $data = [], $queue = 'default', $connection = 'default')
113
    {
114
        return Manager::push($job, $data, $queue, $connection);
115
    }
116
117
    /**
118
     * Push new job to queue if this job is not exist
119
     *
120
     * @param mixed $job
121
     * @param array $data
122
     * @param string $queue
123
     * @param string $connection
124
     *
125
     * @return mixed
126
     */
127
    public static function pushUnique($job, $data = [], $queue = 'default', $connection = 'default')
128
    {
129
        if (false === Manager::connection($connection)->exists($job, $data, $queue)) {
130
            return Manager::push($job, $data, $queue, $connection);
131
        }
132
133
        return null;
134
    }
135
136
    /**
137
     * Push a new an array of jobs onto the queue.
138
     *
139
     * @param  array $jobs
140
     * @param  mixed $data
141
     * @param  string $queue
142
     * @param  string $connection
143
     *
144
     * @return mixed
145
     */
146
    public static function bulk($jobs, $data = '', $queue = null, $connection = null)
147
    {
148
        return Manager::bulk($jobs, $data, $queue, $connection);
149
    }
150
151
    /**
152
     * Push a new job onto the queue after a delay.
153
     *
154
     * @param  \DateTime|int $delay
155
     * @param  string $job
156
     * @param  mixed $data
157
     * @param  string $queue
158
     * @param  string $connection
159
     *
160
     * @return mixed
161
     */
162
    public static function later($delay, $job, $data = '', $queue = null, $connection = null)
163
    {
164
        return Manager::later($delay, $job, $data, $queue, $connection);
165
    }
166
167
    /**
168
     * Push a new job into the queue after a delay if job does not exist.
169
     *
170
     * @param  \DateTime|int $delay
171
     * @param  string $job
172
     * @param  mixed $data
173
     * @param  string $queue
174
     * @param  string $connection
175
     *
176
     * @return mixed
177
     */
178
    public static function laterUnique($delay, $job, $data = '', $queue = null, $connection = null)
179
    {
180
        if (false === Manager::connection($connection)->exists($job, $data, $queue)) {
181
            return Manager::later($delay, $job, $data, $queue, $connection);
182
        }
183
184
        return null;
185
    }
186
}
187