api/pkg/services/message_service.go   F
last analyzed

Size/Duplication

Total Lines 1063
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 128
eloc 699
dl 0
loc 1063
rs 1.901
c 0
b 0
f 0

40 Methods

Rating   Name   Duplication   Size   Complexity  
A services.NewMessageService 0 13 1
B services.*MessageService.CheckExpired 0 47 8
A services.*MessageService.getSendDelay 0 12 3
A services.*MessageService.HandleMessageFailed 0 24 4
C services.*MessageService.HandleMessageExpired 0 50 9
A services.*MessageService.RegisterMissedCall 0 36 4
B services.*MessageService.HandleMessageDelivered 0 25 7
A services.*MessageService.storeMissedCallMessage 0 27 2
A services.*MessageService.GetOutstanding 0 36 4
A services.*MessageService.HandleMessageSending 0 24 4
A services.*MessageService.handleMessageDeliveredEvent 0 25 3
A services.*MessageService.SendMessage 0 43 4
B services.*MessageService.DeleteMessage 0 48 5
A services.*MessageService.handleMessageFailedEvent 0 31 4
A services.*MessageService.phoneSettings 0 14 2
A services.*MessageService.DeleteAllForUser 0 11 2
A services.*MessageService.storeSentMessage 0 37 3
A services.*MessageService.DeleteByOwnerAndContact 0 13 2
A services.*MessageService.handleMessageSentEvent 0 25 3
A services.*MessageService.GetMessage 0 11 2
A services.*MessageService.ScheduleExpirationCheck 0 28 4
A services.*MessageService.HandleMessageNotificationSent 0 19 3
A services.*MessageService.ReceiveMessage 0 34 3
A services.*MessageService.storeReceivedMessage 0 30 2
B services.*MessageService.StoreEvent 0 23 6
B services.*MessageService.HandleMessageNotificationScheduled 0 23 6
B services.*MessageService.RespondToMissedCall 0 35 5
A services.*MessageService.GetMessages 0 14 2
A services.*MessageService.SearchMessages 0 14 2
B services.*MessageService.HandleMessageSent 0 29 8
A services.*MessageService.createMessageSendRetryEvent 0 2 1
A services.*MessageService.createMessageAPISentEvent 0 2 1
A services.*MessageService.enrichErrorMessage 0 5 2
A services.*MessageService.createMessageSendFailedEvent 0 2 1
A services.*MessageService.createMessageSendExpiredEvent 0 2 1
A services.*MessageService.createMessageSendExpiredCheckEvent 0 2 1
A services.*MessageService.createMessagePhoneSentEvent 0 2 1
A services.*MessageService.createMessagePhoneSendingEvent 0 2 1
A services.*MessageService.createMessagePhoneDeliveredEvent 0 2 1
A services.*MessageService.createMessagePhoneReceivedEvent 0 2 1
1
package services
2
3
import (
4
	"context"
5
	"fmt"
6
	"strings"
7
	"time"
8
9
	"github.com/davecgh/go-spew/spew"
10
11
	"github.com/nyaruka/phonenumbers"
12
13
	"github.com/NdoleStudio/httpsms/pkg/events"
14
	"github.com/NdoleStudio/httpsms/pkg/repositories"
15
	cloudevents "github.com/cloudevents/sdk-go/v2"
16
	"github.com/google/uuid"
17
	"github.com/palantir/stacktrace"
18
19
	"github.com/NdoleStudio/httpsms/pkg/entities"
20
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
21
)
22
23
// MessageService is handles message requests
24
type MessageService struct {
25
	service
26
	logger          telemetry.Logger
27
	tracer          telemetry.Tracer
28
	eventDispatcher *EventDispatcher
29
	phoneService    *PhoneService
30
	repository      repositories.MessageRepository
31
}
32
33
// NewMessageService creates a new MessageService
34
func NewMessageService(
35
	logger telemetry.Logger,
36
	tracer telemetry.Tracer,
37
	repository repositories.MessageRepository,
38
	eventDispatcher *EventDispatcher,
39
	phoneService *PhoneService,
40
) (s *MessageService) {
41
	return &MessageService{
42
		logger:          logger.WithService(fmt.Sprintf("%T", s)),
43
		tracer:          tracer,
44
		repository:      repository,
45
		phoneService:    phoneService,
46
		eventDispatcher: eventDispatcher,
47
	}
48
}
49
50
// MessageGetOutstandingParams parameters for sending a new message
51
type MessageGetOutstandingParams struct {
52
	Source       string
53
	UserID       entities.UserID
54
	PhoneNumbers []string
55
	Timestamp    time.Time
56
	MessageID    uuid.UUID
57
}
58
59
// GetOutstanding fetches messages that still to be sent to the phone
60
func (service *MessageService) GetOutstanding(ctx context.Context, params MessageGetOutstandingParams) (*entities.Message, error) {
61
	ctx, span := service.tracer.Start(ctx)
62
	defer span.End()
63
64
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
65
66
	message, err := service.repository.GetOutstanding(ctx, params.UserID, params.MessageID, params.PhoneNumbers)
67
	if err != nil {
68
		msg := fmt.Sprintf("could not fetch outstanding messages with params [%s]", spew.Sdump(params))
69
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
70
	}
71
72
	event, err := service.createMessagePhoneSendingEvent(params.Source, events.MessagePhoneSendingPayload{
73
		ID:        message.ID,
74
		Owner:     message.Owner,
75
		Contact:   message.Contact,
76
		Timestamp: params.Timestamp,
77
		Encrypted: message.Encrypted,
78
		UserID:    message.UserID,
79
		Content:   message.Content,
80
		SIM:       message.SIM,
81
	})
82
	if err != nil {
83
		msg := fmt.Sprintf("cannot create [%T] for message with ID [%s]", event, message.ID)
84
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
85
	}
86
87
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
88
89
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
90
		msg := fmt.Sprintf("cannot dispatch event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID)
91
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
92
	}
93
94
	ctxLogger.Info(fmt.Sprintf("dispatched event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
95
	return message, nil
96
}
97
98
// DeleteAllForUser deletes all entities.Message for an entities.UserID.
99
func (service *MessageService) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
100
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
101
	defer span.End()
102
103
	if err := service.repository.DeleteAllForUser(ctx, userID); err != nil {
104
		msg := fmt.Sprintf("could not delete [entities.Message] for user with ID [%s]", userID)
105
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
106
	}
107
108
	ctxLogger.Info(fmt.Sprintf("deleted all [entities.Message] for user with ID [%s]", userID))
109
	return nil
110
}
111
112
// DeleteMessage deletes a message from the database
113
func (service *MessageService) DeleteMessage(ctx context.Context, source string, message *entities.Message) error {
114
	ctx, span := service.tracer.Start(ctx)
115
	defer span.End()
116
117
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
118
119
	if err := service.repository.Delete(ctx, message.UserID, message.ID); err != nil {
120
		msg := fmt.Sprintf("could not delete message with ID [%s] for user wit ID [%s]", message.ID, message.UserID)
121
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
122
	}
123
124
	var prevID *uuid.UUID
125
	var prevStatus *entities.MessageStatus
126
	var prevContent *string
127
	previousMessage, err := service.repository.LastMessage(ctx, message.UserID, message.Owner, message.Contact)
128
	if err == nil {
129
		prevID = &previousMessage.ID
130
		prevStatus = &previousMessage.Status
131
		prevContent = &previousMessage.Content
132
	}
133
134
	event, err := service.createEvent(events.MessageAPIDeleted, source, &events.MessageAPIDeletedPayload{
135
		MessageID:              message.ID,
136
		UserID:                 message.UserID,
137
		Owner:                  message.Owner,
138
		Encrypted:              message.Encrypted,
139
		RequestID:              message.RequestID,
140
		Contact:                message.Contact,
141
		Timestamp:              time.Now().UTC(),
142
		Content:                message.Content,
143
		PreviousMessageID:      prevID,
144
		PreviousMessageStatus:  prevStatus,
145
		PreviousMessageContent: prevContent,
146
		SIM:                    message.SIM,
147
	})
148
	if err != nil {
149
		msg := fmt.Sprintf("cannot create [%T] for message with ID [%s]", event, message.ID)
150
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
151
	}
152
153
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
154
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
155
		msg := fmt.Sprintf("cannot dispatch event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID)
156
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
157
	}
158
159
	ctxLogger.Info(fmt.Sprintf("dispatched event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
160
	return nil
161
}
162
163
// DeleteByOwnerAndContact deletes all the messages between an owner and a contact
164
func (service *MessageService) DeleteByOwnerAndContact(ctx context.Context, userID entities.UserID, owner, contact string) error {
165
	ctx, span := service.tracer.Start(ctx)
166
	defer span.End()
167
168
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
169
170
	if err := service.repository.DeleteByOwnerAndContact(ctx, userID, owner, contact); err != nil {
171
		msg := fmt.Sprintf("could not all delete messages for user with ID [%s] between owner [%s] and contact [%s] ", userID, owner, contact)
172
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
173
	}
174
175
	ctxLogger.Info(fmt.Sprintf("deleted all messages for user with ID [%s] between owner [%s] and contact [%s] ", userID, owner, contact))
176
	return nil
177
}
178
179
// RespondToMissedCall creates an SMS response to a missed phone call on the android phone
180
func (service *MessageService) RespondToMissedCall(ctx context.Context, source string, payload *events.MessageCallMissedPayload) error {
181
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
182
	defer span.End()
183
184
	phone, err := service.phoneService.Load(ctx, payload.UserID, payload.Owner)
185
	if err != nil {
186
		msg := fmt.Sprintf("cannot find phone with owner [%s] for user with ID [%s] when handling missed phone call message [%s]", payload.Owner, payload.UserID, payload.MessageID)
187
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
188
	}
189
190
	if phone.MissedCallAutoReply == nil || strings.TrimSpace(*phone.MissedCallAutoReply) == "" {
191
		ctxLogger.Info(fmt.Sprintf("no auto reply set for phone [%s] for message [%s] with user [%s]", payload.Owner, payload.MessageID, payload.UserID))
192
		return nil
193
	}
194
195
	requestID := fmt.Sprintf("missed-call-%s", payload.MessageID)
196
	owner, _ := phonenumbers.Parse(payload.Owner, phonenumbers.UNKNOWN_REGION)
197
	message, err := service.SendMessage(ctx, MessageSendParams{
198
		Owner:             owner,
199
		Contact:           payload.Contact,
200
		Encrypted:         false,
201
		Content:           *phone.MissedCallAutoReply,
202
		Source:            source,
203
		SendAt:            nil,
204
		RequestID:         &requestID,
205
		UserID:            payload.UserID,
206
		RequestReceivedAt: time.Now().UTC(),
207
	})
208
	if err != nil {
209
		msg := fmt.Sprintf("cannot send auto response message for owner [%s] for user with ID [%s] when handling missed phone call message [%s]", payload.Owner, payload.UserID, payload.MessageID)
210
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
211
	}
212
213
	ctxLogger.Info(fmt.Sprintf("created response message with ID [%s] for missed call event [%s] for user [%s]", message.ID, payload.MessageID, message.UserID))
214
	return nil
215
}
216
217
// MessageGetParams parameters for sending a new message
218
type MessageGetParams struct {
219
	repositories.IndexParams
220
	UserID  entities.UserID
221
	Owner   string
222
	Contact string
223
}
224
225
// GetMessages fetches sent between 2 phone numbers
226
func (service *MessageService) GetMessages(ctx context.Context, params MessageGetParams) (*[]entities.Message, error) {
227
	ctx, span := service.tracer.Start(ctx)
228
	defer span.End()
229
230
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
231
232
	messages, err := service.repository.Index(ctx, params.UserID, params.Owner, params.Contact, params.IndexParams)
233
	if err != nil {
234
		msg := fmt.Sprintf("could not fetch messages with parms [%+#v]", params)
235
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
236
	}
237
238
	ctxLogger.Info(fmt.Sprintf("fetched [%d] messages with prams [%+#v]", len(*messages), params))
239
	return messages, nil
240
}
241
242
// GetMessage fetches a message by the ID
243
func (service *MessageService) GetMessage(ctx context.Context, userID entities.UserID, messageID uuid.UUID) (*entities.Message, error) {
244
	ctx, span := service.tracer.Start(ctx)
245
	defer span.End()
246
247
	message, err := service.repository.Load(ctx, userID, messageID)
248
	if err != nil {
249
		msg := fmt.Sprintf("could not fetch messages with ID [%s]", messageID)
250
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
251
	}
252
253
	return message, nil
254
}
255
256
// MessageStoreEventParams parameters registering a message event
257
type MessageStoreEventParams struct {
258
	MessageID    uuid.UUID
259
	EventName    entities.MessageEventName
260
	Timestamp    time.Time
261
	ErrorMessage *string
262
	Source       string
263
}
264
265
// StoreEvent handles event generated by a mobile phone
266
func (service *MessageService) StoreEvent(ctx context.Context, message *entities.Message, params MessageStoreEventParams) (*entities.Message, error) {
267
	ctx, span := service.tracer.Start(ctx)
268
	defer span.End()
269
270
	var err error
271
272
	switch params.EventName {
273
	case entities.MessageEventNameSent:
274
		err = service.handleMessageSentEvent(ctx, params, message)
275
	case entities.MessageEventNameDelivered:
276
		err = service.handleMessageDeliveredEvent(ctx, params, message)
277
	case entities.MessageEventNameFailed:
278
		err = service.handleMessageFailedEvent(ctx, params, message)
279
	default:
280
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.NewError(fmt.Sprintf("cannot handle message event [%s]", params.EventName)))
281
	}
282
283
	if err != nil {
284
		msg := fmt.Sprintf("could not handle phone event [%s] for message with id [%s]", params.EventName, message.ID)
285
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
286
	}
287
288
	return service.repository.Load(ctx, message.UserID, params.MessageID)
289
}
290
291
// MessageReceiveParams parameters registering a message event
292
type MessageReceiveParams struct {
293
	Contact   string
294
	UserID    entities.UserID
295
	Owner     phonenumbers.PhoneNumber
296
	Content   string
297
	SIM       entities.SIM
298
	Timestamp time.Time
299
	Encrypted bool
300
	Source    string
301
}
302
303
// ReceiveMessage handles message received by a mobile phone
304
func (service *MessageService) ReceiveMessage(ctx context.Context, params *MessageReceiveParams) (*entities.Message, error) {
305
	ctx, span := service.tracer.Start(ctx)
306
	defer span.End()
307
308
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
309
310
	eventPayload := events.MessagePhoneReceivedPayload{
311
		MessageID: uuid.New(),
312
		UserID:    params.UserID,
313
		Encrypted: params.Encrypted,
314
		Owner:     phonenumbers.Format(&params.Owner, phonenumbers.E164),
315
		Contact:   params.Contact,
316
		Timestamp: params.Timestamp,
317
		Content:   params.Content,
318
		SIM:       params.SIM,
319
	}
320
321
	ctxLogger.Info(fmt.Sprintf("creating cloud event for received with ID [%s]", eventPayload.MessageID))
322
323
	event, err := service.createMessagePhoneReceivedEvent(params.Source, eventPayload)
324
	if err != nil {
325
		msg := fmt.Sprintf("cannot create %T from payload with message id [%s]", event, eventPayload.MessageID)
326
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
327
	}
328
329
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] and message id [%s]", event.Type(), event.ID(), eventPayload.MessageID))
330
331
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
332
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
333
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
334
	}
335
	ctxLogger.Info(fmt.Sprintf("event [%s] dispatched succesfully", event.ID()))
336
337
	return service.storeReceivedMessage(ctx, eventPayload)
338
}
339
340
func (service *MessageService) handleMessageSentEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
341
	ctx, span := service.tracer.Start(ctx)
