Passed
Pull Request — master (#29)
by Sébastien
03:40
created

ConsumeContext   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 199
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 11

Importance

Changes 1
Bugs 0 Features 1
Metric Value
wmc 22
c 1
b 0
f 1
lcom 1
cbo 11
dl 0
loc 199
rs 10

18 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
A theQueueContainsTheTextMessage() 0 9 1
A theQueueContainsTheJsonMessage() 0 12 1
A iConsumeAllTheMessagesInTheQueue() 0 16 1
A process() 0 4 1
A iHaveConsumedMessage() 0 4 1
A theMessageIsATextOne() 0 4 1
A theMessageIsAJsonOne() 0 4 1
A theMessageIs() 0 7 1
A theMessageContainsTheJson() 0 4 1
A theMessageContains() 0 6 1
A oneOfTheMessagesIsATextOne() 0 4 1
A oneOfTheMessagesIsAJsonOne() 0 4 1
A oneOfTheMessagesIs() 0 16 3
A oneOfTheMessagesContainsTheJson() 0 4 1
A oneOfTheMessagesContains() 0 15 3
A theQueueContainsTheCompressedTextMessage() 0 8 1
A theMessageIsAnUncompressedTextOne() 0 4 1
1
<?php
2
3
namespace Puzzle\AMQP\Contexts;
4
5
use Puzzle\AMQP\Messages\Message;
6
use Puzzle\AMQP\Consumers\Insomniac;
7
use Puzzle\AMQP\Workers\Worker;
8
use Puzzle\AMQP\Workers\WorkerContext;
9
use Puzzle\AMQP\ReadableMessage;
10
use Psr\Log\LoggerAwareTrait;
11
use Psr\Log\NullLogger;
12
use Puzzle\AMQP\Workers\ProcessorInterfaceAdapter;
13
use Puzzle\AMQP\Messages\ContentType;
14
use Puzzle\AMQP\Messages\Bodies\Json;
15
use Puzzle\AMQP\Messages\Processors\GZip;
16
17
class ConsumeContext extends AbstractRabbitMQContext implements Worker
18
{
19
    use LoggerAwareTrait;
20
    
21
    private
22
        $consumedMessages;
23
    
24
    /**
     * Builds the context with a silent logger and an empty list of consumed messages.
     *
     * @param mixed $path Forwarded unchanged to the parent context constructor.
     */
    public function __construct($path)
    {
        parent::__construct($path);

        // Start from a clean slate: nothing consumed yet, nothing logged.
        $this->consumedMessages = [];
        $this->setLogger(new NullLogger());
    }
31
    
32
    /**
     * Publishes a text message on the context exchange with the text routing key.
     *
     * The $queue argument is captured by the step pattern but is not used here:
     * the message is published to $this->exchange, not pushed into a queue by name.
     * NOTE(review): whether it lands in the named queue presumably depends on
     * bindings configured elsewhere - confirm.
     *
     * @Given The queue :queue contains the text message :bodyContent
     */
    public function theQueueContainsTheTextMessage($bodyContent, $queue)
    {
        // FIXME Use RabbitMQCTL instead
        $textMessage = new Message(self::TEXT_ROUTING_KEY);
        $textMessage->setText($bodyContent);

        $this->client->publish($this->exchange, $textMessage);
    }
44
45
    /**
     * Publishes a message with a Json body on the context exchange, using the json routing key.
     *
     * The $queue argument is captured by the step pattern but is not used here:
     * the message is published to $this->exchange, not pushed into a queue by name.
     *
     * @Given The queue :queue contains the json message :bodyContent
     */
    public function theQueueContainsTheJsonMessage($bodyContent, $queue)
    {
        // FIXME Use RabbitMQCTL instead
        $jsonBody = new Json();
        $jsonBody->changeContentWithJson($bodyContent);

        $jsonMessage = new Message(self::JSON_ROUTING_KEY);
        $jsonMessage->setBody($jsonBody);

        $this->client->publish($this->exchange, $jsonMessage);
    }
60
    
61
    /**
     * Consumes the given queue, using this context as the worker.
     *
     * Previously recorded messages are discarded first; every message handed to
     * process() during consumption is recorded in $this->consumedMessages.
     *
     * @When I consume all the messages in the queue :queue
     */
    public function iConsumeAllTheMessagesInTheQueue($queue)
    {
        $this->consumedMessages = [];

        // NOTE(review): Insomniac presumably returns instead of blocking forever - confirm.
        $consumer = new Insomniac();

        // The worker context receives a factory that returns this context as the worker.
        $workerFactory = function() {
            return $this;
        };
        $workerContext = new WorkerContext($workerFactory, $consumer, $queue);

        // A GZip message processor is appended to the processing chain
        // (presumably so compressed bodies reach process() uncompressed - confirm).
        $processor = new ProcessorInterfaceAdapter($workerContext);
        $processor->appendMessageProcessor(new GZip());

        $consumer->consume($processor, $this->client, $workerContext);
    }
80
    
81
    /**
     * Worker callback: records the received message so later steps can inspect it.
     *
     * @param ReadableMessage $message Message handed over by the consumer.
     */
    public function process(ReadableMessage $message)
    {
        array_push($this->consumedMessages, $message);
    }
85
    
86
    /**
     * Asserts that exactly the given number of messages has been recorded.
     *
     * @Then /I have consumed (\d+) messages?/
     */
    public function iHaveConsumedMessage($nbMessages)
    {
        // The step captures the number as text, hence the cast before the strict comparison.
        $expectedCount = (int) $nbMessages;
        $actualCount = count($this->consumedMessages);

        \PHPUnit_Framework_Assert::assertSame($expectedCount, $actualCount);
    }
93
    
94
    /**
     * Asserts that the first consumed message has the text routing key and the text content type.
     *
     * @Then the message is a text one
     */
    public function theMessageIsATextOne()
    {
        $expectedRoutingKey = self::TEXT_ROUTING_KEY;
        $expectedContentType = ContentType::TEXT;

        $this->theMessageIs($expectedRoutingKey, $expectedContentType);
    }
101
    
102
    /**
     * Asserts that the first consumed message has the json routing key and the json content type.
     *
     * @Then the message is a json one
     */
    public function theMessageIsAJsonOne()
    {
        $expectedRoutingKey = self::JSON_ROUTING_KEY;
        $expectedContentType = ContentType::JSON;

        $this->theMessageIs($expectedRoutingKey, $expectedContentType);
    }
109
    
110
    /**
     * Asserts the routing key and content type of the first consumed message.
     *
     * @param mixed $routingKey  Expected value of getRoutingKeyFromHeader().
     * @param mixed $contentType Expected value of getContentType().
     */
    private function theMessageIs($routingKey, $contentType)
    {
        // Fail with a readable assertion instead of an undefined-offset notice
        // when the step is used before any message was consumed.
        \PHPUnit_Framework_Assert::assertNotEmpty(
            $this->consumedMessages,
            'No message has been consumed'
        );

        $firstMessage = $this->consumedMessages[0];

        // assertSame() takes the expected value first, then the actual one.
        \PHPUnit_Framework_Assert::assertSame($routingKey, $firstMessage->getRoutingKeyFromHeader());
        \PHPUnit_Framework_Assert::assertSame($contentType, $firstMessage->getContentType());
    }
117
    
118
    /**
     * Asserts that the body of the first consumed message equals the given JSON,
     * decoded as an associative array.
     *
     * @Then the message contains the json :jsonString
     */
    public function theMessageContainsTheJson($jsonString)
    {
        $expectedBody = json_decode($jsonString, true);

        // A malformed JSON string in the feature file would otherwise decode to null
        // and be silently compared against the message body.
        \PHPUnit_Framework_Assert::assertSame(
            JSON_ERROR_NONE,
            json_last_error(),
            'The expected JSON given in the step is not valid JSON'
        );

        $this->theMessageContains($expectedBody);
    }
125
    
126
    /**
     * Asserts that the body of the first consumed message, in its original format,
     * is strictly identical to the given content.
     *
     * @Then the message contains :bodyContent
     */
    public function theMessageContains($bodyContent)
    {
        // Fail with a readable assertion instead of an undefined-offset notice
        // when the step is used before any message was consumed.
        \PHPUnit_Framework_Assert::assertNotEmpty(
            $this->consumedMessages,
            'No message has been consumed'
        );

        $firstMessage = $this->consumedMessages[0];

        // assertSame() takes the expected value first, then the actual one.
        \PHPUnit_Framework_Assert::assertSame($bodyContent, $firstMessage->getBodyInOriginalFormat());
    }
135
136
    /**
     * Asserts that the consumed messages include one with the text content type and the text routing key.
     *
     * @Then one of the messages is a text one
     */
    public function oneOfTheMessagesIsATextOne()
    {
        $expectedContentType = ContentType::TEXT;
        $expectedRoutingKey = self::TEXT_ROUTING_KEY;

        $this->oneOfTheMessagesIs($expectedContentType, $expectedRoutingKey);
    }
143
144
    /**
     * Asserts that the consumed messages include one with the json content type and the json routing key.
     *
     * @Then one of the messages is a json one
     */
    public function oneOfTheMessagesIsAJsonOne()
    {
        $expectedContentType = ContentType::JSON;
        $expectedRoutingKey = self::JSON_ROUTING_KEY;

        $this->oneOfTheMessagesIs($expectedContentType, $expectedRoutingKey);
    }
151
    
152
    /**
     * Asserts that at least one consumed message has both the given content type
     * and the given routing key.
     *
     * Both criteria are checked on the same message while searching. Stopping at the
     * first message with the right content type and then checking its routing key
     * makes the result depend on consumption order as soon as two messages share a
     * content type (the plain and the compressed text messages both end up as
     * ContentType::TEXT but carry different routing keys).
     *
     * @param mixed $contentType Expected value of getContentType().
     * @param mixed $routingKey  Expected value of getRoutingKeyFromHeader().
     */
    private function oneOfTheMessagesIs($contentType, $routingKey)
    {
        $found = null;

        foreach($this->consumedMessages as $message)
        {
            if($message->getContentType() === $contentType
                && $message->getRoutingKeyFromHeader() === $routingKey)
            {
                $found = $message;
                break;
            }
        }

        \PHPUnit_Framework_Assert::assertNotNull(
            $found,
            sprintf(
                'No consumed message has content type "%s" and routing key "%s"',
                $contentType,
                $routingKey
            )
        );
    }
168
    
169
    /**
     * Asserts that at least one consumed message has a body equal to the given JSON,
     * decoded as an associative array.
     *
     * @Then one of the messages contains the json :jsonString
     */
    public function oneOfTheMessagesContainsTheJson($jsonString)
    {
        $expectedBody = json_decode($jsonString, true);

        // A malformed JSON string in the feature file would otherwise decode to null
        // and be silently compared against the message bodies.
        \PHPUnit_Framework_Assert::assertSame(
            JSON_ERROR_NONE,
            json_last_error(),
            'The expected JSON given in the step is not valid JSON'
        );

        $this->oneOfTheMessagesContains($expectedBody);
    }
176
    
177
    /**
     * Asserts that at least one consumed message has a body, in its original format,
     * strictly identical to the given content.
     *
     * @Then one of the messages contains :bodyContent
     */
    public function oneOfTheMessagesContains($bodyContent)
    {
        $hasMatch = false;

        foreach($this->consumedMessages as $candidate)
        {
            $hasMatch = ($candidate->getBodyInOriginalFormat() === $bodyContent);

            // Stop at the first match; later messages are not inspected.
            if($hasMatch)
            {
                break;
            }
        }

        \PHPUnit_Framework_Assert::assertTrue($hasMatch);
    }
195
196
    /**
     * Publishes a text message with compression allowed on the context exchange,
     * using the compressed routing key.
     *
     * The $queue argument is captured by the step pattern but is not used here:
     * the message is published to $this->exchange, not pushed into a queue by name.
     *
     * @Given The queue :queue contains the compressed text message :bodyContent
     */
    public function theQueueContainsTheCompressedTextMessage($bodyContent, $queue)
    {
        $compressibleMessage = new Message(self::COMPRESSED_ROUTING_KEY);
        $compressibleMessage->setText($bodyContent);
        $compressibleMessage->allowCompression();

        $this->client->publish($this->exchange, $compressibleMessage);
    }
207
208
    /**
     * Asserts that the first consumed message has the compressed routing key
     * and the text content type.
     *
     * @Then the message is an uncompressed text one
     */
    public function theMessageIsAnUncompressedTextOne()
    {
        $expectedRoutingKey = self::COMPRESSED_ROUTING_KEY;
        $expectedContentType = ContentType::TEXT;

        $this->theMessageIs($expectedRoutingKey, $expectedContentType);
    }
215
}
216