Test Failed
Pull Request — master (#29)
by Nicolas
02:57
created

GZip::setEncodingMode()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 16
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 16
c 0
b 0
f 0
rs 9.4285
cc 3
eloc 8
nc 2
nop 1
1
<?php
2
3
namespace Puzzle\AMQP\Messages\Processors;
4
5
use Puzzle\AMQP\WritableMessage;
6
use Puzzle\AMQP\Messages\Bodies\Binary;
7
use Psr\Log\LoggerAwareTrait;
8
use Psr\Log\NullLogger;
9
use Puzzle\Pieces\StringManipulation;
10
use Puzzle\AMQP\Messages\OnPublishProcessor;
11
use Puzzle\AMQP\Messages\OnConsumeProcessor;
12
use Puzzle\AMQP\ReadableMessage;
13
use Puzzle\AMQP\Workers\ReadableMessageModifier;
14
use Puzzle\AMQP\Messages\BodyFactory;
15
16
class GZip implements OnPublishProcessor, OnConsumeProcessor
17
{
18
    use
19
        LoggerAwareTrait,
20
        StringManipulation;
21
    
22
    const
23
        HEADER_COMPRESSION = 'compression',
24
        HEADER_COMPRESSION_CONTENT_TYPE = 'compression_content-type',
25
        COMPRESSION_ALGORITHM = 'gzip';
26
    
27
    private
28
        $compressionLevel,
29
        $encodingMode;
30
    
31
    public function __construct()
32
    {
33
        $this->compressionLevel = -1;
34
        $this->encodingMode = FORCE_GZIP;
35
        $this->logger = new NullLogger();
36
    }
37
        
38
    /**
39
     * @return self
40
     */
41
    public function setCompressionLevel($compressionLevel = -1)
42
    {
43
        if(! $this->isCompressionLevelValid($compressionLevel))
44
        {
45
            $this->logWarning(sprintf(
46
                "Invalid compression level (%s)",
47
                $this->convertToString($compressionLevel)
48
            ));
49
            
50
            return $this;
51
        }
52
        
53
        $this->compressionLevel = (int) $compressionLevel;
54
        
55
        return $this;
56
    }
57
    
58
    private function isCompressionLevelValid($level)
59
    {
60
        return is_numeric($level) && $level >= -1 && $level <= 9;
61
    }
62
    
63
    private function logWarning($message)
64
    {
65
        $this->logger->warning(sprintf(
66
            "[%s] : %s",
67
            "PROCESSOR " . __CLASS__,
68
            $message
69
        ));
70
    }
71
    
72
    public function setEncodingMode($mode)
73
    {
74
        if(! is_scalar($mode) || ! in_array($mode, [FORCE_GZIP, FORCE_DEFLATE]))
75
        {
76
            $this->logWarning(sprintf(
77
                "Invalid encoding mode (%s)",
78
                $this->convertToString($mode)
79
            ));
80
            
81
            return $this;
82
        }
83
        
84
        $this->encodingMode = $mode;
85
        
86
        return $this;
87
    }
88
    
89
    public function onPublish(WritableMessage $message)
90
    {
91
        if($message->isCompressionAllowed() === false)
92
        {
93
            return;
94
        }
95
        
96
        $compressedContent = gzencode($message->getBodyInTransportFormat(), $this->compressionLevel);
97
        
98
        $this->updateCompressedMessage($message, $compressedContent);
99
    }
100
    
101
    private function updateCompressedMessage(WritableMessage $message, $compressedContent)
102
    {
103
        $message->addHeaders([
104
            self::HEADER_COMPRESSION => self::COMPRESSION_ALGORITHM,
105
            self::HEADER_COMPRESSION_CONTENT_TYPE => $message->getContentType(),
106
        ]);
107
        
108
        $message->setBody(new Binary($compressedContent));
109
    }
110
    
111
    public function onConsume(ReadableMessage $message)
112
    {
113
        if($this->isCompressed($message))
114
        {
115
            $message = $this->updateUncompressedMessage(
116
                $message,
117
                gzdecode($message->getBodyInOriginalFormat())
118
            );
119
        }
120
        
121
        return $message;
122
    }
123
    
124
    private function isCompressed(ReadableMessage $message)
125
    {
126
        $headers = $message->getHeaders();
127
        
128
        if(isset($headers[self::HEADER_COMPRESSION])
129
        && isset($headers[self::HEADER_COMPRESSION_CONTENT_TYPE]))
130
        {
131
            return self::COMPRESSION_ALGORITHM === $headers[self::HEADER_COMPRESSION];
132
        }
133
        
134
        return false;
135
    }
136
    
137
    private function updateUncompressedMessage(ReadableMessage $message, $uncompressedContent)
138
    {
139
        $builder = new ReadableMessageModifier($message);
140
        
141
        $headers = $message->getHeaders();
142
        $newContentType = $headers[self::HEADER_COMPRESSION_CONTENT_TYPE];
143
        
144
        $newBody = (new BodyFactory())->create($newContentType, $uncompressedContent);
145
        
146
        $builder
147
            ->changeBody($newBody)
148
            ->changeAttribute('content_type', $newContentType)
149
            ->dropHeader(self::HEADER_COMPRESSION)
150
            ->dropHeader(self::HEADER_COMPRESSION_CONTENT_TYPE);
151
            
152
        return $builder->build();
153
    }
154
}
155