Passed
Push — main ( 570b65...2ff189 )
by Acho
02:48
created

services.*MessageService.DeleteAllForUser   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 8
dl 0
loc 11
rs 10
c 0
b 0
f 0
nop 2
1
package services
2
3
import (
4
	"context"
5
	"fmt"
6
	"strings"
7
	"time"
8
9
	"github.com/davecgh/go-spew/spew"
10
11
	"github.com/nyaruka/phonenumbers"
12
13
	"github.com/NdoleStudio/httpsms/pkg/events"
14
	"github.com/NdoleStudio/httpsms/pkg/repositories"
15
	cloudevents "github.com/cloudevents/sdk-go/v2"
16
	"github.com/google/uuid"
17
	"github.com/palantir/stacktrace"
18
19
	"github.com/NdoleStudio/httpsms/pkg/entities"
20
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
21
)
22
23
// MessageService is handles message requests
24
type MessageService struct {
25
	service
26
	logger          telemetry.Logger
27
	tracer          telemetry.Tracer
28
	eventDispatcher *EventDispatcher
29
	phoneService    *PhoneService
30
	repository      repositories.MessageRepository
31
}
32
33
// NewMessageService creates a new MessageService
34
func NewMessageService(
35
	logger telemetry.Logger,
36
	tracer telemetry.Tracer,
37
	repository repositories.MessageRepository,
38
	eventDispatcher *EventDispatcher,
39
	phoneService *PhoneService,
40
) (s *MessageService) {
41
	return &MessageService{
42
		logger:          logger.WithService(fmt.Sprintf("%T", s)),
43
		tracer:          tracer,
44
		repository:      repository,
45
		phoneService:    phoneService,
46
		eventDispatcher: eventDispatcher,
47
	}
48
}
49
50
// MessageGetOutstandingParams parameters for sending a new message
51
type MessageGetOutstandingParams struct {
52
	Source    string
53
	UserID    entities.UserID
54
	Timestamp time.Time
55
	MessageID uuid.UUID
56
}
57
58
// GetOutstanding fetches messages that still to be sent to the phone
59
func (service *MessageService) GetOutstanding(ctx context.Context, params MessageGetOutstandingParams) (*entities.Message, error) {
60
	ctx, span := service.tracer.Start(ctx)
61
	defer span.End()
62
63
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
64
65
	message, err := service.repository.GetOutstanding(ctx, params.UserID, params.MessageID)
66
	if err != nil {
67
		msg := fmt.Sprintf("could not fetch outstanding messages with params [%s]", spew.Sdump(params))
68
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
69
	}
70
71
	event, err := service.createMessagePhoneSendingEvent(params.Source, events.MessagePhoneSendingPayload{
72
		ID:        message.ID,
73
		Owner:     message.Owner,
74
		Contact:   message.Contact,
75
		Timestamp: params.Timestamp,
76
		Encrypted: message.Encrypted,
77
		UserID:    message.UserID,
78
		Content:   message.Content,
79
		SIM:       message.SIM,
80
	})
81
	if err != nil {
82
		msg := fmt.Sprintf("cannot create [%T] for message with ID [%s]", event, message.ID)
83
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
84
	}
85
86
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
87
88
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
89
		msg := fmt.Sprintf("cannot dispatch event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID)
90
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
91
	}
92
93
	ctxLogger.Info(fmt.Sprintf("dispatched event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
94
	return message, nil
95
}
96
97
// DeleteAllForUser deletes all entities.Message for an entities.UserID.
98
func (service *MessageService) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
99
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
100
	defer span.End()
101
102
	if err := service.repository.DeleteAllForUser(ctx, userID); err != nil {
103
		msg := fmt.Sprintf("could not delete [entities.Message] for user with ID [%s]", userID)
104
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
105
	}
106
107
	ctxLogger.Info(fmt.Sprintf("deleted all [entities.Message] for user with ID [%s]", userID))
108
	return nil
109
}
110
111
// DeleteMessage deletes a message from the database
112
func (service *MessageService) DeleteMessage(ctx context.Context, source string, message *entities.Message) error {
113
	ctx, span := service.tracer.Start(ctx)
114
	defer span.End()
115
116
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
117
118
	if err := service.repository.Delete(ctx, message.UserID, message.ID); err != nil {
119
		msg := fmt.Sprintf("could not delete message with ID [%s] for user wit ID [%s]", message.ID, message.UserID)
120
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
121
	}
122
123
	var prevID *uuid.UUID
124
	var prevStatus *entities.MessageStatus
125
	var prevContent *string
126
	previousMessage, err := service.repository.LastMessage(ctx, message.UserID, message.Owner, message.Contact)
127
	if err == nil {
128
		prevID = &previousMessage.ID
129
		prevStatus = &previousMessage.Status
130
		prevContent = &previousMessage.Content
131
	}
132
133
	event, err := service.createEvent(events.MessageAPIDeleted, source, &events.MessageAPIDeletedPayload{
134
		MessageID:              message.ID,
135
		UserID:                 message.UserID,
136
		Owner:                  message.Owner,
137
		Encrypted:              message.Encrypted,
138
		RequestID:              message.RequestID,
139
		Contact:                message.Contact,
140
		Timestamp:              time.Now().UTC(),
141
		Content:                message.Content,
142
		PreviousMessageID:      prevID,
143
		PreviousMessageStatus:  prevStatus,
144
		PreviousMessageContent: prevContent,
145
		SIM:                    message.SIM,
146
	})
147
	if err != nil {
148
		msg := fmt.Sprintf("cannot create [%T] for message with ID [%s]", event, message.ID)
149
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
150
	}
151
152
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
153
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
154
		msg := fmt.Sprintf("cannot dispatch event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID)
155
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
156
	}
157
158
	ctxLogger.Info(fmt.Sprintf("dispatched event [%s] with id [%s] for message [%s]", event.Type(), event.ID(), message.ID))
159
	return nil
160
}
161
162
// DeleteByOwnerAndContact deletes all the messages between an owner and a contact
163
func (service *MessageService) DeleteByOwnerAndContact(ctx context.Context, userID entities.UserID, owner, contact string) error {
164
	ctx, span := service.tracer.Start(ctx)
165
	defer span.End()
166
167
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
168
169
	if err := service.repository.DeleteByOwnerAndContact(ctx, userID, owner, contact); err != nil {
170
		msg := fmt.Sprintf("could not all delete messages for user with ID [%s] between owner [%s] and contact [%s] ", userID, owner, contact)
171
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
172
	}
173
174
	ctxLogger.Info(fmt.Sprintf("deleted all messages for user with ID [%s] between owner [%s] and contact [%s] ", userID, owner, contact))
175
	return nil
176
}
177
178
// RespondToMissedCall creates an SMS response to a missed phone call on the android phone
179
func (service *MessageService) RespondToMissedCall(ctx context.Context, source string, payload *events.MessageCallMissedPayload) error {
180
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
181
	defer span.End()
182
183
	phone, err := service.phoneService.Load(ctx, payload.UserID, payload.Owner)
184
	if err != nil {
185
		msg := fmt.Sprintf("cannot find phone with owner [%s] for user with ID [%s] when handling missed phone call message [%s]", payload.Owner, payload.UserID, payload.MessageID)
186
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
187
	}
188
189
	if phone.MissedCallAutoReply == nil || strings.TrimSpace(*phone.MissedCallAutoReply) == "" {
190
		ctxLogger.Info(fmt.Sprintf("no auto reply set for phone [%s] for message [%s] with user [%s]", payload.Owner, payload.MessageID, payload.UserID))
191
		return nil
192
	}
193
194
	requestID := fmt.Sprintf("missed-call-%s", payload.MessageID)
195
	owner, _ := phonenumbers.Parse(payload.Owner, phonenumbers.UNKNOWN_REGION)
196
	message, err := service.SendMessage(ctx, MessageSendParams{
197
		Owner:             owner,
198
		Contact:           payload.Contact,
199
		Encrypted:         false,
200
		Content:           *phone.MissedCallAutoReply,
201
		Source:            source,
202
		SendAt:            nil,
203
		RequestID:         &requestID,
204
		UserID:            payload.UserID,
205
		RequestReceivedAt: time.Now().UTC(),
206
	})
207
	if err != nil {
208
		msg := fmt.Sprintf("cannot send auto response message for owner [%s] for user with ID [%s] when handling missed phone call message [%s]", payload.Owner, payload.UserID, payload.MessageID)
209
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
210
	}
211
212
	ctxLogger.Info(fmt.Sprintf("created response message with ID [%s] for missed call event [%s] for user [%s]", message.ID, payload.MessageID, message.UserID))
213
	return nil
214
}
215
216
// MessageGetParams parameters for sending a new message
217
type MessageGetParams struct {
218
	repositories.IndexParams
219
	UserID  entities.UserID
220
	Owner   string
221
	Contact string
222
}
223
224
// GetMessages fetches sent between 2 phone numbers
225
func (service *MessageService) GetMessages(ctx context.Context, params MessageGetParams) (*[]entities.Message, error) {
226
	ctx, span := service.tracer.Start(ctx)
227
	defer span.End()
228
229
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
230
231
	messages, err := service.repository.Index(ctx, params.UserID, params.Owner, params.Contact, params.IndexParams)
232
	if err != nil {
233
		msg := fmt.Sprintf("could not fetch messages with parms [%+#v]", params)
234
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
235
	}
236
237
	ctxLogger.Info(fmt.Sprintf("fetched [%d] messages with prams [%+#v]", len(*messages), params))
238
	return messages, nil
239
}
240
241
// GetMessage fetches a message by the ID
242
func (service *MessageService) GetMessage(ctx context.Context, userID entities.UserID, messageID uuid.UUID) (*entities.Message, error) {
243
	ctx, span := service.tracer.Start(ctx)
244
	defer span.End()
245
246
	message, err := service.repository.Load(ctx, userID, messageID)
247
	if err != nil {
248
		msg := fmt.Sprintf("could not fetch messages with ID [%s]", messageID)
249
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
250
	}
251
252
	return message, nil
253
}
254
255
// MessageStoreEventParams parameters registering a message event
256
type MessageStoreEventParams struct {
257
	MessageID    uuid.UUID
258
	EventName    entities.MessageEventName
259
	Timestamp    time.Time
260
	ErrorMessage *string
261
	Source       string
262
}
263
264
// StoreEvent handles event generated by a mobile phone
265
func (service *MessageService) StoreEvent(ctx context.Context, message *entities.Message, params MessageStoreEventParams) (*entities.Message, error) {
266
	ctx, span := service.tracer.Start(ctx)
267
	defer span.End()
268
269
	var err error
270
271
	switch params.EventName {
272
	case entities.MessageEventNameSent:
273
		err = service.handleMessageSentEvent(ctx, params, message)
274
	case entities.MessageEventNameDelivered:
275
		err = service.handleMessageDeliveredEvent(ctx, params, message)
276
	case entities.MessageEventNameFailed:
277
		err = service.handleMessageFailedEvent(ctx, params, message)
278
	default:
279
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.NewError(fmt.Sprintf("cannot handle message event [%s]", params.EventName)))
280
	}
281
282
	if err != nil {
283
		msg := fmt.Sprintf("could not handle phone event [%s] for message with id [%s]", params.EventName, message.ID)
284
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
285
	}
286
287
	return service.repository.Load(ctx, message.UserID, params.MessageID)
288
}
289
290
// MessageReceiveParams parameters registering a message event
291
type MessageReceiveParams struct {
292
	Contact   string
293
	UserID    entities.UserID
294
	Owner     phonenumbers.PhoneNumber
295
	Content   string
296
	SIM       entities.SIM
297
	Timestamp time.Time
298
	Encrypted bool
299
	Source    string
300
}
301
302
// ReceiveMessage handles message received by a mobile phone
303
func (service *MessageService) ReceiveMessage(ctx context.Context, params *MessageReceiveParams) (*entities.Message, error) {
304
	ctx, span := service.tracer.Start(ctx)
305
	defer span.End()
306
307
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
308
309
	eventPayload := events.MessagePhoneReceivedPayload{
310
		MessageID: uuid.New(),
311
		UserID:    params.UserID,
312
		Encrypted: params.Encrypted,
313
		Owner:     phonenumbers.Format(&params.Owner, phonenumbers.E164),
314
		Contact:   params.Contact,
315
		Timestamp: params.Timestamp,
316
		Content:   params.Content,
317
		SIM:       params.SIM,
318
	}
319
320
	ctxLogger.Info(fmt.Sprintf("creating cloud event for received with ID [%s]", eventPayload.MessageID))
321
322
	event, err := service.createMessagePhoneReceivedEvent(params.Source, eventPayload)
323
	if err != nil {
324
		msg := fmt.Sprintf("cannot create %T from payload with message id [%s]", event, eventPayload.MessageID)
325
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
326
	}
327
328
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] and message id [%s]", event.Type(), event.ID(), eventPayload.MessageID))
329
330
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
331
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
332
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
333
	}
