Completed
Push — main ( 5f0133...658183 )
by Acho
16s
created

services.*WebhookService.sendNotification   C

Complexity

Conditions 10

Size

Total Lines 49
Code Lines 35

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 35
dl 0
loc 49
rs 5.9999
c 0
b 0
f 0
nop 4

How to fix   Complexity   

Complexity

Complex classes like services.*WebhookService.sendNotification often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package services
2
3
import (
4
	"bytes"
5
	"context"
6
	"encoding/json"
7
	"fmt"
8
	"io"
9
	"net/http"
10
	"strings"
11
	"sync"
12
	"time"
13
14
	"github.com/avast/retry-go"
15
	"github.com/pkg/errors"
16
17
	"github.com/gofiber/fiber/v2"
18
19
	"github.com/NdoleStudio/httpsms/pkg/events"
20
21
	"github.com/NdoleStudio/httpsms/pkg/entities"
22
	"github.com/NdoleStudio/httpsms/pkg/repositories"
23
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
24
	cloudevents "github.com/cloudevents/sdk-go/v2"
25
	"github.com/golang-jwt/jwt"
26
	"github.com/google/uuid"
27
	"github.com/lib/pq"
28
	"github.com/palantir/stacktrace"
29
)
30
31
// WebhookService is responsible for handling webhooks
32
type WebhookService struct {
33
	service
34
	logger     telemetry.Logger
35
	tracer     telemetry.Tracer
36
	client     *http.Client
37
	repository repositories.WebhookRepository
38
	dispatcher *EventDispatcher
39
}
40
41
// NewWebhookService creates a new WebhookService
42
func NewWebhookService(
43
	logger telemetry.Logger,
44
	tracer telemetry.Tracer,
45
	client *http.Client,
46
	repository repositories.WebhookRepository,
47
	dispatcher *EventDispatcher,
48
) (s *WebhookService) {
49
	return &WebhookService{
50
		logger:     logger.WithService(fmt.Sprintf("%T", s)),
51
		tracer:     tracer,
52
		client:     client,
53
		dispatcher: dispatcher,
54
		repository: repository,
55
	}
56
}
57
58
// DeleteAllForUser deletes all entities.Webhook for an entities.UserID.
59
func (service *WebhookService) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
60
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
61
	defer span.End()
62
63
	if err := service.repository.DeleteAllForUser(ctx, userID); err != nil {
64
		msg := fmt.Sprintf("could not delete all [entities.Webhook] for user with ID [%s]", userID)
65
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
66
	}
67
68
	ctxLogger.Info(fmt.Sprintf("deleted all [entities.Webhook] for user with ID [%s]", userID))
69
	return nil
