Passed
Push — main ( 203737...4cb1c0 )
by Acho
01:56
created

services.*MessageService.SearchMessages   A

Complexity

Conditions 2

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 10
dl 0
loc 14
rs 9.9
c 0
b 0
f 0
nop 2
1
package services
2
3
import (
4
	"context"
5
	"fmt"
6
	"strings"
7
	"time"
8
9
	"github.com/davecgh/go-spew/spew"
10
11
	"github.com/nyaruka/phonenumbers"
12
13
	"github.com/NdoleStudio/httpsms/pkg/events"
14
	"github.com/NdoleStudio/httpsms/pkg/repositories"
15
	cloudevents "github.com/cloudevents/sdk-go/v2"
16
	"github.com/google/uuid"
17
	"github.com/palantir/stacktrace"
18
19
	"github.com/NdoleStudio/httpsms/pkg/entities"
20
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
21
)
22
23
// MessageService is handles message requests
24
type MessageService struct {
25
	service
26
	logger          telemetry.Logger
27
	tracer          telemetry.Tracer
28
	eventDispatcher *EventDispatcher
29
	phoneService    *PhoneService
30
	repository      repositories.MessageRepository
31
}
32
33
// NewMessageService creates a new MessageService
34
func NewMessageService(
35
	logger telemetry.Logger,
36
	tracer telemetry.Tracer,
37
	repository repositories.MessageRepository,
38
	eventDispatcher *EventDispatcher,
39
	phoneService *PhoneService,
40
) (s *MessageService) {
41
	return &MessageService{
42
		logger:          logger.WithService(fmt.Sprintf("%T", s)),
43
		tracer:          tracer,
44
		repository:      repository,
45
		phoneService:    phoneService,
46
		eventDispatcher: eventDispatcher,
47
	}
48
}
49
50
// MessageGetOutstandingParams parameters for sending a new message
51
type MessageGetOutstandingParams struct {
52
	Source    string
53
	UserID    entities.UserID
54
	Timestamp time.Time
55
	MessageID uuid.UUID
56
}
57
58
// GetOutstanding fetches messages that still to be sent to the phone
59
func (service *MessageService) GetOutstanding(ctx context.Context, params MessageGetOutstandingParams) (*entities.Message, error) {
60
	ctx, span := service.tracer.Start(ctx)
61
	defer span.End()
62
63
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
64
65
	message, err := service.repository.GetOutstanding(ctx, params.UserID, params.MessageID)
66
	if err != nil {
67
		msg := fmt.Sprintf("could not fetch outstanding messages with params [%s]", spew.Sdump(params))
68
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
69
	}
70
71
	event, err := service.createMessagePhoneSendingEvent(params.Source, events.MessagePhoneSendingPayload{
72
		ID:        message.ID,
73
		Owner:     message.Owner,
74
		Contact:   message.Contact,
75
		Timestamp: params.Timestamp,
76
		Encrypted: message.Encrypted,
77
		UserID:    message.UserID,
78
		Content:   message.Content,
79
		SIM:       message.SIM,
80
	})
81
	if err != nil {
82
		msg := fmt.Sprintf("cannot create [%T] for message with ID [%s]", event, message.ID)
83
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
84
	}
85
86
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
87
88
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
89
		msg := fmt.Sprintf("cannot dispatch event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID)
90
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
91
	}
92
93
	ctxLogger.Info(fmt.Sprintf("dispatched event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
94
	return message, nil
95
}
96
97
// DeleteMessage deletes a message from the database
98
func (service *MessageService) DeleteMessage(ctx context.Context, source string, message *entities.Message) error {
99
	ctx, span := service.tracer.Start(ctx)
100
	defer span.End()
101
102
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
103
104
	if err := service.repository.Delete(ctx, message.UserID, message.ID); err != nil {
105
		msg := fmt.Sprintf("could not delete message with ID [%s] for user wit ID [%s]", message.ID, message.UserID)
106
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
107
	}
108
109
	var prevID *uuid.UUID
110
	var prevStatus *entities.MessageStatus
111
	var prevContent *string
112
	previousMessage, err := service.repository.LastMessage(ctx, message.UserID, message.Owner, message.Contact)
113
	if err == nil {
114
		prevID = &previousMessage.ID
115
		prevStatus = &previousMessage.Status
116
		prevContent = &previousMessage.Content
117
	}
118
119
	event, err := service.createEvent(events.MessageAPIDeleted, source, &events.MessageAPIDeletedPayload{
120
		MessageID:              message.ID,
121
		UserID:                 message.UserID,
122
		Owner:                  message.Owner,
123
		Encrypted:              message.Encrypted,
124
		RequestID:              message.RequestID,
125
		Contact:                message.Contact,
126
		Timestamp:              time.Now().UTC(),
127
		Content:                message.Content,
128
		PreviousMessageID:      prevID,
129
		PreviousMessageStatus:  prevStatus,
130
		PreviousMessageContent: prevContent,
131
		SIM:                    message.SIM,
132
	})
133
	if err != nil {
134
		msg := fmt.Sprintf("cannot create [%T] for message with ID [%s]", event, message.ID)
135
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
136
	}
137
138
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
139
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
140
		msg := fmt.Sprintf("cannot dispatch event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID)
141
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
142
	}
143
144
	ctxLogger.Info(fmt.Sprintf("dispatched event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
145
	return nil
146
}
147
148
// DeleteByOwnerAndContact deletes all the messages between an owner and a contact
149
func (service *MessageService) DeleteByOwnerAndContact(ctx context.Context, userID entities.UserID, owner, contact string) error {
150
	ctx, span := service.tracer.Start(ctx)
151
	defer span.End()
152
153
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
154
155
	if err := service.repository.DeleteByOwnerAndContact(ctx, userID, owner, contact); err != nil {
156
		msg := fmt.Sprintf("could not all delete messages for user with ID [%s] between owner [%s] and contact [%s] ", userID, owner, contact)
157
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
158
	}
159
160
	ctxLogger.Info(fmt.Sprintf("deleted all messages for user with ID [%s] between owner [%s] and contact [%s] ", userID, owner, contact))
161
	return nil
162
}
163
164
// RespondToMissedCall creates an SMS response to a missed phone call on the android phone
165
func (service *MessageService) RespondToMissedCall(ctx context.Context, source string, payload *events.MessageCallMissedPayload) error {
166
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
167
	defer span.End()
168
169
	phone, err := service.phoneService.Load(ctx, payload.UserID, payload.Owner)
170
	if err != nil {
171
		msg := fmt.Sprintf("cannot find phone with owner [%s] for user with ID [%s] when handling missed phone call message [%s]", payload.Owner, payload.UserID, payload.MessageID)
172
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
173
	}
174
175
	if phone.MissedCallAutoReply == nil || strings.TrimSpace(*phone.MissedCallAutoReply) == "" {
176
		ctxLogger.Info(fmt.Sprintf("no auto reply set for phone [%s] for message [%s] with user [%s]", payload.Owner, payload.MessageID, payload.UserID))
177
		return nil
178
	}
179
180
	requestID := fmt.Sprintf("missed-call-%s", payload.MessageID)
181
	owner, _ := phonenumbers.Parse(payload.Owner, phonenumbers.UNKNOWN_REGION)
182
	message, err := service.SendMessage(ctx, MessageSendParams{
183
		Owner:             owner,
184
		Contact:           payload.Contact,
185
		Encrypted:         false,
186
		Content:           *phone.MissedCallAutoReply,
187
		Source:            source,
188
		SendAt:            nil,
189
		RequestID:         &requestID,
190
		UserID:            payload.UserID,
191
		RequestReceivedAt: time.Now().UTC(),
192
	})
193
	if err != nil {
194
		msg := fmt.Sprintf("cannot send auto response message for owner [%s] for user with ID [%s] when handling missed phone call message [%s]", payload.Owner, payload.UserID, payload.MessageID)
195
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
196
	}
197
198
	ctxLogger.Info(fmt.Sprintf("created response message with ID [%s] for missed call event [%s] for user [%s]", message.ID, payload.MessageID, message.UserID))
199
	return nil
200
}
201
202
// MessageGetParams parameters for sending a new message
203
type MessageGetParams struct {
204
	repositories.IndexParams
205
	UserID  entities.UserID
206
	Owner   string
207
	Contact string
208
}
209
210
// GetMessages fetches sent between 2 phone numbers
211
func (service *MessageService) GetMessages(ctx context.Context, params MessageGetParams) (*[]entities.Message, error) {
212
	ctx, span := service.tracer.Start(ctx)
213
	defer span.End()
214
215
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
216
217
	messages, err := service.repository.Index(ctx, params.UserID, params.Owner, params.Contact, params.IndexParams)
218
	if err != nil {
219
		msg := fmt.Sprintf("could not fetch messages with parms [%+#v]", params)
220
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
221
	}
222
223
	ctxLogger.Info(fmt.Sprintf("fetched [%d] messages with prams [%+#v]", len(*messages), params))
224
	return messages, nil
225
}
226
227
// GetMessage fetches a message by the ID
228
func (service *MessageService) GetMessage(ctx context.Context, userID entities.UserID, messageID uuid.UUID) (*entities.Message, error) {
229
	ctx, span := service.tracer.Start(ctx)
230
	defer span.End()
231
232
	message, err := service.repository.Load(ctx, userID, messageID)
233
	if err != nil {
234
		msg := fmt.Sprintf("could not fetch messages with ID [%s]", messageID)
235
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
236
	}
237
238
	return message, nil
239
}
240
241
// MessageStoreEventParams parameters registering a message event
242
type MessageStoreEventParams struct {
243
	MessageID    uuid.UUID
244
	EventName    entities.MessageEventName
245
	Timestamp    time.Time
246
	ErrorMessage *string
247
	Source       string
248
}
249
250
// StoreEvent handles event generated by a mobile phone
251
func (service *MessageService) StoreEvent(ctx context.Context, message *entities.Message, params MessageStoreEventParams) (*entities.Message, error) {
252
	ctx, span := service.tracer.Start(ctx)
253
	defer span.End()
254
255
	var err error
256
257
	switch params.EventName {
258
	case entities.MessageEventNameSent:
259
		err = service.handleMessageSentEvent(ctx, params, message)
260
	case entities.MessageEventNameDelivered:
261
		err = service.handleMessageDeliveredEvent(ctx, params, message)
262
	case entities.MessageEventNameFailed:
263
		err = service.handleMessageFailedEvent(ctx, params, message)
264
	default:
265
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.NewError(fmt.Sprintf("cannot handle message event [%s]", params.EventName)))
266
	}
267
268
	if err != nil {
269
		msg := fmt.Sprintf("could not handle phone event [%s] for message with id [%s]", params.EventName, message.ID)
270
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
271
	}
272
273
	return service.repository.Load(ctx, message.UserID, params.MessageID)
274
}
275
276
// MessageReceiveParams parameters registering a message event
277
type MessageReceiveParams struct {
278
	Contact   string
279
	UserID    entities.UserID
280
	Owner     phonenumbers.PhoneNumber
281
	Content   string
282
	SIM       entities.SIM
283
	Timestamp time.Time
284
	Encrypted bool
285
	Source    string
286
}
287
288
// ReceiveMessage handles message received by a mobile phone
289
func (service *MessageService) ReceiveMessage(ctx context.Context, params *MessageReceiveParams) (*entities.Message, error) {
290
	ctx, span := service.tracer.Start(ctx)
291
	defer span.End()
292
293
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
294
295
	eventPayload := events.MessagePhoneReceivedPayload{
296
		MessageID: uuid.New(),
297
		UserID:    params.UserID,
298
		Encrypted: params.Encrypted,
299
		Owner:     phonenumbers.Format(&params.Owner, phonenumbers.E164),
300
		Contact:   params.Contact,
301
		Timestamp: params.Timestamp,
302
		Content:   params.Content,
303
		SIM:       params.SIM,
304
	}
305
306
	ctxLogger.Info(fmt.Sprintf("creating cloud event for received with ID [%s]", eventPayload.MessageID))
307
308
	event, err := service.createMessagePhoneReceivedEvent(params.Source, eventPayload)
309
	if err != nil {
310
		msg := fmt.Sprintf("cannot create %T from payload with message id [%s]", event, eventPayload.MessageID)
311
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
312
	}
313
314
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] and message id [%s]", event.Type(), event.ID(), eventPayload.MessageID))
315
316
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
317
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
318
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
319
	}
320
	ctxLogger.Info(fmt.Sprintf("event [%s] dispatched succesfully", event.ID()))
321
322
	return service.storeReceivedMessage(ctx, eventPayload)
323
}
324
325
func (service *MessageService) handleMessageSentEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
326
	ctx, span := service.tracer.Start(ctx)
327
	defer span.End()
328
329
	event, err := service.createMessagePhoneSentEvent(params.Source, events.MessagePhoneSentPayload{
330
		ID:        message.ID,
331
		Owner:     message.Owner,
332
		UserID:    message.UserID,
333
		RequestID: message.RequestID,
334
		Timestamp: params.Timestamp,
335
		Contact:   message.Contact,
336
		Encrypted: message.Encrypted,
337
		Content:   message.Content,
338
		SIM:       message.SIM,
339
	})
340
	if err != nil {
341
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessagePhoneSent, message.ID)
342
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
343
	}
