Passed
Pull Request — master (#2053)
by
unknown
03:15
created

internal/servers/server.go   B

Size/Duplication

Total Lines 405
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 44
eloc 257
dl 0
loc 405
rs 8.8798
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A servers.NewContainer 0 23 1
F servers.*Container.Run 0 292 39
A servers.httpNameFormatter 0 7 2
A servers.InterceptorLogger 0 3 2
1
package servers
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"log/slog"
8
	"net"
9
	"net/http"
10
	"net/http/pprof"
11
	"time"
12
13
	grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
14
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
15
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/ratelimit"
16
17
	grpcRecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
18
	grpcValidator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
19
	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
20
	"github.com/rs/cors"
21
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
22
	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
23
	"google.golang.org/grpc"
24
	"google.golang.org/grpc/credentials"
25
	"google.golang.org/grpc/credentials/insecure"
26
	"google.golang.org/grpc/reflection"
27
	"google.golang.org/protobuf/encoding/protojson"
28
29
	health "google.golang.org/grpc/health/grpc_health_v1"
30
31
	"github.com/Permify/permify/internal/authn/oidc"
32
	"github.com/Permify/permify/internal/authn/preshared"
33
	"github.com/Permify/permify/internal/config"
34
	"github.com/Permify/permify/internal/invoke"
35
	"github.com/Permify/permify/internal/middleware"
36
	"github.com/Permify/permify/internal/storage"
37
	grpcV1 "github.com/Permify/permify/pkg/pb/base/v1"
38
)
39
40
// Container is a struct that holds the invoker and various storage
41
// for permission-related operations. It serves as a central point of access
42
// for interacting with the underlying data and services.
43
type Container struct {
44
	// Invoker for performing permission-related operations
45
	Invoker invoke.Invoker
46
	// DataReader for reading data from storage
47
	DR storage.DataReader
48
	// DataWriter for writing data to storage
49
	DW storage.DataWriter
50
	// BundleReader for reading bundle from storage
51
	BR storage.BundleReader
52
	// BundleWriter for writing bundle to storage
53
	BW storage.BundleWriter
54
	// SchemaReader for reading schemas from storage
55
	SR storage.SchemaReader
56
	// SchemaWriter for writing schemas to storage
57
	SW storage.SchemaWriter
58
	// TenantReader for reading tenant information from storage
59
	TR storage.TenantReader
60
	// TenantWriter for writing tenant information to storage
61
	TW storage.TenantWriter
62
63
	W storage.Watcher
64
}
65
66
// NewContainer is a constructor for the Container struct.
67
// It takes an Invoker, RelationshipReader, RelationshipWriter, SchemaReader, SchemaWriter,
68
// TenantReader, and TenantWriter as arguments, and returns a pointer to a Container instance.
69
func NewContainer(
70
	invoker invoke.Invoker,
71
	dr storage.DataReader,
72
	dw storage.DataWriter,
73
	br storage.BundleReader,
74
	bw storage.BundleWriter,
75
	sr storage.SchemaReader,
76
	sw storage.SchemaWriter,
77
	tr storage.TenantReader,
78
	tw storage.TenantWriter,
79
	w storage.Watcher,
80
) *Container {
81
	return &Container{
82
		Invoker: invoker,
83
		DR:      dr,
84
		DW:      dw,
85
		BR:      br,
86
		BW:      bw,
87
		SR:      sr,
88
		SW:      sw,
89
		TR:      tr,
90
		TW:      tw,
91
		W:       w,
92
	}
93
}
94
95
// Run is a method that starts the Container and its services, including the gRPC server,
96
// an optional HTTP server, and an optional profiler server. It also sets up authentication,
97
// TLS configurations, and interceptors as needed.
98
func (s *Container) Run(
99
	ctx context.Context,
100
	srv *config.Server,
101
	logger *slog.Logger,
102
	dst *config.Distributed,
103
	authentication *config.Authn,
104
	profiler *config.Profiler,
105
	localInvoker invoke.Invoker,
106
) error {
107
	var err error
108
109
	limiter := middleware.NewRateLimiter(srv.RateLimit) // for example 1000 req/sec
110
111
	lopts := []logging.Option{
112
		logging.WithLogOnEvents(logging.StartCall, logging.FinishCall),
113
	}
114
115
	unaryInterceptors := []grpc.UnaryServerInterceptor{
116
		grpcValidator.UnaryServerInterceptor(),
117
		grpcRecovery.UnaryServerInterceptor(),
118
		ratelimit.UnaryServerInterceptor(limiter),
119
		logging.UnaryServerInterceptor(InterceptorLogger(logger), lopts...),
120
	}
121
122
	streamingInterceptors := []grpc.StreamServerInterceptor{
123
		grpcValidator.StreamServerInterceptor(),
124
		grpcRecovery.StreamServerInterceptor(),
125
		ratelimit.StreamServerInterceptor(limiter),
126
		logging.StreamServerInterceptor(InterceptorLogger(logger), lopts...),
127
	}
128
129
	// Configure authentication based on the provided method ("preshared" or "oidc").
130
	// Add the appropriate interceptors to the unary and streaming interceptors.
131
	if authentication != nil && authentication.Enabled {
132
		switch authentication.Method {
133
		case "preshared":
134
			var authenticator *preshared.KeyAuthn
135
			authenticator, err = preshared.NewKeyAuthn(ctx, authentication.Preshared)
136
			if err != nil {
137
				return err
138
			}
139
			unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.AuthFunc(authenticator)))
140
			streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.AuthFunc(authenticator)))
