Completed
Pull Request — master (#43)
by Harry
02:11
created

FirehoseAdapter::reject()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 8
c 0
b 0
f 0
ccs 0
cts 5
cp 0
rs 9.4285
cc 1
eloc 5
nc 1
nop 1
crap 2
1
<?php
2
3
/**
4
 * This file is part of graze/queue.
5
 *
6
 * Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 *
11
 * @license https://github.com/graze/queue/blob/master/LICENSE MIT
12
 *
13
 * @link    https://github.com/graze/queue
14
 */
15
16
namespace Graze\Queue\Adapter;
17
18
use Aws\Firehose\FirehoseClient;
19
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
20
use Graze\Queue\Adapter\Exception\MethodNotSupportedException;
21
use Graze\Queue\Message\MessageFactoryInterface;
22
use Graze\Queue\Message\MessageInterface;
23
24
/**
25
 * Amazon AWS Kinesis Firehose Adapter.
26
 *
27
 * This method only supports the enqueue method to send messages to a Kinesiss
28
 * Firehose stream
29
 *
30
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Firehose.FirehoseClient.html#putRecordBatch
31
 */
32
final class FirehoseAdapter implements AdapterInterface
33
{
34
    const BATCHSIZE_SEND = 100;
35
36
    /** @var FirehoseClient */
37
    protected $client;
38
39
    /** @var array */
40
    protected $options;
41
42
    /** @var string */
43
    protected $deliveryStreamName;
44
45
    /**
46
     * @param FirehoseClient $client
47
     * @param string         $deliveryStreamName
48
     * @param array          $options - BatchSize <integer> The number of messages to send in each batch.
49
     */
50 8
    public function __construct(FirehoseClient $client, $deliveryStreamName, array $options = [])
51
    {
52 8
        $this->client = $client;
53 8
        $this->deliveryStreamName = $deliveryStreamName;
54 8
        $this->options = $options;
55 8
    }
56
57
    /**
58
     * @param MessageInterface[] $messages
59
     *
60
     * @throws MethodNotSupportedException
61
     */
62 1
    public function acknowledge(array $messages)
63
    {
64 1
        throw new MethodNotSupportedException(
65 1
            __FUNCTION__,
66 1
            $this,
67 1
            $messages
68
        );
69
    }
70
71
    /**
72
     * @param MessageInterface[] $messages
73
     */
74
    public function reject(array $messages)
75
    {
76
        throw new MethodNotSupportedException(
77
            __FUNCTION__,
78
            $this,
79
            $messages
80
        );
81
    }
82
83
    /**
84
     * @param MessageFactoryInterface $factory
85
     * @param int                     $limit
86
     *
87
     * @throws MethodNotSupportedException
88
     */
89 1
    public function dequeue(MessageFactoryInterface $factory, $limit)
90
    {
91 1
        throw new MethodNotSupportedException(
92 1
            __FUNCTION__,
93 1
            $this,
94 1
            []
95
        );
96
    }
97
98
    /**
99
     * @param MessageInterface[] $messages
100
     *
101
     * @throws FailedEnqueueException
102
     */
103 3
    public function enqueue(array $messages)
104
    {
105 3
        $failed = [];
106 3
        $batches = array_chunk(
107 3
            $messages,
108 3
            $this->getOption('BatchSize', self::BATCHSIZE_SEND)
109
        );
110
111 3
        foreach ($batches as $batch) {
112 3
            $requestRecords = array_map(
113 3
                function (MessageInterface $message) {
114
                    return [
115 3
                        'Data' => $message->getBody(),
116
                    ];
117 3
                },
118 3
                $batch
119
            );
120
121
            $request = [
122 3
                'DeliveryStreamName' => $this->deliveryStreamName,
123 3
                'Records'            => $requestRecords,
124
            ];
125
126 3
            $results = $this->client->putRecordBatch($request);
127
128 3
            foreach ($results->get('RequestResponses') as $idx => $response) {
129 1
                if (isset($response['ErrorCode'])) {
130 3
                    $failed[] = $batch[$idx];
131
                }
132
            }
133
        }
134
135 3
        if (!empty($failed)) {
136 1
            throw new FailedEnqueueException($this, $failed);
137
        }
138 2
    }
139
140
    /**
141
     * @param string $name
142
     * @param mixed  $default
143
     *
144
     * @return mixed
145
     */
146 3
    protected function getOption($name, $default = null)
147
    {
148 3
        return isset($this->options[$name]) ? $this->options[$name] : $default;
149
    }
150
151
    /**
152
     * @throws MethodNotSupportedException
153
     */
154 1
    public function purge()
155
    {
156 1
        throw new MethodNotSupportedException(
157 1
            __FUNCTION__,
158 1
            $this,
159 1
            []
160
        );
161
    }
162
163
    /**
164
     * @throws MethodNotSupportedException
165
     */
166 1
    public function delete()
167
    {
168 1
        throw new MethodNotSupportedException(
169 1
            __FUNCTION__,
170 1
            $this,
171 1
            []
172
        );
173
    }
174
}
175