344
345
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
346
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
347
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
348
	}
349
	return nil
350
}
351
352
func (service *MessageService) handleMessageDeliveredEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
353
	ctx, span := service.tracer.Start(ctx)
354
	defer span.End()
355
356
	event, err := service.createMessagePhoneDeliveredEvent(params.Source, events.MessagePhoneDeliveredPayload{
357
		ID:        message.ID,
358
		Owner:     message.Owner,
359
		UserID:    message.UserID,
360
		RequestID: message.RequestID,
361
		Timestamp: params.Timestamp,
362
		Encrypted: message.Encrypted,
363
		Contact:   message.Contact,
364
		Content:   message.Content,
365
		SIM:       message.SIM,
366
	})
367
	if err != nil {
368
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessagePhoneSent, message.ID)
369
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
370
	}
371
372
	if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, time.Second); err != nil {
373
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s] for message [%s]", event.Type(), event.ID(), message.ID)
374
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
375
	}
376
	return nil
377
}
378
379
func (service *MessageService) handleMessageFailedEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
380
	ctx, span := service.tracer.Start(ctx)
381
	defer span.End()
382
383
	errorMessage := "UNKNOWN ERROR"
384
	if params.ErrorMessage != nil {
385
		errorMessage = *params.ErrorMessage
386
	}
