Passed
Push — main ( 570b65...2ff189 )
by Acho
02:48
created

services.*MessageThreadService.DeleteAllForUser   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 8
dl 0
loc 11
rs 10
c 0
b 0
f 0
nop 2
1
package services
2
3
import (
4
	"context"
5
	"fmt"
6
	"math/rand"
7
	"time"
8
9
	"github.com/NdoleStudio/httpsms/pkg/events"
10
11
	"github.com/NdoleStudio/httpsms/pkg/entities"
12
	"github.com/NdoleStudio/httpsms/pkg/repositories"
13
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
14
	"github.com/google/uuid"
15
	"github.com/palantir/stacktrace"
16
)
17
18
// MessageThreadService is handles message requests
19
type MessageThreadService struct {
20
	service
21
	logger          telemetry.Logger
22
	tracer          telemetry.Tracer
23
	repository      repositories.MessageThreadRepository
24
	eventDispatcher *EventDispatcher
25
}
26
27
// NewMessageThreadService creates a new MessageThreadService
28
func NewMessageThreadService(
29
	logger telemetry.Logger,
30
	tracer telemetry.Tracer,
31
	repository repositories.MessageThreadRepository,
32
	eventDispatcher *EventDispatcher,
33
) (s *MessageThreadService) {
34
	return &MessageThreadService{
35
		logger:          logger.WithService(fmt.Sprintf("%T", s)),
36
		tracer:          tracer,
37
		eventDispatcher: eventDispatcher,
38
		repository:      repository,
39
	}
40
}
41
42
// MessageThreadUpdateParams are parameters for updating a thread
43
type MessageThreadUpdateParams struct {
44
	Owner     string
45
	Status    entities.MessageStatus
46
	Contact   string
47
	Content   string
48
	UserID    entities.UserID
49
	MessageID uuid.UUID
50
	Timestamp time.Time
51
}
52
53
// DeleteAllForUser deletes all entities.MessageThread for an entities.UserID.
54
func (service *MessageThreadService) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
55
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
56
	defer span.End()
57
58
	if err := service.repository.DeleteAllForUser(ctx, userID); err != nil {
59
		msg := fmt.Sprintf("could not delete [entities.MessageThread] for user with ID [%s]", userID)
60
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
61
	}
62
63
	ctxLogger.Info(fmt.Sprintf("deleted all [entities.MessageThread] for user with ID [%s]", userID))
64
	return nil
