servers.*Container.Run   F
last analyzed

Complexity

Conditions 39

Size

Total Lines 291
Code Lines 181

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 39
eloc 181
nop 7
dl 0
loc 291
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

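Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are present, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.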

Complexity

Complex classes like servers.*Container.Run often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package servers
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"log/slog"
8
	"net"
9
	"net/http"
10
	"net/http/pprof"
11
	"time"
12
13
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
14
15
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/ratelimit"
16
17
	grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
18
19
	grpcRecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
20
	grpcValidator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
21
	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
22
	"github.com/rs/cors"
23
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
24
	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" // HTTP telemetry
25
	"google.golang.org/grpc"
26
	"google.golang.org/grpc/credentials"
27
	"google.golang.org/grpc/credentials/insecure"
28
	"google.golang.org/grpc/reflection"
29
	"google.golang.org/protobuf/encoding/protojson"
30
31
	health "google.golang.org/grpc/health/grpc_health_v1" // gRPC health check
32
33
	oidc "github.com/Permify/permify/internal/authn/openid"
34
	"github.com/Permify/permify/internal/authn/preshared"
35
	"github.com/Permify/permify/internal/config"
36
	"github.com/Permify/permify/internal/invoke"
37
	"github.com/Permify/permify/internal/middleware"
38
	"github.com/Permify/permify/internal/storage"
39
	grpcV1 "github.com/Permify/permify/pkg/pb/base/v1"
40
)
41
42
// Container is a struct that holds the invoker and various storage
43
// for permission-related operations. It serves as a central point of access
44
// for interacting with the underlying data and services.
45
type Container struct {
46
	// Invoker for performing permission-related operations
47
	Invoker invoke.Invoker
48
	// DataReader for reading data from storage
49
	DR storage.DataReader
50
	// DataWriter for writing data to storage
51
	DW storage.DataWriter
52
	// BundleReader for reading bundle from storage
53
	BR storage.BundleReader
54
	// BundleWriter for writing bundle to storage
55
	BW storage.BundleWriter
56
	// SchemaReader for reading schemas from storage
57
	SR storage.SchemaReader
58
	// SchemaWriter for writing schemas to storage
59
	SW storage.SchemaWriter
60
	// TenantReader for reading tenant information from storage
61
	TR storage.TenantReader
62
	// TenantWriter for writing tenant information to storage
63
	TW storage.TenantWriter
64
65
	W storage.Watcher
66
}
67
68
// NewContainer is a constructor for the Container struct.
69
// It takes an Invoker, RelationshipReader, RelationshipWriter, SchemaReader, SchemaWriter,
70
// TenantReader, and TenantWriter as arguments, and returns a pointer to a Container instance.
71
func NewContainer(
72
	invoker invoke.Invoker,
73
	dr storage.DataReader,
74
	dw storage.DataWriter,
75
	br storage.BundleReader,
76
	bw storage.BundleWriter,
77
	sr storage.SchemaReader,
78
	sw storage.SchemaWriter,
79
	tr storage.TenantReader,
80
	tw storage.TenantWriter,
81
	w storage.Watcher,
82
) *Container {
83
	return &Container{
84
		Invoker: invoker,
85
		DR:      dr,
86
		DW:      dw,
87
		BR:      br,
88
		BW:      bw,
89
		SR:      sr,
90
		SW:      sw,
91
		TR:      tr,
92
		TW:      tw,
93
		W:       w,
94
	}
95
}
96
97
// Run is a method that starts the Container and its services, including the gRPC server,
98
// an optional HTTP server, and an optional profiler server. It also sets up authentication,
99
// TLS configurations, and interceptors as needed.
100
func (s *Container) Run(
101
	ctx context.Context,
102
	srv *config.Server,
103
	logger *slog.Logger,
104
	dst *config.Distributed,
105
	authentication *config.Authn,
106
	profiler *config.Profiler,
107
	localInvoker invoke.Invoker,
108
) error {
109
	var err error
110
111
	limiter := middleware.NewRateLimiter(srv.RateLimit) // for example 1000 req/sec
112
113
	lopts := []logging.Option{
114
		logging.WithLogOnEvents(logging.StartCall, logging.FinishCall),
115
	}
116
117
	unaryInterceptors := []grpc.UnaryServerInterceptor{
118
		grpcValidator.UnaryServerInterceptor(),
119
		grpcRecovery.UnaryServerInterceptor(),
120
		ratelimit.UnaryServerInterceptor(limiter),
121
		logging.UnaryServerInterceptor(InterceptorLogger(logger), lopts...),
122
	}
123
124
	streamingInterceptors := []grpc.StreamServerInterceptor{
125
		grpcValidator.StreamServerInterceptor(),
126
		grpcRecovery.StreamServerInterceptor(),
127
		ratelimit.StreamServerInterceptor(limiter),
128
		logging.StreamServerInterceptor(InterceptorLogger(logger), lopts...),
129
	}
130
131
	// Configure authentication based on the provided method ("preshared" or "oidc").
132
	// Add the appropriate interceptors to the unary and streaming interceptors.
133
	if authentication != nil && authentication.Enabled {
134
		switch authentication.Method {
135
		case "preshared":
136
			var authenticator *preshared.KeyAuthn
137
			authenticator, err = preshared.NewKeyAuthn(ctx, authentication.Preshared)
138
			if err != nil {
139
				return err
140
			}
141
			unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.AuthFunc(authenticator)))
142
			streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.AuthFunc(authenticator)))
143
		case "oidc": // OpenID Connect authentication
144
			var authenticator *oidc.Authn                                    // OIDC authenticator
145
			authenticator, err = oidc.NewOidcAuthn(ctx, authentication.Oidc) // Create OIDC authenticator
146
			if err != nil {                                                  // Check for errors
147
				return err // Return error
148
			} // OIDC authenticator created
149
			unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.AuthFunc(authenticator)))
150
			streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.AuthFunc(authenticator)))
151
		default: // Unknown authentication method
152
			return fmt.Errorf("unknown authentication method: '%s'", authentication.Method) // Return error
153
		}
154
	}
155
156
	opts := []grpc.ServerOption{
157
		grpc.ChainUnaryInterceptor(unaryInterceptors...),
158
		grpc.ChainStreamInterceptor(streamingInterceptors...),
159
		grpc.StatsHandler(otelgrpc.NewServerHandler()),
160
	}
161
162
	if srv.GRPC.TLSConfig.Enabled {
163
		var c credentials.TransportCredentials
164
		c, err = credentials.NewServerTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.GRPC.TLSConfig.KeyPath)
165
		if err != nil {
166
			return err
167
		}
168
		opts = append(opts, grpc.Creds(c))
169
	}
170
171
	// Create a new gRPC server instance with the provided options.
172
	grpcServer := grpc.NewServer(opts...)
173
174
	// Register various gRPC services to the server.
175
	grpcV1.RegisterPermissionServer(grpcServer, NewPermissionServer(s.Invoker))
176
	grpcV1.RegisterSchemaServer(grpcServer, NewSchemaServer(s.SW, s.SR))
177
	grpcV1.RegisterDataServer(grpcServer, NewDataServer(s.DR, s.DW, s.BR, s.SR))
178
	grpcV1.RegisterBundleServer(grpcServer, NewBundleServer(s.BR, s.BW))
179
	grpcV1.RegisterTenancyServer(grpcServer, NewTenancyServer(s.TR, s.TW))
180
	grpcV1.RegisterWatchServer(grpcServer, NewWatchServer(s.W, s.DR))
181
182
	// Register health check and reflection services for gRPC.
183
	health.RegisterHealthServer(grpcServer, NewHealthServer()) // Register health server
184
	reflection.Register(grpcServer)
185
186
	// Create another gRPC server, presumably for invoking permissions.
187
	invokeServer := grpc.NewServer(opts...)
188
	grpcV1.RegisterPermissionServer(invokeServer, NewPermissionServer(localInvoker))
189
190
	// Register health check and reflection services for the invokeServer.
191
	health.RegisterHealthServer(invokeServer, NewHealthServer()) // Register health server for invoker
192
	reflection.Register(invokeServer)
193
194
	// If profiling is enabled, set up the profiler using the net/http package.
195
	if profiler.Enabled {
196
		// Create a new HTTP ServeMux to register pprof routes.
197
		mux := http.NewServeMux()
198
		mux.HandleFunc("/debug/pprof/", pprof.Index)
199
		mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
200
		mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
201
		mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
202
		mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
203
204
		// Run the profiler server in a separate goroutine.
205
		go func() {
206
			// Log a message indicating the profiler server's start status and port.
207
			slog.Info(fmt.Sprintf("🚀 profiler server successfully started: %s", profiler.Port))
208
209
			// Define the HTTP server with timeouts and the mux handler for pprof routes.
210
			pprofserver := &http.Server{
211
				Addr:         ":" + profiler.Port,
212
				Handler:      mux,
213
				ReadTimeout:  20 * time.Second,
214
				WriteTimeout: 20 * time.Second,
215
				IdleTimeout:  15 * time.Second,
216
			}
217
218
			// Start the profiler server.
219
			if err := pprofserver.ListenAndServe(); err != nil {
220
				// Check if the error was due to the server being closed, and log it.
221
				if errors.Is(err, http.ErrServerClosed) { // Server closed error
222
					slog.Error("failed to start profiler", slog.Any("error", err)) // Log profiler error
223
				}
224
			}
225
		}()
226
	}
227
228
	var lis net.Listener
229
	lis, err = net.Listen("tcp", ":"+srv.GRPC.Port)
230
	if err != nil {
231
		return err
232
	}
233
234
	var invokeLis net.Listener
235
	invokeLis, err = net.Listen("tcp", ":"+dst.Port)
236
	if err != nil {
237
		return err
238
	}
239
240
	// Start the gRPC server.
241
	go func() {
242
		if err := grpcServer.Serve(lis); err != nil { // gRPC server error
243
			slog.Error("failed to start grpc server", slog.Any("error", err)) // Log gRPC error
244
		}
245
	}()
246
247
	go func() {
248
		if err := invokeServer.Serve(invokeLis); err != nil { // Invoker server error
249
			slog.Error("failed to start invoke grpc server", slog.Any("error", err)) // Log invoker error
250
		}
251
	}()
252
253
	slog.Info(fmt.Sprintf("🚀 grpc server successfully started: %s", srv.GRPC.Port))
254
	slog.Info(fmt.Sprintf("🚀 invoker grpc server successfully started: %s", dst.Port))
255
256
	var httpServer *http.Server
257
258
	// Start the optional HTTP server with CORS and optional TLS configurations.
259
	// Connect to the gRPC server and register the HTTP handlers for each service.
260
	if srv.HTTP.Enabled {
261
		options := []grpc.DialOption{
262
			grpc.WithBlock(),
263
		}
264
		if srv.GRPC.TLSConfig.Enabled {
265
			c, err := credentials.NewClientTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.NameOverride)
266
			if err != nil {
267
				return err
268
			}
269
			options = append(options, grpc.WithTransportCredentials(c))
270
		} else {
271
			options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials()))
272
		}
273
274
		timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
275
		defer cancel()
276
277
		conn, err := grpc.DialContext(timeoutCtx, ":"+srv.GRPC.Port, options...)
278
		if err != nil {
279
			return err
280
		}
281
		defer func() {
282
			if err = conn.Close(); err != nil { // Connection close error
283
				slog.Error("Failed to close gRPC connection", slog.Any("error", err)) // Log close error
284
			}
285
		}()
286
287
		healthClient := health.NewHealthClient(conn) // Create health client
288
		muxOpts := []runtime.ServeMuxOption{
289
			runtime.WithHealthzEndpoint(healthClient), // Add health endpoint
290
			runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.HTTPBodyMarshaler{
291
				Marshaler: &runtime.JSONPb{
292
					MarshalOptions: protojson.MarshalOptions{
293
						UseProtoNames:   true,
294
						EmitUnpopulated: true,
295
					},
296
					UnmarshalOptions: protojson.UnmarshalOptions{
297
						DiscardUnknown: true,
298
					},
299
				},
300
			}),
301
			runtime.WithMiddlewares(func(next runtime.HandlerFunc) runtime.HandlerFunc { // OpenTelemetry middleware
302
				type key struct{} // Context key type
303
304
				otelHandler := otelhttp.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // HTTP handler with telemetry
305
					pathParams := r.Context().Value(key{}).(map[string]string) // Get path params
306
					next(w, r, pathParams)                                     // Call next handler
307
				}), "server", // Server name
308
					otelhttp.WithServerName("permify"),                // Set server name
309
					otelhttp.WithSpanNameFormatter(httpNameFormatter), // Format span name
310
				) // OpenTelemetry handler created
311
312
				return func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { // Middleware handler
313
					r = r.WithContext(context.WithValue(r.Context(), key{}, pathParams)) // Add path params to context
314
					otelHandler.ServeHTTP(w, r)                                          // Serve with telemetry
315
				} // Return middleware handler
316
			}), // Middleware registered
317
		}
318
319
		mux := runtime.NewServeMux(muxOpts...)
320
321
		if err = grpcV1.RegisterPermissionHandler(ctx, mux, conn); err != nil {
322
			return err
323
		}
324
		if err = grpcV1.RegisterSchemaHandler(ctx, mux, conn); err != nil {
325
			return err
326
		}
327
		if err = grpcV1.RegisterDataHandler(ctx, mux, conn); err != nil {
328
			return err
329
		}
330
		if err = grpcV1.RegisterBundleHandler(ctx, mux, conn); err != nil {
331
			return err
332
		}
333
		if err = grpcV1.RegisterTenancyHandler(ctx, mux, conn); err != nil {
334
			return err
335
		}
336
337
		corsHandler := cors.New(cors.Options{ // CORS configuration
338
			AllowCredentials: true,                        // Allow credentials
339
			AllowedOrigins:   srv.HTTP.CORSAllowedOrigins, // Allowed origins
340
			AllowedHeaders:   srv.HTTP.CORSAllowedHeaders, // Allowed headers
341
			AllowedMethods: []string{ // Allowed HTTP methods
342
				http.MethodGet, http.MethodPost, // GET and POST
343
				http.MethodHead, http.MethodPatch, http.MethodDelete, http.MethodPut, // Other methods
344
			}, // Methods configured
345
		}).Handler(mux) // CORS handler created
346
347
		httpServer = &http.Server{ // HTTP server configuration
348
			Addr:              ":" + srv.HTTP.Port, // Server address
349
			Handler:           corsHandler,         // CORS handler
350
			ReadHeaderTimeout: 5 * time.Second,
351
		}
352
353
		// Start the HTTP server with TLS if enabled, otherwise without TLS.
354
		go func() {
355
			var err error
356
			if srv.HTTP.TLSConfig.Enabled {
357
				err = httpServer.ListenAndServeTLS(srv.HTTP.TLSConfig.CertPath, srv.HTTP.TLSConfig.KeyPath)
358
			} else {
359
				err = httpServer.ListenAndServe()
360
			}
361
			if !errors.Is(err, http.ErrServerClosed) {
362
				slog.Error(err.Error())
363
			}
364
		}()
365
366
		slog.Info(fmt.Sprintf("🚀 http server successfully started: %s", srv.HTTP.Port))
367
	}
368
369
	// Wait for the context to be canceled (e.g., due to a signal).
370
	<-ctx.Done()
371
372
	// Shutdown the servers gracefully.
373
	ctxShutdown, cancel := context.WithTimeout(ctx, 5*time.Second)
374
	defer cancel()
375
376
	if httpServer != nil {
377
		if err := httpServer.Shutdown(ctxShutdown); err != nil {
378
			slog.Error(err.Error())
379
			return err
380
		}
381
	}
382
383
	// Gracefully stop the gRPC server.
384
	grpcServer.GracefulStop()
385
	// Gracefully stop the invoke server.
386
	invokeServer.GracefulStop()
387
388
	slog.Info("gracefully shutting down")
389
390
	return nil
391
}
392
393
// InterceptorLogger adapts slog logger to interceptor logger.
394
func InterceptorLogger(l *slog.Logger) logging.Logger {
395
	return logging.LoggerFunc(func(ctx context.Context, lvl logging.Level, msg string, fields ...any) {
396
		l.Log(ctx, slog.Level(lvl), msg, fields...)
397
	})
398
} // End of InterceptorLogger
399
400
func httpNameFormatter(_ string, req *http.Request) string { // Format HTTP span name
401
	pp, ok := runtime.HTTPPattern(req.Context()) // Get HTTP pattern
402
	path := "<not found>"                        // Default path
403
	if ok {                                      // Pattern found
404
		path = pp.String() // Get path string
405
	} // Path determined
406
	return req.Method + " " + path // Return formatted name
407
} // End of httpNameFormatter
408