Passed
Push — master ( 7adeef...ae6ff4 )
by Tolga
01:09 queued 14s
created

cmd.getLogLevel   B

Complexity

Conditions 6

Size

Total Lines 12
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 12
nop 1
dl 0
loc 12
rs 8.6666
c 0
b 0
f 0
1
package cmd
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"io"
8
	"log/slog"
9
	"os"
10
	"os/signal"
11
	"strings"
12
	"syscall"
13
	"time"
14
15
	"github.com/sony/gobreaker"
16
	"github.com/spf13/cobra"
17
	"github.com/spf13/viper"
18
	"go.opentelemetry.io/otel/sdk/metric"
19
20
	"github.com/Permify/permify/internal/engines/balancer"
21
	"github.com/Permify/permify/internal/engines/cache"
22
	"github.com/Permify/permify/internal/invoke"
23
	cacheDecorator "github.com/Permify/permify/internal/storage/decorators/cache"
24
	cbDecorator "github.com/Permify/permify/internal/storage/decorators/circuitBreaker"
25
	sfDecorator "github.com/Permify/permify/internal/storage/decorators/singleflight"
26
	"github.com/Permify/permify/internal/storage/postgres/gc"
27
	"github.com/Permify/permify/pkg/cmd/flags"
28
	PQDatabase "github.com/Permify/permify/pkg/database/postgres"
29
30
	"go.opentelemetry.io/otel/sdk/trace"
31
	"golang.org/x/sync/errgroup"
32
33
	"github.com/Permify/permify/internal"
34
	"github.com/Permify/permify/internal/config"
35
	"github.com/Permify/permify/internal/engines"
36
	"github.com/Permify/permify/internal/factories"
37
	"github.com/Permify/permify/internal/servers"
38
	"github.com/Permify/permify/internal/storage"
39
	pkgcache "github.com/Permify/permify/pkg/cache"
40
	"github.com/Permify/permify/pkg/cache/ristretto"
41
	"github.com/Permify/permify/pkg/telemetry"
42
	"github.com/Permify/permify/pkg/telemetry/meterexporters"
43
	"github.com/Permify/permify/pkg/telemetry/tracerexporters"
