Passed
Push — main ( 203737...4cb1c0 )
by Acho
01:56
created

DeletedMessage   B

Complexity

Conditions 7

Size

Total Lines 39
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 28
dl 0
loc 39
rs 7.808
c 0
b 0
f 0
nop 2
1
package services
2
3
import (
4
	"context"
5
	"fmt"
6
	"math/rand"
7
	"time"
8
9
	"github.com/NdoleStudio/httpsms/pkg/events"
10
11
	"github.com/NdoleStudio/httpsms/pkg/entities"
12
	"github.com/NdoleStudio/httpsms/pkg/repositories"
13
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
14
	"github.com/google/uuid"
15
	"github.com/palantir/stacktrace"
16
)
17
18
// MessageThreadService is handles message requests
19
type MessageThreadService struct {
20
	service
21
	logger          telemetry.Logger
22
	tracer          telemetry.Tracer
23
	repository      repositories.MessageThreadRepository
24
	eventDispatcher *EventDispatcher
25
}
26
27
// NewMessageThreadService creates a new MessageThreadService
28
func NewMessageThreadService(
29
	logger telemetry.Logger,
30
	tracer telemetry.Tracer,
31
	repository repositories.MessageThreadRepository,
32
	eventDispatcher *EventDispatcher,
33
) (s *MessageThreadService) {
34
	return &MessageThreadService{
35
		logger:          logger.WithService(fmt.Sprintf("%T", s)),
36
		tracer:          tracer,
37
		eventDispatcher: eventDispatcher,
38
		repository:      repository,
39
	}
40
}
41
42
// MessageThreadUpdateParams are parameters for updating a thread
43
type MessageThreadUpdateParams struct {
44
	Owner     string
45
	Status    entities.MessageStatus
46
	Contact   string
47
	Content   string
48
	UserID    entities.UserID
49
	MessageID uuid.UUID
50
	Timestamp time.Time
51
}
52
53
// UpdateThread updates a thread between 2 parties when a timestamp changes
54
func (service *MessageThreadService) UpdateThread(ctx context.Context, params MessageThreadUpdateParams) error {
55
	ctx, span := service.tracer.Start(ctx)
56
	defer span.End()
57
58
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
59
60
	thread, err := service.repository.LoadByOwnerContact(ctx, params.UserID, params.Owner, params.Contact)
61
	if stacktrace.GetCode(err) == repositories.ErrCodeNotFound {
62
		ctxLogger.Info(fmt.Sprintf("cannot find thread with owner [%s], and contact [%s]. creating new thread", params.Owner, params.Contact))
63
		return service.createThread(ctx, params)
64
	}
65
66
	if err != nil {
67
		msg := fmt.Sprintf("cannot find thread with owner [%s], and contact [%s]. creating new thread", params.Owner, params.Contact)
68
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
69
	}
70
71
	if thread.OrderTimestamp.Unix() > params.Timestamp.Unix() && thread.Status != entities.MessageStatusSending && thread.HasLastMessage(params.MessageID) {
72
		ctxLogger.Warn(stacktrace.NewError(fmt.Sprintf("thread [%s] has timestamp [%s] and status [%s] which is greater than timestamp [%s] for message [%s] and status [%s]", thread.ID, thread.OrderTimestamp, thread.Status, params.Timestamp, params.MessageID, params.Status)))
73
		return nil
74
	}
75
76
	if thread.Status == entities.MessageStatusDelivered && thread.LastMessageID != nil && thread.HasLastMessage(params.MessageID) {
77
		ctxLogger.Warn(stacktrace.NewError(fmt.Sprintf("thread [%s] already has status [%s] not updating with status [%s] for message [%s]", thread.ID, thread.Status, params.Status, params.MessageID)))
78
		return nil
79
	}
80
81
	if err = service.repository.Update(ctx, thread.Update(params.Timestamp, params.MessageID, params.Content, params.Status)); err != nil {
82
		msg := fmt.Sprintf("cannot update message thread with id [%s] after adding message [%s]", thread.ID, params.MessageID)
83
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
84
	}
85
86
	ctxLogger.Info(fmt.Sprintf("thread with id [%s] updated with last message [%s] and status [%s]", thread.ID, thread.LastMessageID, thread.Status))
87
	return nil
88
}
89
90
// MessageThreadStatusParams are parameters for updating a thread status
91
type MessageThreadStatusParams struct {
92
	IsArchived      bool
93
	UserID          entities.UserID
94
	MessageThreadID uuid.UUID
95
}
96
97
// UpdateStatus updates a thread between an owner and a contact
98
func (service *MessageThreadService) UpdateStatus(ctx context.Context, params MessageThreadStatusParams) (*entities.MessageThread, error) {
99
	ctx, span := service.tracer.Start(ctx)
100
	defer span.End()
101
102
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
103
104
	thread, err := service.repository.Load(ctx, params.UserID, params.MessageThreadID)
105
	if err != nil {
106
		msg := fmt.Sprintf("cannot find thread with id [%s]", params.MessageThreadID)
107
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
108
	}
109
110
	if err = service.repository.Update(ctx, thread.UpdateArchive(params.IsArchived)); err != nil {
111
		msg := fmt.Sprintf("cannot update message thread with id [%s] with archive status [%t]", thread.ID, params.IsArchived)
112
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
113
	}
114
115
	ctxLogger.Info(fmt.Sprintf("thread with id [%s] updated with archive status [%t]", thread.ID, thread.IsArchived))
116
	return thread, nil
117
}
118
119
// UpdateAfterDeletedMessage updates a thread after the last message has been deleted
120
func (service *MessageThreadService) UpdateAfterDeletedMessage(ctx context.Context, payload *events.MessageAPIDeletedPayload) error {
121
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
122
	defer span.End()
123
124
	thread, err := service.repository.LoadByOwnerContact(ctx, payload.UserID, payload.Owner, payload.Contact)
125
	if err != nil {
126
		msg := fmt.Sprintf("cannot find thread for user [%s] with owner [%s], and contact [%s]", payload.UserID, payload.Owner, payload.Contact)
127
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
128
	}
129
130
	if payload.PreviousMessageID == nil {
131
		if err = service.repository.Delete(ctx, thread.UserID, thread.ID); err != nil {
132
			msg := fmt.Sprintf("cannot delete thread with ID [%s] for user [%s] and owner [%s]", thread.ID, thread.UserID, thread.Owner)
133
			ctxLogger.Error(stacktrace.Propagate(err, msg))
134
			return nil
135
		}
136
		msg := fmt.Sprintf("previous message ID is nil for thread with ID [%s] and user [%s]", thread.ID, thread.UserID)
137
		ctxLogger.Info(msg)
138
		return nil
139
	}
140
141
	if thread.LastMessageID != nil && *thread.LastMessageID != payload.MessageID {
142
		msg := fmt.Sprintf("last message ID [%s] does not match message ID [%s] for thread with ID [%s]", *thread.LastMessageID, payload.MessageID, thread.ID)
143
		ctxLogger.Info(msg)
144
		return nil
145
	}
146
147
	thread.LastMessageContent = payload.PreviousMessageContent
148
	thread.LastMessageID = payload.PreviousMessageID
149
	thread.Status = *payload.PreviousMessageStatus
150
	thread.UpdatedAt = time.Now().UTC()
151
152
	if err = service.repository.Update(ctx, thread); err != nil {
153
		msg := fmt.Sprintf("cannot update thread with ID [%s] for user with ID [%s]", thread.ID, thread.UserID)
154
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
155
	}
156
157
	ctxLogger.Info(fmt.Sprintf("last message has been removed from thread with ID [%s] and userID [%s]", thread.ID, thread.UserID))
158
	return nil
159
}
160
161
func (service *MessageThreadService) createThread(ctx context.Context, params MessageThreadUpdateParams) error {
162
	ctx, span := service.tracer.Start(ctx)
163
	defer span.End()
164
165
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
166
167
	thread := &entities.MessageThread{
168
		ID:                 uuid.New(),
169
		Owner:              params.Owner,
170
		Contact:            params.Contact,
171
		UserID:             params.UserID,
172
		IsArchived:         false,
173
		Color:              service.getColor(),
174
		LastMessageContent: &params.Content,
175
		Status:             params.Status,
176
		LastMessageID:      &params.MessageID,
177
		CreatedAt:          time.Now().UTC(),
178
		UpdatedAt:          time.Now().UTC(),
179
		OrderTimestamp:     params.Timestamp,
180
	}
181
182
	if err := service.repository.Store(ctx, thread); err != nil {
183
		msg := fmt.Sprintf("cannot store thread with id [%s] for message with ID [%s]", thread.ID, params.MessageID)
184
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
185
	}
186
187
	ctxLogger.Info(fmt.Sprintf(
188
		"created thread [%s] for message ID [%s] with owner [%s] and contact [%s]",
189
		thread.ID,
190
		thread.LastMessageID,
191
		thread.Owner,
192
		thread.Contact,
193
	))
194
195
	return nil
196
}
197
198
func (service *MessageThreadService) getColor() string {
199
	colors := []string{
200
		"deep-purple",
201
		"indigo",
202
		"blue",
203
		"red",
204
		"pink",
205
		"purple",
206
		"light-blue",
207
		"cyan",
208
		"teal",
209
		"green",
210
		"light-green",
211
		"lime",
212
		"yellow",
213
		"amber",
214
		"orange",
215
		"deep-orange",
216
		"brown",
217
	}
218
	generator := rand.New(rand.NewSource(time.Now().UnixNano()))
219
	return colors[generator.Intn(len(colors))]
220
}
221
222
// MessageThreadGetParams parameters fetching threads
223
type MessageThreadGetParams struct {
224
	repositories.IndexParams
225
	IsArchived bool
226
	UserID     entities.UserID
227
	Owner      string
228
}
229
230
// GetThreads fetches threads for an owner
231
func (service *MessageThreadService) GetThreads(ctx context.Context, params MessageThreadGetParams) (*[]entities.MessageThread, error) {
232
	ctx, span := service.tracer.Start(ctx)
233
	defer span.End()
234
235
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
236
237
	threads, err := service.repository.Index(ctx, params.UserID, params.Owner, params.IsArchived, params.IndexParams)
238
	if err != nil {
239
		msg := fmt.Sprintf("could not fetch messages threads for params [%+#v]", params)
240
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
241
	}
242
243
	ctxLogger.Info(fmt.Sprintf("fetched [%d] threads with params [%+#v]", len(*threads), params))
244
	return threads, nil
245
}
246
247
// GetThread fetches an entities.MessageThread  message thread by the ID
248
func (service *MessageThreadService) GetThread(ctx context.Context, userID entities.UserID, messageThreadID uuid.UUID) (*entities.MessageThread, error) {
249
	ctx, span := service.tracer.Start(ctx)
250
	defer span.End()
251
252
	thread, err := service.repository.Load(ctx, userID, messageThreadID)
253
	if err != nil {
254
		msg := fmt.Sprintf("could not fetch thread with ID [%s] for user [%s]", messageThreadID, userID)
255
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
256
	}
257
258
	return thread, nil
259
}
260
261
// DeleteThread deletes an entities.MessageThread from the database
262
func (service *MessageThreadService) DeleteThread(ctx context.Context, source string, thread *entities.MessageThread) error {
263
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
264
	defer span.End()
265
266
	if err := service.repository.Delete(ctx, thread.UserID, thread.ID); err != nil {
267
		msg := fmt.Sprintf("could not delete message thread with ID [%s] for user with ID [%s]", thread.ID, thread.UserID)
268
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
269
	}
270
271
	event, err := service.createEvent(events.MessageThreadAPIDeleted, source, &events.MessageThreadAPIDeletedPayload{
272
		MessageThreadID: thread.ID,
273
		UserID:          thread.UserID,
274
		Owner:           thread.Owner,
275
		Contact:         thread.Contact,
276
		IsArchived:      thread.IsArchived,
277
		Color:           thread.Color,
278
		Status:          thread.Status,
279
		Timestamp:       time.Now().UTC(),
280
	})
281
	if err != nil {
282
		msg := fmt.Sprintf("cannot create [%T] for message thread dleted with ID [%s]", event, thread.ID)
283
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
284
	}
285
286
	ctxLogger.Info(fmt.Sprintf("created event [%s] with id [%s] for message thread [%s]", event.Type(), event.ID(), thread.ID))
287
	if err = service.eventDispatcher.Dispatch(ctx, event); err != nil {
288
		msg := fmt.Sprintf("cannot dispatch event [%s] with id [%s] for message thread [%s]", event.Type(), event.ID(), thread.ID)
289
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
290
	}
291
292
	ctxLogger.Info(fmt.Sprintf("dispatched [%s] event with id [%s] for message thread [%s]", event.Type(), event.ID(), thread.ID))
293
	return nil
294
}
295