342
	defer span.End()
343
344
	event, err := service.createMessagePhoneSentEvent(params.Source, events.MessagePhoneSentPayload{
345
		ID:        message.ID,
346
		Owner:     message.Owner,
347
		UserID:    message.UserID,
348
		RequestID: message.RequestID,
349
		Timestamp: params.Timestamp,
350
		Contact:   message.Contact,
351
		Encrypted: message.Encrypted,
352
		Content:   message.Content,
353
		SIM:       message.SIM,
354
	})
355
	if err != nil {
356
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessagePhoneSent, message.ID)
357
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
358
	}
359
360
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
361
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
362
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
363
	}
364
	return nil
365
}
366
367
func (service *MessageService) handleMessageDeliveredEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
368
	ctx, span := service.tracer.Start(ctx)
369
	defer span.End()
370
371
	event, err := service.createMessagePhoneDeliveredEvent(params.Source, events.MessagePhoneDeliveredPayload{
372
		ID:        message.ID,
373
		Owner:     message.Owner,
374
		UserID:    message.UserID,
375
		RequestID: message.RequestID,
376
		Timestamp: params.Timestamp,
377
		Encrypted: message.Encrypted,
378
		Contact:   message.Contact,
379
		Content:   message.Content,
380
		SIM:       message.SIM,
381
	})