44
)
45
46
// NewServeCommand returns a new Cobra command that can be used to run the "permify serve" command.
47
// The command takes no arguments and runs the serve() function to start the Permify service.
48
// The command has a short description of what it does.
49
func NewServeCommand() *cobra.Command {
50
	command := &cobra.Command{
51
		Use:   "serve",
52
		Short: "serve the Permify server",
53
		RunE:  serve(),
54
		Args:  cobra.NoArgs,
55
	}
56
57
	conf := config.DefaultConfig()
58
	f := command.Flags()
59
	f.StringP("config", "c", "", "config file (default is $HOME/.permify.yaml)")
60
	f.Bool("http-enabled", conf.Server.HTTP.Enabled, "switch option for HTTP server")
61
	f.String("account-id", conf.AccountID, "account id")
62
	f.Int64("server-rate-limit", conf.Server.RateLimit, "the maximum number of requests the server should handle per second")
63
	f.String("grpc-port", conf.Server.GRPC.Port, "port that GRPC server run on")
64
	f.Bool("grpc-tls-enabled", conf.Server.GRPC.TLSConfig.Enabled, "switch option for GRPC tls server")
65
	f.String("grpc-tls-key-path", conf.Server.GRPC.TLSConfig.KeyPath, "GRPC tls key path")
66
	f.String("grpc-tls-cert-path", conf.Server.GRPC.TLSConfig.CertPath, "GRPC tls certificate path")
67
	f.String("http-port", conf.Server.HTTP.Port, "HTTP port address")
68
	f.Bool("http-tls-enabled", conf.Server.HTTP.TLSConfig.Enabled, "switch option for HTTP tls server")
69
	f.String("http-tls-key-path", conf.Server.HTTP.TLSConfig.KeyPath, "HTTP tls key path")
70
	f.String("http-tls-cert-path", conf.Server.HTTP.TLSConfig.CertPath, "HTTP tls certificate path")
71
	f.StringSlice("http-cors-allowed-origins", conf.Server.HTTP.CORSAllowedOrigins, "CORS allowed origins for http gateway")
72
	f.StringSlice("http-cors-allowed-headers", conf.Server.HTTP.CORSAllowedHeaders, "CORS allowed headers for http gateway")
73
	f.Bool("profiler-enabled", conf.Profiler.Enabled, "switch option for profiler")
74
	f.String("profiler-port", conf.Profiler.Port, "profiler port address")
75
	f.String("log-level", conf.Log.Level, "set log verbosity ('info', 'debug', 'error', 'warning')")
76
	f.String("log-output", conf.Log.Output, "logger output valid values json, text")
77
	f.String("log-file", conf.Log.File, "logger file destination")
78
	f.Bool("authn-enabled", conf.Authn.Enabled, "enable server authentication")
79
	f.String("authn-method", conf.Authn.Method, "server authentication method")
80
	f.StringSlice("authn-preshared-keys", conf.Authn.Preshared.Keys, "preshared key/keys for server authentication")
81
	f.String("authn-oidc-issuer", conf.Authn.Oidc.Issuer, "issuer identifier of the OpenID Connect Provider")
82
	f.String("authn-oidc-audience", conf.Authn.Oidc.Audience, "intended audience of the OpenID Connect token")
83
	f.Duration("authn-oidc-refresh-interval", conf.Authn.Oidc.RefreshInterval, "refresh interval for the OpenID Connect configuration")
84
	f.Duration("authn-oidc-backoff-interval", conf.Authn.Oidc.BackoffInterval, "backoff interval for the OpenID Connect configuration")
85
	f.Duration("authn-oidc-backoff-frequency", conf.Authn.Oidc.BackoffFrequency, "backoff frequency for the OpenID Connect configuration")
86
	f.Int("authn-oidc-backoff-max-retries", conf.Authn.Oidc.BackoffMaxRetries, "defines the maximum number of retries for the OpenID Connect configuration")
87
	f.StringSlice("authn-oidc-valid-methods", conf.Authn.Oidc.ValidMethods, "list of valid JWT signing methods for OpenID Connect")
88
	f.Bool("tracer-enabled", conf.Tracer.Enabled, "switch option for tracing")
89
	f.String("tracer-exporter", conf.Tracer.Exporter, "can be; jaeger, signoz, zipkin or otlp. (integrated tracing tools)")
90
	f.String("tracer-endpoint", conf.Tracer.Endpoint, "export uri for tracing data")
91
	f.Bool("tracer-insecure", conf.Tracer.Insecure, "use https or http for tracer data, only used for otlp exporter or signoz")
92
	f.String("tracer-urlpath", conf.Tracer.URLPath, "allow to set url path for otlp exporter")
93
	f.StringSlice("tracer-headers", conf.Tracer.Headers, "allows setting custom headers for the tracer exporter in key-value pairs")
94
	f.Bool("meter-enabled", conf.Meter.Enabled, "switch option for metric")
95
	f.String("meter-exporter", conf.Meter.Exporter, "can be; otlp. (integrated metric tools)")
96
	f.String("meter-endpoint", conf.Meter.Endpoint, "export uri for metric data")
97
	f.Bool("meter-insecure", conf.Meter.Insecure, "use https or http for metric data")
98
	f.String("meter-urlpath", conf.Meter.URLPath, "allow to set url path for otlp exporter")
99
	f.StringSlice("meter-headers", conf.Meter.Headers, "allows setting custom headers for the metric exporter in key-value pairs")
100
	f.Bool("service-circuit-breaker", conf.Service.CircuitBreaker, "switch option for service circuit breaker")
101
	f.Bool("service-watch-enabled", conf.Service.Watch.Enabled, "switch option for watch service")
102
	f.Int64("service-schema-cache-number-of-counters", conf.Service.Schema.Cache.NumberOfCounters, "schema service cache number of counters")
103
	f.String("service-schema-cache-max-cost", conf.Service.Schema.Cache.MaxCost, "schema service cache max cost")
104
	f.Int("service-permission-bulk-limit", conf.Service.Permission.BulkLimit, "bulk operations limit")
105
	f.Int("service-permission-concurrency-limit", conf.Service.Permission.ConcurrencyLimit, "concurrency limit")
106
	f.Int64("service-permission-cache-number-of-counters", conf.Service.Permission.Cache.NumberOfCounters, "permission service cache number of counters")
107
	f.String("service-permission-cache-max-cost", conf.Service.Permission.Cache.MaxCost, "permission service cache max cost")
108
	f.String("database-engine", conf.Database.Engine, "data source. e.g. postgres, memory")
109
	f.String("database-uri", conf.Database.URI, "uri of your data source to store relation tuples and schema")
110
	f.String("database-writer-uri", conf.Database.Writer.URI, "writer uri of your data source to store relation tuples and schema")
111
	f.String("database-reader-uri", conf.Database.Reader.URI, "reader uri of your data source to store relation tuples and schema")
112
	f.Bool("database-auto-migrate", conf.Database.AutoMigrate, "auto migrate database tables")
113
	f.Int("database-max-open-connections", conf.Database.MaxOpenConnections, "maximum number of parallel connections that can be made to the database at any time")
114
	f.Int("database-max-idle-connections", conf.Database.MaxIdleConnections, "maximum number of idle connections that can be made to the database at any time")
115
	f.Duration("database-max-connection-lifetime", conf.Database.MaxConnectionLifetime, "maximum amount of time a connection may be reused")
116
	f.Duration("database-max-connection-idle-time", conf.Database.MaxConnectionIdleTime, "maximum amount of time a connection may be idle")
117
	f.Int("database-max-data-per-write", conf.Database.MaxDataPerWrite, "sets the maximum amount of data per write operation to the database")
118
	f.Int("database-max-retries", conf.Database.MaxRetries, "defines the maximum number of retries for database operations in case of failure")
119
	f.Int("database-watch-buffer-size", conf.Database.WatchBufferSize, "specifies the buffer size for database watch operations, impacting how many changes can be queued")
120
	f.Bool("database-garbage-collection-enabled", conf.Database.GarbageCollection.Enabled, "use database garbage collection for expired relationships and attributes")
121
	f.Duration("database-garbage-collection-interval", conf.Database.GarbageCollection.Interval, "interval for database garbage collection")
122
	f.Duration("database-garbage-collection-timeout", conf.Database.GarbageCollection.Timeout, "timeout for database garbage collection")
123
	f.Duration("database-garbage-collection-window", conf.Database.GarbageCollection.Window, "window for database garbage collection")
124
	f.Bool("distributed-enabled", conf.Distributed.Enabled, "enable distributed")
125
	f.String("distributed-address", conf.Distributed.Address, "distributed address")
126
	f.String("distributed-port", conf.Distributed.Port, "distributed port")
127
128
	// SilenceUsage is set to true to suppress usage when an error occurs
129
	command.SilenceUsage = true
130
131
	command.PreRun = func(cmd *cobra.Command, args []string) {
132
		flags.RegisterServeFlags(f)
133
	}
134
135
	return command
136
}
137
138
// serve is the main function for the "permify serve" command. It starts the Permify service by configuring and starting the necessary components.
139
// It initializes the configuration, logger, database, tracing and metering components, and creates instances of the necessary engines, services, and decorators.
140
// It then creates a ServiceContainer and runs it with the given configuration.
141
// The function uses errgroup to manage the goroutines and gracefully shuts down the service upon receiving a termination signal.
142
// It returns an error if there is an issue with any of the components or if any goroutine fails.
143
func serve() func(cmd *cobra.Command, args []string) error {
144
	return func(cmd *cobra.Command, args []string) error {
145
		var cfg *config.Config
146
		var err error
147
		cfgFile := viper.GetString("config.file")
148
		if cfgFile != "" {
149
			cfg, err = config.NewConfigWithFile(cfgFile)
150
			if err != nil {
151
				return fmt.Errorf("failed to create new config: %w", err)
152
			}
153
154
			if err = viper.Unmarshal(cfg); err != nil {
155
				return fmt.Errorf("failed to unmarshal config: %w", err)
156
			}
157
		} else {
158
			// Load configuration
159
			cfg, err = config.NewConfig()
160
			if err != nil {
161
				return fmt.Errorf("failed to create new config: %w", err)
162
			}
163
164
			if err = viper.Unmarshal(cfg); err != nil {
165
				return fmt.Errorf("failed to unmarshal config: %w", err)
166
			}
167
		}
168
169
		// Print banner and initialize logger
170
		internal.PrintBanner()
171
172
		var handler slog.Handler
173
174
		var ioWriter io.Writer
175
176
		ioWriter = os.Stdout
177
178
		if cfg.Log.File != "" {
179
			if err := os.MkdirAll(cfg.Log.File, 0o750); err != nil {
180
				panic(err)
181
			}
182
183
			file, err := os.OpenFile(cfg.Log.File+"/app.json", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o600)
184
			if err != nil {
185
				panic(err)
186
			}
187
			defer file.Close()
188
			ioWriter = io.MultiWriter(file, os.Stdout)
189
190
		}
191
192
		switch cfg.Log.Output {
193
		case "json":
194
			handler = telemetry.OtelHandler{
195
				Next: slog.NewJSONHandler(ioWriter, &slog.HandlerOptions{
196
					Level: getLogLevel(cfg.Log.Level),
197
				}),
198
			}
199
		case "text":
200
			handler = telemetry.OtelHandler{
201
				Next: slog.NewTextHandler(ioWriter, &slog.HandlerOptions{
202
					Level: getLogLevel(cfg.Log.Level),
203
				}),
204
			}
205
		default:
206
			handler = telemetry.OtelHandler{
207
				Next: slog.NewTextHandler(ioWriter, &slog.HandlerOptions{
208
					Level: getLogLevel(cfg.Log.Level),
209
				}),
210
			}
211
		}
212
213
		logger := slog.New(handler)
214
215
		slog.SetDefault(logger)
216
217
		slog.Info("🚀 starting permify service...")
218
219
		internal.Identifier = cfg.AccountID
220
		if internal.Identifier == "" {
221
			message := "Account ID is not set. Please fill in the Account ID for better support. Get your Account ID from https://permify.co/account"
222
			slog.Error(message)
223
224
			ticker := time.NewTicker(24 * time.Hour)
225
			defer ticker.Stop()
226
227
			go func() {
228
				for range ticker.C {
229
					slog.Error(message)
230
				}
231
			}()
232
		}
233
234
		// Set up context and signal handling
235
		ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
236
		defer stop()
237
238
		// Run database migration if enabled
239
		if cfg.Database.AutoMigrate {
240
			err = storage.Migrate(cfg.Database)
241
			if err != nil {
242
				slog.Error("failed to migrate database", slog.Any("error", err))
243
			}
244
		}
245
246
		// Initialize database
247
		db, err := factories.DatabaseFactory(cfg.Database)
248
		if err != nil {
249
			slog.Error("failed to initialize database", slog.Any("error", err))
250
			return err
251
		}
252
		defer func() {
253
			if err = db.Close(); err != nil {
254
				slog.Error("failed to close database", slog.Any("error", err))
255
			}
256
		}()
257
258
		// Tracing
259
		if cfg.Tracer.Enabled {
260
			headers := map[string]string{}
261
			for _, header := range cfg.Tracer.Headers {
262
				h := strings.Split(header, ":")
263
				if len(h) != 2 {
264
					return errors.New("invalid header format; expected 'key:value'")
265
				}
266
				headers[h[0]] = h[1]
267
			}
268
269
			var exporter trace.SpanExporter
270
			exporter, err = tracerexporters.ExporterFactory(
271
				cfg.Tracer.Exporter,
272
				cfg.Tracer.Endpoint,
273
				cfg.Tracer.Insecure,
274
				cfg.Tracer.URLPath,
275
				headers,
276
			)
277
			if err != nil {
278
				slog.Error(err.Error())
279
			}
280
281
			shutdown := telemetry.NewTracer(exporter)
282
283
			defer func() {
284
				if err = shutdown(context.Background()); err != nil {
285
					slog.Error(err.Error())
286
				}
287
			}()
288
		}
289
290
		// Garbage collection
291
		if cfg.Database.GarbageCollection.Timeout > 0 && cfg.Database.GarbageCollection.Enabled && cfg.Database.Engine != "memory" {
292
			slog.Info("🗑️ starting database garbage collection...")
293
294
			garbageCollector := gc.NewGC(
295
				db.(*PQDatabase.Postgres),
296
				gc.Interval(cfg.Database.GarbageCollection.Interval),
297
				gc.Window(cfg.Database.GarbageCollection.Window),
298
				gc.Timeout(cfg.Database.GarbageCollection.Timeout),
299
			)
300
301
			go func() {
302
				err = garbageCollector.Start(ctx)
303
				if err != nil {
304
					slog.Error(err.Error())
305
				}
306
			}()
307
		}
308
309
		// Meter
310
		meter := telemetry.NewNoopMeter()
311
		if cfg.Meter.Enabled {
312
			headers := map[string]string{}
313
			for _, header := range cfg.Meter.Headers {
314
				h := strings.Split(header, ":")
315
				if len(h) != 2 {
316
					return errors.New("invalid header format; expected 'key:value'")
317
				}
318
				headers[h[0]] = h[1]
319
			}
320
321
			var exporter metric.Exporter
322
			exporter, err = meterexporters.ExporterFactory(
323
				cfg.Meter.Exporter,
324
				cfg.Meter.Endpoint,
325
				cfg.Meter.Insecure,
326
				cfg.Meter.URLPath,
327
				headers,
328
			)
329
			if err != nil {
330
				slog.Error(err.Error())
331
			}
332
333
			meter, err = telemetry.NewMeter(exporter)
334
			if err != nil {
335
				slog.Error(err.Error())
336
			}
337
		}
338
339
		// schema cache
340
		var schemaCache pkgcache.Cache
341
		schemaCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Schema.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Schema.Cache.MaxCost))
