Issues (35)

pkg/cmd/serve.go (4 issues)

Severity
1
package cmd
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"log/slog"
8
	"os"
9
	"os/signal"
10
	"strings"
11
	"syscall"
12
	"time"
13
14
	slogmulti "github.com/samber/slog-multi"
15
	"github.com/sony/gobreaker"
16
	"github.com/spf13/cobra"
17
	"github.com/spf13/viper"
18
	"go.opentelemetry.io/otel/sdk/metric"
19
20
	"github.com/Permify/permify/internal/engines/balancer"
21
	"github.com/Permify/permify/internal/engines/cache"
22
	"github.com/Permify/permify/internal/invoke"
23
	cacheDecorator "github.com/Permify/permify/internal/storage/decorators/cache"
24
	cbDecorator "github.com/Permify/permify/internal/storage/decorators/circuitBreaker"
25
	sfDecorator "github.com/Permify/permify/internal/storage/decorators/singleflight"
26
	"github.com/Permify/permify/internal/storage/postgres/gc"
27
	"github.com/Permify/permify/pkg/cmd/flags"
28
	PQDatabase "github.com/Permify/permify/pkg/database/postgres"
29
30
	"go.opentelemetry.io/otel/sdk/trace"
31
	"golang.org/x/sync/errgroup"
32
33
	"github.com/Permify/permify/internal"
34
	"github.com/Permify/permify/internal/config"
35
	"github.com/Permify/permify/internal/engines"
36
	"github.com/Permify/permify/internal/factories"
37
	"github.com/Permify/permify/internal/servers"
38
	"github.com/Permify/permify/internal/storage"
39
	pkgcache "github.com/Permify/permify/pkg/cache"
40
	"github.com/Permify/permify/pkg/cache/ristretto"
41
	"github.com/Permify/permify/pkg/telemetry"
42
	"github.com/Permify/permify/pkg/telemetry/meterexporters"
43
	"github.com/Permify/permify/pkg/telemetry/tracerexporters"