387
388
	event, err := service.createMessageSendFailedEvent(params.Source, events.MessageSendFailedPayload{
389
		ID:           message.ID,
390
		Owner:        message.Owner,
391
		ErrorMessage: errorMessage,
392
		Timestamp:    params.Timestamp,
393
		Encrypted:    message.Encrypted,
394
		Contact:      message.Contact,
395
		RequestID:    message.RequestID,
396
		UserID:       message.UserID,
397
		Content:      message.Content,
398
		SIM:          message.SIM,
399
	})
400
	if err != nil {
401
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessageSendFailed, message.ID)
402
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
403
	}
404
405
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
406
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
407
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
408
	}
409
	return nil
410
}
411
412
// MessageSendParams parameters for sending a new message
413
type MessageSendParams struct {
414
	Owner             *phonenumbers.PhoneNumber
415
	Contact           string
416
	Encrypted         bool
417
	Content           string
418
	Source            string
419
	SendAt            *time.Time
420
	RequestID         *string
421
	UserID            entities.UserID
422
	RequestReceivedAt time.Time
423
}
424
425
// SendMessage a new message
426
func (service *MessageService) SendMessage(ctx context.Context, params MessageSendParams) (*entities.Message, error) {
427
	ctx, span := service.tracer.Start(ctx)
428
	defer span.End()
429
430
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
431
432
	sendAttempts, sim := service.phoneSettings(ctx, params.UserID, phonenumbers.Format(params.Owner, phonenumbers.E164))
433
434
	eventPayload := events.MessageAPISentPayload{
435
		MessageID:         uuid.New(),
436
		UserID:            params.UserID,
437
		Encrypted:         params.Encrypted,
438
		MaxSendAttempts:   sendAttempts,
439
		RequestID:         params.RequestID,
440
		Owner:             phonenumbers.Format(params.Owner, phonenumbers.E164),
441
		Contact:           params.Contact,
442
		RequestReceivedAt: params.RequestReceivedAt,
443
		Content:           params.Content,
444
		ScheduledSendTime: params.SendAt,
445
		SIM:               sim,
446
	}
447
448
	event, err := service.createMessageAPISentEvent(params.Source, eventPayload)
449
	if err != nil {
450
		msg := fmt.Sprintf("cannot create %T from payload with message id [%s]", event, eventPayload.MessageID)
451
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
452
	}
453
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] and message id [%s] and user [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID))
454
455
	message, err := service.storeSentMessage(ctx, eventPayload)
