services.*HeartbeatService.Monitor   C
last analyzed

Complexity

Conditions 10

Size

Total Lines 41
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 26
dl 0
loc 41
rs 5.9999
c 0
b 0
f 0
nop 2

How to fix   Complexity   

Complexity

Complex classes like services.*HeartbeatService.Monitor often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package services
2
3
import (
4
	"context"
5
	"fmt"
6
	"time"
7
8
	"github.com/NdoleStudio/httpsms/pkg/events"
9
	cloudevents "github.com/cloudevents/sdk-go/v2"
10
11
	"github.com/google/uuid"
12
13
	"github.com/NdoleStudio/httpsms/pkg/repositories"
14
	"github.com/palantir/stacktrace"
15
16
	"github.com/NdoleStudio/httpsms/pkg/entities"
17
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
18
)
19
20
const (
21
	// Reference SQL (presumably kept for ad-hoc debugging): for every heartbeat row it
	// computes the gap to the previous heartbeat of the same owner and user.
	// select id, a.timestamp, a.owner,  a.timestamp - (SELECT timestamp from heartbeats b where  b.timestamp < a.timestamp and a.owner = b.owner and a.user_id = b.user_id order by b.timestamp desc  limit 1) as diff  from heartbeats a;
22
	// heartbeatCheckInterval is the spacing of heartbeat checks. It is added to the last
	// timestamp to compute the next check's ScheduledAt, it is the timeout handed to
	// DispatchWithTimeout, and multiples of it bound the missed/offline windows in Monitor.
	heartbeatCheckInterval = 16 * time.Minute
23
)
24
25
// HeartbeatService handles heartbeat requests: it stores and lists entities.Heartbeat
// records, manages entities.HeartbeatMonitor records, and dispatches the heartbeat
// events (online, missed, offline, check) through the EventDispatcher.
26
type HeartbeatService struct {
27
	// service is an embedded type declared elsewhere in the package.
	service
28
	// logger is the base logger; methods derive a span-bound logger from it.
	logger            telemetry.Logger
29
	// tracer starts the span of every method and wraps returned errors.
	tracer            telemetry.Tracer
30
	// repository persists and queries entities.Heartbeat records.
	repository        repositories.HeartbeatRepository
31
	// monitorRepository persists and queries entities.HeartbeatMonitor records.
	monitorRepository repositories.HeartbeatMonitorRepository
32
	// dispatcher publishes the heartbeat events, immediately or with a timeout.
	dispatcher        *EventDispatcher
33
}
34
35
// NewHeartbeatService creates a new HeartbeatService
36
func NewHeartbeatService(
37
	logger telemetry.Logger,
38
	tracer telemetry.Tracer,
39
	repository repositories.HeartbeatRepository,
40
	monitorRepository repositories.HeartbeatMonitorRepository,
41
	dispatcher *EventDispatcher,
42
) (s *HeartbeatService) {
43
	return &HeartbeatService{
44
		logger:            logger.WithService(fmt.Sprintf("%T", s)),
45
		tracer:            tracer,
46
		repository:        repository,
47
		monitorRepository: monitorRepository,
48
		dispatcher:        dispatcher,
49
	}
50
}
51
52
// DeleteAllForUser deletes all entities.Heartbeat for an entities.UserID.
53
func (service *HeartbeatService) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
54
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
55
	defer span.End()
56
57
	if err := service.repository.DeleteAllForUser(ctx, userID); err != nil {
58
		msg := fmt.Sprintf("could not delete all [entities.Heartbeat] for user with ID [%s]", userID)
59
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
60
	}
61
62
	if err := service.monitorRepository.DeleteAllForUser(ctx, userID); err != nil {
63
		msg := fmt.Sprintf("could not delete all [entities.HeartbeatMonitor] for user with ID [%s]", userID)
64
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
65
	}