141
		case "oidc":
142
			var authenticator *oidc.Authn
143
			authenticator, err = oidc.NewOidcAuthn(ctx, authentication.Oidc)
144
			if err != nil {
145
				return err
146
			}
147
			unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.AuthFunc(authenticator)))
148
			streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.AuthFunc(authenticator)))
149
		default:
150
			return fmt.Errorf("unknown authentication method: '%s'", authentication.Method)
151
		}
152
	}
153
154
	opts := []grpc.ServerOption{
155
		grpc.ChainUnaryInterceptor(unaryInterceptors...),
156
		grpc.ChainStreamInterceptor(streamingInterceptors...),
157
		grpc.StatsHandler(otelgrpc.NewServerHandler()),
158
	}
159
160
	if srv.GRPC.TLSConfig.Enabled {
161
		var c credentials.TransportCredentials
162
		c, err = credentials.NewServerTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.GRPC.TLSConfig.KeyPath)
163
		if err != nil {
164
			return err
165
		}
166
		opts = append(opts, grpc.Creds(c))
167
	}
168
169
	// Create a new gRPC server instance with the provided options.
170
	grpcServer := grpc.NewServer(opts...)
171
172
	// Register various gRPC services to the server.
173
	grpcV1.RegisterPermissionServer(grpcServer, NewPermissionServer(s.Invoker))
174
	grpcV1.RegisterSchemaServer(grpcServer, NewSchemaServer(s.SW, s.SR))
175
	grpcV1.RegisterDataServer(grpcServer, NewDataServer(s.DR, s.DW, s.BR, s.SR))
176
	grpcV1.RegisterBundleServer(grpcServer, NewBundleServer(s.BR, s.BW))
177
	grpcV1.RegisterTenancyServer(grpcServer, NewTenancyServer(s.TR, s.TW))
178
	grpcV1.RegisterWatchServer(grpcServer, NewWatchServer(s.W, s.DR))
179
180
	// Register health check and reflection services for gRPC.
181
	health.RegisterHealthServer(grpcServer, NewHealthServer())
182
	reflection.Register(grpcServer)
183
184
	// Create another gRPC server, presumably for invoking permissions.
185
	invokeServer := grpc.NewServer(opts...)
186
	grpcV1.RegisterPermissionServer(invokeServer, NewPermissionServer(localInvoker))
187
188
	// Register health check and reflection services for the invokeServer.
189
	health.RegisterHealthServer(invokeServer, NewHealthServer())
190
	reflection.Register(invokeServer)
