Completed
Push — master ( c1b2ad...3084e3 )
by Harry
02:22
created

FirehoseAdapter::getOption()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 2
eloc 2
nc 2
nop 2
crap 2
1
<?php
2
3
/**
4
 * This file is part of graze/queue.
5
 *
6
 * Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 *
11
 * @license https://github.com/graze/queue/blob/master/LICENSE MIT
12
 *
13
 * @link    https://github.com/graze/queue
14
 */
15
16
namespace Graze\Queue\Adapter;
17
18
use Aws\Firehose\FirehoseClient;
19
use Graze\Queue\Adapter\Exception\MethodNotSupportedException;
20
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
21
use Graze\Queue\Message\MessageFactoryInterface;
22
use Graze\Queue\Message\MessageInterface;
23
24
/**
25
 * Amazon AWS Kinesis Firehose Adapter.
26
 *
27
 * This adapter only supports the enqueue method to send messages to a Kinesis
28
 * Firehose stream
29
 *
30
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Firehose.FirehoseClient.html#putRecordBatch
31
 */
32
final class FirehoseAdapter implements AdapterInterface
{
    /** Default number of records sent in a single putRecordBatch call. */
    const BATCHSIZE_SEND    = 100;

    /**
     * Upper bound applied to the BatchSize option. The PutRecordBatch API is
     * documented as accepting at most 500 records per request.
     */
    const BATCHSIZE_MAX     = 500;

    /** @var FirehoseClient */
    protected $client;

    /** @var array */
    protected $options;

    /** @var string */
    protected $deliveryStreamName;

    /**
     * @param FirehoseClient $client             Client used to call putRecordBatch.
     * @param string         $deliveryStreamName Name of the delivery stream to write to.
     * @param array          $options            - BatchSize <integer> The number of messages to send in each batch
     *                                             (defaults to BATCHSIZE_SEND, capped at BATCHSIZE_MAX).
     */
    public function __construct(FirehoseClient $client, $deliveryStreamName, array $options = [])
    {
        $this->client = $client;
        $this->deliveryStreamName = $deliveryStreamName;
        $this->options = $options;
    }

    /**
     * Not supported by this adapter; always throws.
     *
     * @param MessageInterface[] $messages
     *
     * @throws MethodNotSupportedException
     */
    public function acknowledge(array $messages)
    {
        throw new MethodNotSupportedException(
            __FUNCTION__,
            $this,
            $messages
        );
    }

    /**
     * Not supported by this adapter; always throws.
     *
     * @param MessageFactoryInterface $factory
     * @param int                     $limit
     *
     * @throws MethodNotSupportedException
     */
    public function dequeue(MessageFactoryInterface $factory, $limit)
    {
        throw new MethodNotSupportedException(
            __FUNCTION__,
            $this,
            []
        );
    }

    /**
     * Sends the messages to the delivery stream in batches.
     *
     * Every batch is sent even if an earlier batch reported failures; the
     * messages whose response entry carries an ErrorCode are collected and
     * reported together once all batches have been sent.
     *
     * @param MessageInterface[] $messages
     *
     * @throws FailedEnqueueException When at least one record was reported as failed.
     */
    public function enqueue(array $messages)
    {
        $failed = [];
        $batches = array_chunk($messages, $this->getBatchSize());

        foreach ($batches as $batch) {
            $requestRecords = array_map(function (MessageInterface $message) {
                return [
                    'Data' => $message->getBody()
                ];
            }, $batch);

            $request = [
                'DeliveryStreamName' => $this->deliveryStreamName,
                'Records'  => $requestRecords,
            ];

            $results = $this->client->putRecordBatch($request);

            // A result without a RequestResponses entry yields nothing to
            // inspect; fall back to an empty list instead of iterating null.
            $responses = $results->get('RequestResponses') ?: [];

            // array_chunk() re-indexes each batch from 0, so the position of
            // a response entry is used as the index of the message it
            // belongs to.
            foreach ($responses as $idx => $response) {
                if (isset($response['ErrorCode'])) {
                    $failed[] = $batch[$idx];
                }
            }
        }

        if (!empty($failed)) {
            throw new FailedEnqueueException($this, $failed);
        }
    }

    /**
     * Returns the configured option, or the default when it is not set.
     *
     * Uses isset(), so an option explicitly set to null also yields the default.
     *
     * @param string $name
     * @param mixed  $default
     *
     * @return mixed
     */
    protected function getOption($name, $default = null)
    {
        return isset($this->options[$name]) ? $this->options[$name] : $default;
    }

    /**
     * Resolves the BatchSize option to a value that is safe to hand to
     * array_chunk() and to a single putRecordBatch call.
     *
     * Values below 1 (which array_chunk() rejects) fall back to
     * BATCHSIZE_SEND; values above BATCHSIZE_MAX are capped.
     *
     * @return int
     */
    private function getBatchSize()
    {
        $size = (int) $this->getOption('BatchSize', self::BATCHSIZE_SEND);

        if ($size < 1) {
            return self::BATCHSIZE_SEND;
        }

        return min($size, self::BATCHSIZE_MAX);
    }

    /**
     * Not supported by this adapter; always throws.
     *
     * @throws MethodNotSupportedException
     */
    public function purge()
    {
        throw new MethodNotSupportedException(
            __FUNCTION__,
            $this,
            []
        );
    }

    /**
     * Not supported by this adapter; always throws.
     *
     * @throws MethodNotSupportedException
     */
    public function delete()
    {
        throw new MethodNotSupportedException(
            __FUNCTION__,
            $this,
            []
        );
    }
}
160