| Conditions | 39 |
| Total Lines | 292 |
| Code Lines | 182 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include: Extract Method.
If many parameters/temporary variables are present: Replace Temp with Query, Introduce Parameter Object, or Preserve Whole Object.
Complex classes like servers.*Container.Run often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | package servers |
||
| 98 | func (s *Container) Run( |
||
| 99 | ctx context.Context, |
||
| 100 | srv *config.Server, |
||
| 101 | logger *slog.Logger, |
||
| 102 | dst *config.Distributed, |
||
| 103 | authentication *config.Authn, |
||
| 104 | profiler *config.Profiler, |
||
| 105 | localInvoker invoke.Invoker, |
||
| 106 | ) error { |
||
| 107 | var err error |
||
| 108 | |||
| 109 | limiter := middleware.NewRateLimiter(srv.RateLimit) // for example 1000 req/sec |
||
| 110 | |||
| 111 | lopts := []logging.Option{ |
||
| 112 | logging.WithLogOnEvents(logging.StartCall, logging.FinishCall), |
||
| 113 | } |
||
| 114 | |||
| 115 | unaryInterceptors := []grpc.UnaryServerInterceptor{ |
||
| 116 | grpcValidator.UnaryServerInterceptor(), |
||
| 117 | grpcRecovery.UnaryServerInterceptor(), |
||
| 118 | ratelimit.UnaryServerInterceptor(limiter), |
||
| 119 | logging.UnaryServerInterceptor(InterceptorLogger(logger), lopts...), |
||
| 120 | } |
||
| 121 | |||
| 122 | streamingInterceptors := []grpc.StreamServerInterceptor{ |
||
| 123 | grpcValidator.StreamServerInterceptor(), |
||
| 124 | grpcRecovery.StreamServerInterceptor(), |
||
| 125 | ratelimit.StreamServerInterceptor(limiter), |
||
| 126 | logging.StreamServerInterceptor(InterceptorLogger(logger), lopts...), |
||
| 127 | } |
||
| 128 | |||
| 129 | // Configure authentication based on the provided method ("preshared" or "oidc"). |
||
| 130 | // Add the appropriate interceptors to the unary and streaming interceptors. |
||
| 131 | if authentication != nil && authentication.Enabled { |
||
| 132 | switch authentication.Method { |
||
| 133 | case "preshared": |
||
| 134 | var authenticator *preshared.KeyAuthn |
||
| 135 | authenticator, err = preshared.NewKeyAuthn(ctx, authentication.Preshared) |
||
| 136 | if err != nil { |
||
| 137 | return err |
||
| 138 | } |
||
| 139 | unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.AuthFunc(authenticator))) |
||
| 140 | streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.AuthFunc(authenticator))) |
||
| 141 | case "oidc": |
||
| 142 | var authenticator *oidc.Authn |
||
| 143 | authenticator, err = oidc.NewOidcAuthn(ctx, authentication.Oidc) |
||
| 144 | if err != nil { |
||
| 145 | return err |
||
| 146 | } |
||
| 147 | unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.AuthFunc(authenticator))) |
||
| 148 | streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.AuthFunc(authenticator))) |
||
| 149 | default: |
||
| 150 | return fmt.Errorf("unknown authentication method: '%s'", authentication.Method) |
||
| 151 | } |
||
| 152 | } |
||
| 153 | |||
| 154 | opts := []grpc.ServerOption{ |
||
| 155 | grpc.ChainUnaryInterceptor(unaryInterceptors...), |
||
| 156 | grpc.ChainStreamInterceptor(streamingInterceptors...), |
||
| 157 | grpc.StatsHandler(otelgrpc.NewServerHandler()), |
||
| 158 | } |
||
| 159 | |||
| 160 | if srv.GRPC.TLSConfig.Enabled { |
||
| 161 | var c credentials.TransportCredentials |
||
| 162 | c, err = credentials.NewServerTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.GRPC.TLSConfig.KeyPath) |
||
| 163 | if err != nil { |
||
| 164 | return err |
||
| 165 | } |
||
| 166 | opts = append(opts, grpc.Creds(c)) |
||
| 167 | } |
||
| 168 | |||
| 169 | // Create a new gRPC server instance with the provided options. |
||
| 170 | grpcServer := grpc.NewServer(opts...) |
||
| 171 | |||
| 172 | // Register various gRPC services to the server. |
||
| 173 | grpcV1.RegisterPermissionServer(grpcServer, NewPermissionServer(s.Invoker)) |
||
| 174 | grpcV1.RegisterSchemaServer(grpcServer, NewSchemaServer(s.SW, s.SR)) |
||
| 175 | grpcV1.RegisterDataServer(grpcServer, NewDataServer(s.DR, s.DW, s.BR, s.SR)) |
||
| 176 | grpcV1.RegisterBundleServer(grpcServer, NewBundleServer(s.BR, s.BW)) |
||
| 177 | grpcV1.RegisterTenancyServer(grpcServer, NewTenancyServer(s.TR, s.TW)) |
||
| 178 | grpcV1.RegisterWatchServer(grpcServer, NewWatchServer(s.W, s.DR)) |
||
| 179 | |||
| 180 | // Register health check and reflection services for gRPC. |
||
| 181 | health.RegisterHealthServer(grpcServer, NewHealthServer()) |
||
| 182 | reflection.Register(grpcServer) |
||
| 183 | |||
| 184 | // Create another gRPC server, presumably for invoking permissions. |
||
| 185 | invokeServer := grpc.NewServer(opts...) |
||
| 186 | grpcV1.RegisterPermissionServer(invokeServer, NewPermissionServer(localInvoker)) |
||
| 187 | |||
| 188 | // Register health check and reflection services for the invokeServer. |
||
| 189 | health.RegisterHealthServer(invokeServer, NewHealthServer()) |
||
| 190 | reflection.Register(invokeServer) |
||
| 191 | |||
| 192 | // If profiling is enabled, set up the profiler using the net/http package. |
||
| 193 | if profiler.Enabled { |
||
| 194 | // Create a new HTTP ServeMux to register pprof routes. |
||
| 195 | mux := http.NewServeMux() |
||
| 196 | mux.HandleFunc("/debug/pprof/", pprof.Index) |
||
| 197 | mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) |
||
| 198 | mux.HandleFunc("/debug/pprof/profile", pprof.Profile) |
||
| 199 | mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) |
||
| 200 | mux.HandleFunc("/debug/pprof/trace", pprof.Trace) |
||
| 201 | |||
| 202 | // Run the profiler server in a separate goroutine. |
||
| 203 | go func() { |
||
| 204 | // Log a message indicating the profiler server's start status and port. |
||
| 205 | slog.Info(fmt.Sprintf("🚀 profiler server successfully started: %s", profiler.Port)) |
||
| 206 | |||
| 207 | // Define the HTTP server with timeouts and the mux handler for pprof routes. |
||
| 208 | pprofserver := &http.Server{ |
||
| 209 | Addr: ":" + profiler.Port, |
||
| 210 | Handler: mux, |
||
| 211 | ReadTimeout: 20 * time.Second, |
||
| 212 | WriteTimeout: 20 * time.Second, |
||
| 213 | IdleTimeout: 15 * time.Second, |
||
| 214 | } |
||
| 215 | |||
| 216 | // Start the profiler server. |
||
| 217 | if err := pprofserver.ListenAndServe(); err != nil { |
||
| 218 | // Check if the error was due to the server being closed, and log it. |
||
| 219 | if errors.Is(err, http.ErrServerClosed) { |
||
| 220 | slog.Error("failed to start profiler", slog.Any("error", err)) |
||
| 221 | } |
||
| 222 | } |
||
| 223 | }() |
||
| 224 | } |
||
| 225 | |||
| 226 | var lis net.Listener |
||
| 227 | lis, err = net.Listen("tcp", ":"+srv.GRPC.Port) |
||
| 228 | if err != nil { |
||
| 229 | return err |
||
| 230 | } |
||
| 231 | |||
| 232 | var invokeLis net.Listener |
||
| 233 | invokeLis, err = net.Listen("tcp", ":"+dst.Port) |
||
| 234 | if err != nil { |
||
| 235 | return err |
||
| 236 | } |
||
| 237 | |||
| 238 | // Start the gRPC server. |
||
| 239 | go func() { |
||
| 240 | if err := grpcServer.Serve(lis); err != nil { |
||
| 241 | slog.Error("failed to start grpc server", slog.Any("error", err)) |
||
| 242 | } |
||
| 243 | }() |
||
| 244 | |||
| 245 | go func() { |
||
| 246 | if err := invokeServer.Serve(invokeLis); err != nil { |
||
| 247 | slog.Error("failed to start invoke grpc server", slog.Any("error", err)) |
||
| 248 | } |
||
| 249 | }() |
||
| 250 | |||
| 251 | slog.Info(fmt.Sprintf("🚀 grpc server successfully started: %s", srv.GRPC.Port)) |
||
| 252 | slog.Info(fmt.Sprintf("🚀 invoker grpc server successfully started: %s", dst.Port)) |
||
| 253 | |||
| 254 | var httpServer *http.Server |
||
| 255 | |||
| 256 | // Start the optional HTTP server with CORS and optional TLS configurations. |
||
| 257 | // Connect to the gRPC server and register the HTTP handlers for each service. |
||
| 258 | if srv.HTTP.Enabled { |
||
| 259 | options := []grpc.DialOption{ |
||
| 260 | grpc.WithBlock(), |
||
| 261 | grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), |
||
| 262 | } |
||
| 263 | if srv.GRPC.TLSConfig.Enabled { |
||
| 264 | c, err := credentials.NewClientTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.NameOverride) |
||
| 265 | if err != nil { |
||
| 266 | return err |
||
| 267 | } |
||
| 268 | options = append(options, grpc.WithTransportCredentials(c)) |
||
| 269 | } else { |
||
| 270 | options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials())) |
||
| 271 | } |
||
| 272 | |||
| 273 | timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second) |
||
| 274 | defer cancel() |
||
| 275 | |||
| 276 | conn, err := grpc.DialContext(timeoutCtx, ":"+srv.GRPC.Port, options...) |
||
| 277 | if err != nil { |
||
| 278 | return err |
||
| 279 | } |
||
| 280 | defer func() { |
||
| 281 | if err = conn.Close(); err != nil { |
||
| 282 | slog.Error("Failed to close gRPC connection", slog.Any("error", err)) |
||
| 283 | } |
||
| 284 | }() |
||
| 285 | |||
| 286 | healthClient := health.NewHealthClient(conn) |
||
| 287 | muxOpts := []runtime.ServeMuxOption{ |
||
| 288 | runtime.WithHealthzEndpoint(healthClient), |
||
| 289 | runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.HTTPBodyMarshaler{ |
||
| 290 | Marshaler: &runtime.JSONPb{ |
||
| 291 | MarshalOptions: protojson.MarshalOptions{ |
||
| 292 | UseProtoNames: true, |
||
| 293 | EmitUnpopulated: true, |
||
| 294 | }, |
||
| 295 | UnmarshalOptions: protojson.UnmarshalOptions{ |
||
| 296 | DiscardUnknown: true, |
||
| 297 | }, |
||
| 298 | }, |
||
| 299 | }), |
||
| 300 | runtime.WithMiddlewares(func(next runtime.HandlerFunc) runtime.HandlerFunc { |
||
| 301 | type key struct{} |
||
| 302 | |||
| 303 | otelHandler := otelhttp.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
||
| 304 | pathParams := r.Context().Value(key{}).(map[string]string) |
||
| 305 | next(w, r, pathParams) |
||
| 306 | }), "server", |
||
| 307 | otelhttp.WithServerName("permify"), |
||
| 308 | otelhttp.WithSpanNameFormatter(httpNameFormatter), |
||
| 309 | ) |
||
| 310 | |||
| 311 | return func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { |
||
| 312 | r = r.WithContext(context.WithValue(r.Context(), key{}, pathParams)) |
||
| 313 | otelHandler.ServeHTTP(w, r) |
||
| 314 | } |
||
| 315 | }), |
||
| 316 | } |
||
| 317 | |||
| 318 | mux := runtime.NewServeMux(muxOpts...) |
||
| 319 | |||
| 320 | if err = grpcV1.RegisterPermissionHandler(ctx, mux, conn); err != nil { |
||
| 321 | return err |
||
| 322 | } |
||
| 323 | if err = grpcV1.RegisterSchemaHandler(ctx, mux, conn); err != nil { |
||
| 324 | return err |
||
| 325 | } |
||
| 326 | if err = grpcV1.RegisterDataHandler(ctx, mux, conn); err != nil { |
||
| 327 | return err |
||
| 328 | } |
||
| 329 | if err = grpcV1.RegisterBundleHandler(ctx, mux, conn); err != nil { |
||
| 330 | return err |
||
| 331 | } |
||
| 332 | if err = grpcV1.RegisterTenancyHandler(ctx, mux, conn); err != nil { |
||
| 333 | return err |
||
| 334 | } |
||
| 335 | |||
| 336 | corsHandler := cors.New(cors.Options{ |
||
| 337 | AllowCredentials: true, |
||
| 338 | AllowedOrigins: srv.HTTP.CORSAllowedOrigins, |
||
| 339 | AllowedHeaders: srv.HTTP.CORSAllowedHeaders, |
||
| 340 | AllowedMethods: []string{ |
||
| 341 | http.MethodGet, http.MethodPost, |
||
| 342 | http.MethodHead, http.MethodPatch, http.MethodDelete, http.MethodPut, |
||
| 343 | }, |
||
| 344 | }).Handler(mux) |
||
| 345 | |||
| 346 | httpServer = &http.Server{ |
||
| 347 | Addr: ":" + srv.HTTP.Port, |
||
| 348 | Handler: corsHandler, |
||
| 349 | ReadHeaderTimeout: 5 * time.Second, |
||
| 350 | } |
||
| 351 | |||
| 352 | // Start the HTTP server with TLS if enabled, otherwise without TLS. |
||
| 353 | go func() { |
||
| 354 | var err error |
||
| 355 | if srv.HTTP.TLSConfig.Enabled { |
||
| 356 | err = httpServer.ListenAndServeTLS(srv.HTTP.TLSConfig.CertPath, srv.HTTP.TLSConfig.KeyPath) |
||
| 357 | } else { |
||
| 358 | err = httpServer.ListenAndServe() |
||
| 359 | } |
||
| 360 | if !errors.Is(err, http.ErrServerClosed) { |
||
| 361 | slog.Error(err.Error()) |
||
| 362 | } |
||
| 363 | }() |
||
| 364 | |||
| 365 | slog.Info(fmt.Sprintf("🚀 http server successfully started: %s", srv.HTTP.Port)) |
||
| 366 | } |
||
| 367 | |||
| 368 | // Wait for the context to be canceled (e.g., due to a signal). |
||
| 369 | <-ctx.Done() |
||
| 370 | |||
| 371 | // Shutdown the servers gracefully. |
||
| 372 | ctxShutdown, cancel := context.WithTimeout(ctx, 5*time.Second) |
||
| 373 | defer cancel() |
||
| 374 | |||
| 375 | if httpServer != nil { |
||
| 376 | if err := httpServer.Shutdown(ctxShutdown); err != nil { |
||
| 377 | slog.Error(err.Error()) |
||
| 378 | return err |
||
| 379 | } |
||
| 380 | } |
||
| 381 | |||
| 382 | // Gracefully stop the gRPC server. |
||
| 383 | grpcServer.GracefulStop() |
||
| 384 | // Gracefully stop the invoke server. |
||
| 385 | invokeServer.GracefulStop() |
||
| 386 | |||
| 387 | slog.Info("gracefully shutting down") |
||
| 388 | |||
| 389 | return nil |
||
| 390 | } |
||
| 407 |