456
	if err != nil {
457
		msg := fmt.Sprintf("cannot store message with id [%s]", eventPayload.MessageID)
458
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
459
	}
460
461
	timeout := service.getSendDelay(ctxLogger, eventPayload, params.SendAt)
462
	if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, timeout); err != nil {
463
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
464
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
465
	}
466
467
	ctxLogger.Info(fmt.Sprintf("[%s] event with ID [%s] dispatched succesfully for message [%s] with user [%s] and delay [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID, timeout))
468
	return message, err
469
}
470
471
// MissedCallParams parameters for sending a new message
472
type MissedCallParams struct {
473
	Owner     *phonenumbers.PhoneNumber
474
	Contact   string
475
	Source    string
476
	SIM       entities.SIM
477
	Timestamp time.Time
478
	UserID    entities.UserID
479
}
480
481
// RegisterMissedCall a new message
482
func (service *MessageService) RegisterMissedCall(ctx context.Context, params *MissedCallParams) (*entities.Message, error) {
483
	ctx, span := service.tracer.Start(ctx)
484
	defer span.End()
485
486
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
487
488
	eventPayload := &events.MessageCallMissedPayload{
489
		MessageID: uuid.New(),
490
		UserID:    params.UserID,
491
		Timestamp: params.Timestamp,
492
		Owner:     phonenumbers.Format(params.Owner, phonenumbers.E164),
493
		Contact:   params.Contact,
494
		SIM:       params.SIM,
495
	}
496
497
	event, err := service.createEvent(events.MessageCallMissed, params.Source, eventPayload)
498
	if err != nil {
499
		msg := fmt.Sprintf("cannot create [%T] from payload with message id [%s]", event, eventPayload.MessageID)
500
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
501
	}
502
503
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] and message id [%s] and user [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID))
504
505
	message, err := service.storeMissedCallMessage(ctx, eventPayload)
506
	if err != nil {
507
		msg := fmt.Sprintf("cannot store missed call message message with id [%s]", eventPayload.MessageID)
508
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
509
	}
510
511
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
512
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
513
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
514
	}
