Completed
Pull Request — master (#1)
by Raymond
02:17
created

PersistentSubscriptionSettings::preferRoundRobin()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 6
rs 9.4285
cc 1
eloc 3
nc 1
nop 0
1
<?php
2
3
namespace RayRutjes\GetEventStore;
4
5
/**
 * Fluent, mutable builder for the settings of a persistent subscription.
 *
 * Every setter validates its argument (where applicable), stores it on the
 * instance and returns the same instance so calls can be chained.
 *
 * NOTE(review): this class exposes no accessors for the stored values; how
 * they are read back is not visible from this file.
 */
final class PersistentSubscriptionSettings
{
    /** Consumer strategy name selected by preferRoundRobin() (also the default). */
    const STRATEGY_ROUND_ROBIN = 'RoundRobin';

    /** Consumer strategy name selected by preferDispatchToSingle(). */
    const STRATEGY_DISPATCH_TO_SINGLE = 'DispatchToSingle';

    /** Sentinel start position stored by startFromCurrent() (also the default). */
    const CURRENT_POSITION = -1;

    /**
     * Whether link events should be resolved.
     *
     * @var bool
     */
    private $resolveLinkTos = true;

    /**
     * One of the STRATEGY_* constants.
     *
     * @var string
     */
    private $namedConsumerStrategy = self::STRATEGY_ROUND_ROBIN;

    /**
     * Event position to start from, or self::CURRENT_POSITION.
     *
     * @var int
     */
    private $startFrom = self::CURRENT_POSITION;

    /**
     * Message timeout before a retry (unit not established in this class).
     *
     * @var int
     */
    private $timeout = 3;

    /**
     * Time after which a checkpoint should be attempted (unit not established in this class).
     *
     * @var int
     */
    private $checkpointAfter = 3;

    /**
     * Minimum number of messages to write a checkpoint for.
     *
     * @var int
     */
    private $minCheckPointCount = 100;

    /**
     * Maximum number of non-checkpointed messages before a checkpoint is forced.
     *
     * @var int
     */
    private $maxCheckPointCount = 500;

    /**
     * Number of retries before a message is considered bad.
     *
     * @var int
     */
    private $maxRetries = 3;

    /**
     * Number of messages buffered when in paging mode.
     *
     * @var int
     */
    private $bufferSize = 20;

    /**
     * Size of the in-memory live buffer before resorting to paging.
     *
     * @var int
     */
    private $liveBufferSize = 20;

    /**
     * Whether the backend should measure client timings.
     *
     * @var bool
     */
    private $extraStatistics = false;

    /**
     * Size of the read batch when in paging mode.
     *
     * @var int
     */
    private $readBatch = 20;

    /**
     * Tells the subscription to resolve link events.
     *
     * @return $this
     */
    public function resolveLinkTos(): PersistentSubscriptionSettings
    {
        $this->resolveLinkTos = true;

        return $this;
    }

    /**
     * Tells the subscription to not resolve link events.
     *
     * @return $this
     */
    public function doNotResolveLinkTos(): PersistentSubscriptionSettings
    {
        $this->resolveLinkTos = false;

        return $this;
    }

    /**
     * If possible prefer to round robin between the connections with messages
     * (if not possible will use next available).
     *
     * @return $this
     */
    public function preferRoundRobin(): PersistentSubscriptionSettings
    {
        $this->namedConsumerStrategy = self::STRATEGY_ROUND_ROBIN;

        return $this;
    }

    /**
     * If possible prefer to dispatch to a single connection
     * (if not possible will use next available).
     *
     * @return $this
     */
    public function preferDispatchToSingle(): PersistentSubscriptionSettings
    {
        $this->namedConsumerStrategy = self::STRATEGY_DISPATCH_TO_SINGLE;

        return $this;
    }

    /**
     * Start the subscription from the beginning of the stream (position 0).
     *
     * @return $this
     */
    public function startFromBeginning(): PersistentSubscriptionSettings
    {
        $this->startFrom = 0;

        return $this;
    }

    /**
     * Start the subscription from the position-th event in the stream.
     *
     * @param int $position Zero-based position; 0 is accepted and is the value
     *                      startFromBeginning() stores.
     *
     * @return $this
     *
     * @throws \InvalidArgumentException If $position is negative.
     */
    public function startFrom(int $position): PersistentSubscriptionSettings
    {
        if ($position < 0) {
            // 0 is a valid position, so the message states ">= 0" to match the check.
            throw new \InvalidArgumentException(sprintf('Position must be >= 0, Got: %d', $position));
        }

        $this->startFrom = $position;

        return $this;
    }

    /**
     * Start the subscription from the current position.
     *
     * @return $this
     */
    public function startFromCurrent(): PersistentSubscriptionSettings
    {
        $this->startFrom = self::CURRENT_POSITION;

        return $this;
    }

    /**
     * Sets the timeout for a client before the message will be retried.
     *
     * @param int $timeout Strictly positive timeout.
     *
     * @return $this
     *
     * @throws \InvalidArgumentException If $timeout is not strictly positive.
     */
    public function withMessageTimeoutOf(int $timeout): PersistentSubscriptionSettings
    {
        if ($timeout <= 0) {
            throw new \InvalidArgumentException(sprintf('Timeout must be > 0, Got: %d', $timeout));
        }

        $this->timeout = $timeout;

        return $this;
    }

    /**
     * The amount of time the system should try to checkpoint after.
     *
     * @param int $time Strictly positive amount of time.
     *
     * @return $this
     *
     * @throws \InvalidArgumentException If $time is not strictly positive.
     */
    public function checkPointAfter(int $time): PersistentSubscriptionSettings
    {
        if ($time <= 0) {
            throw new \InvalidArgumentException(sprintf('Time must be > 0, Got: %d', $time));
        }

        $this->checkpointAfter = $time;

        return $this;
    }

    /**
     * The minimum number of messages to write a checkpoint for.
     *
     * This setter does not compare the value against the maximum checkpoint
     * count; only maxCheckPointCountOf() checks the relation between the two.
     *
     * @param int $count Strictly positive count.
     *
     * @return $this
     *
     * @throws \InvalidArgumentException If $count is not strictly positive.
     */
    public function minCheckPointCountOf(int $count): PersistentSubscriptionSettings
    {
        if ($count <= 0) {
            throw new \InvalidArgumentException(sprintf('Count must be > 0, Got: %d', $count));
        }

        $this->minCheckPointCount = $count;

        return $this;
    }

    /**
     * The maximum number of messages not checkpointed before forcing a checkpoint.
     *
     * The value is compared against the minimum checkpoint count currently
     * stored on this instance, so set the minimum first when changing both.
     *
     * @param int $count Strictly positive count, not lower than the current minimum.
     *
     * @return $this
     *
     * @throws \InvalidArgumentException If $count is not strictly positive or
     *                                   is lower than the minimum checkpoint count.
     */
    public function maxCheckPointCountOf(int $count): PersistentSubscriptionSettings
    {
        if ($count <= 0) {
            throw new \InvalidArgumentException(sprintf('Count must be > 0, Got: %d', $count));
        }

        if ($count < $this->minCheckPointCount) {
            // A maximum equal to the minimum is accepted, hence ">=" in the message.
            throw new \InvalidArgumentException(sprintf(
                'Maximum checkpoint count %d should be >= min checkpoint count %d.',
                $count,
                $this->minCheckPointCount
            ));
        }

        $this->maxCheckPointCount = $count;

        return $this;
    }

    /**
     * Sets the number of times a message should be retried before being considered a bad message.
     *
     * @param int $count Retry count; 0 is accepted.
     *
     * @return $this
     *
     * @throws \InvalidArgumentException If $count is negative.
     */
    public function withMaxRetriesOf(int $count): PersistentSubscriptionSettings
    {
        if ($count < 0) {
            throw new \InvalidArgumentException(sprintf('Max retries count must be >= 0, Got: %d', $count));
        }

        $this->maxRetries = $count;

        return $this;
    }

    /**
     * The size of the read batch when in paging mode.
     *
     * Formerly declared as "WithReadBatchOf"; PHP resolves method names
     * case-insensitively, so calls using the old spelling keep working.
     *
     * @param int $count Strictly positive batch size.
     *
     * @return $this
     *
     * @throws \InvalidArgumentException If $count is not strictly positive.
     */
    public function withReadBatchOf(int $count): PersistentSubscriptionSettings
    {
        // Reject 0 as well as negatives, as the message and the buffer-size setters require.
        if ($count <= 0) {
            throw new \InvalidArgumentException(sprintf('Read batch must be > 0, Got: %d', $count));
        }

        $this->readBatch = $count;

        return $this;
    }

    /**
     * The number of messages that should be buffered when in paging mode.
     *
     * @param int $count Strictly positive buffer size.
     *
     * @return $this
     *
     * @throws \InvalidArgumentException If $count is not strictly positive.
     */
    public function withBufferSizeOf(int $count): PersistentSubscriptionSettings
    {
        if ($count <= 0) {
            throw new \InvalidArgumentException(sprintf('Buffer size should be > 0, Got: %d', $count));
        }

        $this->bufferSize = $count;

        return $this;
    }

    /**
     * The size of the live buffer (in memory) before resorting to paging.
     *
     * @param int $count Strictly positive buffer size.
     *
     * @return $this
     *
     * @throws \InvalidArgumentException If $count is not strictly positive.
     */
    public function withLiveBufferSizeOf(int $count): PersistentSubscriptionSettings
    {
        if ($count <= 0) {
            throw new \InvalidArgumentException(sprintf('Live buffer size should be > 0, Got: %d', $count));
        }

        $this->liveBufferSize = $count;

        return $this;
    }

    /**
     * Tells the backend to measure timings on the clients so statistics will contain histograms of them.
     *
     * @return $this
     */
    public function withExtraStatistics(): PersistentSubscriptionSettings
    {
        $this->extraStatistics = true;

        return $this;
    }
}
313