Passed
Pull Request — main (#9)
by Acho
01:36
created

ageSendFailedEvent   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
package services
2
3
import (
4
	"context"
5
	"fmt"
6
	"time"
7
8
	"github.com/davecgh/go-spew/spew"
9
10
	"github.com/nyaruka/phonenumbers"
11
12
	"github.com/NdoleStudio/httpsms/pkg/events"
13
	"github.com/NdoleStudio/httpsms/pkg/repositories"
14
	cloudevents "github.com/cloudevents/sdk-go/v2"
15
	"github.com/google/uuid"
16
	"github.com/palantir/stacktrace"
17
18
	"github.com/NdoleStudio/httpsms/pkg/entities"
19
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
20
)
21
22
// MessageService is handles message requests
23
type MessageService struct {
24
	logger          telemetry.Logger
25
	tracer          telemetry.Tracer
26
	eventDispatcher *EventDispatcher
27
	repository      repositories.MessageRepository
28
}
29
30
// NewMessageService creates a new MessageService
31
func NewMessageService(
32
	logger telemetry.Logger,
33
	tracer telemetry.Tracer,
34
	repository repositories.MessageRepository,
35
	eventDispatcher *EventDispatcher,
36
) (s *MessageService) {
37
	return &MessageService{
38
		logger:          logger.WithService(fmt.Sprintf("%T", s)),
39
		tracer:          tracer,
40
		repository:      repository,
41
		eventDispatcher: eventDispatcher,
42
	}
43
}
44
45
// MessageGetOutstandingParams parameters for sending a new message
46
type MessageGetOutstandingParams struct {
47
	Source    string
48
	UserID    entities.UserID
49
	Timestamp time.Time
50
	MessageID uuid.UUID
51
}
52
53
// GetOutstanding fetches messages that still to be sent to the phone
54
func (service *MessageService) GetOutstanding(ctx context.Context, params MessageGetOutstandingParams) (*entities.Message, error) {
55
	ctx, span := service.tracer.Start(ctx)
56
	defer span.End()
57
58
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
59
60
	message, err := service.repository.GetOutstanding(ctx, params.UserID, params.MessageID)
61
	if err != nil {
62
		msg := fmt.Sprintf("could not fetch outstanding messages with params [%s]", spew.Sdump(params))
63
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
64
	}
65
66
	event, err := service.createMessagePhoneSendingEvent(params.Source, events.MessagePhoneSendingPayload{
67
		ID:        message.ID,
68
		Owner:     message.Owner,
69
		Contact:   message.Contact,
70
		Timestamp: params.Timestamp,
71
		UserID:    message.UserID,
72
		Content:   message.Content,
73
	})
74
	if err != nil {
75
		msg := fmt.Sprintf("cannot create [%T] for message with ID [%s]", event, message.ID)
76
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
77
	}
78
79
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
80
81
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
82
		msg := fmt.Sprintf("cannot dispatch event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID)
83
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
84
	}
85
86
	ctxLogger.Info(fmt.Sprintf("dispatched event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
87
	return message, nil
88
}
89
90
// MessageGetParams parameters for sending a new message
91
type MessageGetParams struct {
92
	repositories.IndexParams
93
	UserID  entities.UserID
94
	Owner   string
95
	Contact string
96
}
97
98
// GetMessages fetches sent between 2 phone numbers
99
func (service *MessageService) GetMessages(ctx context.Context, params MessageGetParams) (*[]entities.Message, error) {
100
	ctx, span := service.tracer.Start(ctx)
101
	defer span.End()
102
103
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
104
105
	messages, err := service.repository.Index(ctx, params.UserID, params.Owner, params.Contact, params.IndexParams)
106
	if err != nil {
107
		msg := fmt.Sprintf("could not fetch messages with parms [%+#v]", params)
108
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
109
	}
110
111
	ctxLogger.Info(fmt.Sprintf("fetched [%d] messages with prams [%+#v]", len(*messages), params))
112
	return messages, nil
113
}
114
115
// GetMessage fetches a message by the MessageID
116
func (service *MessageService) GetMessage(ctx context.Context, userID entities.UserID, messageID uuid.UUID) (*entities.Message, error) {
117
	ctx, span := service.tracer.Start(ctx)
118
	defer span.End()
119
120
	message, err := service.repository.Load(ctx, userID, messageID)
121
	if err != nil {
122
		msg := fmt.Sprintf("could not fetch messages with MessageID [%s]", messageID)
123
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
124
	}
125
126
	return message, nil
127
}
128
129
// MessageStoreEventParams parameters registering a message event
130
type MessageStoreEventParams struct {
131
	MessageID    uuid.UUID
132
	EventName    entities.MessageEventName
133
	Timestamp    time.Time
134
	ErrorMessage *string
135
	Source       string
136
}
137
138
// StoreEvent handles event generated by a mobile phone
139
func (service *MessageService) StoreEvent(ctx context.Context, message *entities.Message, params MessageStoreEventParams) (*entities.Message, error) {
140
	ctx, span := service.tracer.Start(ctx)
141
	defer span.End()
142
143
	var err error
144
145
	switch params.EventName {
146
	case entities.MessageEventNameSent:
147
		err = service.handleMessageSentEvent(ctx, params, message)
148
	case entities.MessageEventNameDelivered:
149
		err = service.handleMessageDeliveredEvent(ctx, params, message)
150
	case entities.MessageEventNameFailed:
151
		err = service.handleMessageFailedEvent(ctx, params, message)
152
	default:
153
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.NewError(fmt.Sprintf("cannot handle message event [%s]", params.EventName)))
154
	}
155
156
	if err != nil {
157
		msg := fmt.Sprintf("could not handle phone event [%s] for message with id [%s]", params.EventName, message.ID)
158
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
159
	}
160
161
	return service.repository.Load(ctx, message.UserID, params.MessageID)
162
}
163
164
// MessageReceiveParams parameters registering a message event
165
type MessageReceiveParams struct {
166
	Contact   string
167
	UserID    entities.UserID
168
	Owner     phonenumbers.PhoneNumber
169
	Content   string
170
	Timestamp time.Time
171
	Source    string
172
}
173
174
// ReceiveMessage handles message received by a mobile phone
175
func (service *MessageService) ReceiveMessage(ctx context.Context, params MessageReceiveParams) (*entities.Message, error) {
176
	ctx, span := service.tracer.Start(ctx)
177
	defer span.End()
178
179
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
180
181
	eventPayload := events.MessagePhoneReceivedPayload{
182
		ID:        uuid.New(),
183
		UserID:    params.UserID,
184
		Owner:     phonenumbers.Format(&params.Owner, phonenumbers.E164),
185
		Contact:   params.Contact,
186
		Timestamp: params.Timestamp,
187
		Content:   params.Content,
188
	}
189
190
	ctxLogger.Info(fmt.Sprintf("creating cloud event for received with MessageID [%s]", eventPayload.ID))
191
192
	event, err := service.createMessagePhoneReceivedEvent(params.Source, eventPayload)
193
	if err != nil {
194
		msg := fmt.Sprintf("cannot create %T from payload with message id [%s]", event, eventPayload.ID)
195
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
196
	}
197
198
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] and message id [%s]", event.Type(), event.ID(), eventPayload.ID))
199
200
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
201
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
202
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
203
	}
204
205
	ctxLogger.Info(fmt.Sprintf("event [%s] dispatched succesfully", event.ID()))
206
207
	message, err := service.repository.Load(ctx, params.UserID, eventPayload.ID)
208
	if err != nil {
209
		msg := fmt.Sprintf("cannot load message with ID [%s]", eventPayload.ID)
210
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
211
	}
212
213
	ctxLogger.Info(fmt.Sprintf("fetched message with id [%s]", message.ID))
214
215
	return message, nil
216
}
217
218
func (service *MessageService) handleMessageSentEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
219
	ctx, span := service.tracer.Start(ctx)
