Passed
Push — master ( d4fe54...30e045 )
by Tolga
01:36 queued 15s
created

servers.NewContainer   A

Complexity

Conditions 1

Size

Total Lines 23
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 23
nop 10
dl 0
loc 23
rs 9.328
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
package servers
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"log/slog"
8
	"net"
9
	"net/http"
10
	"net/http/pprof"
11
	"time"
12
13
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
14
15
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/ratelimit"
16
17
	grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
18
19
	grpcRecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
20
	grpcValidator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
21
	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
22
	"github.com/rs/cors"
23
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
24
	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
25
	"google.golang.org/grpc"
26
	"google.golang.org/grpc/credentials"
27
	"google.golang.org/grpc/credentials/insecure"
28
	"google.golang.org/grpc/reflection"
29
	"google.golang.org/protobuf/encoding/protojson"
30
31
	"github.com/Permify/permify/internal/authn/oidc"
32
	"github.com/Permify/permify/internal/authn/preshared"
33
	"github.com/Permify/permify/internal/config"
34
	"github.com/Permify/permify/internal/invoke"
35
	"github.com/Permify/permify/internal/middleware"
36
	"github.com/Permify/permify/internal/storage"
37
	grpcV1 "github.com/Permify/permify/pkg/pb/base/v1"
38
	health "google.golang.org/grpc/health/grpc_health_v1"
39
)
40
41
// Container is a struct that holds the invoker and various storage
42
// for permission-related operations. It serves as a central point of access
43
// for interacting with the underlying data and services.
44
type Container struct {
45
	// Invoker for performing permission-related operations
46
	Invoker invoke.Invoker
47
	// DataReader for reading data from storage
48
	DR storage.DataReader
49
	// DataWriter for writing data to storage
50
	DW storage.DataWriter
51
	// BundleReader for reading bundle from storage
52
	BR storage.BundleReader
53
	// BundleWriter for writing bundle to storage
54
	BW storage.BundleWriter
55
	// SchemaReader for reading schemas from storage
56
	SR storage.SchemaReader
57
	// SchemaWriter for writing schemas to storage
58
	SW storage.SchemaWriter
59
	// TenantReader for reading tenant information from storage
60
	TR storage.TenantReader
61
	// TenantWriter for writing tenant information to storage
62
	TW storage.TenantWriter
63
64
	W storage.Watcher
65
}
66
67
// NewContainer is a constructor for the Container struct.
68
// It takes an Invoker, RelationshipReader, RelationshipWriter, SchemaReader, SchemaWriter,
69
// TenantReader, and TenantWriter as arguments, and returns a pointer to a Container instance.
70
func NewContainer(
71
	invoker invoke.Invoker,
72
	dr storage.DataReader,
73
	dw storage.DataWriter,
74
	br storage.BundleReader,
75
	bw storage.BundleWriter,
76
	sr storage.SchemaReader,
77
	sw storage.SchemaWriter,
78
	tr storage.TenantReader,
79
	tw storage.TenantWriter,
80
	w storage.Watcher,
81
) *Container {
82
	return &Container{
83
		Invoker: invoker,
84
		DR:      dr,
85
		DW:      dw,
86
		BR:      br,
87
		BW:      bw,
88
		SR:      sr,
89
		SW:      sw,
90
		TR:      tr,
91
		TW:      tw,
92
		W:       w,
93
	}
94
}
95
96
// Run is a method that starts the Container and its services, including the gRPC server,
97
// an optional HTTP server, and an optional profiler server. It also sets up authentication,
98
// TLS configurations, and interceptors as needed.
99
func (s *Container) Run(
100
	ctx context.Context,
101
	srv *config.Server,
102
	logger *slog.Logger,
103
	dst *config.Distributed,
104
	authentication *config.Authn,
105
	profiler *config.Profiler,
106
	localInvoker invoke.Invoker,
107
) error {
108
	var err error
109
110
	limiter := middleware.NewRateLimiter(srv.RateLimit) // for example 1000 req/sec
111
112
	lopts := []logging.Option{
113
		logging.WithLogOnEvents(logging.StartCall, logging.FinishCall),
114
	}
115
116
	unaryInterceptors := []grpc.UnaryServerInterceptor{
117
		grpcValidator.UnaryServerInterceptor(),
118
		grpcRecovery.UnaryServerInterceptor(),
119
		ratelimit.UnaryServerInterceptor(limiter),
120
		logging.UnaryServerInterceptor(InterceptorLogger(logger), lopts...),
121
	}
122
123
	streamingInterceptors := []grpc.StreamServerInterceptor{
124
		grpcValidator.StreamServerInterceptor(),
125
		grpcRecovery.StreamServerInterceptor(),
126
		ratelimit.StreamServerInterceptor(limiter),
127
		logging.StreamServerInterceptor(InterceptorLogger(logger), lopts...),
128
	}
129
130
	// Configure authentication based on the provided method ("preshared" or "oidc").
131
	// Add the appropriate interceptors to the unary and streaming interceptors.
132
	if authentication != nil && authentication.Enabled {
133
		switch authentication.Method {
134
		case "preshared":
135
			var authenticator *preshared.KeyAuthn
136
			authenticator, err = preshared.NewKeyAuthn(ctx, authentication.Preshared)
137
			if err != nil {
138
				return err
139
			}
140
			unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.AuthFunc(authenticator)))
141
			streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.AuthFunc(authenticator)))