334
	ctxLogger.Info(fmt.Sprintf("event [%s] dispatched succesfully", event.ID()))
335
336
	return service.storeReceivedMessage(ctx, eventPayload)
337
}
338
339
func (service *MessageService) handleMessageSentEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
340
	ctx, span := service.tracer.Start(ctx)
341
	defer span.End()
342
343
	event, err := service.createMessagePhoneSentEvent(params.Source, events.MessagePhoneSentPayload{
344
		ID:        message.ID,
345
		Owner:     message.Owner,
346
		UserID:    message.UserID,
347
		RequestID: message.RequestID,
348
		Timestamp: params.Timestamp,
349
		Contact:   message.Contact,
350
		Encrypted: message.Encrypted,
351
		Content:   message.Content,
352
		SIM:       message.SIM,
353
	})
354
	if err != nil {
355
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessagePhoneSent, message.ID)
356
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
357
	}
358
359
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
360
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
361
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
362
	}
363
	return nil
364
}
365
366
func (service *MessageService) handleMessageDeliveredEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
367
	ctx, span := service.tracer.Start(ctx)
368
	defer span.End()
369
370
	event, err := service.createMessagePhoneDeliveredEvent(params.Source, events.MessagePhoneDeliveredPayload{
371
		ID:        message.ID,
372
		Owner:     message.Owner,
373
		UserID:    message.UserID,
374
		RequestID: message.RequestID,
375
		Timestamp: params.Timestamp,
376
		Encrypted: message.Encrypted,
377
		Contact:   message.Contact,
378
		Content:   message.Content,
379
		SIM:       message.SIM,
380
	})