44
)
45
46
// NewServeCommand returns a new Cobra command that can be used to run the "permify serve" command.
47
// The command takes no arguments and runs the serve() function to start the Permify service.
48
// The command has a short description of what it does.
49
func NewServeCommand() *cobra.Command {
50
	command := &cobra.Command{
51
		Use:   "serve",
52
		Short: "serve the Permify server",
53
		RunE:  serve(),
54
		Args:  cobra.NoArgs,
55
	}
56
57
	conf := config.DefaultConfig()
58
	f := command.Flags()
59
	f.StringP("config", "c", "", "config file (default is $HOME/.permify.yaml)")
60
	f.Bool("http-enabled", conf.Server.HTTP.Enabled, "switch option for HTTP server")
61
	f.String("account-id", conf.AccountID, "account id")
62
	f.Int64("server-rate-limit", conf.Server.RateLimit, "the maximum number of requests the server should handle per second")
63
	f.String("server-name-override", conf.Server.NameOverride, "server name override")
64
	f.String("grpc-port", conf.Server.GRPC.Port, "port that GRPC server run on")
65
	f.Bool("grpc-tls-enabled", conf.Server.GRPC.TLSConfig.Enabled, "switch option for GRPC tls server")
66
	f.String("grpc-tls-key-path", conf.Server.GRPC.TLSConfig.KeyPath, "GRPC tls key path")
67
	f.String("grpc-tls-cert-path", conf.Server.GRPC.TLSConfig.CertPath, "GRPC tls certificate path")
68
	f.String("http-port", conf.Server.HTTP.Port, "HTTP port address")
69
	f.Bool("http-tls-enabled", conf.Server.HTTP.TLSConfig.Enabled, "switch option for HTTP tls server")
70
	f.String("http-tls-key-path", conf.Server.HTTP.TLSConfig.KeyPath, "HTTP tls key path")
71
	f.String("http-tls-cert-path", conf.Server.HTTP.TLSConfig.CertPath, "HTTP tls certificate path")
72
	f.StringSlice("http-cors-allowed-origins", conf.Server.HTTP.CORSAllowedOrigins, "CORS allowed origins for http gateway")
73
	f.StringSlice("http-cors-allowed-headers", conf.Server.HTTP.CORSAllowedHeaders, "CORS allowed headers for http gateway")
74
	f.Bool("profiler-enabled", conf.Profiler.Enabled, "switch option for profiler")
75
	f.String("profiler-port", conf.Profiler.Port, "profiler port address")
76
	f.String("log-level", conf.Log.Level, "set log verbosity ('info', 'debug', 'error', 'warning')")
77
	f.String("log-output", conf.Log.Output, "logger output valid values json, text")
78
	f.Bool("log-enabled", conf.Log.Enabled, "logger exporter enabled")
79
	f.String("log-exporter", conf.Log.Exporter, "can be; otlp. (integrated metric tools)")
80
	f.String("log-endpoint", conf.Log.Endpoint, "export uri for logs")
81
	f.Bool("log-insecure", conf.Log.Insecure, "use https or http for logs")
82
	f.String("log-urlpath", conf.Log.Urlpath, "allow to set url path for otlp exporter")
83
	f.StringSlice("log-headers", conf.Log.Headers, "allows setting custom headers for the log exporter in key-value pairs")
84
	f.String("log-protocol", conf.Log.Protocol, "allows setting the communication protocol for the log exporter, with options http or grpc")
85
	f.Bool("authn-enabled", conf.Authn.Enabled, "enable server authentication")
86
	f.String("authn-method", conf.Authn.Method, "server authentication method")
87
	f.StringSlice("authn-preshared-keys", conf.Authn.Preshared.Keys, "preshared key/keys for server authentication")
88
	f.String("authn-oidc-issuer", conf.Authn.Oidc.Issuer, "issuer identifier of the OpenID Connect Provider")
89
	f.String("authn-oidc-audience", conf.Authn.Oidc.Audience, "intended audience of the OpenID Connect token")
90
	f.Duration("authn-oidc-refresh-interval", conf.Authn.Oidc.RefreshInterval, "refresh interval for the OpenID Connect configuration")
91
	f.Duration("authn-oidc-backoff-interval", conf.Authn.Oidc.BackoffInterval, "backoff interval for the OpenID Connect configuration")
92
	f.Duration("authn-oidc-backoff-frequency", conf.Authn.Oidc.BackoffFrequency, "backoff frequency for the OpenID Connect configuration")
93
	f.Int("authn-oidc-backoff-max-retries", conf.Authn.Oidc.BackoffMaxRetries, "defines the maximum number of retries for the OpenID Connect configuration")
94
	f.StringSlice("authn-oidc-valid-methods", conf.Authn.Oidc.ValidMethods, "list of valid JWT signing methods for OpenID Connect")
95
	f.Bool("tracer-enabled", conf.Tracer.Enabled, "switch option for tracing")
96
	f.String("tracer-exporter", conf.Tracer.Exporter, "can be; jaeger, signoz, zipkin or otlp. (integrated tracing tools)")
97
	f.String("tracer-endpoint", conf.Tracer.Endpoint, "export uri for tracing data")
98
	f.Bool("tracer-insecure", conf.Tracer.Insecure, "use https or http for tracer data, only used for otlp exporter or signoz")
99
	f.String("tracer-urlpath", conf.Tracer.Urlpath, "allow to set url path for otlp exporter")
100
	f.StringSlice("tracer-headers", conf.Tracer.Headers, "allows setting custom headers for the tracer exporter in key-value pairs")
101
	f.String("tracer-protocol", conf.Tracer.Protocol, "allows setting the communication protocol for the tracer exporter, with options http or grpc")
102
	f.Bool("meter-enabled", conf.Meter.Enabled, "switch option for metric")
103
	f.String("meter-exporter", conf.Meter.Exporter, "can be; otlp. (integrated metric tools)")
104
	f.String("meter-endpoint", conf.Meter.Endpoint, "export uri for metric data")
105
	f.Bool("meter-insecure", conf.Meter.Insecure, "use https or http for metric data")
106
	f.String("meter-urlpath", conf.Meter.Urlpath, "allow to set url path for otlp exporter")
107
	f.StringSlice("meter-headers", conf.Meter.Headers, "allows setting custom headers for the metric exporter in key-value pairs")
108
	f.Int("meter-interval", conf.Meter.Interval, "allows to set metrics to be pushed in certain time interval")
109
	f.String("meter-protocol", conf.Meter.Protocol, "allows setting the communication protocol for the meter exporter, with options http or grpc")
110
	f.Bool("service-circuit-breaker", conf.Service.CircuitBreaker, "switch option for service circuit breaker")
111
	f.Bool("service-watch-enabled", conf.Service.Watch.Enabled, "switch option for watch service")
112
	f.Int64("service-schema-cache-number-of-counters", conf.Service.Schema.Cache.NumberOfCounters, "schema service cache number of counters")
113
	f.String("service-schema-cache-max-cost", conf.Service.Schema.Cache.MaxCost, "schema service cache max cost")
114
	f.Int("service-permission-bulk-limit", conf.Service.Permission.BulkLimit, "bulk operations limit")
115
	f.Int("service-permission-concurrency-limit", conf.Service.Permission.ConcurrencyLimit, "concurrency limit")
116
	f.Int64("service-permission-cache-number-of-counters", conf.Service.Permission.Cache.NumberOfCounters, "permission service cache number of counters")
117
	f.String("service-permission-cache-max-cost", conf.Service.Permission.Cache.MaxCost, "permission service cache max cost")
118
	f.String("database-engine", conf.Database.Engine, "data source. e.g. postgres, memory")
119
	f.String("database-uri", conf.Database.URI, "uri of your data source to store relation tuples and schema")
120
	f.String("database-writer-uri", conf.Database.Writer.URI, "writer uri of your data source to store relation tuples and schema")
121
	f.String("database-reader-uri", conf.Database.Reader.URI, "reader uri of your data source to store relation tuples and schema")
122
	f.Bool("database-auto-migrate", conf.Database.AutoMigrate, "auto migrate database tables")
123
	f.Int("database-max-open-connections", conf.Database.MaxOpenConnections, "maximum number of parallel connections that can be made to the database at any time")
124
	f.Int("database-max-idle-connections", conf.Database.MaxIdleConnections, "maximum number of idle connections that can be made to the database at any time")
125
	f.Duration("database-max-connection-lifetime", conf.Database.MaxConnectionLifetime, "maximum amount of time a connection may be reused")
126
	f.Duration("database-max-connection-idle-time", conf.Database.MaxConnectionIdleTime, "maximum amount of time a connection may be idle")
127
	f.Int("database-max-data-per-write", conf.Database.MaxDataPerWrite, "sets the maximum amount of data per write operation to the database")
128
	f.Int("database-max-retries", conf.Database.MaxRetries, "defines the maximum number of retries for database operations in case of failure")
129
	f.Int("database-watch-buffer-size", conf.Database.WatchBufferSize, "specifies the buffer size for database watch operations, impacting how many changes can be queued")
130
	f.Bool("database-garbage-collection-enabled", conf.Database.GarbageCollection.Enabled, "use database garbage collection for expired relationships and attributes")
131
	f.Duration("database-garbage-collection-interval", conf.Database.GarbageCollection.Interval, "interval for database garbage collection")
132
	f.Duration("database-garbage-collection-timeout", conf.Database.GarbageCollection.Timeout, "timeout for database garbage collection")
133
	f.Duration("database-garbage-collection-window", conf.Database.GarbageCollection.Window, "window for database garbage collection")
134
	f.Bool("distributed-enabled", conf.Distributed.Enabled, "enable distributed")
135
	f.String("distributed-address", conf.Distributed.Address, "distributed address")
136
	f.String("distributed-port", conf.Distributed.Port, "distributed port")
137
	f.Int("distributed-partition-count", conf.Distributed.PartitionCount, "number of partitions for distributed hashing")
138
	f.Int("distributed-replication-factor", conf.Distributed.ReplicationFactor, "number of replicas for distributed hashing")
139
	f.Float64("distributed-load", conf.Distributed.Load, "load factor for distributed hashing")
140
	f.Int("distributed-picker-width", conf.Distributed.PickerWidth, "picker width for distributed hashing")
141
142
	// SilenceUsage is set to true to suppress usage when an error occurs
143
	command.SilenceUsage = true
144
145
	command.PreRun = func(cmd *cobra.Command, args []string) {
146
		flags.RegisterServeFlags(f)
147
	}
148
149
	return command
150
}
151
152
// serve is the main function for the "permify serve" command. It starts the Permify service by configuring and starting the necessary components.
153
// It initializes the configuration, logger, database, tracing and metering components, and creates instances of the necessary engines, services, and decorators.
154
// It then creates a ServiceContainer and runs it with the given configuration.
155
// The function uses errgroup to manage the goroutines and gracefully shuts down the service upon receiving a termination signal.
156
// It returns an error if there is an issue with any of the components or if any goroutine fails.
157
func serve() func(cmd *cobra.Command, args []string) error {
158
	return func(cmd *cobra.Command, args []string) error {
159
		var cfg *config.Config
160
		var err error
161
		cfgFile := viper.GetString("config.file")
162
		if cfgFile != "" {
163
			cfg, err = config.NewConfigWithFile(cfgFile)
164
			if err != nil {
165
				return fmt.Errorf("failed to create new config: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
166
			}
167
168
			if err = viper.Unmarshal(cfg); err != nil {
169
				return fmt.Errorf("failed to unmarshal config: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
170
			}
171
		} else {
172
			// Load configuration
173
			cfg, err = config.NewConfig()
174
			if err != nil {
175
				return fmt.Errorf("failed to create new config: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
176
			}
177
178
			if err = viper.Unmarshal(cfg); err != nil {
179
				return fmt.Errorf("failed to unmarshal config: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
180
			}
181
		}
182
183
		// Print banner and initialize logger
184
		internal.PrintBanner()
185
186
		// Set up context and signal handling
187
		ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
188
		defer stop()
189
190
		internal.Identifier = cfg.AccountID
191
192
		var logger *slog.Logger
193
		var handler slog.Handler
194
195
		switch cfg.Log.Output {
196
		case "json":
197
			handler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
198
				Level: getLogLevel(cfg.Log.Level),
199
			})
200
		case "text":
201
			handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
202
				Level: getLogLevel(cfg.Log.Level),
203
			})
204
		default:
205
			handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
206
				Level: getLogLevel(cfg.Log.Level),
207
			})
208
		}
