Passed
Push — main ( 863ef7...45bc91 )
by Acho
01:14
created

services.*MessageService.HandleMessageSent   B

Complexity

Conditions 7

Size

Total Lines 29
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 19
dl 0
loc 29
rs 8
c 0
b 0
f 0
nop 2
1
package services
2
3
import (
4
	"context"
5
	"fmt"
6
	"time"
7
8
	"github.com/davecgh/go-spew/spew"
9
10
	"github.com/nyaruka/phonenumbers"
11
12
	"github.com/NdoleStudio/httpsms/pkg/events"
13
	"github.com/NdoleStudio/httpsms/pkg/repositories"
14
	cloudevents "github.com/cloudevents/sdk-go/v2"
15
	"github.com/google/uuid"
16
	"github.com/palantir/stacktrace"
17
18
	"github.com/NdoleStudio/httpsms/pkg/entities"
19
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
20
)
21
22
// MessageService is handles message requests
23
type MessageService struct {
24
	service
25
	logger          telemetry.Logger
26
	tracer          telemetry.Tracer
27
	eventDispatcher *EventDispatcher
28
	phoneService    *PhoneService
29
	repository      repositories.MessageRepository
30
}
31
32
// NewMessageService creates a new MessageService
33
func NewMessageService(
34
	logger telemetry.Logger,
35
	tracer telemetry.Tracer,
36
	repository repositories.MessageRepository,
37
	eventDispatcher *EventDispatcher,
38
	phoneService *PhoneService,
39
) (s *MessageService) {
40
	return &MessageService{
41
		logger:          logger.WithService(fmt.Sprintf("%T", s)),
42
		tracer:          tracer,
43
		repository:      repository,
44
		phoneService:    phoneService,
45
		eventDispatcher: eventDispatcher,
46
	}
47
}
48
49
// MessageGetOutstandingParams parameters for sending a new message
50
type MessageGetOutstandingParams struct {
51
	Source    string
52
	UserID    entities.UserID
53
	Timestamp time.Time
54
	MessageID uuid.UUID
55
}
56
57
// GetOutstanding fetches messages that still to be sent to the phone
58
func (service *MessageService) GetOutstanding(ctx context.Context, params MessageGetOutstandingParams) (*entities.Message, error) {
59
	ctx, span := service.tracer.Start(ctx)
60
	defer span.End()
61
62
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
63
64
	message, err := service.repository.GetOutstanding(ctx, params.UserID, params.MessageID)
65
	if err != nil {
66
		msg := fmt.Sprintf("could not fetch outstanding messages with params [%s]", spew.Sdump(params))
67
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
68
	}
69
70
	event, err := service.createMessagePhoneSendingEvent(params.Source, events.MessagePhoneSendingPayload{
71
		ID:        message.ID,
72
		Owner:     message.Owner,
73
		Contact:   message.Contact,
74
		Timestamp: params.Timestamp,
75
		Encrypted: message.Encrypted,
76
		UserID:    message.UserID,
77
		Content:   message.Content,
78
		SIM:       message.SIM,
79
	})
80
	if err != nil {
81
		msg := fmt.Sprintf("cannot create [%T] for message with ID [%s]", event, message.ID)
82
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
83
	}
84
85
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
86
87
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
88
		msg := fmt.Sprintf("cannot dispatch event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID)
89
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
90
	}
91
92
	ctxLogger.Info(fmt.Sprintf("dispatched event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
93
	return message, nil
94
}
95
96
// DeleteMessage deletes a message from the database
97
func (service *MessageService) DeleteMessage(ctx context.Context, source string, message *entities.Message) error {
98
	ctx, span := service.tracer.Start(ctx)
99
	defer span.End()
100
101
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
102
103
	if err := service.repository.Delete(ctx, message.UserID, message.ID); err != nil {
104
		msg := fmt.Sprintf("could not delete message with ID [%s] for user wit ID [%s]", message.ID, message.UserID)
105
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
106
	}
107
108
	event, err := service.createEvent(events.MessageAPIDeleted, source, &events.MessageAPIDeletedPayload{
109
		MessageID: message.ID,
110
		UserID:    message.UserID,
111
		Owner:     message.Owner,
112
		Encrypted: message.Encrypted,
113
		RequestID: message.RequestID,
114
		Contact:   message.Contact,
115
		Timestamp: time.Now().UTC(),
116
		Content:   message.Content,
117
		SIM:       message.SIM,
118
	})
119
	if err != nil {
120
		msg := fmt.Sprintf("cannot create [%T] for message with ID [%s]", event, message.ID)
121
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
122
	}
123
124
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
125
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
126
		msg := fmt.Sprintf("cannot dispatch event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID)
127
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
128
	}
129
130
	ctxLogger.Info(fmt.Sprintf("dispatched event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
131
	return nil
132
}
133
134
// DeleteByOwnerAndContact deletes all the messages between an owner and a contact
135
func (service *MessageService) DeleteByOwnerAndContact(ctx context.Context, userID entities.UserID, owner, contact string) error {
136
	ctx, span := service.tracer.Start(ctx)
137
	defer span.End()
138
139
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
140
141
	if err := service.repository.DeleteByOwnerAndContact(ctx, userID, owner, contact); err != nil {
142
		msg := fmt.Sprintf("could not all delete messages for user with ID [%s] between owner [%s] and contact [%s] ", userID, owner, contact)
143
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
144
	}
145
146
	ctxLogger.Info(fmt.Sprintf("deleted all messages for user with ID [%s] between owner [%s] and contact [%s] ", userID, owner, contact))
147
	return nil
148
}
149
150
// MessageGetParams parameters for sending a new message
151
type MessageGetParams struct {
152
	repositories.IndexParams
153
	UserID  entities.UserID
154
	Owner   string
155
	Contact string
156
}
157
158
// GetMessages fetches sent between 2 phone numbers
159
func (service *MessageService) GetMessages(ctx context.Context, params MessageGetParams) (*[]entities.Message, error) {
160
	ctx, span := service.tracer.Start(ctx)
161
	defer span.End()
162
163
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
164
165
	messages, err := service.repository.Index(ctx, params.UserID, params.Owner, params.Contact, params.IndexParams)
166
	if err != nil {
167
		msg := fmt.Sprintf("could not fetch messages with parms [%+#v]", params)
168
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
169
	}
170
171
	ctxLogger.Info(fmt.Sprintf("fetched [%d] messages with prams [%+#v]", len(*messages), params))
172
	return messages, nil
173
}
174
175
// GetMessage fetches a message by the ID
176
func (service *MessageService) GetMessage(ctx context.Context, userID entities.UserID, messageID uuid.UUID) (*entities.Message, error) {
177
	ctx, span := service.tracer.Start(ctx)
178
	defer span.End()
179
180
	message, err := service.repository.Load(ctx, userID, messageID)
181
	if err != nil {
182
		msg := fmt.Sprintf("could not fetch messages with ID [%s]", messageID)
183
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
184
	}
185
186
	return message, nil
187
}
188
189
// MessageStoreEventParams parameters registering a message event
190
type MessageStoreEventParams struct {
191
	MessageID    uuid.UUID
192
	EventName    entities.MessageEventName
193
	Timestamp    time.Time
194
	ErrorMessage *string
195
	Source       string
196
}
197
198
// StoreEvent handles event generated by a mobile phone
199
func (service *MessageService) StoreEvent(ctx context.Context, message *entities.Message, params MessageStoreEventParams) (*entities.Message, error) {
200
	ctx, span := service.tracer.Start(ctx)
201
	defer span.End()
202
203
	var err error
204
205
	switch params.EventName {
206
	case entities.MessageEventNameSent:
207
		err = service.handleMessageSentEvent(ctx, params, message)
208
	case entities.MessageEventNameDelivered:
209
		err = service.handleMessageDeliveredEvent(ctx, params, message)
210
	case entities.MessageEventNameFailed:
211
		err = service.handleMessageFailedEvent(ctx, params, message)
212
	default:
213
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.NewError(fmt.Sprintf("cannot handle message event [%s]", params.EventName)))
214
	}
215
216
	if err != nil {
217
		msg := fmt.Sprintf("could not handle phone event [%s] for message with id [%s]", params.EventName, message.ID)
218
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
219
	}
220
221
	return service.repository.Load(ctx, message.UserID, params.MessageID)
222
}
223
224
// MessageReceiveParams parameters registering a message event
225
type MessageReceiveParams struct {
226
	Contact   string
227
	UserID    entities.UserID
228
	Owner     phonenumbers.PhoneNumber
229
	Content   string
230
	SIM       entities.SIM
231
	Timestamp time.Time
232
	Encrypted bool
233
	Source    string
234
}
235
236
// ReceiveMessage handles message received by a mobile phone
237
func (service *MessageService) ReceiveMessage(ctx context.Context, params *MessageReceiveParams) (*entities.Message, error) {
238
	ctx, span := service.tracer.Start(ctx)
239
	defer span.End()
240
241
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
242
243
	eventPayload := events.MessagePhoneReceivedPayload{
244
		MessageID: uuid.New(),
245
		UserID:    params.UserID,
246
		Encrypted: params.Encrypted,
247
		Owner:     phonenumbers.Format(&params.Owner, phonenumbers.E164),
248
		Contact:   params.Contact,
249
		Timestamp: params.Timestamp,
250
		Content:   params.Content,
251
		SIM:       params.SIM,
252
	}
253
254
	ctxLogger.Info(fmt.Sprintf("creating cloud event for received with ID [%s]", eventPayload.MessageID))
255
256
	event, err := service.createMessagePhoneReceivedEvent(params.Source, eventPayload)
257
	if err != nil {
258
		msg := fmt.Sprintf("cannot create %T from payload with message id [%s]", event, eventPayload.MessageID)
259
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
260
	}
261
262
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] and message id [%s]", event.Type(), event.ID(), eventPayload.MessageID))
263
264
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
265
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
266
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
267
	}
268
	ctxLogger.Info(fmt.Sprintf("event [%s] dispatched succesfully", event.ID()))
269
270
	return service.storeReceivedMessage(ctx, eventPayload)
271
}
272
273
func (service *MessageService) handleMessageSentEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
274
	ctx, span := service.tracer.Start(ctx)
275
	defer span.End()
276
277
	event, err := service.createMessagePhoneSentEvent(params.Source, events.MessagePhoneSentPayload{
278
		ID:        message.ID,
279
		Owner:     message.Owner,
280
		UserID:    message.UserID,
281
		RequestID: message.RequestID,
282
		Timestamp: params.Timestamp,
283
		Contact:   message.Contact,
284
		Encrypted: message.Encrypted,
285
		Content:   message.Content,
286
		SIM:       message.SIM,
287
	})
288
	if err != nil {
289
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessagePhoneSent, message.ID)
290
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
291
	}
