api/pkg/services/phone_notification_service.go   A
last analyzed

Size/Duplication

Total Lines 354
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 36
eloc 235
dl 0
loc 354
rs 9.52
c 0
b 0
f 0

14 Methods

Rating   Name   Duplication   Size   Complexity  
A services.NewNotificationService 0 15 1
A services.*PhoneNotificationService.createMessageNotificationSentEvent 0 25 2
A services.*PhoneNotificationService.dispatchMessageNotificationSend 0 16 3
A services.*PhoneNotificationService.dispatchMessageNotificationScheduled 0 21 3
A services.*PhoneNotificationService.createMessageNotificationSendEvent 0 2 1
A services.*PhoneNotificationService.createMessageNotificationFailedEvent 0 23 2
A services.*PhoneNotificationService.Send 0 33 4
A services.*PhoneNotificationService.handleNotificationSent 0 19 3
A services.*PhoneNotificationService.DeleteAllForUser 0 11 2
A services.*PhoneNotificationService.SendHeartbeatFCM 0 32 4
B services.*PhoneNotificationService.Schedule 0 38 5
A services.*PhoneNotificationService.handleNotificationFailed 0 20 3
A services.*PhoneNotificationService.createMessageNotificationScheduledEvent 0 2 1
A services.*PhoneNotificationService.updateStatus 0 13 2
1
package services
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"time"
8
9
	"github.com/NdoleStudio/httpsms/pkg/events"
10
	cloudevents "github.com/cloudevents/sdk-go/v2"
11
12
	"firebase.google.com/go/messaging"
13
	"github.com/NdoleStudio/httpsms/pkg/entities"
14
	"github.com/NdoleStudio/httpsms/pkg/repositories"
15
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
16
	"github.com/google/uuid"
17
	"github.com/palantir/stacktrace"
18
)
19
20
// PhoneNotificationService sends out notifications to mobile phones
21
type PhoneNotificationService struct {
22
	service
23
	logger                      telemetry.Logger
24
	tracer                      telemetry.Tracer
25
	phoneNotificationRepository repositories.PhoneNotificationRepository
26
	phoneRepository             repositories.PhoneRepository
27
	messagingClient             *messaging.Client
28
	eventDispatcher             *EventDispatcher
29
}
30
31
// NewNotificationService creates a new PhoneNotificationService
32
func NewNotificationService(
33
	logger telemetry.Logger,
34
	tracer telemetry.Tracer,
35
	messagingClient *messaging.Client,
36
	phoneRepository repositories.PhoneRepository,
37
	phoneNotificationRepository repositories.PhoneNotificationRepository,
38
	dispatcher *EventDispatcher,
39
) (s *PhoneNotificationService) {
40
	return &PhoneNotificationService{
41
		logger:                      logger.WithService(fmt.Sprintf("%T", s)),
42
		tracer:                      tracer,
43
		messagingClient:             messagingClient,
44
		phoneNotificationRepository: phoneNotificationRepository,
45
		phoneRepository:             phoneRepository,
46
		eventDispatcher:             dispatcher,
47
	}
48
}
49
50
// DeleteAllForUser deletes all entities.PhoneNotification for an entities.UserID.
51
func (service *PhoneNotificationService) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
52
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
53
	defer span.End()
54
55
	if err := service.phoneNotificationRepository.DeleteAllForUser(ctx, userID); err != nil {
56
		msg := fmt.Sprintf("could not delete all [entities.PhoneNotification] for user with ID [%s]", userID)
57
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
58
	}
59
60
	ctxLogger.Info(fmt.Sprintf("deleted all [entities.PhoneNotification] for user with ID [%s]", userID))
61
	return nil