381
	if err != nil {
382
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessagePhoneSent, message.ID)
383
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
384
	}
385
386
	if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, time.Second); err != nil {
387
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s] for message [%s]", event.Type(), event.ID(), message.ID)
388
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
389
	}
390
	return nil
391
}
392
393
func (service *MessageService) handleMessageFailedEvent(ctx context.Context, params MessageStoreEventParams, message *entities.Message) error {
394
	ctx, span := service.tracer.Start(ctx)
395
	defer span.End()
396
397
	errorMessage := "UNKNOWN ERROR"
398
	if params.ErrorMessage != nil {
399
		errorMessage = *params.ErrorMessage
400
	}
401
402
	event, err := service.createMessageSendFailedEvent(params.Source, events.MessageSendFailedPayload{
403
		ID:           message.ID,
404
		Owner:        message.Owner,
405
		ErrorMessage: errorMessage,
406
		Timestamp:    params.Timestamp,
407
		Encrypted:    message.Encrypted,
408
		Contact:      message.Contact,
409
		RequestID:    message.RequestID,
410
		UserID:       message.UserID,
411
		Content:      message.Content,
412
		SIM:          message.SIM,
413
	})
414
	if err != nil {
415
		msg := fmt.Sprintf("cannot create event [%s] for message [%s]", events.EventTypeMessageSendFailed, message.ID)
416
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
417
	}
418
419
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
420
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
421
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
422
	}