209
210
		if cfg.Log.Enabled {
211
			headers := map[string]string{}
212
			for _, header := range cfg.Log.Headers {
213
				h := strings.Split(header, ":")
214
				if len(h) != 2 {
215
					return errors.New("invalid header format; expected 'key:value'")
216
				}
217
				headers[h[0]] = h[1]
218
			}
219
220
			customHandler, err := telemetry.HandlerFactory(
221
				cfg.Log.Exporter,
222
				cfg.Log.Endpoint,
223
				cfg.Log.Insecure,
224
				cfg.Log.Urlpath,
225
				headers,
226
				cfg.Log.Protocol,
227
				getLogLevel(cfg.Log.Level),
228
			)
229
230
			if err != nil {
231
				slog.Error("invalid logger exporter", slog.Any("error", err))
232
				logger = slog.New(handler)
233
			} else {
234
				logger = slog.New(
235
					slogmulti.Fanout(
236
						customHandler,
237
						handler,
238
					),
239
				)
240
			}
241
		} else {
242
			logger = slog.New(handler)
243
		}
244
245
		slog.SetDefault(logger)
246
247
		if internal.Identifier == "" {
248
			message := "Account ID is not set. Please fill in the Account ID for better support. Get your Account ID from https://permify.co/account"
249
			slog.Error(message)
250
251
			ticker := time.NewTicker(24 * time.Hour)
252
			defer ticker.Stop()
253
254
			go func() {
255
				for range ticker.C {
256
					slog.Error(message)
257
				}
258
			}()
259
		}