382
	if err != nil {
383
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessagePhoneSent, message.ID)
384
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
385
	}
386
387
	if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, time.Second); err != nil {
388
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s] for message [%s]", event.Type(), event.ID(), message.ID)
389
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
390
	}
391
	return nil
392
}
393
394
func (service *MessageService) handleMessageFailedEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
395
	ctx, span := service.tracer.Start(ctx)
396
	defer span.End()
397
398
	errorMessage := "UNKNOWN ERROR"
399
	if params.ErrorMessage != nil {
400
		errorMessage = service.enrichErrorMessage(*params.ErrorMessage)
401
	}
402
403
	event, err := service.createMessageSendFailedEvent(params.Source, events.MessageSendFailedPayload{
404
		ID:           message.ID,
405
		Owner:        message.Owner,
406
		ErrorMessage: errorMessage,
407
		Timestamp:    params.Timestamp,
408
		Encrypted:    message.Encrypted,
409
		Contact:      message.Contact,
410
		RequestID:    message.RequestID,
411
		UserID:       message.UserID,
412
		Content:      message.Content,
413
		SIM:          message.SIM,
414
	})
415
	if err != nil {
416
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessageSendFailed, message.ID)
417
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
418
	}
419
420
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
421
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
422
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
423
	}
424
	return nil
