SqsExtendedClient::embedS3PointerInReceiptHandle()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 8
rs 9.4285
ccs 6
cts 6
cp 1
nc 1
cc 1
eloc 5
nop 3
crap 1
1
<?php
2
3
namespace Abacaphiliac\AwsSdk\ClaimCheck\Sqs;
4
5
use Abacaphiliac\AwsSdk\ClaimCheck\ClaimCheck;
6
use Abacaphiliac\AwsSdk\ClaimCheck\Exception\ExceptionInterface;
7
use Abacaphiliac\AwsSdk\ClaimCheck\Exception\RuntimeException;
8
use Aws\Result;
9
use Aws\Sqs\SqsClient;
10
use GuzzleHttp\Promise\Promise;
11
use Psr\Http\Message\StreamInterface;
12
13
class SqsExtendedClient extends SqsClient
14
{
15
    /** @var  SqsClient */
16
    private $sqsClient;
17
    
18
    /** @var  SqsExtendedClientConfiguration */
19
    private $configuration;
20
21
    /**
     * Wraps an SQS client so that message bodies can be exchanged through S3.
     *
     * NOTE(review): the parent SqsClient constructor is not invoked here; every call
     * is delegated to the injected client instead — confirm this is intentional.
     *
     * @param SqsClient $sqsClient Client that performs the actual SQS calls.
     * @param SqsExtendedClientConfiguration $configuration Provides the S3 client, bucket name,
     *     claim-check factory/serializer and the delete-from-S3 flag used by this class.
     */
    public function __construct(SqsClient $sqsClient, SqsExtendedClientConfiguration $configuration)
    {
        $this->configuration = $configuration;
        $this->sqsClient = $sqsClient;
    }
31
32
    /**
     * Intercepts the SQS operations that need claim-check handling and forwards
     * everything else to the wrapped client untouched.
     *
     * sendMessage, receiveMessage and deleteMessage are routed to the S3-aware
     * implementations in this class; any other method name is called on the
     * wrapped client with the first argument (or an empty array when none is given).
     *
     * @param string $name Name of the invoked operation.
     * @param mixed[] $args Call arguments; only the first one (the parameter array) is used.
     * @return Result|Promise
     * @throws ExceptionInterface
     */
    public function __call($name, array $args)
    {
        $params = array_key_exists(0, $args) ? $args[0] : [];

        // Normalise the first letter so that "SendMessage" and "sendMessage" take the
        // same route. NOTE(review): the AWS client is expected to resolve both spellings;
        // without this, the capitalised form would silently skip the S3 handling.
        switch (lcfirst($name)) {
            case 'sendMessage':
                return $this->sendSqsMessage($params);
            case 'receiveMessage':
                return $this->receiveSqsMessage($params);
            case 'deleteMessage':
                return $this->deleteSqsMessage($params);
            default:
                // Pass the caller's original spelling through unchanged.
                return $this->sqsClient->{$name}($params);
        }
    }
59
60
    /**
     * Stores the message body in S3 and sends the serialized claim check to SQS in its place.
     *
     * NOTE(review): static analysis reported this method as duplicated elsewhere in the
     * project; consider extracting the shared logic into a single class or operation.
     *
     * @param mixed[] $args sendMessage parameters; a missing 'MessageBody' is treated as ''.
     * @return Result Result of the wrapped client's sendMessage call.
     * @throws ExceptionInterface
     */
    private function sendSqsMessage(array $args = [])
    {
        $serializer = $this->configuration->getClaimCheckSerializer();

        if (array_key_exists('MessageBody', $args)) {
            $body = $args['MessageBody'];
        } else {
            $body = '';
        }

        // The payload goes to S3; SQS only carries the serialized pointer to it.
        $args['MessageBody'] = $serializer->serialize($this->storeMessageInS3($body));

        return $this->sqsClient->sendMessage($args);
    }
77
78
    /**
     * Receives messages from SQS and resolves every claim-check body from S3.
     *
     * The 'Messages' entry of the result is always overwritten with the decoded
     * list, which is empty when nothing was received.
     *
     * @param mixed[] $args receiveMessage parameters, forwarded unchanged.
     * @return Result The wrapped client's result with decoded 'Messages'.
     * @throws ExceptionInterface
     */
    private function receiveSqsMessage(array $args = [])
    {
        $result = $this->sqsClient->receiveMessage($args);

        // The search may yield null instead of a list when the response carries no
        // 'Messages' entry (e.g. an empty poll); iterating that directly raises a warning.
        $received = $result->search('Messages[]');
        if (!is_array($received)) {
            $received = [];
        }

        $messages = [];
        foreach ($received as $i => $message) {
            $messages[$i] = $this->decodeSqsMessage($message);
        }

        $result->offsetSet('Messages', $messages);

        return $result;
    }
97
98
    /**
     * Deletes a message from SQS and, when enabled, its payload from S3.
     *
     * When delete-from-S3 is enabled and the receipt handle is the JSON document
     * produced by embedS3PointerInReceiptHandle(), the referenced S3 object is
     * deleted and the original SQS receipt handle is restored before the SQS call.
     * Any other receipt handle is passed to SQS untouched.
     *
     * @param mixed[] $args deleteMessage parameters.
     * @return Result Result of the wrapped client's deleteMessage call.
     * @throws ExceptionInterface
     */
    private function deleteSqsMessage(array $args = [])
    {
        if (array_key_exists('ReceiptHandle', $args) && $this->configuration->getDeleteFromS3()) {
            // Split receipt handle into S3 and SQS information.
            $pointer = is_string($args['ReceiptHandle'])
                ? json_decode($args['ReceiptHandle'], true)
                : null;

            // A successful decode is not enough: a plain handle that happens to be valid
            // JSON (e.g. purely numeric) decodes to a scalar, and a foreign JSON document
            // lacks the expected keys. Only act on the exact embedded shape.
            $requiredKeys = ['original_receipt_handle', 's3_bucket_name', 's3_key'];
            $isEmbeddedPointer = is_array($pointer)
                && count(array_diff($requiredKeys, array_keys($pointer))) === 0;

            if ($isEmbeddedPointer) {
                // Delete from S3.
                $this->configuration->getS3Client()->deleteObject([
                    'Bucket' => $pointer['s3_bucket_name'],
                    'Key' => $pointer['s3_key'],
                ]);

                // Adjust SQS args.
                $args['ReceiptHandle'] = $pointer['original_receipt_handle'];
            }
        }

        return $this->sqsClient->deleteMessage($args);
    }
124
125
    /**
     * Replaces a claim-check body with the payload stored in S3.
     *
     * The message is returned unchanged when it has no 'Body', when the body does not
     * unserialize to a ClaimCheck, or when unserializing or fetching throws an
     * ExceptionInterface. When delete-from-S3 is enabled and the message carries a
     * 'ReceiptHandle', the S3 location is embedded into that handle.
     *
     * @param mixed[] $message A single entry of the receiveMessage 'Messages' list.
     * @return mixed[] The message, with 'Body' (and possibly 'ReceiptHandle') rewritten.
     * @throws ExceptionInterface
     */
    private function decodeSqsMessage(array $message)
    {
        // Unknown message body. Skip processing.
        if (!array_key_exists('Body', $message)) {
            return $message;
        }

        try {
            $pointer = $this->configuration->getClaimCheckSerializer()->unserialize($message['Body']);

            // Not a claim check: leave the message exactly as it was received.
            if (!$pointer instanceof ClaimCheck) {
                return $message;
            }

            $message['Body'] = $this->fetchClaimCheckFromS3($pointer);
        } catch (ExceptionInterface $e) {
            // Unknown message body. Skip processing.
            return $message;
        }

        $hasReceiptHandle = array_key_exists('ReceiptHandle', $message);
        if ($hasReceiptHandle && $this->configuration->getDeleteFromS3()) {
            // Carry the S3 location inside the receipt handle for the later delete call.
            $message['ReceiptHandle'] = $this->embedS3PointerInReceiptHandle(
                $message['ReceiptHandle'],
                $pointer->getS3BucketName(),
                $pointer->getS3Key()
            );
        }

        return $message;
    }
167
168
    /**
     * Uploads a message body to S3 under a newly created claim check.
     *
     * NOTE(review): static analysis reported this method as duplicated elsewhere in the
     * project; consider extracting the shared logic into a single class or operation.
     * NOTE(review): the object is written to the configured bucket, while reads use the
     * bucket name stored in the claim check — confirm the factory uses the same bucket.
     *
     * @param string $message Body to store.
     * @return ClaimCheck The claim check whose key identifies the stored object.
     */
    private function storeMessageInS3($message)
    {
        $config = $this->configuration;

        $client = $config->getS3Client();
        $bucket = $config->getS3BucketName();
        $ticket = $config->getClaimCheckFactory()->create();

        $client->putObject([
            'Bucket' => $bucket,
            'Key' => $ticket->getS3Key(),
            'Body' => $message,
        ]);

        return $ticket;
    }
188
189
    /**
     * Downloads the payload a claim check points to.
     *
     * @param ClaimCheck $claimCheck Supplies the bucket name and key of the object.
     * @return string The stream contents when 'Body' is a stream; otherwise the 'Body'
     *     value exactly as the S3 result provides it.
     * @throws ExceptionInterface When reading the body stream fails.
     */
    private function fetchClaimCheckFromS3(ClaimCheck $claimCheck)
    {
        $location = [
            'Bucket' => $claimCheck->getS3BucketName(),
            'Key' => $claimCheck->getS3Key(),
        ];

        $payload = $this->configuration->getS3Client()->getObject($location)->get('Body');

        // Anything that is not a stream is handed back as-is.
        if (!$payload instanceof StreamInterface) {
            return $payload;
        }

        // Unpack the message.
        try {
            return $payload->getContents();
        } catch (\RuntimeException $e) {
            // Re-throw as the package's own exception type, keeping the original as the cause.
            throw new RuntimeException($e->getMessage(), 0, $e);
        }
    }
216
217
    /**
     * Packs the SQS receipt handle and the S3 object location into one JSON string.
     *
     * deleteSqsMessage() reads the same three keys back out of the encoded handle.
     *
     * @param string $receiptHandle Receipt handle issued by SQS.
     * @param string $s3MsgBucketName Bucket holding the message payload.
     * @param string $s3MsgKey Key of the message payload.
     * @return string JSON object with the keys original_receipt_handle, s3_bucket_name and s3_key.
     */
    private function embedS3PointerInReceiptHandle($receiptHandle, $s3MsgBucketName, $s3MsgKey)
    {
        $pointer = [
            'original_receipt_handle' => $receiptHandle,
            's3_bucket_name' => $s3MsgBucketName,
            's3_key' => $s3MsgKey,
        ];

        return json_encode($pointer);
    }
231
}
232