260
261
		slog.Info("🚀 starting permify service...")
262
263
		// Run database migration if enabled
264
		if cfg.Database.AutoMigrate {
265
			err = storage.Migrate(cfg.Database)
266
			if err != nil {
267
				slog.Error("failed to migrate database", slog.Any("error", err))
268
				return err
269
			}
270
		}
271
272
		// Initialize database
273
		db, err := factories.DatabaseFactory(cfg.Database)
274
		if err != nil {
275
			slog.Error("failed to initialize database", slog.Any("error", err))
276
			return err
277
		}
278
		defer func() {
279
			if err = db.Close(); err != nil {
280
				slog.Error("failed to close database", slog.Any("error", err))
281
			}
282
		}()
283
284
		// Tracing
285
		if cfg.Tracer.Enabled {
286
			headers := map[string]string{}
287
			for _, header := range cfg.Tracer.Headers {
288
				h := strings.Split(header, ":")
289
				if len(h) != 2 {
290
					return errors.New("invalid header format; expected 'key:value'")
291
				}
292
				headers[h[0]] = h[1]
293
			}
294
295
			var exporter trace.SpanExporter
296
			exporter, err = tracerexporters.ExporterFactory(
297
				cfg.Tracer.Exporter,
298
				cfg.Tracer.Endpoint,
299
				cfg.Tracer.Insecure,
300
				cfg.Tracer.Urlpath,
301
				headers,
302
				cfg.Tracer.Protocol,
303
			)
304
			if err != nil {
305
				slog.Error(err.Error())
306
			}
307
308
			shutdown := telemetry.NewTracer(exporter)
309
310
			defer func() {
311
				if err = shutdown(ctx); err != nil {
312
					slog.Error(err.Error())
313
				}
314
			}()
315
		}
