Completed
Push — master ( 96e614...1e74ba )
by Vladimir
03:46
created

BeanstalkdAdapter::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 13
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 11
nc 1
nop 5
dl 0
loc 13
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace FondBot\Queue\Adapters;
6
7
use FondBot\Queue\Job;
8
use FondBot\Queue\Adapter;
9
use Pheanstalk\Pheanstalk;
10
use FondBot\Drivers\Driver;
11
use FondBot\Drivers\Command;
12
use FondBot\Channels\Channel;
13
use Pheanstalk\Job as PheanstalkJob;
14
use FondBot\Queue\SerializableForQueue;
15
16
/**
 * Queue adapter that stores jobs in a beanstalkd tube through Pheanstalk.
 *
 * The connection is opened lazily: connect() may be called explicitly, but
 * next(), push() and later() all open it on first use when it is missing.
 */
class BeanstalkdAdapter extends Adapter
{
    /** @var Pheanstalk|null Active connection; null until connect() has run. */
    private $connection;

    /** @var string Host passed to Pheanstalk. */
    private $host;

    /** @var int Port passed to Pheanstalk. */
    private $port;

    /** @var string Name of the tube jobs are put into and watched on. */
    private $queue;

    /** @var int|null Connect timeout passed to Pheanstalk; null leaves the choice to Pheanstalk. */
    private $timeout;

    /** @var bool Persistent-connection flag passed to Pheanstalk. */
    private $persistent;

    /**
     * Store the connection settings. No connection is opened here.
     *
     * @param string   $host
     * @param int      $port
     * @param string   $queue
     * @param int|null $timeout
     * @param bool     $persistent
     */
    public function __construct(
        string $host,
        int $port = 11300,
        string $queue = 'default',
        ?int $timeout = null,
        bool $persistent = false
    ) {
        $this->host = $host;
        $this->port = $port;
        $this->queue = $queue;
        $this->timeout = $timeout;
        $this->persistent = $persistent;
    }

    /**
     * Establish connection to the queue.
     *
     * Always creates a new Pheanstalk instance, replacing any previous one.
     */
    public function connect(): void
    {
        $this->connection = new Pheanstalk($this->host, $this->port, $this->timeout, $this->persistent);
    }

    /**
     * Pull next job from the queue.
     *
     * @return Job|null The unserialized job, or null when nothing was reserved.
     */
    public function next(): ?Job
    {
        // Connect on demand, exactly as push() and later() do, so a worker
        // may call next() without an explicit connect() beforehand.
        $connection = $this->ensureConnected();

        $pheanstalkJob = $connection->watch($this->queue)->reserve();

        if (!$pheanstalkJob instanceof PheanstalkJob) {
            return null;
        }

        $job = $this->unserialize($pheanstalkJob->getData());

        // The job is deleted from the queue before it is handed to the caller.
        $connection->delete($pheanstalkJob);

        return $job;
    }

    /**
     * Push command onto the queue.
     *
     * @param Channel $channel
     * @param Driver  $driver
     * @param Command $command
     */
    public function push(Channel $channel, Driver $driver, Command $command): void
    {
        $job = new Job($channel, $driver, $command);

        $this->ensureConnected()->putInTube($this->queue, $this->serialize($job));
    }

    /**
     * Push command onto the queue with a delay.
     *
     * @param Channel $channel
     * @param Driver  $driver
     * @param Command $command
     * @param int     $delay   Delay value forwarded unchanged to Pheanstalk.
     */
    public function later(Channel $channel, Driver $driver, Command $command, int $delay): void
    {
        $job = new Job($channel, $driver, $command);

        $this->ensureConnected()->putInTube(
            $this->queue,
            $this->serialize($job),
            Pheanstalk::DEFAULT_PRIORITY,
            $delay
        );
    }

    /**
     * Return the active connection, opening it first when none exists yet.
     *
     * @return Pheanstalk
     */
    private function ensureConnected(): Pheanstalk
    {
        if ($this->connection === null) {
            $this->connect();
        }

        return $this->connection;
    }
}
106