423
	return nil
424
}
425
426
// MessageSendParams parameters for sending a new message
427
type MessageSendParams struct {
428
	Owner             *phonenumbers.PhoneNumber
429
	Contact           string
430
	Encrypted         bool
431
	Content           string
432
	Source            string
433
	SendAt            *time.Time
434
	RequestID         *string
435
	UserID            entities.UserID
436
	RequestReceivedAt time.Time
437
}
438
439
// SendMessage a new message
440
func (service *MessageService) SendMessage(ctx context.Context, params MessageSendParams) (*entities.Message, error) {
441
	ctx, span := service.tracer.Start(ctx)
442
	defer span.End()
443
444
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
445
446
	sendAttempts, sim := service.phoneSettings(ctx, params.UserID, phonenumbers.Format(params.Owner, phonenumbers.E164))
447
448
	eventPayload := events.MessageAPISentPayload{
449
		MessageID:         uuid.New(),
450
		UserID:            params.UserID,
451
		Encrypted:         params.Encrypted,
452
		MaxSendAttempts:   sendAttempts,
453
		RequestID:         params.RequestID,
454
		Owner:             phonenumbers.Format(params.Owner, phonenumbers.E164),
455
		Contact:           params.Contact,
456
		RequestReceivedAt: params.RequestReceivedAt,
457
		Content:           params.Content,
458
		ScheduledSendTime: params.SendAt,
459
		SIM:               sim,
460
	}
461
462
	event, err := service.createMessageAPISentEvent(params.Source, eventPayload)
463
	if err != nil {
464
		msg := fmt.Sprintf("cannot create %T from payload with message id [%s]", event, eventPayload.MessageID)
465
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
466
	}
467
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] and message id [%s] and user [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID))
468
469
	message, err := service.storeSentMessage(ctx, eventPayload)
470
	if err != nil {
471
		msg := fmt.Sprintf("cannot store message with id [%s]", eventPayload.MessageID)
472
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
473
	}
474
475
	timeout := service.getSendDelay(ctxLogger, eventPayload, params.SendAt)
476
	if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, timeout); err != nil {
477
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
478
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
479
	}
480
481
	ctxLogger.Info(fmt.Sprintf("[%s] event with ID [%s] dispatched succesfully for message [%s] with user [%s] and delay [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID, timeout))
482
	return message, err
483
}
484
485
// MissedCallParams parameters for sending a new message
486
type MissedCallParams struct {
487
	Owner     *phonenumbers.PhoneNumber
488
	Contact   string
489
	Source    string
490
	SIM       entities.SIM
491
	Timestamp time.Time
492
	UserID    entities.UserID
493
}
494
495
// RegisterMissedCall a new message
496
func (service *MessageService) RegisterMissedCall(ctx context.Context, params *MissedCallParams) (*entities.Message, error) {
497
	ctx, span := service.tracer.Start(ctx)
498
	defer span.End()
499
500
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
501
502
	eventPayload := &events.MessageCallMissedPayload{
503
		MessageID: uuid.New(),
504
		UserID:    params.UserID,
505
		Timestamp: params.Timestamp,
506
		Owner:     phonenumbers.Format(params.Owner, phonenumbers.E164),
507
		Contact:   params.Contact,
508
		SIM:       params.SIM,
509
	}
510
511
	event, err := service.createEvent(events.MessageCallMissed, params.Source, eventPayload)
512
	if err != nil {
513
		msg := fmt.Sprintf("cannot create [%T] from payload with message id [%s]", event, eventPayload.MessageID)
514
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
515
	}
516
517
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] and message id [%s] and user [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID))
518
519
	message, err := service.storeMissedCallMessage(ctx, eventPayload)
520
	if err != nil {
521
		msg := fmt.Sprintf("cannot store missed call message message with id [%s]", eventPayload.MessageID)
522
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
523
	}
524
525
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
526
		msg := fmt.Sprintf("cannot dispatch event type [%s] and id [%s]", event.Type(), event.ID())
527
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
528
	}
529
530
	ctxLogger.Info(fmt.Sprintf("[%s] event with ID [%s] dispatched succesfully for message [%s] with user [%s]", event.Type(), event.ID(), eventPayload.MessageID, eventPayload.UserID))
531
	return message, err
