handlers.*HeartbeatHandler.Store   B
last analyzed

Complexity

Conditions 8

Size

Total Lines 45
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 32
dl 0
loc 45
rs 7.2453
c 0
b 0
f 0
nop 1
1
package handlers
2
3
import (
4
	"fmt"
5
	"sync"
6
7
	"github.com/NdoleStudio/httpsms/pkg/entities"
8
9
	"github.com/NdoleStudio/httpsms/pkg/requests"
10
	"github.com/NdoleStudio/httpsms/pkg/services"
11
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
12
	"github.com/NdoleStudio/httpsms/pkg/validators"
13
	"github.com/davecgh/go-spew/spew"
14
	"github.com/gofiber/fiber/v2"
15
	"github.com/palantir/stacktrace"
16
)
17
18
// HeartbeatHandler handles heartbeat http requests.
19
type HeartbeatHandler struct {
20
	handler
21
	logger    telemetry.Logger
22
	tracer    telemetry.Tracer
23
	validator *validators.HeartbeatHandlerValidator
24
	service   *services.HeartbeatService
25
}
26
27
// NewHeartbeatHandler creates a new HeartbeatHandler
28
func NewHeartbeatHandler(
29
	logger telemetry.Logger,
30
	tracer telemetry.Tracer,
31
	validator *validators.HeartbeatHandlerValidator,
32
	service *services.HeartbeatService,
33
) (h *HeartbeatHandler) {
34
	return &HeartbeatHandler{
35
		logger:    logger.WithService(fmt.Sprintf("%T", h)),
36
		tracer:    tracer,
37
		validator: validator,
38
		service:   service,
39
	}
40
}
41
42
// RegisterRoutes registers the routes for the HeartbeatHandler
43
func (h *HeartbeatHandler) RegisterRoutes(router fiber.Router, middlewares ...fiber.Handler) {
44
	router.Get("/v1/heartbeats", h.computeRoute(middlewares, h.Index)...)
45
}
46
47
// RegisterPhoneAPIKeyRoutes registers the routes for the HeartbeatHandler
48
func (h *HeartbeatHandler) RegisterPhoneAPIKeyRoutes(router fiber.Router, middlewares ...fiber.Handler) {
49
	router.Post("/v1/heartbeats", h.computeRoute(middlewares, h.Store)...)
50
}
51
52
// Index returns the heartbeats of a phone number
53
// @Summary      Get heartbeats of an owner phone number
54
// @Description  Get the last time a phone number requested for outstanding messages. It will be sorted by timestamp in descending order.
55
// @Security	 ApiKeyAuth
56
// @Tags         Heartbeats
57
// @Accept       json
58
// @Produce      json
59
// @Param        owner		query  string  	true 	"the owner's phone number" 			default(+18005550199)
60
// @Param        skip		query  int  	false	"number of heartbeats to skip"		minimum(0)
61
// @Param        query		query  string  	false 	"filter containing query"
62
// @Param        limit		query  int  	false	"number of heartbeats to return"	minimum(1)	maximum(20)
63
// @Success      200 		{object}	responses.HeartbeatsResponse
64
// @Failure      400		{object}	responses.BadRequest
65
// @Failure 	 401	    {object}	responses.Unauthorized
66
// @Failure      422		{object}	responses.UnprocessableEntity
67
// @Failure      500		{object}	responses.InternalServerError
68
// @Router       /heartbeats [get]
69
func (h *HeartbeatHandler) Index(c *fiber.Ctx) error {
70
	ctx, span := h.tracer.StartFromFiberCtx(c)
71
	defer span.End()
72
73
	ctxLogger := h.tracer.CtxLogger(h.logger, span)
74
75
	var request requests.HeartbeatIndex
76
	if err := c.QueryParser(&request); err != nil {
77
		msg := fmt.Sprintf("cannot marshall params [%s] into %T", c.OriginalURL(), request)
78
		ctxLogger.Warn(stacktrace.Propagate(err, msg))
79
		return h.responseBadRequest(c, err)
80
	}
81
82
	if errors := h.validator.ValidateIndex(ctx, request.Sanitize()); len(errors) != 0 {
83
		msg := fmt.Sprintf("validation errors [%s], while fetching heartbeats [%+#v]", spew.Sdump(errors), request)
84
		ctxLogger.Warn(stacktrace.NewError(msg))
85
		return h.responseUnprocessableEntity(c, errors, "validation errors while fetching heartbeats")
86
	}
87
88
	heartbeats, err := h.service.Index(ctx, h.userIDFomContext(c), request.Owner, request.ToIndexParams())
89
	if err != nil {
90
		msg := fmt.Sprintf("cannot get messgaes with params [%+#v]", request)
91
		ctxLogger.Error(stacktrace.Propagate(err, msg))
92
		return h.responseInternalServerError(c)
93
	}
94
95
	return h.responseOK(c, fmt.Sprintf("fetched %d %s", len(*heartbeats), h.pluralize("heartbeat", len(*heartbeats))), heartbeats)
96
}
97
98
// Store the heartbeat of a phone number
99
// @Summary      Register heartbeat of an owner phone number
100
// @Description  Store the heartbeat to make notify that a phone number is still active
101
// @Security	 ApiKeyAuth
102
// @Tags         Heartbeats
103
// @Accept       json
104
// @Produce      json
105
// @Param        payload   	body 		requests.HeartbeatStore  		true "Payload of the heartbeat request"
106
// @Success      200 		{object}	responses.HeartbeatResponse
107
// @Failure      400		{object}	responses.BadRequest
108
// @Failure 	 401	    {object}	responses.Unauthorized
109
// @Failure      422		{object}	responses.UnprocessableEntity
110
// @Failure      500		{object}	responses.InternalServerError
111
// @Router       /heartbeats [post]
112
func (h *HeartbeatHandler) Store(c *fiber.Ctx) error {
113
	ctx, span := h.tracer.StartFromFiberCtx(c)
114
	defer span.End()
115
116
	ctxLogger := h.tracer.CtxLogger(h.logger, span)
117
118
	var request requests.HeartbeatStore
119
	if err := c.BodyParser(&request); err != nil {
120
		msg := fmt.Sprintf("cannot marshall params [%s] into %T", c.OriginalURL(), request)
121
		ctxLogger.Warn(stacktrace.Propagate(err, msg))
122
		return h.responseBadRequest(c, err)
123
	}
124
125
	if errors := h.validator.ValidateStore(ctx, request.Sanitize()); len(errors) != 0 {
126
		msg := fmt.Sprintf("validation errors [%s], while storing heartbeat [%+#v]", spew.Sdump(errors), request)
127
		ctxLogger.Warn(stacktrace.NewError(msg))
128
		return h.responseUnprocessableEntity(c, errors, "validation errors while storing heartbeat")
129
	}
130
131
	for _, phoneNumber := range request.PhoneNumbers {
132
		if !h.authorizePhoneAPIKey(c, phoneNumber) {
133
			ctxLogger.Warn(stacktrace.NewError(fmt.Sprintf("phone API Key ID [%s] is not authorized to store heartbeat for phone number [%s]", h.userFromContext(c).PhoneAPIKeyID, phoneNumber)))
134
			return h.responsePhoneAPIKeyUnauthorized(c, phoneNumber, h.userFromContext(c))
135
		}
136
	}
137
138
	params := request.ToStoreParams(h.userFromContext(c), c.OriginalURL(), c.Get("X-Client-Version"))
139
140
	wg := sync.WaitGroup{}
141
	responses := make([]*entities.Heartbeat, len(params))
142
	for index, value := range params {
143
		wg.Add(1)
144
		go func(input services.HeartbeatStoreParams, index int) {
145
			response, err := h.service.Store(ctx, input)
146
			if err != nil {
147
				msg := fmt.Sprintf("cannot store heartbeat with params [%+#v]", request)
148
				ctxLogger.Error(stacktrace.Propagate(err, msg))
149
			}
150
			responses[index] = response
151
			wg.Done()
152
		}(value, index)
153
	}
154
155
	wg.Wait()
156
	return h.responseCreated(c, fmt.Sprintf("[%d] heartbeats received successfully", len(responses)), responses)
157
}
158