425
}
426
427
// MessageSendParams parameters for sending a new message
428
type MessageSendParams struct {
429
	Owner             *phonenumbers.PhoneNumber
430
	Contact           string
431
	Encrypted         bool
432
	Content           string
433
	Source            string
434
	SendAt            *time.Time
435
	RequestID         *string
436
	UserID            entities.UserID
437
	RequestReceivedAt time.Time
438
}
439
440
// SendMessage a new message
441
func (service *MessageService) SendMessage(ctx context.Context, params MessageSendParams) (*entities.Message, error) {
442
	ctx, span := service.tracer.Start(ctx)
443
	defer span.End()
444
445
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
446
447
	sendAttempts, sim := service.phoneSettings(ctx, params.UserID, phonenumbers.Format(params.Owner, phonenumbers.E164))
448
449
	eventPayload := events.MessageAPISentPayload{
450
		MessageID:         uuid.New(),
451
		UserID:            params.UserID,
452
		Encrypted:         params.Encrypted,
453
		MaxSendAttempts:   sendAttempts,
454
		RequestID:         params.RequestID,
455
		Owner:             phonenumbers.Format(params.Owner, phonenumbers.E164),
456
		Contact:           params.Contact,
457
		RequestReceivedAt: params.RequestReceivedAt,
458
		Content:           params.Content,
459
		ScheduledSendTime: params.SendAt,
460
		SIM:               sim,
461
	}
462
463
	event, err := service.createMessageAPISentEvent(params.Source, eventPayload)
464
	if err != nil {
465
		msg := fmt.Sprintf("cannot create %T from payload with message id [%s]", event, eventPayload.MessageID)
466
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
467
	}
468
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] and message id [%s] and user [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID))
469
470
	message, err := service.storeSentMessage(ctx, eventPayload)
471
	if err != nil {
472
		msg := fmt.Sprintf("cannot store message with id [%s]", eventPayload.MessageID)
473
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
474
	}
475
476
	timeout := service.getSendDelay(ctxLogger, eventPayload, params.SendAt)
477
	if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, timeout); err != nil {
478
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
479
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
480
	}
481
482
	ctxLogger.Info(fmt.Sprintf("[%s] event with ID [%s] dispatched succesfully for message [%s] with user [%s] and delay [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID, timeout))
483
	return message, err
484
}
485
486
// MissedCallParams parameters for sending a new message
487
type MissedCallParams struct {
488
	Owner     *phonenumbers.PhoneNumber
489
	Contact   string
490
	Source    string
491
	SIM       entities.SIM
492
	Timestamp time.Time
493
	UserID    entities.UserID
494
}
495
496
// RegisterMissedCall a new message
497
func (service *MessageService) RegisterMissedCall(ctx context.Context, params *MissedCallParams) (*entities.Message, error) {
498
	ctx, span := service.tracer.Start(ctx)
499
	defer span.End()
500
501
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
502
503
	eventPayload := &events.MessageCallMissedPayload{
504
		MessageID: uuid.New(),
505
		UserID:    params.UserID,
506
		Timestamp: params.Timestamp,
507
		Owner:     phonenumbers.Format(params.Owner, phonenumbers.E164),
508
		Contact:   params.Contact,
509
		SIM:       params.SIM,
510
	}
511
512
	event, err := service.createEvent(events.MessageCallMissed, params.Source, eventPayload)
513
	if err != nil {
514
		msg := fmt.Sprintf("cannot create [%T] from payload with message id [%s]", event, eventPayload.MessageID)
515
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
516
	}
517
518
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] and message id [%s] and user [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID))
519
520
	message, err := service.storeMissedCallMessage(ctx, eventPayload)