191
192
	// If profiling is enabled, set up the profiler using the net/http package.
193
	if profiler.Enabled {
194
		// Create a new HTTP ServeMux to register pprof routes.
195
		mux := http.NewServeMux()
196
		mux.HandleFunc("/debug/pprof/", pprof.Index)
197
		mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
198
		mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
199
		mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
200
		mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
201
202
		// Run the profiler server in a separate goroutine.
203
		go func() {
204
			// Log a message indicating the profiler server's start status and port.
205
			slog.Info(fmt.Sprintf("🚀 profiler server successfully started: %s", profiler.Port))
206
207
			// Define the HTTP server with timeouts and the mux handler for pprof routes.
208
			pprofserver := &http.Server{
209
				Addr:         ":" + profiler.Port,
210
				Handler:      mux,
211
				ReadTimeout:  20 * time.Second,
212
				WriteTimeout: 20 * time.Second,
213
				IdleTimeout:  15 * time.Second,
214
			}
215
216
			// Start the profiler server.
217
			if err := pprofserver.ListenAndServe(); err != nil {
218
				// Check if the error was due to the server being closed, and log it.
219
				if errors.Is(err, http.ErrServerClosed) {
220
					slog.Error("failed to start profiler", slog.Any("error", err))
221
				}
222
			}
223
		}()
224
	}
225
226
	var lis net.Listener
227
	lis, err = net.Listen("tcp", ":"+srv.GRPC.Port)
228
	if err != nil {
229
		return err
230
	}
231
232
	var invokeLis net.Listener
233
	invokeLis, err = net.Listen("tcp", ":"+dst.Port)
234
	if err != nil {
235
		return err
236
	}
237
238
	// Start the gRPC server.
239
	go func() {
240
		if err := grpcServer.Serve(lis); err != nil {
241
			slog.Error("failed to start grpc server", slog.Any("error", err))
242
		}
243
	}()
244
245
	go func() {
246
		if err := invokeServer.Serve(invokeLis); err != nil {
247
			slog.Error("failed to start invoke grpc server", slog.Any("error", err))
248
		}
249
	}()
250
251
	slog.Info(fmt.Sprintf("🚀 grpc server successfully started: %s", srv.GRPC.Port))
252
	slog.Info(fmt.Sprintf("🚀 invoker grpc server successfully started: %s", dst.Port))
253
254
	var httpServer *http.Server
255
256
	// Start the optional HTTP server with CORS and optional TLS configurations.
257
	// Connect to the gRPC server and register the HTTP handlers for each service.
258
	if srv.HTTP.Enabled {
259
		options := []grpc.DialOption{
260
			grpc.WithBlock(),
261
			grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
262
		}
263
		if srv.GRPC.TLSConfig.Enabled {
264
			c, err := credentials.NewClientTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.NameOverride)
265
			if err != nil {
266
				return err
267
			}
268
			options = append(options, grpc.WithTransportCredentials(c))
269
		} else {
270
			options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials()))
271
		}
272
273
		timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
274
		defer cancel()
275
276
		conn, err := grpc.DialContext(timeoutCtx, ":"+srv.GRPC.Port, options...)
277
		if err != nil {
278
			return err
279
		}
280
		defer func() {
281
			if err = conn.Close(); err != nil {
282
				slog.Error("Failed to close gRPC connection", slog.Any("error", err))
283
			}
284
		}()
285
286
		healthClient := health.NewHealthClient(conn)
287
		muxOpts := []runtime.ServeMuxOption{
288
			runtime.WithHealthzEndpoint(healthClient),
289
			runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.HTTPBodyMarshaler{
290
				Marshaler: &runtime.JSONPb{
291
					MarshalOptions: protojson.MarshalOptions{
292
						UseProtoNames:   true,
293
						EmitUnpopulated: true,
294
					},
295
					UnmarshalOptions: protojson.UnmarshalOptions{
296
						DiscardUnknown: true,
297
					},
298
				},
299
			}),