292
293
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
294
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
295
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
296
	}
297
	return nil
298
}
299
300
func (service *MessageService) handleMessageDeliveredEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
301
	ctx, span := service.tracer.Start(ctx)
302
	defer span.End()
303
304
	event, err := service.createMessagePhoneDeliveredEvent(params.Source, events.MessagePhoneDeliveredPayload{
305
		ID:        message.ID,
306
		Owner:     message.Owner,
307
		UserID:    message.UserID,
308
		RequestID: message.RequestID,
309
		Timestamp: params.Timestamp,
310
		Encrypted: message.Encrypted,
311
		Contact:   message.Contact,
312
		Content:   message.Content,
313
		SIM:       message.SIM,
314
	})
315
	if err != nil {
316
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessagePhoneSent, message.ID)
317
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
318
	}
319
320
	if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, time.Second); err != nil {
321
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
322
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
323
	}
324
	return nil
325
}
326
327
func (service *MessageService) handleMessageFailedEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
328
	ctx, span := service.tracer.Start(ctx)
329
	defer span.End()
330
331
	errorMessage := "UNKNOWN ERROR"
332
	if params.ErrorMessage != nil {
333
		errorMessage = *params.ErrorMessage
334
	}
335
336
	event, err := service.createMessageSendFailedEvent(params.Source, events.MessageSendFailedPayload{
337
		ID:           message.ID,
338
		Owner:        message.Owner,
339
		ErrorMessage: errorMessage,
340
		Timestamp:    params.Timestamp,
341
		Encrypted:    message.Encrypted,
342
		Contact:      message.Contact,
343
		RequestID:    message.RequestID,
344
		UserID:       message.UserID,
345
		Content:      message.Content,
346
		SIM:          message.SIM,
347
	})
