Passed
Push — main ( 570b65...2ff189 )
by Acho
02:48
created

listeners.*UserListener.onUserAccountDeleted   A

Complexity

Conditions 3

Size

Total Lines 16
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 11
dl 0
loc 16
rs 9.85
c 0
b 0
f 0
nop 2
1
package listeners
2
3
import (
4
	"context"
5
	"fmt"
6
7
	"github.com/davecgh/go-spew/spew"
8
9
	"github.com/NdoleStudio/httpsms/pkg/events"
10
	"github.com/NdoleStudio/httpsms/pkg/services"
11
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
12
	cloudevents "github.com/cloudevents/sdk-go/v2"
13
	"github.com/palantir/stacktrace"
14
)
15
16
// UserListener handles cloud events which sends notifications
17
type UserListener struct {
18
	logger  telemetry.Logger
19
	tracer  telemetry.Tracer
20
	service *services.UserService
21
}
22
23
// NewUserListener creates a new instance of UserListener
24
func NewUserListener(
25
	logger telemetry.Logger,
26
	tracer telemetry.Tracer,
27
	service *services.UserService,
28
) (l *UserListener, routes map[string]events.EventListener) {
29
	l = &UserListener{
30
		logger:  logger.WithService(fmt.Sprintf("%T", l)),
31
		tracer:  tracer,
32
		service: service,
33
	}
34
35
	return l, map[string]events.EventListener{
36
		events.EventTypePhoneHeartbeatOffline: l.onPhoneHeartbeatDead,
37
		events.UserSubscriptionCreated:        l.OnUserSubscriptionCreated,
38
		events.UserSubscriptionCancelled:      l.OnUserSubscriptionCancelled,
39
		events.UserSubscriptionUpdated:        l.OnUserSubscriptionUpdated,
40
		events.UserSubscriptionExpired:        l.OnUserSubscriptionExpired,
41
		events.UserAPIKeyRotated:              l.onUserAPIKeyRotated,
42
		events.UserAccountDeleted:             l.onUserAccountDeleted,
43
	}
44
}
45
46
// onPhoneHeartbeatDead handles the events.EventTypePhoneHeartbeatOffline event
47
func (listener *UserListener) onPhoneHeartbeatDead(ctx context.Context, event cloudevents.Event) error {
48
	ctx, span := listener.tracer.Start(ctx)
49
	defer span.End()
50
51
	var payload events.PhoneHeartbeatOfflinePayload
52
	if err := event.DataAs(&payload); err != nil {
53
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
54
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
55
	}
56
57
	sendParams := &services.UserSendPhoneDeadEmailParams{
58
		UserID:                 payload.UserID,
59
		PhoneID:                payload.PhoneID,
60
		Owner:                  payload.Owner,
61
		LastHeartbeatTimestamp: payload.LastHeartbeatTimestamp,
62
	}
63
64
	if err := listener.service.SendPhoneDeadEmail(ctx, sendParams); err != nil {
65
		msg := fmt.Sprintf("cannot send notification with params [%s] for event with ID [%s]", spew.Sdump(sendParams), event.ID())
66
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
67
	}
68
69
	return nil
70
}
71
72
// onAPIKeyRotated handles the events.UserAPIKeyRotated event
73
func (listener *UserListener) onUserAPIKeyRotated(ctx context.Context, event cloudevents.Event) error {
74
	ctx, span := listener.tracer.Start(ctx)
75
	defer span.End()
76
77
	payload := new(events.UserAPIKeyRotatedPayload)
78
	if err := event.DataAs(&payload); err != nil {
79
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
80
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
81
	}
82
83
	if err := listener.service.SendAPIKeyRotatedEmail(ctx, payload); err != nil {
84
		msg := fmt.Sprintf("cannot send notification with params [%s] for event with ID [%s]", spew.Sdump(payload), event.ID())
85
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
86
	}
87
88
	return nil
89
}
90
91
// OnUserSubscriptionCreated handles the events.UserSubscriptionCreated event
92
func (listener *UserListener) OnUserSubscriptionCreated(ctx context.Context, event cloudevents.Event) error {
93
	ctx, span := listener.tracer.Start(ctx)
94
	defer span.End()
95
96
	var payload events.UserSubscriptionCreatedPayload
97
	if err := event.DataAs(&payload); err != nil {
98
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
99
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
100
	}
101
102
	if err := listener.service.StartSubscription(ctx, &payload); err != nil {
103
		msg := fmt.Sprintf("cannot start subscription for user with ID [%s] for event with ID [%s]", payload.UserID, event.ID())
104
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
105
	}
106
107
	return nil
108
}
109
110
// OnUserSubscriptionCancelled handles the events.UserSubscriptionCancelled event
111
func (listener *UserListener) OnUserSubscriptionCancelled(ctx context.Context, event cloudevents.Event) error {
112
	ctx, span := listener.tracer.Start(ctx)
113
	defer span.End()
114
115
	var payload events.UserSubscriptionCancelledPayload
116
	if err := event.DataAs(&payload); err != nil {
117
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
118
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
119
	}
120
121
	if err := listener.service.CancelSubscription(ctx, &payload); err != nil {
122
		msg := fmt.Sprintf("cannot cancell subscription for user with ID [%s] for event with ID [%s]", payload.UserID, event.ID())
123
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
124
	}
125
126
	return nil
127
}
128
129
// OnUserSubscriptionExpired handles the events.UserSubscriptionExpired event
130
func (listener *UserListener) OnUserSubscriptionExpired(ctx context.Context, event cloudevents.Event) error {
131
	ctx, span := listener.tracer.Start(ctx)
132
	defer span.End()
133
134
	var payload events.UserSubscriptionExpiredPayload
135
	if err := event.DataAs(&payload); err != nil {
136
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
137
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
138
	}
139
140
	if err := listener.service.ExpireSubscription(ctx, &payload); err != nil {
141
		msg := fmt.Sprintf("cannot expire subscription for user with ID [%s] for event with ID [%s]", payload.UserID, event.ID())
142
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
143
	}
144
145
	return nil
146
}
147
148
// OnUserSubscriptionUpdated handles the events.UserSubscriptionUpdated event
149
func (listener *UserListener) OnUserSubscriptionUpdated(ctx context.Context, event cloudevents.Event) error {
150
	ctx, span := listener.tracer.Start(ctx)
151
	defer span.End()
152
153
	var payload events.UserSubscriptionUpdatedPayload
154
	if err := event.DataAs(&payload); err != nil {
155
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
156
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
157
	}
158
159
	if err := listener.service.UpdateSubscription(ctx, &payload); err != nil {
160
		msg := fmt.Sprintf("cannot expire subscription for user with ID [%s] for event with ID [%s]", payload.UserID, event.ID())
161
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
162
	}
163
164
	return nil
165
}
166
167
func (listener *UserListener) onUserAccountDeleted(ctx context.Context, event cloudevents.Event) error {
168
	ctx, span := listener.tracer.Start(ctx)
169
	defer span.End()
170
171
	var payload events.UserAccountDeletedPayload
172
	if err := event.DataAs(&payload); err != nil {
173
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
174
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
175
	}
176
177
	if err := listener.service.DeleteAuthUser(ctx, payload.UserID); err != nil {
178
		msg := fmt.Sprintf("cannot delete [entities.AuthUser] for user [%s] on [%s] event with ID [%s]", payload.UserID, event.Type(), event.ID())
179
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
180
	}
181
182
	return nil
183
}
184