Completed
Push — master ( 61c7e0...bbf351 )
by Gaetano
07:00
created

Producer::setMessageDeduplicationIdCalculator()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
c 0
b 0
f 0
ccs 3
cts 3
cp 1
rs 10
cc 1
nc 1
nop 1
crap 1
<?php

namespace Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS;

use Kaliop\QueueingBundle\Queue\ProducerInterface;
use Aws\Sqs\SqsClient;
use Aws\TraceMiddleware;

class Producer implements ProducerInterface
{
    /** @var \Aws\Sqs\SqsClient */
    protected $client;
    protected $queueUrl;
    protected $debug = false;
    protected $contentType = 'text/plain';
    // The message attribute used to store content-type. To be kept in sync with the Consumer
    protected $contentTypeAttribute = 'contentType';
    protected $routingKeyAttribute = 'routingKey';
    protected $messageGroupId;
    /** @var MessageDeduplicationIdCalculatorInterface $messageDeduplicationIdCalculator */
    protected $messageDeduplicationIdCalculator;

    /**
     * @param array $config - minimum seems to be: 'credentials', 'region', 'version'
     * @see \Aws\AwsClient::__construct for the full list
     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/configuration.html
     */
    public function __construct(array $config)
    {
        $this->client = new SqsClient($config);
    }

    /**
     * Enables debug. At the moment it can not be disabled
     *
     * @param bool|array $debug true to trace with the default settings, or an array of options for TraceMiddleware
     * @return $this
     *
     * @todo test if using $handlerList->removeByInstance we can disable debug as well
     * @todo code analysis reports this method as duplicated elsewhere in the project: consider extracting it
     */
    public function setDebug($debug)
    {
        if ($debug == $this->debug) {
            return $this;
        }
        if ($debug) {
            $handlerList = $this->client->getHandlerList();
            $handlerList->interpose(new TraceMiddleware($debug === true ? [] : $debug));
            // remember that tracing is on, so that the same middleware is not added again by a later call
            $this->debug = $debug;
        }

        return $this;
    }

    /**
     * @param string $queueUrl NB: complete queue url as used by SQS
     * @return Producer
     * @todo test that we can successfully send messages to 2 queues using the same SqsClient
     */
    public function setQueueUrl($queueUrl)
    {
        $this->queueUrl = $queueUrl;

        return $this;
    }

    /**
     * @return string the complete queue url as used by SQS
     */
    public function getQueueUrl()
    {
        return $this->queueUrl;
    }

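    /**
     * @param string $messageGroupId sent as 'MessageGroupId' with every message (used by SQS FIFO queues)
     * @return $this
     */
    public function setMessageGroupId($messageGroupId)
    {
        $this->messageGroupId = $messageGroupId;

        return $this;
    }

    /**
     * @param MessageDeduplicationIdCalculatorInterface $messageDeduplicationIdCalculator generates the 'MessageDeduplicationId' of every message
     * @return $this
     */
    public function setMessageDeduplicationIdCalculator(MessageDeduplicationIdCalculatorInterface $messageDeduplicationIdCalculator)
    {
        $this->messageDeduplicationIdCalculator = $messageDeduplicationIdCalculator;

        return $this;
    }
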
    /**
     * Publishes the message. $additionalProperties are merged as-is into the parameters of the sendMessage call
     *
     * @param string $msgBody
     * @param string $routingKey
     * @param array $additionalProperties see https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#sendmessage
     *
     * @todo support custom message attributes (already possible via $additionalProperties)
     * @todo support custom delaySeconds (already possible via $additionalProperties)
     * @todo support custom MessageDeduplicationId (already possible via $additionalProperties)
     */
    public function publish($msgBody, $routingKey = '', $additionalProperties = array())
    {
        $this->client->sendMessage(array_merge(
            array(
                'QueueUrl' => $this->queueUrl,
                'MessageBody' => $msgBody,
            ),
            $this->getClientParams($msgBody, $routingKey, $additionalProperties)
        ));
    }

    /**
     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#sendmessagebatch
     * @param array[] $messages each element is an array that must contain:
     *                          - msgBody (string)
     *                          - routingKey (string, optional)
     *                          - additionalProperties (array, optional)
     * @throws \RuntimeException if not all the messages of a batch are accepted
     */
    public function batchPublish(array $messages)
    {
        $j = 0;
        // SQS accepts at most 10 messages per sendMessageBatch call
        for ($i = 0, $count = count($messages); $i < $count; $i += 10) {
            $entries = array();
            $toSend = array_slice($messages, $i, 10);
            foreach ($toSend as $message) {
                $entries[] = array_merge(
                    array(
                        'MessageBody' => $message['msgBody'],
                        'Id' => $j++
                    ),
                    // use explicit defaults for the optional elements: getClientParams does not accept null as $additionalProperties
                    $this->getClientParams(
                        $message['msgBody'],
                        isset($message['routingKey']) ? $message['routingKey'] : '',
                        isset($message['additionalProperties']) ? $message['additionalProperties'] : array()
                    )
                );
            }

            $result = $this->client->sendMessageBatch(
                array(
                    'QueueUrl' => $this->queueUrl,
                    'Entries' => $entries,
                )
            );

            // cast to array in case 'Successful' is not set in the result
            $ok = count((array)$result->get('Successful'));
            $tot = count($toSend);
            if ($ok != $tot) {
                throw new \RuntimeException("Batch sending of messages failed - $ok ok out of $tot");
            }
        }
    }

    /**
     * Allows callers to do whatever they want with the client - useful to the Queue Mgr
     *
     * @param string $method
     * @param array $args
     * @return mixed
     */
    public function call($method, array $args = array())
    {
        return $this->client->$method($args);
    }

    /**
     * Prepares the extra parameters to be injected into sendMessage calls made via the SQS Client
     *
     * @param string $msgBody
     * @param string $routingKey
     * @param array $additionalProperties
     * @return array
     *
     * @todo shall we throw if $additionalProperties['expiration'] is set, since we don't support it?
     */
    protected function getClientParams($msgBody = null, $routingKey = '', array $additionalProperties = array())
    {
        $result = array(
            'MessageAttributes' => array(
                $this->contentTypeAttribute => array('StringValue' => $this->contentType, 'DataType' => 'String'),
            )
        );

        if ($routingKey != '') {
            $result['MessageAttributes'][$this->routingKeyAttribute] = array('StringValue' => $routingKey, 'DataType' => 'String');
        }

        if ($this->messageGroupId != null) {
            $result['MessageGroupId'] = $this->messageGroupId;
        }

        if ($this->messageDeduplicationIdCalculator != null) {
            $result['MessageDeduplicationId'] = $this->messageDeduplicationIdCalculator->getMessageId($msgBody, $routingKey, $additionalProperties);
        }

        // NB: the merge is shallow: a 'MessageAttributes' key in $additionalProperties replaces the attributes set above
        $result = array_merge($result, $additionalProperties);

        return $result;
    }

    /**
     * @param string $contentType
     * @return Producer
     */
    public function setContentType($contentType)
    {
        $this->contentType = $contentType;

        return $this;
    }
}