532
}
533
534
func (service *MessageService) getSendDelay(ctxLogger telemetry.Logger, eventPayload events.MessageAPISentPayload, sendAt *time.Time) time.Duration {
535
	if sendAt == nil {
536
		return time.Duration(0)
537
	}
538
539
	delay := sendAt.Sub(time.Now().UTC())
540
	if delay < 0 {
541
		ctxLogger.Info(fmt.Sprintf("message [%s] has send time [%s] in the past. sending immediately", eventPayload.MessageID, sendAt.String()))
542
		return time.Duration(0)
543
	}
544
545
	return delay
546
}
547
548
// StoreReceivedMessage a new message
549
func (service *MessageService) storeReceivedMessage(ctx context.Context, params events.MessagePhoneReceivedPayload) (*entities.Message, error) {
550
	ctx, span := service.tracer.Start(ctx)
551
	defer span.End()
552
553
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
554
555
	message := &entities.Message{
556
		ID:                params.MessageID,
557
		Owner:             params.Owner,
558
		UserID:            params.UserID,
559
		Contact:           params.Contact,
560
		Content:           params.Content,
561
		SIM:               params.SIM,
562
		Encrypted:         params.Encrypted,
563
		Type:              entities.MessageTypeMobileOriginated,
564
		Status:            entities.MessageStatusReceived,
565
		RequestReceivedAt: params.Timestamp,
566
		CreatedAt:         time.Now().UTC(),
567
		UpdatedAt:         time.Now().UTC(),
568
		OrderTimestamp:    params.Timestamp,
569
		ReceivedAt:        &params.Timestamp,
570
	}
571
572
	if err := service.repository.Store(ctx, message); err != nil {
573
		msg := fmt.Sprintf("cannot save message with id [%s]", params.MessageID)
574
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
575
	}
576
577
	ctxLogger.Info(fmt.Sprintf("message saved with id [%s]", message.ID))
578
	return message, nil
579
}
580
581
// HandleMessageParams are parameters for handling a message event
582
type HandleMessageParams struct {
583
	ID        uuid.UUID
584
	Source    string
585
	UserID    entities.UserID
586
	Timestamp time.Time
587
}
588
589
// HandleMessageSending handles when a message is being sent
590
func (service *MessageService) HandleMessageSending(ctx context.Context, params HandleMessageParams) error {
591
	ctx, span := service.tracer.Start(ctx)
592
	defer span.End()
593
594
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
595
596
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
597
	if err != nil {
598
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
599
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
600
	}
601
602
	if !message.IsSending() {
603
		msg := fmt.Sprintf("message has wrong status [%s]. expected %s", message.Status, entities.MessageStatusSending)
604
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
605
	}
606
607
	if err = service.repository.Update(ctx, message.AddSendAttempt(params.Timestamp)); err != nil {
608
		msg := fmt.Sprintf("cannot update message with id [%s] after sending", message.ID)
609
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
610
	}
611
612
	ctxLogger.Info(fmt.Sprintf("message with id [%s] updated after adding send attempt", message.ID))
613
	return nil
614
}
615
616
// HandleMessageSent handles when a message has been sent by a mobile phone
617
func (service *MessageService) HandleMessageSent(ctx context.Context, params HandleMessageParams) error {
618
	ctx, span := service.tracer.Start(ctx)
619
	defer span.End()
620
621
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
622
623
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
624
	if err != nil {
625
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
626
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
627
	}
628
629
	if message.IsSent() || message.IsDelivered() {
630
		ctxLogger.Info(fmt.Sprintf("message [%s] for [%s] has already been processed with status [%s]", message.ID, message.UserID, message.Status))
631
		return nil
632
	}
633
634
	if !message.IsSending() && !message.IsExpired() {
635
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s]", message.Status, entities.MessageStatusSending, entities.MessageStatusExpired)
636
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
637
	}
638
639
	if err = service.repository.Update(ctx, message.Sent(params.Timestamp)); err != nil {
640
		msg := fmt.Sprintf("cannot update message with id [%s] as sent", message.ID)
641
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
642
	}
643
644
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
645
	return nil
646
}
647
648
// HandleMessageFailedParams are parameters for handling a failed message event
649
type HandleMessageFailedParams struct {
650
	ID           uuid.UUID
651
	UserID       entities.UserID
652
	ErrorMessage string
653
	Timestamp    time.Time
654
}
655
656
// HandleMessageFailed handles when a message could not be sent by a mobile phone
657
func (service *MessageService) HandleMessageFailed(ctx context.Context, params HandleMessageFailedParams) error {
658
	ctx, span := service.tracer.Start(ctx)
659
	defer span.End()
660
661
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
662
663
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
664
	if err != nil {
665
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
666
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
667
	}
668
669
	if message.IsDelivered() {
670
		msg := fmt.Sprintf("message has already been delivered with status [%s]", message.Status)
671
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
672
	}
673
674
	if err = service.repository.Update(ctx, message.Failed(params.Timestamp, params.ErrorMessage)); err != nil {
675
		msg := fmt.Sprintf("cannot update message with id [%s] as sent", message.ID)
676
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
677
	}
678
679
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
680
	return nil
681
}
682
683
// HandleMessageDelivered handles when a message is has been delivered by a mobile phone
684
func (service *MessageService) HandleMessageDelivered(ctx context.Context, params HandleMessageParams) error {
685
	ctx, span := service.tracer.Start(ctx)
686
	defer span.End()
687
688
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
689
690
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
691
	if err != nil {
692
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
693
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
694
	}
695
696
	if !message.IsSent() && !message.IsSending() && !message.IsExpired() && !message.IsScheduled() {
697
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s, %s, %s]", message.Status, entities.MessageStatusSent, entities.MessageStatusScheduled, entities.MessageStatusSending, entities.MessageStatusExpired)
698
		ctxLogger.Warn(stacktrace.NewError(msg))
699
		return nil
700
	}