142
		case "oidc":
143
			var authenticator *oidc.Authn
144
			authenticator, err = oidc.NewOidcAuthn(ctx, authentication.Oidc)
145
			if err != nil {
146
				return err
147
			}
148
			unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.AuthFunc(authenticator)))
149
			streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.AuthFunc(authenticator)))
150
		default:
151
			return fmt.Errorf("unknown authentication method: '%s'", authentication.Method)
152
		}
153
	}
154
155
	opts := []grpc.ServerOption{
156
		grpc.ChainUnaryInterceptor(unaryInterceptors...),
157
		grpc.ChainStreamInterceptor(streamingInterceptors...),
158
		grpc.StatsHandler(otelgrpc.NewServerHandler()),
159
	}
160
161
	if srv.GRPC.TLSConfig.Enabled {
162
		var c credentials.TransportCredentials
163
		c, err = credentials.NewServerTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.GRPC.TLSConfig.KeyPath)
164
		if err != nil {
165
			return err
166
		}
167
		opts = append(opts, grpc.Creds(c))
168
	}
169
170
	// Create a new gRPC server instance with the provided options.
171
	grpcServer := grpc.NewServer(opts...)
172
173
	// Register various gRPC services to the server.
174
	grpcV1.RegisterPermissionServer(grpcServer, NewPermissionServer(s.Invoker))
175
	grpcV1.RegisterSchemaServer(grpcServer, NewSchemaServer(s.SW, s.SR))
176
	grpcV1.RegisterDataServer(grpcServer, NewDataServer(s.DR, s.DW, s.BR, s.SR))
177
	grpcV1.RegisterBundleServer(grpcServer, NewBundleServer(s.BR, s.BW))
178
	grpcV1.RegisterTenancyServer(grpcServer, NewTenancyServer(s.TR, s.TW))
179
	grpcV1.RegisterWatchServer(grpcServer, NewWatchServer(s.W, s.DR))
180
181
	// Register health check and reflection services for gRPC.
182
	health.RegisterHealthServer(grpcServer, NewHealthServer())
183
	reflection.Register(grpcServer)
184
185
	// Create another gRPC server, presumably for invoking permissions.
186
	invokeServer := grpc.NewServer(opts...)
187
	grpcV1.RegisterPermissionServer(invokeServer, NewPermissionServer(localInvoker))
188
189
	// Register health check and reflection services for the invokeServer.
190
	health.RegisterHealthServer(invokeServer, NewHealthServer())
191
	reflection.Register(invokeServer)
192
193
	// If profiling is enabled, set up the profiler using the net/http package.