348
	if err != nil {
349
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessageSendFailed, message.ID)
350
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
351
	}
352
353
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
354
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
355
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
356
	}
357
	return nil
358
}
359
360
// MessageSendParams parameters for sending a new message
361
type MessageSendParams struct {
362
	Owner             *phonenumbers.PhoneNumber
363
	Contact           string
364
	Encrypted         bool
365
	Content           string
366
	Source            string
367
	SendAt            *time.Time
368
	RequestID         *string
369
	UserID            entities.UserID
370
	RequestReceivedAt time.Time
371
}
372
373
// SendMessage a new message
374
func (service *MessageService) SendMessage(ctx context.Context, params MessageSendParams) (*entities.Message, error) {
375
	ctx, span := service.tracer.Start(ctx)
376
	defer span.End()
377
378
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
379
380
	sendAttempts, sim := service.phoneSettings(ctx, params.UserID, phonenumbers.Format(params.Owner, phonenumbers.E164))
381
382
	eventPayload := events.MessageAPISentPayload{
383
		MessageID:         uuid.New(),
384
		UserID:            params.UserID,
385
		Encrypted:         params.Encrypted,
386
		MaxSendAttempts:   sendAttempts,
387
		RequestID:         params.RequestID,
388
		Owner:             phonenumbers.Format(params.Owner, phonenumbers.E164),
389
		Contact:           params.Contact,
390
		RequestReceivedAt: params.RequestReceivedAt,
391
		Content:           params.Content,
392
		ScheduledSendTime: params.SendAt,
393
		SIM:               sim,
394
	}
395
396
	event, err := service.createMessageAPISentEvent(params.Source, eventPayload)
397
	if err != nil {
398
		msg := fmt.Sprintf("cannot create %T from payload with message id [%s]", event, eventPayload.MessageID)
399
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
400
	}
401
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] and message id [%s] and user [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID))
402
403
	message, err := service.storeSentMessage(ctx, eventPayload)
