Completed
Push — master ( 924743...4f638f )
by Gaetano
03:52
created

Producer::batchPublish()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 18
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 18
rs 9.4285
cc 2
eloc 11
nc 2
nop 1
1
<?php
2
3
namespace Kaliop\Queueing\Plugins\KinesisBundle\Adapter\Kinesis;
4
5
use Kaliop\QueueingBundle\Queue\ProducerInterface;
6
use Aws\Kinesis\KinesisClient;
7
8
class Producer implements ProducerInterface
9
{
10
    /** @var  \Aws\Kinesis\KinesisClient */
11
    protected $client;
12
    protected $streamName;
13
    protected $debug;
14
15
    /**
16
     * @param array $config - minimum seems to be: 'credentials', 'region', 'version'
17
     * @see \Aws\AwsClient::__construct for the full list
18
     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/guide/guide/configuration.html
19
     */
20
    public function __construct(array $config)
21
    {
22
        $this->client = new KinesisClient($config);
23
    }
24
25
    /**
26
     * @param string $queueName
27
     * @return Producer
28
     * @todo test that we can successfully send messages to 2 queues using the same KinesisClient
29
     */
30
    public function setQueueName($queueName)
31
    {
32
        $this->streamName = $queueName;
33
34
        return $this;
35
    }
36
37
    /**
38
     * Note that this has less effect than passing a 'debug' option in constructor, as it will be
39
     * only used by publish() from now on
40
     *
41
     * @param bool $debug use null for 'undefined'
42
     * @return Producer
43
     */
44
    public function setDebug($debug)
45
    {
46
        $this->debug = $debug;
47
48
        return $this;
49
    }
50
51
    /**
52
     * Publishes the message and does nothing with the properties
53
     *
54
     * @param string $msgBody
55
     * @param string $routingKey
56
     * @param array $additionalProperties
57
     */
58
    public function publish($msgBody, $routingKey = '', $additionalProperties = array())
59
    {
60
        $result = $this->client->putRecord(array_merge(
0 ignored issues
show
Unused Code introduced by
$result is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
61
            array(
62
                'StreamName' => $this->streamName,
63
                'Data' => $msgBody,
64
                'PartitionKey' => $routingKey
65
            ),
66
            $this->getClientParams()
67
        ));
68
    }
69
70
    public function batchPublish(array $messages)
71
    {
72
        $records = aray();
73
        foreach($messages as $message) {
74
            $records[] = array(
75
                'Data' => $message['msgBody'],
76
                'PartitionKey' => $message['routingKey']
77
            );
78
        }
79
80
        $result = $this->client->putRecords(array_merge(
0 ignored issues
show
Unused Code introduced by
$result is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
81
            array(
82
                'StreamName' => $this->streamName,
83
                'Records' => $records
84
            ),
85
            $this->getClientParams()
86
        ));
87
    }
88
89
    /**
90
     * Allows callers to do whatever they want with the client - useful to the Queue Mgr
91
     *
92
     * @param string $method
93
     * @param array $args
94
     * @return mixed
95
     */
96
    public function call($method, array $args = array())
97
    {
98
        return $this->client->$method(array_merge($args, $this->getClientParams()));
99
    }
100
101
    /**
102
     * Prepares the extra parameters to be injected into calls made via the Kinesis Client
103
     * @return array
104
     */
105
    protected function getClientParams()
106
    {
107
        if ($this->debug !== null) {
108
            return array('@http' => array('debug' => $this->debug));
109
        }
110
111
        return array();
112
    }
113
114
    /**
115
     * @param string $contentType
116
     * @return Producer
117
     * @throws \Exception if unsupported contentType is used
118
     *
119
     * @todo allow different serializations - but the Kinesis protocol does not allow to store the content type in the
120
     *       message natively, so we should have to 'invent' an encapsulation format...
121
     */
122
    public function setContentType($contentType)
123
    {
124
        if($contentType != 'application/json') {
125
            throw new \Exception("Unsupported content-type for message serialization: $contentType. Only 'application/json' is supported");
126
        }
127
128
        return $this;
129
    }
130
}