220
	defer span.End()
221
222
	event, err := service.createMessagePhoneSentEvent(params.Source, events.MessagePhoneSentPayload{
223
		ID:        message.ID,
224
		Owner:     message.Owner,
225
		UserID:    message.UserID,
226
		Timestamp: params.Timestamp,
227
		Contact:   message.Contact,
228
		Content:   message.Content,
229
	})
230
	if err != nil {
231
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessagePhoneSent, message.ID)
232
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
233
	}
234
235
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
236
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
237
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
238
	}
239
	return nil
240
}
241
242
func (service *MessageService) handleMessageDeliveredEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
243
	ctx, span := service.tracer.Start(ctx)
244
	defer span.End()
245
246
	event, err := service.createMessagePhoneDeliveredEvent(params.Source, events.MessagePhoneDeliveredPayload{
247
		ID:        message.ID,
248
		Owner:     message.Owner,
249
		UserID:    message.UserID,
250
		Timestamp: params.Timestamp,
251
		Contact:   message.Contact,
252
		Content:   message.Content,
253
	})
254
	if err != nil {
255
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessagePhoneSent, message.ID)
256
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
257
	}
258
259
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
260
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
261
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
262
	}
263
	return nil
264
}
265
266
func (service *MessageService) handleMessageFailedEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
267
	ctx, span := service.tracer.Start(ctx)