515
516
	ctxLogger.Info(fmt.Sprintf("[%s] event with ID [%s] dispatched succesfully for message [%s] with user [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID))
517
	return message, err
518
}
519
520
func (service *MessageService) getSendDelay(ctxLogger telemetry.Logger, eventPayload events.MessageAPISentPayload, sendAt *time.Time) time.Duration {
521
	if sendAt == nil {
522
		return time.Duration(0)
523
	}
524
525
	delay := sendAt.Sub(time.Now().UTC())
526
	if delay < 0 {
527
		ctxLogger.Info(fmt.Sprintf("message [%s] has send time [%s] in the past. sending immediately", eventPayload.MessageID, sendAt.String()))
528
		return time.Duration(0)
529
	}
530
531
	return delay
532
}
533
534
// StoreReceivedMessage a new message
535
func (service *MessageService) storeReceivedMessage(ctx context.Context, params events.MessagePhoneReceivedPayload) (*entities.Message, error) {
536
	ctx, span := service.tracer.Start(ctx)
537
	defer span.End()
538
539
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
540
541
	message := &entities.Message{
542
		ID:                params.MessageID,
543
		Owner:             params.Owner,
544
		UserID:            params.UserID,
545
		Contact:           params.Contact,
546
		Content:           params.Content,
547
		SIM:               params.SIM,
548
		Encrypted:         params.Encrypted,
549
		Type:              entities.MessageTypeMobileOriginated,
550
		Status:            entities.MessageStatusReceived,
551
		RequestReceivedAt: params.Timestamp,
552
		CreatedAt:         time.Now().UTC(),
553
		UpdatedAt:         time.Now().UTC(),
554
		OrderTimestamp:    params.Timestamp,
555
		ReceivedAt:        &params.Timestamp,
556
	}
557
558
	if err := service.repository.Store(ctx, message); err != nil {
559
		msg := fmt.Sprintf("cannot save message with id [%s]", params.MessageID)
560
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
561
	}
562
563
	ctxLogger.Info(fmt.Sprintf("message saved with id [%s]", message.ID))
564
	return message, nil
565
}
566
567
// HandleMessageParams are parameters for handling a message event
568
type HandleMessageParams struct {
569
	ID        uuid.UUID
570
	Source    string
571
	UserID    entities.UserID
572
	Timestamp time.Time
573
}
574
575
// HandleMessageSending handles when a message is being sent
576
func (service *MessageService) HandleMessageSending(ctx context.Context, params HandleMessageParams) error {
577
	ctx, span := service.tracer.Start(ctx)
578
	defer span.End()
579
580
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
581
582
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
583
	if err != nil {
584
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
585
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
586
	}
587
588
	if !message.IsSending() {
589
		msg := fmt.Sprintf("message has wrong status [%s]. expected %s", message.Status, entities.MessageStatusSending)
590
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
591
	}
592
593
	if err = service.repository.Update(ctx, message.AddSendAttempt(params.Timestamp)); err != nil {
594
		msg := fmt.Sprintf("cannot update message with id [%s] after sending", message.ID)
595
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
596
	}
597
598
	ctxLogger.Info(fmt.Sprintf("message with id [%s] updated after adding send attempt", message.ID))
599
	return nil
600
}
601
602
// HandleMessageSent handles when a message has been sent by a mobile phone
603
func (service *MessageService) HandleMessageSent(ctx context.Context, params HandleMessageParams) error {
604
	ctx, span := service.tracer.Start(ctx)
605
	defer span.End()
606
607
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
608
609
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
610
	if err != nil {
611
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
612
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
613
	}
614
615
	if message.IsSent() || message.IsDelivered() {
616
		ctxLogger.Info(fmt.Sprintf("message [%s] for [%s] has already been processed with status [%s]", message.ID, message.UserID, message.Status))
617
		return nil
618
	}
619
620
	if !message.IsSending() && !message.IsExpired() {
621
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s]", message.Status, entities.MessageStatusSending, entities.MessageStatusExpired)
622
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
623
	}
624
625
	if err = service.repository.Update(ctx, message.Sent(params.Timestamp)); err != nil {
626
		msg := fmt.Sprintf("cannot update message with id [%s] as sent", message.ID)
627
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
628
	}
629
630
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
631
	return nil
632
}
633
634
// HandleMessageFailedParams are parameters for handling a failed message event
635
type HandleMessageFailedParams struct {
636
	ID           uuid.UUID
637
	UserID       entities.UserID
638
	ErrorMessage string
639
	Timestamp    time.Time
640
}
641
642
// HandleMessageFailed handles when a message could not be sent by a mobile phone
643
func (service *MessageService) HandleMessageFailed(ctx context.Context, params HandleMessageFailedParams) error {
644
	ctx, span := service.tracer.Start(ctx)
645
	defer span.End()
646
647
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
648
649
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
650
	if err != nil {
651
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
652
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
653
	}
654
655
	if message.IsDelivered() {
656
		msg := fmt.Sprintf("message has already been delivered with status [%s]", message.Status)
657
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
658
	}
659
660
	if err = service.repository.Update(ctx, message.Failed(params.Timestamp, params.ErrorMessage)); err != nil {
661
		msg := fmt.Sprintf("cannot update message with id [%s] as sent", message.ID)
662
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
663
	}
664
665
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
666
	return nil
667
}
668
669
// HandleMessageDelivered handles when a message is has been delivered by a mobile phone
670
func (service *MessageService) HandleMessageDelivered(ctx context.Context, params HandleMessageParams) error {
671
	ctx, span := service.tracer.Start(ctx)
672
	defer span.End()
673
674
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
675
676
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
677
	if err != nil {
678
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
679
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
680
	}
681
682
	if !message.IsSent() && !message.IsSending() && !message.IsExpired() && !message.IsScheduled() {
683
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s, %s, %s]", message.Status, entities.MessageStatusSent, entities.MessageStatusScheduled, entities.MessageStatusSending, entities.MessageStatusExpired)
684
		ctxLogger.Warn(stacktrace.NewError(msg))
685
		return nil
686
	}
