Completed
Push — master ( 29c278...727efb )
by Gaetano
04:01
created

Producer::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 4
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 2
<?php

namespace Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS;

use Kaliop\QueueingBundle\Queue\ProducerInterface;
use Aws\Sqs\SqsClient;
use Aws\TraceMiddleware;

class Producer implements ProducerInterface
{
    /** @var  \Aws\Sqs\SqsClient */
    protected $client;
    protected $queueUrl;
    protected $debug = false;
    protected $contentType = 'text/plain';
    // The message attribute used to store content-type. To be kept in sync with the Consumer
    protected $contentTypeAttribute = 'contentType';
    protected $routingKeyAttribute = 'routingKey';

    /**
     * @param array $config - minimum seems to be: 'credentials', 'region', 'version'
     * @see \Aws\AwsClient::__construct for the full list
     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/configuration.html
     */
    public function __construct(array $config)
    {
        $this->client = new SqsClient($config);
    }

    /**
     * Enables debug. At the moment it cannot be disabled
     *
     * @param bool|array $debug true, or an array of options passed to the TraceMiddleware
     * @return $this
     *
     * @todo test if using $handlerList->removeByInstance we can disable debug as well
     */
    public function setDebug($debug) {
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

        if ($debug == $this->debug) {
            return $this;
        }
        if ($debug) {
            $handlerList = $this->client->getHandlerList();
            $handlerList->interpose(new TraceMiddleware($debug === true ? [] : $debug));
            // Remember the new state, so that repeated calls do not add the middleware twice
            $this->debug = $debug;
        }

        return $this;
    }

    /**
     * @param string $queueName NB: complete queue name as used by SQS
Bug introduced by
There is no parameter named $queueName. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

     * @return Producer
     * @todo test that we can successfully send messages to 2 queues using the same SqsClient
     */
    public function setQueueUrl($queueUrl)
    {
        $this->queueUrl = $queueUrl;

        return $this;
    }

    /**
     * @return string the complete queue name as used by SQS
     */
    public function getQueueUrl()
    {
        return $this->queueUrl;
    }

    /**
     * Publishes the message and does nothing with the properties
     *
     * @param string $msgBody
     * @param string $routingKey
     * @param array $additionalProperties
     *
     * @todo support custom message attributes
     * @todo support custom delaySeconds
     */
    public function publish($msgBody, $routingKey = '', $additionalProperties = array())
    {
        $this->client->sendMessage(array_merge(
            array(
                'QueueUrl' => $this->queueUrl,
                'MessageBody' => $msgBody,
            ),
            $this->getClientParams($routingKey, $additionalProperties)
        ));
    }

    /**
     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#sendmessagebatch
     * @param array $messages
     */
    public function batchPublish(array $messages)
    {
        $j = 0;
        for ($i = 0; $i < count($messages); $i += 10) {
Performance Best Practice introduced by
It seems like you are calling the size function count() as part of the test condition. You might want to compute the size beforehand, and not on each iteration.

If the size of the collection does not change during the iteration, it is generally a good practice to compute it beforehand, and not on each iteration:

for ($i=0; $i<count($array); $i++) { // calls count() on each iteration
}

// Better
for ($i=0, $c=count($array); $i<$c; $i++) { // calls count() just once
}
            $entries = array();
            $toSend = array_slice($messages, $i, 10);
            foreach ($toSend as $message) {
                $entries[] = array_merge(
                    array(
                        'MessageBody' => $message['msgBody'],
                        'Id' => $j++
                    ),
                    // Fall back to the defaults when the optional keys are missing: passing null
                    // for the array-typed $additionalProperties would trigger a type error
                    $this->getClientParams(
                        isset($message['routingKey']) ? $message['routingKey'] : '',
                        isset($message['additionalProperties']) ? $message['additionalProperties'] : array()
                    )
                );
            }

            $result = $this->client->sendMessageBatch(
                array(
                    'QueueUrl' => $this->queueUrl,
                    'Entries' => $entries,
                )
            );

            if (($ok = count($result->get('Successful'))) != ($tot = count($toSend))) {
                throw new \RuntimeException("Batch sending of messages failed - $ok ok out of $tot");
            }
        }
    }

    /**
     * Allows callers to do whatever they want with the client - useful to the Queue Mgr
     *
     * @param string $method
     * @param array $args
     * @return mixed
     */
    public function call($method, array $args = array())
    {
        return $this->client->$method(array_merge($args, $this->getClientParams()));
    }

    /**
     * Prepares the extra parameters to be injected into calls made via the SQS Client
     * @param string $routingKey
     * @param array $additionalProperties
     * @return array
     */
    protected function getClientParams($routingKey = '', array $additionalProperties = array())
Unused Code introduced by
The parameter $additionalProperties is not used and could be removed.

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

    {
        $result = array(
            'MessageAttributes' => array(
                $this->contentTypeAttribute => array('StringValue' => $this->contentType, 'DataType' => 'String'),
            )
        );
        if ($routingKey != '') {
            $result['MessageAttributes'][$this->routingKeyAttribute] = array('StringValue' => $routingKey, 'DataType' => 'String');
        }

        return $result;
    }

    /**
     * @param string $contentType
     * @return Producer
     * @throws \Exception if unsupported contentType is used
     */
    public function setContentType($contentType)
    {
        $this->contentType = $contentType;

        return $this;
    }
}