62
}
63
64
// SendHeartbeatFCM sends a heartbeat message so the phone can request a heartbeat
65
func (service *PhoneNotificationService) SendHeartbeatFCM(ctx context.Context, payload *events.PhoneHeartbeatMissedPayload) error {
66
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
67
	defer span.End()
68
69
	phone, err := service.phoneRepository.LoadByID(ctx, payload.UserID, payload.PhoneID)
70
	if err != nil {
71
		msg := fmt.Sprintf("cannot load phone with userID [%s] and phoneID [%s]", payload.UserID, payload.PhoneID)
72
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
73
	}
74
75
	if phone.FcmToken == nil {
76
		msg := fmt.Sprintf("phone with id [%s] has no FCM token", phone.ID)
77
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
78
	}
79
80
	result, err := service.messagingClient.Send(ctx, &messaging.Message{
81
		Data: map[string]string{
82
			"KEY_HEARTBEAT_ID": time.Now().UTC().Format(time.RFC3339),
83
		},
84
		Android: &messaging.AndroidConfig{
85
			Priority: "high",
86
		},
87
		Token: *phone.FcmToken,
88
	})
89
	if err != nil {
90
		msg := fmt.Sprintf("cannot send heartbeat FCM to phone with id [%s] for user [%s]", phone.ID, phone.UserID)
91
		ctxLogger.Warn(stacktrace.Propagate(err, msg))
92
		return nil
93
	}
94
95
	ctxLogger.Info(fmt.Sprintf("successfully sent heartbeat FCM [%s] to phone with ID [%s] for user [%s] and monitor [%s]", result, payload.PhoneID, payload.UserID, payload.MonitorID))
96
	return nil
97
}
98
99
// PhoneNotificationSendParams are parameters for sending a notification
100
type PhoneNotificationSendParams struct {
101
	UserID              entities.UserID
102
	PhoneID             uuid.UUID
103
	PhoneNotificationID uuid.UUID
104
	Source              string
105
	ScheduledAt         time.Time
106
	MessageID           uuid.UUID
107
}
108
109
// Send sends a message when a message is sent
110
func (service *PhoneNotificationService) Send(ctx context.Context, params *PhoneNotificationSendParams) error {
111
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
112
	defer span.End()
113
114
	phone, err := service.phoneRepository.LoadByID(ctx, params.UserID, params.PhoneID)
115
	if err != nil {
116
		msg := fmt.Sprintf("cannot load phone with userID [%s] and phoneID [%s]", params.UserID, params.PhoneID)
117
		return service.handleNotificationFailed(ctx, errors.New(msg), params)
118
	}
119
120
	if phone.FcmToken == nil {
121
		msg := fmt.Sprintf("phone with id [%s] has no FCM token", phone.ID)
122
		return service.handleNotificationFailed(ctx, errors.New(msg), params)
123
	}
124
125
	ttl := phone.MessageExpirationDuration()
126
	result, err := service.messagingClient.Send(ctx, &messaging.Message{
127
		Data: map[string]string{
128
			"KEY_MESSAGE_ID": params.MessageID.String(),
129
		},
130
		Android: &messaging.AndroidConfig{
131
			Priority: "normal",
132
			TTL:      &ttl,
133
		},
134
		Token: *phone.FcmToken,
135
	})
136
	if err != nil {
137
		ctxLogger.Warn(stacktrace.Propagate(err, fmt.Sprintf("cannot send FCM to phone with ID [%s] for user with ID [%s] and message [%s]", phone.ID, phone.UserID, params.MessageID)))
138
		msg := fmt.Sprintf("cannot send notification for to your phone [%s]. Reinstall the httpSMS app on your Android phone.", phone.PhoneNumber)
139
		return service.handleNotificationFailed(ctx, errors.New(msg), params)
140
	}
141
142
	return service.handleNotificationSent(ctx, phone, result, params)
143
}
144
145
// PhoneNotificationScheduleParams are parameters for sending a notification
146
type PhoneNotificationScheduleParams struct {
147
	UserID    entities.UserID
148
	Owner     string
149
	Source    string
150
	Encrypted bool
151
	Contact   string
152
	Content   string
153
	SIM       entities.SIM
154
	MessageID uuid.UUID
155
}
156
157
// Schedule a notification to be sent to a phone
158
func (service *PhoneNotificationService) Schedule(ctx context.Context, params *PhoneNotificationScheduleParams) error {
159
	ctx, span := service.tracer.Start(ctx)
160
	defer span.End()
161
162
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
163
164
	phone, err := service.phoneRepository.Load(ctx, params.UserID, params.Owner)
165
	if err != nil {
166
		msg := fmt.Sprintf("cannot load phone with userID [%s] and phone [%s]", params.UserID, params.Owner)
167
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
168
	}
169
170
	notification := &entities.PhoneNotification{
171
		ID:          uuid.New(),
172
		MessageID:   params.MessageID,
173
		UserID:      params.UserID,
174
		PhoneID:     phone.ID,
175
		Status:      entities.PhoneNotificationStatusPending,
176
		ScheduledAt: time.Now().UTC(),
177
		CreatedAt:   time.Now().UTC(),
178
		UpdatedAt:   time.Now().UTC(),
179
	}
180
181
	if err = service.phoneNotificationRepository.Schedule(ctx, phone.MessagesPerMinute, notification); err != nil {
182
		msg := fmt.Sprintf("cannot schedule notification for message [%s] to phone [%s]", params.MessageID, phone.ID)
183
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
184
	}
185
186
	if err = service.dispatchMessageNotificationScheduled(ctx, params, notification); err != nil {
187
		ctxLogger.Error(err)
188
	}
189
190
	if err = service.dispatchMessageNotificationSend(ctx, params.Source, notification); err != nil {
191
		return service.tracer.WrapErrorSpan(span, err)
192
	}
193
194
	ctxLogger.Info(fmt.Sprintf("message with id [%s] notification scheduled for [%s] with id [%s]", params.MessageID, notification.ScheduledAt, notification.ID))
195
	return nil
196
}
197
198
func (service *PhoneNotificationService) dispatchMessageNotificationSend(ctx context.Context, source string, notification *entities.PhoneNotification) error {
199
	event, err := service.createMessageNotificationSendEvent(source, &events.MessageNotificationSendPayload{
200
		MessageID:      notification.MessageID,
201
		UserID:         notification.UserID,
202
		PhoneID:        notification.PhoneID,
203
		ScheduledAt:    notification.ScheduledAt,
204
		NotificationID: notification.ID,
205
	})
206
	if err != nil {
207
		return stacktrace.Propagate(err, fmt.Sprintf("cannot create [%s] event for notification [%s]", events.EventTypeMessageNotificationSend, notification.ID))
208
	}
209
210
	if _, err = service.eventDispatcher.DispatchWithTimeout(ctx, event, notification.ScheduledAt.Sub(time.Now())); err != nil {
211
		return stacktrace.Propagate(err, fmt.Sprintf("cannot dispatch event [%s] for notification [%s]", event.Type(), notification.ID))
212
	}
213
	return nil
214
}
215
216
func (service *PhoneNotificationService) dispatchMessageNotificationScheduled(ctx context.Context, params *PhoneNotificationScheduleParams, notification *entities.PhoneNotification) error {
217
	event, err := service.createMessageNotificationScheduledEvent(params.Source, &events.MessageNotificationScheduledPayload{
218
		MessageID:      notification.MessageID,
219
		Owner:          params.Owner,
220
		Contact:        params.Contact,
221
		Encrypted:      params.Encrypted,
222
		Content:        params.Content,
223
		SIM:            params.SIM,
224
		UserID:         notification.UserID,
225
		PhoneID:        notification.PhoneID,
226
		ScheduledAt:    notification.ScheduledAt,
227
		NotificationID: notification.ID,
228
	})
229
	if err != nil {
230
		return stacktrace.Propagate(err, fmt.Sprintf("cannot create [%s] event for notification [%s]", events.EventTypeMessageNotificationScheduled, notification.ID))
231
	}
232
233
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
234
		return stacktrace.Propagate(err, fmt.Sprintf("cannot dispatch event [%s] for notification [%s]", event.Type(), notification.ID))
235
	}