70
}
71
72
// Index fetches the entities.Webhook for an entities.UserID
73
func (service *WebhookService) Index(ctx context.Context, userID entities.UserID, params repositories.IndexParams) ([]*entities.Webhook, error) {
74
	ctx, span := service.tracer.Start(ctx)
75
	defer span.End()
76
77
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
78
79
	webhooks, err := service.repository.Index(ctx, userID, params)
80
	if err != nil {
81
		msg := fmt.Sprintf("could not fetch webhooks with params [%+#v]", params)
82
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
83
	}
84
85
	ctxLogger.Info(fmt.Sprintf("fetched [%d] webhooks with prams [%+#v]", len(webhooks), params))
86
	return webhooks, nil
87
}
88
89
// Delete an entities.Webhook
90
func (service *WebhookService) Delete(ctx context.Context, userID entities.UserID, webhookID uuid.UUID) error {
91
	ctx, span := service.tracer.Start(ctx)
92
	defer span.End()
93
94
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
95
96
	if _, err := service.repository.Load(ctx, userID, webhookID); err != nil {
97
		msg := fmt.Sprintf("cannot load webhook with userID [%s] and phoneID [%s]", userID, webhookID)
98
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
99
	}
100
101
	if err := service.repository.Delete(ctx, userID, webhookID); err != nil {
102
		msg := fmt.Sprintf("cannot delete webhook with id [%s] and user id [%s]", webhookID, userID)
103
		return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
104
	}
105
106
	ctxLogger.Info(fmt.Sprintf("deleted webhook with id [%s] and user id [%s]", webhookID, userID))
107
	return nil
108
}
109
110
// WebhookStoreParams are parameters for creating a new entities.Webhook
111
type WebhookStoreParams struct {
112
	UserID       entities.UserID
113
	SigningKey   string
114
	URL          string
115
	PhoneNumbers pq.StringArray
116
	Events       pq.StringArray
117
}
118
119
// Store a new entities.Webhook
120
func (service *WebhookService) Store(ctx context.Context, params *WebhookStoreParams) (*entities.Webhook, error) {
121
	ctx, span := service.tracer.Start(ctx)
122
	defer span.End()
123
124
	ctxLogger := service.tracer.CtxLogger(service.logger, span)
125
126
	webhook := &entities.Webhook{
127
		ID:           uuid.New(),
128
		UserID:       params.UserID,
129
		URL:          params.URL,
130
		PhoneNumbers: params.PhoneNumbers,
131
		SigningKey:   params.SigningKey,
132
		Events:       params.Events,
133
		CreatedAt:    time.Now().UTC(),
134
		UpdatedAt:    time.Now().UTC(),
135
	}
136
137
	if err := service.repository.Save(ctx, webhook); err != nil {
138
		msg := fmt.Sprintf("cannot save webhook with id [%s]", webhook.ID)
139
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
140
	}
141
142
	ctxLogger.Info(fmt.Sprintf("webhook saved with id [%s] for user [%s] in the [%T]", webhook.ID, webhook.UserID, service.repository))
143
	return webhook, nil
144
}
145
146
// WebhookUpdateParams are parameters for updating an entities.Webhook
147
type WebhookUpdateParams struct {
148
	UserID       entities.UserID
149
	SigningKey   string
150
	URL          string
151
	Events       pq.StringArray
152
	PhoneNumbers pq.StringArray
153
	WebhookID    uuid.UUID
154
}
155
156
// Update an entities.Webhook
157
func (service *WebhookService) Update(ctx context.Context, params *WebhookUpdateParams) (*entities.Webhook, error) {
158
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
159
	defer span.End()
160
161
	webhook, err := service.repository.Load(ctx, params.UserID, params.WebhookID)
162
	if err != nil {
163
		msg := fmt.Sprintf("cannot load webhook with userID [%s] and phoneID [%s]", params.UserID, params.WebhookID)
164
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
165
	}
166
167
	webhook.URL = params.URL
168
	webhook.SigningKey = params.SigningKey
169
	webhook.Events = params.Events
170
	webhook.PhoneNumbers = params.PhoneNumbers
171
172
	if err = service.repository.Save(ctx, webhook); err != nil {
173
		msg := fmt.Sprintf("cannot save webhook with id [%s] after update", webhook.ID)
174
		return nil, service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
175
	}
176
177
	ctxLogger.Info(fmt.Sprintf("webhook updated with id [%s] in the [%T]", webhook.ID, service.repository))
178
	return webhook, nil
179
}
180
181
// Send an event to a subscribed webhook
182
func (service *WebhookService) Send(ctx context.Context, userID entities.UserID, event cloudevents.Event, phoneNumber string) error {
183
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
184
	defer span.End()
185
186
	webhooks, err := service.repository.LoadByEvent(ctx, userID, event.Type(), phoneNumber)
187
	if err != nil {
188
		msg := fmt.Sprintf("cannot load webhooks for userID [%s] and event [%s]", userID, event.Type())
189
		return service.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), msg))
190
	}
191
192
	if len(webhooks) == 0 {
193
		ctxLogger.Info(fmt.Sprintf("user [%s] has no webhook subscription to event [%s]", userID, event.Type()))
194
		return nil
195
	}
196
197
	var wg sync.WaitGroup
198
	for _, webhook := range webhooks {
199
		wg.Add(1)
200
		go func(webhook *entities.Webhook) {
201
			defer wg.Done()
202
			service.sendNotification(ctx, event, phoneNumber, webhook)
203
		}(webhook)
204
	}
205
	wg.Wait()
206
207
	return nil
