Completed
Pull Request — master (#1)
by Raymond
03:28
created

PersistentSubscriptionSettings::preferRoundRobin()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 6
rs 9.4285
cc 1
eloc 3
nc 1
nop 0
1
<?php
2
3
namespace RayRutjes\GetEventStore;
4
5
/**
 * Fluent, mutable builder for the settings of a persistent subscription.
 *
 * Every configuration method validates its argument (where applicable),
 * stores it on the instance and returns the same instance so calls can be
 * chained. Use toArray() to export the resulting configuration.
 */
final class PersistentSubscriptionSettings
{
    const STRATEGY_ROUND_ROBIN = 'RoundRobin';
    const STRATEGY_DISPATCH_TO_SINGLE = 'DispatchToSingle';

    /**
     * Sentinel stored in $startFrom meaning "start from the current position".
     */
    const CURRENT_POSITION = -1;

    /**
     * @var bool Whether link events should be resolved.
     */
    private $resolveLinktos = true;

    /**
     * @var string One of the STRATEGY_* constants.
     */
    private $namedConsumerStrategy = self::STRATEGY_ROUND_ROBIN;

    /**
     * @var int Event position to start from, or self::CURRENT_POSITION.
     */
    private $startFrom = self::CURRENT_POSITION;

    /**
     * NOTE(review): a 3 ms default looks unusually low for a message
     * timeout - confirm against the server defaults before relying on it.
     *
     * @var int
     */
    private $messageTimeoutMilliseconds = 3;

    /**
     * NOTE(review): a 3 ms default looks unusually low for a checkpoint
     * delay - confirm against the server defaults before relying on it.
     *
     * @var int
     */
    private $checkPointAfterMilliseconds = 3;

    /**
     * @var int
     */
    private $minCheckPointCount = 100;

    /**
     * @var int
     */
    private $maxCheckPointCount = 500;

    /**
     * @var int
     */
    private $maxRetryCount = 3;

    /**
     * @var bool
     */
    private $extraStatistics = false;

    /**
     * @var int
     */
    private $readBatchSize = 20;

    /**
     * @var int
     */
    private $maxSubscriberCount = 10;

    /**
     * Tells the subscription to resolve link events.
     *
     * @return PersistentSubscriptionSettings
     */
    public function resolveLinktos(): PersistentSubscriptionSettings
    {
        $this->resolveLinktos = true;

        return $this;
    }

    /**
     * Tells the subscription to not resolve link events.
     *
     * @return PersistentSubscriptionSettings
     */
    public function doNotResolveLinktos(): PersistentSubscriptionSettings
    {
        $this->resolveLinktos = false;

        return $this;
    }

    /**
     * If possible prefer to round robin between the connections with messages (if not possible will use next available).
     *
     * @return PersistentSubscriptionSettings
     */
    public function preferRoundRobin(): PersistentSubscriptionSettings
    {
        $this->namedConsumerStrategy = self::STRATEGY_ROUND_ROBIN;

        return $this;
    }

    /**
     * If possible prefer to dispatch to a single connection (if not possible will use next available).
     *
     * @return PersistentSubscriptionSettings
     */
    public function preferDispatchToSingle(): PersistentSubscriptionSettings
    {
        $this->namedConsumerStrategy = self::STRATEGY_DISPATCH_TO_SINGLE;

        return $this;
    }

    /**
     * Start the subscription from the beginning of the stream (position 0).
     *
     * @return PersistentSubscriptionSettings
     */
    public function startFromBeginning(): PersistentSubscriptionSettings
    {
        $this->startFrom = 0;

        return $this;
    }

    /**
     * Start the subscription from the position-th event in the stream.
     *
     * @param int $position Zero-based position; 0 is accepted.
     *
     * @throws \InvalidArgumentException When $position is negative.
     *
     * @return PersistentSubscriptionSettings
     */
    public function startFrom(int $position): PersistentSubscriptionSettings
    {
        if ($position < 0) {
            // 0 is a valid position, so the message must say ">= 0".
            throw new \InvalidArgumentException(sprintf('Position must be >= 0, Got: %d', $position));
        }

        $this->startFrom = $position;

        return $this;
    }

    /**
     * Start the subscription from the current position.
     *
     * @return PersistentSubscriptionSettings
     */
    public function startFromCurrent(): PersistentSubscriptionSettings
    {
        $this->startFrom = self::CURRENT_POSITION;

        return $this;
    }

    /**
     * Sets the timeout in milliseconds for a client before the message will be retried.
     *
     * @param int $timeout
     *
     * @throws \InvalidArgumentException When $timeout is not strictly positive.
     *
     * @return PersistentSubscriptionSettings
     */
    public function withMessageTimeoutInMillisecondsOf(int $timeout): PersistentSubscriptionSettings
    {
        self::assertGreaterThanZero('Timeout', $timeout);

        $this->messageTimeoutMilliseconds = $timeout;

        return $this;
    }

    /**
     * The amount of time the system should try to checkpoint after.
     *
     * The value is stored as checkPointAfterMilliseconds.
     *
     * @param int $time
     *
     * @throws \InvalidArgumentException When $time is not strictly positive.
     *
     * @return PersistentSubscriptionSettings
     */
    public function checkPointAfter(int $time): PersistentSubscriptionSettings
    {
        self::assertGreaterThanZero('Time', $time);

        $this->checkPointAfterMilliseconds = $time;

        return $this;
    }

    /**
     * The minimum number of messages to write a checkpoint for.
     *
     * @param int $count
     *
     * @throws \InvalidArgumentException When $count is not strictly positive.
     *
     * @return PersistentSubscriptionSettings
     */
    public function minCheckPointOf(int $count): PersistentSubscriptionSettings
    {
        self::assertGreaterThanZero('Count', $count);

        $this->minCheckPointCount = $count;

        return $this;
    }

    /**
     * The maximum number of messages not checkpointed before forcing a checkpoint.
     *
     * The value is checked against the minimum checkpoint count configured
     * at the time of the call; a value equal to the minimum is accepted.
     *
     * @param int $count
     *
     * @throws \InvalidArgumentException When $count is not strictly positive
     *                                   or is lower than the current minimum.
     *
     * @return PersistentSubscriptionSettings
     */
    public function maxCheckPointOf(int $count): PersistentSubscriptionSettings
    {
        self::assertGreaterThanZero('Count', $count);

        if ($count < $this->minCheckPointCount) {
            throw new \InvalidArgumentException(sprintf('Maximum checkpoint count %d should be >= min checkpoint count %d.', $count, $this->minCheckPointCount));
        }

        $this->maxCheckPointCount = $count;

        return $this;
    }

    /**
     * Sets the number of times a message should be retried before being considered a bad message.
     *
     * @param int $count Zero is accepted.
     *
     * @throws \InvalidArgumentException When $count is negative.
     *
     * @return PersistentSubscriptionSettings
     */
    public function withMaxRetriesOf(int $count): PersistentSubscriptionSettings
    {
        if ($count < 0) {
            throw new \InvalidArgumentException(sprintf('Max retries count must be >= 0, Got: %d', $count));
        }

        $this->maxRetryCount = $count;

        return $this;
    }

    /**
     * The size of the read batch when in paging mode.
     *
     * PHP method names are case-insensitive, so existing callers using the
     * former "WithReadBatchOf" spelling keep working.
     *
     * @param int $count
     *
     * @throws \InvalidArgumentException When $count is not strictly positive.
     *
     * @return PersistentSubscriptionSettings
     */
    public function withReadBatchOf(int $count): PersistentSubscriptionSettings
    {
        // Zero is rejected as well, matching the "must be > 0" message.
        self::assertGreaterThanZero('Read batch', $count);

        $this->readBatchSize = $count;

        return $this;
    }

    /**
     * Tells the backend to measure timings on the clients so statistics will contain histograms of them.
     *
     * @return PersistentSubscriptionSettings
     */
    public function withExtraStatistics(): PersistentSubscriptionSettings
    {
        $this->extraStatistics = true;

        return $this;
    }

    /**
     * Sets the maximum number of allowed subscribers.
     *
     * @param int $count
     *
     * @throws \InvalidArgumentException When $count is not strictly positive.
     *
     * @return PersistentSubscriptionSettings
     */
    public function withMaxSubscribersOf(int $count): PersistentSubscriptionSettings
    {
        self::assertGreaterThanZero('Max subscribers count', $count);

        $this->maxSubscriberCount = $count;

        return $this;
    }

    /**
     * Exports the settings as an associative array keyed by setting name.
     *
     * @return array
     */
    public function toArray(): array
    {
        return [
            'checkPointAfterMilliseconds' => $this->checkPointAfterMilliseconds,
            'extraStatistics'             => $this->extraStatistics,
            'maxCheckPointCount'          => $this->maxCheckPointCount,
            'minCheckPointCount'          => $this->minCheckPointCount,
            'maxRetryCount'               => $this->maxRetryCount,
            'namedConsumerStrategy'       => $this->namedConsumerStrategy,
            'readBatchSize'               => $this->readBatchSize,
            'messageTimeoutMilliseconds'  => $this->messageTimeoutMilliseconds,
            'maxSubscriberCount'          => $this->maxSubscriberCount,
            'resolveLinktos'              => $this->resolveLinktos,
            // Without this entry the startFrom*() methods have no effect on the export.
            'startFrom'                   => $this->startFrom,
        ];
    }

    /**
     * Guards that an integer setting is strictly positive.
     *
     * @param string $label Human readable name used as the message prefix.
     * @param int    $value Value to check.
     *
     * @throws \InvalidArgumentException When $value is lower than or equal to 0.
     */
    private static function assertGreaterThanZero(string $label, int $value)
    {
        if ($value <= 0) {
            throw new \InvalidArgumentException(sprintf('%s must be > 0, Got: %d', $label, $value));
        }
    }
}
315