342
		if err != nil {
343
			slog.Error(err.Error())
344
			return err
345
		}
346
347
		// engines cache cache
348
		var engineKeyCache pkgcache.Cache
349
		engineKeyCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Permission.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Permission.Cache.MaxCost))
350
		if err != nil {
351
			slog.Error(err.Error())
352
			return err
353
		}
354
355
		watcher := storage.NewNoopWatcher()
356
		if cfg.Service.Watch.Enabled {
357
			watcher = factories.WatcherFactory(db)
358
		}
359
360
		// Initialize the storage with factory methods
361
		dataReader := factories.DataReaderFactory(db)
362
		dataWriter := factories.DataWriterFactory(db)
363
		bundleReader := factories.BundleReaderFactory(db)
364
		bundleWriter := factories.BundleWriterFactory(db)
365
		schemaReader := factories.SchemaReaderFactory(db)
366
		schemaWriter := factories.SchemaWriterFactory(db)
367
		tenantReader := factories.TenantReaderFactory(db)
368
		tenantWriter := factories.TenantWriterFactory(db)
369
370
		// Add caching to the schema reader using a decorator
371
		schemaReader = cacheDecorator.NewSchemaReader(schemaReader, schemaCache)
372
373
		dataReader = sfDecorator.NewDataReader(dataReader)
