Passed
Push — main ( 570b65...2ff189 )
by Acho
02:48
created

ntDeleted   A

Complexity

Conditions 3

Size

Total Lines 16
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 11
dl 0
loc 16
rs 9.85
c 0
b 0
f 0
nop 2
1
package listeners
2
3
import (
4
	"context"
5
	"fmt"
6
7
	"github.com/NdoleStudio/httpsms/pkg/entities"
8
9
	"github.com/NdoleStudio/httpsms/pkg/events"
10
	"github.com/NdoleStudio/httpsms/pkg/services"
11
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
12
	cloudevents "github.com/cloudevents/sdk-go/v2"
13
	"github.com/palantir/stacktrace"
14
)
15
16
// MessageThreadListener handles cloud events which need to update entities.MessageThread
17
type MessageThreadListener struct {
18
	logger  telemetry.Logger
19
	tracer  telemetry.Tracer
20
	service *services.MessageThreadService
21
}
22
23
// NewMessageThreadListener creates a new instance of MessageThreadListener
24
func NewMessageThreadListener(
25
	logger telemetry.Logger,
26
	tracer telemetry.Tracer,
27
	service *services.MessageThreadService,
28
) (l *MessageThreadListener, routes map[string]events.EventListener) {
29
	l = &MessageThreadListener{
30
		logger:  logger.WithService(fmt.Sprintf("%T", l)),
31
		tracer:  tracer,
32
		service: service,
33
	}
34
35
	return l, map[string]events.EventListener{
36
		events.EventTypeMessageAPISent:               l.OnMessageAPISent,
37
		events.MessageAPIDeleted:                     l.onMessageDeleted,
38
		events.EventTypeMessagePhoneSending:          l.OnMessagePhoneSending,
39
		events.EventTypeMessagePhoneSent:             l.OnMessagePhoneSent,
40
		events.EventTypeMessagePhoneDelivered:        l.OnMessagePhoneDelivered,
41
		events.EventTypeMessageSendFailed:            l.OnMessagePhoneFailed,
42
		events.EventTypeMessagePhoneReceived:         l.OnMessagePhoneReceived,
43
		events.EventTypeMessageNotificationScheduled: l.onMessageNotificationScheduled,
44
		events.EventTypeMessageSendExpired:           l.onMessageExpired,
45
		events.UserAccountDeleted:                    l.onUserAccountDeleted,
46
	}
47
}
48
49
// OnMessageAPISent handles the events.EventTypeMessageAPISent event
50
func (listener *MessageThreadListener) OnMessageAPISent(ctx context.Context, event cloudevents.Event) error {
51
	ctx, span := listener.tracer.Start(ctx)
52
	defer span.End()
53
54
	var payload events.MessageAPISentPayload
55
	if err := event.DataAs(&payload); err != nil {
56
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
57
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
58
	}
59
60
	updateParams := services.MessageThreadUpdateParams{
61
		Owner:     payload.Owner,
62
		Contact:   payload.Contact,
63
		UserID:    payload.UserID,
64
		Status:    entities.MessageStatusPending,
65
		Timestamp: payload.RequestReceivedAt,
66
		Content:   payload.Content,
67
		MessageID: payload.MessageID,
68
	}
69
70
	if err := listener.service.UpdateThread(ctx, updateParams); err != nil {
71
		msg := fmt.Sprintf("cannot update thread for message with ID [%s] for event with ID [%s]", updateParams.MessageID, event.ID())
72
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
73
	}
74
75
	return nil
76
}
77
78
// onMessageDeleted handles the events.MessageAPIDeleted event
79
func (listener *MessageThreadListener) onMessageDeleted(ctx context.Context, event cloudevents.Event) error {
80
	ctx, span := listener.tracer.Start(ctx)
81
	defer span.End()
82
83
	payload := new(events.MessageAPIDeletedPayload)
84
	if err := event.DataAs(payload); err != nil {
85
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
86
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
87
	}
88
89
	if err := listener.service.UpdateAfterDeletedMessage(ctx, payload); err != nil {
90
		msg := fmt.Sprintf("cannot update thread for message with ID [%s] for event with ID [%s]", payload.MessageID, event.ID())
91
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
92
	}
93
94
	return nil
95
}
96
97
// OnMessagePhoneSending handles the events.EventTypeMessagePhoneSending event
98
func (listener *MessageThreadListener) OnMessagePhoneSending(ctx context.Context, event cloudevents.Event) error {
99
	ctx, span := listener.tracer.Start(ctx)
100
	defer span.End()
101
102
	var payload events.MessagePhoneSendingPayload
103
	if err := event.DataAs(&payload); err != nil {
104
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
105
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
106
	}
107
108
	updateParams := services.MessageThreadUpdateParams{
109
		Owner:     payload.Owner,
110
		UserID:    payload.UserID,
111
		Contact:   payload.Contact,
112
		Status:    entities.MessageStatusSending,
113
		Timestamp: payload.Timestamp,
114
		Content:   payload.Content,
115
		MessageID: payload.ID,
116
	}
117
118
	if err := listener.service.UpdateThread(ctx, updateParams); err != nil {
119
		msg := fmt.Sprintf("cannot update thread for message with ID [%s] for event with ID [%s]", updateParams.MessageID, event.ID())
120
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
121
	}
122
123
	return nil
124
}
125
126
// OnMessagePhoneSent handles the events.EventTypeMessagePhoneSent event
127
func (listener *MessageThreadListener) OnMessagePhoneSent(ctx context.Context, event cloudevents.Event) error {
128
	ctx, span := listener.tracer.Start(ctx)
129
	defer span.End()
130
131
	var payload events.MessagePhoneSentPayload
132
	if err := event.DataAs(&payload); err != nil {
133
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
134
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
135
	}
136
137
	updateParams := services.MessageThreadUpdateParams{
138
		Owner:     payload.Owner,
139
		Contact:   payload.Contact,
140
		UserID:    payload.UserID,
141
		Status:    entities.MessageStatusSent,
142
		Timestamp: payload.Timestamp,
143
		Content:   payload.Content,
144
		MessageID: payload.ID,
145
	}
146
147
	if err := listener.service.UpdateThread(ctx, updateParams); err != nil {
148
		msg := fmt.Sprintf("cannot update thread for message with ID [%s] for event with ID [%s]", updateParams.MessageID, event.ID())
149
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
150
	}
151
152
	return nil
153
}
154
155
// OnMessagePhoneDelivered handles the events.EventTypeMessagePhoneDelivered event
156
func (listener *MessageThreadListener) OnMessagePhoneDelivered(ctx context.Context, event cloudevents.Event) error {
157
	ctx, span := listener.tracer.Start(ctx)
158
	defer span.End()
159
160
	var payload events.MessagePhoneDeliveredPayload
161
	if err := event.DataAs(&payload); err != nil {
162
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
163
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
164
	}
165
166
	updateParams := services.MessageThreadUpdateParams{
167
		Owner:     payload.Owner,
168
		UserID:    payload.UserID,
169
		Contact:   payload.Contact,
170
		Status:    entities.MessageStatusDelivered,
171
		Timestamp: payload.Timestamp,
172
		Content:   payload.Content,
173
		MessageID: payload.ID,
174
	}
175
176
	if err := listener.service.UpdateThread(ctx, updateParams); err != nil {
177
		msg := fmt.Sprintf("cannot update thread for message with ID [%s] for event with ID [%s]", updateParams.MessageID, event.ID())
178
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
179
	}
180
181
	return nil
182
}
183
184
// OnMessagePhoneFailed handles the events.EventTypeMessageSendFailed event
185
func (listener *MessageThreadListener) OnMessagePhoneFailed(ctx context.Context, event cloudevents.Event) error {
186
	ctx, span := listener.tracer.Start(ctx)
187
	defer span.End()
188
189
	var payload events.MessageSendFailedPayload
190
	if err := event.DataAs(&payload); err != nil {
191
		msg := fmt.Sprintf("cannot decode [%s] into [%T] for event [%s]", event.Data(), payload, event.ID())
192
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
193
	}
194
195
	updateParams := services.MessageThreadUpdateParams{
196
		Owner:     payload.Owner,
197
		Contact:   payload.Contact,
198
		UserID:    payload.UserID,
199
		Status:    entities.MessageStatusFailed,
200
		Timestamp: payload.Timestamp,
201
		Content:   payload.Content,
202
		MessageID: payload.ID,
203
	}
204
205
	if err := listener.service.UpdateThread(ctx, updateParams); err != nil {
206
		msg := fmt.Sprintf("cannot update thread for message with ID [%s] for event with ID [%s]", updateParams.MessageID, event.ID())
207
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
208
	}
209
210
	return nil
211
}
212
213
// OnMessagePhoneReceived handles the events.EventTypeMessagePhoneReceived event
214
func (listener *MessageThreadListener) OnMessagePhoneReceived(ctx context.Context, event cloudevents.Event) error {
215
	ctx, span := listener.tracer.Start(ctx)
216
	defer span.End()
217
218
	var payload events.MessagePhoneReceivedPayload
219
	if err := event.DataAs(&payload); err != nil {
220
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
221
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
222
	}
223
224
	updateParams := services.MessageThreadUpdateParams{
225
		Owner:     payload.Owner,
226
		Contact:   payload.Contact,
227
		Timestamp: payload.Timestamp,
228
		UserID:    payload.UserID,
229
		Status:    entities.MessageStatusReceived,
230
		Content:   payload.Content,
231
		MessageID: payload.MessageID,
232
	}
233
234
	if err := listener.service.UpdateThread(ctx, updateParams); err != nil {
235
		msg := fmt.Sprintf("cannot update thread for message with ID [%s] for event with ID [%s]", updateParams.MessageID, event.ID())
236
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
237
	}
238
239
	return nil
240
}
241
242
// onMessageNotificationScheduled handles the events.EventTypeMessageNotificationScheduled event
243
func (listener *MessageThreadListener) onMessageNotificationScheduled(ctx context.Context, event cloudevents.Event) error {
244
	ctx, span := listener.tracer.Start(ctx)
245
	defer span.End()
246
247
	var payload events.MessageNotificationScheduledPayload
248
	if err := event.DataAs(&payload); err != nil {
249
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
250
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
251
	}
252
253
	updateParams := services.MessageThreadUpdateParams{
254
		Owner:     payload.Owner,
255
		Contact:   payload.Contact,
256
		Timestamp: payload.ScheduledAt,
257
		UserID:    payload.UserID,
258
		Content:   payload.Content,
259
		Status:    entities.MessageStatusScheduled,
260
		MessageID: payload.MessageID,
261
	}
262
263
	if err := listener.service.UpdateThread(ctx, updateParams); err != nil {
264
		msg := fmt.Sprintf("cannot update thread for message with ID [%s] for event with ID [%s]", updateParams.MessageID, event.ID())
265
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
266
	}
267
268
	return nil
269
}
270
271
// onMessageNotificationScheduled handles the events.EventTypeMessageNotificationScheduled event
272
func (listener *MessageThreadListener) onMessageExpired(ctx context.Context, event cloudevents.Event) error {
273
	ctx, span := listener.tracer.Start(ctx)
274
	defer span.End()
275
276
	var payload events.MessageSendExpiredPayload
277
	if err := event.DataAs(&payload); err != nil {
278
		msg := fmt.Sprintf("cannot decode [%s] into [%T]", event.Data(), payload)
279
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
280
	}
281
282
	updateParams := services.MessageThreadUpdateParams{
283
		Owner:     payload.Owner,
284
		Contact:   payload.Contact,
285
		Timestamp: payload.Timestamp,
286
		UserID:    payload.UserID,
287
		Content:   payload.Content,
288
		Status:    entities.MessageStatusExpired,
289
		MessageID: payload.MessageID,
290
	}
291
292
	if err := listener.service.UpdateThread(ctx, updateParams); err != nil {
293
		msg := fmt.Sprintf("cannot update thread for message with ID [%s] for event with ID [%s]", updateParams.MessageID, event.ID())
294
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
295
	}
296
297
	return nil
298
}
299
300
// onUserAccountDeleted handles the events.UserAccountDeleted event.
//
// It decodes the event data into an events.UserAccountDeletedPayload and asks
// the message thread service to delete everything it holds for the payload's
// UserID via DeleteAllForUser. Any decode or delete error is wrapped with
// stacktrace.Propagate and passed through listener.tracer.WrapErrorSpan before
// being returned; nil is returned on success.
func (listener *MessageThreadListener) onUserAccountDeleted(ctx context.Context, event cloudevents.Event) error {
	ctx, span := listener.tracer.Start(ctx)
	defer span.End()

	var payload events.UserAccountDeletedPayload
	if err := event.DataAs(&payload); err != nil {
		// Hand the format and its arguments straight to stacktrace.Propagate:
		// the raw event data may contain '%' and must not be re-interpreted
		// as a format string.
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, "cannot decode [%s] into [%T]", event.Data(), payload))
	}

	if err := listener.service.DeleteAllForUser(ctx, payload.UserID); err != nil {
		return listener.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, "cannot delete [entities.MessageThread] for user [%s] on [%s] event with ID [%s]", payload.UserID, event.Type(), event.ID()))
	}

	return nil
}
317
318
// updateThread forwards params to the message thread service's UpdateThread
// and returns whatever error that call produces, unchanged.
func (listener *MessageThreadListener) updateThread(ctx context.Context, params services.MessageThreadUpdateParams) error {
	err := listener.service.UpdateThread(ctx, params)
	return err
}
321