236
	return nil
237
}
238
239
// handleNotificationFailed records a failed send attempt: it logs err as a
// warning, dispatches the failed event carrying err.Error() as the error
// message, and then marks the notification as failed.
//
// err must be non-nil. An error is returned when the failed event cannot be
// created or dispatched; in that case the notification status is not updated.
func (service *PhoneNotificationService) handleNotificationFailed(ctx context.Context, err error, params *PhoneNotificationSendParams) error {
	ctx, span := service.tracer.Start(ctx)
	defer span.End()

	ctxLogger := service.tracer.CtxLogger(service.logger, span)

	// The "phone" placeholder is filled with the phone ID, not the notification ID.
	msg := fmt.Sprintf("cannot send notification for message [%s] to phone [%s]", params.MessageID, params.PhoneID)
	ctxLogger.Warn(stacktrace.Propagate(err, msg))

	event, err := service.createMessageNotificationFailedEvent(params.Source, err.Error(), params)
	if err != nil {
		return stacktrace.Propagate(err, fmt.Sprintf("cannot create [%s] event for notification [%s]", events.EventTypeMessageNotificationFailed, params.PhoneNotificationID))
	}

	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
		return stacktrace.Propagate(err, fmt.Sprintf("cannot dispatch event [%s] for notification [%s]", event.Type(), params.PhoneNotificationID))
	}

	service.updateStatus(ctx, params.PhoneNotificationID, entities.PhoneNotificationStatusFailed)
	return nil
}
260
261
// handleNotificationSent records a successful send: it logs the FCM result,
// dispatches the sent event and then marks the notification as sent. An error
// is returned when the sent event cannot be created or dispatched; in that
// case the notification status is not updated.
func (service *PhoneNotificationService) handleNotificationSent(ctx context.Context, phone *entities.Phone, result string, params *PhoneNotificationSendParams) error {
	ctx, span := service.tracer.Start(ctx)
	defer span.End()

	ctxLogger := service.tracer.CtxLogger(service.logger, span)
	ctxLogger.Info(fmt.Sprintf("sent notification [%s] for message [%s] to phone [%s]", result, params.MessageID, params.PhoneID))

	event, err := service.createMessageNotificationSentEvent(params.Source, phone, result, params)
	if err != nil {
		msg := fmt.Sprintf("cannot create [%s] event for notification [%s]", events.EventTypeMessageNotificationSent, params.PhoneNotificationID)
		return stacktrace.Propagate(err, msg)
	}

	err = service.eventDispatcher.Dispatch(ctx, event)
	if err != nil {
		msg := fmt.Sprintf("cannot dispatch event [%s] for notification [%s]", event.Type(), params.PhoneNotificationID)
		return stacktrace.Propagate(err, msg)
	}

	service.updateStatus(ctx, params.PhoneNotificationID, entities.PhoneNotificationStatusSent)
	return nil
}
281
282
// createMessageNotificationScheduledEvent builds the cloud event of type
// events.EventTypeMessageNotificationScheduled for source and payload by
// delegating to createEvent.
func (service *PhoneNotificationService) createMessageNotificationScheduledEvent(source string, payload *events.MessageNotificationScheduledPayload) (cloudevents.Event, error) {
	event, err := service.createEvent(events.EventTypeMessageNotificationScheduled, source, payload)
	return event, err
}
285
286
// createMessageNotificationSendEvent builds the cloud event of type
// events.EventTypeMessageNotificationSend for source and payload by
// delegating to createEvent.
func (service *PhoneNotificationService) createMessageNotificationSendEvent(source string, payload *events.MessageNotificationSendPayload) (cloudevents.Event, error) {
	event, err := service.createEvent(events.EventTypeMessageNotificationSend, source, payload)
	return event, err
}
289
290
func (service *PhoneNotificationService) createMessageNotificationSentEvent(source string, phone *entities.Phone, fcmMessageID string, params *PhoneNotificationSendParams) (cloudevents.Event, error) {
291
	event := cloudevents.NewEvent()
292
293
	event.SetSource(source)
294
	event.SetType(events.EventTypeMessageNotificationSent)
295
	event.SetTime(time.Now().UTC())
296
	event.SetID(uuid.New().String())
297
298
	payload := events.MessageNotificationSentPayload{
299
		MessageID:                 params.MessageID,
300
		UserID:                    params.UserID,
301
		PhoneID:                   params.PhoneID,
302
		ScheduledAt:               params.ScheduledAt,
303
		MessageExpirationDuration: phone.MessageExpirationDuration(),
304
		FcmMessageID:              fcmMessageID,
305
		NotificationSentAt:        time.Now().UTC(),
306
		NotificationID:            params.PhoneNotificationID,
307
	}
308
309
	if err := event.SetData(cloudevents.ApplicationJSON, payload); err != nil {
310
		msg := fmt.Sprintf("cannot encode %T [%#+v] as JSON", payload, payload)
311
		return event, stacktrace.Propagate(err, msg)
312
	}
313
314
	return event, nil
315
}
316
317
func (service *PhoneNotificationService) createMessageNotificationFailedEvent(source string, errorMessage string, params *PhoneNotificationSendParams) (cloudevents.Event, error) {
318
	event := cloudevents.NewEvent()
319
320
	event.SetSource(source)
321
	event.SetType(events.EventTypeMessageNotificationFailed)
322
	event.SetTime(time.Now().UTC())
323
	event.SetID(uuid.New().String())
324
325
	payload := events.MessageNotificationFailedPayload{
326
		MessageID:            params.MessageID,
327
		UserID:               params.UserID,
328
		PhoneID:              params.PhoneID,
329
		ErrorMessage:         errorMessage,
330
		NotificationFailedAt: time.Now().UTC(),
331
		NotificationID:       params.PhoneNotificationID,
332
	}
333
334
	if err := event.SetData(cloudevents.ApplicationJSON, payload); err != nil {
335
		msg := fmt.Sprintf("cannot encode %T [%#+v] as JSON", payload, payload)
336
		return event, stacktrace.Propagate(err, msg)
337
	}
338
339
	return event, nil
340
}
341
342
// updateStatus sets the status of the notification with notificationID through
// the notification repository. It is best effort: a repository failure is
// logged as an error and not returned to the caller.
func (service *PhoneNotificationService) updateStatus(ctx context.Context, notificationID uuid.UUID, status entities.PhoneNotificationStatus) {
	ctx, span := service.tracer.Start(ctx)
	defer span.End()

	ctxLogger := service.tracer.CtxLogger(service.logger, span)

	err := service.phoneNotificationRepository.UpdateStatus(ctx, notificationID, status)
	if err != nil {
		msg := fmt.Sprintf("cannot update status of notificaiton with id [%s] to [%s]", notificationID, status)
		ctxLogger.Error(stacktrace.Propagate(err, msg))
		// Stop here so the success message below is not logged for a failed update.
		return
	}

	ctxLogger.Info(fmt.Sprintf("updated status of notificaiton with id [%s] to [%s]", notificationID, status))
}
356