Passed
Pull Request — master (#29)
by Nicolas
02:56
created

GZip::updateCompressedMessage()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 7
cts 7
cp 1
rs 9.6666
c 0
b 0
f 0
cc 1
eloc 5
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Puzzle\AMQP\Messages\Processors;
4
5
use Puzzle\AMQP\WritableMessage;
6
use Puzzle\AMQP\Messages\Bodies\Binary;
7
use Psr\Log\LoggerAwareTrait;
8
use Psr\Log\NullLogger;
9
use Puzzle\Pieces\StringManipulation;
10
use Puzzle\AMQP\Messages\OnPublishProcessor;
11
use Puzzle\AMQP\Messages\OnConsumeProcessor;
12
use Puzzle\AMQP\ReadableMessage;
13
use Puzzle\AMQP\Workers\ReadableMessageModifier;
14
use Puzzle\AMQP\Messages\BodyFactory;
15
16
/**
 * Message processor that gzip-compresses message bodies on publish and
 * restores them on consume.
 *
 * On publish, the body is replaced by its compressed form and two headers are
 * added: one naming the compression algorithm and one remembering the
 * content type the message had before compression. On consume, a message
 * carrying both headers (with the expected algorithm) is decompressed, its
 * content type is restored and the two headers are dropped.
 */
class GZip implements OnPublishProcessor, OnConsumeProcessor
{
    use
        LoggerAwareTrait,
        StringManipulation;

    const
        // Header naming the algorithm used to compress the body
        HEADER_COMPRESSION = 'compression',
        // Header holding the content type the message had before compression
        HEADER_COMPRESSION_CONTENT_TYPE = 'compression_content-type',
        // Value written to / expected in HEADER_COMPRESSION
        COMPRESSION_ALGORITHM = 'gzip';

    private
        // Level handed to gzencode(); -1 up to 9 (see isCompressionLevelValid())
        $compressionLevel,
        // FORCE_GZIP or FORCE_DEFLATE.
        // NOTE(review): this value is stored but never read in this class;
        // gzencode() is called without it. Confirm whether that is intended
        // before wiring it in, since gzdecode() only reads the gzip format.
        $encodingMode;

    /**
     * Starts with the default compression level (-1), gzip encoding and a
     * logger that discards everything.
     */
    public function __construct()
    {
        $this->compressionLevel = -1;
        $this->encodingMode = FORCE_GZIP;
        $this->logger = new NullLogger();
    }

    /**
     * Changes the compression level used on publish.
     *
     * An invalid level is not an error: a warning is logged and the current
     * level is kept.
     *
     * @param mixed $compressionLevel numeric value between -1 and 9
     *
     * @return self
     */
    public function setCompressionLevel($compressionLevel = -1)
    {
        if(! $this->isCompressionLevelValid($compressionLevel))
        {
            $this->logWarning(sprintf(
                "Invalid compression level (%s)",
                $this->convertToString($compressionLevel)
            ));

            return $this;
        }

        $this->compressionLevel = (int) $compressionLevel;

        return $this;
    }

    /**
     * Tells whether the given value is numeric and lies within [-1, 9].
     *
     * @param mixed $level
     *
     * @return bool
     */
    private function isCompressionLevelValid($level)
    {
        return is_numeric($level) && $level >= -1 && $level <= 9;
    }

    /**
     * Logs a warning prefixed with the processor class name.
     *
     * @param string $message
     */
    private function logWarning($message)
    {
        $this->logger->warning(sprintf(
            "[%s] : %s",
            "PROCESSOR " . __CLASS__,
            $message
        ));
    }

    /**
     * Changes the encoding mode (FORCE_GZIP or FORCE_DEFLATE).
     *
     * An invalid mode is not an error: a warning is logged and the current
     * mode is kept.
     *
     * @param mixed $mode
     *
     * @return self
     */
    public function setEncodingMode($mode)
    {
        // is_numeric() rejects booleans, arrays, objects and non-numeric
        // strings up front. Without it the loose in_array() comparison would
        // accept values such as true (true == 31), which would then be stored
        // as the encoding mode.
        if(! is_numeric($mode) || ! in_array($mode, [FORCE_GZIP, FORCE_DEFLATE]))
        {
            $this->logWarning(sprintf(
                "Invalid encoding mode (%s)",
                $this->convertToString($mode)
            ));

            return $this;
        }

        // Normalise to an integer, as setCompressionLevel() does for its value
        $this->encodingMode = (int) $mode;

        return $this;
    }

    /**
     * Compresses the body of an outgoing message, unless the message does not
     * allow compression.
     *
     * If compression fails, a warning is logged and the message is left as is.
     *
     * @param WritableMessage $message
     */
    public function onPublish(WritableMessage $message)
    {
        if($message->isCompressionAllowed() === false)
        {
            return;
        }

        $compressedContent = gzencode($message->getBodyInTransportFormat(), $this->compressionLevel);

        // gzencode() returns false on failure: never publish that as a body
        if($compressedContent === false)
        {
            $this->logWarning("Unable to compress message body, message left uncompressed");

            return;
        }

        $this->updateCompressedMessage($message, $compressedContent);
    }

    /**
     * Adds the compression headers to the message and swaps its body for the
     * compressed content.
     *
     * @param WritableMessage $message
     * @param string $compressedContent
     */
    private function updateCompressedMessage(WritableMessage $message, $compressedContent)
    {
        // Headers are added before the body is replaced, so the recorded
        // content type is the one the message has at this point.
        $message->addHeaders([
            self::HEADER_COMPRESSION => self::COMPRESSION_ALGORITHM,
            self::HEADER_COMPRESSION_CONTENT_TYPE => $message->getContentType(),
        ]);

        $message->setBody(new Binary($compressedContent));
    }

    /**
     * Decompresses an incoming message when its headers mark it as compressed
     * by this processor.
     *
     * If the message is not marked as compressed, or if decompression fails
     * (a warning is logged in that case), the message is returned unchanged.
     *
     * @param ReadableMessage $message
     *
     * @return ReadableMessage
     */
    public function onConsume(ReadableMessage $message)
    {
        if(! $this->isCompressed($message))
        {
            return $message;
        }

        $uncompressedContent = gzdecode($message->getBodyInOriginalFormat());

        // gzdecode() returns false on invalid data: keep the message untouched
        // (headers included) rather than building a body out of false
        if($uncompressedContent === false)
        {
            $this->logWarning("Unable to uncompress message body, message left untouched");

            return $message;
        }

        return $this->updateUncompressedMessage($message, $uncompressedContent);
    }

    /**
     * Tells whether the message carries both compression headers and the
     * algorithm header matches COMPRESSION_ALGORITHM.
     *
     * @param ReadableMessage $message
     *
     * @return bool
     */
    private function isCompressed(ReadableMessage $message)
    {
        $headers = $message->getHeaders();

        if(isset($headers[self::HEADER_COMPRESSION])
        && isset($headers[self::HEADER_COMPRESSION_CONTENT_TYPE]))
        {
            return self::COMPRESSION_ALGORITHM === $headers[self::HEADER_COMPRESSION];
        }

        return false;
    }

    /**
     * Builds a message carrying the uncompressed body, the content type read
     * from the compression header, and no compression headers.
     *
     * @param ReadableMessage $message
     * @param string $uncompressedContent
     *
     * @return ReadableMessage whatever ReadableMessageModifier::build() returns
     */
    private function updateUncompressedMessage(ReadableMessage $message, $uncompressedContent)
    {
        $builder = new ReadableMessageModifier($message);

        $headers = $message->getHeaders();
        $newContentType = $headers[self::HEADER_COMPRESSION_CONTENT_TYPE];

        $newBody = (new BodyFactory())->create($newContentType, $uncompressedContent);

        $builder
            ->changeBody($newBody)
            ->changeAttribute('content_type', $newContentType)
            ->dropHeader(self::HEADER_COMPRESSION)
            ->dropHeader(self::HEADER_COMPRESSION_CONTENT_TYPE);

        return $builder->build();
    }
}
155