BatchAcknowledgementHandler::acknowledge()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 2
nop 3
dl 0
loc 9
ccs 4
cts 4
cp 1
crap 2
rs 9.6666
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * This file is part of graze/queue.
5
 *
6
 * Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 *
11
 * @license https://github.com/graze/queue/blob/master/LICENSE MIT
12
 *
13
 * @link    https://github.com/graze/queue
14
 */
15
16
namespace Graze\Queue\Handler;
17
18
use Graze\Queue\Adapter\AdapterInterface;
19
use Graze\Queue\Message\MessageInterface;
20
21
/**
 * Acknowledgement handler that buffers messages and forwards them to the
 * adapter in batches instead of one call per message.
 *
 * Acknowledged, rejected and delayed (extended) messages are collected in
 * separate buffers. As soon as any one buffer holds exactly `$batchSize`
 * messages, every buffer is flushed to the adapter.
 *
 * With the default batch size of 0 the size check can never match (a buffer
 * is checked right after a message was appended), so nothing is sent from
 * acknowledge()/extend()/reject(); flush() then has to be triggered from
 * elsewhere — presumably by the parent handler once iteration finishes.
 */
class BatchAcknowledgementHandler extends AbstractAcknowledgementHandler
{
    /** @var int Number of buffered messages that triggers a flush. */
    protected $batchSize;

    /** @var MessageInterface[] Messages waiting to be acknowledged. */
    protected $acknowledged = [];

    /** @var MessageInterface[] Messages waiting to be rejected. */
    protected $rejected = [];

    /** @var MessageInterface[][] Messages waiting to be extended, grouped by duration. */
    protected $delayed = [];

    /**
     * @param int $batchSize Buffer size at which the buffers are flushed; cast to int.
     */
    public function __construct($batchSize = 0)
    {
        $this->batchSize = (int) $batchSize;
    }

    /**
     * Buffer a message for acknowledgement, flushing once the batch is full.
     *
     * @param MessageInterface $message
     * @param AdapterInterface $adapter
     * @param mixed            $result  Not used by this handler.
     */
    protected function acknowledge(
        MessageInterface $message,
        AdapterInterface $adapter,
        $result = null
    ) {
        $this->acknowledged[] = $message;

        if (count($this->acknowledged) === $this->batchSize) {
            $this->flush($adapter);
        }
    }

    /**
     * Buffer a message to be extended by `$duration`, flushing once the batch
     * for that duration is full.
     *
     * @param MessageInterface $message
     * @param AdapterInterface $adapter
     * @param int              $duration Used as the grouping key for the buffer.
     */
    protected function extend(
        MessageInterface $message,
        AdapterInterface $adapter,
        $duration
    ) {
        $this->delayed[$duration][] = $message;

        // Only the group for this duration is measured, but flush() sends all buffers.
        if (count($this->delayed[$duration]) === $this->batchSize) {
            $this->flush($adapter);
        }
    }

    /**
     * Buffer a message for rejection, flushing once the batch is full.
     *
     * @param MessageInterface $message
     * @param AdapterInterface $adapter
     * @param mixed            $result  Not used by this handler.
     */
    protected function reject(
        MessageInterface $message,
        AdapterInterface $adapter,
        $result = null
    ) {
        $this->rejected[] = $message;

        if (count($this->rejected) === $this->batchSize) {
            $this->flush($adapter);
        }
    }

    /**
     * Send every non-empty buffer to the adapter and reset it.
     *
     * @param AdapterInterface $adapter
     */
    protected function flush(AdapterInterface $adapter)
    {
        if (!empty($this->acknowledged)) {
            $adapter->acknowledge($this->acknowledged);

            $this->acknowledged = [];
        }

        if (!empty($this->rejected)) {
            // Rejected messages must not be acknowledged.
            // NOTE(review): assumes AdapterInterface::reject(array $messages)
            // exists alongside acknowledge()/extend() — confirm.
            $adapter->reject($this->rejected);

            $this->rejected = [];
        }

        if (!empty($this->delayed)) {
            // One adapter call per distinct duration.
            foreach ($this->delayed as $duration => $messages) {
                $adapter->extend($messages, $duration);
            }

            $this->delayed = [];
        }
    }
}
118