687
688
	if err = service.repository.Update(ctx, message.Delivered(params.Timestamp)); err != nil {
689
		msg := fmt.Sprintf("cannot update message with id [%s] as delivered", message.ID)
690
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
691
	}
692
693
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
694
	return nil
695
}
696
697
// HandleMessageNotificationScheduled handles the event when the notification of a message has been scheduled
698
func (service *MessageService) HandleMessageNotificationScheduled(ctx context.Context, params HandleMessageParams) error {
699
	ctx, span := service.tracer.Start(ctx)
700
	defer span.End()
701
702
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
703
704
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
705
	if err != nil {
706
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
707
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
708
	}
709
710
	if !message.IsPending() && !message.IsExpired() && !message.IsSending() {
711
		ctxLogger.Warn(stacktrace.NewError(fmt.Sprintf("received scheduled event for message with id [%s] message has status [%s]", message.ID, message.Status)))
712
	}
713
714
	if err = service.repository.Update(ctx, message.NotificationScheduled(params.Timestamp)); err != nil {
715
		msg := fmt.Sprintf("cannot update message with id [%s] as expired", message.ID)
716
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
717
	}
718
719
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been scheduled to send at [%s]", message.ID, message.NotificationScheduledAt.String()))
720
	return nil
721
}
722
723
// HandleMessageNotificationSent handles the event when the notification of a message has been sent
724
func (service *MessageService) HandleMessageNotificationSent(ctx context.Context, params HandleMessageParams) error {
725
	ctx, span := service.tracer.Start(ctx)
726
	defer span.End()
727
728
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
729
730
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
731
	if err != nil {
732
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
733
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
734
	}
735
736
	if err = service.repository.Update(ctx, message.AddSendAttemptCount()); err != nil {
737
		msg := fmt.Sprintf("cannot update message with id [%s] as expired", message.ID)
738
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
739
	}
740
741
	ctxLogger.Info(fmt.Sprintf("notification for message with id [%s] has been sent at [%s]", message.ID, params.Timestamp.String()))
742
	return nil
743
}
744
745
// HandleMessageExpired handles when a message is has been expired
746
func (service *MessageService) HandleMessageExpired(ctx context.Context, params HandleMessageParams) error {
747
	ctx, span := service.tracer.Start(ctx)
748
	defer span.End()
749
750
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
751
752
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
753
	if err != nil {
754
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
755
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
756
	}
757
758
	if !message.IsSending() && !message.IsScheduled() && !message.IsPending() {
759
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s, %s]", message.Status, entities.MessageStatusSending, entities.MessageStatusScheduled, entities.MessageStatusPending)
760
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
761
	}
762
763
	if err = service.repository.Update(ctx, message.Expired(params.Timestamp)); err != nil {
764
		msg := fmt.Sprintf("cannot update message with id [%s] as expired", message.ID)
765
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
766
	}
767
768
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
769
770
	if !message.CanBeRescheduled() {
771
		return nil
772
	}
773
774
	event, err := service.createMessageSendRetryEvent(params.Source, &events.MessageSendRetryPayload{
775
		MessageID: message.ID,
776
		Timestamp: time.Now().UTC(),
777
		Contact:   message.Contact,
778
		Owner:     message.Owner,
779
		Encrypted: message.Encrypted,
780
		UserID:    message.UserID,
781
		Content:   message.Content,
782
		SIM:       message.SIM,
783
	})
784
	if err != nil {
785
		msg := fmt.Sprintf("cannot create [%s] event for expired message with ID [%s]", events.EventTypeMessageSendRetry, message.ID)
786
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
787
	}
788
789
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
790
		msg := fmt.Sprintf("cannot dispatch [%s] event for message with ID [%s]", event.Type(), message.ID)
791
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
792
	}
793
794
	ctxLogger.Info(fmt.Sprintf("retried sending message with ID [%s]", message.ID))
795
	return nil
796
}
797
798
// MessageScheduleExpirationParams are parameters for scheduling the expiration of a message event
799
type MessageScheduleExpirationParams struct {
800
	MessageID                 uuid.UUID
801
	UserID                    entities.UserID
802
	NotificationSentAt        time.Time
803
	PhoneID                   uuid.UUID
804
	MessageExpirationDuration time.Duration
805
	Source                    string
806
}
807
808
// ScheduleExpirationCheck schedules an event to check if a message is expired
809
func (service *MessageService) ScheduleExpirationCheck(ctx context.Context, params MessageScheduleExpirationParams) error {
810
	ctx, span := service.tracer.Start(ctx)
811
	defer span.End()
812
813
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
814
815
	if params.MessageExpirationDuration == 0 {
816
		ctxLogger.Info(fmt.Sprintf("message expiration duration not set for message [%s] using phone [%s]", params.MessageID, params.PhoneID))
817
		return nil
818
	}
819
820
	event, err := service.createMessageSendExpiredCheckEvent(params.Source, &events.MessageSendExpiredCheckPayload{
821
		MessageID:   params.MessageID,
822
		ScheduledAt: params.NotificationSentAt.Add(params.MessageExpirationDuration),
823
		UserID:      params.UserID,
824
	})
825
	if err != nil {
826
		msg := fmt.Sprintf("cannot create event [%s] for message with id [%s]", events.EventTypeMessageSendExpiredCheck, params.MessageID)
827
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
828
	}
829
830
	if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, params.MessageExpirationDuration); err != nil {
831
		msg := fmt.Sprintf("cannot dispatch event [%s] for message with ID [%s]", event.Type(), params.MessageID)
832
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
833
	}