521
	if err != nil {
522
		msg := fmt.Sprintf("cannot store missed call message message with id [%s]", eventPayload.MessageID)
523
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
524
	}
525
526
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
527
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
528
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
529
	}
530
531
	ctxLogger.Info(fmt.Sprintf("[%s] event with ID [%s] dispatched succesfully for message [%s] with user [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID))
532
	return message, err
533
}
534
535
func (service *MessageService) getSendDelay(ctxLogger telemetry.Logger, eventPayload events.MessageAPISentPayload, sendAt *time.Time) time.Duration {
536
	if sendAt == nil {
537
		return time.Duration(0)
538
	}
539
540
	delay := sendAt.Sub(time.Now().UTC())
541
	if delay < 0 {
542
		ctxLogger.Info(fmt.Sprintf("message [%s] has send time [%s] in the past. sending immediately", eventPayload.MessageID, sendAt.String()))
543
		return time.Duration(0)
544
	}
545
546
	return delay
547
}
548
549
// StoreReceivedMessage a new message
550
func (service *MessageService) storeReceivedMessage(ctx context.Context, params events.MessagePhoneReceivedPayload) (*entities.Message, error) {
551
	ctx, span := service.tracer.Start(ctx)
552
	defer span.End()
553
554
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
555
556
	message := &entities.Message{
557
		ID:                params.MessageID,
558
		Owner:             params.Owner,
559
		UserID:            params.UserID,
560
		Contact:           params.Contact,
561
		Content:           params.Content,
562
		SIM:               params.SIM,
563
		Encrypted:         params.Encrypted,
564
		Type:              entities.MessageTypeMobileOriginated,
565
		Status:            entities.MessageStatusReceived,
566
		RequestReceivedAt: params.Timestamp,
567
		CreatedAt:         time.Now().UTC(),
568
		UpdatedAt:         time.Now().UTC(),
569
		OrderTimestamp:    params.Timestamp,
570
		ReceivedAt:        &params.Timestamp,
571
	}
572
573
	if err := service.repository.Store(ctx, message); err != nil {
574
		msg := fmt.Sprintf("cannot save message with id [%s]", params.MessageID)
575
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
576
	}
577
578
	ctxLogger.Info(fmt.Sprintf("message saved with id [%s]", message.ID))
579
	return message, nil
580
}
581
582
// HandleMessageParams are parameters for handling a message event
583
type HandleMessageParams struct {
584
	ID        uuid.UUID
585
	Source    string
586
	UserID    entities.UserID
587
	Timestamp time.Time
588
}
589
590
// HandleMessageSending handles when a message is being sent
591
func (service *MessageService) HandleMessageSending(ctx context.Context, params HandleMessageParams) error {
592
	ctx, span := service.tracer.Start(ctx)
593
	defer span.End()
594
595
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
596
597
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
598
	if err != nil {
599
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
600
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
601
	}
602
603
	if !message.IsSending() {
604
		msg := fmt.Sprintf("message has wrong status [%s]. expected %s", message.Status, entities.MessageStatusSending)
605
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
606
	}
607
608
	if err = service.repository.Update(ctx, message.AddSendAttempt(params.Timestamp)); err != nil {
609
		msg := fmt.Sprintf("cannot update message with id [%s] after sending", message.ID)
610
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
611
	}
612
613
	ctxLogger.Info(fmt.Sprintf("message with id [%s] updated after adding send attempt", message.ID))
614
	return nil
615
}
616
617
// HandleMessageSent handles when a message has been sent by a mobile phone
618
func (service *MessageService) HandleMessageSent(ctx context.Context, params HandleMessageParams) error {
619
	ctx, span := service.tracer.Start(ctx)
620
	defer span.End()
621
622
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
623
624
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
625
	if err != nil {
626
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
627
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
628
	}
629
630
	if message.IsSent() || message.IsDelivered() {
631
		ctxLogger.Info(fmt.Sprintf("message [%s] for [%s] has already been processed with status [%s]", message.ID, message.UserID, message.Status))
632
		return nil
633
	}
634
635
	if !message.IsSending() && !message.IsExpired() && !message.IsScheduled() {
636
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s, %s]", message.Status, entities.MessageStatusSending, entities.MessageStatusExpired, entities.MessageStatusScheduled)
637
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
638
	}
639
640
	if err = service.repository.Update(ctx, message.Sent(params.Timestamp)); err != nil {
641
		msg := fmt.Sprintf("cannot update message with id [%s] as sent", message.ID)
642
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
643
	}
644
645
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
646
	return nil
647
}
648
649
// HandleMessageFailedParams are parameters for handling a failed message event
650
type HandleMessageFailedParams struct {
651
	ID           uuid.UUID
652
	UserID       entities.UserID
653
	ErrorMessage string
654
	Timestamp    time.Time
655
}
656
657
// HandleMessageFailed handles when a message could not be sent by a mobile phone
658
func (service *MessageService) HandleMessageFailed(ctx context.Context, params HandleMessageFailedParams) error {
659
	ctx, span := service.tracer.Start(ctx)
660
	defer span.End()
661
662
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
663
664
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
665
	if err != nil {
666
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
667
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
668
	}
669
670
	if message.IsDelivered() {
671
		msg := fmt.Sprintf("message has already been delivered with status [%s]", message.Status)
672
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
673
	}
674
675
	if err = service.repository.Update(ctx, message.Failed(params.Timestamp, params.ErrorMessage)); err != nil {
676
		msg := fmt.Sprintf("cannot update message with id [%s] as sent", message.ID)
677
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
678
	}
679
680
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
681
	return nil
682
}
683
684
// HandleMessageDelivered handles when a message is has been delivered by a mobile phone
685
func (service *MessageService) HandleMessageDelivered(ctx context.Context, params HandleMessageParams) error {
686
	ctx, span := service.tracer.Start(ctx)
687
	defer span.End()
688
689
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
690
691
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
692
	if err != nil {
693
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
694
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
695
	}
696
697
	if !message.IsSent() && !message.IsSending() && !message.IsExpired() && !message.IsScheduled() {
698
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s, %s, %s]", message.Status, entities.MessageStatusSent, entities.MessageStatusScheduled, entities.MessageStatusSending, entities.MessageStatusExpired)
699
		ctxLogger.Warn(stacktrace.NewError(msg))
700
		return nil
701
	}