374
		schemaReader = sfDecorator.NewSchemaReader(schemaReader)
375
376
		// Check if circuit breaker should be enabled for services
377
		if cfg.Service.CircuitBreaker {
378
			var cb *gobreaker.CircuitBreaker
379
			var st gobreaker.Settings
380
			st.Name = "storage"
381
			st.ReadyToTrip = func(counts gobreaker.Counts) bool {
382
				failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
383
				return counts.Requests >= 10 && failureRatio >= 0.6
384
			}
385
386
			cb = gobreaker.NewCircuitBreaker(st)
387
388
			// Add circuit breaker to the relationship reader using decorator
389
			dataReader = cbDecorator.NewDataReader(dataReader, cb)
390
391
			// Add circuit breaker to the bundle reader using decorators
392
			bundleReader = cbDecorator.NewBundleReader(bundleReader, cb)
393
394
			// Add circuit breaker to the schema reader using decorator
395
			schemaReader = cbDecorator.NewSchemaReader(schemaReader, cb)
396
397
			// Add circuit breaker to the tenant reader using decorator
398
			tenantReader = cbDecorator.NewTenantReader(tenantReader, cb)
399
		}
400
401
		// Initialize the engines using the key manager, schema reader, and relationship reader
402
		checkEngine := engines.NewCheckEngine(schemaReader, dataReader, engines.CheckConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit))