300
			runtime.WithMiddlewares(func(next runtime.HandlerFunc) runtime.HandlerFunc {
301
				type key struct{}
302
303
				otelHandler := otelhttp.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
304
					pathParams := r.Context().Value(key{}).(map[string]string)
305
					next(w, r, pathParams)
306
				}), "server",
307
					otelhttp.WithServerName("permify"),
308
					otelhttp.WithSpanNameFormatter(httpNameFormatter),
309
				)
310
311
				return func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
312
					r = r.WithContext(context.WithValue(r.Context(), key{}, pathParams))
313
					otelHandler.ServeHTTP(w, r)
314
				}
315
			}),
316
		}
317
318
		mux := runtime.NewServeMux(muxOpts...)
319
320
		if err = grpcV1.RegisterPermissionHandler(ctx, mux, conn); err != nil {
321
			return err
322
		}
323
		if err = grpcV1.RegisterSchemaHandler(ctx, mux, conn); err != nil {
324
			return err
325
		}
326
		if err = grpcV1.RegisterDataHandler(ctx, mux, conn); err != nil {
327
			return err
328
		}
329
		if err = grpcV1.RegisterBundleHandler(ctx, mux, conn); err != nil {
330
			return err
331
		}
332
		if err = grpcV1.RegisterTenancyHandler(ctx, mux, conn); err != nil {
333
			return err
334
		}
335
336
		corsHandler := cors.New(cors.Options{
337
			AllowCredentials: true,
338
			AllowedOrigins:   srv.HTTP.CORSAllowedOrigins,
339
			AllowedHeaders:   srv.HTTP.CORSAllowedHeaders,
340
			AllowedMethods: []string{
341
				http.MethodGet, http.MethodPost,
342
				http.MethodHead, http.MethodPatch, http.MethodDelete, http.MethodPut,
343
			},
344
		}).Handler(mux)
345
346
		httpServer = &http.Server{
347
			Addr:              ":" + srv.HTTP.Port,
348
			Handler:           corsHandler,
349
			ReadHeaderTimeout: 5 * time.Second,
350
		}
351
352
		// Start the HTTP server with TLS if enabled, otherwise without TLS.
353
		go func() {
354
			var err error
355
			if srv.HTTP.TLSConfig.Enabled {
356
				err = httpServer.ListenAndServeTLS(srv.HTTP.TLSConfig.CertPath, srv.HTTP.TLSConfig.KeyPath)
357
			} else {
358
				err = httpServer.ListenAndServe()
359
			}
360
			if !errors.Is(err, http.ErrServerClosed) {
361
				slog.Error(err.Error())
362
			}
363
		}()
364
365
		slog.Info(fmt.Sprintf("🚀 http server successfully started: %s", srv.HTTP.Port))
366
	}
367
368
	// Wait for the context to be canceled (e.g., due to a signal).
369
	<-ctx.Done()
370
371
	// Shutdown the servers gracefully.
372
	ctxShutdown, cancel := context.WithTimeout(ctx, 5*time.Second)
373
	defer cancel()
374
375
	if httpServer != nil {
376
		if err := httpServer.Shutdown(ctxShutdown); err != nil {
377
			slog.Error(err.Error())
378
			return err
379
		}
380
	}
381
382
	// Gracefully stop the gRPC server.
383
	grpcServer.GracefulStop()
384
	// Gracefully stop the invoke server.
385
	invokeServer.GracefulStop()
386
387
	slog.Info("gracefully shutting down")
388
389
	return nil
390
}
391
392
// InterceptorLogger adapts slog logger to interceptor logger.
393
func InterceptorLogger(l *slog.Logger) logging.Logger {
394
	return logging.LoggerFunc(func(ctx context.Context, lvl logging.Level, msg string, fields ...any) {
395
		l.Log(ctx, slog.Level(lvl), msg, fields...)
396
	})
397
}
398
399
func httpNameFormatter(_ string, req *http.Request) string {
400
	pp, ok := runtime.HTTPPattern(req.Context())
401
	path := "<not found>"
402
	if ok {
403
		path = pp.String()
404
	}
405
	return req.Method + " " + path
406
}
407