404
	if err != nil {
405
		msg := fmt.Sprintf("cannot store message with id [%s]", eventPayload.MessageID)
406
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
407
	}
408
409
	timeout := service.getSendDelay(ctxLogger, eventPayload, params.SendAt)
410
	if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, timeout); err != nil {
411
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
412
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
413
	}
414
415
	ctxLogger.Info(fmt.Sprintf("[%s] event with ID [%s] dispatched succesfully for message [%s] with user [%s] and delay [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID, timeout))
416
	return message, err
417
}
418
419
// getSendDelay returns how long to wait before the message should be sent.
//
// A nil sendAt means no delay. A sendAt in the past is logged and also results
// in no delay; otherwise the time remaining until sendAt is returned.
func (service *MessageService) getSendDelay(ctxLogger telemetry.Logger, eventPayload events.MessageAPISentPayload, sendAt *time.Time) time.Duration {
	const immediately = time.Duration(0)

	if sendAt == nil {
		return immediately
	}

	if remaining := sendAt.Sub(time.Now().UTC()); remaining >= 0 {
		return remaining
	}

	ctxLogger.Info(fmt.Sprintf("message [%s] has send time [%s] in the past. sending immediately", eventPayload.MessageID, sendAt.String()))
	return immediately
}
432
433
// StoreReceivedMessage a new message
434
func (service *MessageService) storeReceivedMessage(ctx context.Context, params events.MessagePhoneReceivedPayload) (*entities.Message, error) {
435
	ctx, span := service.tracer.Start(ctx)
436
	defer span.End()
437
438
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
439
440
	message := &entities.Message{
441
		ID:                params.MessageID,
442
		Owner:             params.Owner,
443
		UserID:            params.UserID,
444
		Contact:           params.Contact,
445
		Content:           params.Content,
446
		SIM:               params.SIM,
447
		Encrypted:         params.Encrypted,
448
		Type:              entities.MessageTypeMobileOriginated,
449
		Status:            entities.MessageStatusReceived,
450
		RequestReceivedAt: params.Timestamp,
451
		CreatedAt:         time.Now().UTC(),
452
		UpdatedAt:         time.Now().UTC(),
453
		OrderTimestamp:    params.Timestamp,
454
		ReceivedAt:        &params.Timestamp,
455
	}
456
457
	if err := service.repository.Store(ctx, message); err != nil {
458
		msg := fmt.Sprintf("cannot save message with id [%s]", params.MessageID)
459
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
460
	}
461
462
	ctxLogger.Info(fmt.Sprintf("message saved with id [%s]", message.ID))
463
	return message, nil
464
}
465
466
// HandleMessageParams are parameters for handling a message event
467
type HandleMessageParams struct {
468
	ID        uuid.UUID
469
	Source    string
470
	UserID    entities.UserID
471
	Timestamp time.Time
472
}
473
474
// HandleMessageSending handles when a message is being sent
475
func (service *MessageService) HandleMessageSending(ctx context.Context, params HandleMessageParams) error {
476
	ctx, span := service.tracer.Start(ctx)
477
	defer span.End()
478
479
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
480
481
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
482
	if err != nil {
483
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
484
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
485
	}
486
487
	if !message.IsSending() {
488
		msg := fmt.Sprintf("message has wrong status [%s]. expected %s", message.Status, entities.MessageStatusSending)
489
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
490
	}
491
492
	if err = service.repository.Update(ctx, message.AddSendAttempt(params.Timestamp)); err != nil {
493
		msg := fmt.Sprintf("cannot update message with id [%s] after sending", message.ID)
494
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
495
	}
496
497
	ctxLogger.Info(fmt.Sprintf("message with id [%s] updated after adding send attempt", message.ID))
498
	return nil
499
}
500
501
// HandleMessageSent handles when a message has been sent by a mobile phone
502
func (service *MessageService) HandleMessageSent(ctx context.Context, params HandleMessageParams) error {
503
	ctx, span := service.tracer.Start(ctx)
504
	defer span.End()
505
506
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
507
508
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
509
	if err != nil {
510
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
511
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
512
	}
513
514
	if message.IsSent() || message.IsDelivered() {
515
		ctxLogger.Info(fmt.Sprintf("message [%s] for [%s] has already been processed with status [%s]", message.ID, message.UserID, message.Status))
516
		return nil
517
	}
518
519
	if !message.IsSending() && !message.IsExpired() {
520
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s]", message.Status, entities.MessageStatusSending, entities.MessageStatusExpired)
521
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
522
	}
523
524
	if err = service.repository.Update(ctx, message.Sent(params.Timestamp)); err != nil {
525
		msg := fmt.Sprintf("cannot update message with id [%s] as sent", message.ID)
526
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
527
	}
528
529
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
530
	return nil
531
}
532
533
// HandleMessageFailedParams are parameters for handling a failed message event
534
type HandleMessageFailedParams struct {
535
	ID           uuid.UUID
536
	UserID       entities.UserID
537
	ErrorMessage string
538
	Timestamp    time.Time
539
}
540
541
// HandleMessageFailed handles when a message could not be sent by a mobile phone
542
func (service *MessageService) HandleMessageFailed(ctx context.Context, params HandleMessageFailedParams) error {
543
	ctx, span := service.tracer.Start(ctx)
544
	defer span.End()
545
546
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
547
548
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
549
	if err != nil {
550
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
551
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
552
	}
553
554
	if message.IsDelivered() {
555
		msg := fmt.Sprintf("message has already been delivered with status [%s]", message.Status)
556
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
557
	}
558
559
	if err = service.repository.Update(ctx, message.Failed(params.Timestamp, params.ErrorMessage)); err != nil {
560
		msg := fmt.Sprintf("cannot update message with id [%s] as sent", message.ID)
561
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
562
	}
563
564
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
565
	return nil
566
}
567
568
// HandleMessageDelivered handles when a message is has been delivered by a mobile phone
569
func (service *MessageService) HandleMessageDelivered(ctx context.Context, params HandleMessageParams) error {
570
	ctx, span := service.tracer.Start(ctx)
571
	defer span.End()
572
573
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
574
575
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
576
	if err != nil {
577
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
578
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
579
	}
580
581
	if !message.IsSent() && !message.IsSending() && !message.IsExpired() && !message.IsScheduled() {
582
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s, %s, %s]", message.Status, entities.MessageStatusSent, entities.MessageStatusScheduled, entities.MessageStatusSending, entities.MessageStatusExpired)
583
		ctxLogger.Warn(stacktrace.NewError(msg))
584
		return nil
585
	}