403
		expandEngine := engines.NewExpandEngine(schemaReader, dataReader)
404
405
		// Declare a variable `checker` of type `invoke.Check`.
406
		var checker invoke.Check
407
408
		// Create the checker either with load balancing or caching capabilities.
409
		if cfg.Distributed.Enabled {
410
411
			if cfg.Authn.Enabled && cfg.Authn.Method == "oidc" {
412
				return errors.New("OIDC authentication method cannot be used in distributed mode. Please check your configuration")
413
			}
414
415
			checker, err = balancer.NewCheckEngineWithBalancer(
416
				context.Background(),
417
				checkEngine,
418
				schemaReader,
419
				&cfg.Distributed,
420
				&cfg.Server.GRPC,
421
				&cfg.Authn,
422
			)
423
			// Handle potential error during checker creation.
424
			if err != nil {
425
				return err
426
			}
427
			checker = cache.NewCheckEngineWithCache(
428
				checker,
429
				schemaReader,
430
				engineKeyCache,
431
				meter,
432
			)
433
		} else {
434
			checker = cache.NewCheckEngineWithCache(
435
				checkEngine,
436
				schemaReader,
437
				engineKeyCache,
438
				meter,
439
			)
440
		}
441
442
		// Create a localChecker which directly checks without considering distributed setup.
