Completed
Push — master ( ed97c6...a00967 )
by Vladimir
06:23
created

BeanstalkdAdapter::connect()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 0
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace FondBot\Queue\Adapters;
6
7
use FondBot\Queue\Job;
8
use FondBot\Queue\Adapter;
9
use Pheanstalk\Pheanstalk;
10
use FondBot\Drivers\Driver;
11
use FondBot\Drivers\Command;
12
use FondBot\Channels\Channel;
13
use Pheanstalk\Job as PheanstalkJob;
14
use FondBot\Queue\SerializableForQueue;
15
16
class BeanstalkdAdapter extends Adapter
17
{
18
    /** @var Pheanstalk */
19
    private $connection;
20
21
    private $queue;
22
23 4
    public function __construct(Pheanstalk $connection, string $queue = 'default')
24
    {
25 4
        $this->connection = $connection;
26 4
        $this->queue = $queue;
27 4
    }
28
29
    /**
30
     * Pull next job from the queue.
31
     *
32
     * @return Job|SerializableForQueue
33
     */
34 2
    public function next(): ?Job
35
    {
36 2
        $pheanstalkJob = $this->connection->watch($this->queue)->reserve();
37
38 2
        if ($pheanstalkJob instanceof PheanstalkJob) {
39 1
            $job = $this->unserialize($pheanstalkJob->getData());
40
41 1
            $this->connection->delete($pheanstalkJob);
42
43 1
            return $job;
44
        }
45
46 1
        return null;
47
    }
48
49
    /**
50
     * Push command onto the queue.
51
     *
52
     * @param Channel $channel
53
     * @param Driver  $driver
54
     * @param Command $command
55
     */
56 1
    public function push(Channel $channel, Driver $driver, Command $command): void
57
    {
58 1
        $job = new Job($channel, $driver, $command);
59 1
        $this->connection->putInTube($this->queue, $this->serialize($job));
60 1
    }
61
62
    /**
63
     * Push command onto the queue with a delay.
64
     *
65
     * @param Channel $channel
66
     * @param Driver  $driver
67
     * @param Command $command
68
     * @param int     $delay
69
     *
70
     * @return mixed|void
71
     */
72 1
    public function later(Channel $channel, Driver $driver, Command $command, int $delay): void
73
    {
74 1
        $job = new Job($channel, $driver, $command);
75 1
        $this->connection->putInTube($this->queue, $this->serialize($job), Pheanstalk::DEFAULT_PRIORITY, $delay);
76 1
    }
77
}
78