Passed
Push — main ( 570b65...2ff189 )
by Acho
02:48
created

services.*DiscordService.DeleteAllForUser   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 8
dl 0
loc 11
rs 10
c 0
b 0
f 0
nop 2
1
package services
2
3
import (
4
	"context"
5
	"fmt"
6
	"sync"
7
	"time"
8
9
	"github.com/NdoleStudio/httpsms/pkg/events"
10
	cloudevents "github.com/cloudevents/sdk-go/v2"
11
	"github.com/gofiber/fiber/v2"
12
13
	"github.com/NdoleStudio/httpsms/pkg/discord"
14
15
	"github.com/NdoleStudio/httpsms/pkg/entities"
16
	"github.com/NdoleStudio/httpsms/pkg/repositories"
17
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
18
	"github.com/google/uuid"
19
	"github.com/palantir/stacktrace"
20
)
21
22
// DiscordService is responsible for handling discordIntegrations
23
type DiscordService struct {
24
	service
25
	logger     telemetry.Logger
26
	tracer     telemetry.Tracer
27
	client     *discord.Client
28
	dispatcher *EventDispatcher
29
	repository repositories.DiscordRepository
30
}
31
32
// NewDiscordService creates a new DiscordService
33
func NewDiscordService(
34
	logger telemetry.Logger,
35
	tracer telemetry.Tracer,
36
	client *discord.Client,
37
	repository repositories.DiscordRepository,
38
	dispatcher *EventDispatcher,
39
) (s *DiscordService) {
40
	return &DiscordService{
41
		logger:     logger.WithService(fmt.Sprintf("%T", s)),
42
		tracer:     tracer,
43
		client:     client,
44
		dispatcher: dispatcher,
45
		repository: repository,
46
	}
47
}
48
49
// GetByServerID fetches the entities.Discord by the serverID
50
func (service *DiscordService) GetByServerID(ctx context.Context, serverID string) (*entities.Discord, error) {
51
	ctx, span, _ := service.tracer.StartWithLogger(ctx, service.logger)
52
	defer span.End()
53
	return service.repository.FindByServerID(ctx, serverID)
54
}
55
56
// DeleteAllForUser deletes all entities.Discord for an entities.UserID.
57
func (service *DiscordService) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
58
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
59
	defer span.End()
60
61
	if err := service.repository.DeleteAllForUser(ctx, userID); err != nil {
62
		msg := fmt.Sprintf("could not delete all [entities.Discord] for user with ID [%s]", userID)
63
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
64
	}
65
66
	ctxLogger.Info(fmt.Sprintf("deleted all [entities.Discord] for user with ID [%s]", userID))
67
	return nil
