Completed
Pull Request — master (#1)
by Raymond
02:16
created

PersistentSubscriptionSettings::preferRoundRobin()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 6
rs 9.4285
cc 1
eloc 3
nc 1
nop 0
1
<?php
2
3
namespace RayRutjes\GetEventStore;
4
5
/**
 * Fluent builder holding the options of a persistent subscription.
 *
 * Every mutator validates its argument, stores it on the instance and returns
 * the same instance so that calls can be chained.
 */
final class PersistentSubscriptionSettings
{
    const STRATEGY_ROUND_ROBIN = 'RoundRobin';
    const STRATEGY_DISPATCH_TO_SINGLE = 'DispatchToSingle';

    /**
     * Sentinel stored in $startFrom meaning "start from the current position".
     */
    const CURRENT_POSITION = -1;

    /**
     * Whether link events should be resolved.
     *
     * @var bool
     */
    private $resolveLinkTos = true;

    /**
     * One of the STRATEGY_* constants.
     *
     * @var string
     */
    private $namedConsumerStrategy = self::STRATEGY_ROUND_ROBIN;

    /**
     * Event position to start from, or self::CURRENT_POSITION.
     *
     * @var int
     */
    private $startFrom = self::CURRENT_POSITION;

    /**
     * Message timeout before a retry (unit not visible here — TODO confirm against the consumer of this class).
     *
     * @var int
     */
    private $timeout = 3;

    /**
     * Amount of time after which a checkpoint is attempted (unit not visible here — TODO confirm).
     *
     * @var int
     */
    private $checkpointAfter = 3;

    /**
     * Minimum number of messages to write a checkpoint for.
     *
     * @var int
     */
    private $minCheckPointCount = 100;

    /**
     * Maximum number of non-checkpointed messages before a checkpoint is forced.
     *
     * @var int
     */
    private $maxCheckPointCount = 500;

    /**
     * Number of retries before a message is considered bad.
     *
     * @var int
     */
    private $maxRetries = 3;

    /**
     * Whether the backend should measure client timings.
     *
     * @var bool
     */
    private $extraStatistics = false;

    /**
     * Size of the read batch when in paging mode.
     *
     * @var int
     */
    private $readBatch = 20;

    /**
     * Tells the subscription to resolve link events.
     *
     * @return PersistentSubscriptionSettings
     */
    public function resolveLinkTos(): PersistentSubscriptionSettings
    {
        $this->resolveLinkTos = true;

        return $this;
    }

    /**
     * Tells the subscription to not resolve link events.
     *
     * @return PersistentSubscriptionSettings
     */
    public function doNotResolveLinkTos(): PersistentSubscriptionSettings
    {
        $this->resolveLinkTos = false;

        return $this;
    }

    /**
     * If possible prefer to round robin between the connections with messages (if not possible will use next available).
     *
     * @return PersistentSubscriptionSettings
     */
    public function preferRoundRobin(): PersistentSubscriptionSettings
    {
        $this->namedConsumerStrategy = self::STRATEGY_ROUND_ROBIN;

        return $this;
    }

    /**
     * If possible prefer to dispatch to a single connection (if not possible will use next available).
     *
     * @return PersistentSubscriptionSettings
     */
    public function preferDispatchToSingle(): PersistentSubscriptionSettings
    {
        $this->namedConsumerStrategy = self::STRATEGY_DISPATCH_TO_SINGLE;

        return $this;
    }

    /**
     * Start the subscription from the beginning of the stream (position 0).
     *
     * @return PersistentSubscriptionSettings
     */
    public function startFromBeginning(): PersistentSubscriptionSettings
    {
        $this->startFrom = 0;

        return $this;
    }

    /**
     * Start the subscription from the position-th event in the stream.
     *
     * @param int $position Event position, must be >= 0.
     *
     * @return PersistentSubscriptionSettings
     *
     * @throws \InvalidArgumentException When the position is negative.
     */
    public function startFrom(int $position): PersistentSubscriptionSettings
    {
        // 0 is a legal position (it is what startFromBeginning() stores), so only negatives are rejected.
        if ($position < 0) {
            throw new \InvalidArgumentException(sprintf('Position must be >= 0, Got: %d', $position));
        }

        $this->startFrom = $position;

        return $this;
    }

    /**
     * Start the subscription from the current position.
     *
     * @return PersistentSubscriptionSettings
     */
    public function startFromCurrent(): PersistentSubscriptionSettings
    {
        $this->startFrom = self::CURRENT_POSITION;

        return $this;
    }

    /**
     * Sets the timeout for a client before the message will be retried.
     *
     * @param int $timeout Must be > 0.
     *
     * @return PersistentSubscriptionSettings
     *
     * @throws \InvalidArgumentException When the timeout is not strictly positive.
     */
    public function withMessageTimeoutOf(int $timeout): PersistentSubscriptionSettings
    {
        if ($timeout <= 0) {
            throw new \InvalidArgumentException(sprintf('Timeout must be > 0, Got: %d', $timeout));
        }

        $this->timeout = $timeout;

        return $this;
    }

    /**
     * The amount of time the system should try to checkpoint after.
     *
     * @param int $time Must be > 0.
     *
     * @return PersistentSubscriptionSettings
     *
     * @throws \InvalidArgumentException When the time is not strictly positive.
     */
    public function checkPointAfter(int $time): PersistentSubscriptionSettings
    {
        if ($time <= 0) {
            throw new \InvalidArgumentException(sprintf('Time must be > 0, Got: %d', $time));
        }

        $this->checkpointAfter = $time;

        return $this;
    }

    /**
     * The minimum number of messages to write a checkpoint for.
     *
     * @param int $count Must be > 0.
     *
     * @return PersistentSubscriptionSettings
     *
     * @throws \InvalidArgumentException When the count is not strictly positive.
     */
    public function minCheckPointCountOf(int $count): PersistentSubscriptionSettings
    {
        if ($count <= 0) {
            throw new \InvalidArgumentException(sprintf('Count must be > 0, Got: %d', $count));
        }

        $this->minCheckPointCount = $count;

        return $this;
    }

    /**
     * The maximum number of messages not checkpointed before forcing a checkpoint.
     *
     * The value is checked against the minimum checkpoint count configured at
     * the time of the call; it may equal that minimum but not be lower.
     *
     * @param int $count Must be > 0 and >= the current minimum checkpoint count.
     *
     * @return PersistentSubscriptionSettings
     *
     * @throws \InvalidArgumentException When the count is not strictly positive or is below the minimum.
     */
    public function maxCheckPointCountOf(int $count): PersistentSubscriptionSettings
    {
        if ($count <= 0) {
            throw new \InvalidArgumentException(sprintf('Count must be > 0, Got: %d', $count));
        }

        if ($count < $this->minCheckPointCount) {
            throw new \InvalidArgumentException(sprintf('Maximum checkpoint count %d should be >= min checkpoint count %d.', $count, $this->minCheckPointCount));
        }

        $this->maxCheckPointCount = $count;

        return $this;
    }

    /**
     * Sets the number of times a message should be retried before being considered a bad message.
     *
     * @param int $count Must be >= 0.
     *
     * @return PersistentSubscriptionSettings
     *
     * @throws \InvalidArgumentException When the count is negative.
     */
    public function withMaxRetriesOf(int $count): PersistentSubscriptionSettings
    {
        if ($count < 0) {
            throw new \InvalidArgumentException(sprintf('Max retries count must be >= 0, Got: %d', $count));
        }

        $this->maxRetries = $count;

        return $this;
    }

    /**
     * The size of the read batch when in paging mode.
     *
     * PHP resolves method names case-insensitively, so callers still using the
     * former `WithReadBatchOf` spelling keep working.
     *
     * @param int $count Must be > 0.
     *
     * @return PersistentSubscriptionSettings
     *
     * @throws \InvalidArgumentException When the count is not strictly positive.
     */
    public function withReadBatchOf(int $count): PersistentSubscriptionSettings
    {
        // A batch of zero events is meaningless; reject it as the message has always claimed.
        if ($count <= 0) {
            throw new \InvalidArgumentException(sprintf('Read batch must be > 0, Got: %d', $count));
        }

        $this->readBatch = $count;

        return $this;
    }

    /**
     * Tells the backend to measure timings on the clients so statistics will contain histograms of them.
     *
     * @return PersistentSubscriptionSettings
     */
    public function withExtraStatistics(): PersistentSubscriptionSettings
    {
        $this->extraStatistics = true;

        return $this;
    }
}
267