Passed
Pull Request — master (#2516)
by Tolga
04:08
created

servers.*WatchServer.Watch   B

Complexity

Conditions 8

Size

Total Lines 48
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 20
nop 2
dl 0
loc 48
rs 7.3333
c 0
b 0
f 0
1
package servers
2
3
import (
4
	"google.golang.org/grpc/status"
5
6
	"github.com/Permify/permify/internal"
7
	"github.com/Permify/permify/internal/storage"
8
	v1 "github.com/Permify/permify/pkg/pb/base/v1"
9
)
10
11
type WatchServer struct {
12
	v1.UnimplementedWatchServer
13
14
	dr storage.DataReader
15
	w  storage.Watcher
16
}
17
18
func NewWatchServer(
19
	w storage.Watcher,
20
	dr storage.DataReader,
21
) *WatchServer {
22
	return &WatchServer{
23
		w:  w,
24
		dr: dr,
25
	}
26
}
27
28
// Watch function sets up a stream for the client to receive changes.
29
func (r *WatchServer) Watch(request *v1.WatchRequest, server v1.Watch_WatchServer) error {
30
	// Start a new context and span for tracing.
31
	ctx, span := internal.Tracer.Start(server.Context(), "watch.watch")
32
	defer span.End() // Ensure the span ends when the function returns.
33
34
	// Validate the incoming request.
35
	v := request.Validate()
36
	if v != nil {
37
		return v // Return validation error, if any.
38
	}
39
40
	// Extract the snapshot token from the request.
41
	snap := request.GetSnapToken()
42
	if snap == "" {
43
		// If the snapshot token is not provided, get the head snapshot from the database.
44
		st, err := r.dr.HeadSnapshot(ctx, request.GetTenantId())
45
		if err != nil {
46
			return err // If there's an error retrieving the snapshot, return it.
47
		}
48
		// Encode the snapshot to a string.
49
		snap = st.Encode().String()
50
	}
51
52
	// Call the Watch function on the watcher, which returns two channels.
53
	changes, errs := r.w.Watch(ctx, request.GetTenantId(), snap)
54
55
	// Create a separate goroutine to handle sending changes to the server.
56
	go func() {
57
		for change := range changes {
58
			// For each change, send it to the client.
59
			if err := server.Send(&v1.WatchResponse{Changes: change}); err != nil {
60
				// If an error occurs while sending, exit the goroutine.
61
				// The error is not handled here because the context will be cancelled if an error is detected.
62
				return
63
			}
64
		}
65
	}()
66
67
	// Main loop for handling errors.
68
	for err := range errs {
69
		// If an error occurs, convert it to a status error and return it.
70
		// This ends the Watch function, which in turn closes the changes channel and ends the above goroutine.
71
		return status.Error(GetStatus(err), err.Error())
72
	}
73
74
	// At this point, the errs channel has been closed, indicating that no more errors will be coming in.
75
	// Therefore, it's safe to return nil indicating that the operation was successful.
76
	return nil
77
}
78