65
}
66
67
// UpdateThread updates a thread between 2 parties when a timestamp changes
68
func (service *MessageThreadService) UpdateThread(ctx context.Context, params MessageThreadUpdateParams) error {
69
	ctx, span := service.tracer.Start(ctx)
70
	defer span.End()
71
72
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
73
74
	thread, err := service.repository.LoadByOwnerContact(ctx, params.UserID, params.Owner, params.Contact)
75
	if stacktrace.GetCode(err) == repositories.ErrCodeNotFound {
76
		ctxLogger.Info(fmt.Sprintf("cannot find thread with owner [%s], and contact [%s]. creating new thread", params.Owner, params.Contact))
77
		return service.createThread(ctx, params)
78
	}
79
80
	if err != nil {
81
		msg := fmt.Sprintf("cannot find thread with owner [%s], and contact [%s]. creating new thread", params.Owner, params.Contact)
82
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
83
	}
84
85
	if thread.OrderTimestamp.Unix() > params.Timestamp.Unix() && thread.Status != entities.MessageStatusSending && thread.HasLastMessage(params.MessageID) {
86
		ctxLogger.Warn(stacktrace.NewError(fmt.Sprintf("thread [%s] has timestamp [%s] and status [%s] which is greater than timestamp [%s] for message [%s] and status [%s]", thread.ID, thread.OrderTimestamp, thread.Status, params.Timestamp, params.MessageID, params.Status)))
87
		return nil
88
	}
89
90
	if thread.Status == entities.MessageStatusDelivered && thread.LastMessageID != nil && thread.HasLastMessage(params.MessageID) {
91
		ctxLogger.Warn(stacktrace.NewError(fmt.Sprintf("thread [%s] already has status [%s] not updating with status [%s] for message [%s]", thread.ID, thread.Status, params.Status, params.MessageID)))
92
		return nil
93
	}
94
95
	if err = service.repository.Update(ctx, thread.Update(params.Timestamp, params.MessageID, params.Content, params.Status)); err != nil {
96
		msg := fmt.Sprintf("cannot update message thread with id [%s] after adding message [%s]", thread.ID, params.MessageID)
97
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
98
	}
99
100
	ctxLogger.Info(fmt.Sprintf("thread with id [%s] updated with last message [%s] and status [%s]", thread.ID, thread.LastMessageID, thread.Status))
101
	return nil
102
}
103
104
// MessageThreadStatusParams are parameters for updating a thread status
105
type MessageThreadStatusParams struct {
106
	IsArchived      bool
107
	UserID          entities.UserID
108
	MessageThreadID uuid.UUID
109
}
110
111
// UpdateStatus updates a thread between an owner and a contact
112
func (service *MessageThreadService) UpdateStatus(ctx context.Context, params MessageThreadStatusParams) (*entities.MessageThread, error) {
113
	ctx, span := service.tracer.Start(ctx)
114
	defer span.End()
115
116
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
117
118
	thread, err := service.repository.Load(ctx, params.UserID, params.MessageThreadID)
119
	if err != nil {
120
		msg := fmt.Sprintf("cannot find thread with id [%s]", params.MessageThreadID)
121
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
122
	}
123
124
	if err = service.repository.Update(ctx, thread.UpdateArchive(params.IsArchived)); err != nil {
125
		msg := fmt.Sprintf("cannot update message thread with id [%s] with archive status [%t]", thread.ID, params.IsArchived)
126
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
127
	}
128
129
	ctxLogger.Info(fmt.Sprintf("thread with id [%s] updated with archive status [%t]", thread.ID, thread.IsArchived))
130
	return thread, nil
131
}
132
133
// UpdateAfterDeletedMessage updates a thread after the last message has been deleted
134
func (service *MessageThreadService) UpdateAfterDeletedMessage(ctx context.Context, payload *events.MessageAPIDeletedPayload) error {
135
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
136
	defer span.End()
137
138
	thread, err := service.repository.LoadByOwnerContact(ctx, payload.UserID, payload.Owner, payload.Contact)
139
	if err != nil {
140
		msg := fmt.Sprintf("cannot find thread for user [%s] with owner [%s], and contact [%s]", payload.UserID, payload.Owner, payload.Contact)
141
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
142
	}
143
144
	if payload.PreviousMessageID == nil {
145
		if err = service.repository.Delete(ctx, thread.UserID, thread.ID); err != nil {
146
			msg := fmt.Sprintf("cannot delete thread with ID [%s] for user [%s] and owner [%s]", thread.ID, thread.UserID, thread.Owner)
147
			ctxLogger.Error(stacktrace.Propagate(err, msg))
148
			return nil
149
		}
150
		msg := fmt.Sprintf("previous message ID is nil for thread with ID [%s] and user [%s]", thread.ID, thread.UserID)
151
		ctxLogger.Info(msg)
152
		return nil
153
	}
154
155
	if thread.LastMessageID != nil && *thread.LastMessageID != payload.MessageID {
156
		msg := fmt.Sprintf("last message ID [%s] does not match message ID [%s] for thread with ID [%s]", *thread.LastMessageID, payload.MessageID, thread.ID)
157
		ctxLogger.Info(msg)
158
		return nil
159
	}
160
161
	thread.LastMessageContent = payload.PreviousMessageContent
162
	thread.LastMessageID = payload.PreviousMessageID
163
	thread.Status = *payload.PreviousMessageStatus
164
	thread.UpdatedAt = time.Now().UTC()
165
166
	if err = service.repository.Update(ctx, thread); err != nil {
167
		msg := fmt.Sprintf("cannot update thread with ID [%s] for user with ID [%s]", thread.ID, thread.UserID)
168
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
169
	}
170
171
	ctxLogger.Info(fmt.Sprintf("last message has been removed from thread with ID [%s] and userID [%s]", thread.ID, thread.UserID))
172
	return nil
173
}
174
175
func (service *MessageThreadService) createThread(ctx context.Context, params MessageThreadUpdateParams) error {
176
	ctx, span := service.tracer.Start(ctx)
177
	defer span.End()
178
179
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
180
181
	thread := &entities.MessageThread{
182
		ID:                 uuid.New(),
183
		Owner:              params.Owner,
184
		Contact:            params.Contact,
185
		UserID:             params.UserID,
186
		IsArchived:         false,
187
		Color:              service.getColor(),
188
		LastMessageContent: &params.Content,
189
		Status:             params.Status,
190
		LastMessageID:      &params.MessageID,
191
		CreatedAt:          time.Now().UTC(),
192
		UpdatedAt:          time.Now().UTC(),
193
		OrderTimestamp:     params.Timestamp,
194
	}
195
196
	if err := service.repository.Store(ctx, thread); err != nil {
197
		msg := fmt.Sprintf("cannot store thread with id [%s] for message with ID [%s]", thread.ID, params.MessageID)
198
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
199
	}
200
201
	ctxLogger.Info(fmt.Sprintf(
202
		"created thread [%s] for message ID [%s] with owner [%s] and contact [%s]",
203
		thread.ID,
204
		thread.LastMessageID,
205
		thread.Owner,
206
		thread.Contact,
207
	))
208
209
	return nil
210
}
211
212
func (service *MessageThreadService) getColor() string {
213
	colors := []string{
214
		"deep-purple",
215
		"indigo",
216
		"blue",
217
		"red",
218
		"pink",
219
		"purple",
220
		"light-blue",
221
		"cyan",
222
		"teal",
223
		"green",
224
		"light-green",
225
		"lime",
226
		"yellow",
227
		"amber",
228
		"orange",
229
		"deep-orange",
230
		"brown",
231
	}
232
	generator := rand.New(rand.NewSource(time.Now().UnixNano()))
233
	return colors[generator.Intn(len(colors))]
234
}
235
236
// MessageThreadGetParams parameters fetching threads
237
type MessageThreadGetParams struct {
238
	repositories.IndexParams
239
	IsArchived bool
240
	UserID     entities.UserID
241
	Owner      string
242
}
243
244
// GetThreads fetches threads for an owner
245
func (service *MessageThreadService) GetThreads(ctx context.Context, params MessageThreadGetParams) (*[]entities.MessageThread, error) {
246
	ctx, span := service.tracer.Start(ctx)
247
	defer span.End()
248
249
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
250
251
	threads, err := service.repository.Index(ctx, params.UserID, params.Owner, params.IsArchived, params.IndexParams)
252
	if err != nil {
253
		msg := fmt.Sprintf("could not fetch messages threads for params [%+#v]", params)
254
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
255
	}
256
257
	ctxLogger.Info(fmt.Sprintf("fetched [%d] threads with params [%+#v]", len(*threads), params))
258
	return threads, nil
259
}
260
261
// GetThread fetches an entities.MessageThread  message thread by the ID
262
func (service *MessageThreadService) GetThread(ctx context.Context, userID entities.UserID, messageThreadID uuid.UUID) (*entities.MessageThread, error) {
263
	ctx, span := service.tracer.Start(ctx)
264
	defer span.End()
265
266
	thread, err := service.repository.Load(ctx, userID, messageThreadID)
267
	if err != nil {
268
		msg := fmt.Sprintf("could not fetch thread with ID [%s] for user [%s]", messageThreadID, userID)
269
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
270
	}
271
272
	return thread, nil
273
}
274
275
// DeleteThread deletes an entities.MessageThread from the database
276
func (service *MessageThreadService) DeleteThread(ctx context.Context, source string, thread *entities.MessageThread) error {
277
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
278
	defer span.End()
279
280
	if err := service.repository.Delete(ctx, thread.UserID, thread.ID); err != nil {
281
		msg := fmt.Sprintf("could not delete message thread with ID [%s] for user with ID [%s]", thread.ID, thread.UserID)
282
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
283
	}
284
285
	event, err := service.createEvent(events.MessageThreadAPIDeleted, source, &events.MessageThreadAPIDeletedPayload{
286
		MessageThreadID: thread.ID,
287
		UserID:          thread.UserID,
288
		Owner:           thread.Owner,
289
		Contact:         thread.Contact,
290
		IsArchived:      thread.IsArchived,
291
		Color:           thread.Color,
292
		Status:          thread.Status,
293
		Timestamp:       time.Now().UTC(),
294
	})
295
	if err != nil {
296
		msg := fmt.Sprintf("cannot create [%T] for message thread dleted with ID [%s]", event, thread.ID)
297
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
298
	}
299
300
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] for message thread [%s]", event.Type(), event.ID(), thread.ID))
301
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
302
		msg := fmt.Sprintf("cannot dispatch event [%s] with id [%s] for message thread [%s]", event.Type(), event.ID(), thread.ID)
303
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
304
	}
305
306
	ctxLogger.Info(fmt.Sprintf("dispatched [%s] event with id [%s] for message thread [%s]", event.Type(), event.ID(), thread.ID))
307
	return nil
308
}
309