702
703
	if err = service.repository.Update(ctx, message.Delivered(params.Timestamp)); err != nil {
704
		msg := fmt.Sprintf("cannot update message with id [%s] as delivered", message.ID)
705
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
706
	}
707
708
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
709
	return nil
710
}
711
712
// HandleMessageNotificationScheduled handles the event when the notification of a message has been scheduled
713
func (service *MessageService) HandleMessageNotificationScheduled(ctx context.Context, params HandleMessageParams) error {
714
	ctx, span := service.tracer.Start(ctx)
715
	defer span.End()
716
717
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
718
719
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
720
	if err != nil {
721
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
722
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
723
	}
724
725
	if !message.IsPending() && !message.IsExpired() && !message.IsSending() {
726
		ctxLogger.Warn(stacktrace.NewError(fmt.Sprintf("received scheduled event for message with id [%s] message has status [%s]", message.ID, message.Status)))
727
	}
728
729
	if err = service.repository.Update(ctx, message.NotificationScheduled(params.Timestamp)); err != nil {
730
		msg := fmt.Sprintf("cannot update message with id [%s] as expired", message.ID)
731
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
732
	}
733
734
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been scheduled to send at [%s]", message.ID, message.NotificationScheduledAt.String()))
735
	return nil
736
}
737
738
// HandleMessageNotificationSent handles the event when the notification of a message has been sent
739
func (service *MessageService) HandleMessageNotificationSent(ctx context.Context, params HandleMessageParams) error {
740
	ctx, span := service.tracer.Start(ctx)
741
	defer span.End()
742
743
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
744
745
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
746
	if err != nil {
747
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
748
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
749
	}
750
751
	if err = service.repository.Update(ctx, message.AddSendAttemptCount()); err != nil {
752
		msg := fmt.Sprintf("cannot update message with id [%s] as expired", message.ID)
753
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
754
	}
755
756
	ctxLogger.Info(fmt.Sprintf("notification for message with id [%s] has been sent at [%s]", message.ID, params.Timestamp.String()))
757
	return nil
758
}
759
760
// HandleMessageExpired handles when a message is has been expired
761
func (service *MessageService) HandleMessageExpired(ctx context.Context, params HandleMessageParams) error {
762
	ctx, span := service.tracer.Start(ctx)
763
	defer span.End()
764
765
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
766
767
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
768
	if err != nil {
769
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
770
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
771
	}
772
773
	if !message.IsSending() && !message.IsScheduled() && !message.IsPending() {
774
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s, %s]", message.Status, entities.MessageStatusSending, entities.MessageStatusScheduled, entities.MessageStatusPending)
775
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
776
	}
777
778
	if err = service.repository.Update(ctx, message.Expired(params.Timestamp)); err != nil {
779
		msg := fmt.Sprintf("cannot update message with id [%s] as expired", message.ID)
780
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
781
	}
782
783
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
784
785
	if !message.CanBeRescheduled() {
786
		return nil
787
	}
788
789
	event, err := service.createMessageSendRetryEvent(params.Source, &events.MessageSendRetryPayload{
790
		MessageID: message.ID,
791
		Timestamp: time.Now().UTC(),
792
		Contact:   message.Contact,
793
		Owner:     message.Owner,
794
		Encrypted: message.Encrypted,
795
		UserID:    message.UserID,
796
		Content:   message.Content,
797
		SIM:       message.SIM,
798
	})
799
	if err != nil {
800
		msg := fmt.Sprintf("cannot create [%s] event for expired message with ID [%s]", events.EventTypeMessageSendRetry, message.ID)
801
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
802
	}
803
804
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
805
		msg := fmt.Sprintf("cannot dispatch [%s] event for message with ID [%s]", event.Type(), message.ID)
806
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
807
	}
808
809
	ctxLogger.Info(fmt.Sprintf("retried sending message with ID [%s]", message.ID))
810
	return nil
