Passed
Push — main ( 570b65...2ff189 )
by Acho
02:48
created

listeners.*HeartbeatListener.onUserAccountDeleted   A

Complexity

Conditions 3

Size

Total Lines 16
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 11
dl 0
loc 16
rs 9.85
c 0
b 0
f 0
nop 2
1
package listeners
2
3
import (
4
	"context"
5
	"fmt"
6
7
	cloudevents "github.com/cloudevents/sdk-go/v2"
8
	"github.com/davecgh/go-spew/spew"
9
	"github.com/palantir/stacktrace"
10
11
	"github.com/NdoleStudio/httpsms/pkg/events"
12
	"github.com/NdoleStudio/httpsms/pkg/services"
13
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
14
)
15
16
// HeartbeatListener handles cloud events which need to register entities.Heartbeat
17
type HeartbeatListener struct {
18
	logger  telemetry.Logger
19
	tracer  telemetry.Tracer
20
	service *services.HeartbeatService
21
}
22
23
// NewHeartbeatListener creates a new instance of HeartbeatListener
24
func NewHeartbeatListener(
25
	logger telemetry.Logger,
26
	tracer telemetry.Tracer,
27
	service *services.HeartbeatService,
28
) (l *HeartbeatListener, routes map[string]events.EventListener) {
29
	l = &HeartbeatListener{
30
		logger:  logger.WithService(fmt.Sprintf("%T", l)),
31
		tracer:  tracer,
32
		service: service,
33
	}
34
35
	return l, map[string]events.EventListener{
36
		events.EventTypePhoneUpdated:          l.onPhoneUpdated,
37
		events.EventTypePhoneDeleted:          l.onPhoneDeleted,
38
		events.EventTypePhoneHeartbeatCheck:   l.onPhoneHeartbeatCheck,
39
		events.EventTypePhoneHeartbeatOffline: l.onPhoneHeartbeatOffline,
40
		events.UserAccountDeleted:             l.onUserAccountDeleted,
41
	}
42
}
43
44
// onPhoneUpdated handles the events.EventTypePhoneUpdated event
45
func (listener *HeartbeatListener) onPhoneUpdated(ctx context.Context, event cloudevents.Event) error {
46
	ctx, span := listener.tracer.Start(ctx)
47
	defer span.End()
48
49
	var payload events.PhoneUpdatedPayload
50
	if err := event.DataAs(&payload); err != nil {
51
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
52
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
53
	}
54
55
	storeParams := &services.HeartbeatMonitorStoreParams{
56
		Owner:   payload.Owner,
57
		PhoneID: payload.PhoneID,
58
		UserID:  payload.UserID,
59
		Source:  event.Source(),
60
	}
61
62
	if _, err := listener.service.StoreMonitor(ctx, storeParams); err != nil {
63
		msg := fmt.Sprintf("cannot store heartbeat monitor with params [%s] for event with ID [%s]", spew.Sdump(storeParams), event.ID())
64
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
65
	}
66
67
	return nil
68
}
69
70
// onPhoneDeleted handles the events.EventTypePhoneDeleted event
71
func (listener *HeartbeatListener) onPhoneDeleted(ctx context.Context, event cloudevents.Event) error {
72
	ctx, span := listener.tracer.Start(ctx)
73
	defer span.End()
74
75
	var payload events.PhoneDeletedPayload
76
	if err := event.DataAs(&payload); err != nil {
77
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
78
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
79
	}
80
81
	if err := listener.service.DeleteMonitor(ctx, payload.UserID, payload.Owner); err != nil {
82
		msg := fmt.Sprintf("cannot delete heartbeat monitor with userID [%s] and owner [%s] for event with ID [%s]", payload.UserID, payload.Owner, event.ID())
83
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
84
	}
85
86
	return nil
87
}
88
89
// onPhoneHeartbeatCheck handles the events.EventTypePhoneHeartbeatCheck event
90
func (listener *HeartbeatListener) onPhoneHeartbeatCheck(ctx context.Context, event cloudevents.Event) error {
91
	ctx, span := listener.tracer.Start(ctx)
92
	defer span.End()
93
94
	var payload events.PhoneHeartbeatCheckPayload
95
	if err := event.DataAs(&payload); err != nil {
96
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
97
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
98
	}
99
100
	monitorParams := &services.HeartbeatMonitorParams{
101
		Owner:     payload.Owner,
102
		PhoneID:   payload.PhoneID,
103
		MonitorID: payload.MonitorID,
104
		UserID:    payload.UserID,
105
		Source:    event.Source(),
106
	}
107
108
	if err := listener.service.Monitor(ctx, monitorParams); err != nil {
109
		msg := fmt.Sprintf("cannot monitor heartbeats for userID [%s] and owner [%s] for event with ID [%s]", payload.UserID, payload.Owner, event.ID())
110
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
111
	}
112
113
	return nil
114
}
115
116
// onPhoneDeleted handles the events.EventTypePhoneDeleted event
117
func (listener *HeartbeatListener) onPhoneHeartbeatOffline(ctx context.Context, event cloudevents.Event) error {
118
	ctx, span := listener.tracer.Start(ctx)
119
	defer span.End()
120
121
	var payload events.PhoneHeartbeatOfflinePayload
122
	if err := event.DataAs(&payload); err != nil {
123
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
124
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
125
	}
126
127
	if err := listener.service.UpdatePhoneOnline(ctx, payload.UserID, payload.MonitorID, false); err != nil {
128
		msg := fmt.Sprintf("cannot delete heartbeat monitor with userID [%s] and owner [%s] for event with ID [%s]", payload.UserID, payload.Owner, event.ID())
129
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
130
	}
131
132
	return nil
133
}
134
135
func (listener *HeartbeatListener) onUserAccountDeleted(ctx context.Context, event cloudevents.Event) error {
136
	ctx, span := listener.tracer.Start(ctx)
137
	defer span.End()
138
139
	var payload events.UserAccountDeletedPayload
140
	if err := event.DataAs(&payload); err != nil {
141
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
142
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
143
	}
144
145
	if err := listener.service.DeleteAllForUser(ctx, payload.UserID); err != nil {
146
		msg := fmt.Sprintf("cannot delete [entities.Heartbeat] for user [%s] on [%s] event with ID [%s]", payload.UserID, event.Type(), event.ID())
147
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
148
	}
149
150
	return nil
151
}
152