701
702
	if err = service.repository.Update(ctx, message.Delivered(params.Timestamp)); err != nil {
703
		msg := fmt.Sprintf("cannot update message with id [%s] as delivered", message.ID)
704
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
705
	}
706
707
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
708
	return nil
709
}
710
711
// HandleMessageNotificationScheduled handles the event when the notification of a message has been scheduled
712
func (service *MessageService) HandleMessageNotificationScheduled(ctx context.Context, params HandleMessageParams) error {
713
	ctx, span := service.tracer.Start(ctx)
714
	defer span.End()
715
716
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
717
718
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
719
	if err != nil {
720
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
721
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
722
	}
723
724
	if !message.IsPending() && !message.IsExpired() && !message.IsSending() {
725
		ctxLogger.Warn(stacktrace.NewError(fmt.Sprintf("received scheduled event for message with id [%s] message has status [%s]", message.ID, message.Status)))
726
	}
727
728
	if err = service.repository.Update(ctx, message.NotificationScheduled(params.Timestamp)); err != nil {
729
		msg := fmt.Sprintf("cannot update message with id [%s] as expired", message.ID)
730
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
731
	}
732
733
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been scheduled to send at [%s]", message.ID, message.NotificationScheduledAt.String()))
734
	return nil
735
}
736
737
// HandleMessageNotificationSent handles the event when the notification of a message has been sent
738
func (service *MessageService) HandleMessageNotificationSent(ctx context.Context, params HandleMessageParams) error {
739
	ctx, span := service.tracer.Start(ctx)
740
	defer span.End()
741
742
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
743
744
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
745
	if err != nil {
746
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
747
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
748
	}
749
750
	if err = service.repository.Update(ctx, message.AddSendAttemptCount()); err != nil {
751
		msg := fmt.Sprintf("cannot update message with id [%s] as expired", message.ID)
752
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
753
	}
754
755
	ctxLogger.Info(fmt.Sprintf("notification for message with id [%s] has been sent at [%s]", message.ID, params.Timestamp.String()))
756
	return nil
757
}
758
759
// HandleMessageExpired handles when a message is has been expired
760
func (service *MessageService) HandleMessageExpired(ctx context.Context, params HandleMessageParams) error {
761
	ctx, span := service.tracer.Start(ctx)
762
	defer span.End()
763
764
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
765
766
	message, err := service.repository.Load(ctx, params.UserID, params.ID)
767
	if err != nil {
768
		msg := fmt.Sprintf("cannot find message with id [%s]", params.ID)
769
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
770
	}
771
772
	if !message.IsSending() && !message.IsScheduled() && !message.IsPending() {
773
		msg := fmt.Sprintf("message has wrong status [%s]. expected [%s, %s, %s]", message.Status, entities.MessageStatusSending, entities.MessageStatusScheduled, entities.MessageStatusPending)
774
		return service.tracer.WrapErrorSpan(span, stacktrace.NewError(msg))
775
	}
776
777
	if err = service.repository.Update(ctx, message.Expired(params.Timestamp)); err != nil {
778
		msg := fmt.Sprintf("cannot update message with id [%s] as expired", message.ID)
779
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
780
	}
781
782
	ctxLogger.Info(fmt.Sprintf("message with id [%s] has been updated to status [%s]", message.ID, message.Status))
783
784
	if !message.CanBeRescheduled() {
785
		return nil
786
	}
787
788
	event, err := service.createMessageSendRetryEvent(params.Source, &events.MessageSendRetryPayload{
789
		MessageID: message.ID,
790
		Timestamp: time.Now().UTC(),
791
		Contact:   message.Contact,
792
		Owner:     message.Owner,
793
		Encrypted: message.Encrypted,
794
		UserID:    message.UserID,
795
		Content:   message.Content,
796
		SIM:       message.SIM,
797
	})
798
	if err != nil {
799
		msg := fmt.Sprintf("cannot create [%s] event for expired message with ID [%s]", events.EventTypeMessageSendRetry, message.ID)
800
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
801
	}
802
803
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
804
		msg := fmt.Sprintf("cannot dispatch [%s] event for message with ID [%s]", event.Type(), message.ID)
805
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
806
	}
807
808
	ctxLogger.Info(fmt.Sprintf("retried sending message with ID [%s]", message.ID))
809
	return nil