268
	defer span.End()
269
270
	errorMessage := "UNKNOWN ERROR"
271
	if params.ErrorMessage != nil {
272
		errorMessage = *params.ErrorMessage
273
	}
274
275
	event, err := service.createMessageSendFailedEvent(params.Source, events.MessageSendFailedPayload{
276
		ID:           message.ID,
277
		Owner:        message.Owner,
278
		ErrorMessage: errorMessage,
279
		Timestamp:    params.Timestamp,
280
		Contact:      message.Contact,
281
		UserID:       message.UserID,
282
		Content:      message.Content,
283
	})
284
	if err != nil {
285
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessageSendFailed, message.ID)
286
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
287
	}
288
289
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
290
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
291
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
292
	}
293
	return nil
294
}
295
296
// MessageSendParams parameters for sending a new message
297
type MessageSendParams struct {
298
	Owner             phonenumbers.PhoneNumber
299
	Contact           phonenumbers.PhoneNumber
300
	Content           string
301
	Source            string
302
	UserID            entities.UserID
303
	RequestReceivedAt time.Time
304
}
305
306
// SendMessage a new message
307
func (service *MessageService) SendMessage(ctx context.Context, params MessageSendParams) (*entities.Message, error) {
308
	ctx, span := service.tracer.Start(ctx)
309
	defer span.End()
310
311
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
312
313
	eventPayload := events.MessageAPISentPayload{
314
		ID:                uuid.New(),
315
		UserID:            params.UserID,
316
		Owner:             phonenumbers.Format(&params.Owner, phonenumbers.E164),
317
		Contact:           phonenumbers.Format(&params.Contact, phonenumbers.E164),
318
		RequestReceivedAt: params.RequestReceivedAt,
319
		Content:           params.Content,
320
	}
321
322
	ctxLogger.Info(fmt.Sprintf("creating cloud event for message with ID [%s]", eventPayload.ID))
323
324
	event, err := service.createMessageAPISentEvent(params.Source, eventPayload)
325
	if err != nil {
326
		msg := fmt.Sprintf("cannot create %T from payload with message id [%s]", event, eventPayload.ID)
327
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
328
	}
329
330
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] and message id [%s]", event.Type(), event.ID(), eventPayload.ID))
331
332
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
333
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
334
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
335
	}
336
337
	ctxLogger.Info(fmt.Sprintf("event [%s] dispatched succesfully", event.ID()))
338
339
	message, err := service.repository.Load(ctx, eventPayload.UserID, eventPayload.ID)
340
	if err != nil {
341
		msg := fmt.Sprintf("cannot load message with ID [%s] in the userRepository", eventPayload.ID)
342
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
343
	}
344
345
	ctxLogger.Info(fmt.Sprintf("fetched message with id [%s] from the userRepository", message.ID))
346
347
	return message, nil
