1
|
|
|
package servers |
2
|
|
|
|
3
|
|
|
import ( |
4
|
|
|
"context" |
5
|
|
|
"errors" |
6
|
|
|
"fmt" |
7
|
|
|
"log/slog" |
8
|
|
|
"net" |
9
|
|
|
"net/http" |
10
|
|
|
"net/http/pprof" |
11
|
|
|
"time" |
12
|
|
|
|
13
|
|
|
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" |
14
|
|
|
|
15
|
|
|
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/ratelimit" |
16
|
|
|
|
17
|
|
|
grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/auth" |
18
|
|
|
|
19
|
|
|
grpcRecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" |
20
|
|
|
grpcValidator "github.com/grpc-ecosystem/go-grpc-middleware/validator" |
21
|
|
|
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime" |
22
|
|
|
"github.com/rs/cors" |
23
|
|
|
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" |
24
|
|
|
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" |
25
|
|
|
"google.golang.org/grpc" |
26
|
|
|
"google.golang.org/grpc/credentials" |
27
|
|
|
"google.golang.org/grpc/credentials/insecure" |
28
|
|
|
"google.golang.org/grpc/reflection" |
29
|
|
|
"google.golang.org/protobuf/encoding/protojson" |
30
|
|
|
|
31
|
|
|
"github.com/Permify/permify/internal/authn/oidc" |
32
|
|
|
"github.com/Permify/permify/internal/authn/preshared" |
33
|
|
|
"github.com/Permify/permify/internal/config" |
34
|
|
|
"github.com/Permify/permify/internal/invoke" |
35
|
|
|
"github.com/Permify/permify/internal/middleware" |
36
|
|
|
"github.com/Permify/permify/internal/storage" |
37
|
|
|
grpcV1 "github.com/Permify/permify/pkg/pb/base/v1" |
38
|
|
|
health "google.golang.org/grpc/health/grpc_health_v1" |
39
|
|
|
) |
40
|
|
|
|
41
|
|
|
// Container is a struct that holds the invoker and various storage |
42
|
|
|
// for permission-related operations. It serves as a central point of access |
43
|
|
|
// for interacting with the underlying data and services. |
44
|
|
|
type Container struct { |
45
|
|
|
// Invoker for performing permission-related operations |
46
|
|
|
Invoker invoke.Invoker |
47
|
|
|
// DataReader for reading data from storage |
48
|
|
|
DR storage.DataReader |
49
|
|
|
// DataWriter for writing data to storage |
50
|
|
|
DW storage.DataWriter |
51
|
|
|
// BundleReader for reading bundle from storage |
52
|
|
|
BR storage.BundleReader |
53
|
|
|
// BundleWriter for writing bundle to storage |
54
|
|
|
BW storage.BundleWriter |
55
|
|
|
// SchemaReader for reading schemas from storage |
56
|
|
|
SR storage.SchemaReader |
57
|
|
|
// SchemaWriter for writing schemas to storage |
58
|
|
|
SW storage.SchemaWriter |
59
|
|
|
// TenantReader for reading tenant information from storage |
60
|
|
|
TR storage.TenantReader |
61
|
|
|
// TenantWriter for writing tenant information to storage |
62
|
|
|
TW storage.TenantWriter |
63
|
|
|
|
64
|
|
|
W storage.Watcher |
65
|
|
|
} |
66
|
|
|
|
67
|
|
|
// NewContainer is a constructor for the Container struct. |
68
|
|
|
// It takes an Invoker, RelationshipReader, RelationshipWriter, SchemaReader, SchemaWriter, |
69
|
|
|
// TenantReader, and TenantWriter as arguments, and returns a pointer to a Container instance. |
70
|
|
|
func NewContainer( |
71
|
|
|
invoker invoke.Invoker, |
72
|
|
|
dr storage.DataReader, |
73
|
|
|
dw storage.DataWriter, |
74
|
|
|
br storage.BundleReader, |
75
|
|
|
bw storage.BundleWriter, |
76
|
|
|
sr storage.SchemaReader, |
77
|
|
|
sw storage.SchemaWriter, |
78
|
|
|
tr storage.TenantReader, |
79
|
|
|
tw storage.TenantWriter, |
80
|
|
|
w storage.Watcher, |
81
|
|
|
) *Container { |
82
|
|
|
return &Container{ |
83
|
|
|
Invoker: invoker, |
84
|
|
|
DR: dr, |
85
|
|
|
DW: dw, |
86
|
|
|
BR: br, |
87
|
|
|
BW: bw, |
88
|
|
|
SR: sr, |
89
|
|
|
SW: sw, |
90
|
|
|
TR: tr, |
91
|
|
|
TW: tw, |
92
|
|
|
W: w, |
93
|
|
|
} |
94
|
|
|
} |
95
|
|
|
|
96
|
|
|
// Run is a method that starts the Container and its services, including the gRPC server, |
97
|
|
|
// an optional HTTP server, and an optional profiler server. It also sets up authentication, |
98
|
|
|
// TLS configurations, and interceptors as needed. |
99
|
|
|
func (s *Container) Run( |
100
|
|
|
ctx context.Context, |
101
|
|
|
srv *config.Server, |
102
|
|
|
logger *slog.Logger, |
103
|
|
|
dst *config.Distributed, |
104
|
|
|
authentication *config.Authn, |
105
|
|
|
profiler *config.Profiler, |
106
|
|
|
localInvoker invoke.Invoker, |
107
|
|
|
) error { |
108
|
|
|
var err error |
109
|
|
|
|
110
|
|
|
limiter := middleware.NewRateLimiter(srv.RateLimit) // for example 1000 req/sec |
111
|
|
|
|
112
|
|
|
lopts := []logging.Option{ |
113
|
|
|
logging.WithLogOnEvents(logging.StartCall, logging.FinishCall), |
114
|
|
|
} |
115
|
|
|
|
116
|
|
|
unaryInterceptors := []grpc.UnaryServerInterceptor{ |
117
|
|
|
grpcValidator.UnaryServerInterceptor(), |
118
|
|
|
grpcRecovery.UnaryServerInterceptor(), |
119
|
|
|
ratelimit.UnaryServerInterceptor(limiter), |
120
|
|
|
logging.UnaryServerInterceptor(InterceptorLogger(logger), lopts...), |
121
|
|
|
} |
122
|
|
|
|
123
|
|
|
streamingInterceptors := []grpc.StreamServerInterceptor{ |
124
|
|
|
grpcValidator.StreamServerInterceptor(), |
125
|
|
|
grpcRecovery.StreamServerInterceptor(), |
126
|
|
|
ratelimit.StreamServerInterceptor(limiter), |
127
|
|
|
logging.StreamServerInterceptor(InterceptorLogger(logger), lopts...), |
128
|
|
|
} |
129
|
|
|
|
130
|
|
|
// Configure authentication based on the provided method ("preshared" or "oidc"). |
131
|
|
|
// Add the appropriate interceptors to the unary and streaming interceptors. |
132
|
|
|
if authentication != nil && authentication.Enabled { |
133
|
|
|
switch authentication.Method { |
134
|
|
|
case "preshared": |
135
|
|
|
var authenticator *preshared.KeyAuthn |
136
|
|
|
authenticator, err = preshared.NewKeyAuthn(ctx, authentication.Preshared) |
137
|
|
|
if err != nil { |
138
|
|
|
return err |
139
|
|
|
} |
140
|
|
|
unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.AuthFunc(authenticator))) |
141
|
|
|
streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.AuthFunc(authenticator))) |
142
|
|
|
case "oidc": |
143
|
|
|
var authenticator *oidc.Authn |
144
|
|
|
authenticator, err = oidc.NewOidcAuthn(ctx, authentication.Oidc) |
145
|
|
|
if err != nil { |
146
|
|
|
return err |
147
|
|
|
} |
148
|
|
|
unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.AuthFunc(authenticator))) |
149
|
|
|
streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.AuthFunc(authenticator))) |
150
|
|
|
default: |
151
|
|
|
return fmt.Errorf("unknown authentication method: '%s'", authentication.Method) |
152
|
|
|
} |
153
|
|
|
} |
154
|
|
|
|
155
|
|
|
opts := []grpc.ServerOption{ |
156
|
|
|
grpc.ChainUnaryInterceptor(unaryInterceptors...), |
157
|
|
|
grpc.ChainStreamInterceptor(streamingInterceptors...), |
158
|
|
|
grpc.StatsHandler(otelgrpc.NewServerHandler()), |
159
|
|
|
} |
160
|
|
|
|
161
|
|
|
if srv.GRPC.TLSConfig.Enabled { |
162
|
|
|
var c credentials.TransportCredentials |
163
|
|
|
c, err = credentials.NewServerTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.GRPC.TLSConfig.KeyPath) |
164
|
|
|
if err != nil { |
165
|
|
|
return err |
166
|
|
|
} |
167
|
|
|
opts = append(opts, grpc.Creds(c)) |
168
|
|
|
} |
169
|
|
|
|
170
|
|
|
// Create a new gRPC server instance with the provided options. |
171
|
|
|
grpcServer := grpc.NewServer(opts...) |
172
|
|
|
|
173
|
|
|
// Register various gRPC services to the server. |
174
|
|
|
grpcV1.RegisterPermissionServer(grpcServer, NewPermissionServer(s.Invoker)) |
175
|
|
|
grpcV1.RegisterSchemaServer(grpcServer, NewSchemaServer(s.SW, s.SR)) |
176
|
|
|
grpcV1.RegisterDataServer(grpcServer, NewDataServer(s.DR, s.DW, s.BR, s.SR)) |
177
|
|
|
grpcV1.RegisterBundleServer(grpcServer, NewBundleServer(s.BR, s.BW)) |
178
|
|
|
grpcV1.RegisterTenancyServer(grpcServer, NewTenancyServer(s.TR, s.TW)) |
179
|
|
|
grpcV1.RegisterWatchServer(grpcServer, NewWatchServer(s.W, s.DR)) |
180
|
|
|
|
181
|
|
|
// Register health check and reflection services for gRPC. |
182
|
|
|
health.RegisterHealthServer(grpcServer, NewHealthServer()) |
183
|
|
|
reflection.Register(grpcServer) |
184
|
|
|
|
185
|
|
|
// Create another gRPC server, presumably for invoking permissions. |
186
|
|
|
invokeServer := grpc.NewServer(opts...) |
187
|
|
|
grpcV1.RegisterPermissionServer(invokeServer, NewPermissionServer(localInvoker)) |
188
|
|
|
|
189
|
|
|
// Register health check and reflection services for the invokeServer. |
190
|
|
|
health.RegisterHealthServer(invokeServer, NewHealthServer()) |
191
|
|
|
reflection.Register(invokeServer) |
192
|
|
|
|
193
|
|
|
// If profiling is enabled, set up the profiler using the net/http package. |
194
|
|
|
if profiler.Enabled { |
195
|
|
|
// Create a new HTTP ServeMux to register pprof routes. |
196
|
|
|
mux := http.NewServeMux() |
197
|
|
|
mux.HandleFunc("/debug/pprof/", pprof.Index) |
198
|
|
|
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) |
199
|
|
|
mux.HandleFunc("/debug/pprof/profile", pprof.Profile) |
200
|
|
|
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) |
201
|
|
|
mux.HandleFunc("/debug/pprof/trace", pprof.Trace) |
202
|
|
|
|
203
|
|
|
// Run the profiler server in a separate goroutine. |
204
|
|
|
go func() { |
205
|
|
|
// Log a message indicating the profiler server's start status and port. |
206
|
|
|
slog.Info(fmt.Sprintf("🚀 profiler server successfully started: %s", profiler.Port)) |
207
|
|
|
|
208
|
|
|
// Define the HTTP server with timeouts and the mux handler for pprof routes. |
209
|
|
|
pprofserver := &http.Server{ |
210
|
|
|
Addr: ":" + profiler.Port, |
211
|
|
|
Handler: mux, |
212
|
|
|
ReadTimeout: 20 * time.Second, |
213
|
|
|
WriteTimeout: 20 * time.Second, |
214
|
|
|
IdleTimeout: 15 * time.Second, |
215
|
|
|
} |
216
|
|
|
|
217
|
|
|
// Start the profiler server. |
218
|
|
|
if err := pprofserver.ListenAndServe(); err != nil { |
219
|
|
|
// Check if the error was due to the server being closed, and log it. |
220
|
|
|
if errors.Is(err, http.ErrServerClosed) { |
221
|
|
|
slog.Error("failed to start profiler", slog.Any("error", err)) |
222
|
|
|
} |
223
|
|
|
} |
224
|
|
|
}() |
225
|
|
|
} |
226
|
|
|
|
227
|
|
|
var lis net.Listener |
228
|
|
|
lis, err = net.Listen("tcp", ":"+srv.GRPC.Port) |
229
|
|
|
if err != nil { |
230
|
|
|
return err |
231
|
|
|
} |
232
|
|
|
|
233
|
|
|
var invokeLis net.Listener |
234
|
|
|
invokeLis, err = net.Listen("tcp", ":"+dst.Port) |
235
|
|
|
if err != nil { |
236
|
|
|
return err |
237
|
|
|
} |
238
|
|
|
|
239
|
|
|
// Start the gRPC server. |
240
|
|
|
go func() { |
241
|
|
|
if err := grpcServer.Serve(lis); err != nil { |
242
|
|
|
slog.Error("failed to start grpc server", slog.Any("error", err)) |
243
|
|
|
} |
244
|
|
|
}() |
245
|
|
|
|
246
|
|
|
go func() { |
247
|
|
|
if err := invokeServer.Serve(invokeLis); err != nil { |
248
|
|
|
slog.Error("failed to start invoke grpc server", slog.Any("error", err)) |
249
|
|
|
} |
250
|
|
|
}() |
251
|
|
|
|
252
|
|
|
slog.Info(fmt.Sprintf("🚀 grpc server successfully started: %s", srv.GRPC.Port)) |
253
|
|
|
slog.Info(fmt.Sprintf("🚀 invoker grpc server successfully started: %s", dst.Port)) |
254
|
|
|
|
255
|
|
|
var httpServer *http.Server |
256
|
|
|
|
257
|
|
|
// Start the optional HTTP server with CORS and optional TLS configurations. |
258
|
|
|
// Connect to the gRPC server and register the HTTP handlers for each service. |
259
|
|
|
if srv.HTTP.Enabled { |
260
|
|
|
options := []grpc.DialOption{ |
261
|
|
|
grpc.WithBlock(), |
262
|
|
|
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), |
263
|
|
|
} |
264
|
|
|
if srv.GRPC.TLSConfig.Enabled { |
265
|
|
|
c, err := credentials.NewClientTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.NameOverride) |
266
|
|
|
if err != nil { |
267
|
|
|
return err |
268
|
|
|
} |
269
|
|
|
options = append(options, grpc.WithTransportCredentials(c)) |
270
|
|
|
} else { |
271
|
|
|
options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials())) |
272
|
|
|
} |
273
|
|
|
|
274
|
|
|
timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second) |
275
|
|
|
defer cancel() |
276
|
|
|
|
277
|
|
|
conn, err := grpc.DialContext(timeoutCtx, ":"+srv.GRPC.Port, options...) |
278
|
|
|
if err != nil { |
279
|
|
|
return err |
280
|
|
|
} |
281
|
|
|
defer func() { |
282
|
|
|
if err = conn.Close(); err != nil { |
283
|
|
|
slog.Error("Failed to close gRPC connection", slog.Any("error", err)) |
284
|
|
|
} |
285
|
|
|
}() |
286
|
|
|
|
287
|
|
|
healthClient := health.NewHealthClient(conn) |
288
|
|
|
muxOpts := []runtime.ServeMuxOption{ |
289
|
|
|
runtime.WithHealthzEndpoint(healthClient), |
290
|
|
|
runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.HTTPBodyMarshaler{ |
291
|
|
|
Marshaler: &runtime.JSONPb{ |
292
|
|
|
MarshalOptions: protojson.MarshalOptions{ |
293
|
|
|
UseProtoNames: true, |
294
|
|
|
EmitUnpopulated: true, |
295
|
|
|
}, |
296
|
|
|
UnmarshalOptions: protojson.UnmarshalOptions{ |
297
|
|
|
DiscardUnknown: true, |
298
|
|
|
}, |
299
|
|
|
}, |
300
|
|
|
}), |
301
|
|
|
runtime.WithMiddlewares(func(next runtime.HandlerFunc) runtime.HandlerFunc { |
302
|
|
|
type key struct{} |
303
|
|
|
|
304
|
|
|
otelHandler := otelhttp.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
305
|
|
|
pathParams := r.Context().Value(key{}).(map[string]string) |
306
|
|
|
next(w, r, pathParams) |
307
|
|
|
}), "server", |
308
|
|
|
otelhttp.WithServerName("permify"), |
309
|
|
|
otelhttp.WithSpanNameFormatter(httpNameFormatter), |
310
|
|
|
) |
311
|
|
|
|
312
|
|
|
return func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { |
313
|
|
|
r = r.WithContext(context.WithValue(r.Context(), key{}, pathParams)) |
314
|
|
|
otelHandler.ServeHTTP(w, r) |
315
|
|
|
} |
316
|
|
|
}), |
317
|
|
|
} |
318
|
|
|
|
319
|
|
|
mux := runtime.NewServeMux(muxOpts...) |
320
|
|
|
|
321
|
|
|
if err = grpcV1.RegisterPermissionHandler(ctx, mux, conn); err != nil { |
322
|
|
|
return err |
323
|
|
|
} |
324
|
|
|
if err = grpcV1.RegisterSchemaHandler(ctx, mux, conn); err != nil { |
325
|
|
|
return err |
326
|
|
|
} |
327
|
|
|
if err = grpcV1.RegisterDataHandler(ctx, mux, conn); err != nil { |
328
|
|
|
return err |
329
|
|
|
} |
330
|
|
|
if err = grpcV1.RegisterBundleHandler(ctx, mux, conn); err != nil { |
331
|
|
|
return err |
332
|
|
|
} |
333
|
|
|
if err = grpcV1.RegisterTenancyHandler(ctx, mux, conn); err != nil { |
334
|
|
|
return err |
335
|
|
|
} |
336
|
|
|
|
337
|
|
|
corsHandler := cors.New(cors.Options{ |
338
|
|
|
AllowCredentials: true, |
339
|
|
|
AllowedOrigins: srv.HTTP.CORSAllowedOrigins, |
340
|
|
|
AllowedHeaders: srv.HTTP.CORSAllowedHeaders, |
341
|
|
|
AllowedMethods: []string{ |
342
|
|
|
http.MethodGet, http.MethodPost, |
343
|
|
|
http.MethodHead, http.MethodPatch, http.MethodDelete, http.MethodPut, |
344
|
|
|
}, |
345
|
|
|
}).Handler(mux) |
346
|
|
|
|
347
|
|
|
httpServer = &http.Server{ |
348
|
|
|
Addr: ":" + srv.HTTP.Port, |
349
|
|
|
Handler: corsHandler, |
350
|
|
|
ReadHeaderTimeout: 5 * time.Second, |
351
|
|
|
} |
352
|
|
|
|
353
|
|
|
// Start the HTTP server with TLS if enabled, otherwise without TLS. |
354
|
|
|
go func() { |
355
|
|
|
var err error |
356
|
|
|
if srv.HTTP.TLSConfig.Enabled { |
357
|
|
|
err = httpServer.ListenAndServeTLS(srv.HTTP.TLSConfig.CertPath, srv.HTTP.TLSConfig.KeyPath) |
358
|
|
|
} else { |
359
|
|
|
err = httpServer.ListenAndServe() |
360
|
|
|
} |
361
|
|
|
if !errors.Is(err, http.ErrServerClosed) { |
362
|
|
|
slog.Error(err.Error()) |
363
|
|
|
} |
364
|
|
|
}() |
365
|
|
|
|
366
|
|
|
slog.Info(fmt.Sprintf("🚀 http server successfully started: %s", srv.HTTP.Port)) |
367
|
|
|
} |
368
|
|
|
|
369
|
|
|
// Wait for the context to be canceled (e.g., due to a signal). |
370
|
|
|
<-ctx.Done() |
371
|
|
|
|
372
|
|
|
// Shutdown the servers gracefully. |
373
|
|
|
ctxShutdown, cancel := context.WithTimeout(ctx, 5*time.Second) |
374
|
|
|
defer cancel() |
375
|
|
|
|
376
|
|
|
if httpServer != nil { |
377
|
|
|
if err := httpServer.Shutdown(ctxShutdown); err != nil { |
378
|
|
|
slog.Error(err.Error()) |
379
|
|
|
return err |
380
|
|
|
} |
381
|
|
|
} |
382
|
|
|
|
383
|
|
|
// Gracefully stop the gRPC server. |
384
|
|
|
grpcServer.GracefulStop() |
385
|
|
|
// Gracefully stop the invoke server. |
386
|
|
|
invokeServer.GracefulStop() |
387
|
|
|
|
388
|
|
|
slog.Info("gracefully shutting down") |
389
|
|
|
|
390
|
|
|
return nil |
391
|
|
|
} |
392
|
|
|
|
393
|
|
|
// InterceptorLogger adapts slog logger to interceptor logger. |
394
|
|
|
func InterceptorLogger(l *slog.Logger) logging.Logger { |
395
|
|
|
return logging.LoggerFunc(func(ctx context.Context, lvl logging.Level, msg string, fields ...any) { |
396
|
|
|
l.Log(ctx, slog.Level(lvl), msg, fields...) |
397
|
|
|
}) |
398
|
|
|
} |
399
|
|
|
|
400
|
|
|
func httpNameFormatter(_ string, req *http.Request) string { |
401
|
|
|
pp, ok := runtime.HTTPPattern(req.Context()) |
402
|
|
|
path := "<not found>" |
403
|
|
|
if ok { |
404
|
|
|
path = pp.String() |
405
|
|
|
} |
406
|
|
|
return req.Method + " " + path |
407
|
|
|
} |
408
|
|
|
|