316
317
		// Garbage collection
318
		if cfg.Database.GarbageCollection.Timeout > 0 && cfg.Database.GarbageCollection.Enabled && cfg.Database.Engine != "memory" {
319
			slog.Info("🗑️ starting database garbage collection...")
320
321
			garbageCollector := gc.NewGC(
322
				db.(*PQDatabase.Postgres),
323
				gc.Interval(cfg.Database.GarbageCollection.Interval),
324
				gc.Window(cfg.Database.GarbageCollection.Window),
325
				gc.Timeout(cfg.Database.GarbageCollection.Timeout),
326
			)
327
328
			go func() {
329
				err = garbageCollector.Start(ctx)
330
				if err != nil {
331
					slog.Error(err.Error())
332
				}
333
			}()
334
		}
335
336
		// Meter
337
		if cfg.Meter.Enabled {
338
			headers := map[string]string{}
339
			for _, header := range cfg.Meter.Headers {
340
				h := strings.Split(header, ":")
341
				if len(h) != 2 {
342
					return errors.New("invalid header format; expected 'key:value'")
343
				}
344
				headers[h[0]] = h[1]
345
			}
346
347
			var exporter metric.Exporter
348
			exporter, err = meterexporters.ExporterFactory(
349
				cfg.Meter.Exporter,
350
				cfg.Meter.Endpoint,
351
				cfg.Meter.Insecure,
352
				cfg.Meter.Urlpath,
353
				headers,
354
				cfg.Meter.Protocol,
355
			)
356
			if err != nil {
357
				slog.Error(err.Error())
358
			}
359
360
			shutdown := telemetry.NewMeter(exporter, time.Duration(cfg.Meter.Interval)*time.Second)
361
362
			defer func() {
363
				if err = shutdown(ctx); err != nil {
364
					slog.Error(err.Error())
365
				}
366
			}()
367
		}