443
		// This also includes caching capabilities.
444
		localChecker := cache.NewCheckEngineWithCache(
445
			checkEngine,
446
			schemaReader,
447
			engineKeyCache,
448
			meter,
449
		)
450
451
		// Initialize the lookupEngine, which is responsible for looking up certain entities or values.
452
		lookupEngine := engines.NewLookupEngine(
453
			checker,
454
			schemaReader,
455
			dataReader,
456
			// Set concurrency limit based on the configuration.
457
			engines.LookupConcurrencyLimit(cfg.Service.Permission.BulkLimit),
458
		)
459
460
		// Initialize the subjectPermissionEngine, responsible for handling subject permissions.
461
		subjectPermissionEngine := engines.NewSubjectPermission(
462
			checker,
463
			schemaReader,
464
			// Set concurrency limit for the subject permission checks.
465
			engines.SubjectPermissionConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit),
466
		)
467
468
		// Create a new invoker that is used to directly call various functions or engines.
469
		// It encompasses the schema, data, checker, and other engines.
470
		invoker := invoke.NewDirectInvoker(
471
			schemaReader,
472
			dataReader,
473
			checker,
474
			expandEngine,
475
			lookupEngine,
476
			subjectPermissionEngine,
477
			meter,
478
		)
479
480
		// Associate the invoker with the checkEngine.
481
		checkEngine.SetInvoker(invoker)
482
483
		// Create a local invoker for local operations.
484
		localInvoker := invoke.NewDirectInvoker(
485
			schemaReader,
486
			dataReader,
487
			localChecker,
488
			expandEngine,
489
			lookupEngine,
490
			subjectPermissionEngine,
491
			meter,
492
		)
493
494
		// Initialize the container which brings together multiple components such as the invoker, data readers/writers, and schema handlers.
495
		container := servers.NewContainer(
496
			invoker,
497
			dataReader,
498
			dataWriter,
499
			bundleReader,
500
			bundleWriter,
501
			schemaReader,
502
			schemaWriter,
503
			tenantReader,
504
			tenantWriter,
505
			watcher,
506
		)
507
508
		// Create an error group with the provided context
509
		var g *errgroup.Group
510
		g, ctx = errgroup.WithContext(ctx)
511
512
		// Add the container.Run function to the error group
513
		g.Go(func() error {
514
			return container.Run(
515
				ctx,
516
				&cfg.Server,
517
				logger,
518
				&cfg.Distributed,
519
				&cfg.Authn,
520
				&cfg.Profiler,
521
				localInvoker,
522
			)
523
		})
524
525
		// Wait for the error group to finish and log any errors
526
		if err = g.Wait(); err != nil {
527
			slog.Error(err.Error())
528
		}
529
530
		return nil
531
	}
532
}
533
534
// getLogLevel converts a string representation of log level to its corresponding slog.Level value.
535
func getLogLevel(level string) slog.Level {
536
	switch level {
537
	case "info":
538
		return slog.LevelInfo // Return Info level
539
	case "warn":
540
		return slog.LevelWarn // Return Warning level
541
	case "error":
542
		return slog.LevelError // Return Error level
543
	case "debug":
544
		return slog.LevelDebug // Return Debug level
545
	default:
546
		return slog.LevelInfo // Default to Info level if unrecognized
547
	}
548
}
549