FirehoseAdapter   A
last analyzed

Complexity

Total Complexity 14

Size/Duplication

Total Lines 154
Duplicated Lines 0 %

Test Coverage

Coverage 84.62%

Importance

Changes 0
Metric Value
dl 0
loc 154
ccs 55
cts 65
cp 0.8462
rs 10
c 0
b 0
f 0
wmc 14

9 Methods

Rating   Name   Duplication   Size   Complexity  
A acknowledge() 0 6 1
A __construct() 0 5 1
A delete() 0 6 1
A purge() 0 6 1
A dequeue() 0 6 1
A getOption() 0 3 2
A extend() 0 6 1
A reject() 0 6 1
B enqueue() 0 34 5
1
<?php
2
3
/**
4
 * This file is part of graze/queue.
5
 *
6
 * Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 *
11
 * @license https://github.com/graze/queue/blob/master/LICENSE MIT
12
 *
13
 * @link    https://github.com/graze/queue
14
 */
15
16
namespace Graze\Queue\Adapter;
17
18
use Aws\Firehose\FirehoseClient;
19
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
20
use Graze\Queue\Adapter\Exception\MethodNotSupportedException;
21
use Graze\Queue\Message\MessageFactoryInterface;
22
use Graze\Queue\Message\MessageInterface;
23
24
/**
25
 * Amazon AWS Kinesis Firehose Adapter.
26
 *
27
 * This method only supports the enqueue method to send messages to a Kinesiss
28
 * Firehose stream
29
 *
30
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Firehose.FirehoseClient.html#putRecordBatch
31
 */
32
final class FirehoseAdapter implements AdapterInterface
33
{
34
    const BATCHSIZE_SEND = 100;
35
36
    /** @var FirehoseClient */
37
    protected $client;
38
39
    /** @var array */
40
    protected $options;
41
42
    /** @var string */
43
    protected $deliveryStreamName;
44
45
    /**
46
     * @param FirehoseClient $client
47
     * @param string         $deliveryStreamName
48
     * @param array          $options - BatchSize <integer> The number of messages to send in each batch.
49
     */
50 8
    public function __construct(FirehoseClient $client, $deliveryStreamName, array $options = [])
51
    {
52 8
        $this->client = $client;
53 8
        $this->deliveryStreamName = $deliveryStreamName;
54 8
        $this->options = $options;
55 8
    }
56
57
    /**
58
     * @param MessageInterface[] $messages
59
     *
60
     * @throws MethodNotSupportedException
61
     */
62 1
    public function acknowledge(array $messages)
63
    {
64 1
        throw new MethodNotSupportedException(
65 1
            __FUNCTION__,
66 1
            $this,
67
            $messages
68 1
        );
69
    }
70
71
    /**
72
     * @param MessageInterface[] $messages
73
     * @param int                $duration Number of seconds to ensure that this message stays being processed and not
74
     *                                     put back on the queue
75
     */
76
    public function extend(array $messages, $duration)
77
    {
78
        throw new MethodNotSupportedException(
79
            __FUNCTION__,
80
            $this,
81
            $messages
82
        );
83
    }
84
85
    /**
86
     * @param MessageInterface[] $messages
87
     */
88
    public function reject(array $messages)
89
    {
90
        throw new MethodNotSupportedException(
91
            __FUNCTION__,
92
            $this,
93
            $messages
94
        );
95
    }
96
97
    /**
98
     * @param MessageFactoryInterface $factory
99
     * @param int                     $limit
100
     *
101
     * @throws MethodNotSupportedException
102
     */
103 1
    public function dequeue(MessageFactoryInterface $factory, $limit)
104
    {
105 1
        throw new MethodNotSupportedException(
106 1
            __FUNCTION__,
107 1
            $this,
108 1
            []
109 1
        );
110
    }
111
112
    /**
113
     * @param MessageInterface[] $messages
114
     *
115
     * @throws FailedEnqueueException
116
     */
117 3
    public function enqueue(array $messages)
118
    {
119 3
        $failed = [];
120 3
        $batches = array_chunk(
121 3
            $messages,
122 3
            $this->getOption('BatchSize', self::BATCHSIZE_SEND)
123 3
        );
124
125 3
        foreach ($batches as $batch) {
126 3
            $requestRecords = array_map(
127 3
                function (MessageInterface $message) {
128
                    return [
129 3
                        'Data' => $message->getBody(),
130 3
                    ];
131 3
                },
132
                $batch
133 3
            );
134
135
            $request = [
136 3
                'DeliveryStreamName' => $this->deliveryStreamName,
137 3
                'Records'            => $requestRecords,
138 3
            ];
139
140 3
            $results = $this->client->putRecordBatch($request);
141
142 3
            foreach ($results->get('RequestResponses') as $idx => $response) {
143 1
                if (isset($response['ErrorCode'])) {
144 1
                    $failed[] = $batch[$idx];
145 1
                }
146 3
            }
147 3
        }
148
149 3
        if (!empty($failed)) {
150 1
            throw new FailedEnqueueException($this, $failed);
151
        }
152 2
    }
153
154
    /**
155
     * @param string $name
156
     * @param mixed  $default
157
     *
158
     * @return mixed
159
     */
160 3
    protected function getOption($name, $default = null)
161
    {
162 3
        return isset($this->options[$name]) ? $this->options[$name] : $default;
163
    }
164
165
    /**
166
     * @throws MethodNotSupportedException
167
     */
168 1
    public function purge()
169
    {
170 1
        throw new MethodNotSupportedException(
171 1
            __FUNCTION__,
172 1
            $this,
173 1
            []
174 1
        );
175
    }
176
177
    /**
178
     * @throws MethodNotSupportedException
179
     */
180 1
    public function delete()
181
    {
182 1
        throw new MethodNotSupportedException(
183 1
            __FUNCTION__,
184 1
            $this,
185 1
            []
186 1
        );
187
    }
188
}
189