IncomingContext   A
last analyzed

Complexity

Total Complexity 22

Size/Duplication

Total Lines 218
Duplicated Lines 3.21 %

Coupling/Cohesion

Components 3
Dependencies 5

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 22
lcom 3
cbo 5
dl 7
loc 218
ccs 54
cts 54
cp 1
rs 10
c 0
b 0
f 0

16 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 20 1
A send() 0 6 2
A sendLocal() 7 7 2
A publish() 0 6 2
A subscribe() 0 6 2
A unsubscribe() 0 6 2
A reply() 0 6 2
A getMessageId() 0 4 1
A getHeaders() 0 4 1
A setHeader() 0 4 1
A replaceHeaders() 0 4 1
A getIncomingPhysicalMessage() 0 4 1
A getPendingTransportOperations() 0 4 1
A forwardCurrentMessageTo() 0 4 1
A shutdownThisEndpointAfterCurrentMessage() 0 4 1
A getEndpointControlToken() 0 4 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
<?php
2
namespace PSB\Core\Pipeline\Incoming;
3
4
5
use PSB\Core\EndpointControlToken;
6
use PSB\Core\MessageProcessingContextInterface;
7
use PSB\Core\OutgoingOptionsFactory;
8
use PSB\Core\Pipeline\BusOperations;
9
use PSB\Core\Pipeline\PendingTransportOperations;
10
use PSB\Core\Pipeline\PipelineFactory;
11
use PSB\Core\Pipeline\PipelineStageContext;
12
use PSB\Core\PublishOptions;
13
use PSB\Core\ReplyOptions;
14
use PSB\Core\SendOptions;
15
use PSB\Core\SubscribeOptions;
16
use PSB\Core\Transport\IncomingPhysicalMessage;
17
use PSB\Core\UnsubscribeOptions;
18
19
/**
 * Base pipeline stage context for an incoming message.
 *
 * It holds the message id, the headers, the incoming physical message, the pending
 * transport operations and the endpoint control token, and it forwards every bus
 * operation (send, publish, subscribe, unsubscribe, reply) to the injected
 * BusOperations instance, passing itself along as the calling context.
 */
abstract class IncomingContext extends PipelineStageContext implements MessageProcessingContextInterface
{
    /**
     * Identifier of the message this context was created for.
     *
     * @var string
     */
    protected $messageId;

    /**
     * Headers of the message; changed through setHeader() and replaceHeaders().
     *
     * @var array
     */
    protected $headers;

    /**
     * The physical message as handed to the constructor.
     *
     * @var IncomingPhysicalMessage
     */
    protected $incomingPhysicalMessage;

    /**
     * Transport operations collection exposed through getPendingTransportOperations().
     *
     * @var PendingTransportOperations
     */
    protected $pendingTransportOperations;

    /**
     * Target that every send/publish/subscribe/unsubscribe/reply call is delegated to.
     *
     * @var BusOperations
     */
    protected $busOperations;

    /**
     * Builds default option objects whenever a caller does not supply any.
     *
     * @var OutgoingOptionsFactory
     */
    protected $outgoingOptionsFactory;

    /**
     * NOTE(review): never assigned by this class' constructor; presumably populated
     * by subclasses - confirm before relying on it.
     *
     * @var PipelineFactory
     */
    protected $pipelineFactory;

    /**
     * Token used to request an endpoint shutdown.
     *
     * @var EndpointControlToken
     */
    protected $endpointControlToken;

    /**
     * Stores all collaborators and hands the parent context to the base stage context.
     *
     * @param string                     $messageId
     * @param array                      $headers
     * @param IncomingPhysicalMessage    $incomingPhysicalMessage
     * @param PendingTransportOperations $pendingTransportOperations
     * @param BusOperations              $busOperations
     * @param OutgoingOptionsFactory     $outgoingOptionsFactory
     * @param EndpointControlToken       $endpointControlToken
     * @param PipelineStageContext       $parentContext
     */
    public function __construct(
        $messageId,
        array $headers,
        IncomingPhysicalMessage $incomingPhysicalMessage,
        PendingTransportOperations $pendingTransportOperations,
        BusOperations $busOperations,
        OutgoingOptionsFactory $outgoingOptionsFactory,
        EndpointControlToken $endpointControlToken,
        PipelineStageContext $parentContext
    ) {
        parent::__construct($parentContext);

        // Message data.
        $this->messageId = $messageId;
        $this->headers = $headers;
        $this->incomingPhysicalMessage = $incomingPhysicalMessage;

        // Collaborators.
        $this->pendingTransportOperations = $pendingTransportOperations;
        $this->busOperations = $busOperations;
        $this->outgoingOptionsFactory = $outgoingOptionsFactory;
        $this->endpointControlToken = $endpointControlToken;
    }

