Passed
Push — main ( 570b65...2ff189 )
by Acho
02:48
created

listeners.*MessageListener.onUserAccountDeleted   A

Complexity

Conditions 3

Size

Total Lines 16
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 11
dl 0
loc 16
rs 9.85
c 0
b 0
f 0
nop 2
1
package listeners
2
3
import (
4
	"context"
5
	"fmt"
6
7
	"github.com/NdoleStudio/httpsms/pkg/entities"
8
	"github.com/NdoleStudio/httpsms/pkg/events"
9
	"github.com/NdoleStudio/httpsms/pkg/services"
10
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
11
	cloudevents "github.com/cloudevents/sdk-go/v2"
12
	"github.com/palantir/stacktrace"
13
)
14
15
// MessageListener handles cloud events which need to update entities.Message
16
type MessageListener struct {
17
	logger  telemetry.Logger
18
	tracer  telemetry.Tracer
19
	service *services.MessageService
20
}
21
22
// NewMessageListener creates a new instance of MessageListener
23
func NewMessageListener(
24
	logger telemetry.Logger,
25
	tracer telemetry.Tracer,
26
	service *services.MessageService,
27
) (l *MessageListener, routes map[string]events.EventListener) {
28
	l = &MessageListener{
29
		logger:  logger.WithService(fmt.Sprintf("%T", l)),
30
		tracer:  tracer,
31
		service: service,
32
	}
33
34
	return l, map[string]events.EventListener{
35
		events.EventTypeMessagePhoneSending:          l.OnMessagePhoneSending,
36
		events.EventTypeMessagePhoneSent:             l.OnMessagePhoneSent,
37
		events.EventTypeMessagePhoneDelivered:        l.OnMessagePhoneDelivered,
38
		events.EventTypeMessageSendFailed:            l.OnMessagePhoneFailed,
39
		events.EventTypeMessageNotificationSent:      l.onMessageNotificationSent,
40
		events.EventTypeMessageNotificationFailed:    l.onMessageNotificationFailed,
41
		events.EventTypeMessageSendExpiredCheck:      l.onMessageSendExpiredCheck,
42
		events.EventTypeMessageSendExpired:           l.onMessageSendExpired,
43
		events.EventTypeMessageNotificationScheduled: l.onMessageNotificationScheduled,
44
		events.MessageThreadAPIDeleted:               l.onMessageThreadAPIDeleted,
45
		events.MessageCallMissed:                     l.onMessageCallMissed,
46
		events.UserAccountDeleted:                    l.onUserAccountDeleted,
47
	}
48
}
49
50
// OnMessagePhoneSending handles the events.EventTypeMessagePhoneSending event
51
func (listener *MessageListener) OnMessagePhoneSending(ctx context.Context, event cloudevents.Event) error {
52
	ctx, span := listener.tracer.Start(ctx)
53
	defer span.End()
54
55
	var payload events.MessagePhoneSendingPayload
56
	if err := event.DataAs(&payload); err != nil {
57
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
58
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
59
	}
60
61
	handleParams := services.HandleMessageParams{
62
		ID:        payload.ID,
63
		UserID:    payload.UserID,
64
		Timestamp: event.Time(),
65
		Source:    event.Source(),
66
	}
67
68
	if err := listener.service.HandleMessageSending(ctx, handleParams); err != nil {
69
		msg := fmt.Sprintf("cannot handle sending for message with ID [%s] for event with ID [%s]", handleParams.ID, event.ID())
70
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
71
	}
72
73
	return nil
74
}
75
76
// OnMessagePhoneSent handles the events.EventTypeMessagePhoneSent event
77
func (listener *MessageListener) OnMessagePhoneSent(ctx context.Context, event cloudevents.Event) error {
78
	ctx, span := listener.tracer.Start(ctx)
79
	defer span.End()
80
81
	var payload events.MessagePhoneSentPayload
82
	if err := event.DataAs(&payload); err != nil {
83
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
84
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
85
	}
86
87
	handleParams := services.HandleMessageParams{
88
		ID:        payload.ID,
89
		UserID:    payload.UserID,
90
		Source:    event.Source(),
91
		Timestamp: payload.Timestamp,
92
	}
93
94
	if err := listener.service.HandleMessageSent(ctx, handleParams); err != nil {
95
		msg := fmt.Sprintf("cannot handle [%s] for message with ID [%s] for event with ID [%s]", event.Type(), handleParams.ID, event.ID())
96
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
97
	}
98
	return nil
99
}
100
101
// OnMessagePhoneDelivered handles the events.EventTypeMessagePhoneDelivered event
102
func (listener *MessageListener) OnMessagePhoneDelivered(ctx context.Context, event cloudevents.Event) error {
103
	ctx, span := listener.tracer.Start(ctx)
104
	defer span.End()
105
106
	var payload events.MessagePhoneDeliveredPayload
107
	if err := event.DataAs(&payload); err != nil {
108
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
109
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
110
	}
111
112
	handleParams := services.HandleMessageParams{
113
		ID:        payload.ID,
114
		UserID:    payload.UserID,
115
		Timestamp: payload.Timestamp,
116
	}
117
118
	if err := listener.service.HandleMessageDelivered(ctx, handleParams); err != nil {
119
		msg := fmt.Sprintf("cannot handle [%s] for message with ID [%s] for event with ID [%s]", event.Type(), handleParams.ID, event.ID())
120
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
121
	}
122
123
	return nil
124
}
125
126
// OnMessagePhoneFailed handles the events.EventTypeMessageSendFailed event
127
func (listener *MessageListener) OnMessagePhoneFailed(ctx context.Context, event cloudevents.Event) error {
128
	ctx, span := listener.tracer.Start(ctx)
129
	defer span.End()
130
131
	var payload events.MessageSendFailedPayload
132
	if err := event.DataAs(&payload); err != nil {
133
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
134
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
135
	}
136
137
	handleParams := services.HandleMessageFailedParams{
138
		ID:           payload.ID,
139
		UserID:       payload.UserID,
140
		ErrorMessage: payload.ErrorMessage,
141
		Timestamp:    payload.Timestamp,
142
	}
143
144
	if err := listener.service.HandleMessageFailed(ctx, handleParams); err != nil {
145
		msg := fmt.Sprintf("cannot handle [%s] for message with ID [%s] for event with ID [%s]", event.Type(), handleParams.ID, event.ID())
146
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
147
	}
148
149
	return nil
150
}
151
152
// onMessageNotificationFailed handles the events.EventTypeMessageNotificationFailed event
153
func (listener *MessageListener) onMessageNotificationFailed(ctx context.Context, event cloudevents.Event) error {
154
	ctx, span := listener.tracer.Start(ctx)
155
	defer span.End()
156
157
	var payload events.MessageNotificationFailedPayload
158
	if err := event.DataAs(&payload); err != nil {
159
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
160
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
161
	}
162
163
	message, err := listener.service.GetMessage(ctx, payload.UserID, payload.MessageID)
164
	if err != nil {
165
		msg := fmt.Sprintf("cannot load message with id [%s] and user id [%s]", payload.MessageID, payload.UserID)
166
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
167
	}
168
169
	storeParams := services.MessageStoreEventParams{
170
		MessageID:    payload.MessageID,
171
		EventName:    entities.MessageEventNameFailed,
172
		Timestamp:    payload.NotificationFailedAt,
173
		ErrorMessage: &payload.ErrorMessage,
174
		Source:       event.Source(),
175
	}
176
	if _, err = listener.service.StoreEvent(ctx, message, storeParams); err != nil {
177
		msg := fmt.Sprintf("cannot store message event [%s] for message with ID [%s]", storeParams.EventName, storeParams.MessageID)
178
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
179
	}
180
181
	return nil
182
}
183
184
// onMessageNotificationSent handles the events.EventTypeMessageNotificationSent event
185
func (listener *MessageListener) onMessageNotificationSent(ctx context.Context, event cloudevents.Event) error {
186
	ctx, span := listener.tracer.Start(ctx)
187
	defer span.End()
188
189
	var payload events.MessageNotificationSentPayload
190
	if err := event.DataAs(&payload); err != nil {
191
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
192
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
193
	}
194
195
	checkParams := services.MessageScheduleExpirationParams{
196
		MessageID:                 payload.MessageID,
197
		UserID:                    payload.UserID,
198
		NotificationSentAt:        payload.NotificationSentAt,
199
		PhoneID:                   payload.PhoneID,
200
		Source:                    event.Source(),
201
		MessageExpirationDuration: payload.MessageExpirationDuration,
202
	}
203
	if err := listener.service.ScheduleExpirationCheck(ctx, checkParams); err != nil {
204
		msg := fmt.Sprintf("cannot exchedule expiration check for  ID [%s] and userID [%s]", checkParams.MessageID, checkParams.UserID)
205
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
206
	}
207
208
	handleParams := services.HandleMessageParams{
209
		ID:        payload.MessageID,
210
		UserID:    payload.UserID,
211
		Source:    event.Source(),
212
		Timestamp: payload.NotificationSentAt,
213
	}
214
	if err := listener.service.HandleMessageNotificationSent(ctx, handleParams); err != nil {
215
		msg := fmt.Sprintf("cannot handle event [%s] for message [%s] and userID [%s]", event.Type(), checkParams.MessageID, checkParams.UserID)
216
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
217
	}
218
219
	return nil
220
}
221
222
// onMessageSendExpiredCheck handles the events.EventTypeMessageSendExpiredCheck event
223
func (listener *MessageListener) onMessageSendExpiredCheck(ctx context.Context, event cloudevents.Event) error {
224
	ctx, span := listener.tracer.Start(ctx)
225
	defer span.End()
226
227
	var payload events.MessageSendExpiredCheckPayload
228
	if err := event.DataAs(&payload); err != nil {
229
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
230
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
231
	}
232
233
	checkParams := services.MessageCheckExpired{
234
		MessageID: payload.MessageID,
235
		UserID:    payload.UserID,
236
		Source:    event.Source(),
237
	}
238
	if err := listener.service.CheckExpired(ctx, checkParams); err != nil {
239
		msg := fmt.Sprintf("cannot check expiration for message with ID [%s] and userID [%s]", checkParams.MessageID, checkParams.UserID)
240
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
241
	}
242
243
	return nil
244
}
245
246
// onMessageSendExpired handles the events.EventTypeMessageSendExpired event
247
func (listener *MessageListener) onMessageSendExpired(ctx context.Context, event cloudevents.Event) error {
248
	ctx, span := listener.tracer.Start(ctx)
249
	defer span.End()
250
251
	var payload events.MessageSendExpiredPayload
252
	if err := event.DataAs(&payload); err != nil {
253
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
254
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
255
	}
256
257
	expiredParams := services.HandleMessageParams{
258
		ID:        payload.MessageID,
259
		UserID:    payload.UserID,
260
		Source:    event.Source(),
261
		Timestamp: payload.Timestamp,
262
	}
263
	if err := listener.service.HandleMessageExpired(ctx, expiredParams); err != nil {
264
		msg := fmt.Sprintf("cannot handle event [%s] for ID [%s] and userID [%s]", event.Type(), expiredParams.ID, expiredParams.UserID)
265
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
266
	}
267
268
	return nil
269
}
270
271
// onMessageNotificationScheduled handles the events.EventTypeMessageSendExpired event
272
func (listener *MessageListener) onMessageNotificationScheduled(ctx context.Context, event cloudevents.Event) error {
273
	ctx, span := listener.tracer.Start(ctx)
274
	defer span.End()
275
276
	var payload events.MessageNotificationScheduledPayload
277
	if err := event.DataAs(&payload); err != nil {
278
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
279
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
280
	}
281
282
	expiredParams := services.HandleMessageParams{
283
		ID:        payload.MessageID,
284
		UserID:    payload.UserID,
285
		Source:    event.Source(),
286
		Timestamp: payload.ScheduledAt,
287
	}
288
	if err := listener.service.HandleMessageNotificationScheduled(ctx, expiredParams); err != nil {
289
		msg := fmt.Sprintf("cannot handle event [%s] for ID [%s] and userID [%s]", event.Type(), expiredParams.ID, expiredParams.UserID)
290
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
291
	}
292
293
	return nil
294
}
295
296
// onMessageThreadAPIDeleted handles the events.MessageThreadAPIDeleted event
297
func (listener *MessageListener) onMessageThreadAPIDeleted(ctx context.Context, event cloudevents.Event) error {
298
	ctx, span := listener.tracer.Start(ctx)
299
	defer span.End()
300
301
	var payload events.MessageThreadAPIDeletedPayload
302
	if err := event.DataAs(&payload); err != nil {
303
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
304
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
305
	}
306
307
	if err := listener.service.DeleteByOwnerAndContact(ctx, payload.UserID, payload.Owner, payload.Contact); err != nil {
308
		msg := fmt.Sprintf("cannot handle [%s] event with ID [%s] and userID [%s]", event.Type(), event.ID(), payload.UserID)
309
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
310
	}
311
312
	return nil
313
}
314
315
// onMessageThreadAPIDeleted handles the events.MessageThreadAPIDeleted event
316
func (listener *MessageListener) onMessageCallMissed(ctx context.Context, event cloudevents.Event) error {
317
	ctx, span := listener.tracer.Start(ctx)
318
	defer span.End()
319
320
	payload := new(events.MessageCallMissedPayload)
321
	if err := event.DataAs(payload); err != nil {
322
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
323
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
324
	}
325
326
	if err := listener.service.RespondToMissedCall(ctx, event.Source(), payload); err != nil {
327
		msg := fmt.Sprintf("cannot handle [%s] event with ID [%s] and userID [%s]", event.Type(), event.ID(), payload.UserID)
328
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
329
	}
330
331
	return nil
332
}
333
334
// onUserAccountDeleted handles the events.UserAccountDeleted event
//
// It decodes the event data into an events.UserAccountDeletedPayload and
// calls MessageService.DeleteAllForUser with the payload's user ID. Errors
// are passed through tracer.WrapErrorSpan before being returned.
func (listener *MessageListener) onUserAccountDeleted(ctx context.Context, event cloudevents.Event) error {
	ctx, span := listener.tracer.Start(ctx)
	defer span.End()

	var payload events.UserAccountDeletedPayload
	if err := event.DataAs(&payload); err != nil {
		// stacktrace.Propagate formats its message itself; passing a pre-rendered
		// string as the format would mangle any '%' found in the event data.
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, "cannot decode [%s] into [%T]", event.Data(), payload))
	}

	if err := listener.service.DeleteAllForUser(ctx, payload.UserID); err != nil {
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, "cannot delete [entities.Message] for user [%s] on [%s] event with ID [%s]", payload.UserID, event.Type(), event.ID()))
	}

	return nil
}
351