Passed
Pull Request — master (#1007)
by Tolga
03:13
created

servers.*Container.Run   F

Complexity

Conditions 36

Size

Total Lines 273
Code Lines 169

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 36
eloc 169
nop 7
dl 0
loc 273
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like servers.*Container.Run often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package servers
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"log/slog"
8
	"net"
9
	"net/http"
10
	"net/http/pprof"
11
	"time"
12
13
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
14
15
	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/ratelimit"
16
17
	grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
18
19
	grpcRecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
20
	grpcValidator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
21
	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
22
	"github.com/rs/cors"
23
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
24
	"go.opentelemetry.io/otel"
25
	"google.golang.org/grpc"
26
	"google.golang.org/grpc/credentials"
27
	"google.golang.org/grpc/credentials/insecure"
28
	"google.golang.org/grpc/reflection"
29
	"google.golang.org/protobuf/encoding/protojson"
30
31
	health "google.golang.org/grpc/health/grpc_health_v1"
32
33
	"github.com/Permify/permify/internal/authn/oidc"
34
	"github.com/Permify/permify/internal/authn/preshared"
35
	"github.com/Permify/permify/internal/config"
36
	"github.com/Permify/permify/internal/invoke"
37
	"github.com/Permify/permify/internal/middleware"
38
	"github.com/Permify/permify/internal/storage"
39
	grpcV1 "github.com/Permify/permify/pkg/pb/base/v1"
40
)
41
42
var tracer = otel.Tracer("servers")
43
44
// Container is a struct that holds the invoker and various storage
45
// for permission-related operations. It serves as a central point of access
46
// for interacting with the underlying data and services.
47
type Container struct {
48
	// Invoker for performing permission-related operations
49
	Invoker invoke.Invoker
50
	// DataReader for reading data from storage
51
	DR storage.DataReader
52
	// DataWriter for writing data to storage
53
	DW storage.DataWriter
54
	// BundleReader for reading bundle from storage
55
	BR storage.BundleReader
56
	// BundleWriter for writing bundle to storage
57
	BW storage.BundleWriter
58
	// SchemaReader for reading schemas from storage
59
	SR storage.SchemaReader
60
	// SchemaWriter for writing schemas to storage
61
	SW storage.SchemaWriter
62
	// TenantReader for reading tenant information from storage
63
	TR storage.TenantReader
64
	// TenantWriter for writing tenant information to storage
65
	TW storage.TenantWriter
66
67
	W storage.Watcher
68
}
69
70
// NewContainer is a constructor for the Container struct.
71
// It takes an Invoker, RelationshipReader, RelationshipWriter, SchemaReader, SchemaWriter,
72
// TenantReader, and TenantWriter as arguments, and returns a pointer to a Container instance.
73
func NewContainer(
74
	invoker invoke.Invoker,
75
	dr storage.DataReader,
76
	dw storage.DataWriter,
77
	br storage.BundleReader,
78
	bw storage.BundleWriter,
79
	sr storage.SchemaReader,
80
	sw storage.SchemaWriter,
81
	tr storage.TenantReader,
82
	tw storage.TenantWriter,
83
	w storage.Watcher,
84
) *Container {
85
	return &Container{
86
		Invoker: invoker,
87
		DR:      dr,
88
		DW:      dw,
89
		BR:      br,
90
		BW:      bw,
91
		SR:      sr,
92
		SW:      sw,
93
		TR:      tr,
94
		TW:      tw,
95
		W:       w,
96
	}
97
}
98
99
// Run is a method that starts the Container and its services, including the gRPC server,
100
// an optional HTTP server, and an optional profiler server. It also sets up authentication,
101
// TLS configurations, and interceptors as needed.
102
func (s *Container) Run(
103
	ctx context.Context,
104
	srv *config.Server,
105
	logger *slog.Logger,
106
	dst *config.Distributed,
107
	authentication *config.Authn,
108
	profiler *config.Profiler,
109
	localInvoker invoke.Invoker,
110
) error {
111
	var err error
112
113
	limiter := middleware.NewRateLimiter(srv.RateLimit) // for example 1000 req/sec
114
115
	lopts := []logging.Option{
116
		logging.WithLogOnEvents(logging.StartCall, logging.FinishCall),
117
	}
118
119
	unaryInterceptors := []grpc.UnaryServerInterceptor{
120
		grpcValidator.UnaryServerInterceptor(),
121
		grpcRecovery.UnaryServerInterceptor(),
122
		ratelimit.UnaryServerInterceptor(limiter),
123
		logging.UnaryServerInterceptor(InterceptorLogger(logger), lopts...),
124
	}
125
126
	streamingInterceptors := []grpc.StreamServerInterceptor{
127
		grpcValidator.StreamServerInterceptor(),
128
		grpcRecovery.StreamServerInterceptor(),
129
		ratelimit.StreamServerInterceptor(limiter),
130
		logging.StreamServerInterceptor(InterceptorLogger(logger), lopts...),
131
	}
132
133
	// Configure authentication based on the provided method ("preshared" or "oidc").
134
	// Add the appropriate interceptors to the unary and streaming interceptors.
135
	if authentication != nil && authentication.Enabled {
136
		switch authentication.Method {
137
		case "preshared":
138
			var authenticator *preshared.KeyAuthn
139
			authenticator, err = preshared.NewKeyAuthn(ctx, authentication.Preshared)
140
			if err != nil {
141
				return err
142
			}
143
			unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.KeyAuthFunc(authenticator)))
144
			streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.KeyAuthFunc(authenticator)))