586
587
	if err = service.repository.Update(ctx, message.Delivered(params.Timestamp)); err != nil {
588
		msg := fmt.Sprintf("cannot update message with id [%s] as delivered", message.ID)
589
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
590
	}
591
592
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
593
	return nil
594
}
595
596
// HandleMessageNotificationScheduled handles the event when the notification of a message has been scheduled
597
func (service *MessageService) HandleMessageNotificationScheduled(ctx context.Context, params HandleMessageParams) error {
598
	ctx, span := service.tracer.Start(ctx)
599
	defer span.End()
600
601
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
602
603
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
604
	if err != nil {
605
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
606
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
607
	}
608
609
	if !message.IsPending() && !message.IsExpired() && !message.IsSending() {
610
		ctxLogger.Warn(stacktrace.NewError(fmt.Sprintf("received scheduled event for message with id [%s] message has status [%s]", message.ID, message.Status)))
611
	}
612
613
	if err = service.repository.Update(ctx, message.NotificationScheduled(params.Timestamp)); err != nil {
614
		msg := fmt.Sprintf("cannot update message with id [%s] as expired", message.ID)
615
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
616
	}
617
618
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been scheduled to send at [%s]", message.ID, message.NotificationScheduledAt.String()))
619
	return nil
620
}
621
622
// HandleMessageNotificationSent handles the event when the notification of a message has been sent
623
func (service *MessageService) HandleMessageNotificationSent(ctx context.Context, params HandleMessageParams) error {
624
	ctx, span := service.tracer.Start(ctx)
625
	defer span.End()
626
627
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
628
629
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
630
	if err != nil {
631
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
632
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
633
	}
634
635
	if err = service.repository.Update(ctx, message.AddSendAttemptCount()); err != nil {
636
		msg := fmt.Sprintf("cannot update message with id [%s] as expired", message.ID)
637
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
638
	}
639
640
	ctxLogger.Info(fmt.Sprintf("notification for message with id [%s] has been sent at [%s]", message.ID, params.Timestamp.String()))
641
	return nil
642
}
643
644
// HandleMessageExpired handles when a message is has been expired
645
func (service *MessageService) HandleMessageExpired(ctx context.Context, params HandleMessageParams) error {
646
	ctx, span := service.tracer.Start(ctx)
647
	defer span.End()
648
649
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
650
651
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
652
	if err != nil {
653
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
654
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
655
	}
656
657
	if !message.IsSending() && !message.IsScheduled() {
658
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s, %s]", message.Status, entities.MessageStatusSending, entities.MessageStatusScheduled)
659
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
660
	}