810
}
811
812
// MessageScheduleExpirationParams are parameters for scheduling the expiration of a message event
813
type MessageScheduleExpirationParams struct {
814
	MessageID                 uuid.UUID
815
	UserID                    entities.UserID
816
	NotificationSentAt        time.Time
817
	PhoneID                   uuid.UUID
818
	MessageExpirationDuration time.Duration
819
	Source                    string
820
}
821
822
// ScheduleExpirationCheck schedules an event to check if a message is expired
823
func (service *MessageService) ScheduleExpirationCheck(ctx context.Context, params MessageScheduleExpirationParams) error {
824
	ctx, span := service.tracer.Start(ctx)
825
	defer span.End()
826
827
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
828
829
	if params.MessageExpirationDuration == 0 {
830
		ctxLogger.Info(fmt.Sprintf("message expiration duration not set for message [%s] using phone [%s]", params.MessageID, params.PhoneID))
831
		return nil
832
	}
833
834
	event, err := service.createMessageSendExpiredCheckEvent(params.Source, &events.MessageSendExpiredCheckPayload{
835
		MessageID:   params.MessageID,
836
		ScheduledAt: params.NotificationSentAt.Add(params.MessageExpirationDuration),
837
		UserID:      params.UserID,
838
	})
839
	if err != nil {
840
		msg := fmt.Sprintf("cannot create event [%s] for message with id [%s]", events.EventTypeMessageSendExpiredCheck, params.MessageID)
841
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
842
	}
843
844
	if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, params.MessageExpirationDuration); err != nil {
845
		msg := fmt.Sprintf("cannot dispatch event [%s] for message with ID [%s]", event.Type(), params.MessageID)
846
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
847
	}
848
849
	ctxLogger.Info(fmt.Sprintf("scheduled message id [%s] to expire at [%s]", params.MessageID, params.NotificationSentAt.Add(params.MessageExpirationDuration)))
850
	return nil
851
}
852
853
// MessageCheckExpired are parameters for checking if a message is expired
854
type MessageCheckExpired struct {
855
	MessageID uuid.UUID
856
	UserID    entities.UserID
857
	Source    string
858
}
859
860
// CheckExpired checks if a message has expired
861
func (service *MessageService) CheckExpired(ctx context.Context, params MessageCheckExpired) error {
862
	ctx, span := service.tracer.Start(ctx)
863
	defer span.End()
864
865
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
866
867
	message, err := service.repository.Load(ctx, params.UserID, params.MessageID)
868
	if stacktrace.GetCode(err) == repositories.ErrCodeNotFound {
869
		ctxLogger.Info(fmt.Sprintf("message has been deleted for userID [%s] and messageID [%s]", params.UserID, params.MessageID))
870
		return nil
871
	}
872
873
	if err != nil {
874
		msg := fmt.Sprintf("cannot load message with userID [%s] and messageID [%s]", params.UserID, params.MessageID)
875
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
876
	}
877
878
	if !message.IsPending() && !message.IsSending() && !message.IsScheduled() {
879
		ctxLogger.Info(fmt.Sprintf("message with ID [%s] has status [%s] and is not expired", message.ID, message.Status))
880
		return nil
881
	}
882
883
	event, err := service.createMessageSendExpiredEvent(params.Source, events.MessageSendExpiredPayload{
884
		MessageID:        message.ID,
885
		Owner:            message.Owner,
886
		Contact:          message.Contact,
887
		Encrypted:        message.Encrypted,
888
		RequestID:        message.RequestID,
889
		IsFinal:          message.SendAttemptCount == message.MaxSendAttempts,
890
		SendAttemptCount: message.SendAttemptCount,
891
		UserID:           message.UserID,
892
		Timestamp:        time.Now().UTC(),
893
		Content:          message.Content,
894
		SIM:              message.SIM,
895
	})
896
	if err != nil {
897
		msg := fmt.Sprintf("cannot create event [%s] for message with id [%s]", events.EventTypeMessageSendExpired, params.MessageID)
898
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
899
	}
900
901
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
902
		msg := fmt.Sprintf("cannot dispatch event [%s] for message with ID [%s]", event.Type(), params.MessageID)
903
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
904
	}
905
906
	ctxLogger.Info(fmt.Sprintf("message [%s] has expired with status [%s]", params.MessageID, message.Status))
907
	return nil