368
369
		// schema cache
370
		var schemaCache pkgcache.Cache
371
		schemaCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Schema.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Schema.Cache.MaxCost))
372
		if err != nil {
373
			slog.Error(err.Error())
374
			return err
375
		}
376
377
		// engines cache cache
378
		var engineKeyCache pkgcache.Cache
379
		engineKeyCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Permission.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Permission.Cache.MaxCost))
380
		if err != nil {
381
			slog.Error(err.Error())
382
			return err
383
		}
384
385
		watcher := storage.NewNoopWatcher()
386
		if cfg.Service.Watch.Enabled {
387
			watcher = factories.WatcherFactory(db)
388
		}
389
390
		// Initialize the storage with factory methods
391
		dataReader := factories.DataReaderFactory(db)
392
		dataWriter := factories.DataWriterFactory(db)
393
		bundleReader := factories.BundleReaderFactory(db)
394
		bundleWriter := factories.BundleWriterFactory(db)
395
		schemaReader := factories.SchemaReaderFactory(db)
396
		schemaWriter := factories.SchemaWriterFactory(db)
397
		tenantReader := factories.TenantReaderFactory(db)
398
		tenantWriter := factories.TenantWriterFactory(db)
399
400
		// Add caching to the schema reader using a decorator
401
		schemaReader = cacheDecorator.NewSchemaReader(schemaReader, schemaCache)
402
403
		dataReader = sfDecorator.NewDataReader(dataReader)
404
		schemaReader = sfDecorator.NewSchemaReader(schemaReader)
405
406
		// Check if circuit breaker should be enabled for services
407
		if cfg.Service.CircuitBreaker {
408
			var cb *gobreaker.CircuitBreaker
409
			var st gobreaker.Settings
410
			st.Name = "storage"
411
			st.ReadyToTrip = func(counts gobreaker.Counts) bool {
412
				failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
413
				return counts.Requests >= 10 && failureRatio >= 0.6
414
			}
415
416
			cb = gobreaker.NewCircuitBreaker(st)
417
418
			// Add circuit breaker to the relationship reader using decorator
419
			dataReader = cbDecorator.NewDataReader(dataReader, cb)
420
421
			// Add circuit breaker to the bundle reader using decorators
422
			bundleReader = cbDecorator.NewBundleReader(bundleReader, cb)
423
424
			// Add circuit breaker to the schema reader using decorator
425
			schemaReader = cbDecorator.NewSchemaReader(schemaReader, cb)
426
427
			// Add circuit breaker to the tenant reader using decorator
428
			tenantReader = cbDecorator.NewTenantReader(tenantReader, cb)
429
		}
430
431
		// Initialize the engines using the key manager, schema reader, and relationship reader
432
		checkEngine := engines.NewCheckEngine(schemaReader, dataReader, engines.CheckConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit))
433
		expandEngine := engines.NewExpandEngine(schemaReader, dataReader)
434
435
		// Declare a variable `checker` of type `invoke.Check`.
436
		var checker invoke.Check
437
438
		checker = cache.NewCheckEngineWithCache(
439
			checkEngine,
440
			schemaReader,
441
			engineKeyCache,
442
		)
443
444
		// Create the checker either with load balancing or caching capabilities.