661
662
	if err = service.repository.Update(ctx, message.Expired(params.Timestamp)); err != nil {
663
		msg := fmt.Sprintf("cannot update message with id [%s] as expired", message.ID)
664
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
665
	}
666
667
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
668
669
	if !message.CanBeRescheduled() {
670
		return nil
671
	}
672
673
	event, err := service.createMessageSendRetryEvent(params.Source, &events.MessageSendRetryPayload{
674
		MessageID: message.ID,
675
		Timestamp: time.Now().UTC(),
676
		Contact:   message.Contact,
677
		Owner:     message.Owner,
678
		Encrypted: message.Encrypted,
679
		UserID:    message.UserID,
680
		Content:   message.Content,
681
		SIM:       message.SIM,
682
	})
683
	if err != nil {
684
		msg := fmt.Sprintf("cannot create [%s] event for expired message with ID [%s]", events.EventTypeMessageSendRetry, message.ID)
685
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
686
	}
687
688
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
689
		msg := fmt.Sprintf("cannot dispatch [%s] event for message with ID [%s]", event.Type(), message.ID)
690
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
691
	}
692
693
	ctxLogger.Info(fmt.Sprintf("retried sending message with ID [%s]", message.ID))
694
	return nil
695
}
696
697
// MessageScheduleExpirationParams are parameters for scheduling the expiration of a message event
698
type MessageScheduleExpirationParams struct {
699
	MessageID                 uuid.UUID
700
	UserID                    entities.UserID
701
	NotificationSentAt        time.Time
702
	PhoneID                   uuid.UUID
703
	MessageExpirationDuration time.Duration
704
	Source                    string
705
}
706
707
// ScheduleExpirationCheck schedules an event to check if a message is expired
708
func (service *MessageService) ScheduleExpirationCheck(ctx context.Context, params MessageScheduleExpirationParams) error {
709
	ctx, span := service.tracer.Start(ctx)
710
	defer span.End()
711
712
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
713
714
	if params.MessageExpirationDuration == 0 {
715
		ctxLogger.Info(fmt.Sprintf("message expiration duration not set for message [%s] using phone [%s]", params.MessageID, params.PhoneID))
716
		return nil
717
	}
718
719
	event, err := service.createMessageSendExpiredCheckEvent(params.Source, &events.MessageSendExpiredCheckPayload{
720
		MessageID:   params.MessageID,
721
		ScheduledAt: params.NotificationSentAt.Add(params.MessageExpirationDuration),
722
		UserID:      params.UserID,
723
	})
724
	if err != nil {
725
		msg := fmt.Sprintf("cannot create event [%s] for message with id [%s]", events.EventTypeMessageSendExpiredCheck, params.MessageID)
726
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
727
	}
728
729
	if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, params.MessageExpirationDuration); err != nil {
730
		msg := fmt.Sprintf("cannot dispatch event [%s] for message with ID [%s]", event.Type(), params.MessageID)
731
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
732
	}
