Completed
Pull Request — master (#64)
by Frederik
03:38 queued 01:09
created

TimeLimitedQueue::fetch()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 11
ccs 6
cts 6
cp 1
rs 9.9
c 0
b 0
f 0
cc 3
nc 3
nop 0
crap 3
1
<?php
2
declare(strict_types=1);
3
4
namespace Genkgo\Mail\Queue;
5
6
use Genkgo\Mail\MessageInterface;
7
use Genkgo\Mail\Transport\QueueIfFailedTransport;
8
9
/**
 * Queue decorator that skips messages which have been queued for longer
 * than a configured number of seconds.
 *
 * The age of a message is derived from the header named by
 * QueueIfFailedTransport::QUEUED_HEADER.
 */
final class TimeLimitedQueue implements QueueInterface
{
    /**
     * Queue that actually stores and supplies the messages.
     *
     * @var QueueInterface
     */
    private $decoratedQueue;

    /**
     * Maximum age of a message in seconds; 0 disables the expiry check.
     *
     * @var int
     */
    private $timeLimit;

    /**
     * @param QueueInterface $queue     queue to decorate
     * @param int            $timeLimit maximum message age in seconds, 0 for no limit
     */
    public function __construct(QueueInterface $queue, int $timeLimit)
    {
        $this->decoratedQueue = $queue;
        $this->timeLimit = $timeLimit;
    }

    /**
     * Passes the message straight on to the decorated queue.
     *
     * {@inheritdoc}
     */
    public function store(MessageInterface $message): void
    {
        $this->decoratedQueue->store($message);
    }

    /**
     * Fetches from the decorated queue until a message is found that has not
     * expired; expired messages are skipped and never returned.
     *
     * NOTE(review): presumably the decorated queue signals "empty" by
     * throwing, which is what ends this loop when only expired messages
     * remain - confirm against the QueueInterface contract.
     *
     * {@inheritdoc}
     *
     * @throws \UnexpectedValueException when a fetched message carries no usable queue header
     */
    public function fetch(): MessageInterface
    {
        do {
            $message = $this->decoratedQueue->fetch();
        } while ($this->hasMessageExpired($message));

        return $message;
    }

    /**
     * Tells whether the message has been queued for at least the time limit.
     *
     * @param MessageInterface $message
     * @return bool
     * @throws \UnexpectedValueException when the queue header is missing or unparseable
     */
    private function hasMessageExpired(MessageInterface $message): bool
    {
        // A limit of zero means messages never expire, so the header is not even inspected.
        if ($this->timeLimit === 0) {
            return false;
        }

        $expiresAt = $this->getSubmissionTimestamp($message) + $this->timeLimit;

        return $expiresAt <= \time();
    }

    /**
     * Reads the Unix timestamp at which the message was queued from its
     * first queue header.
     *
     * @param MessageInterface $message
     * @return int
     * @throws \UnexpectedValueException when the header is missing or its value is not a parseable date
     */
    private function getSubmissionTimestamp(MessageInterface $message): int
    {
        $queuedAt = $message->getHeader(QueueIfFailedTransport::QUEUED_HEADER);
        if (!\is_array($queuedAt)) {
            // Drop the iterator keys: duplicate keys would otherwise overwrite
            // earlier headers, and only the first header is of interest.
            $queuedAt = \iterator_to_array($queuedAt, false);
        }

        $header = \reset($queuedAt);
        if ($header === false) {
            throw new \UnexpectedValueException('Cannot extract queue header from message');
        }

        // strtotime() returns false for an unparseable date. Casting that to 0
        // would make the message look expired and get it dropped silently.
        $timestamp = \strtotime($header->getValue()->getRaw());
        if ($timestamp === false) {
            throw new \UnexpectedValueException('Cannot parse queue header date from message');
        }

        return $timestamp;
    }
}
85