66
67
	ctxLogger.Info(fmt.Sprintf("deleted all [entities.Heartbeat] and [entities.HeartbeatMonitor] for user with ID [%s]", userID))
68
	return nil
69
}
70
71
// Index fetches the heartbeats for a phone number
72
func (service *HeartbeatService) Index(ctx context.Context, userID entities.UserID, owner string, params repositories.IndexParams) (*[]entities.Heartbeat, error) {
73
	ctx, span := service.tracer.Start(ctx)
74
	defer span.End()
75
76
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
77
78
	heartbeats, err := service.repository.Index(ctx, userID, owner, params)
79
	if err != nil {
80
		msg := fmt.Sprintf("could not fetch heartbeats with parms [%+#v]", params)
81
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
82
	}
83
84
	ctxLogger.Info(fmt.Sprintf("fetched [%d] messages with prams [%+#v]", len(*heartbeats), params))
85
	return heartbeats, nil
86
}
87
88
// HeartbeatStoreParams are the parameters for creating a new entities.Heartbeat.
// Store copies Owner, Version, Charging, Timestamp and UserID onto the heartbeat.
89
type HeartbeatStoreParams struct {
90
	// Owner, together with UserID, is also the key used to load the heartbeat monitor.
	Owner     string
91
	Version   string
92
	Charging  bool
93
	// Source is not saved on the heartbeat; Store only forwards it as the source of
	// the event emitted when the phone was previously offline.
	Source    string
94
	Timestamp time.Time
95
	UserID    entities.UserID
96
}
97
98
// Store a new entities.Heartbeat
99
func (service *HeartbeatService) Store(ctx context.Context, params HeartbeatStoreParams) (*entities.Heartbeat, error) {
100
	ctx, span := service.tracer.Start(ctx)
101
	defer span.End()
102
103
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
104
105
	heartbeat := &entities.Heartbeat{
106
		ID:        uuid.New(),
107
		Owner:     params.Owner,
108
		Charging:  params.Charging,
109
		Timestamp: params.Timestamp,
110
		Version:   params.Version,
111
		UserID:    params.UserID,
112
	}
113
114
	if err := service.repository.Store(ctx, heartbeat); err != nil {
115
		msg := fmt.Sprintf("cannot save heartbeat with id [%s]", heartbeat.ID)
116
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
117
	}
118
119
	ctxLogger.Info(fmt.Sprintf("heartbeat saved with id [%s] for user [%s]", heartbeat.ID, heartbeat.UserID))
120
121
	monitor, err := service.monitorRepository.Load(ctx, params.UserID, params.Owner)
122
	if stacktrace.GetCode(err) == repositories.ErrCodeNotFound {
123
		ctxLogger.Info(fmt.Sprintf("heartbeat monitor does not exist for owner [%s] and user [%s]", params.Owner, params.UserID))
124
		return heartbeat, nil
125
	}
126
	if err != nil {
127
		msg := fmt.Sprintf("cannot load heartbeat monitor for owner [%s] and user [%s]", params.Owner, params.UserID)
128
		ctxLogger.Error(stacktrace.Propagate(err, msg))
129
		return heartbeat, nil
130
	}
131
132
	if monitor.PhoneIsOffline() {
133
		ctxLogger.Info(fmt.Sprintf("phone with monitor ID [%s] was offline for user [%s]", monitor.ID, monitor.UserID))
134
		service.handleHeartbeatWhenPhoneWasOffline(ctx, params.Source, heartbeat, monitor)
135
	}
136
137
	return heartbeat, nil
138
}
139
140
// HeartbeatMonitorStoreParams are the parameters for creating a new entities.HeartbeatMonitor.
141
type HeartbeatMonitorStoreParams struct {
142
	// Owner and UserID are the key used to look up an existing monitor.
	Owner   string
143
	// PhoneID is stored on the monitor when a new one is created.
	PhoneID uuid.UUID
144
	// Source is forwarded as the source of the scheduled heartbeat check event.
	Source  string
145
	UserID  entities.UserID
146
}
147
148
func (service *HeartbeatService) handleHeartbeatWhenPhoneWasOffline(ctx context.Context, source string, heartbeat *entities.Heartbeat, monitor *entities.HeartbeatMonitor) {
149
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
150
	defer span.End()
151
152
	if err := service.UpdatePhoneOnline(ctx, monitor.UserID, monitor.ID, true); err != nil {
153
		msg := fmt.Sprintf("cannot update phone online status for heartbeat monitor [%s]", monitor.ID)
154
		ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)))