445
		if cfg.Distributed.Enabled {
446
			if cfg.Authn.Enabled && cfg.Authn.Method == "oidc" {
447
				return errors.New("OIDC authentication method cannot be used in distributed mode. Please check your configuration")
448
			}
449
450
			checker, err = balancer.NewCheckEngineWithBalancer(
451
				ctx,
452
				checker,
453
				schemaReader,
454
				cfg.NameOverride,
455
				&cfg.Distributed,
456
				&cfg.Server.GRPC,
457
				&cfg.Authn,
458
			)
459
			// Handle potential error during checker creation.
460
			if err != nil {
461
				return err
462
			}
463
		}
464
465
		// Create a localChecker which directly checks without considering distributed setup.
466
		// This also includes caching capabilities.
467
		localChecker := cache.NewCheckEngineWithCache(
468
			checkEngine,
469
			schemaReader,
470
			engineKeyCache,
471
		)
472
473
		// Initialize the lookupEngine, which is responsible for looking up certain entities or values.
474
		lookupEngine := engines.NewLookupEngine(
475
			checker,
476
			schemaReader,
477
			dataReader,
478
			// Set concurrency limit based on the configuration.
479
			engines.LookupConcurrencyLimit(cfg.Service.Permission.BulkLimit),
480
		)
481
482
		// Initialize the subjectPermissionEngine, responsible for handling subject permissions.
483
		subjectPermissionEngine := engines.NewSubjectPermission(
484
			checker,
485
			schemaReader,
486
			// Set concurrency limit for the subject permission checks.
487
			engines.SubjectPermissionConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit),
488
		)
489
490
		// Create a new invoker that is used to directly call various functions or engines.
491
		// It encompasses the schema, data, checker, and other engines.
492
		invoker := invoke.NewDirectInvoker(
493
			schemaReader,
494
			dataReader,
495
			checker,
496
			expandEngine,
497
			lookupEngine,
498
			subjectPermissionEngine,
499
		)
500
501
		// Associate the invoker with the checkEngine.
502
		checkEngine.SetInvoker(invoker)
503
504
		// Create a local invoker for local operations.
505
		localInvoker := invoke.NewDirectInvoker(
506
			schemaReader,
507
			dataReader,
508
			localChecker,
509
			expandEngine,
510
			lookupEngine,
511
			subjectPermissionEngine,
512
		)
513
514
		// Initialize the container which brings together multiple components such as the invoker, data readers/writers, and schema handlers.
515
		container := servers.NewContainer(
516
			invoker,
517
			dataReader,
518
			dataWriter,
519
			bundleReader,
520
			bundleWriter,
521
			schemaReader,
522
			schemaWriter,
523
			tenantReader,
524
			tenantWriter,
525
			watcher,
526
		)
527
528
		// Create an error group with the provided context
529
		var g *errgroup.Group
530
		g, ctx = errgroup.WithContext(ctx)
531
532
		// Add the container.Run function to the error group
533
		g.Go(func() error {
534
			return container.Run(
535
				ctx,
536
				&cfg.Server,
537
				logger,
538
				&cfg.Distributed,
539
				&cfg.Authn,
540
				&cfg.Profiler,
541
				localInvoker,
542
			)
543
		})
544
545
		// Wait for the error group to finish and log any errors
546
		if err = g.Wait(); err != nil {
547
			slog.Error(err.Error())
548
		}
549
550
		return nil
551
	}
552
}
553
554
// getLogLevel converts a textual log level into the matching slog.Level.
//
// Recognized values are "debug", "info", "warn" (or its alias "warning", which
// is the spelling shown in the CLI help) and "error". Any other value,
// including the empty string, maps to slog.LevelInfo.
func getLogLevel(level string) slog.Level {
	switch level {
	case "info":
		return slog.LevelInfo // Return Info level
	case "warn", "warning":
		return slog.LevelWarn // Return Warning level
	case "error":
		return slog.LevelError // Return Error level
	case "debug":
		return slog.LevelDebug // Return Debug level
	default:
		return slog.LevelInfo // Default to Info level if unrecognized
	}
}
569