Passed
Push — main ( 203737...4cb1c0 )
by Acho
01:56
created

repositories.*gormMessageRepository.Search   B

Complexity

Conditions 7

Size

Total Lines 44
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 31
dl 0
loc 44
rs 7.736
c 0
b 0
f 0
nop 6
1
package repositories
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
8
	"gorm.io/gorm/clause"
9
10
	"github.com/NdoleStudio/httpsms/pkg/entities"
11
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
12
	"github.com/cockroachdb/cockroach-go/v2/crdb/crdbgorm"
13
	"github.com/google/uuid"
14
	"github.com/palantir/stacktrace"
15
	"gorm.io/gorm"
16
)
17
18
// gormMessageRepository is responsible for persisting entities.Message
19
type gormMessageRepository struct {
20
	logger telemetry.Logger
21
	tracer telemetry.Tracer
22
	db     *gorm.DB
23
}
24
25
// NewGormMessageRepository creates the GORM version of the MessageRepository
26
func NewGormMessageRepository(
27
	logger telemetry.Logger,
28
	tracer telemetry.Tracer,
29
	db *gorm.DB,
30
) MessageRepository {
31
	return &gormMessageRepository{
32
		logger: logger.WithService(fmt.Sprintf("%T", &gormMessageRepository{})),
33
		tracer: tracer,
34
		db:     db,
35
	}
36
}
37
38
// DeleteByOwnerAndContact deletes all the messages between and owner and a contact
39
func (repository *gormMessageRepository) DeleteByOwnerAndContact(ctx context.Context, userID entities.UserID, owner string, contact string) error {
40
	ctx, span := repository.tracer.Start(ctx)
41
	defer span.End()
42
43
	err := repository.db.WithContext(ctx).
44
		Where("user_id = ?", userID).
45
		Where("owner = ?", owner).
46
		Where("contact = ?", contact).
47
		Delete(&entities.Message{}).
48
		Error
49
	if err != nil {
50
		msg := fmt.Sprintf("cannot delete messages between owner [%s] and contact [%s] for user with ID [%s]", owner, contact, userID)
51
		return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
52
	}
53
54
	return nil
55
}
56
57
// Delete a message by the ID
58
func (repository *gormMessageRepository) Delete(ctx context.Context, userID entities.UserID, messageID uuid.UUID) error {
59
	ctx, span := repository.tracer.Start(ctx)
60
	defer span.End()
61
62
	err := repository.db.WithContext(ctx).Where("user_id = ?", userID).Where("id = ?", messageID).Delete(&entities.Message{}).Error
63
	if err != nil {
64
		msg := fmt.Sprintf("cannot delete message with ID [%s] for user with ID [%s]", messageID, userID)
65
		return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
66
	}
67
68
	return nil
69
}
70
71
// Index entities.Message between 2 parties
72
func (repository *gormMessageRepository) Index(ctx context.Context, userID entities.UserID, owner string, contact string, params IndexParams) (*[]entities.Message, error) {
73
	ctx, span := repository.tracer.Start(ctx)
74
	defer span.End()
75
76
	query := repository.db.
77
		WithContext(ctx).
78
		Where("user_id = ?", userID).
79
		Where("owner = ?", owner).
80
		Where("contact =  ?", contact)
81
	if len(params.Query) > 0 {
82
		queryPattern := "%" + params.Query + "%"
83
		query.Where("content ILIKE ?", queryPattern)
84
	}
85
86
	messages := new([]entities.Message)
87
	if err := query.Order("order_timestamp DESC").Limit(params.Limit).Offset(params.Skip).Find(&messages).Error; err != nil {
88
		msg := fmt.Sprintf("cannot fetch messges with owner [%s] and contact [%s] and params [%+#v]", owner, contact, params)
89
		return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
90
	}
91
92
	return messages, nil
93
}
94
95
func (repository *gormMessageRepository) LastMessage(ctx context.Context, userID entities.UserID, owner string, contact string) (*entities.Message, error) {
96
	ctx, span := repository.tracer.Start(ctx)
97
	defer span.End()
98
99
	query := repository.db.
100
		WithContext(ctx).
101
		Where("user_id = ?", userID).
102
		Where("owner = ?", owner).
103
		Where("contact =  ?", contact)
104
105
	message := new(entities.Message)
106
107
	err := query.Order("order_timestamp DESC").First(&message).Error
108
	if errors.Is(err, gorm.ErrRecordNotFound) {
109
		msg := fmt.Sprintf("cannot get last message for [%s] with owner [%s] and contact [%s]", userID, owner, contact)
110
		return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg))
111
	}
112
113
	if err != nil {
114
		msg := fmt.Sprintf("cannot get last message for [%s] with owner [%s] and contact [%s]", userID, owner, contact)
115
		return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
116
	}
117
118
	return message, nil