348
}
349
350
// MessageStoreParams are parameters for creating a new message
351
type MessageStoreParams struct {
352
	Owner     string
353
	Contact   string
354
	Content   string
355
	UserID    entities.UserID
356
	ID        uuid.UUID
357
	Timestamp time.Time
358
}
359
360
// StoreSentMessage a new message
361
func (service *MessageService) StoreSentMessage(ctx context.Context, params MessageStoreParams) (*entities.Message, error) {
362
	ctx, span := service.tracer.Start(ctx)
363
	defer span.End()
364
365
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
366
367
	message := &entities.Message{
368
		ID:                params.ID,
369
		Owner:             params.Owner,
370
		Contact:           params.Contact,
371
		UserID:            params.UserID,
372
		Content:           params.Content,
373
		Type:              entities.MessageTypeMobileTerminated,
374
		Status:            entities.MessageStatusPending,
375
		RequestReceivedAt: params.Timestamp,
376
		CreatedAt:         time.Now().UTC(),
377
		UpdatedAt:         time.Now().UTC(),
378
		OrderTimestamp:    params.Timestamp,
379
		SendDuration:      nil,
380
		LastAttemptedAt:   nil,
381
		SentAt:            nil,
382
		ReceivedAt:        nil,
383
	}
384
385
	if err := service.repository.Store(ctx, message); err != nil {
386
		msg := fmt.Sprintf("cannot save message with id [%s]", params.ID)
387
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
388
	}
389
390
	ctxLogger.Info(fmt.Sprintf("message saved with id [%s] in the userRepository", message.ID))
391
	return message, nil
392
}
393
394
// StoreReceivedMessage a new message
395
func (service *MessageService) StoreReceivedMessage(ctx context.Context, params MessageStoreParams) (*entities.Message, error) {
396
	ctx, span := service.tracer.Start(ctx)
397
	defer span.End()
398
399
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
400
401
	message := &entities.Message{
402
		ID:                params.ID,
403
		Owner:             params.Owner,
404
		UserID:            params.UserID,
405
		Contact:           params.Contact,
406
		Content:           params.Content,
407
		Type:              entities.MessageTypeMobileOriginated,
408
		Status:            entities.MessageStatusReceived,
409
		RequestReceivedAt: params.Timestamp,
410
		CreatedAt:         time.Now().UTC(),
411
		UpdatedAt:         time.Now().UTC(),
412
		OrderTimestamp:    params.Timestamp,
413
		ReceivedAt:        &params.Timestamp,
414
	}
415
416
	if err := service.repository.Store(ctx, message); err != nil {
417
		msg := fmt.Sprintf("cannot save message with id [%s]", params.ID)
418
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
419
	}
420
421
	ctxLogger.Info(fmt.Sprintf("message saved with id [%s] in the userRepository", message.ID))
422
	return message, nil
423
}
424
425
// HandleMessageParams are parameters for handling a message event
426
type HandleMessageParams struct {
427
	ID        uuid.UUID
428
	UserID    entities.UserID
429
	Timestamp time.Time
430
}
431
432
// HandleMessageSending handles when a message is being sent
433
func (service *MessageService) HandleMessageSending(ctx context.Context, params HandleMessageParams) error {
434
	ctx, span := service.tracer.Start(ctx)
435
	defer span.End()
436
437
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
438
439
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
440
	if err != nil {
441
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
442
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
443
	}
444
445
	if !message.IsSending() {
446
		msg := fmt.Sprintf("message has wrong status [%s]. expected %s", message.Status, entities.MessageStatusSending)
447
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
448
	}
449
450
	if err = service.repository.Update(ctx, message.AddSendAttempt(params.Timestamp)); err != nil {
451
		msg := fmt.Sprintf("cannot update message with id [%s] after sending", message.ID)
452
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
453
	}
454
455
	ctxLogger.Info(fmt.Sprintf("message with id [%s] in the userRepository after adding send attempt", message.ID))
456
	return nil
457
}
458
459
// HandleMessageSent handles when a message has been sent by a mobile phone
460
func (service *MessageService) HandleMessageSent(ctx context.Context, params HandleMessageParams) error {
461
	ctx, span := service.tracer.Start(ctx)
462
	defer span.End()
463
464
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
465
466
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
467
	if err != nil {
468
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
469
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
470
	}
471
472
	if !message.IsSending() && !message.IsExpired() {
473
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s]", message.Status, entities.MessageStatusSending, entities.MessageStatusExpired)
474
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
475
	}
476
477
	if err = service.repository.Update(ctx, message.Sent(params.Timestamp)); err != nil {
478
		msg := fmt.Sprintf("cannot update message with id [%s] as sent", message.ID)
479
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
480
	}
481
482
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
483
	return nil
