Passed
Pull Request — main (#9)
by Acho
01:36
created

api/pkg/services/notification_service.go   A

Size/Duplication

Total Lines 268
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 24
eloc 178
dl 0
loc 268
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A services.*NotificationService.handleNotificationSent 0 19 3
A services.*NotificationService.Send 0 25 4
A services.*NotificationService.createEvent 0 22 2
A services.NewNotificationService 0 15 1
A services.*NotificationService.updateStatus 0 13 2
B services.*NotificationService.Schedule 0 39 5
A services.*NotificationService.handleNotificationFailed 0 20 3
A services.*NotificationService.createMessageNotificationFailedEvent 0 23 2
A services.*NotificationService.createMessageNotificationSentEvent 0 25 2
1
package services
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"time"
8
9
	"github.com/NdoleStudio/httpsms/pkg/events"
10
	cloudevents "github.com/cloudevents/sdk-go/v2"
11
12
	"firebase.google.com/go/messaging"
13
	"github.com/NdoleStudio/httpsms/pkg/entities"
14
	"github.com/NdoleStudio/httpsms/pkg/repositories"
15
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
16
	"github.com/google/uuid"
17
	"github.com/palantir/stacktrace"
18
)
19
20
// NotificationService sends out notifications to mobile phones
21
type NotificationService struct {
22
	logger                      telemetry.Logger
23
	tracer                      telemetry.Tracer
24
	phoneNotificationRepository repositories.PhoneNotificationRepository
25
	phoneRepository             repositories.PhoneRepository
26
	messagingClient             *messaging.Client
27
	eventDispatcher             *EventDispatcher
28
}
29
30
// NewNotificationService creates a new NotificationService
31
func NewNotificationService(
32
	logger telemetry.Logger,
33
	tracer telemetry.Tracer,
34
	messagingClient *messaging.Client,
35
	phoneRepository repositories.PhoneRepository,
36
	phoneNotificationRepository repositories.PhoneNotificationRepository,
37
	dispatcher *EventDispatcher,
38
) (s *NotificationService) {
39
	return &NotificationService{
40
		logger:                      logger.WithService(fmt.Sprintf("%T", s)),
41
		tracer:                      tracer,
42
		messagingClient:             messagingClient,
43
		phoneNotificationRepository: phoneNotificationRepository,
44
		phoneRepository:             phoneRepository,
45
		eventDispatcher:             dispatcher,
46
	}
47
}
48
49
// NotificationSendParams are parameters for sending a notification
50
type NotificationSendParams struct {
51
	UserID              entities.UserID
52
	PhoneID             uuid.UUID
53
	PhoneNotificationID uuid.UUID
54
	Source              string
55
	ScheduledAt         time.Time
56
	MessageID           uuid.UUID
57
}
58
59
// Send sends a message when a message is sent
60
func (service *NotificationService) Send(ctx context.Context, params *NotificationSendParams) error {
61
	ctx, span := service.tracer.Start(ctx)
62
	defer span.End()
63
64
	phone, err := service.phoneRepository.LoadByID(ctx, params.PhoneID)
65
	if err != nil {
66
		msg := fmt.Sprintf("cannot load phone with userID [%s] and phoneID [%s]", params.UserID, params.PhoneID)
67
		return service.handleNotificationFailed(ctx, errors.New(msg), params)
68
	}
69
70
	if phone.FcmToken == nil {
71
		msg := fmt.Sprintf("phone with id [%s] has no FCM token", phone.ID)
72
		return service.handleNotificationFailed(ctx, errors.New(msg), params)
73
	}
74
75
	result, err := service.messagingClient.Send(ctx, &messaging.Message{
76
		Data: map[string]string{
77
			"KEY_MESSAGE_ID": params.MessageID.String(),
78
		},
79
		Token: *phone.FcmToken,
80
	})
81
	if err != nil {
82
		return service.handleNotificationFailed(ctx, err, params)
83
	}
84
	return service.handleNotificationSent(ctx, phone, result, params)
85
}
86
87
// NotificationScheduleParams are parameters for sending a notification
88
type NotificationScheduleParams struct {
89
	UserID    entities.UserID
90
	Owner     string
91
	Source    string
92
	MessageID uuid.UUID
93
}
94
95
// Schedule a notification to be sent to a phone
96
func (service *NotificationService) Schedule(ctx context.Context, params *NotificationScheduleParams) error {
97
	ctx, span := service.tracer.Start(ctx)
98
	defer span.End()
99
100
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
101
102
	phone, err := service.phoneRepository.Load(ctx, params.UserID, params.Owner)
103
	if err != nil {
104
		msg := fmt.Sprintf("cannot load phone with userID [%s] and phone [%s]", params.UserID, params.Owner)
105
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
106
	}
107
108
	notification := &entities.PhoneNotification{
109
		ID:          uuid.New(),
110
		MessageID:   params.MessageID,
111
		UserID:      params.UserID,
112
		PhoneID:     phone.ID,
113
		Status:      entities.PhoneNotificationStatusPending,
114
		ScheduledAt: time.Now().UTC(),
115
		CreatedAt:   time.Now().UTC(),
116
		UpdatedAt:   time.Now().UTC(),
117
	}
118
119
	if err = service.phoneNotificationRepository.Schedule(ctx, phone.MessagesPerMinute, notification); err != nil {
120
		msg := fmt.Sprintf("cannot schedule notification for message [%s] to phone [%s]", params.MessageID, phone.ID)
121
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
122
	}
123
124
	event, err := service.createEvent(params.Source, notification)
125
	if err != nil {
126
		return stacktrace.Propagate(err, fmt.Sprintf("cannot create cloud event for notification [%s]", notification.ID))
127
	}
128
129
	if err = service.eventDispatcher.DispatchWithTimeout(ctx, event, notification.ScheduledAt.Sub(time.Now())); err != nil {
130
		return stacktrace.Propagate(err, fmt.Sprintf("cannot dispatch event [%s] for notification [%s]", event.Type(), notification.ID))
131
	}
132
133
	ctxLogger.Info(fmt.Sprintf("message with id [%s] notification scheduled for [%s] with id [%s]", params.MessageID, notification.ScheduledAt, notification.ID))
134
	return nil
135
}
136
137
func (service *NotificationService) handleNotificationFailed(ctx context.Context, err error, params *NotificationSendParams) error {
138
	ctx, span := service.tracer.Start(ctx)
139
	defer span.End()
140
141
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
142
143
	msg := fmt.Sprintf("cannot send notification for message [%s] to phone [%s]", params.MessageID, params.PhoneNotificationID)
144
	ctxLogger.Warn(stacktrace.Propagate(err, msg))
145
146
	event, err := service.createMessageNotificationFailedEvent(params.Source, err.Error(), params)
147
	if err != nil {
148
		return stacktrace.Propagate(err, fmt.Sprintf("cannot create [%s] event for notification [%s]", events.EventTypeMessageNotificationFailed, params.PhoneNotificationID))
149
	}
150
151
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
152
		return stacktrace.Propagate(err, fmt.Sprintf("cannot dispatch event [%s] for notification [%s]", event.Type(), params.PhoneNotificationID))
153
	}
