BatchRequestSubscriber   A
last analyzed

Complexity

Total Complexity 16

Size/Duplication

Total Lines 175
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Importance

Changes 0
Metric Value
wmc 16
lcom 1
cbo 7
dl 0
loc 175
rs 10
c 0
b 0
f 0

7 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 11 3
A __destruct() 0 5 1
A getEvents() 0 7 1
A onPrepared() 0 20 3
A onProcess() 0 13 2
A enqueue() 0 13 3
A flush() 0 13 3
1
<?php
2
3
namespace SegmentIO\Subscriber;
4
5
use GuzzleHttp\Command\Event\PreparedEvent;
6
use GuzzleHttp\Command\Event\ProcessEvent;
7
use GuzzleHttp\Command\Guzzle\DescriptionInterface;
8
use GuzzleHttp\Event\SubscriberInterface;
9
use SegmentIO\Client;
10
11
/**
12
 * BatchRequestSubscriber Class
13
 *
14
 * @author Keith Kirk <[email protected]>
15
 */
16
class BatchRequestSubscriber implements SubscriberInterface
{
    /**
     * Segment.io Client.
     *
     * Captured lazily from the first PreparedEvent seen by onPrepared();
     * stays null until then.
     *
     * @var Client|null
     */
    private $client = null;

    /**
     * Webservice Description, used to look up each command's operation and
     * its "batching" data flag.
     *
     * @var DescriptionInterface
     */
    private $description;

    /**
     * Queue of Operations waiting to be sent (each one an associative array).
     *
     * @var array
     */
    private $queue = [];

    /**
     * Determines the maximum size the queue is allowed to reach. New items pushed
     * to the queue will be ignored if this size is reached and cannot be flushed.
     * Defaults to 10000.
     *
     * @var int
     */
    private $maxQueueSize = 10000;

    /**
     * Determines how many operations are sent to Segment.io in a single request.
     * Defaults to 100. Always at least 1.
     *
     * @var int
     */
    private $batchSize = 100;

    /**
     * Constructor
     *
     * Recognised options:
     *  - max_queue_size: overrides the maximum queue size
     *  - batch_size:     overrides the number of operations per import request
     *
     * @param DescriptionInterface $description
     * @param array                $options An array of configuration options
     */
    public function __construct(DescriptionInterface $description, array $options = [])
    {
        if (isset($options['max_queue_size'])) {
            $this->maxQueueSize = $options['max_queue_size'];
        }

        if (isset($options['batch_size'])) {
            // array_chunk() rejects sizes below 1, so clamp here rather than
            // failing later in the middle of a flush.
            $this->batchSize = max(1, (int) $options['batch_size']);
        }

        $this->description = $description;
    }

    /**
     * Destructor
     *
     * Flushes any queued Operations, then drops the reference to the client.
     */
    public function __destruct()
    {
        $this->flush();
        unset($this->client);
    }

    /**
     * Returns the Subscribed Events
     *
     * @return array Map of event name to [listener method, priority]
     */
    public function getEvents()
    {
        return [
            'prepared' => ['onPrepared', 'last'],
            'process'  => ['onProcess', 'first']
        ];
    }

    /**
     * Queues batchable commands instead of letting them be sent individually.
     *
     * For an operation flagged with "batching" data, the JSON request body is
     * decoded, tagged with the command name under the "action" key, pushed on
     * the queue, and the event is intercepted with a synthetic result.
     *
     * @param PreparedEvent $event The PreparedEvent
     *
     * @return bool True when the command was queued, false when the operation
     *              is not batchable and was left untouched
     */
    public function onPrepared(PreparedEvent $event)
    {
        // Remember the client of the first command seen; flush() needs it to
        // issue the import calls.
        if (is_null($this->client)) {
            $this->client = $event->getClient();
        }

        $command   = $event->getCommand();
        $operation = $this->description->getOperation($command->getName());

        if (!$operation->getData('batching')) {
            return false;
        }

        // NOTE(review): json_decode() yields null for an empty or non-JSON
        // body, which makes array_merge() fail - presumably batchable
        // operations always carry a JSON object body; confirm against the
        // service description.
        $parameters = json_decode($event->getRequest()->getBody()->getContents(), true);
        $this->enqueue(array_merge($parameters, ['action' => $command->getName()]));

        $event->intercept(['success' => true, 'batched' => true]);

        return true;
    }

    /**
     * Stops propagation of ProcessEvents when using Batching
     *
     * @param ProcessEvent $event The Process Event
     *
     * @return bool True when propagation was stopped, false when the
     *              operation is not batchable
     */
    public function onProcess(ProcessEvent $event)
    {
        $command   = $event->getCommand();
        $operation = $this->description->getOperation($command->getName());

        if (!$operation->getData('batching')) {
            return false;
        }

        $event->stopPropagation();

        return true;
    }

    /**
     * Adds User Actions to the Queue
     *
     * Will attempt to flush the queue if the size of the queue has reached
     * the Max Queue Size
     *
     * @param array $operation Operation as an associative array
     *
     * @return bool False when the queue is already full and the operation was
     *              dropped, true when it was queued
     */
    public function enqueue(array $operation)
    {
        if (count($this->queue) >= $this->maxQueueSize) {
            return false;
        }

        $this->queue[] = $operation;

        if (count($this->queue) >= $this->maxQueueSize) {
            $this->flush();
        }

        return true;
    }

    /**
     * Flushes the queue by batching Import operations
     *
     * Operations are sent in groups of at most the configured batch size and
     * are removed from the queue once their import call has returned, so a
     * later flush (for example the one in the destructor) never re-sends them.
     * If an import call throws, the unsent operations stay queued.
     *
     * @return bool False when there is nothing to send or no client has been
     *              captured yet, true once the queue has been drained
     */
    public function flush()
    {
        // Without a client (no PreparedEvent seen yet) nothing can be sent;
        // keep the operations queued instead of failing on a null call.
        if (empty($this->queue) || is_null($this->client)) {
            return false;
        }

        $size = max(1, (int) $this->batchSize);

        while (!empty($this->queue)) {
            $batch = array_slice($this->queue, 0, $size);

            $this->client->import(['batch' => $batch]);

            // Drop the batch only after the import call returned.
            array_splice($this->queue, 0, count($batch));
        }

        return true;
    }
}
191