1
|
|
|
package servers |
2
|
|
|
|
3
|
|
|
import ( |
4
|
|
|
"context" |
5
|
|
|
"errors" |
6
|
|
|
"fmt" |
7
|
|
|
"log/slog" |
8
|
|
|
"net" |
9
|
|
|
"net/http" |
10
|
|
|
"net/http/pprof" |
11
|
|
|
"time" |
12
|
|
|
|
13
|
|
|
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" |
14
|
|
|
|
15
|
|
|
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/ratelimit" |
16
|
|
|
|
17
|
|
|
grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/auth" |
18
|
|
|
|
19
|
|
|
grpcRecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" |
20
|
|
|
grpcValidator "github.com/grpc-ecosystem/go-grpc-middleware/validator" |
21
|
|
|
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime" |
22
|
|
|
"github.com/rs/cors" |
23
|
|
|
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" |
24
|
|
|
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" // HTTP telemetry |
25
|
|
|
"google.golang.org/grpc" |
26
|
|
|
"google.golang.org/grpc/credentials" |
27
|
|
|
"google.golang.org/grpc/credentials/insecure" |
28
|
|
|
"google.golang.org/grpc/reflection" |
29
|
|
|
"google.golang.org/protobuf/encoding/protojson" |
30
|
|
|
|
31
|
|
|
health "google.golang.org/grpc/health/grpc_health_v1" // gRPC health check |
32
|
|
|
|
33
|
|
|
oidc "github.com/Permify/permify/internal/authn/openid" |
34
|
|
|
"github.com/Permify/permify/internal/authn/preshared" |
35
|
|
|
"github.com/Permify/permify/internal/config" |
36
|
|
|
"github.com/Permify/permify/internal/invoke" |
37
|
|
|
"github.com/Permify/permify/internal/middleware" |
38
|
|
|
"github.com/Permify/permify/internal/storage" |
39
|
|
|
grpcV1 "github.com/Permify/permify/pkg/pb/base/v1" |
40
|
|
|
) |
41
|
|
|
|
42
|
|
|
// Container is a struct that holds the invoker and various storage |
43
|
|
|
// for permission-related operations. It serves as a central point of access |
44
|
|
|
// for interacting with the underlying data and services. |
45
|
|
|
type Container struct { |
46
|
|
|
// Invoker for performing permission-related operations |
47
|
|
|
Invoker invoke.Invoker |
48
|
|
|
// DataReader for reading data from storage |
49
|
|
|
DR storage.DataReader |
50
|
|
|
// DataWriter for writing data to storage |
51
|
|
|
DW storage.DataWriter |
52
|
|
|
// BundleReader for reading bundle from storage |
53
|
|
|
BR storage.BundleReader |
54
|
|
|
// BundleWriter for writing bundle to storage |
55
|
|
|
BW storage.BundleWriter |
56
|
|
|
// SchemaReader for reading schemas from storage |
57
|
|
|
SR storage.SchemaReader |
58
|
|
|
// SchemaWriter for writing schemas to storage |
59
|
|
|
SW storage.SchemaWriter |
60
|
|
|
// TenantReader for reading tenant information from storage |
61
|
|
|
TR storage.TenantReader |
62
|
|
|
// TenantWriter for writing tenant information to storage |
63
|
|
|
TW storage.TenantWriter |
64
|
|
|
|
65
|
|
|
W storage.Watcher |
66
|
|
|
} |
67
|
|
|
|
68
|
|
|
// NewContainer is a constructor for the Container struct. |
69
|
|
|
// It takes an Invoker, RelationshipReader, RelationshipWriter, SchemaReader, SchemaWriter, |
70
|
|
|
// TenantReader, and TenantWriter as arguments, and returns a pointer to a Container instance. |
71
|
|
|
func NewContainer( |
72
|
|
|
invoker invoke.Invoker, |
73
|
|
|
dr storage.DataReader, |
74
|
|
|
dw storage.DataWriter, |
75
|
|
|
br storage.BundleReader, |
76
|
|
|
bw storage.BundleWriter, |
77
|
|
|
sr storage.SchemaReader, |
78
|
|
|
sw storage.SchemaWriter, |
79
|
|
|
tr storage.TenantReader, |
80
|
|
|
tw storage.TenantWriter, |
81
|
|
|
w storage.Watcher, |
82
|
|
|
) *Container { |
83
|
|
|
return &Container{ |
84
|
|
|
Invoker: invoker, |
85
|
|
|
DR: dr, |
86
|
|
|
DW: dw, |
87
|
|
|
BR: br, |
88
|
|
|
BW: bw, |
89
|
|
|
SR: sr, |
90
|
|
|
SW: sw, |
91
|
|
|
TR: tr, |
92
|
|
|
TW: tw, |
93
|
|
|
W: w, |
94
|
|
|
} |
95
|
|
|
} |
96
|
|
|
|
97
|
|
|
// Run is a method that starts the Container and its services, including the gRPC server, |
98
|
|
|
// an optional HTTP server, and an optional profiler server. It also sets up authentication, |
99
|
|
|
// TLS configurations, and interceptors as needed. |
100
|
|
|
func (s *Container) Run( |
101
|
|
|
ctx context.Context, |
102
|
|
|
srv *config.Server, |
103
|
|
|
logger *slog.Logger, |
104
|
|
|
dst *config.Distributed, |
105
|
|
|
authentication *config.Authn, |
106
|
|
|
profiler *config.Profiler, |
107
|
|
|
localInvoker invoke.Invoker, |
108
|
|
|
) error { |
109
|
|
|
var err error |
110
|
|
|
|
111
|
|
|
limiter := middleware.NewRateLimiter(srv.RateLimit) // for example 1000 req/sec |
112
|
|
|
|
113
|
|
|
lopts := []logging.Option{ |
114
|
|
|
logging.WithLogOnEvents(logging.StartCall, logging.FinishCall), |
115
|
|
|
} |
116
|
|
|
|
117
|
|
|
unaryInterceptors := []grpc.UnaryServerInterceptor{ |
118
|
|
|
grpcValidator.UnaryServerInterceptor(), |
119
|
|
|
grpcRecovery.UnaryServerInterceptor(), |
120
|
|
|
ratelimit.UnaryServerInterceptor(limiter), |
121
|
|
|
logging.UnaryServerInterceptor(InterceptorLogger(logger), lopts...), |
122
|
|
|
} |
123
|
|
|
|
124
|
|
|
streamingInterceptors := []grpc.StreamServerInterceptor{ |
125
|
|
|
grpcValidator.StreamServerInterceptor(), |
126
|
|
|
grpcRecovery.StreamServerInterceptor(), |
127
|
|
|
ratelimit.StreamServerInterceptor(limiter), |
128
|
|
|
logging.StreamServerInterceptor(InterceptorLogger(logger), lopts...), |
129
|
|
|
} |
130
|
|
|
|
131
|
|
|
// Configure authentication based on the provided method ("preshared" or "oidc"). |
132
|
|
|
// Add the appropriate interceptors to the unary and streaming interceptors. |
133
|
|
|
if authentication != nil && authentication.Enabled { |
134
|
|
|
switch authentication.Method { |
135
|
|
|
case "preshared": |
136
|
|
|
var authenticator *preshared.KeyAuthn |
137
|
|
|
authenticator, err = preshared.NewKeyAuthn(ctx, authentication.Preshared) |
138
|
|
|
if err != nil { |
139
|
|
|
return err |
140
|
|
|
} |
141
|
|
|
unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.AuthFunc(authenticator))) |
142
|
|
|
streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.AuthFunc(authenticator))) |
143
|
|
|
case "oidc": // OpenID Connect authentication |
144
|
|
|
var authenticator *oidc.Authn // OIDC authenticator |
145
|
|
|
authenticator, err = oidc.NewOidcAuthn(ctx, authentication.Oidc) // Create OIDC authenticator |
146
|
|
|
if err != nil { // Check for errors |
147
|
|
|
return err // Return error |
148
|
|
|
} // OIDC authenticator created |
149
|
|
|
unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.AuthFunc(authenticator))) |
150
|
|
|
streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.AuthFunc(authenticator))) |
151
|
|
|
default: // Unknown authentication method |
152
|
|
|
return fmt.Errorf("unknown authentication method: '%s'", authentication.Method) // Return error |
153
|
|
|
} |
154
|
|
|
} |
155
|
|
|
|
156
|
|
|
opts := []grpc.ServerOption{ |
157
|
|
|
grpc.ChainUnaryInterceptor(unaryInterceptors...), |
158
|
|
|
grpc.ChainStreamInterceptor(streamingInterceptors...), |
159
|
|
|
grpc.StatsHandler(otelgrpc.NewServerHandler()), |
160
|
|
|
} |
161
|
|
|
|
162
|
|
|
if srv.GRPC.TLSConfig.Enabled { |
163
|
|
|
var c credentials.TransportCredentials |
164
|
|
|
c, err = credentials.NewServerTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.GRPC.TLSConfig.KeyPath) |
165
|
|
|
if err != nil { |
166
|
|
|
return err |
167
|
|
|
} |
168
|
|
|
opts = append(opts, grpc.Creds(c)) |
169
|
|
|
} |
170
|
|
|
|
171
|
|
|
// Create a new gRPC server instance with the provided options. |
172
|
|
|
grpcServer := grpc.NewServer(opts...) |
173
|
|
|
|
174
|
|
|
// Register various gRPC services to the server. |
175
|
|
|
grpcV1.RegisterPermissionServer(grpcServer, NewPermissionServer(s.Invoker)) |
176
|
|
|
grpcV1.RegisterSchemaServer(grpcServer, NewSchemaServer(s.SW, s.SR)) |
177
|
|
|
grpcV1.RegisterDataServer(grpcServer, NewDataServer(s.DR, s.DW, s.BR, s.SR)) |
178
|
|
|
grpcV1.RegisterBundleServer(grpcServer, NewBundleServer(s.BR, s.BW)) |
179
|
|
|
grpcV1.RegisterTenancyServer(grpcServer, NewTenancyServer(s.TR, s.TW)) |
180
|
|
|
grpcV1.RegisterWatchServer(grpcServer, NewWatchServer(s.W, s.DR)) |
181
|
|
|
|
182
|
|
|
// Register health check and reflection services for gRPC. |
183
|
|
|
health.RegisterHealthServer(grpcServer, NewHealthServer()) // Register health server |
184
|
|
|
reflection.Register(grpcServer) |
185
|
|
|
|
186
|
|
|
// Create another gRPC server, presumably for invoking permissions. |
187
|
|
|
invokeServer := grpc.NewServer(opts...) |
188
|
|
|
grpcV1.RegisterPermissionServer(invokeServer, NewPermissionServer(localInvoker)) |
189
|
|
|
|
190
|
|
|
// Register health check and reflection services for the invokeServer. |
191
|
|
|
health.RegisterHealthServer(invokeServer, NewHealthServer()) // Register health server for invoker |
192
|
|
|
reflection.Register(invokeServer) |
193
|
|
|
|
194
|
|
|
// If profiling is enabled, set up the profiler using the net/http package. |
195
|
|
|
if profiler.Enabled { |
196
|
|
|
// Create a new HTTP ServeMux to register pprof routes. |
197
|
|
|
mux := http.NewServeMux() |
198
|
|
|
mux.HandleFunc("/debug/pprof/", pprof.Index) |
199
|
|
|
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) |
200
|
|
|
mux.HandleFunc("/debug/pprof/profile", pprof.Profile) |
201
|
|
|
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) |
202
|
|
|
mux.HandleFunc("/debug/pprof/trace", pprof.Trace) |
203
|
|
|
|
204
|
|
|
// Run the profiler server in a separate goroutine. |
205
|
|
|
go func() { |
206
|
|
|
// Log a message indicating the profiler server's start status and port. |
207
|
|
|
slog.Info(fmt.Sprintf("🚀 profiler server successfully started: %s", profiler.Port)) |
208
|
|
|
|
209
|
|
|
// Define the HTTP server with timeouts and the mux handler for pprof routes. |
210
|
|
|
pprofserver := &http.Server{ |
211
|
|
|
Addr: ":" + profiler.Port, |
212
|
|
|
Handler: mux, |
213
|
|
|
ReadTimeout: 20 * time.Second, |
214
|
|
|
WriteTimeout: 20 * time.Second, |
215
|
|
|
IdleTimeout: 15 * time.Second, |
216
|
|
|
} |
217
|
|
|
|
218
|
|
|
// Start the profiler server. |
219
|
|
|
if err := pprofserver.ListenAndServe(); err != nil { |
220
|
|
|
// Check if the error was due to the server being closed, and log it. |
221
|
|
|
if errors.Is(err, http.ErrServerClosed) { // Server closed error |
222
|
|
|
slog.Error("failed to start profiler", slog.Any("error", err)) // Log profiler error |
223
|
|
|
} |
224
|
|
|
} |
225
|
|
|
}() |
226
|
|
|
} |
227
|
|
|
|
228
|
|
|
var lis net.Listener |
229
|
|
|
lis, err = net.Listen("tcp", ":"+srv.GRPC.Port) |
230
|
|
|
if err != nil { |
231
|
|
|
return err |
232
|
|
|
} |
233
|
|
|
|
234
|
|
|
var invokeLis net.Listener |
235
|
|
|
invokeLis, err = net.Listen("tcp", ":"+dst.Port) |
236
|
|
|
if err != nil { |
237
|
|
|
return err |
238
|
|
|
} |
239
|
|
|
|
240
|
|
|
// Start the gRPC server. |
241
|
|
|
go func() { |
242
|
|
|
if err := grpcServer.Serve(lis); err != nil { // gRPC server error |
243
|
|
|
slog.Error("failed to start grpc server", slog.Any("error", err)) // Log gRPC error |
244
|
|
|
} |
245
|
|
|
}() |
246
|
|
|
|
247
|
|
|
go func() { |
248
|
|
|
if err := invokeServer.Serve(invokeLis); err != nil { // Invoker server error |
249
|
|
|
slog.Error("failed to start invoke grpc server", slog.Any("error", err)) // Log invoker error |
250
|
|
|
} |
251
|
|
|
}() |
252
|
|
|
|
253
|
|
|
slog.Info(fmt.Sprintf("🚀 grpc server successfully started: %s", srv.GRPC.Port)) |
254
|
|
|
slog.Info(fmt.Sprintf("🚀 invoker grpc server successfully started: %s", dst.Port)) |
255
|
|
|
|
256
|
|
|
var httpServer *http.Server |
257
|
|
|
|
258
|
|
|
// Start the optional HTTP server with CORS and optional TLS configurations. |
259
|
|
|
// Connect to the gRPC server and register the HTTP handlers for each service. |
260
|
|
|
if srv.HTTP.Enabled { |
261
|
|
|
options := []grpc.DialOption{ |
262
|
|
|
grpc.WithBlock(), |
263
|
|
|
} |
264
|
|
|
if srv.GRPC.TLSConfig.Enabled { |
265
|
|
|
c, err := credentials.NewClientTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.NameOverride) |
266
|
|
|
if err != nil { |
267
|
|
|
return err |
268
|
|
|
} |
269
|
|
|
options = append(options, grpc.WithTransportCredentials(c)) |
270
|
|
|
} else { |
271
|
|
|
options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials())) |
272
|
|
|
} |
273
|
|
|
|
274
|
|
|
timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second) |
275
|
|
|
defer cancel() |
276
|
|
|
|
277
|
|
|
conn, err := grpc.DialContext(timeoutCtx, ":"+srv.GRPC.Port, options...) |
278
|
|
|
if err != nil { |
279
|
|
|
return err |
280
|
|
|
} |
281
|
|
|
defer func() { |
282
|
|
|
if err = conn.Close(); err != nil { // Connection close error |
283
|
|
|
slog.Error("Failed to close gRPC connection", slog.Any("error", err)) // Log close error |
284
|
|
|
} |
285
|
|
|
}() |
286
|
|
|
|
287
|
|
|
healthClient := health.NewHealthClient(conn) // Create health client |
288
|
|
|
muxOpts := []runtime.ServeMuxOption{ |
289
|
|
|
runtime.WithHealthzEndpoint(healthClient), // Add health endpoint |
290
|
|
|
runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.HTTPBodyMarshaler{ |
291
|
|
|
Marshaler: &runtime.JSONPb{ |
292
|
|
|
MarshalOptions: protojson.MarshalOptions{ |
293
|
|
|
UseProtoNames: true, |
294
|
|
|
EmitUnpopulated: true, |
295
|
|
|
}, |
296
|
|
|
UnmarshalOptions: protojson.UnmarshalOptions{ |
297
|
|
|
DiscardUnknown: true, |
298
|
|
|
}, |
299
|
|
|
}, |
300
|
|
|
}), |
301
|
|
|
runtime.WithMiddlewares(func(next runtime.HandlerFunc) runtime.HandlerFunc { // OpenTelemetry middleware |
302
|
|
|
type key struct{} // Context key type |
303
|
|
|
|
304
|
|
|
otelHandler := otelhttp.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // HTTP handler with telemetry |
305
|
|
|
pathParams := r.Context().Value(key{}).(map[string]string) // Get path params |
306
|
|
|
next(w, r, pathParams) // Call next handler |
307
|
|
|
}), "server", // Server name |
308
|
|
|
otelhttp.WithServerName("permify"), // Set server name |
309
|
|
|
otelhttp.WithSpanNameFormatter(httpNameFormatter), // Format span name |
310
|
|
|
) // OpenTelemetry handler created |
311
|
|
|
|
312
|
|
|
return func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { // Middleware handler |
313
|
|
|
r = r.WithContext(context.WithValue(r.Context(), key{}, pathParams)) // Add path params to context |
314
|
|
|
otelHandler.ServeHTTP(w, r) // Serve with telemetry |
315
|
|
|
} // Return middleware handler |
316
|
|
|
}), // Middleware registered |
317
|
|
|
} |
318
|
|
|
|
319
|
|
|
mux := runtime.NewServeMux(muxOpts...) |
320
|
|
|
|
321
|
|
|
if err = grpcV1.RegisterPermissionHandler(ctx, mux, conn); err != nil { |
322
|
|
|
return err |
323
|
|
|
} |
324
|
|
|
if err = grpcV1.RegisterSchemaHandler(ctx, mux, conn); err != nil { |
325
|
|
|
return err |
326
|
|
|
} |
327
|
|
|
if err = grpcV1.RegisterDataHandler(ctx, mux, conn); err != nil { |
328
|
|
|
return err |
329
|
|
|
} |
330
|
|
|
if err = grpcV1.RegisterBundleHandler(ctx, mux, conn); err != nil { |
331
|
|
|
return err |
332
|
|
|
} |
333
|
|
|
if err = grpcV1.RegisterTenancyHandler(ctx, mux, conn); err != nil { |
334
|
|
|
return err |
335
|
|
|
} |
336
|
|
|
|
337
|
|
|
corsHandler := cors.New(cors.Options{ // CORS configuration |
338
|
|
|
AllowCredentials: true, // Allow credentials |
339
|
|
|
AllowedOrigins: srv.HTTP.CORSAllowedOrigins, // Allowed origins |
340
|
|
|
AllowedHeaders: srv.HTTP.CORSAllowedHeaders, // Allowed headers |
341
|
|
|
AllowedMethods: []string{ // Allowed HTTP methods |
342
|
|
|
http.MethodGet, http.MethodPost, // GET and POST |
343
|
|
|
http.MethodHead, http.MethodPatch, http.MethodDelete, http.MethodPut, // Other methods |
344
|
|
|
}, // Methods configured |
345
|
|
|
}).Handler(mux) // CORS handler created |
346
|
|
|
|
347
|
|
|
httpServer = &http.Server{ // HTTP server configuration |
348
|
|
|
Addr: ":" + srv.HTTP.Port, // Server address |
349
|
|
|
Handler: corsHandler, // CORS handler |
350
|
|
|
ReadHeaderTimeout: 5 * time.Second, |
351
|
|
|
} |
352
|
|
|
|
353
|
|
|
// Start the HTTP server with TLS if enabled, otherwise without TLS. |
354
|
|
|
go func() { |
355
|
|
|
var err error |
356
|
|
|
if srv.HTTP.TLSConfig.Enabled { |
357
|
|
|
err = httpServer.ListenAndServeTLS(srv.HTTP.TLSConfig.CertPath, srv.HTTP.TLSConfig.KeyPath) |
358
|
|
|
} else { |
359
|
|
|
err = httpServer.ListenAndServe() |
360
|
|
|
} |
361
|
|
|
if !errors.Is(err, http.ErrServerClosed) { |
362
|
|
|
slog.Error(err.Error()) |
363
|
|
|
} |
364
|
|
|
}() |
365
|
|
|
|
366
|
|
|
slog.Info(fmt.Sprintf("🚀 http server successfully started: %s", srv.HTTP.Port)) |
367
|
|
|
} |
368
|
|
|
|
369
|
|
|
// Wait for the context to be canceled (e.g., due to a signal). |
370
|
|
|
<-ctx.Done() |
371
|
|
|
|
372
|
|
|
// Shutdown the servers gracefully. |
373
|
|
|
ctxShutdown, cancel := context.WithTimeout(ctx, 5*time.Second) |
374
|
|
|
defer cancel() |
375
|
|
|
|
376
|
|
|
if httpServer != nil { |
377
|
|
|
if err := httpServer.Shutdown(ctxShutdown); err != nil { |
378
|
|
|
slog.Error(err.Error()) |
379
|
|
|
return err |
380
|
|
|
} |
381
|
|
|
} |
382
|
|
|
|
383
|
|
|
// Gracefully stop the gRPC server. |
384
|
|
|
grpcServer.GracefulStop() |
385
|
|
|
// Gracefully stop the invoke server. |
386
|
|
|
invokeServer.GracefulStop() |
387
|
|
|
|
388
|
|
|
slog.Info("gracefully shutting down") |
389
|
|
|
|
390
|
|
|
return nil |
391
|
|
|
} |
392
|
|
|
|
393
|
|
|
// InterceptorLogger adapts slog logger to interceptor logger. |
394
|
|
|
func InterceptorLogger(l *slog.Logger) logging.Logger { |
395
|
|
|
return logging.LoggerFunc(func(ctx context.Context, lvl logging.Level, msg string, fields ...any) { |
396
|
|
|
l.Log(ctx, slog.Level(lvl), msg, fields...) |
397
|
|
|
}) |
398
|
|
|
} // End of InterceptorLogger |
399
|
|
|
|
400
|
|
|
func httpNameFormatter(_ string, req *http.Request) string { // Format HTTP span name |
401
|
|
|
pp, ok := runtime.HTTPPattern(req.Context()) // Get HTTP pattern |
402
|
|
|
path := "<not found>" // Default path |
403
|
|
|
if ok { // Pattern found |
404
|
|
|
path = pp.String() // Get path string |
405
|
|
|
} // Path determined |
406
|
|
|
return req.Method + " " + path // Return formatted name |
407
|
|
|
} // End of httpNameFormatter |
408
|
|
|
|