Completed
Push — master ( 0c4600...eadd90 )
by Eugene
07:09
created

DefaultConfigFactory::getConnectionUri()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 2
1
<?php
2
3
namespace Tarantool\JobQueue;
4
5
use Monolog\Handler\StreamHandler;
6
use Monolog\Logger as MonologLogger;
7
use Psr\Log\LoggerInterface as Logger;
8
use Psr\Log\NullLogger;
9
use Tarantool\Client\Client;
10
use Tarantool\Client\Connection\Retryable;
11
use Tarantool\Client\Connection\StreamConnection;
12
use Tarantool\Client\Packer\PurePacker;
13
use Tarantool\JobQueue\Handler\AckHandler;
14
use Tarantool\JobQueue\Handler\BuryHandler;
15
use Tarantool\JobQueue\Handler\Handler;
16
use Tarantool\JobQueue\Handler\RecurrenceHandler;
17
use Tarantool\JobQueue\Handler\RetryHandler;
18
use Tarantool\JobQueue\Handler\RetryStrategy\RetryStrategyFactory;
19
use Tarantool\JobQueue\Runner\Amp\ParallelRunner;
20
use Tarantool\JobQueue\Runner\Runner;
21
use Tarantool\Queue\Queue;
22
23
class DefaultConfigFactory
24
{
25
    private $queueName;
26
    private $connectionUri;
27
    private $connectionOptions;
28
    private $username;
29
    private $password;
30
    private $logFile;
31
    private $logLevel = MonologLogger::DEBUG;
32
    private $executorsConfigFile;
33
34
    public function setQueueName(string $name): self
35
    {
36
        $this->queueName = $name;
37
38
        return $this;
39
    }
40
41
    public function setConnectionUri(string $uri): self
42
    {
43
        $this->connectionUri = $uri;
44
45
        return $this;
46
    }
47
48
    public function getConnectionUri(): ?string
49
    {
50
        return $this->connectionUri;
51
    }
52
53
    public function setConnectionOptions(array $options): self
54
    {
55
        $this->connectionOptions = $options;
56
57
        return $this;
58
    }
59
60
    public function setCredentials(string $username, string $password): self
61
    {
62
        $this->username = $username;
63
        $this->password = $password;
64
65
        return $this;
66
    }
67
68
    public function setLogFile(string $logFile): self
69
    {
70
        $this->logFile = $logFile;
71
72
        return $this;
73
    }
74
75
    public function setLogLevel($logLevel): self
76
    {
77
        $this->logLevel = self::normalizeLogLevel($logLevel);
78
79
        return $this;
80
    }
81
82
    public function setExecutorsConfigFile(string $configFile): self
83
    {
84
        $this->executorsConfigFile = $configFile;
85
86
        return $this;
87
    }
88
89
    public function createRunner(): Runner
90
    {
91
        return new ParallelRunner(
92
            $this->createQueue(),
93
            $this->createSuccessHandler(),
94
            $this->createFailureHandler(),
95
            $this->createLogger(),
96
            $this->executorsConfigFile
97
        );
98
    }
99
100
    public function createQueue(): Queue
101
    {
102
        $this->ensureQueueName();
103
104
        return new Queue($this->createClient(), $this->queueName);
105
    }
106
107
    public function createClient(): Client
108
    {
109
        if (!$this->connectionUri) {
110
            throw new \LogicException('Connection URI is not defined.');
111
        }
112
113
        $conn = new StreamConnection($this->connectionUri, $this->connectionOptions);
114
        $conn = new Retryable($conn);
115
        $client = new Client($conn, new PurePacker());
116
117
        if ($this->username) {
118
            // TODO make it lazy
119
            $client->authenticate($this->username, $this->password);
120
        }
121
122
        return $client;
123
    }
124
125
    public function createLogger(): Logger
126
    {
127
        if (!$this->logFile) {
128
            return new NullLogger();
129
        }
130
131
        $this->ensureQueueName();
132
        $handlers = [new StreamHandler($this->logFile, $this->logLevel)];
133
134
        return new MonologLogger("$this->queueName:worker", $handlers);
135
    }
136
137
    public function createSuccessHandler(): Handler
138
    {
139
        return new RecurrenceHandler(new AckHandler());
140
    }
141
142
    public function createFailureHandler(): Handler
143
    {
144
        return new RetryHandler(
145
            new BuryHandler(),
146
            $this->createRetryStrategyFactory()
147
        );
148
    }
149
150
    public function createRetryStrategyFactory(): RetryStrategyFactory
151
    {
152
        return new RetryStrategyFactory();
153
    }
154
155
    private function ensureQueueName(): void
156
    {
157
        if (!$this->queueName) {
158
            throw new \LogicException('Queue name is not defined.');
159
        }
160
    }
161
162
    private static function normalizeLogLevel($name): int
163
    {
164
        // level is already translated to logger constant, return as-is
165
        if (is_int($name)) {
166
            return $name;
167
        }
168
169
        $levels = MonologLogger::getLevels();
170
        $upper = strtoupper($name);
171
172
        if (!isset($levels[$upper])) {
173
            throw new \InvalidArgumentException("Provided logging level '$name' does not exist. Must be a valid monolog logging level.");
174
        }
175
176
        return $levels[$upper];
177
    }
178
}
179