GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — master (#208)
by Jeroen
05:43
created

Consumer   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 164
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Importance

Changes 14
Bugs 0 Features 5
Metric Value
wmc 18
c 14
b 0
f 5
lcom 1
cbo 7
dl 0
loc 164
rs 10

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
A consume() 0 10 2
B tick() 0 31 6
A shutdown() 0 4 1
A pause() 0 4 1
A resume() 0 4 1
B invoke() 0 24 3
A configure() 0 10 2
A bind() 0 8 1
1
<?php
2
3
namespace Bernard;
4
5
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
6
use Bernard\Event\EnvelopeEvent;
7
use Bernard\Event\PingEvent;
8
use Bernard\Event\RejectEnvelopeEvent;
9
10
/**
11
 * @package Consumer
12
 */
13
class Consumer
14
{
15
    protected $router;
16
    protected $dispatcher;
17
    protected $shutdown = false;
18
    protected $pause = false;
19
    protected $configured = false;
20
    protected $options = [
21
        'max-runtime'  => PHP_INT_MAX,
22
        'max-messages' => null,
23
        'stop-when-empty' => false,
24
        'stop-on-error' => false,
25
    ];
26
27
    /**
28
     * @param Router                   $router
29
     * @param EventDispatcherInterface $dispatcher
30
     */
31
    public function __construct(Router $router, EventDispatcherInterface $dispatcher)
32
    {
33
        $this->router = $router;
34
        $this->dispatcher = $dispatcher;
35
    }
36
37
    /**
38
     * Starts an infinite loop calling Consumer::tick();
39
     *
40
     * @param Queue $queue
41
     * @param array $options
42
     */
43
    public function consume(Queue $queue, array $options = [])
44
    {
45
        declare(ticks=1);
46
47
        $this->bind();
48
49
        while ($this->tick($queue, $options)) {
50
            // NO op
51
        }
52
    }
53
54
    /**
55
     * Returns true do indicate it should be run again or false to indicate
56
     * it should not be run again.
57
     *
58
     * @param Queue $queue
59
     * @param array $options
60
     *
61
     * @return boolean
62
     */
63
    public function tick(Queue $queue, array $options = [])
64
    {
65
        $this->configure($options);
66
67
        if ($this->shutdown) {
68
            return false;
69
        }
70
71
        if (microtime(true) > $this->options['max-runtime']) {
72
            return false;
73
        }
74
75
        if ($this->pause) {
76
            return true;
77
        }
78
79
        $this->dispatcher->dispatch('bernard.ping', new PingEvent($queue));
80
81
        if (!$envelope = $queue->dequeue()) {
82
            return !$this->options['stop-when-empty'];
83
        }
84
85
86
        $this->invoke($envelope, $queue);
87
88
        if (null === $this->options['max-messages']) {
89
            return true;
90
        }
91
92
        return (boolean) --$this->options['max-messages'];
93
    }
94
95
    /**
96
     * Mark Consumer as shutdown
97
     */
98
    public function shutdown()
99
    {
100
        $this->shutdown = true;
101
    }
102
103
    /**
104
     * Pause consuming
105
     */
106
    public function pause()
107
    {
108
        $this->pause = true;
109
    }
110
111
    /**
112
     * Resume consuming
113
     */
114
    public function resume()
115
    {
116
        $this->pause = false;
117
    }
118
119
    /**
120
     * Until there is a real extension point to doing invoked stuff, this can be used
121
     * by wrapping the invoke method.
122
     *
123
     * @param Envelope $envelope
124
     * @param Queue    $queue
125
     */
126
    public function invoke(Envelope $envelope, Queue $queue)
127
    {
128
        try {
129
            $this->dispatcher->dispatch('bernard.invoke', new EnvelopeEvent($envelope, $queue));
130
131
            // for 5.3 support where a function name is not a callable
132
            call_user_func($this->router->map($envelope), $envelope->getMessage());
133
134
            // We successfully processed the message.
135
            $queue->acknowledge($envelope);
136
137
            $this->dispatcher->dispatch('bernard.acknowledge', new EnvelopeEvent($envelope, $queue));
138
        } catch (\Exception $e) {
139
            // Make sure the exception is not interfering.
140
            // Previously failing jobs handling have been moved to a middleware.
141
            //
142
            // Emit an event to let others log that exception
143
            $this->dispatcher->dispatch('bernard.reject', new RejectEnvelopeEvent($envelope, $queue, $e));
144
145
            if ($this->options['stop-on-error']) {
146
                throw $e;
147
            }
148
        }
149
    }
150
151
    /**
152
     * @param array $options
153
     */
154
    protected function configure(array $options)
155
    {
156
        if ($this->configured) {
157
            return $this->options;
158
        }
159
160
        $this->options = array_filter($options) + $this->options;
161
        $this->options['max-runtime'] += microtime(true);
162
        $this->configured = true;
163
    }
164
165
    /**
166
     * Setup signal handlers for unix signals.
167
     */
168
    protected function bind()
169
    {
170
        pcntl_signal(SIGTERM, [$this, 'shutdown']);
171
        pcntl_signal(SIGINT,  [$this, 'shutdown']);
0 ignored issues
show
Coding Style introduced by
Expected 1 space instead of 2 after comma in function call.
Loading history...
172
        pcntl_signal(SIGQUIT, [$this, 'shutdown']);
173
        pcntl_signal(SIGUSR2, [$this, 'pause']);
174
        pcntl_signal(SIGCONT, [$this, 'resume']);
175
    }
176
}
177