811
}
812
813
// MessageScheduleExpirationParams are parameters for scheduling the expiration of a message event
814
type MessageScheduleExpirationParams struct {
815
	MessageID                 uuid.UUID
816
	UserID                    entities.UserID
817
	NotificationSentAt        time.Time
818
	PhoneID                   uuid.UUID
819
	MessageExpirationDuration time.Duration
820
	Source                    string
821
}
822
823
// ScheduleExpirationCheck schedules an event to check if a message is expired
824
func (service *MessageService) ScheduleExpirationCheck(ctx context.Context, params MessageScheduleExpirationParams) error {
825
	ctx, span := service.tracer.Start(ctx)
826
	defer span.End()
827
828
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
829
830
	if params.MessageExpirationDuration == 0 {
831
		ctxLogger.Info(fmt.Sprintf("message expiration duration not set for message [%s] using phone [%s]", params.MessageID, params.PhoneID))
832
		return nil
833
	}
834
835
	event, err := service.createMessageSendExpiredCheckEvent(params.Source, &events.MessageSendExpiredCheckPayload{
836
		MessageID:   params.MessageID,
837
		ScheduledAt: params.NotificationSentAt.Add(params.MessageExpirationDuration),
838
		UserID:      params.UserID,
839
	})
840
	if err != nil {
841
		msg := fmt.Sprintf("cannot create event [%s] for message with id [%s]", events.EventTypeMessageSendExpiredCheck, params.MessageID)
842
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
843
	}
844
845
	if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, params.MessageExpirationDuration); err != nil {
846
		msg := fmt.Sprintf("cannot dispatch event [%s] for message with ID [%s]", event.Type(), params.MessageID)
847
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
848
	}
849
850
	ctxLogger.Info(fmt.Sprintf("scheduled message id [%s] to expire at [%s]", params.MessageID, params.NotificationSentAt.Add(params.MessageExpirationDuration)))
851
	return nil
852
}
853
854
// MessageCheckExpired are parameters for checking if a message is expired
855
type MessageCheckExpired struct {
856
	MessageID uuid.UUID
857
	UserID    entities.UserID
858
	Source    string
859
}
860
861
// CheckExpired checks if a message has expired
862
func (service *MessageService) CheckExpired(ctx context.Context, params MessageCheckExpired) error {
863
	ctx, span := service.tracer.Start(ctx)
864
	defer span.End()
865
866
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
867
868
	message, err := service.repository.Load(ctx, params.UserID, params.MessageID)
869
	if stacktrace.GetCode(err) == repositories.ErrCodeNotFound {
870
		ctxLogger.Info(fmt.Sprintf("message has been deleted for userID [%s] and messageID [%s]", params.UserID, params.MessageID))
871
		return nil
872
	}
873
874
	if err != nil {
875
		msg := fmt.Sprintf("cannot load message with userID [%s] and messageID [%s]", params.UserID, params.MessageID)
876
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
877
	}
878
879
	if !message.IsPending() && !message.IsSending() && !message.IsScheduled() {
880
		ctxLogger.Info(fmt.Sprintf("message with ID [%s] has status [%s] and is not expired", message.ID, message.Status))
881
		return nil
882
	}
883
884
	event, err := service.createMessageSendExpiredEvent(params.Source, events.MessageSendExpiredPayload{
885
		MessageID:        message.ID,
886
		Owner:            message.Owner,
887
		Contact:          message.Contact,
888
		Encrypted:        message.Encrypted,
889
		RequestID:        message.RequestID,
890
		IsFinal:          message.SendAttemptCount == message.MaxSendAttempts,
891
		SendAttemptCount: message.SendAttemptCount,
892
		UserID:           message.UserID,
893
		Timestamp:        time.Now().UTC(),
894
		Content:          message.Content,
895
		SIM:              message.SIM,
896
	})
897
	if err != nil {
898
		msg := fmt.Sprintf("cannot create event [%s] for message with id [%s]", events.EventTypeMessageSendExpired, params.MessageID)
899
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
900
	}
901
902
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
903
		msg := fmt.Sprintf("cannot dispatch event [%s] for message with ID [%s]", event.Type(), params.MessageID)
904
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
905
	}
906
907
	ctxLogger.Info(fmt.Sprintf("message [%s] has expired with status [%s]", params.MessageID, message.Status))
908
	return nil