194
	if profiler.Enabled {
195
		// Create a new HTTP ServeMux to register pprof routes.
196
		mux := http.NewServeMux()
197
		mux.HandleFunc("/debug/pprof/", pprof.Index)
198
		mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
199
		mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
200
		mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
201
		mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
202
203
		// Run the profiler server in a separate goroutine.
204
		go func() {
205
			// Log a message indicating the profiler server's start status and port.
206
			slog.Info(fmt.Sprintf("🚀 profiler server successfully started: %s", profiler.Port))
207
208
			// Define the HTTP server with timeouts and the mux handler for pprof routes.
209
			pprofserver := &http.Server{
210
				Addr:         ":" + profiler.Port,
211
				Handler:      mux,
212
				ReadTimeout:  20 * time.Second,
213
				WriteTimeout: 20 * time.Second,
214
				IdleTimeout:  15 * time.Second,
215
			}
216
217
			// Start the profiler server.
218
			if err := pprofserver.ListenAndServe(); err != nil {
219
				// Check if the error was due to the server being closed, and log it.
220
				if errors.Is(err, http.ErrServerClosed) {
221
					slog.Error("failed to start profiler", slog.Any("error", err))
222
				}
223
			}
224
		}()
225
	}
226
227
	var lis net.Listener
228
	lis, err = net.Listen("tcp", ":"+srv.GRPC.Port)
229
	if err != nil {
230
		return err
231
	}
232
233
	var invokeLis net.Listener
234
	invokeLis, err = net.Listen("tcp", ":"+dst.Port)
235
	if err != nil {
236
		return err
237
	}
238
239
	// Start the gRPC server.
240
	go func() {
241
		if err := grpcServer.Serve(lis); err != nil {
242
			slog.Error("failed to start grpc server", slog.Any("error", err))
243
		}
244
	}()
245
246
	go func() {
247
		if err := invokeServer.Serve(invokeLis); err != nil {
248
			slog.Error("failed to start invoke grpc server", slog.Any("error", err))
249
		}
250
	}()
251
252
	slog.Info(fmt.Sprintf("🚀 grpc server successfully started: %s", srv.GRPC.Port))
253
	slog.Info(fmt.Sprintf("🚀 invoker grpc server successfully started: %s", dst.Port))
254
255
	var httpServer *http.Server
256
257
	// Start the optional HTTP server with CORS and optional TLS configurations.
258
	// Connect to the gRPC server and register the HTTP handlers for each service.
259
	if srv.HTTP.Enabled {
260
		options := []grpc.DialOption{
261
			grpc.WithBlock(),
262
			grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
263
		}
264
		if srv.GRPC.TLSConfig.Enabled {
265
			c, err := credentials.NewClientTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.NameOverride)
266
			if err != nil {
267
				return err
268
			}
269
			options = append(options, grpc.WithTransportCredentials(c))
270
		} else {
271
			options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials()))
272
		}
273
274
		timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
275
		defer cancel()
276
277
		conn, err := grpc.DialContext(timeoutCtx, ":"+srv.GRPC.Port, options...)
278
		if err != nil {
279
			return err
280
		}
281
		defer func() {
282
			if err = conn.Close(); err != nil {
283
				slog.Error("Failed to close gRPC connection", slog.Any("error", err))
284
			}
285
		}()
286
287
		healthClient := health.NewHealthClient(conn)
288
		muxOpts := []runtime.ServeMuxOption{
289
			runtime.WithHealthzEndpoint(healthClient),
290
			runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.HTTPBodyMarshaler{
291
				Marshaler: &runtime.JSONPb{
292
					MarshalOptions: protojson.MarshalOptions{
293
						UseProtoNames:   true,
294
						EmitUnpopulated: true,
295
					},
296
					UnmarshalOptions: protojson.UnmarshalOptions{
297
						DiscardUnknown: true,
298
					},
299
				},
300
			}),