68
}
69
70
// Index fetches the entities.Discord for an entities.UserID
71
func (service *DiscordService) Index(ctx context.Context, userID entities.UserID, params repositories.IndexParams) ([]*entities.Discord, error) {
72
	ctx, span := service.tracer.Start(ctx)
73
	defer span.End()
74
75
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
76
77
	discordIntegrations, err := service.repository.Index(ctx, userID, params)
78
	if err != nil {
79
		msg := fmt.Sprintf("could not fetch discord integrations with params [%+#v]", params)
80
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
81
	}
82
83
	ctxLogger.Info(fmt.Sprintf("fetched [%d] discord integrations with prams [%+#v]", len(discordIntegrations), params))
84
	return discordIntegrations, nil
85
}
86
87
// Delete an entities.Discord
88
func (service *DiscordService) Delete(ctx context.Context, userID entities.UserID, discordID uuid.UUID) error {
89
	ctx, span := service.tracer.Start(ctx)
90
	defer span.End()
91
92
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
93
94
	if _, err := service.repository.Load(ctx, userID, discordID); err != nil {
95
		msg := fmt.Sprintf("cannot load discord integration with userID [%s] and discordID [%s]", userID, discordID)
96
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
97
	}
98
99
	if err := service.repository.Delete(ctx, userID, discordID); err != nil {
100
		msg := fmt.Sprintf("cannot delete discord integration with id [%s] and discordID [%s]", discordID, userID)
101
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
102
	}
103
104
	ctxLogger.Info(fmt.Sprintf("deleted discord integration with id [%s] and user id [%s]", discordID, userID))
105
	return nil
106
}
107
108
// DiscordStoreParams are parameters for creating a new entities.Discord
109
type DiscordStoreParams struct {
110
	UserID            entities.UserID
111
	Name              string
112
	ServerID          string
113
	IncomingChannelID string
114
}
115
116
// Store a new entities.Discord
117
func (service *DiscordService) Store(ctx context.Context, params *DiscordStoreParams) (*entities.Discord, error) {
118
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
119
	defer span.End()
120
121
	if err := service.createSlashCommand(ctx, params.ServerID); err != nil {
122
		msg := fmt.Sprintf("cannot create slash command for server [%s]", params.ServerID)
123
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
124
	}
125
126
	discordIntegration := &entities.Discord{
127
		ID:                uuid.New(),
128
		UserID:            params.UserID,
129
		Name:              params.Name,
130
		ServerID:          params.ServerID,
131
		IncomingChannelID: params.IncomingChannelID,
132
		CreatedAt:         time.Now().UTC(),
133
		UpdatedAt:         time.Now().UTC(),
134
	}
135
136
	if err := service.repository.Save(ctx, discordIntegration); err != nil {
137
		msg := fmt.Sprintf("cannot save discord integration with id [%s]", discordIntegration.ID)
138
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
139
	}
140
141
	ctxLogger.Info(fmt.Sprintf("discord integration saved with id [%s] in the [%T]", discordIntegration.ID, service.repository))
142
	return discordIntegration, nil
143
}
144
145
func (service *DiscordService) createSlashCommand(ctx context.Context, serverID string) error {
146
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
147
	defer span.End()
148
149
	command, _, err := service.client.Application.CreateCommand(ctx, serverID, &discord.CommandCreateRequest{
150
		Name:        "httpsms",
151
		Type:        1,
152
		Description: "Send an SMS via httpsms.com",
153
		Options: []discord.CommandCreateRequestOption{
154
			{
155
				Name:        "from",
156
				Description: "Sender phone number",
157
				Type:        3,
158
				Required:    true,
159
			},
160
			{
161
				Name:        "to",
162
				Description: "Recipient phone number",
163
				Type:        3,
164
				Required:    true,
165
			},
166
			{
167
				Name:        "message",
168
				Description: "Text message content",
169
				Type:        3,
170
				Required:    true,
171
			},
172
		},
173
	})
174
	if err != nil {
175
		msg := fmt.Sprintf("cannot create slash command for server [%s]", serverID)
176
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
177
	}
178
179
	ctxLogger.Info(fmt.Sprintf("upserted a slash command with ID [%s] for discord server [%s] and applicationID [%s]", command.ID, serverID, command.ApplicationID))
180
	return nil
181
}
182
183
// DiscordUpdateParams are parameters for updating an entities.Discord
184
type DiscordUpdateParams struct {
185
	UserID            entities.UserID
186
	Name              string
187
	ServerID          string
188
	IncomingChannelID string
189
	DiscordID         uuid.UUID
190
}
191
192
// Update an entities.Discord
193
func (service *DiscordService) Update(ctx context.Context, params *DiscordUpdateParams) (*entities.Discord, error) {
194
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
195
	defer span.End()
196
197
	discordIntegration, err := service.repository.Load(ctx, params.UserID, params.DiscordID)
198
	if err != nil {
199
		msg := fmt.Sprintf("cannot load discord integration with userID [%s] and discordID [%s]", params.UserID, params.DiscordID)
200
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
201
	}
202
203
	if err = service.createSlashCommand(ctx, params.ServerID); err != nil {
204
		msg := fmt.Sprintf("cannot create slash command for server [%s]", params.ServerID)
205
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
206
	}
207
208
	discordIntegration.Name = params.Name
209
	discordIntegration.ServerID = params.ServerID
210
	discordIntegration.IncomingChannelID = params.IncomingChannelID
211
212
	if err = service.repository.Save(ctx, discordIntegration); err != nil {
213
		msg := fmt.Sprintf("cannot save discord integration with id [%s] after update", discordIntegration.ID)
214
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
215
	}
216
217
	ctxLogger.Info(fmt.Sprintf("discord integration updated with id [%s] in the [%T]", discordIntegration.ID, service.repository))
218
	return discordIntegration, nil
219
}
220
221
// HandleMessageReceived sends an incoming SMS to a discord channel
222
func (service *DiscordService) HandleMessageReceived(ctx context.Context, userID entities.UserID, event cloudevents.Event) error {
223
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
224
	defer span.End()
225
226
	discordIntegrations, err := service.repository.FetchHavingIncomingChannel(ctx, userID)
227
	if err != nil {
228
		msg := fmt.Sprintf("cannot load discord integrations for user with ID [%s]", userID)
229
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
230
	}
231
232
	if len(discordIntegrations) == 0 {
233
		ctxLogger.Info(fmt.Sprintf("user [%s] has no discord integration for event [%s]", userID, event.Type()))
234
		return nil
235
	}
236
237
	var wg sync.WaitGroup
238
	for _, discordIntegration := range discordIntegrations {
239
		wg.Add(1)
240
		go func(webhook *entities.Discord) {
241
			defer wg.Done()
242
			service.sendMessage(ctx, event, webhook)
243
		}(discordIntegration)
244
	}
245
	wg.Wait()
246
247
	return nil
248
}
249
250
func (service *DiscordService) sendMessage(ctx context.Context, event cloudevents.Event, discord *entities.Discord) {
251
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
252
	defer span.End()
253
254
	payload := new(events.MessagePhoneReceivedPayload)
255
	if err := event.DataAs(payload); err != nil {
256
		ctxLogger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot unmarshal event [%s] with ID [%s] into [%T]", event.Type(), event.ID(), payload)))
257
		return
258
	}
259
260
	request := service.createDiscordMessage(ctxLogger, payload)
261
	message, response, err := service.client.Channel.CreateMessage(ctx, discord.IncomingChannelID, request)
262
	if err != nil {
263
		msg := fmt.Sprintf("cannot send [%s] event to discord channel [%s] for user [%s]", event.Type(), discord.IncomingChannelID, discord.UserID)
264
		ctxLogger.Warn(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)))
