Passed
Push — master ( d469f2...2c948a )
by Harry
02:12
created

BatchAcknowledgementHandler::acknowledge()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 2
nop 3
dl 0
loc 9
ccs 4
cts 4
cp 1
crap 2
rs 9.6666
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * This file is part of graze/queue.
5
 *
6
 * Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 *
11
 * @license https://github.com/graze/queue/blob/master/LICENSE MIT
12
 *
13
 * @link    https://github.com/graze/queue
14
 */
15
16
namespace Graze\Queue\Handler;
17
18
use Graze\Queue\Adapter\AdapterInterface;
19
use Graze\Queue\Message\MessageInterface;
20
21
/**
 * Acknowledgement handler that buffers messages and hands them to the adapter
 * in batches instead of one at a time.
 *
 * Acknowledged and rejected messages are kept in two separate buffers. Each
 * buffer triggers a flush when its own size becomes exactly equal to the
 * configured batch size; a flush always drains both buffers.
 */
class BatchAcknowledgementHandler extends AbstractAcknowledgementHandler
{
    /**
     * Number of buffered messages (per buffer) that triggers a flush.
     *
     * @var int
     */
    protected $batchSize;

    /**
     * Messages waiting to be acknowledged on the adapter.
     *
     * @var MessageInterface[]
     */
    protected $acknowledged = [];

    /**
     * Messages waiting to be rejected on the adapter.
     *
     * @var MessageInterface[]
     */
    protected $rejected = [];

    /**
     * @param int $batchSize Buffer size that triggers a flush. With the default
     *                       of 0 the size check in acknowledge()/reject() never
     *                       matches (a buffer holds at least one message when it
     *                       is checked), so flushing only happens when flush()
     *                       is invoked from elsewhere.
     */
    public function __construct($batchSize = 0)
    {
        // (int) is the canonical spelling of the cast; (integer) is an alias.
        $this->batchSize = (int) $batchSize;
    }

    /**
     * Buffer a message for acknowledgement and flush once the buffer reaches
     * the batch size.
     *
     * @param MessageInterface $message
     * @param AdapterInterface $adapter
     * @param mixed            $result  Unused by this handler.
     */
    protected function acknowledge(
        MessageInterface $message,
        AdapterInterface $adapter,
        $result = null
    ) {
        $this->acknowledged[] = $message;

        if (count($this->acknowledged) === $this->batchSize) {
            $this->flush($adapter);
        }
    }

    /**
     * Buffer a message for rejection and flush once the buffer reaches the
     * batch size.
     *
     * @param MessageInterface $message
     * @param AdapterInterface $adapter
     * @param mixed            $result  Unused by this handler.
     */
    protected function reject(
        MessageInterface $message,
        AdapterInterface $adapter,
        $result = null
    ) {
        $this->rejected[] = $message;

        if (count($this->rejected) === $this->batchSize) {
            $this->flush($adapter);
        }
    }

    /**
     * Send every buffered message to the adapter and empty both buffers.
     *
     * Empty buffers are skipped so the adapter is never called with an empty
     * list.
     *
     * @param AdapterInterface $adapter
     */
    protected function flush(AdapterInterface $adapter)
    {
        if (!empty($this->acknowledged)) {
            $adapter->acknowledge($this->acknowledged);

            $this->acknowledged = [];
        }

        if (!empty($this->rejected)) {
            // Rejected messages must be rejected, not acknowledged: passing
            // them to acknowledge() would mark failed messages as handled.
            // NOTE(review): assumes AdapterInterface exposes reject(array $messages),
            // mirroring acknowledge(array $messages) — confirm against the interface.
            $adapter->reject($this->rejected);

            $this->rejected = [];
        }
    }
}
91