301
			runtime.WithMiddlewares(func(next runtime.HandlerFunc) runtime.HandlerFunc {
302
				type key struct{}
303
304
				otelHandler := otelhttp.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
305
					pathParams := r.Context().Value(key{}).(map[string]string)
306
					next(w, r, pathParams)
307
				}), "server",
308
					otelhttp.WithServerName("permify"),
309
					otelhttp.WithSpanNameFormatter(httpNameFormatter),
310
				)
311
312
				return func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
313
					r = r.WithContext(context.WithValue(r.Context(), key{}, pathParams))
314
					otelHandler.ServeHTTP(w, r)
315
				}
316
			}),
317
		}
318
319
		mux := runtime.NewServeMux(muxOpts...)
320
321
		if err = grpcV1.RegisterPermissionHandler(ctx, mux, conn); err != nil {
322
			return err
323
		}
324
		if err = grpcV1.RegisterSchemaHandler(ctx, mux, conn); err != nil {
325
			return err
326
		}
327
		if err = grpcV1.RegisterDataHandler(ctx, mux, conn); err != nil {
328
			return err
329
		}
330
		if err = grpcV1.RegisterBundleHandler(ctx, mux, conn); err != nil {
331
			return err
332
		}
333
		if err = grpcV1.RegisterTenancyHandler(ctx, mux, conn); err != nil {
334
			return err
335
		}
336
337
		corsHandler := cors.New(cors.Options{
338
			AllowCredentials: true,
339
			AllowedOrigins:   srv.HTTP.CORSAllowedOrigins,
340
			AllowedHeaders:   srv.HTTP.CORSAllowedHeaders,
341
			AllowedMethods: []string{
342
				http.MethodGet, http.MethodPost,
343
				http.MethodHead, http.MethodPatch, http.MethodDelete, http.MethodPut,
344
			},
345
		}).Handler(mux)
346
347
		httpServer = &http.Server{
348
			Addr:              ":" + srv.HTTP.Port,
349
			Handler:           corsHandler,
350
			ReadHeaderTimeout: 5 * time.Second,
351
		}
352
353
		// Start the HTTP server with TLS if enabled, otherwise without TLS.
354
		go func() {
355
			var err error
356
			if srv.HTTP.TLSConfig.Enabled {
357
				err = httpServer.ListenAndServeTLS(srv.HTTP.TLSConfig.CertPath, srv.HTTP.TLSConfig.KeyPath)
358
			} else {
359
				err = httpServer.ListenAndServe()
360
			}
361
			if !errors.Is(err, http.ErrServerClosed) {
362
				slog.Error(err.Error())
363
			}
364
		}()
365
366
		slog.Info(fmt.Sprintf("🚀 http server successfully started: %s", srv.HTTP.Port))
367
	}
368
369
	// Wait for the context to be canceled (e.g., due to a signal).
370
	<-ctx.Done()
371
372
	// Shutdown the servers gracefully.
373
	ctxShutdown, cancel := context.WithTimeout(ctx, 5*time.Second)
374
	defer cancel()
375
376
	if httpServer != nil {
377
		if err := httpServer.Shutdown(ctxShutdown); err != nil {
378
			slog.Error(err.Error())
379
			return err
380
		}
381
	}
382
383
	// Gracefully stop the gRPC server.
384
	grpcServer.GracefulStop()
385
	// Gracefully stop the invoke server.
386
	invokeServer.GracefulStop()
387
388
	slog.Info("gracefully shutting down")
389
390
	return nil
391
}
392
393
// InterceptorLogger adapts slog logger to interceptor logger.
394
func InterceptorLogger(l *slog.Logger) logging.Logger {
395
	return logging.LoggerFunc(func(ctx context.Context, lvl logging.Level, msg string, fields ...any) {
396
		l.Log(ctx, slog.Level(lvl), msg, fields...)
397
	})
398
}
399
400
func httpNameFormatter(_ string, req *http.Request) string {
401
	pp, ok := runtime.HTTPPattern(req.Context())
402
	path := "<not found>"
403
	if ok {
404
		path = pp.String()
405
	}
406
	return req.Method + " " + path
407
}
408