    /**
     * Sends a message through the bus operations, creating default send options
     * when none are given.
     *
     * @param object           $message
     * @param SendOptions|null $options
     */
    public function send($message, SendOptions $options = null)
    {
        if (null === $options) {
            $options = $this->outgoingOptionsFactory->createSendOptions();
        }

        $this->busOperations->send($message, $options, $this);
    }

    /**
     * Marks the (given or default) send options as routed to the local endpoint
     * instance and then goes through send().
     *
     * @param object           $message
     * @param SendOptions|null $options
     */
    public function sendLocal($message, SendOptions $options = null)
    {
        if (null === $options) {
            $options = $this->outgoingOptionsFactory->createSendOptions();
        }

        $options->routeToLocalEndpointInstance();
        $this->send($message, $options);
    }

    /**
     * Publishes a message through the bus operations, creating default publish
     * options when none are given.
     *
     * @param object              $message
     * @param PublishOptions|null $options
     */
    public function publish($message, PublishOptions $options = null)
    {
        if (null === $options) {
            $options = $this->outgoingOptionsFactory->createPublishOptions();
        }

        $this->busOperations->publish($message, $options, $this);
    }

    /**
     * Subscribes to a message class through the bus operations, creating default
     * subscribe options when none are given.
     *
     * @param string                $messageFqcn
     * @param SubscribeOptions|null $options
     */
    public function subscribe($messageFqcn, SubscribeOptions $options = null)
    {
        if (null === $options) {
            $options = $this->outgoingOptionsFactory->createSubscribeOptions();
        }

        $this->busOperations->subscribe($messageFqcn, $options, $this);
    }

    /**
     * Unsubscribes from a message class through the bus operations, creating default
     * unsubscribe options when none are given.
     *
     * @param string                  $messageFqcn
     * @param UnsubscribeOptions|null $options
     */
    public function unsubscribe($messageFqcn, UnsubscribeOptions $options = null)
    {
        if (null === $options) {
            $options = $this->outgoingOptionsFactory->createUnsubscribeOptions();
        }

        $this->busOperations->unsubscribe($messageFqcn, $options, $this);
    }

    /**
     * Replies with a message through the bus operations, creating default reply
     * options when none are given.
     *
     * @param object            $message
     * @param ReplyOptions|null $options
     */
    public function reply($message, ReplyOptions $options = null)
    {
        if (null === $options) {
            $options = $this->outgoingOptionsFactory->createReplyOptions();
        }

        $this->busOperations->reply($message, $options, $this);
    }

    /**
     * @return string The message id given at construction time.
     */
    public function getMessageId()
    {
        return $this->messageId;
    }

    /**
     * @return array The current headers, including changes made via setHeader()/replaceHeaders().
     */
    public function getHeaders()
    {
        return $this->headers;
    }

    /**
     * Sets a single header, overwriting any value already stored under that name.
     *
     * @param string $name
     * @param string $value
     */
    public function setHeader($name, $value)
    {
        $this->headers[$name] = $value;
    }

    /**
     * Discards the current headers and stores the given set instead.
     *
     * @param array $headers
     */
    public function replaceHeaders(array $headers)
    {
        $this->headers = $headers;
    }

    /**
     * @return IncomingPhysicalMessage The physical message given at construction time.
     */
    public function getIncomingPhysicalMessage()
    {
        return $this->incomingPhysicalMessage;
    }

    /**
     * @return PendingTransportOperations The pending operations given at construction time.
     */
    public function getPendingTransportOperations()
    {
        return $this->pendingTransportOperations;
    }

    /**
     * Not implemented yet: calling it currently does nothing.
     *
     * @param string $address
     *
     * @codeCoverageIgnore
     */
    public function forwardCurrentMessageTo($address)
    {
        // TODO: Implement forwardCurrentMessageTo() method.
    }

    /**
     * Asks, through the endpoint control token, for the endpoint to shut down once the
     * current attempt at processing this message has completed, no matter whether that
     * attempt ends in success or in failure/exception.
     *
     * The effect is that the endpoint stops listening for more messages on the queue.
     */
    public function shutdownThisEndpointAfterCurrentMessage()
    {
        $this->endpointControlToken->requestShutdown();
    }

    /**
     * @return EndpointControlToken The control token given at construction time.
     */
    public function getEndpointControlToken()
    {
        return $this->endpointControlToken;
    }
}
237