733
734
	ctxLogger.Info(fmt.Sprintf("scheduled message id [%s] to expire at [%s]", params.MessageID, params.NotificationSentAt.Add(params.MessageExpirationDuration)))
735
	return nil
736
}
737
738
// MessageCheckExpired are parameters for checking if a message is expired
739
type MessageCheckExpired struct {
740
	MessageID uuid.UUID
741
	UserID    entities.UserID
742
	Source    string
743
}
744
745
// CheckExpired checks if a message has expired
746
func (service *MessageService) CheckExpired(ctx context.Context, params MessageCheckExpired) error {
747
	ctx, span := service.tracer.Start(ctx)
748
	defer span.End()
749
750
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
751
752
	message, err := service.repository.Load(ctx, params.UserID, params.MessageID)
753
	if stacktrace.GetCode(err) == repositories.ErrCodeNotFound {
754
		ctxLogger.Info(fmt.Sprintf("message has been deleted for userID [%s] and messageID [%s]", params.UserID, params.MessageID))
755
		return nil
756
	}
757
758
	if err != nil {
759
		msg := fmt.Sprintf("cannot load message with userID [%s] and messageID [%s]", params.UserID, params.MessageID)
760
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
761
	}
762
763
	if !message.IsPending() && !message.IsSending() && !message.IsScheduled() {
764
		ctxLogger.Info(fmt.Sprintf("message with ID [%s] has status [%s] and is not expired", message.ID, message.Status))
765
		return nil
766
	}
767
768
	event, err := service.createMessageSendExpiredEvent(params.Source, events.MessageSendExpiredPayload{
769
		MessageID:        message.ID,
770
		Owner:            message.Owner,
771
		Contact:          message.Contact,
772
		Encrypted:        message.Encrypted,
773
		RequestID:        message.RequestID,
774
		IsFinal:          message.SendAttemptCount == message.MaxSendAttempts,
775
		SendAttemptCount: message.SendAttemptCount,
776
		UserID:           message.UserID,
777
		Timestamp:        time.Now().UTC(),
778
		Content:          message.Content,
779
		SIM:              message.SIM,
780
	})
781
	if err != nil {
782
		msg := fmt.Sprintf("cannot create event [%s] for message with id [%s]", events.EventTypeMessageSendExpired, params.MessageID)
783
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
784
	}
785
786
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
787
		msg := fmt.Sprintf("cannot dispatch event [%s] for message with ID [%s]", event.Type(), params.MessageID)
788
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
789
	}
790
791
	ctxLogger.Info(fmt.Sprintf("message [%s] has expired with status [%s]", params.MessageID, message.Status))
792
	return nil
