Completed
Pull Request — master (#41)
by Nicolas
07:53
created

ChunkedMessageClient   A

Complexity

Total Complexity 7

Size/Duplication

Total Lines 72
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 7
lcom 1
cbo 4
dl 0
loc 72
ccs 41
cts 41
cp 1
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 12 2
A changeRoutingKeyPrefix() 0 9 2
B publish() 0 38 3
1
<?php
2
3
namespace Puzzle\AMQP\Clients;
4
5
use Puzzle\AMQP\Client;
6
use Puzzle\AMQP\Clients\MemoryManagementStrategies\NullMemoryManagementStrategy;
7
use Puzzle\AMQP\Messages\Message;
8
9
class ChunkedMessageClient
10
{
11
    const
12
        DEFAULT_ROUTING_KEY_PREFIX = 'part';
13
14
    private
15
        $prefix,
16
        $client,
17
        $memory;
18
19 3
    public function __construct(Client $client, MemoryManagementStrategy $memory = null)
20
    {
21 3
        $this->changeRoutingKeyPrefix(self::DEFAULT_ROUTING_KEY_PREFIX);
22
23 3
        if(! $memory instanceof MemoryManagementStrategy)
24 3
        {
25 3
            $memory = new NullMemoryManagementStrategy();
26 3
        }
27
28 3
        $this->memory = $memory;
29 3
        $this->client = $client;
30 3
    }
31
32 3
    public function changeRoutingKeyPrefix($prefix)
33
    {
34 3
        if(is_string($prefix))
35 3
        {
36 3
            $prefix = rtrim($prefix, '.');
37
38 3
            $this->prefix = $prefix . ".";
39 3
        }
40 3
    }
41
42 3
    public function publish($exchangeName, Message $chunkedMessage)
43
    {
44 3
        $streamedContent = $chunkedMessage->getBodyInTransportFormat();
45
46 3
        if(! $streamedContent instanceof \Generator)
47 3
        {
48 1
            return $this->client->publish($exchangeName, $chunkedMessage);
49
        }
50
51 2
        $this->memory->init();
52
53 2
        $allowCompression = $chunkedMessage->isCompressionAllowed();
54
55 2
        foreach($streamedContent as $index => $chunk)
56
        {
57 2
            $message = new Message($this->prefix . $chunkedMessage->getRoutingKey());
58 2
            $message->setBinary($chunk->getContent());
59 2
            $message->allowCompression($allowCompression);
60
61 2
            $message->addHeaders($chunk->getHeaders());
62 2
            $message->addHeaders([
63
                'message' => [
64 2
                    'routingKey' => $chunkedMessage->getRoutingKey(),
65 2
                    'contentType' => $chunkedMessage->getContentType(),
66 2
                ],
67 2
            ]);
68 2
            $message->addHeaders($chunkedMessage->getHeaders());
69
70 2
            $this->client->publish($exchangeName, $message);
71
72 2
            $size = $chunk->size();
73
74 2
            unset($message);
75 2
            unset($chunk);
76
77 2
            $this->memory->manage($size);
78 2
        }
79 2
    }
80
}
81