484
}
485
486
// HandleMessageFailedParams are parameters for handling a failed message event
487
type HandleMessageFailedParams struct {
488
	ID           uuid.UUID
489
	UserID       entities.UserID
490
	ErrorMessage string
491
	Timestamp    time.Time
492
}
493
494
// HandleMessageFailed handles when a message could not be sent by a mobile phone
495
func (service *MessageService) HandleMessageFailed(ctx context.Context, params HandleMessageFailedParams) error {
496
	ctx, span := service.tracer.Start(ctx)
497
	defer span.End()
498
499
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
500
501
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
502
	if err != nil {
503
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
504
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
505
	}
506
507
	if message.IsDelivered() {
508
		msg := fmt.Sprintf("message has already been delivered with status [%s]", message.Status)
509
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
510
	}
511
512
	if err = service.repository.Update(ctx, message.Failed(params.Timestamp, params.ErrorMessage)); err != nil {
513
		msg := fmt.Sprintf("cannot update message with id [%s] as sent", message.ID)
514
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
515
	}
516
517
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
518
	return nil
519
}
520
521
// HandleMessageDelivered handles when a message is has been delivered by a mobile phone
522
func (service *MessageService) HandleMessageDelivered(ctx context.Context, params HandleMessageParams) error {
523
	ctx, span := service.tracer.Start(ctx)
524
	defer span.End()
525
526
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
527
528
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
529
	if err != nil {
530
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
531
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
532
	}
533
534
	if !message.IsSent() && !message.IsSending() && !message.IsExpired() {
535
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s, %s]", message.Status, entities.MessageStatusSent, entities.MessageStatusSending, entities.MessageStatusExpired)
536
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
537
	}
538
539
	if err = service.repository.Update(ctx, message.Delivered(params.Timestamp)); err != nil {
540
		msg := fmt.Sprintf("cannot update message with id [%s] as delivered", message.ID)
541
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
542
	}
543
544
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
545
	return nil
546
}
547
548
// HandleMessageExpired handles when a message is has been expired
549
func (service *MessageService) HandleMessageExpired(ctx context.Context, params HandleMessageParams) error {
550
	ctx, span := service.tracer.Start(ctx)
551
	defer span.End()
552
553
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
554
555
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
556
	if err != nil {
557
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
558
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
559
	}
560
561
	if !message.IsSending() && !message.IsPending() {
562
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s]", message.Status, entities.MessageStatusSending, entities.MessageStatusSending)
563
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
564
	}
565
566
	if err = service.repository.Update(ctx, message.Expired(params.Timestamp)); err != nil {
567
		msg := fmt.Sprintf("cannot update message with id [%s] as expired", message.ID)
568
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
569
	}
570
571
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
572
	return nil
573
}
574
575
// MessageScheduleExpirationParams are parameters for scheduling the expiration of a message event
576
type MessageScheduleExpirationParams struct {
577
	MessageID                uuid.UUID
578
	UserID                   entities.UserID
579
	NotificationSentAt       time.Time
580
	PhoneID                  uuid.UUID
581
	MessageExpirationTimeout time.Duration
582
	source                   string
583
}
584
585
// ScheduleExpirationCheck schedules an event to check if a message is expired
586
func (service *MessageService) ScheduleExpirationCheck(ctx context.Context, params MessageScheduleExpirationParams) error {
587
	ctx, span := service.tracer.Start(ctx)
588
	defer span.End()
589
590
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
591
592
	if params.MessageExpirationTimeout == 0 {
593
		ctxLogger.Info(fmt.Sprintf("message expiration duration not set for message [%s] using phone [%s]", params.MessageID, params.PhoneID))
594
		return nil
595
	}
596
597
	event, err := service.createMessageSendExpiredCheckEvent(params.source, events.MessageSendExpiredCheckPayload{
598
		MessageID:   params.MessageID,
599
		ScheduledAt: params.NotificationSentAt.Add(params.MessageExpirationTimeout),
600
		UserID:      params.UserID,
601
	})
602
	if err != nil {
603
		msg := fmt.Sprintf("cannot create event [%s] for message with id [%s]", events.EventTypeMessageSendExpiredCheck, params.MessageID)
604
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
605
	}
606
607
	if err = service.eventDispatcher.DispatchWithTimeout(ctx, event, params.MessageExpirationTimeout); err != nil {
608
		msg := fmt.Sprintf("cannot dispatch event [%s] for message with ID [%s]", event.Type(), params.MessageID)
609
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
610
	}
611
612
	ctxLogger.Info(fmt.Sprintf("scheduled message id [%s] to expire at [%s]", params.MessageID, params.NotificationSentAt.Add(params.MessageExpirationTimeout)))
613
	return nil
614
}
615
616
// MessageCheckExpired are parameters for checking if a message is expired
617
type MessageCheckExpired struct {
618
	MessageID uuid.UUID
619
	UserID    entities.UserID
620
	source    string
621
}
622
623
// CheckExpired checks if a message has expired
624
func (service *MessageService) CheckExpired(ctx context.Context, params MessageCheckExpired) error {
625
	ctx, span := service.tracer.Start(ctx)
626
	defer span.End()
627
628
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
629
630
	message, err := service.repository.Load(ctx, params.UserID, params.MessageID)
631
	if err != nil {
632
		msg := fmt.Sprintf("cannot load message with userID [%s] and messageID [%s]", params.UserID, params.MessageID)
633
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
634
	}
635
636
	if !message.IsPending() && !message.IsSending() {
637
		ctxLogger.Info(fmt.Sprintf("message with ID [%s] has status [%s] and is not expired", message.ID, message.Status))
638
		return nil
639
	}
640
641
	event, err := service.createMessageSendExpiredEvent(params.source, events.MessageSendExpiredPayload{
642
		MessageID: message.ID,
643
		Owner:     message.Owner,
644
		Contact:   message.Contact,
645
		UserID:    message.UserID,
646
		Timestamp: time.Now().UTC(),
647
		Content:   message.Content,
648
	})
649
	if err != nil {
650
		msg := fmt.Sprintf("cannot create event [%s] for message with id [%s]", events.EventTypeMessageSendExpired, params.MessageID)
651
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
652
	}
653
654
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
655
		msg := fmt.Sprintf("cannot dispatch event [%s] for message with ID [%s]", event.Type(), params.MessageID)
656
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
657
	}