265
266
		eventPayload := &events.DiscordSendFailedPayload{
267
			DiscordID:        discord.ID,
268
			UserID:           discord.UserID,
269
			MessageID:        payload.MessageID,
270
			Owner:            payload.Owner,
271
			EventType:        event.Type(),
272
			ErrorMessage:     err.Error(),
273
			DiscordChannelID: discord.IncomingChannelID,
274
		}
275
276
		if response != nil {
277
			eventPayload.HTTPResponseStatusCode = &response.HTTPResponse.StatusCode
278
			eventPayload.ErrorMessage = string(*response.Body)
279
		}
280
281
		service.handleDiscordMessageFailed(ctx, event.Source(), eventPayload)
282
		return
283
	}
284
285
	ctxLogger.Info(fmt.Sprintf("sent discord message [%s] to channel [%s] for [%s] event with ID [%s]", message["id"].(string), discord.IncomingChannelID, event.Type(), event.ID()))
286
}
287
288
func (service *DiscordService) createDiscordMessage(ctxLogger telemetry.Logger, payload *events.MessagePhoneReceivedPayload) fiber.Map {
289
	return fiber.Map{
290
		"content": "✉ new message received",
291
		"embeds": []fiber.Map{
292
			{
293
				"fields": []fiber.Map{
294
					{
295
						"name":   "From:",
296
						"value":  service.getFormattedNumber(ctxLogger, payload.Contact),
297
						"inline": true,
298
					},
299
					{
300
						"name":   "To:",
301
						"value":  service.getFormattedNumber(ctxLogger, payload.Owner),
302
						"inline": true,
303
					},
304
					{
305
						"name":  "Content:",
306
						"value": payload.Content,
307
					},
308
					{
309
						"name":  "MessageID:",
310
						"value": payload.MessageID,
311
					},
312
				},
313
			},
314
		},
315
	}
316
}
317
318
// handleDiscordMessageFailed creates an events.EventTypeDiscordSendFailed
// event from payload and dispatches it. Failures to create or dispatch the
// event are logged as errors and not returned.
func (service *DiscordService) handleDiscordMessageFailed(ctx context.Context, source string, payload *events.DiscordSendFailedPayload) {
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
	defer span.End()

	event, createErr := service.createEvent(events.EventTypeDiscordSendFailed, source, payload)
	if createErr != nil {
		msg := fmt.Sprintf("cannot create event [%s] for user with id [%s]", events.EventTypeDiscordSendFailed, payload.UserID)
		ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(createErr, msg)))
		return
	}

	dispatchErr := service.dispatcher.Dispatch(ctx, event)
	if dispatchErr != nil {
		msg := fmt.Sprintf("cannot dispatch event [%s] for user with id [%s]", event.Type(), payload.UserID)
		ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(dispatchErr, msg)))
		return
	}

	ctxLogger.Info(fmt.Sprintf("dispatched event [%s] for user with id [%s]", event.Type(), payload.UserID))
}
337