GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Pull Request — master (#393)
by
unknown
06:35
created

Consumer   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 193
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 8

Test Coverage

Coverage 57.47%

Importance

Changes 0
Metric Value
wmc 21
lcom 1
cbo 8
dl 0
loc 193
ccs 50
cts 87
cp 0.5747
rs 10
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
B tick() 0 30 6
A shutdown() 0 4 1
A pause() 0 4 1
A resume() 0 4 1
A consume() 0 8 2
A invoke() 0 21 3
A configure() 0 12 2
A bind() 0 10 2
A rejectDispatch() 0 12 2
1
<?php
2
3
namespace Bernard;
4
5
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
6
use Bernard\Event\EnvelopeEvent;
7
use Bernard\Event\PingEvent;
8
use Bernard\Event\RejectEnvelopeEvent;
9
10
/**
 * Pulls envelopes off a queue and hands them to the receiver chosen by the router.
 *
 * Lifecycle events (ping, invoke, acknowledge, reject) are emitted through the
 * event dispatcher so that other components can observe what the consumer does.
 */
class Consumer
{
    protected $router;
    protected $dispatcher;
    protected $shutdown = false;
    protected $pause = false;
    protected $configured = false;
    protected $options = [
        'max-runtime' => PHP_INT_MAX,
        'max-messages' => null,
        'stop-when-empty' => false,
        'stop-on-error' => false,
    ];

    /**
     * @param Router                   $router
     * @param EventDispatcherInterface $dispatcher
     */
    public function __construct(Router $router, EventDispatcherInterface $dispatcher)
    {
        $this->router = $router;
        $this->dispatcher = $dispatcher;
    }

    /**
     * Binds the signal handlers and keeps calling Consumer::tick() until it returns false.
     *
     * Pending signals are dispatched between ticks when the process control
     * extension is available; without it the loop simply keeps ticking.
     *
     * @param Queue $queue
     * @param array $options
     *
     * @throws \Throwable
     */
    public function consume(Queue $queue, array $options = [])
    {
        $this->bind();

        // bind() already tolerates a missing pcntl extension (e.g. on Windows);
        // the dispatch call has to be guarded the same way, otherwise the first
        // iteration would fail on an undefined function.
        $canDispatchSignals = function_exists('pcntl_signal_dispatch');

        while ($this->tick($queue, $options)) {
            if ($canDispatchSignals) {
                pcntl_signal_dispatch();
            }
        }
    }

    /**
     * Returns true to indicate it should be run again or false to indicate
     * it should not be run again.
     *
     * @param Queue $queue
     * @param array $options
     *
     * @return bool
     *
     * @throws \Throwable
     */
    public function tick(Queue $queue, array $options = [])
    {
        // Only the first call actually applies $options; see configure().
        $this->configure($options);

        if ($this->shutdown) {
            return false;
        }

        // 'max-runtime' holds an absolute timestamp once configure() has run.
        if (microtime(true) > $this->options['max-runtime']) {
            return false;
        }

        // While paused nothing is dequeued, but the loop is kept alive.
        if ($this->pause) {
            return true;
        }

        $this->dispatcher->dispatch(BernardEvents::PING, new PingEvent($queue));

        if (!$envelope = $queue->dequeue()) {
            return !$this->options['stop-when-empty'];
        }

        $this->invoke($envelope, $queue);

        if (null === $this->options['max-messages']) {
            return true;
        }

        // Count down the remaining budget; reaching zero stops the loop.
        return (bool) --$this->options['max-messages'];
    }

    /**
     * Mark Consumer as shutdown.
     */
    public function shutdown()
    {
        $this->shutdown = true;
    }

    /**
     * Pause consuming.
     */
    public function pause()
    {
        $this->pause = true;
    }

    /**
     * Resume consuming.
     */
    public function resume()
    {
        $this->pause = false;
    }

    /**
     * Until there is a real extension point to doing invoked stuff, this can be used
     * by wrapping the invoke method.
     *
     * @param Envelope $envelope
     * @param Queue    $queue
     *
     * @throws \Exception
     * @throws \Throwable
     */
    public function invoke(Envelope $envelope, Queue $queue)
    {
        try {
            $this->dispatcher->dispatch(BernardEvents::INVOKE, new EnvelopeEvent($envelope, $queue));

            $receiver = $this->router->route($envelope);
            $receiver->receive($envelope->getMessage());

            // We successfully processed the message.
            $queue->acknowledge($envelope);

            $this->dispatcher->dispatch(BernardEvents::ACKNOWLEDGE, new EnvelopeEvent($envelope, $queue));
        } catch (\Throwable $error) {
            // php 7
            //
            // \Throwable is only defined from PHP 7 on, so static analysers that
            // target PHP 5 may report it as a missing class. A catch clause naming
            // an unknown class never matches, which is why the \Exception branch
            // below is still needed for PHP 5.
            $this->rejectDispatch($error, $envelope, $queue);
        } catch (\Exception $exception) {
            // php 5
            $this->rejectDispatch($exception, $envelope, $queue);
        }
    }

    /**
     * Merges the given options into the defaults, once.
     *
     * Subsequent calls return the already configured options and ignore the argument.
     * Falsy overrides (false, 0, null, '') are dropped by array_filter() and therefore
     * fall back to the defaults. 'max-runtime' is converted from a duration into an
     * absolute timestamp by adding the current time.
     *
     * @param array $options
     *
     * @return array
     */
    protected function configure(array $options)
    {
        if ($this->configured) {
            return $this->options;
        }

        $this->options = array_filter($options) + $this->options;
        $this->options['max-runtime'] += microtime(true);
        $this->configured = true;

        return $this->options;
    }

    /**
     * Setup signal handlers for unix signals.
     *
     * If the process control extension does not exist (e.g. on Windows), ignore the signal handlers.
     * The difference is that when terminating the consumer, running processes will not stop gracefully
     * and will terminate immediately.
     */
    protected function bind()
    {
        if (function_exists('pcntl_signal')) {
            pcntl_signal(SIGTERM, [$this, 'shutdown']);
            pcntl_signal(SIGINT, [$this, 'shutdown']);
            pcntl_signal(SIGQUIT, [$this, 'shutdown']);
            pcntl_signal(SIGUSR2, [$this, 'pause']);
            pcntl_signal(SIGCONT, [$this, 'resume']);
        }
    }

    /**
     * Emits the reject event and, when 'stop-on-error' is enabled, rethrows the error.
     *
     * @param \Throwable|\Exception $exception note that the type-hint is missing due to PHP 5.x compat
     * @param Envelope              $envelope
     * @param Queue                 $queue
     *
     * @throws \Exception
     * @throws \Throwable
     */
    private function rejectDispatch($exception, Envelope $envelope, Queue $queue)
    {
        // Make sure the exception is not interfering.
        // Previously failing jobs handling have been moved to a middleware.
        //
        // Emit an event to let others log that exception
        $this->dispatcher->dispatch(BernardEvents::REJECT, new RejectEnvelopeEvent($envelope, $queue, $exception));

        if ($this->options['stop-on-error']) {
            throw $exception;
        }
    }
}
203