658
659
	ctxLogger.Info(fmt.Sprintf("message [%s] has expired with status [%s]", params.MessageID, message.Status))
660
	return nil
661
}
662
663
func (service *MessageService) createMessageSendExpiredEvent(source string, payload events.MessageSendExpiredPayload) (cloudevents.Event, error) {
664
	return service.createEvent(events.EventTypeMessageSendExpired, source, payload)
665
}
666
667
func (service *MessageService) createMessageSendExpiredCheckEvent(source string, payload events.MessageSendExpiredCheckPayload) (cloudevents.Event, error) {
668
	return service.createEvent(events.EventTypeMessageSendExpiredCheck, source, payload)
669
}
670
671
func (service *MessageService) createMessageAPISentEvent(source string, payload events.MessageAPISentPayload) (cloudevents.Event, error) {
672
	return service.createEvent(events.EventTypeMessageAPISent, source, payload)
673
}
674
675
func (service *MessageService) createMessagePhoneReceivedEvent(source string, payload events.MessagePhoneReceivedPayload) (cloudevents.Event, error) {
676
	return service.createEvent(events.EventTypeMessagePhoneReceived, source, payload)
677
}
678
679
func (service *MessageService) createMessagePhoneSendingEvent(source string, payload events.MessagePhoneSendingPayload) (cloudevents.Event, error) {
680
	return service.createEvent(events.EventTypeMessagePhoneSending, source, payload)
681
}
682
683
func (service *MessageService) createMessagePhoneSentEvent(source string, payload events.MessagePhoneSentPayload) (cloudevents.Event, error) {
684
	return service.createEvent(events.EventTypeMessagePhoneSent, source, payload)
685
}
686
687
func (service *MessageService) createMessageSendFailedEvent(source string, payload events.MessageSendFailedPayload) (cloudevents.Event, error) {
688
	return service.createEvent(events.EventTypeMessageSendFailed, source, payload)
689
}
690
691
func (service *MessageService) createHeartbeatPhoneOutstandingEvent(source string, payload events.HeartbeatPhoneOutstandingPayload) (cloudevents.Event, error) {
692
	return service.createEvent(events.EventTypeHeartbeatPhoneOutstanding, source, payload)
693
}
694
695
func (service *MessageService) createMessagePhoneDeliveredEvent(source string, payload events.MessagePhoneDeliveredPayload) (cloudevents.Event, error) {
696
	return service.createEvent(events.EventTypeMessagePhoneDelivered, source, payload)
697
}
698
699
func (service *MessageService) createEvent(eventType string, source string, payload any) (cloudevents.Event, error) {
700
	event := cloudevents.NewEvent()
701
702
	event.SetSource(source)
703
	event.SetType(eventType)
704
	event.SetTime(time.Now().UTC())
705
	event.SetID(uuid.New().String())
706
707
	if err := event.SetData(cloudevents.ApplicationJSON, payload); err != nil {
708
		msg := fmt.Sprintf("cannot encode %T [%#+v] as JSON", payload, payload)
709
		return event, stacktrace.Propagate(err, msg)
710
	}
711
712
	return event, nil
713
}
714