208
}
209
210
func (service *WebhookService) sendNotification(ctx context.Context, event cloudevents.Event, owner string, webhook *entities.Webhook) {
211
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
212
	defer span.End()
213
214
	attempts := 0
215
	err := retry.Do(func() error {
216
		attempts++
217
218
		requestCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
219
		defer cancel()
220
221
		request, err := service.createRequest(requestCtx, event, webhook)
222
		if err != nil {
223
			msg := fmt.Sprintf("cannot create [%s] event to webhook [%s] for user [%s] after [%d] attempts", event.Type(), webhook.URL, webhook.UserID, attempts)
224
			return service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
225
		}
226
227
		response, err := service.client.Do(request)
228
		if err != nil {
229
			ctxLogger.Warn(stacktrace.Propagate(err, fmt.Sprintf("cannot send [%s] event to webhook [%s] for user [%s] after [%d] attempts", event.Type(), webhook.URL, webhook.UserID, attempts)))
230
			if attempts == 1 {
231
				return err
232
			}
233
			service.handleWebhookSendFailed(ctx, event, webhook, owner, err, nil)
234
			return nil
235
		}
236
237
		defer func() {
238
			err = response.Body.Close()
239
			if err != nil {
240
				ctxLogger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot close response body for [%s] event with ID [%s] after [%d] attempts", event.Type(), event.ID(), attempts)))
241
			}
242
		}()
243
244
		if response.StatusCode >= 400 {
245
			ctxLogger.Info(fmt.Sprintf("cannot send [%s] event to webhook [%s] for user [%s] with response code [%d]", event.Type(), webhook.URL, webhook.UserID, response.StatusCode))
246
			if attempts == 1 {
247
				return stacktrace.NewError(http.StatusText(response.StatusCode))
248
			}
249
			service.handleWebhookSendFailed(ctx, event, webhook, owner, stacktrace.NewError(http.StatusText(response.StatusCode)), response)
250
			return nil
251
		}
252
253
		ctxLogger.Info(fmt.Sprintf("sent webhook to url [%s] for event [%s] with ID [%s] and response code [%d]", webhook.URL, event.Type(), event.ID(), response.StatusCode))
254
		return nil
255
	}, retry.Attempts(2))
256
	if err != nil {
257
		msg := fmt.Sprintf("cannot handle [%s] event to webhook [%s] for user [%s] after [%d] attempts", event.Type(), webhook.URL, webhook.UserID, attempts)
258
		ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)))
259
	}
