Passed
Push — main ( 50082e...98fd2d )
by Acho
01:27
created

handlers.*HeartbeatHandler.Store   B

Complexity

Conditions 6

Size

Total Lines 38
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 28
dl 0
loc 38
rs 8.2746
c 0
b 0
f 0
nop 1
1
package handlers
2
3
import (
4
	"fmt"
5
	"sync"
6
7
	"github.com/NdoleStudio/httpsms/pkg/entities"
8
9
	"github.com/NdoleStudio/httpsms/pkg/requests"
10
	"github.com/NdoleStudio/httpsms/pkg/services"
11
	"github.com/NdoleStudio/httpsms/pkg/telemetry"
12
	"github.com/NdoleStudio/httpsms/pkg/validators"
13
	"github.com/davecgh/go-spew/spew"
14
	"github.com/gofiber/fiber/v2"
15
	"github.com/palantir/stacktrace"
16
)
17
18
// HeartbeatHandler handles heartbeat http requests.
19
type HeartbeatHandler struct {
20
	handler
21
	logger    telemetry.Logger
22
	tracer    telemetry.Tracer
23
	validator *validators.HeartbeatHandlerValidator
24
	service   *services.HeartbeatService
25
}
26
27
// NewHeartbeatHandler creates a new HeartbeatHandler
28
func NewHeartbeatHandler(
29
	logger telemetry.Logger,
30
	tracer telemetry.Tracer,
31
	validator *validators.HeartbeatHandlerValidator,
32
	service *services.HeartbeatService,
33
) (h *HeartbeatHandler) {
34
	return &HeartbeatHandler{
35
		logger:    logger.WithService(fmt.Sprintf("%T", h)),
36
		tracer:    tracer,
37
		validator: validator,
38
		service:   service,
39
	}
40
}
41
42
// RegisterRoutes registers the routes for the MessageHandler
43
func (h *HeartbeatHandler) RegisterRoutes(router fiber.Router) {
44
	router.Get("/heartbeats", h.Index)
45
	router.Post("/heartbeats", h.Store)
46
}
47
48
// Index returns the heartbeats of a phone number
49
// @Summary      Get heartbeats of an owner phone number
50
// @Description  Get the last time a phone number requested for outstanding messages. It will be sorted by timestamp in descending order.
51
// @Security	 ApiKeyAuth
52
// @Tags         Heartbeats
53
// @Accept       json
54
// @Produce      json
55
// @Param        owner		query  string  	true 	"the owner's phone number" 			default(+18005550199)
56
// @Param        skip		query  int  	false	"number of heartbeats to skip"		minimum(0)
57
// @Param        query		query  string  	false 	"filter containing query"
58
// @Param        limit		query  int  	false	"number of heartbeats to return"	minimum(1)	maximum(20)
59
// @Success      200 		{object}	responses.HeartbeatsResponse
60
// @Failure      400		{object}	responses.BadRequest
61
// @Failure 	 401	    {object}	responses.Unauthorized
62
// @Failure      422		{object}	responses.UnprocessableEntity
63
// @Failure      500		{object}	responses.InternalServerError
64
// @Router       /heartbeats [get]
65
func (h *HeartbeatHandler) Index(c *fiber.Ctx) error {
66
	ctx, span := h.tracer.StartFromFiberCtx(c)
67
	defer span.End()
68
69
	ctxLogger := h.tracer.CtxLogger(h.logger, span)
70
71
	var request requests.HeartbeatIndex
72
	if err := c.QueryParser(&request); err != nil {
73
		msg := fmt.Sprintf("cannot marshall params [%s] into %T", c.OriginalURL(), request)
74
		ctxLogger.Warn(stacktrace.Propagate(err, msg))
75
		return h.responseBadRequest(c, err)
76
	}
77
78
	if errors := h.validator.ValidateIndex(ctx, request.Sanitize()); len(errors) != 0 {
79
		msg := fmt.Sprintf("validation errors [%s], while fetching heartbeats [%+#v]", spew.Sdump(errors), request)
80
		ctxLogger.Warn(stacktrace.NewError(msg))
81
		return h.responseUnprocessableEntity(c, errors, "validation errors while fetching heartbeats")
82
	}
83
84
	heartbeats, err := h.service.Index(ctx, h.userIDFomContext(c), request.Owner, request.ToIndexParams())
85
	if err != nil {
86
		msg := fmt.Sprintf("cannot get messgaes with params [%+#v]", request)
87
		ctxLogger.Error(stacktrace.Propagate(err, msg))
88
		return h.responseInternalServerError(c)
89
	}
90
91
	return h.responseOK(c, fmt.Sprintf("fetched %d %s", len(*heartbeats), h.pluralize("heartbeat", len(*heartbeats))), heartbeats)
92
}
93
94
// Store the heartbeat of a phone number
95
// @Summary      Register heartbeat of an owner phone number
96
// @Description  Store the heartbeat to make notify that a phone number is still active
97
// @Security	 ApiKeyAuth
98
// @Tags         Heartbeats
99
// @Accept       json
100
// @Produce      json
101
// @Param        payload   	body 		requests.HeartbeatStore  		true "Payload of the heartbeat request"
102
// @Success      200 		{object}	responses.HeartbeatResponse
103
// @Failure      400		{object}	responses.BadRequest
104
// @Failure 	 401	    {object}	responses.Unauthorized
105
// @Failure      422		{object}	responses.UnprocessableEntity
106
// @Failure      500		{object}	responses.InternalServerError
107
// @Router       /heartbeats [post]
108
func (h *HeartbeatHandler) Store(c *fiber.Ctx) error {
109
	ctx, span := h.tracer.StartFromFiberCtx(c)
110
	defer span.End()
111
112
	ctxLogger := h.tracer.CtxLogger(h.logger, span)
113
114
	var request requests.HeartbeatStore
115
	if err := c.BodyParser(&request); err != nil {
116
		msg := fmt.Sprintf("cannot marshall params [%s] into %T", c.OriginalURL(), request)
117
		ctxLogger.Warn(stacktrace.Propagate(err, msg))
118
		return h.responseBadRequest(c, err)
119
	}
120
121
	if errors := h.validator.ValidateStore(ctx, request.Sanitize()); len(errors) != 0 {
122
		msg := fmt.Sprintf("validation errors [%s], while storing heartbeat [%+#v]", spew.Sdump(errors), request)
123
		ctxLogger.Warn(stacktrace.NewError(msg))
124
		return h.responseUnprocessableEntity(c, errors, "validation errors while storing heartbeat")
125
	}
126
127
	params := request.ToStoreParams(h.userFromContext(c), c.OriginalURL(), c.Get("X-Client-Version"))
128
129
	wg := sync.WaitGroup{}
130
	responses := make([]*entities.Heartbeat, len(params))
131
	for index, value := range params {
132
		wg.Add(1)
133
		go func(input services.HeartbeatStoreParams, index int) {
134
			response, err := h.service.Store(ctx, input)
135
			if err != nil {
136
				msg := fmt.Sprintf("cannot store heartbeat with params [%+#v]", request)
137
				ctxLogger.Error(stacktrace.Propagate(err, msg))
138
			}
139
			responses[index] = response
140
			wg.Done()
141
		}(value, index)
142
	}
143
144
	wg.Wait()
145
	return h.responseCreated(c, fmt.Sprintf("[%d] heartbeats received successfully", len(responses)), responses)
146
}
147