| Conditions | 36 |
| Total Lines | 273 |
| Code Lines | 169 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include: Extract Method.
If many parameters/temporary variables are present: Replace Temp with Query, Introduce Parameter Object, or Preserve Whole Object.
Complex classes like servers.*Container.Run often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | package servers |
||
| 102 | func (s *Container) Run( |
||
| 103 | ctx context.Context, |
||
| 104 | srv *config.Server, |
||
| 105 | logger *slog.Logger, |
||
| 106 | dst *config.Distributed, |
||
| 107 | authentication *config.Authn, |
||
| 108 | profiler *config.Profiler, |
||
| 109 | localInvoker invoke.Invoker, |
||
| 110 | ) error { |
||
| 111 | var err error |
||
| 112 | |||
| 113 | limiter := middleware.NewRateLimiter(srv.RateLimit) // for example 1000 req/sec |
||
| 114 | |||
| 115 | lopts := []logging.Option{ |
||
| 116 | logging.WithLogOnEvents(logging.StartCall, logging.FinishCall), |
||
| 117 | } |
||
| 118 | |||
| 119 | unaryInterceptors := []grpc.UnaryServerInterceptor{ |
||
| 120 | grpcValidator.UnaryServerInterceptor(), |
||
| 121 | grpcRecovery.UnaryServerInterceptor(), |
||
| 122 | ratelimit.UnaryServerInterceptor(limiter), |
||
| 123 | logging.UnaryServerInterceptor(InterceptorLogger(logger), lopts...), |
||
| 124 | } |
||
| 125 | |||
| 126 | streamingInterceptors := []grpc.StreamServerInterceptor{ |
||
| 127 | grpcValidator.StreamServerInterceptor(), |
||
| 128 | grpcRecovery.StreamServerInterceptor(), |
||
| 129 | ratelimit.StreamServerInterceptor(limiter), |
||
| 130 | logging.StreamServerInterceptor(InterceptorLogger(logger), lopts...), |
||
| 131 | } |
||
| 132 | |||
| 133 | // Configure authentication based on the provided method ("preshared" or "oidc"). |
||
| 134 | // Add the appropriate interceptors to the unary and streaming interceptors. |
||
| 135 | if authentication != nil && authentication.Enabled { |
||
| 136 | switch authentication.Method { |
||
| 137 | case "preshared": |
||
| 138 | var authenticator *preshared.KeyAuthn |
||
| 139 | authenticator, err = preshared.NewKeyAuthn(ctx, authentication.Preshared) |
||
| 140 | if err != nil { |
||
| 141 | return err |
||
| 142 | } |
||
| 143 | unaryInterceptors = append(unaryInterceptors, grpcAuth.UnaryServerInterceptor(middleware.KeyAuthFunc(authenticator))) |
||
| 144 | streamingInterceptors = append(streamingInterceptors, grpcAuth.StreamServerInterceptor(middleware.KeyAuthFunc(authenticator))) |
||
| 145 | case "oidc": |
||
| 146 | var authenticator *oidc.Authn |
||
| 147 | authenticator, err = oidc.NewOidcAuthn(ctx, authentication.Oidc) |
||
| 148 | if err != nil { |
||
| 149 | return err |
||
| 150 | } |
||
| 151 | unaryInterceptors = append(unaryInterceptors, oidc.UnaryServerInterceptor(authenticator)) |
||
| 152 | streamingInterceptors = append(streamingInterceptors, oidc.StreamServerInterceptor(authenticator)) |
||
| 153 | default: |
||
| 154 | return fmt.Errorf("unknown authentication method: '%s'", authentication.Method) |
||
| 155 | } |
||
| 156 | } |
||
| 157 | |||
| 158 | opts := []grpc.ServerOption{ |
||
| 159 | grpc.ChainUnaryInterceptor(unaryInterceptors...), |
||
| 160 | grpc.ChainStreamInterceptor(streamingInterceptors...), |
||
| 161 | } |
||
| 162 | |||
| 163 | if srv.GRPC.TLSConfig.Enabled { |
||
| 164 | var c credentials.TransportCredentials |
||
| 165 | c, err = credentials.NewServerTLSFromFile(srv.GRPC.TLSConfig.CertPath, srv.GRPC.TLSConfig.KeyPath) |
||
| 166 | if err != nil { |
||
| 167 | return err |
||
| 168 | } |
||
| 169 | opts = append(opts, grpc.Creds(c)) |
||
| 170 | } |
||
| 171 | |||
| 172 | // Create a new gRPC server instance with the provided options. |
||
| 173 | grpcServer := grpc.NewServer(opts...) |
||
| 174 | |||
| 175 | // Register various gRPC services to the server. |
||
| 176 | grpcV1.RegisterPermissionServer(grpcServer, NewPermissionServer(s.Invoker)) |
||
| 177 | grpcV1.RegisterSchemaServer(grpcServer, NewSchemaServer(s.SW, s.SR)) |
||
| 178 | grpcV1.RegisterDataServer(grpcServer, NewDataServer(s.DR, s.DW, s.BR, s.SR)) |
||
| 179 | grpcV1.RegisterBundleServer(grpcServer, NewBundleServer(s.BR, s.BW)) |
||
| 180 | grpcV1.RegisterTenancyServer(grpcServer, NewTenancyServer(s.TR, s.TW)) |
||
| 181 | grpcV1.RegisterWatchServer(grpcServer, NewWatchServer(s.W, s.DR)) |
||
| 182 | |||
| 183 | // Register health check and reflection services for gRPC. |
||
| 184 | health.RegisterHealthServer(grpcServer, NewHealthServer()) |
||
| 185 | reflection.Register(grpcServer) |
||
| 186 | |||
| 187 | // Create another gRPC server, presumably for invoking permissions. |
||
| 188 | invokeServer := grpc.NewServer(opts...) |
||
| 189 | grpcV1.RegisterPermissionServer(invokeServer, NewPermissionServer(localInvoker)) |
||
| 190 | |||
| 191 | // Register health check and reflection services for the invokeServer. |
||
| 192 | health.RegisterHealthServer(invokeServer, NewHealthServer()) |
||
| 193 | reflection.Register(invokeServer) |
||
| 194 | |||
| 195 | // If profiling is enabled, set up the profiler using the net/http package. |
||
| 196 | if profiler.Enabled { |
||
| 197 | // Create a new HTTP ServeMux to register pprof routes. |
||
| 198 | mux := http.NewServeMux() |
||
| 199 | mux.HandleFunc("/debug/pprof/", pprof.Index) |
||
| 200 | mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) |
||
| 201 | mux.HandleFunc("/debug/pprof/profile", pprof.Profile) |
||
| 202 | mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) |
||
| 203 | mux.HandleFunc("/debug/pprof/trace", pprof.Trace) |
||
| 204 | |||
| 205 | // Run the profiler server in a separate goroutine. |
||
| 206 | go func() { |
||
| 207 | // Log a message indicating the profiler server's start status and port. |
||
| 208 | slog.Info(fmt.Sprintf("🚀 profiler server successfully started: %s", profiler.Port)) |
||
| 209 | |||
| 210 | // Define the HTTP server with timeouts and the mux handler for pprof routes. |
||
| 211 | pprofserver := &http.Server{ |
||
| 212 | Addr: ":" + profiler.Port, |
||
| 213 | Handler: mux, |
||
| 214 | ReadTimeout: 20 * time.Second, |
||
| 215 | WriteTimeout: 20 * time.Second, |
||
| 216 | IdleTimeout: 15 * time.Second, |
||
| 217 | } |
||
| 218 | |||
| 219 | // Start the profiler server. |
||
| 220 | if err := pprofserver.ListenAndServe(); err != nil { |
||
| 221 | // Check if the error was due to the server being closed, and log it. |
||
| 222 | if errors.Is(err, http.ErrServerClosed) { |
||
| 223 | slog.Error("failed to start profiler", err) |
||
| 224 | } |
||
| 225 | } |
||
| 226 | }() |
||
| 227 | } |
||
| 228 | |||
| 229 | var lis net.Listener |
||
| 230 | lis, err = net.Listen("tcp", ":"+srv.GRPC.Port) |
||
| 231 | if err != nil { |
||
| 232 | return err |
||
| 233 | } |
||
| 234 | |||
| 235 | var invokeLis net.Listener |
||
| 236 | invokeLis, err = net.Listen("tcp", ":"+dst.Port) |
||
| 237 | if err != nil { |
||
| 238 | return err |
||
| 239 | } |
||
| 240 | |||
| 241 | // Start the gRPC server. |
||
| 242 | go func() { |
||
| 243 | if err := grpcServer.Serve(lis); err != nil { |
||
| 244 | slog.Error("failed to start grpc server", err) |
||
| 245 | } |
||
| 246 | }() |
||
| 247 | |||
| 248 | go func() { |
||
| 249 | if err := invokeServer.Serve(invokeLis); err != nil { |
||
| 250 | slog.Error("failed to start invoke grpc server", err) |
||
| 251 | } |
||
| 252 | }() |
||
| 253 | |||
| 254 | slog.Info(fmt.Sprintf("🚀 grpc server successfully started: %s", srv.GRPC.Port)) |
||
| 255 | slog.Info(fmt.Sprintf("🚀 invoker grpc server successfully started: %s", dst.Port)) |
||
| 256 | |||
| 257 | var httpServer *http.Server |
||
| 258 | |||
| 259 | // Start the optional HTTP server with CORS and optional TLS configurations. |
||
| 260 | // Connect to the gRPC server and register the HTTP handlers for each service. |
||
| 261 | if srv.HTTP.Enabled { |
||
| 262 | options := []grpc.DialOption{ |
||
| 263 | grpc.WithBlock(), |
||
| 264 | grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), |
||
| 265 | } |
||
| 266 | if srv.GRPC.TLSConfig.Enabled { |
||
| 267 | c, err := credentials.NewClientTLSFromFile(srv.GRPC.TLSConfig.CertPath, "") |
||
| 268 | if err != nil { |
||
| 269 | return err |
||
| 270 | } |
||
| 271 | options = append(options, grpc.WithTransportCredentials(c)) |
||
| 272 | } else { |
||
| 273 | options = append(options, grpc.WithTransportCredentials(insecure.NewCredentials())) |
||
| 274 | } |
||
| 275 | |||
| 276 | timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second) |
||
| 277 | defer cancel() |
||
| 278 | |||
| 279 | conn, err := grpc.DialContext(timeoutCtx, ":"+srv.GRPC.Port, options...) |
||
| 280 | if err != nil { |
||
| 281 | return err |
||
| 282 | } |
||
| 283 | defer func() { |
||
| 284 | if err = conn.Close(); err != nil { |
||
| 285 | slog.Error("Failed to close gRPC connection: %v", err) |
||
| 286 | } |
||
| 287 | }() |
||
| 288 | |||
| 289 | healthClient := health.NewHealthClient(conn) |
||
| 290 | muxOpts := []runtime.ServeMuxOption{ |
||
| 291 | runtime.WithHealthzEndpoint(healthClient), |
||
| 292 | runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.HTTPBodyMarshaler{ |
||
| 293 | Marshaler: &runtime.JSONPb{ |
||
| 294 | MarshalOptions: protojson.MarshalOptions{ |
||
| 295 | UseProtoNames: true, |
||
| 296 | EmitUnpopulated: true, |
||
| 297 | }, |
||
| 298 | UnmarshalOptions: protojson.UnmarshalOptions{ |
||
| 299 | DiscardUnknown: true, |
||
| 300 | }, |
||
| 301 | }, |
||
| 302 | }), |
||
| 303 | } |
||
| 304 | |||
| 305 | mux := runtime.NewServeMux(muxOpts...) |
||
| 306 | |||
| 307 | if err = grpcV1.RegisterPermissionHandler(ctx, mux, conn); err != nil { |
||
| 308 | return err |
||
| 309 | } |
||
| 310 | if err = grpcV1.RegisterSchemaHandler(ctx, mux, conn); err != nil { |
||
| 311 | return err |
||
| 312 | } |
||
| 313 | if err = grpcV1.RegisterDataHandler(ctx, mux, conn); err != nil { |
||
| 314 | return err |
||
| 315 | } |
||
| 316 | if err = grpcV1.RegisterBundleHandler(ctx, mux, conn); err != nil { |
||
| 317 | return err |
||
| 318 | } |
||
| 319 | if err = grpcV1.RegisterTenancyHandler(ctx, mux, conn); err != nil { |
||
| 320 | return err |
||
| 321 | } |
||
| 322 | |||
| 323 | httpServer = &http.Server{ |
||
| 324 | Addr: ":" + srv.HTTP.Port, |
||
| 325 | Handler: cors.New(cors.Options{ |
||
| 326 | AllowCredentials: true, |
||
| 327 | AllowedOrigins: srv.HTTP.CORSAllowedOrigins, |
||
| 328 | AllowedHeaders: srv.HTTP.CORSAllowedHeaders, |
||
| 329 | AllowedMethods: []string{ |
||
| 330 | http.MethodGet, http.MethodPost, |
||
| 331 | http.MethodHead, http.MethodPatch, http.MethodDelete, http.MethodPut, |
||
| 332 | }, |
||
| 333 | }).Handler(mux), |
||
| 334 | ReadHeaderTimeout: 5 * time.Second, |
||
| 335 | } |
||
| 336 | |||
| 337 | // Start the HTTP server with TLS if enabled, otherwise without TLS. |
||
| 338 | go func() { |
||
| 339 | var err error |
||
| 340 | if srv.HTTP.TLSConfig.Enabled { |
||
| 341 | err = httpServer.ListenAndServeTLS(srv.HTTP.TLSConfig.CertPath, srv.HTTP.TLSConfig.KeyPath) |
||
| 342 | } else { |
||
| 343 | err = httpServer.ListenAndServe() |
||
| 344 | } |
||
| 345 | if !errors.Is(err, http.ErrServerClosed) { |
||
| 346 | slog.Error(err.Error()) |
||
| 347 | } |
||
| 348 | }() |
||
| 349 | |||
| 350 | slog.Info(fmt.Sprintf("🚀 http server successfully started: %s", srv.HTTP.Port)) |
||
| 351 | } |
||
| 352 | |||
| 353 | // Wait for the context to be canceled (e.g., due to a signal). |
||
| 354 | <-ctx.Done() |
||
| 355 | |||
| 356 | // Shutdown the servers gracefully. |
||
| 357 | ctxShutdown, cancel := context.WithTimeout(ctx, 5*time.Second) |
||
| 358 | defer cancel() |
||
| 359 | |||
| 360 | if httpServer != nil { |
||
| 361 | if err := httpServer.Shutdown(ctxShutdown); err != nil { |
||
| 362 | slog.Error(err.Error()) |
||
| 363 | return err |
||
| 364 | } |
||
| 365 | } |
||
| 366 | |||
| 367 | // Gracefully stop the gRPC server. |
||
| 368 | grpcServer.GracefulStop() |
||
| 369 | // Gracefully stop the invoke server. |
||
| 370 | invokeServer.GracefulStop() |
||
| 371 | |||
| 372 | slog.Info("gracefully shutting down") |
||
| 373 | |||
| 374 | return nil |
||
| 375 | } |
||
| 383 |