145
		case "oidc":
146
			var authenticator *oidc.Authn
147
			authenticator, err = oidc.NewOidcAuthn(ctx, authentication.Oidc)
148
			if err != nil {
149
				return err
150
			}
151
			unaryInterceptors = append(unaryInterceptors, oidc.UnaryServerInterceptor(authenticator))
152
			streamingInterceptors = append(streamingInterceptors, oidc.StreamServerInterceptor(authenticator))
153
		default:
154
			return fmt.Errorf("unknown authentication method: '%s'", authentication.Method)
155
		}
156
	}
157
158
	opts := []grpc.ServerOption{
159
		grpc.ChainUnaryInterceptor(unaryInterceptors...),
160
		grpc.ChainStreamInterceptor(streamingInterceptors...),
161
	}
162
163
	if srv.GRPC.TLSConfig.Enabled {
164
		var c credentials.TransportCredentials
165
		c, err = credentials.NewServerTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.GRPC.TLSConfig.KeyPath)
166
		if err != nil {
167
			return err
168
		}
169
		opts = append(opts, grpc.Creds(c))
170
	}
171
172
	// Create a new gRPC server instance with the provided options.
173
	grpcServer := grpc.NewServer(opts...)
174
175
	// Register various gRPC services to the server.
176
	grpcV1.RegisterPermissionServer(grpcServer, NewPermissionServer(s.Invoker))
177
	grpcV1.RegisterSchemaServer(grpcServer, NewSchemaServer(s.SW, s.SR))
178
	grpcV1.RegisterDataServer(grpcServer, NewDataServer(s.DR, s.DW, s.BR, s.SR))
179
	grpcV1.RegisterBundleServer(grpcServer, NewBundleServer(s.BR, s.BW))
180
	grpcV1.RegisterTenancyServer(grpcServer, NewTenancyServer(s.TR, s.TW))
181
	grpcV1.RegisterWatchServer(grpcServer, NewWatchServer(s.W, s.DR))
182
183
	// Register health check and reflection services for gRPC.
184
	health.RegisterHealthServer(grpcServer, NewHealthServer())
185
	reflection.Register(grpcServer)
186
187
	// Create another gRPC server, presumably for invoking permissions.
188
	invokeServer := grpc.NewServer(opts...)
189
	grpcV1.RegisterPermissionServer(invokeServer, NewPermissionServer(localInvoker))
190
191
	// Register health check and reflection services for the invokeServer.
192
	health.RegisterHealthServer(invokeServer, NewHealthServer())
193
	reflection.Register(invokeServer)
194
195
	// If profiling is enabled, set up the profiler using the net/http package.
196
	if profiler.Enabled {
197
		// Create a new HTTP ServeMux to register pprof routes.
198
		mux := http.NewServeMux()
199
		mux.HandleFunc("/debug/pprof/", pprof.Index)
200
		mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
201
		mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
202
		mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
203
		mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
204
205
		// Run the profiler server in a separate goroutine.
206
		go func() {
207
			// Log a message indicating the profiler server's start status and port.
208
			slog.Info(fmt.Sprintf("🚀 profiler server successfully started: %s", profiler.Port))
209
210
			// Define the HTTP server with timeouts and the mux handler for pprof routes.
211
			pprofserver := &http.Server{
212
				Addr:         ":" + profiler.Port,
213
				Handler:      mux,
214
				ReadTimeout:  20 * time.Second,
215
				WriteTimeout: 20 * time.Second,
216
				IdleTimeout:  15 * time.Second,
217
			}
218
219
			// Start the profiler server.
220
			if err := pprofserver.ListenAndServe(); err != nil {
221
				// Check if the error was due to the server being closed, and log it.
222
				if errors.Is(err, http.ErrServerClosed) {
223
					slog.Error("failed to start profiler", err)
224
				}
225
			}
226
		}()
227
	}
228
229
	var lis net.Listener
230
	lis, err = net.Listen("tcp", ":"+srv.GRPC.Port)
231
	if err != nil {
232
		return err
233
	}
234
235
	var invokeLis net.Listener
236
	invokeLis, err = net.Listen("tcp", ":"+dst.Port)
237
	if err != nil {
238
		return err
239
	}
240
241
	// Start the gRPC server.
242
	go func() {
243
		if err := grpcServer.Serve(lis); err != nil {
244
			slog.Error("failed to start grpc server", err)
245
		}
246
	}()
247
248
	go func() {
249
		if err := invokeServer.Serve(invokeLis); err != nil {
250
			slog.Error("failed to start invoke grpc server", err)
251
		}
252
	}()