834
835
	ctxLogger.Info(fmt.Sprintf("scheduled message id [%s] to expire at [%s]", params.MessageID, params.NotificationSentAt.Add(params.MessageExpirationDuration)))
836
	return nil
837
}
838
839
// MessageCheckExpired are parameters for checking if a message is expired
840
type MessageCheckExpired struct {
841
	MessageID uuid.UUID
842
	UserID    entities.UserID
843
	Source    string
844
}
845
846
// CheckExpired checks if a message has expired
847
func (service *MessageService) CheckExpired(ctx context.Context, params MessageCheckExpired) error {
848
	ctx, span := service.tracer.Start(ctx)
849
	defer span.End()
850
851
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
852
853
	message, err := service.repository.Load(ctx, params.UserID, params.MessageID)
854
	if stacktrace.GetCode(err) == repositories.ErrCodeNotFound {
855
		ctxLogger.Info(fmt.Sprintf("message has been deleted for userID [%s] and messageID [%s]", params.UserID, params.MessageID))
856
		return nil
857
	}
858
859
	if err != nil {
860
		msg := fmt.Sprintf("cannot load message with userID [%s] and messageID [%s]", params.UserID, params.MessageID)
861
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
862
	}
863
864
	if !message.IsPending() && !message.IsSending() && !message.IsScheduled() {
865
		ctxLogger.Info(fmt.Sprintf("message with ID [%s] has status [%s] and is not expired", message.ID, message.Status))
866
		return nil
867
	}
868
869
	event, err := service.createMessageSendExpiredEvent(params.Source, events.MessageSendExpiredPayload{
870
		MessageID:        message.ID,
871
		Owner:            message.Owner,
872
		Contact:          message.Contact,
873
		Encrypted:        message.Encrypted,
874
		RequestID:        message.RequestID,
875
		IsFinal:          message.SendAttemptCount == message.MaxSendAttempts,
876
		SendAttemptCount: message.SendAttemptCount,
877
		UserID:           message.UserID,
878
		Timestamp:        time.Now().UTC(),
879
		Content:          message.Content,
880
		SIM:              message.SIM,
881
	})
882
	if err != nil {
883
		msg := fmt.Sprintf("cannot create event [%s] for message with id [%s]", events.EventTypeMessageSendExpired, params.MessageID)
884
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
885
	}
886
887
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
888
		msg := fmt.Sprintf("cannot dispatch event [%s] for message with ID [%s]", event.Type(), params.MessageID)
889
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
890
	}
891
892
	ctxLogger.Info(fmt.Sprintf("message [%s] has expired with status [%s]", params.MessageID, message.Status))
893
	return nil
