Completed
Push — master ( 71cc85...b00a6a )
by Alexey
37:06
created

JobQueue::addConnector()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 2
1
<?php
2
3
namespace SfCod\QueueBundle\Service;
4
5
use Illuminate\Queue\Capsule\Manager;
6
use Illuminate\Queue\Connectors\ConnectorInterface;
7
use Illuminate\Queue\QueueManager;
8
use SfCod\QueueBundle\Base\JobQueueInterface;
9
use SfCod\QueueBundle\Connector\MongoConnector;
10
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
11
use Symfony\Component\DependencyInjection\ContainerInterface;
12
13
/**
14
 * Service that lets Illuminate queues work with MongoDB
15
 *
16
 * @author Virchenko Maksim <[email protected]>
17
 * @author Orlov Alexey <[email protected]>
18
 */
19
class JobQueue implements JobQueueInterface
20
{
21
    use ContainerAwareTrait;
22
23
    /**
24
     * Available connections, keyed by connection name; each value is the parameter array passed to the manager's addConnection() in connect()
25
     *
26
     * @var array
27
     */
28
    protected $connections = [
29
        'default' => [
30
            'driver' => 'mongo-thread',
31
            'collection' => 'queue_jobs',
32
            'queue' => 'default',
33
            'expire' => 60,
34
            'limit' => 2,
35
            'connection' => MongoDriverInterface::class, // Default mongo connection
36
        ],
37
    ];
38
39
    /**
40
     * Queue capsule manager instance; null until connect() creates and configures it
41
     *
42
     * @var Manager
43
     */
44
    protected $manager;
45
46
    /**
     * Store the container, merge the connection definitions and boot the manager.
     *
     * The given connections are combined with the built-in ones through
     * array_merge(), so a string-keyed entry whose name matches a built-in
     * one (e.g. 'default') replaces that entry as a whole.
     *
     * @param ContainerInterface $container Container kept on the instance and handed to the mongo connector
     * @param array $connections Additional or overriding connections, keyed by connection name
     */
    public function __construct(ContainerInterface $container, array $connections = [])
    {
        $this->container = $container;
        $this->connections = array_merge($this->connections, $connections);

        $this->connect();
    }
61
62
    /**
     * Expose the queue manager held by the capsule manager.
     *
     * @return QueueManager
     */
    public function getQueueManager(): QueueManager
    {
        $capsule = $this->manager;

        return $capsule->getQueueManager();
    }
71
72
    /**
     * Push a new job onto the queue.
     *
     * Delegates to the static Manager::push(); connect() registers this
     * service's manager through setAsGlobal(), which the static call
     * presumably relies on (confirm against the capsule manager).
     *
     * @author Virchenko Maksim <[email protected]>
     *
     * @param string $job Job to push
     * @param array $data Payload passed along with the job
     * @param string $queue Queue name
     * @param string $connection Connection name
     *
     * @return mixed Whatever Manager::push() returns
     */
    public function push(string $job, array $data = [], string $queue = 'default', string $connection = 'default')
    {
        $result = Manager::push($job, $data, $queue, $connection);

        return $result;
    }
88
89
    /**
     * Push a new job onto the queue unless the same job already exists.
     *
     * @author Virchenko Maksim <[email protected]>
     *
     * @param string $job Job to push
     * @param array $data Payload passed along with the job
     * @param string $queue Queue name
     * @param string $connection Connection name
     *
     * @return mixed Result of Manager::push(), or null when exists() returned anything other than false
     */
    public function pushUnique(string $job, array $data = [], string $queue = 'default', string $connection = 'default')
    {
        // exists() is not declared on Illuminate\Contracts\Queue\Queue (static
        // analysis flags this call), so the resolved connection has to provide it.
        $alreadyQueued = Manager::connection($connection)->exists($job, $data, $queue);

        if (false !== $alreadyQueued) {
            return null;
        }

        return Manager::push($job, $data, $queue, $connection);
    }
109
110
    /**
     * Push an array of jobs onto the queue.
     *
     * @param array $jobs Jobs to push
     * @param array $data Payload passed along with the jobs
     * @param string $queue Queue name
     * @param string $connection Connection name
     *
     * @return mixed Whatever Manager::bulk() returns
     */
    public function bulk(array $jobs, array $data = [], string $queue = 'default', string $connection = 'default')
    {
        $result = Manager::bulk($jobs, $data, $queue, $connection);

        return $result;
    }
124
125
    /**
     * Push a new job onto the queue after a delay.
     *
     * @param int $delay Delay forwarded to Manager::later(); the signature accepts integers only
     * @param string $job Job to push
     * @param array $data Payload passed along with the job
     * @param string $queue Queue name
     * @param string $connection Connection name
     *
     * @return mixed Whatever Manager::later() returns
     */
    public function later(int $delay, string $job, array $data = [], string $queue = 'default', string $connection = 'default')
    {
        $result = Manager::later($delay, $job, $data, $queue, $connection);

        return $result;
    }
140
141
    /**
     * Push a new job onto the queue after a delay unless the same job already exists.
     *
     * @param int $delay Delay forwarded to Manager::later(); the signature accepts integers only
     * @param string $job Job to push
     * @param array $data Payload passed along with the job
     * @param string $queue Queue name
     * @param string $connection Connection name
     *
     * @return mixed Result of Manager::later(), or null when exists() returned anything other than false
     */
    public function laterUnique(int $delay, string $job, array $data = [], string $queue = 'default', string $connection = 'default')
    {
        // exists() is not declared on Illuminate\Contracts\Queue\Queue (static
        // analysis flags this call), so the resolved connection has to provide it.
        $alreadyQueued = Manager::connection($connection)->exists($job, $data, $queue);

        if (false !== $alreadyQueued) {
            return null;
        }

        return Manager::later($delay, $job, $data, $queue, $connection);
    }
160
161
    /**
     * Register a connector under the given driver name.
     *
     * The manager is given a resolver callback, so the ready-made connector
     * is wrapped in a closure that simply returns it.
     *
     * @param string $name Driver name the connector is registered under (e.g. 'mongo-thread')
     * @param ConnectorInterface $connector Connector instance returned by the resolver
     */
    public function addConnector(string $name, ConnectorInterface $connector)
    {
        // Static closure: the resolver only needs $connector, not $this.
        $this->manager->addConnector($name, static function () use ($connector) {
            return $connector;
        });
    }
173
174
    /**
     * Create and configure the queue manager on first use.
     *
     * On the first call this creates the manager, registers the
     * 'mongo-thread' connector, adds every configured connection and sets
     * the manager as the global instance. Later calls return the existing
     * manager unchanged.
     *
     * @author Virchenko Maksim <[email protected]>
     *
     * @return Manager
     */
    protected function connect()
    {
        if (null !== $this->manager) {
            return $this->manager;
        }

        // Assigned first because addConnector() works on $this->manager.
        $this->manager = new Manager();

        $this->addConnector('mongo-thread', new MongoConnector($this->container));

        foreach ($this->connections as $name => $params) {
            $this->manager->addConnection($params, $name);
        }

        $this->manager->setAsGlobal();

        return $this->manager;
    }
197
}
198