|
1
|
|
|
package servers |
|
2
|
|
|
|
|
3
|
|
|
import ( |
|
4
|
|
|
"google.golang.org/grpc/status" |
|
5
|
|
|
|
|
6
|
|
|
"github.com/Permify/permify/internal" |
|
7
|
|
|
"github.com/Permify/permify/internal/storage" |
|
8
|
|
|
v1 "github.com/Permify/permify/pkg/pb/base/v1" |
|
9
|
|
|
) |
|
10
|
|
|
|
|
11
|
|
|
type WatchServer struct { |
|
12
|
|
|
v1.UnimplementedWatchServer |
|
13
|
|
|
|
|
14
|
|
|
dr storage.DataReader |
|
15
|
|
|
w storage.Watcher |
|
16
|
|
|
} |
|
17
|
|
|
|
|
18
|
|
|
func NewWatchServer( |
|
19
|
|
|
w storage.Watcher, |
|
20
|
|
|
dr storage.DataReader, |
|
21
|
|
|
) *WatchServer { |
|
22
|
|
|
return &WatchServer{ |
|
23
|
|
|
w: w, |
|
24
|
|
|
dr: dr, |
|
25
|
|
|
} |
|
26
|
|
|
} |
|
27
|
|
|
|
|
28
|
|
|
// Watch function sets up a stream for the client to receive changes. |
|
29
|
|
|
func (r *WatchServer) Watch(request *v1.WatchRequest, server v1.Watch_WatchServer) error { |
|
30
|
|
|
// Start a new context and span for tracing. |
|
31
|
|
|
ctx, span := internal.Tracer.Start(server.Context(), "watch.watch") |
|
32
|
|
|
defer span.End() // Ensure the span ends when the function returns. |
|
33
|
|
|
|
|
34
|
|
|
// Validate the incoming request. |
|
35
|
|
|
v := request.Validate() |
|
36
|
|
|
if v != nil { |
|
37
|
|
|
return v // Return validation error, if any. |
|
38
|
|
|
} |
|
39
|
|
|
|
|
40
|
|
|
// Extract the snapshot token from the request. |
|
41
|
|
|
snap := request.GetSnapToken() |
|
42
|
|
|
if snap == "" { |
|
43
|
|
|
// If the snapshot token is not provided, get the head snapshot from the database. |
|
44
|
|
|
st, err := r.dr.HeadSnapshot(ctx, request.GetTenantId()) |
|
45
|
|
|
if err != nil { |
|
46
|
|
|
return err // If there's an error retrieving the snapshot, return it. |
|
47
|
|
|
} |
|
48
|
|
|
// Encode the snapshot to a string. |
|
49
|
|
|
snap = st.Encode().String() |
|
50
|
|
|
} |
|
51
|
|
|
|
|
52
|
|
|
// Call the Watch function on the watcher, which returns two channels. |
|
53
|
|
|
changes, errs := r.w.Watch(ctx, request.GetTenantId(), snap) |
|
54
|
|
|
|
|
55
|
|
|
// Create a separate goroutine to handle sending changes to the server. |
|
56
|
|
|
go func() { |
|
57
|
|
|
for change := range changes { |
|
58
|
|
|
// For each change, send it to the client. |
|
59
|
|
|
if err := server.Send(&v1.WatchResponse{Changes: change}); err != nil { |
|
60
|
|
|
// If an error occurs while sending, exit the goroutine. |
|
61
|
|
|
// The error is not handled here because the context will be cancelled if an error is detected. |
|
62
|
|
|
return |
|
63
|
|
|
} |
|
64
|
|
|
} |
|
65
|
|
|
}() |
|
66
|
|
|
|
|
67
|
|
|
// Main loop for handling errors. |
|
68
|
|
|
for err := range errs { |
|
69
|
|
|
// If an error occurs, convert it to a status error and return it. |
|
70
|
|
|
// This ends the Watch function, which in turn closes the changes channel and ends the above goroutine. |
|
71
|
|
|
return status.Error(GetStatus(err), err.Error()) |
|
72
|
|
|
} |
|
73
|
|
|
|
|
74
|
|
|
// At this point, the errs channel has been closed, indicating that no more errors will be coming in. |
|
75
|
|
|
// Therefore, it's safe to return nil indicating that the operation was successful. |
|
76
|
|
|
return nil |
|
77
|
|
|
} |
|
78
|
|
|
|