253
254
	slog.Info(fmt.Sprintf("🚀 grpc server successfully started: %s", srv.GRPC.Port))
255
	slog.Info(fmt.Sprintf("🚀 invoker grpc server successfully started: %s", dst.Port))
256
257
	var httpServer *http.Server
258
259
	// Start the optional HTTP server with CORS and optional TLS configurations.
260
	// Connect to the gRPC server and register the HTTP handlers for each service.
261
	if srv.HTTP.Enabled {
262
		options := []grpc.DialOption{
263
			grpc.WithBlock(),
264
			grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
265
		}
266
		if srv.GRPC.TLSConfig.Enabled {
267
			c, err := credentials.NewClientTLSFromFile(srv.GRPC.TLSConfig.CertPath, "")
268
			if err != nil {
269
				return err
270
			}
271
			options = append(options, grpc.WithTransportCredentials(c))
272
		} else {
273
			options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials()))
274
		}
275
276
		timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
277
		defer cancel()
278
279
		conn, err := grpc.DialContext(timeoutCtx, ":"+srv.GRPC.Port, options...)
280
		if err != nil {
281
			return err
282
		}
283
		defer func() {
284
			if err = conn.Close(); err != nil {
285
				slog.Error("Failed to close gRPC connection: %v", err)
286
			}
287
		}()
288
289
		healthClient := health.NewHealthClient(conn)
290
		muxOpts := []runtime.ServeMuxOption{
291
			runtime.WithHealthzEndpoint(healthClient),
292
			runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.HTTPBodyMarshaler{
293
				Marshaler: &runtime.JSONPb{
294
					MarshalOptions: protojson.MarshalOptions{
295
						UseProtoNames:   true,
296
						EmitUnpopulated: true,
297
					},
298
					UnmarshalOptions: protojson.UnmarshalOptions{
299
						DiscardUnknown: true,
300
					},
301
				},
302
			}),
303
		}
304
305
		mux := runtime.NewServeMux(muxOpts...)
306
307
		if err = grpcV1.RegisterPermissionHandler(ctx, mux, conn); err != nil {
308
			return err
309
		}
310
		if err = grpcV1.RegisterSchemaHandler(ctx, mux, conn); err != nil {
311
			return err
312
		}
313
		if err = grpcV1.RegisterDataHandler(ctx, mux, conn); err != nil {
314
			return err
315
		}
316
		if err = grpcV1.RegisterBundleHandler(ctx, mux, conn); err != nil {
317
			return err
318
		}
319
		if err = grpcV1.RegisterTenancyHandler(ctx, mux, conn); err != nil {
320
			return err
321
		}
322
323
		httpServer = &http.Server{
324
			Addr: ":" + srv.HTTP.Port,
325
			Handler: cors.New(cors.Options{
326
				AllowCredentials: true,
327
				AllowedOrigins:   srv.HTTP.CORSAllowedOrigins,
328
				AllowedHeaders:   srv.HTTP.CORSAllowedHeaders,
329
				AllowedMethods: []string{
330
					http.MethodGet, http.MethodPost,
331
					http.MethodHead, http.MethodPatch, http.MethodDelete, http.MethodPut,
332
				},
333
			}).Handler(mux),
334
			ReadHeaderTimeout: 5 * time.Second,
335
		}
336
337
		// Start the HTTP server with TLS if enabled, otherwise without TLS.
338
		go func() {
339
			var err error
340
			if srv.HTTP.TLSConfig.Enabled {
341
				err = httpServer.ListenAndServeTLS(srv.HTTP.TLSConfig.CertPath, srv.HTTP.TLSConfig.KeyPath)
342
			} else {
343
				err = httpServer.ListenAndServe()
344
			}
345
			if !errors.Is(err, http.ErrServerClosed) {
346
				slog.Error(err.Error())
347
			}
348
		}()
349
350
		slog.Info(fmt.Sprintf("🚀 http server successfully started: %s", srv.HTTP.Port))
351
	}
352
353
	// Wait for the context to be canceled (e.g., due to a signal).
354
	<-ctx.Done()
355
356
	// Shutdown the servers gracefully.
357
	ctxShutdown, cancel := context.WithTimeout(ctx, 5*time.Second)
358
	defer cancel()
359
360
	if httpServer != nil {
361
		if err := httpServer.Shutdown(ctxShutdown); err != nil {
362
			slog.Error(err.Error())
363
			return err
364
		}
365
	}
366
367
	// Gracefully stop the gRPC server.
368
	grpcServer.GracefulStop()
369
	// Gracefully stop the invoke server.
370
	invokeServer.GracefulStop()
371
372
	slog.Info("gracefully shutting down")
373
374
	return nil
375
}
376
377
// InterceptorLogger adapts slog logger to interceptor logger.
378
func InterceptorLogger(l *slog.Logger) logging.Logger {
379
	return logging.LoggerFunc(func(ctx context.Context, lvl logging.Level, msg string, fields ...any) {
380
		l.Log(ctx, slog.Level(lvl), msg, fields...)
381
	})
382
}
383