909
}
910
911
// MessageSearchParams are parameters for searching messages
912
type MessageSearchParams struct {
913
	repositories.IndexParams
914
	UserID   entities.UserID
915
	Owners   []string
916
	Types    []entities.MessageType
917
	Statuses []entities.MessageStatus
918
}
919
920
// SearchMessages fetches all the messages for a user
921
func (service *MessageService) SearchMessages(ctx context.Context, params *MessageSearchParams) ([]*entities.Message, error) {
922
	ctx, span := service.tracer.Start(ctx)
923
	defer span.End()
924
925
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
926
927
	messages, err := service.repository.Search(ctx, params.UserID, params.Owners, params.Types, params.Statuses, params.IndexParams)
928
	if err != nil {
929
		msg := fmt.Sprintf("could not search messages with parms [%+#v]", params)
930
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
931
	}
932
933
	ctxLogger.Info(fmt.Sprintf("fetched [%d] messages with prams [%+#v]", len(messages), params))
934
	return messages, nil
935
}
936
937
func (service *MessageService) phoneSettings(ctx context.Context, userID entities.UserID, owner string) (uint, entities.SIM) {
938
	ctx, span := service.tracer.Start(ctx)
939
	defer span.End()
940
941
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
942
943
	phone, err := service.phoneService.Load(ctx, userID, owner)
944
	if err != nil {
945
		msg := fmt.Sprintf("cannot load phone for userID [%s] and owner [%s]. using default max send attempt of 2", userID, owner)
946
		ctxLogger.Error(stacktrace.Propagate(err, msg))
947
		return 2, entities.SIM1
948
	}
949
950
	return phone.MaxSendAttemptsSanitized(), phone.SIM
951
}
952
953
// storeSentMessage a new message
954
func (service *MessageService) storeSentMessage(ctx context.Context, payload events.MessageAPISentPayload) (*entities.Message, error) {
955
	ctx, span := service.tracer.Start(ctx)
956
	defer span.End()
957
958
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
959
960
	timestamp := payload.RequestReceivedAt
961
	if payload.ScheduledSendTime != nil {
962
		timestamp = *payload.ScheduledSendTime
963
	}
964
965
	message := &entities.Message{
966
		ID:                payload.MessageID,
967
		Owner:             payload.Owner,
968
		Contact:           payload.Contact,
969
		UserID:            payload.UserID,
970
		Content:           payload.Content,
971
		RequestID:         payload.RequestID,
972
		SIM:               payload.SIM,
973
		Encrypted:         payload.Encrypted,
974
		ScheduledSendTime: payload.ScheduledSendTime,
975
		Type:              entities.MessageTypeMobileTerminated,
976
		Status:            entities.MessageStatusPending,
977
		RequestReceivedAt: payload.RequestReceivedAt,
978
		CreatedAt:         time.Now().UTC(),
979
		UpdatedAt:         time.Now().UTC(),
980
		MaxSendAttempts:   payload.MaxSendAttempts,
981
		OrderTimestamp:    timestamp,
982
	}
983
984
	if err := service.repository.Store(ctx, message); err != nil {
985
		msg := fmt.Sprintf("cannot save message with id [%s]", payload.MessageID)
986
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
987
	}
988
989
	ctxLogger.Info(fmt.Sprintf("message saved with id [%s]", payload.MessageID))
990
	return message, nil
991
}
992
993
// storeMissedCallMessage a new message
994
func (service *MessageService) storeMissedCallMessage(ctx context.Context, payload *events.MessageCallMissedPayload) (*entities.Message, error) {
995
	ctx, span := service.tracer.Start(ctx)
996
	defer span.End()
997
998
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
999
1000
	message := &entities.Message{
1001
		ID:                payload.MessageID,
1002
		Owner:             payload.Owner,
1003
		Contact:           payload.Contact,
1004
		UserID:            payload.UserID,
1005
		SIM:               payload.SIM,
1006
		Type:              entities.MessageTypeCallMissed,
1007
		Status:            entities.MessageStatusReceived,
1008
		RequestReceivedAt: payload.Timestamp,
1009
		CreatedAt:         time.Now().UTC(),
1010
		UpdatedAt:         time.Now().UTC(),
1011
		OrderTimestamp:    payload.Timestamp,
1012
	}
1013
1014
	if err := service.repository.Store(ctx, message); err != nil {
1015
		msg := fmt.Sprintf("cannot save missed call message with id [%s]", payload.MessageID)
1016
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
1017
	}
1018
1019
	ctxLogger.Info(fmt.Sprintf("missed call message saved with id [%s]", payload.MessageID))
1020
	return message, nil
1021
}
1022
1023
func (service *MessageService) enrichErrorMessage(message string) string {
1024
	if strings.Contains(message, "android.permission.SEND_SMS") {
1025
		return message + " You need to grant the SMS permission to the httpSMS Android app https://httpsms.com/blog/grant-send-and-read-sms-permissions-on-android"
1026
	}
1027
	return message
1028
}
1029
1030
func (service *MessageService) createMessageSendExpiredEvent(source string, payload events.MessageSendExpiredPayload) (cloudevents.Event, error) {
1031
	return service.createEvent(events.EventTypeMessageSendExpired, source, payload)
1032
}
1033
1034
func (service *MessageService) createMessageSendExpiredCheckEvent(source string, payload *events.MessageSendExpiredCheckPayload) (cloudevents.Event, error) {
1035
	return service.createEvent(events.EventTypeMessageSendExpiredCheck, source, payload)
1036
}
1037
1038
func (service *MessageService) createMessageAPISentEvent(source string, payload events.MessageAPISentPayload) (cloudevents.Event, error) {
1039
	return service.createEvent(events.EventTypeMessageAPISent, source, payload)
1040
}
1041
1042
func (service *MessageService) createMessagePhoneReceivedEvent(source string, payload events.MessagePhoneReceivedPayload) (cloudevents.Event, error) {
1043
	return service.createEvent(events.EventTypeMessagePhoneReceived, source, payload)
1044
}
1045
1046
func (service *MessageService) createMessagePhoneSendingEvent(source string, payload events.MessagePhoneSendingPayload) (cloudevents.Event, error) {
1047
	return service.createEvent(events.EventTypeMessagePhoneSending, source, payload)
1048
}
1049
1050
func (service *MessageService) createMessagePhoneSentEvent(source string, payload events.MessagePhoneSentPayload) (cloudevents.Event, error) {
1051
	return service.createEvent(events.EventTypeMessagePhoneSent, source, payload)
1052
}
1053
1054
func (service *MessageService) createMessageSendFailedEvent(source string, payload events.MessageSendFailedPayload) (cloudevents.Event, error) {
1055
	return service.createEvent(events.EventTypeMessageSendFailed, source, payload)
1056
}
1057
1058
func (service *MessageService) createMessagePhoneDeliveredEvent(source string, payload events.MessagePhoneDeliveredPayload) (cloudevents.Event, error) {
1059
	return service.createEvent(events.EventTypeMessagePhoneDelivered, source, payload)
1060
}
1061
1062
func (service *MessageService) createMessageSendRetryEvent(source string, payload *events.MessageSendRetryPayload) (cloudevents.Event, error) {
1063
	return service.createEvent(events.EventTypeMessageSendRetry, source, payload)
1064
}
1065