908
}
909
910
// MessageSearchParams are parameters for searching messages
911
type MessageSearchParams struct {
912
	repositories.IndexParams
913
	UserID   entities.UserID
914
	Owners   []string
915
	Types    []entities.MessageType
916
	Statuses []entities.MessageStatus
917
}
918
919
// SearchMessages fetches all the messages for a user
920
func (service *MessageService) SearchMessages(ctx context.Context, params *MessageSearchParams) ([]*entities.Message, error) {
921
	ctx, span := service.tracer.Start(ctx)
922
	defer span.End()
923
924
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
925
926
	messages, err := service.repository.Search(ctx, params.UserID, params.Owners, params.Types, params.Statuses, params.IndexParams)
927
	if err != nil {
928
		msg := fmt.Sprintf("could not search messages with parms [%+#v]", params)
929
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
930
	}
931
932
	ctxLogger.Info(fmt.Sprintf("fetched [%d] messages with prams [%+#v]", len(messages), params))
933
	return messages, nil
934
}
935
936
func (service *MessageService) phoneSettings(ctx context.Context, userID entities.UserID, owner string) (uint, entities.SIM) {
937
	ctx, span := service.tracer.Start(ctx)
938
	defer span.End()
939
940
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
941
942
	phone, err := service.phoneService.Load(ctx, userID, owner)
943
	if err != nil {
944
		msg := fmt.Sprintf("cannot load phone for userID [%s] and owner [%s]. using default max send attempt of 2", userID, owner)
945
		ctxLogger.Error(stacktrace.Propagate(err, msg))
946
		return 2, entities.SIM1
947
	}
948
949
	return phone.MaxSendAttemptsSanitized(), phone.SIM
950
}
951
952
// storeSentMessage a new message
953
func (service *MessageService) storeSentMessage(ctx context.Context, payload events.MessageAPISentPayload) (*entities.Message, error) {
954
	ctx, span := service.tracer.Start(ctx)
955
	defer span.End()
956
957
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
958
959
	timestamp := payload.RequestReceivedAt
960
	if payload.ScheduledSendTime != nil {
961
		timestamp = *payload.ScheduledSendTime
962
	}
963
964
	message := &entities.Message{
965
		ID:                payload.MessageID,
966
		Owner:             payload.Owner,
967
		Contact:           payload.Contact,
968
		UserID:            payload.UserID,
969
		Content:           payload.Content,
970
		RequestID:         payload.RequestID,
971
		SIM:               payload.SIM,
972
		Encrypted:         payload.Encrypted,
973
		ScheduledSendTime: payload.ScheduledSendTime,
974
		Type:              entities.MessageTypeMobileTerminated,
975
		Status:            entities.MessageStatusPending,
976
		RequestReceivedAt: payload.RequestReceivedAt,
977
		CreatedAt:         time.Now().UTC(),
978
		UpdatedAt:         time.Now().UTC(),
979
		MaxSendAttempts:   payload.MaxSendAttempts,
980
		OrderTimestamp:    timestamp,
981
	}
982
983
	if err := service.repository.Store(ctx, message); err != nil {
984
		msg := fmt.Sprintf("cannot save message with id [%s]", payload.MessageID)
985
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
986
	}
987
988
	ctxLogger.Info(fmt.Sprintf("message saved with id [%s]", payload.MessageID))
989
	return message, nil
990
}
991
992
// storeMissedCallMessage a new message
993
func (service *MessageService) storeMissedCallMessage(ctx context.Context, payload *events.MessageCallMissedPayload) (*entities.Message, error) {
994
	ctx, span := service.tracer.Start(ctx)
995
	defer span.End()
996
997
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
998
999
	message := &entities.Message{
1000
		ID:                payload.MessageID,
1001
		Owner:             payload.Owner,
1002
		Contact:           payload.Contact,
1003
		UserID:            payload.UserID,
1004
		SIM:               payload.SIM,
1005
		Type:              entities.MessageTypeCallMissed,
1006
		Status:            entities.MessageStatusReceived,
1007
		RequestReceivedAt: payload.Timestamp,
1008
		CreatedAt:         time.Now().UTC(),
1009
		UpdatedAt:         time.Now().UTC(),
1010
		OrderTimestamp:    payload.Timestamp,
1011
	}
1012
1013
	if err := service.repository.Store(ctx, message); err != nil {
1014
		msg := fmt.Sprintf("cannot save missed call message with id [%s]", payload.MessageID)
1015
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
1016
	}
1017
1018
	ctxLogger.Info(fmt.Sprintf("missed call message saved with id [%s]", payload.MessageID))
1019
	return message, nil
1020
}
1021
1022
func (service *MessageService) createMessageSendExpiredEvent(source string, payload events.MessageSendExpiredPayload) (cloudevents.Event, error) {
1023
	return service.createEvent(events.EventTypeMessageSendExpired, source, payload)
1024
}
1025
1026
func (service *MessageService) createMessageSendExpiredCheckEvent(source string, payload *events.MessageSendExpiredCheckPayload) (cloudevents.Event, error) {
1027
	return service.createEvent(events.EventTypeMessageSendExpiredCheck, source, payload)
1028
}
1029
1030
func (service *MessageService) createMessageAPISentEvent(source string, payload events.MessageAPISentPayload) (cloudevents.Event, error) {
1031
	return service.createEvent(events.EventTypeMessageAPISent, source, payload)
1032
}
1033
1034
func (service *MessageService) createMessagePhoneReceivedEvent(source string, payload events.MessagePhoneReceivedPayload) (cloudevents.Event, error) {
1035
	return service.createEvent(events.EventTypeMessagePhoneReceived, source, payload)
1036
}
1037
1038
func (service *MessageService) createMessagePhoneSendingEvent(source string, payload events.MessagePhoneSendingPayload) (cloudevents.Event, error) {
1039
	return service.createEvent(events.EventTypeMessagePhoneSending, source, payload)
1040
}
1041
1042
func (service *MessageService) createMessagePhoneSentEvent(source string, payload events.MessagePhoneSentPayload) (cloudevents.Event, error) {
1043
	return service.createEvent(events.EventTypeMessagePhoneSent, source, payload)
1044
}
1045
1046
func (service *MessageService) createMessageSendFailedEvent(source string, payload events.MessageSendFailedPayload) (cloudevents.Event, error) {
1047
	return service.createEvent(events.EventTypeMessageSendFailed, source, payload)
1048
}
1049
1050
func (service *MessageService) createMessagePhoneDeliveredEvent(source string, payload events.MessagePhoneDeliveredPayload) (cloudevents.Event, error) {
1051
	return service.createEvent(events.EventTypeMessagePhoneDelivered, source, payload)
1052
}
1053
1054
func (service *MessageService) createMessageSendRetryEvent(source string, payload *events.MessageSendRetryPayload) (cloudevents.Event, error) {
1055
	return service.createEvent(events.EventTypeMessageSendRetry, source, payload)
1056
}
1057