260
}
261
262
func (service *WebhookService) createRequest(ctx context.Context, event cloudevents.Event, webhook *entities.Webhook) (*http.Request, error) {
263
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
264
	defer span.End()
265
266
	payload, err := json.Marshal(service.getPayload(ctxLogger, event, webhook))
267
	if err != nil {
268
		msg := fmt.Sprintf("cannot marshal payload for user [%s] and webhook [%s] for event [%s]", webhook.UserID, webhook.ID, event.ID())
269
		return nil, stacktrace.Propagate(err, msg)
270
	}
271
272
	request, err := http.NewRequestWithContext(ctx, http.MethodPost, webhook.URL, bytes.NewReader(payload))
273
	if err != nil {
274
		msg := fmt.Sprintf("cannot create request for user [%s] and webhook [%s] for event [%s]", webhook.UserID, webhook.ID, event.ID())
275
		return nil, stacktrace.Propagate(err, msg)
276
	}
277
278
	request.Header.Add("X-Event-Type", event.Type())
279
	request.Header.Set("Content-Type", "application/json")
280
281
	if strings.TrimSpace(webhook.SigningKey) != "" {
282
		token, err := service.getAuthToken(webhook)
283
		if err != nil {
284
			msg := fmt.Sprintf("cannot generate auth token for user [%s] and webhook [%s]", webhook.UserID, webhook.ID)
285
			return nil, stacktrace.Propagate(err, msg)
286
		}
287
		request.Header.Add("Authorization", fmt.Sprintf("Bearer %s", token))
288
	}
289
290
	return request, nil
291
}
292
293
func (service *WebhookService) getPayload(ctxLogger telemetry.Logger, event cloudevents.Event, webhook *entities.Webhook) any {
294
	if event.Type() != events.EventTypeMessagePhoneReceived {
295
		return event
296
	}
297
298
	if !strings.HasPrefix(webhook.URL, "https://discord.com/api/webhooks/") {
299
		return event
300
	}
301
302
	payload := new(events.MessagePhoneReceivedPayload)
303
304
	err := event.DataAs(payload)
305
	if err != nil {
306
		ctxLogger.Error(stacktrace.Propagate(err, fmt.Sprintf("cannot unmarshal event [%s] with ID [%s] into [%T]", event.Type(), event.ID(), payload)))
307
		return event
308
	}
309
310
	return map[string]any{
311
		"avatar_url": "https://httpsms.com/avatar.png",
312
		"username":   "httpsms.com",
313
		"content":    "✉ new message received",
314
		"embeds": []fiber.Map{
315
			{
316
				"fields": []fiber.Map{
317
					{
318
						"name":   "From:",
319
						"value":  service.getFormattedNumber(ctxLogger, payload.Contact),
320
						"inline": true,
321
					},
322
					{
323
						"name":   "To:",
324
						"value":  service.getFormattedNumber(ctxLogger, payload.Owner),
325
						"inline": true,
326
					},
327
					{
328
						"name":  "Content:",
329
						"value": payload.Content,
330
					},
331
					{
332
						"name":  "MessageID:",
333
						"value": payload.MessageID,
334
					},
335
				},
336
			},
337
		},
338
	}
339
}
340
341
func (service *WebhookService) getAuthToken(webhook *entities.Webhook) (string, error) {
342
	token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.StandardClaims{
343
		Audience:  webhook.URL,
344
		ExpiresAt: time.Now().UTC().Add(10 * time.Minute).Unix(),
345
		IssuedAt:  time.Now().UTC().Unix(),
346
		Issuer:    "api.httpsms.com",
347
		NotBefore: time.Now().UTC().Add(-10 * time.Minute).Unix(),
348
		Subject:   string(webhook.UserID),
349
	})
350
	return token.SignedString([]byte(webhook.SigningKey))
351
}
352
353
func (service *WebhookService) handleWebhookSendFailed(ctx context.Context, event cloudevents.Event, webhook *entities.Webhook, owner string, err error, response *http.Response) {
354
	ctx, span, ctxLogger := service.tracer.StartWithLogger(ctx, service.logger)
355
	defer span.End()
356
357
	payload := &events.WebhookSendFailedPayload{
358
		WebhookID:              webhook.ID,
359
		WebhookURL:             webhook.URL,
360
		UserID:                 webhook.UserID,
361
		EventID:                event.ID(),
362
		Owner:                  owner,
363
		EventType:              event.Type(),
364
		EventPayload:           string(event.Data()),
365
		HTTPResponseStatusCode: nil,
366
		ErrorMessage:           err.Error(),
367
	}
368
369
	if errors.Is(err, context.DeadlineExceeded) {
370
		payload.ErrorMessage = "TIMOUT after 10 seconds"
371
	}
372
373
	if response != nil {
374
		payload.HTTPResponseStatusCode = &response.StatusCode
375
		payload.ErrorMessage = http.StatusText(response.StatusCode)
376
377
		body, err := io.ReadAll(response.Body)
378
		if err == nil && len(body) > 0 {
379
			payload.ErrorMessage = string(body)
380
		}
381
	}
382
383
	event, err = service.createEvent(events.EventTypeWebhookSendFailed, event.Source(), payload)
384
	if err != nil {
385
		msg := fmt.Sprintf("cannot create event [%s] for user with id [%s]", events.EventTypeWebhookSendFailed, payload.UserID)
386
		ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)))
387
		return
388
	}
389
390
	if err = service.dispatcher.Dispatch(ctx, event); err != nil {
391
		msg := fmt.Sprintf("cannot dispatch event [%s] for user with id [%s]", event.Type(), payload.UserID)
392
		ctxLogger.Error(service.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)))
393
		return
394
	}
395
396
	ctxLogger.Info(fmt.Sprintf("dispatched [%s] event with ID [%s] for user with id [%s]", event.Type(), event.ID(), payload.UserID))
397
}
398