154
155
	service.updateStatus(ctx, params.PhoneNotificationID, entities.PhoneNotificationStatusFailed)
156
	return nil
157
}
158
159
func (service *NotificationService) handleNotificationSent(ctx context.Context, phone *entities.Phone, result string, params *NotificationSendParams) error {
160
	ctx, span := service.tracer.Start(ctx)
161
	defer span.End()
162
163
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
164
165
	ctxLogger.Info(fmt.Sprintf("sent notification [%s] for message [%s] to phone [%s]", result, params.MessageID, params.PhoneID))
166
167
	event, err := service.createMessageNotificationSentEvent(params.Source, phone, result, params)
168
	if err != nil {
169
		return stacktrace.Propagate(err, fmt.Sprintf("cannot create [%s] event for notification [%s]", events.EventTypeMessageNotificationSent, params.PhoneNotificationID))
170
	}
171
172
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
173
		return stacktrace.Propagate(err, fmt.Sprintf("cannot dispatch event [%s] for notification [%s]", event.Type(), params.PhoneNotificationID))
174
	}
175
176
	service.updateStatus(ctx, params.PhoneNotificationID, entities.PhoneNotificationStatusSent)
177
	return nil
178
}
179
180
func (service *NotificationService) createEvent(source string, notification *entities.PhoneNotification) (cloudevents.Event, error) {
181
	event := cloudevents.NewEvent()
182
183
	event.SetSource(source)
184
	event.SetType(events.EventTypeMessageNotificationScheduled)
185
	event.SetTime(time.Now().UTC())
186
	event.SetID(uuid.New().String())
187
188
	payload := events.MessageNotificationScheduledPayload{
189
		MessageID:      notification.MessageID,
190
		UserID:         notification.UserID,
191
		PhoneID:        notification.PhoneID,
192
		ScheduledAt:    notification.ScheduledAt,
193
		NotificationID: notification.ID,
194
	}
195
196
	if err := event.SetData(cloudevents.ApplicationJSON, payload); err != nil {
197
		msg := fmt.Sprintf("cannot encode %T [%#+v] as JSON", payload, payload)
198
		return event, stacktrace.Propagate(err, msg)
199
	}
200
201
	return event, nil
202
}
203
204
func (service *NotificationService) createMessageNotificationSentEvent(source string, phone *entities.Phone, fcmMessageID string, params *NotificationSendParams) (cloudevents.Event, error) {
205
	event := cloudevents.NewEvent()
206
207
	event.SetSource(source)
208
	event.SetType(events.EventTypeMessageNotificationSent)
209
	event.SetTime(time.Now().UTC())
210
	event.SetID(uuid.New().String())
211
212
	payload := events.MessageNotificationSentPayload{
213
		MessageID:                params.MessageID,
214
		UserID:                   params.UserID,
215
		PhoneID:                  params.PhoneID,
216
		ScheduledAt:              params.ScheduledAt,
217
		MessageExpirationTimeout: phone.MessageExpirationTimeout,
218
		FcmMessageID:             fcmMessageID,
219
		NotificationSentAt:       time.Now().UTC(),
220
		NotificationID:           params.PhoneNotificationID,
221
	}
222
223
	if err := event.SetData(cloudevents.ApplicationJSON, payload); err != nil {
224
		msg := fmt.Sprintf("cannot encode %T [%#+v] as JSON", payload, payload)
225
		return event, stacktrace.Propagate(err, msg)
226
	}
227
228
	return event, nil
229
}
230
231
func (service *NotificationService) createMessageNotificationFailedEvent(source string, errorMessage string, params *NotificationSendParams) (cloudevents.Event, error) {
232
	event := cloudevents.NewEvent()
233
234
	event.SetSource(source)
235
	event.SetType(events.EventTypeMessageNotificationFailed)
236
	event.SetTime(time.Now().UTC())
237
	event.SetID(uuid.New().String())
238
239
	payload := events.MessageNotificationFailedPayload{
240
		MessageID:            params.MessageID,
241
		UserID:               params.UserID,
242
		PhoneID:              params.PhoneID,
243
		ErrorMessage:         errorMessage,
244
		NotificationFailedAt: time.Now().UTC(),
245
		NotificationID:       params.PhoneNotificationID,
246
	}
247
248
	if err := event.SetData(cloudevents.ApplicationJSON, payload); err != nil {
249
		msg := fmt.Sprintf("cannot encode %T [%#+v] as JSON", payload, payload)
250
		return event, stacktrace.Propagate(err, msg)
251
	}
252
253
	return event, nil
254
}
255
256
func (service *NotificationService) updateStatus(ctx context.Context, notificationID uuid.UUID, status entities.PhoneNotificationStatus) {
257
	ctx, span := service.tracer.Start(ctx)
258
	defer span.End()
259
260
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
261
262
	err := service.phoneNotificationRepository.UpdateStatus(ctx, notificationID, status)
263
	if err != nil {
264
		msg := fmt.Sprintf("cannot update status of notificaiton with id [%s] to [%s]", notificationID, status)
265
		ctxLogger.Error(stacktrace.Propagate(err, msg))
266
	}
267
268
	ctxLogger.Info(fmt.Sprintf("updated status of notificaiton with id [%s] to [%s]", notificationID, status))
269
}
270