894
}
895
896
// MessageSearchParams are parameters for searching messages
897
type MessageSearchParams struct {
898
	repositories.IndexParams
899
	UserID   entities.UserID
900
	Owners   []string
901
	Types    []entities.MessageType
902
	Statuses []entities.MessageStatus
903
}
904
905
// SearchMessages fetches all the messages for a user
906
func (service *MessageService) SearchMessages(ctx context.Context, params *MessageSearchParams) ([]*entities.Message, error) {
907
	ctx, span := service.tracer.Start(ctx)
908
	defer span.End()
909
910
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
911
912
	messages, err := service.repository.Search(ctx, params.UserID, params.Owners, params.Types, params.Statuses, params.IndexParams)
913
	if err != nil {
914
		msg := fmt.Sprintf("could not search messages with parms [%+#v]", params)
915
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
916
	}
917
918
	ctxLogger.Info(fmt.Sprintf("fetched [%d] messages with prams [%+#v]", len(messages), params))
919
	return messages, nil
920
}
921
922
func (service *MessageService) phoneSettings(ctx context.Context, userID entities.UserID, owner string) (uint, entities.SIM) {
923
	ctx, span := service.tracer.Start(ctx)
924
	defer span.End()
925
926
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
927
928
	phone, err := service.phoneService.Load(ctx, userID, owner)
929
	if err != nil {
930
		msg := fmt.Sprintf("cannot load phone for userID [%s] and owner [%s]. using default max send attempt of 2", userID, owner)
931
		ctxLogger.Error(stacktrace.Propagate(err, msg))
932
		return 2, entities.SIM1
933
	}
934
935
	return phone.MaxSendAttemptsSanitized(), phone.SIM
936
}
937
938
// storeSentMessage a new message
939
func (service *MessageService) storeSentMessage(ctx context.Context, payload events.MessageAPISentPayload) (*entities.Message, error) {
940
	ctx, span := service.tracer.Start(ctx)
941
	defer span.End()
942
943
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
944
945
	timestamp := payload.RequestReceivedAt
946
	if payload.ScheduledSendTime != nil {
947
		timestamp = *payload.ScheduledSendTime
948
	}
949
950
	message := &entities.Message{
951
		ID:                payload.MessageID,
952
		Owner:             payload.Owner,
953
		Contact:           payload.Contact,
954
		UserID:            payload.UserID,
955
		Content:           payload.Content,
956
		RequestID:         payload.RequestID,
957
		SIM:               payload.SIM,
958
		Encrypted:         payload.Encrypted,
959
		ScheduledSendTime: payload.ScheduledSendTime,
960
		Type:              entities.MessageTypeMobileTerminated,
961
		Status:            entities.MessageStatusPending,
962
		RequestReceivedAt: payload.RequestReceivedAt,
963
		CreatedAt:         time.Now().UTC(),
964
		UpdatedAt:         time.Now().UTC(),
965
		MaxSendAttempts:   payload.MaxSendAttempts,
966
		OrderTimestamp:    timestamp,
967
	}
968
969
	if err := service.repository.Store(ctx, message); err != nil {
970
		msg := fmt.Sprintf("cannot save message with id [%s]", payload.MessageID)
971
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
972
	}
973
974
	ctxLogger.Info(fmt.Sprintf("message saved with id [%s]", payload.MessageID))
975
	return message, nil
976
}
977
978
// storeMissedCallMessage a new message
979
func (service *MessageService) storeMissedCallMessage(ctx context.Context, payload *events.MessageCallMissedPayload) (*entities.Message, error) {
980
	ctx, span := service.tracer.Start(ctx)
981
	defer span.End()
982
983
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
984
985
	message := &entities.Message{
986
		ID:                payload.MessageID,
987
		Owner:             payload.Owner,
988
		Contact:           payload.Contact,
989
		UserID:            payload.UserID,
990
		SIM:               payload.SIM,
991
		Type:              entities.MessageTypeCallMissed,
992
		Status:            entities.MessageStatusReceived,
993
		RequestReceivedAt: payload.Timestamp,
994
		CreatedAt:         time.Now().UTC(),
995
		UpdatedAt:         time.Now().UTC(),
996
		OrderTimestamp:    payload.Timestamp,
997
	}
998
999
	if err := service.repository.Store(ctx, message); err != nil {
1000
		msg := fmt.Sprintf("cannot save missed call message with id [%s]", payload.MessageID)
1001
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
1002
	}
1003
1004
	ctxLogger.Info(fmt.Sprintf("missed call message saved with id [%s]", payload.MessageID))
1005
	return message, nil
1006
}
1007
1008
func (service *MessageService) createMessageSendExpiredEvent(source string, payload events.MessageSendExpiredPayload) (cloudevents.Event, error) {
1009
	return service.createEvent(events.EventTypeMessageSendExpired, source, payload)
1010
}
1011
1012
func (service *MessageService) createMessageSendExpiredCheckEvent(source string, payload *events.MessageSendExpiredCheckPayload) (cloudevents.Event, error) {
1013
	return service.createEvent(events.EventTypeMessageSendExpiredCheck, source, payload)
1014
}
1015
1016
func (service *MessageService) createMessageAPISentEvent(source string, payload events.MessageAPISentPayload) (cloudevents.Event, error) {
1017
	return service.createEvent(events.EventTypeMessageAPISent, source, payload)
1018
}
1019
1020
func (service *MessageService) createMessagePhoneReceivedEvent(source string, payload events.MessagePhoneReceivedPayload) (cloudevents.Event, error) {
1021
	return service.createEvent(events.EventTypeMessagePhoneReceived, source, payload)
1022
}
1023
1024
func (service *MessageService) createMessagePhoneSendingEvent(source string, payload events.MessagePhoneSendingPayload) (cloudevents.Event, error) {
1025
	return service.createEvent(events.EventTypeMessagePhoneSending, source, payload)
1026
}
1027
1028
func (service *MessageService) createMessagePhoneSentEvent(source string, payload events.MessagePhoneSentPayload) (cloudevents.Event, error) {
1029
	return service.createEvent(events.EventTypeMessagePhoneSent, source, payload)
1030
}
1031
1032
func (service *MessageService) createMessageSendFailedEvent(source string, payload events.MessageSendFailedPayload) (cloudevents.Event, error) {
1033
	return service.createEvent(events.EventTypeMessageSendFailed, source, payload)
1034
}
1035
1036
func (service *MessageService) createMessagePhoneDeliveredEvent(source string, payload events.MessagePhoneDeliveredPayload) (cloudevents.Event, error) {
1037
	return service.createEvent(events.EventTypeMessagePhoneDelivered, source, payload)
1038
}
1039
1040
func (service *MessageService) createMessageSendRetryEvent(source string, payload *events.MessageSendRetryPayload) (cloudevents.Event, error) {
1041
	return service.createEvent(events.EventTypeMessageSendRetry, source, payload)
1042
}
1043