155
	}
156
157
	event, err := service.createEvent(events.EventTypePhoneHeartbeatOnline, source, &events.PhoneHeartbeatOnlinePayload{
158
		PhoneID:                monitor.PhoneID,
159
		UserID:                 monitor.UserID,
160
		LastHeartbeatTimestamp: heartbeat.Timestamp,
161
		Timestamp:              time.Now().UTC(),
162
		MonitorID:              monitor.ID,
163
		Owner:                  heartbeat.Owner,
164
	})
165
	if err != nil {
166
		msg := fmt.Sprintf("cannot create [%s] event for monitor with ID [%s]", events.EventTypePhoneHeartbeatOnline, monitor.ID)
167
		ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)))
168
		return
169
	}
170
171
	if err = service.dispatcher.Dispatch(ctx, event); err != nil {
172
		msg := fmt.Sprintf("cannot dispatch event [%s] for heartbeat monitor with phone id [%s]", event.Type(), monitor.PhoneID)
173
		ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)))
174
		return
175
	}
176
177
	ctxLogger.Info(fmt.Sprintf("[%s] event created with ID [%s] for monitor ID [%s] and user [%s]", event.Type(), event.ID(), monitor.ID, monitor.UserID))
178
	return
179
}
180
181
// StoreMonitor a new entities.HeartbeatMonitor
182
func (service *HeartbeatService) StoreMonitor(ctx context.Context, params *HeartbeatMonitorStoreParams) (*entities.HeartbeatMonitor, error) {
183
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
184
	defer span.End()
185
186
	monitor, scheduleCheck, err := service.phoneMonitor(ctx, params)
187
	if err != nil {
188
		msg := fmt.Sprintf("cannot create monitor for with userID [%s] and owner [%s]", params.UserID, params.Owner)
189
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
190
	}
191
192
	if !scheduleCheck {
193
		ctxLogger.Info(fmt.Sprintf("heartbeat monitor [%s] for owner [%s] does not need scheduling because it was updated at [%s]", monitor.ID, monitor.Owner, monitor.UpdatedAt))
194
		return monitor, nil
195
	}
196
197
	ctxLogger.Info(fmt.Sprintf("scheduling heartbeat monitor [%s] for owner [%s] and user [%s]", monitor.ID, monitor.Owner, monitor.UserID))
198
199
	monitorParams := &HeartbeatMonitorParams{
200
		Owner:     monitor.Owner,
201
		PhoneID:   monitor.PhoneID,
202
		UserID:    monitor.UserID,
203
		MonitorID: monitor.ID,
204
		Source:    params.Source,
205
	}
206
	if err = service.scheduleHeartbeatCheck(ctx, time.Now().UTC(), monitorParams); err != nil {
207
		msg := fmt.Sprintf("cannot schedule healthcheck for monitor [%s] with owner [%s] and userID [%s]", monitor.ID, params.Owner, params.UserID)
208
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
209
	}
210
211
	return monitor, nil
212
}
213
214
func (service *HeartbeatService) phoneMonitor(ctx context.Context, params *HeartbeatMonitorStoreParams) (*entities.HeartbeatMonitor, bool, error) {
215
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
216
	defer span.End()
217
218
	monitor, err := service.monitorRepository.Load(ctx, params.UserID, params.Owner)
219
	if stacktrace.GetCode(err) == repositories.ErrCodeNotFound {
220
		monitor = &entities.HeartbeatMonitor{
221
			ID:          uuid.New(),
222
			PhoneID:     params.PhoneID,
223
			UserID:      params.UserID,
224
			Owner:       params.Owner,
225
			PhoneOnline: true,
226
			CreatedAt:   time.Now().UTC(),
227
			UpdatedAt:   time.Now().UTC(),
228
		}
229
230
		if err = service.monitorRepository.Store(ctx, monitor); err != nil {
231
			msg := fmt.Sprintf("cannot save heartbeat monitor for owner [%s] and user [%s]", monitor.Owner, monitor.UserID)
232
			return nil, false, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
233
		}
234
235
		ctxLogger.Info(fmt.Sprintf("heartbeat monitor saved with id [%s] for owner [%s] and user [%s]", monitor.ID, monitor.Owner, monitor.UserID))
236
		return monitor, true, nil
237
	}
238
239
	if err != nil {
240
		msg := fmt.Sprintf("cannot check if monitor exists with userID [%s] and owner [%s]", params.UserID, params.Owner)
241
		return nil, false, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
242
	}
243
244
	return monitor, monitor.RequiresCheck(), nil
245
}
246
247
// DeleteMonitor an entities.HeartbeatMonitor
248
func (service *HeartbeatService) DeleteMonitor(ctx context.Context, userID entities.UserID, owner string) error {
249
	ctx, span := service.tracer.Start(ctx)
250
	defer span.End()
251
252
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
253
254
	if err := service.monitorRepository.Delete(ctx, userID, owner); err != nil {
255
		msg := fmt.Sprintf("cannot delete heartbeat monitor with userID [%s] and owner [%s]", userID, owner)
256
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
257
	}
258
259
	ctxLogger.Info(fmt.Sprintf("heartbeat monitor deleted for userID [%s] and owner [%s]", userID, owner))
260
	return nil
261
}
262
263
// UpdatePhoneOnline updates the phone_online field in an entities.HeartbeatMonitor
264
func (service *HeartbeatService) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, phoneOnline bool) error {
265
	ctx, span := service.tracer.Start(ctx)
266
	defer span.End()
267
268
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
269
270
	if err := service.monitorRepository.UpdatePhoneOnline(ctx, userID, monitorID, phoneOnline); err != nil {
271
		msg := fmt.Sprintf("cannot update heartbeat monitor [%s] with userID [%s] and status [%t]", monitorID, userID, phoneOnline)
272
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
273
	}
274
275
	ctxLogger.Info(fmt.Sprintf("heartbeat monitor [%s] updated for userID [%s] and status [%t]", monitorID, userID, phoneOnline))
276
	return nil
277
}
278
279
// HeartbeatMonitorParams are parameters for monitoring the heartbeat.
// They are copied into the payloads of the check, missed and offline events.
280
type HeartbeatMonitorParams struct {
281
	// Owner and UserID are the key used to load the monitor and the last heartbeat.
	Owner     string
282
	// MonitorID is overwritten by Monitor with the ID of the loaded monitor.
	MonitorID uuid.UUID
283
	// PhoneID is overwritten by Monitor with the PhoneID of the loaded monitor.
	PhoneID   uuid.UUID
284
	UserID    entities.UserID
285
	// Source is used as the source of every event created from these params.
	Source    string
286
}
287
288
// Monitor the heartbeats of an owner and phone number
289
func (service *HeartbeatService) Monitor(ctx context.Context, params *HeartbeatMonitorParams) error {
290
	ctx, span := service.tracer.Start(ctx)
291
	defer span.End()
292
293
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
294
295
	monitor, err := service.monitorRepository.Load(ctx, params.UserID, params.Owner)
296
	if err != nil && stacktrace.GetCode(err) == repositories.ErrCodeNotFound {
297
		ctxLogger.Info(fmt.Sprintf("heartbeat monitor does not exist for owner [%s] and user [%s]", params.Owner, params.UserID))
298
		return nil
299
	}
300
301
	if err != nil {
302
		msg := fmt.Sprintf("cannot check if monitor exists with userID [%s] and owner [%s]", params.UserID, params.Owner)
303
		ctxLogger.Error(stacktrace.Propagate(err, msg))
304
		return service.scheduleHeartbeatCheck(ctx, time.Now().UTC(), params)
305
	}
306
307
	// Update params in case of ID duplicate
308
	params.PhoneID = monitor.PhoneID
309
	params.MonitorID = monitor.ID
310
311
	heartbeat, err := service.repository.Last(ctx, params.UserID, params.Owner)
312
	if err != nil {
313
		msg := fmt.Sprintf("cannot fetch last heartbeat for userID [%s] and owner [%s] and ID [%s] removing check", params.UserID, params.Owner, params.MonitorID)
314
		ctxLogger.Error(stacktrace.Propagate(err, msg))
315
		return nil
316
	}
317
318
	// send urgent FCM message if the last heartbeat is late
319
	if time.Now().UTC().Sub(heartbeat.Timestamp) > heartbeatCheckInterval && time.Now().UTC().Sub(heartbeat.Timestamp) < (heartbeatCheckInterval*5) {
320
		ctxLogger.Info(fmt.Sprintf("sending missed heartbeat notification for userID [%s] and owner [%s] and monitor ID [%s]", params.UserID, params.Owner, params.MonitorID))
321
		service.handleMissedMonitor(ctx, heartbeat.Timestamp, params)
322
	}
323
324
	if time.Now().UTC().Sub(heartbeat.Timestamp) > (heartbeatCheckInterval*4) &&
325
		time.Now().UTC().Sub(heartbeat.Timestamp) < (heartbeatCheckInterval*5) && monitor.PhoneOnline {
326
		return service.handleFailedMonitor(ctx, heartbeat.Timestamp, params)
327
	}
328
329
	return service.scheduleHeartbeatCheck(ctx, heartbeat.Timestamp, params)
330
}
331
332
func (service *HeartbeatService) handleMissedMonitor(ctx context.Context, lastTimestamp time.Time, params *HeartbeatMonitorParams) {
333
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
334
	defer span.End()
335
336
	event, err := service.createPhoneHeartbeatMissedEvent(params.Source, &events.PhoneHeartbeatMissedPayload{
337
		PhoneID:                params.PhoneID,
338
		UserID:                 params.UserID,
339
		MonitorID:              params.MonitorID,
340
		LastHeartbeatTimestamp: lastTimestamp,
341
		Timestamp:              time.Now().UTC(),
342
		Owner:                  params.Owner,
343
	})
344
	if err != nil {
345
		msg := fmt.Sprintf("cannot create event when phone monitor [%s] missed heartbeat", params.MonitorID.String())
346
		ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)))
