Passed
Pull Request — master (#41)
by Nicolas
07:46 queued 33s
created

ChunkedMessageClient::publish()   B

Complexity

Conditions 3
Paths 3

Size

Total Lines 38
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 38
ccs 25
cts 25
cp 1
rs 8.8571
c 0
b 0
f 0
cc 3
eloc 21
nc 3
nop 2
crap 3
1
<?php
2
3
namespace Puzzle\AMQP\Clients;
4
5
use Puzzle\AMQP\Client;
6
use Puzzle\AMQP\Clients\MemoryManagementStrategies\NullMemoryManagementStrategy;
7
use Puzzle\AMQP\Messages\Message;
8
9
class ChunkedMessageClient
10
{
11
    const
12
        DEFAULT_ROUTING_KEY_PREFIX = 'part';
13
14
    private
15
        $prefix,
16
        $client,
17
        $memory;
18
19 3
    public function __construct(Client $client, MemoryManagementStrategy $memory = null)
20
    {
21 3
        $this->changeRoutingKeyPrefix(self::DEFAULT_ROUTING_KEY_PREFIX);
22
23 3
        if(! $memory instanceof MemoryManagementStrategy)
24 3
        {
25 3
            $memory = new NullMemoryManagementStrategy();
26 3
        }
27
28 3
        $this->memory = $memory;
29 3
        $this->client = $client;
30 3
    }
31
32 3
    public function changeRoutingKeyPrefix($prefix)
33
    {
34 3
        if(is_string($prefix))
35 3
        {
36 3
            $prefix = rtrim($prefix, '.');
37
38 3
            $this->prefix = $prefix . ".";
39 3
        }
40 3
    }
41
42 3
    public function publish($exchangeName, Message $chunkedMessage)
43
    {
44 3
        $streamedContent = $chunkedMessage->getBodyInTransportFormat();
45
46 3
        if(! $streamedContent instanceof \Generator)
47 3
        {
48 1
            return $this->client->publish($exchangeName, $chunkedMessage);
49
        }
50
51 2
        $this->memory->init();
52
53 2
        $allowCompression = $chunkedMessage->isCompressionAllowed();
54
55 2
        foreach($streamedContent as $index => $chunk)
56
        {
57 2
            $message = new Message($this->prefix . $chunkedMessage->getRoutingKey());
58 2
            $message->setBinary($chunk->getContent());
59 2
            $message->allowCompression($allowCompression);
60
61 2
            $message->addHeaders($chunk->getHeaders());
62 2
            $message->addHeaders([
63
                'message' => [
64 2
                    'routingKey' => $chunkedMessage->getRoutingKey(),
65 2
                    'contentType' => $chunkedMessage->getContentType(),
66 2
                ],
67 2
            ]);
68 2
            $message->addHeaders($chunkedMessage->getHeaders());
69
70 2
            $this->client->publish($exchangeName, $message);
71
72 2
            $size = $chunk->size();
73
74 2
            unset($message);
75 2
            unset($chunk);
76
77 2
            $this->memory->manage($size);
78 2
        }
79 2
    }
80
}
81