Completed
Pull Request — master (#42)
by
unknown
06:09
created

FirehoseAdapter::purge()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 4
nc 1
nop 0
crap 1
1
<?php
2
3
/**
4
 * This file is part of graze/queue.
5
 *
6
 * Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 *
11
 * @license https://github.com/graze/queue/blob/master/LICENSE MIT
12
 *
13
 * @link    https://github.com/graze/queue
14
 */
15
16
namespace Graze\Queue\Adapter;
17
18
use Aws\Firehose\FirehoseClient;
19
use Graze\Queue\Adapter\Exception\MethodNotSupportedException;
20
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
21
use Graze\Queue\Message\MessageFactoryInterface;
22
use Graze\Queue\Message\MessageInterface;
23
24
/**
25
 * Amazon AWS Kinesis Firehose Adapter.
26
 *
27
 * This adapter only supports the enqueue method to send messages to a Kinesis
28
 * Firehose stream
29
 *
30
 */
31
final class FirehoseAdapter implements AdapterInterface
{
    /** Number of records sent to the delivery stream per putRecordBatch request. */
    const BATCHSIZE_SEND    = 100;

    /** @var FirehoseClient */
    protected $client;

    /** @var array */
    protected $options;

    /** @var string */
    protected $deliveryStreamName;

    /**
     * Store the client, the target delivery stream name and the options.
     *
     * The options are kept on the adapter but are not read by any method of
     * this class.
     *
     * @param FirehoseClient $client
     * @param string         $deliveryStreamName
     * @param array          $options
     */
    public function __construct(FirehoseClient $client, $deliveryStreamName, array $options = [])
    {
        $this->client = $client;
        $this->deliveryStreamName = $deliveryStreamName;
        $this->options = $options;
    }

    /**
     * Not supported by this adapter; always throws.
     *
     * @param MessageInterface[] $messages
     *
     * @throws MethodNotSupportedException
     */
    public function acknowledge(array $messages)
    {
        throw new MethodNotSupportedException(
            'acknowledge',
            $this,
            $messages
        );
    }

    /**
     * Not supported by this adapter; always throws. Both arguments are ignored.
     *
     * @param MessageFactoryInterface $factory
     * @param int                     $limit
     *
     * @throws MethodNotSupportedException
     */
    public function dequeue(MessageFactoryInterface $factory, $limit)
    {
        throw new MethodNotSupportedException(
            'dequeue',
            $this,
            []
        );
    }

    /**
     * Send the messages to the delivery stream in batches of BATCHSIZE_SEND.
     *
     * Each message is converted to an entry by createEnqueueEntries() and the
     * JSON encoding of that entry becomes the record's `Data`. A message is
     * treated as failed when its entry cannot be JSON encoded (it is then not
     * sent) or when the matching item of the `RequestResponses` result carries
     * an `ErrorCode`. All batches are attempted before any failure is reported.
     *
     * @param MessageInterface[] $messages
     *
     * @throws FailedEnqueueException when at least one message failed
     */
    public function enqueue(array $messages)
    {
        $failed = [];
        $records = [];

        foreach ($this->createEnqueueEntries($messages) as $entry) {
            $data = json_encode($entry);

            if ($data === false) {
                // json_encode() returns false when the entry cannot be encoded
                // (e.g. malformed UTF-8). Report the message as failed rather
                // than putting `false` into the request as record data.
                $failed[] = $messages[$entry['Id']];
                continue;
            }

            // Keep the original message key next to the payload so a response
            // position can be mapped back to the message it belongs to.
            $records[] = [
                'Id'   => $entry['Id'],
                'Data' => $data,
            ];
        }

        foreach (array_chunk($records, self::BATCHSIZE_SEND) as $batch) {
            $requestRecords = array_map(function ($record) {
                return [
                    'Data' => $record['Data'],
                ];
            }, $batch);

            $request = [
                'DeliveryStreamName' => $this->deliveryStreamName,
                'Records'  => $requestRecords,
            ];

            $results = $this->client->putRecordBatch($request);

            // Guard against a missing value so the loop never iterates null.
            $responses = $results->get('RequestResponses') ?: [];

            // array_chunk() re-indexes each batch from 0, so $idx addresses
            // the same position in $batch as in the submitted Records list.
            foreach ($responses as $idx => $response) {
                if (isset($response['ErrorCode'])) {
                    $failed[] = $messages[$batch[$idx]['Id']];
                }
            }
        }

        if (!empty($failed)) {
            throw new FailedEnqueueException($this, $failed);
        }
    }

    /**
     * Not supported by this adapter; always throws.
     *
     * @throws MethodNotSupportedException
     */
    public function purge()
    {
        throw new MethodNotSupportedException(
            'purge',
            $this,
            []
        );
    }

    /**
     * Not supported by this adapter; always throws.
     *
     * @throws MethodNotSupportedException
     */
    public function delete()
    {
        throw new MethodNotSupportedException(
            'delete',
            $this,
            []
        );
    }

    /**
     * Convert each message into an entry array, preserving the array keys.
     *
     * Every entry holds the message's key in the input array (`Id`), its body
     * (`MessageBody`) and the `MessageAttributes` value of its metadata, or an
     * empty array when that value is falsy.
     *
     * @param MessageInterface[] $messages
     *
     * @return array
     */
    protected function createEnqueueEntries(array $messages)
    {
        // $messages is a by-value copy, so replacing each element in place
        // through the reference does not affect the caller's array.
        array_walk($messages, function (MessageInterface &$message, $id) {
            $metadata = $message->getMetadata();
            $message = [
                'Id'                => $id,
                'MessageBody'       => $message->getBody(),
                'MessageAttributes' => $metadata->get('MessageAttributes') ?: [],
            ];
        });

        return $messages;
    }
}
164