347
		return
348
	}
349
350
	if _, err = service.dispatcher.DispatchWithTimeout(ctx, event, heartbeatCheckInterval); err != nil {
351
		msg := fmt.Sprintf("cannot dispatch event [%s] for heartbeat monitor with phone id [%s]", event.Type(), params.PhoneID)
352
		ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)))
353
	}
354
}
355
356
func (service *HeartbeatService) handleFailedMonitor(ctx context.Context, lastTimestamp time.Time, params *HeartbeatMonitorParams) error {
357
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
358
	defer span.End()
359
360
	err := service.scheduleHeartbeatCheck(ctx, time.Now().UTC(), params)
361
	if err != nil {
362
		msg := fmt.Sprintf("cannot schedule healthcheck for monitor with owner [%s] and userID [%s]", params.Owner, params.UserID)
363
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
364
	}
365
366
	event, err := service.createPhoneHeartbeatOfflineEvent(params.Source, &events.PhoneHeartbeatOfflinePayload{
367
		PhoneID:                params.PhoneID,
368
		UserID:                 params.UserID,
369
		MonitorID:              params.MonitorID,
370
		LastHeartbeatTimestamp: lastTimestamp,
371
		Timestamp:              time.Now().UTC(),
372
		Owner:                  params.Owner,
373
	})
374
	if err != nil {
375
		msg := fmt.Sprintf("cannot create event when phone monitor failed")
376
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
377
	}
378
379
	if err = service.dispatcher.Dispatch(ctx, event); err != nil {
380
		msg := fmt.Sprintf("cannot dispatch event [%s] for heartbeat monitor with phone id [%s]", event.Type(), params.PhoneID)
381
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
382
	}
383
384
	ctxLogger.Info(fmt.Sprintf("heartbeat monitor with id [%s] and phone id [%s] failed for user [%s]", params.MonitorID, params.PhoneID, params.UserID))
385
	return nil
386
}
387
388
func (service *HeartbeatService) scheduleHeartbeatCheck(ctx context.Context, lastTimestamp time.Time, params *HeartbeatMonitorParams) error {
389
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
390
	defer span.End()
391
392
	event, err := service.createPhoneHeartbeatCheckEvent(params.Source, &events.PhoneHeartbeatCheckPayload{
393
		PhoneID:     params.PhoneID,
394
		UserID:      params.UserID,
395
		MonitorID:   params.MonitorID,
396
		ScheduledAt: lastTimestamp.Add(heartbeatCheckInterval),
397
		Owner:       params.Owner,
398
	})
399
	if err != nil {
400
		msg := fmt.Sprintf("cannot create event when phone monitor failed")
401
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
402
	}
403
404
	queueID, err := service.dispatcher.DispatchWithTimeout(ctx, event, heartbeatCheckInterval)
405
	if err != nil {
406
		msg := fmt.Sprintf("cannot dispatch event [%s] for heartbeat monitor with phone id [%s]", event.Type(), params.PhoneID)
407
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
408
	}
409
410
	if err = service.monitorRepository.UpdateQueueID(ctx, params.MonitorID, queueID); err != nil {
411
		msg := fmt.Sprintf("cannot update monitor with id [%s] with queue with ID [%s]", params.MonitorID, queueID)
412
		service.logger.Error(stacktrace.Propagate(err, msg))
413
	}
414
415
	ctxLogger.Info(fmt.Sprintf("heartbeat check scheduled for monitor with id [%s] and phone id [%s] and queue id [%s] for user [%s]", params.MonitorID, params.PhoneID, queueID, params.UserID))
416
417
	return nil
418
}
419
420
// createPhoneHeartbeatMissedEvent builds the event of type
// events.PhoneHeartbeatMissed for the given source and payload via createEvent.
func (service *HeartbeatService) createPhoneHeartbeatMissedEvent(source string, payload *events.PhoneHeartbeatMissedPayload) (cloudevents.Event, error) {
	event, err := service.createEvent(events.PhoneHeartbeatMissed, source, payload)
	return event, err
}
423
424
// createPhoneHeartbeatOfflineEvent builds the event of type
// events.EventTypePhoneHeartbeatOffline for the given source and payload via createEvent.
func (service *HeartbeatService) createPhoneHeartbeatOfflineEvent(source string, payload *events.PhoneHeartbeatOfflinePayload) (cloudevents.Event, error) {
	event, err := service.createEvent(events.EventTypePhoneHeartbeatOffline, source, payload)
	return event, err
}
427
428
// createPhoneHeartbeatCheckEvent builds the event of type
// events.EventTypePhoneHeartbeatCheck for the given source and payload via createEvent.
func (service *HeartbeatService) createPhoneHeartbeatCheckEvent(source string, payload *events.PhoneHeartbeatCheckPayload) (cloudevents.Event, error) {
	event, err := service.createEvent(events.EventTypePhoneHeartbeatCheck, source, payload)
	return event, err
}
431