793
}
794
795
// phoneSettings returns the sanitized max send attempts and the SIM configured on
// the phone loaded for userID and owner.
//
// When the phone cannot be loaded the error is logged (not returned) and the
// defaults of 2 send attempts and entities.SIM1 are used instead.
func (service *MessageService) phoneSettings(ctx context.Context, userID entities.UserID, owner string) (uint, entities.SIM) {
	// Fallback values used when the phone lookup fails.
	const fallbackMaxSendAttempts uint = 2
	const fallbackSIM = entities.SIM1

	ctx, span := service.tracer.Start(ctx)
	defer span.End()

	logger := service.tracer.CtxLogger(service.logger, span)

	phone, loadErr := service.phoneService.Load(ctx, userID, owner)
	if loadErr != nil {
		logger.Error(stacktrace.Propagate(
			loadErr,
			fmt.Sprintf("cannot load phone for userID [%s] and owner [%s]. using default max send attempt of 2", userID, owner),
		))
		return fallbackMaxSendAttempts, fallbackSIM
	}

	return phone.MaxSendAttemptsSanitized(), phone.SIM
}
810
811
// storeSentMessage a new message
812
func (service *MessageService) storeSentMessage(ctx context.Context, payload events.MessageAPISentPayload) (*entities.Message, error) {
813
	ctx, span := service.tracer.Start(ctx)
814
	defer span.End()
815
816
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
817
818
	timestamp := payload.RequestReceivedAt
819
	if payload.ScheduledSendTime != nil {
820
		timestamp = *payload.ScheduledSendTime
821
	}
822
823
	message := &entities.Message{
824
		ID:                payload.MessageID,
825
		Owner:             payload.Owner,
826
		Contact:           payload.Contact,
827
		UserID:            payload.UserID,
828
		Content:           payload.Content,
829
		RequestID:         payload.RequestID,
830
		SIM:               payload.SIM,
831
		Encrypted:         payload.Encrypted,
832
		ScheduledSendTime: payload.ScheduledSendTime,
833
		Type:              entities.MessageTypeMobileTerminated,
834
		Status:            entities.MessageStatusPending,
835
		RequestReceivedAt: payload.RequestReceivedAt,
836
		CreatedAt:         time.Now().UTC(),
837
		UpdatedAt:         time.Now().UTC(),
838
		MaxSendAttempts:   payload.MaxSendAttempts,
839
		OrderTimestamp:    timestamp,
840
	}
841
842
	if err := service.repository.Store(ctx, message); err != nil {
843
		msg := fmt.Sprintf("cannot save message with id [%s]", payload.MessageID)
844
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
845
	}
846
847
	ctxLogger.Info(fmt.Sprintf("message saved with id [%s]", payload.MessageID))
848
	return message, nil
849
}
850
851
// createMessageSendExpiredEvent builds an event of type
// events.EventTypeMessageSendExpired from the given source and payload.
func (service *MessageService) createMessageSendExpiredEvent(source string, payload events.MessageSendExpiredPayload) (cloudevents.Event, error) {
	return service.createEvent(
		events.EventTypeMessageSendExpired,
		source,
		payload,
	)
}
854
855
// createMessageSendExpiredCheckEvent builds an event of type
// events.EventTypeMessageSendExpiredCheck from the given source and payload.
func (service *MessageService) createMessageSendExpiredCheckEvent(source string, payload *events.MessageSendExpiredCheckPayload) (cloudevents.Event, error) {
	return service.createEvent(
		events.EventTypeMessageSendExpiredCheck,
		source,
		payload,
	)
}
858
859
// createMessageAPISentEvent builds an event of type
// events.EventTypeMessageAPISent from the given source and payload.
func (service *MessageService) createMessageAPISentEvent(source string, payload events.MessageAPISentPayload) (cloudevents.Event, error) {
	return service.createEvent(
		events.EventTypeMessageAPISent,
		source,
		payload,
	)
}
862
863
// createMessagePhoneReceivedEvent builds an event of type
// events.EventTypeMessagePhoneReceived from the given source and payload.
func (service *MessageService) createMessagePhoneReceivedEvent(source string, payload events.MessagePhoneReceivedPayload) (cloudevents.Event, error) {
	return service.createEvent(
		events.EventTypeMessagePhoneReceived,
		source,
		payload,
	)
}
866
867
// createMessagePhoneSendingEvent builds an event of type
// events.EventTypeMessagePhoneSending from the given source and payload.
func (service *MessageService) createMessagePhoneSendingEvent(source string, payload events.MessagePhoneSendingPayload) (cloudevents.Event, error) {
	return service.createEvent(
		events.EventTypeMessagePhoneSending,
		source,
		payload,
	)
}
870
871
// createMessagePhoneSentEvent builds an event of type
// events.EventTypeMessagePhoneSent from the given source and payload.
func (service *MessageService) createMessagePhoneSentEvent(source string, payload events.MessagePhoneSentPayload) (cloudevents.Event, error) {
	return service.createEvent(
		events.EventTypeMessagePhoneSent,
		source,
		payload,
	)
}
874
875
// createMessageSendFailedEvent builds an event of type
// events.EventTypeMessageSendFailed from the given source and payload.
func (service *MessageService) createMessageSendFailedEvent(source string, payload events.MessageSendFailedPayload) (cloudevents.Event, error) {
	return service.createEvent(
		events.EventTypeMessageSendFailed,
		source,
		payload,
	)
}
878
879
// createMessagePhoneDeliveredEvent builds an event of type
// events.EventTypeMessagePhoneDelivered from the given source and payload.
func (service *MessageService) createMessagePhoneDeliveredEvent(source string, payload events.MessagePhoneDeliveredPayload) (cloudevents.Event, error) {
	return service.createEvent(
		events.EventTypeMessagePhoneDelivered,
		source,
		payload,
	)
}
882
883
// createMessageSendRetryEvent builds an event of type
// events.EventTypeMessageSendRetry from the given source and payload.
func (service *MessageService) createMessageSendRetryEvent(source string, payload *events.MessageSendRetryPayload) (cloudevents.Event, error) {
	return service.createEvent(
		events.EventTypeMessageSendRetry,
		source,
		payload,
	)
}
886