119
}
120
121
func (repository *gormMessageRepository) Search(ctx context.Context, userID entities.UserID, owners []string, types []entities.MessageType, statuses []entities.MessageStatus, params IndexParams) ([]*entities.Message, error) {
122
	ctx, span := repository.tracer.Start(ctx)
123
	defer span.End()
124
125
	query := repository.db.
126
		WithContext(ctx).
127
		Where("user_id = ?", userID)
128
129
	if len(owners) > 0 {
130
		query = query.Where("owner IN ?", owners)
131
	}
132
	if len(types) > 0 {
133
		query = query.Where("type IN ?", types)
134
	}
135
	if len(statuses) > 0 {
136
		query = query.Where("status IN ?", statuses)
137
	}
138
139
	if len(params.Query) > 0 {
140
		queryPattern := "%" + params.Query + "%"
141
		subQuery := repository.db.Where("content ILIKE ?", queryPattern).
142
			Or("contact ILIKE ?", queryPattern).
143
			Or("failure_reason ILIKE ?", queryPattern).
144
			Or("request_id ILIKE ?", queryPattern)
145
146
		if _, err := uuid.Parse(params.Query); err == nil {
147
			subQuery = subQuery.Or("id = ?", params.Query)
148
		}
149
150
		query = query.Where(subQuery)
151
	}
152
153
	messages := make([]*entities.Message, 0, params.Limit)
154
	err := query.Order(repository.order(params, "created_at")).
155
		Limit(params.Limit).
156
		Offset(params.Skip).
157
		Find(&messages).
158
		Error
159
	if err != nil {
160
		msg := fmt.Sprintf("cannot search messages with for user [%s] params [%+#v]", userID, params)
161
		return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
162
	}
163
164
	return messages, nil
165
}
166
167
// Store a new entities.Message
168
func (repository *gormMessageRepository) Store(ctx context.Context, message *entities.Message) error {
169
	ctx, span := repository.tracer.Start(ctx)
170
	defer span.End()
171
172
	if err := repository.db.WithContext(ctx).Create(message).Error; err != nil {
173
		msg := fmt.Sprintf("cannot save message with ID [%s]", message.ID)
174
		return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
175
	}
176
177
	return nil
178
}
179
180
// Load an entities.Message by ID
181
func (repository *gormMessageRepository) Load(ctx context.Context, userID entities.UserID, messageID uuid.UUID) (*entities.Message, error) {
182
	ctx, span := repository.tracer.Start(ctx)
183
	defer span.End()
184
185
	message := new(entities.Message)
186
	err := repository.db.WithContext(ctx).Where("user_id = ?", userID).Where("id = ?", messageID).First(message).Error
187
	if errors.Is(err, gorm.ErrRecordNotFound) {
188
		msg := fmt.Sprintf("message with ID [%s] and userID [%s] does not exist", messageID, userID)
189
		return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg))
190
	}
191
192
	if err != nil {
193
		msg := fmt.Sprintf("cannot load message with ID [%s]", messageID)
194
		return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
195
	}
196
197
	return message, nil
198
}
199
200
// Update an entities.Message
201
func (repository *gormMessageRepository) Update(ctx context.Context, message *entities.Message) error {
202
	ctx, span := repository.tracer.Start(ctx)
203
	defer span.End()
204
205
	if err := repository.db.WithContext(ctx).Save(message).Error; err != nil {
206
		msg := fmt.Sprintf("cannot update message with ID [%s]", message.ID)
207
		return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
208
	}
209
210
	return nil
211
}
212
213
// GetOutstanding fetches messages that still to be sent to the phone
214
func (repository *gormMessageRepository) GetOutstanding(ctx context.Context, userID entities.UserID, messageID uuid.UUID) (*entities.Message, error) {
215
	ctx, span := repository.tracer.Start(ctx)
216
	defer span.End()
217
218
	message := new(entities.Message)
219
	err := crdbgorm.ExecuteTx(ctx, repository.db, nil,
220
		func(tx *gorm.DB) error {
221
			return tx.WithContext(ctx).Model(message).
222
				Clauses(clause.Returning{}).
223
				Where("user_id = ?", userID).
224
				Where("id = ?", messageID).
225
				Where(repository.db.Where("status = ?", entities.MessageStatusScheduled).Or("status = ?", entities.MessageStatusPending).Or("status = ?", entities.MessageStatusExpired)).
226
				Update("status", entities.MessageStatusSending).Error
227
		},
228
	)
229
	if errors.Is(err, gorm.ErrRecordNotFound) {
230
		msg := fmt.Sprintf("outstanding message with ID [%s] and userID [%s] does not exist", messageID, userID)
231
		return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg))
232
	}
233
234
	if err != nil {
235
		msg := fmt.Sprintf("cannot fetch outstanding message with userID [%s] and messageID [%s]", userID, messageID)
236
		return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
237
	}
238
239
	if message == nil || message.ID == uuid.Nil {
240
		msg := fmt.Sprintf("outstanding message with ID [%s] and userID [%s] does not exist", messageID, userID)
241
		return nil, repository.tracer.WrapErrorSpan(span, stacktrace.NewErrorWithCode(ErrCodeNotFound, msg))
242
	}
243
244
	return message, nil
245
}
246
247
func (repository *gormMessageRepository) order(params IndexParams, defaultSortBy string) string {
248
	sortBy := defaultSortBy
249
	if len(params.SortBy) > 0 {
250
		sortBy = params.SortBy
251
	}
252
253
	direction := "ASC"
254
	if params.SortDescending {
255
		direction = "DESC"
256
	}
257
258
	return fmt.Sprintf("%s %s", sortBy, direction)
259
}
260