Passed
Push — master ( 9e53c9...4fac29 )
by Tolga
01:30 queued 15s
created

pkg/cmd/serve.go   D

Size/Duplication

Total Lines 568
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 59
eloc 386
dl 0
loc 568
rs 4.08
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
B cmd.NewServeCommand 0 97 2
F cmd.serve 0 400 51
B cmd.getLogLevel 0 12 6
1
package cmd
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"log/slog"
8
	"os"
9
	"os/signal"
10
	"strings"
11
	"syscall"
12
	"time"
13
14
	slogmulti "github.com/samber/slog-multi"
15
	"github.com/sony/gobreaker"
16
	"github.com/spf13/cobra"
17
	"github.com/spf13/viper"
18
	"go.opentelemetry.io/otel/sdk/metric"
19
20
	"github.com/Permify/permify/internal/engines/balancer"
21
	"github.com/Permify/permify/internal/engines/cache"
22
	"github.com/Permify/permify/internal/invoke"
23
	cacheDecorator "github.com/Permify/permify/internal/storage/decorators/cache"
24
	cbDecorator "github.com/Permify/permify/internal/storage/decorators/circuitBreaker"
25
	sfDecorator "github.com/Permify/permify/internal/storage/decorators/singleflight"
26
	"github.com/Permify/permify/internal/storage/postgres/gc"
27
	"github.com/Permify/permify/pkg/cmd/flags"
28
	PQDatabase "github.com/Permify/permify/pkg/database/postgres"
29
30
	"go.opentelemetry.io/otel/sdk/trace"
31
	"golang.org/x/sync/errgroup"
32
33
	"github.com/Permify/permify/internal"
34
	"github.com/Permify/permify/internal/config"
35
	"github.com/Permify/permify/internal/engines"
36
	"github.com/Permify/permify/internal/factories"
37
	"github.com/Permify/permify/internal/servers"
38
	"github.com/Permify/permify/internal/storage"
39
	pkgcache "github.com/Permify/permify/pkg/cache"
40
	"github.com/Permify/permify/pkg/cache/ristretto"
41
	"github.com/Permify/permify/pkg/telemetry"
42
	"github.com/Permify/permify/pkg/telemetry/meterexporters"
43
	"github.com/Permify/permify/pkg/telemetry/tracerexporters"
44
)
45
46
// NewServeCommand returns a new Cobra command that can be used to run the "permify serve" command.
47
// The command takes no arguments and runs the serve() function to start the Permify service.
48
// The command has a short description of what it does.
49
func NewServeCommand() *cobra.Command {
50
	command := &cobra.Command{
51
		Use:   "serve",
52
		Short: "serve the Permify server",
53
		RunE:  serve(),
54
		Args:  cobra.NoArgs,
55
	}
56
57
	conf := config.DefaultConfig()
58
	f := command.Flags()
59
	f.StringP("config", "c", "", "config file (default is $HOME/.permify.yaml)")
60
	f.Bool("http-enabled", conf.Server.HTTP.Enabled, "switch option for HTTP server")
61
	f.String("account-id", conf.AccountID, "account id")
62
	f.Int64("server-rate-limit", conf.Server.RateLimit, "the maximum number of requests the server should handle per second")
63
	f.String("server-name-override", conf.Server.NameOverride, "server name override")
64
	f.String("grpc-port", conf.Server.GRPC.Port, "port that GRPC server run on")
65
	f.Bool("grpc-tls-enabled", conf.Server.GRPC.TLSConfig.Enabled, "switch option for GRPC tls server")
66
	f.String("grpc-tls-key-path", conf.Server.GRPC.TLSConfig.KeyPath, "GRPC tls key path")
67
	f.String("grpc-tls-cert-path", conf.Server.GRPC.TLSConfig.CertPath, "GRPC tls certificate path")
68
	f.String("http-port", conf.Server.HTTP.Port, "HTTP port address")
69
	f.Bool("http-tls-enabled", conf.Server.HTTP.TLSConfig.Enabled, "switch option for HTTP tls server")
70
	f.String("http-tls-key-path", conf.Server.HTTP.TLSConfig.KeyPath, "HTTP tls key path")
71
	f.String("http-tls-cert-path", conf.Server.HTTP.TLSConfig.CertPath, "HTTP tls certificate path")
72
	f.StringSlice("http-cors-allowed-origins", conf.Server.HTTP.CORSAllowedOrigins, "CORS allowed origins for http gateway")
73
	f.StringSlice("http-cors-allowed-headers", conf.Server.HTTP.CORSAllowedHeaders, "CORS allowed headers for http gateway")
74
	f.Bool("profiler-enabled", conf.Profiler.Enabled, "switch option for profiler")
75
	f.String("profiler-port", conf.Profiler.Port, "profiler port address")
76
	f.String("log-level", conf.Log.Level, "set log verbosity ('info', 'debug', 'error', 'warning')")
77
	f.String("log-output", conf.Log.Output, "logger output valid values json, text")
78
	f.Bool("log-enabled", conf.Log.Enabled, "logger exporter enabled")
79
	f.String("log-exporter", conf.Log.Exporter, "can be; otlp. (integrated metric tools)")
80
	f.String("log-endpoint", conf.Log.Endpoint, "export uri for logs")
81
	f.Bool("log-insecure", conf.Log.Insecure, "use https or http for logs")
82
	f.String("log-urlpath", conf.Log.URLPath, "allow to set url path for otlp exporter")
83
	f.StringSlice("log-headers", conf.Log.Headers, "allows setting custom headers for the log exporter in key-value pairs")
84
	f.String("log-protocol", conf.Log.Protocol, "allows setting the communication protocol for the log exporter, with options http or grpc")
85
	f.Bool("authn-enabled", conf.Authn.Enabled, "enable server authentication")
86
	f.String("authn-method", conf.Authn.Method, "server authentication method")
87
	f.StringSlice("authn-preshared-keys", conf.Authn.Preshared.Keys, "preshared key/keys for server authentication")
88
	f.String("authn-oidc-issuer", conf.Authn.Oidc.Issuer, "issuer identifier of the OpenID Connect Provider")
89
	f.String("authn-oidc-audience", conf.Authn.Oidc.Audience, "intended audience of the OpenID Connect token")
90
	f.Duration("authn-oidc-refresh-interval", conf.Authn.Oidc.RefreshInterval, "refresh interval for the OpenID Connect configuration")
91
	f.Duration("authn-oidc-backoff-interval", conf.Authn.Oidc.BackoffInterval, "backoff interval for the OpenID Connect configuration")
92
	f.Duration("authn-oidc-backoff-frequency", conf.Authn.Oidc.BackoffFrequency, "backoff frequency for the OpenID Connect configuration")
93
	f.Int("authn-oidc-backoff-max-retries", conf.Authn.Oidc.BackoffMaxRetries, "defines the maximum number of retries for the OpenID Connect configuration")
94
	f.StringSlice("authn-oidc-valid-methods", conf.Authn.Oidc.ValidMethods, "list of valid JWT signing methods for OpenID Connect")
95
	f.Bool("tracer-enabled", conf.Tracer.Enabled, "switch option for tracing")
96
	f.String("tracer-exporter", conf.Tracer.Exporter, "can be; jaeger, signoz, zipkin or otlp. (integrated tracing tools)")
97
	f.String("tracer-endpoint", conf.Tracer.Endpoint, "export uri for tracing data")
98
	f.Bool("tracer-insecure", conf.Tracer.Insecure, "use https or http for tracer data, only used for otlp exporter or signoz")
99
	f.String("tracer-urlpath", conf.Tracer.URLPath, "allow to set url path for otlp exporter")
100
	f.StringSlice("tracer-headers", conf.Tracer.Headers, "allows setting custom headers for the tracer exporter in key-value pairs")
101
	f.String("tracer-protocol", conf.Tracer.Protocol, "allows setting the communication protocol for the tracer exporter, with options http or grpc")
102
	f.Bool("meter-enabled", conf.Meter.Enabled, "switch option for metric")
103
	f.String("meter-exporter", conf.Meter.Exporter, "can be; otlp. (integrated metric tools)")
104
	f.String("meter-endpoint", conf.Meter.Endpoint, "export uri for metric data")
105
	f.Bool("meter-insecure", conf.Meter.Insecure, "use https or http for metric data")
106
	f.String("meter-urlpath", conf.Meter.URLPath, "allow to set url path for otlp exporter")
107
	f.StringSlice("meter-headers", conf.Meter.Headers, "allows setting custom headers for the metric exporter in key-value pairs")
108
	f.Int("meter-interval", conf.Meter.Interval, "allows to set metrics to be pushed in certain time interval")
109
	f.String("meter-protocol", conf.Meter.Protocol, "allows setting the communication protocol for the meter exporter, with options http or grpc")
110
	f.Bool("service-circuit-breaker", conf.Service.CircuitBreaker, "switch option for service circuit breaker")
111
	f.Bool("service-watch-enabled", conf.Service.Watch.Enabled, "switch option for watch service")
112
	f.Int64("service-schema-cache-number-of-counters", conf.Service.Schema.Cache.NumberOfCounters, "schema service cache number of counters")
113
	f.String("service-schema-cache-max-cost", conf.Service.Schema.Cache.MaxCost, "schema service cache max cost")
114
	f.Int("service-permission-bulk-limit", conf.Service.Permission.BulkLimit, "bulk operations limit")
115
	f.Int("service-permission-concurrency-limit", conf.Service.Permission.ConcurrencyLimit, "concurrency limit")
116
	f.Int64("service-permission-cache-number-of-counters", conf.Service.Permission.Cache.NumberOfCounters, "permission service cache number of counters")
117
	f.String("service-permission-cache-max-cost", conf.Service.Permission.Cache.MaxCost, "permission service cache max cost")
118
	f.String("database-engine", conf.Database.Engine, "data source. e.g. postgres, memory")
119
	f.String("database-uri", conf.Database.URI, "uri of your data source to store relation tuples and schema")
120
	f.String("database-writer-uri", conf.Database.Writer.URI, "writer uri of your data source to store relation tuples and schema")
121
	f.String("database-reader-uri", conf.Database.Reader.URI, "reader uri of your data source to store relation tuples and schema")
122
	f.Bool("database-auto-migrate", conf.Database.AutoMigrate, "auto migrate database tables")
123
	f.Int("database-max-open-connections", conf.Database.MaxOpenConnections, "maximum number of parallel connections that can be made to the database at any time")
124
	f.Int("database-max-idle-connections", conf.Database.MaxIdleConnections, "maximum number of idle connections that can be made to the database at any time")
125
	f.Duration("database-max-connection-lifetime", conf.Database.MaxConnectionLifetime, "maximum amount of time a connection may be reused")
126
	f.Duration("database-max-connection-idle-time", conf.Database.MaxConnectionIdleTime, "maximum amount of time a connection may be idle")
127
	f.Int("database-max-data-per-write", conf.Database.MaxDataPerWrite, "sets the maximum amount of data per write operation to the database")
128
	f.Int("database-max-retries", conf.Database.MaxRetries, "defines the maximum number of retries for database operations in case of failure")
129
	f.Int("database-watch-buffer-size", conf.Database.WatchBufferSize, "specifies the buffer size for database watch operations, impacting how many changes can be queued")
130
	f.Bool("database-garbage-collection-enabled", conf.Database.GarbageCollection.Enabled, "use database garbage collection for expired relationships and attributes")
131
	f.Duration("database-garbage-collection-interval", conf.Database.GarbageCollection.Interval, "interval for database garbage collection")
132
	f.Duration("database-garbage-collection-timeout", conf.Database.GarbageCollection.Timeout, "timeout for database garbage collection")
133
	f.Duration("database-garbage-collection-window", conf.Database.GarbageCollection.Window, "window for database garbage collection")
134
	f.Bool("distributed-enabled", conf.Distributed.Enabled, "enable distributed")
135
	f.String("distributed-address", conf.Distributed.Address, "distributed address")
136
	f.String("distributed-port", conf.Distributed.Port, "distributed port")
137
138
	// SilenceUsage is set to true to suppress usage when an error occurs
139
	command.SilenceUsage = true
140
141
	command.PreRun = func(cmd *cobra.Command, args []string) {
142
		flags.RegisterServeFlags(f)
143
	}
144
145
	return command
146
}
147
148
// serve is the main function for the "permify serve" command. It starts the Permify service by configuring and starting the necessary components.
149
// It initializes the configuration, logger, database, tracing and metering components, and creates instances of the necessary engines, services, and decorators.
150
// It then creates a ServiceContainer and runs it with the given configuration.
151
// The function uses errgroup to manage the goroutines and gracefully shuts down the service upon receiving a termination signal.
152
// It returns an error if there is an issue with any of the components or if any goroutine fails.
153
func serve() func(cmd *cobra.Command, args []string) error {
154
	return func(cmd *cobra.Command, args []string) error {
155
		var cfg *config.Config
156
		var err error
157
		cfgFile := viper.GetString("config.file")
158
		if cfgFile != "" {
159
			cfg, err = config.NewConfigWithFile(cfgFile)
160
			if err != nil {
161
				return fmt.Errorf("failed to create new config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
162
			}
163
164
			if err = viper.Unmarshal(cfg); err != nil {
165
				return fmt.Errorf("failed to unmarshal config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
166
			}
167
		} else {
168
			// Load configuration
169
			cfg, err = config.NewConfig()
170
			if err != nil {
171
				return fmt.Errorf("failed to create new config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
172
			}
173
174
			if err = viper.Unmarshal(cfg); err != nil {
175
				return fmt.Errorf("failed to unmarshal config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
176
			}
177
		}
178
179
		// Print banner and initialize logger
180
		internal.PrintBanner()
181
182
		// Set up context and signal handling
183
		ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
184
		defer stop()
185
186
		internal.Identifier = cfg.AccountID
187
188
		var logger *slog.Logger
189
		var handler slog.Handler
190
191
		switch cfg.Log.Output {
192
		case "json":
193
			handler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
194
				Level: getLogLevel(cfg.Log.Level),
195
			})
196
		case "text":
197
			handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
198
				Level: getLogLevel(cfg.Log.Level),
199
			})
200
		default:
201
			handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
202
				Level: getLogLevel(cfg.Log.Level),
203
			})
204
		}
205
206
		if cfg.Log.Enabled {
207
			headers := map[string]string{}
208
			for _, header := range cfg.Log.Headers {
209
				h := strings.Split(header, ":")
210
				if len(h) != 2 {
211
					return errors.New("invalid header format; expected 'key:value'")
212
				}
213
				headers[h[0]] = h[1]
214
			}
215
216
			customHandler, err := HandlerFactory(
217
				cfg.Log.Exporter,
218
				cfg.Log.Endpoint,
219
				cfg.Log.Insecure,
220
				cfg.Log.URLPath,
221
				headers,
222
				cfg.Log.Protocol,
223
				getLogLevel(cfg.Log.Level),
224
			)
225
226
			if err != nil {
227
				slog.Error("invalid logger exporter", slog.Any("error", err))
228
				logger = slog.New(handler)
229
			} else {
230
				logger = slog.New(
231
					slogmulti.Fanout(
232
						customHandler,
233
						handler,
234
					),
235
				)
236
			}
237
		} else {
238
			logger = slog.New(handler)
239
		}
240
241
		slog.SetDefault(logger)
242
243
		if internal.Identifier == "" {
244
			message := "Account ID is not set. Please fill in the Account ID for better support. Get your Account ID from https://permify.co/account"
245
			slog.Error(message)
246
247
			ticker := time.NewTicker(24 * time.Hour)
248
			defer ticker.Stop()
249
250
			go func() {
251
				for range ticker.C {
252
					slog.Error(message)
253
				}
254
			}()
255
		}
256
257
		slog.Info("🚀 starting permify service...")
258
259
		// Run database migration if enabled
260
		if cfg.Database.AutoMigrate {
261
			err = storage.Migrate(cfg.Database)
262
			if err != nil {
263
				slog.Error("failed to migrate database", slog.Any("error", err))
264
				return err
265
			}
266
		}
267
268
		// Initialize database
269
		db, err := factories.DatabaseFactory(cfg.Database)
270
		if err != nil {
271
			slog.Error("failed to initialize database", slog.Any("error", err))
272
			return err
273
		}
274
		defer func() {
275
			if err = db.Close(); err != nil {
276
				slog.Error("failed to close database", slog.Any("error", err))
277
			}
278
		}()
279
280
		// Tracing
281
		if cfg.Tracer.Enabled {
282
			headers := map[string]string{}
283
			for _, header := range cfg.Tracer.Headers {
284
				h := strings.Split(header, ":")
285
				if len(h) != 2 {
286
					return errors.New("invalid header format; expected 'key:value'")
287
				}
288
				headers[h[0]] = h[1]
289
			}
290
291
			var exporter trace.SpanExporter
292
			exporter, err = tracerexporters.ExporterFactory(
293
				cfg.Tracer.Exporter,
294
				cfg.Tracer.Endpoint,
295
				cfg.Tracer.Insecure,
296
				cfg.Tracer.URLPath,
297
				headers,
298
				cfg.Tracer.Protocol,
299
			)
300
			if err != nil {
301
				slog.Error(err.Error())
302
			}
303
304
			shutdown := telemetry.NewTracer(exporter)
305
306
			defer func() {
307
				if err = shutdown(ctx); err != nil {
308
					slog.Error(err.Error())
309
				}
310
			}()
311
		}
312
313
		// Garbage collection
314
		if cfg.Database.GarbageCollection.Timeout > 0 && cfg.Database.GarbageCollection.Enabled && cfg.Database.Engine != "memory" {
315
			slog.Info("🗑️ starting database garbage collection...")
316
317
			garbageCollector := gc.NewGC(
318
				db.(*PQDatabase.Postgres),
319
				gc.Interval(cfg.Database.GarbageCollection.Interval),
320
				gc.Window(cfg.Database.GarbageCollection.Window),
321
				gc.Timeout(cfg.Database.GarbageCollection.Timeout),
322
			)
323
324
			go func() {
325
				err = garbageCollector.Start(ctx)
326
				if err != nil {
327
					slog.Error(err.Error())
328
				}
329
			}()
330
		}
331
332
		// Meter
333
		if cfg.Meter.Enabled {
334
			headers := map[string]string{}
335
			for _, header := range cfg.Meter.Headers {
336
				h := strings.Split(header, ":")
337
				if len(h) != 2 {
338
					return errors.New("invalid header format; expected 'key:value'")
339
				}
340
				headers[h[0]] = h[1]
341
			}
342
343
			var exporter metric.Exporter
344
			exporter, err = meterexporters.ExporterFactory(
345
				cfg.Meter.Exporter,
346
				cfg.Meter.Endpoint,
347
				cfg.Meter.Insecure,
348
				cfg.Meter.URLPath,
349
				headers,
350
				cfg.Meter.Protocol,
351
			)
352
353
			if err != nil {
354
				slog.Error(err.Error())
355
			}
356
357
			shutdown := telemetry.NewMeter(exporter, time.Duration(cfg.Meter.Interval)*time.Second)
358
359
			defer func() {
360
				if err = shutdown(ctx); err != nil {
361
					slog.Error(err.Error())
362
				}
363
			}()
364
		}
365
366
		// schema cache
367
		var schemaCache pkgcache.Cache
368
		schemaCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Schema.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Schema.Cache.MaxCost))
369
		if err != nil {
370
			slog.Error(err.Error())
371
			return err
372
		}
373
374
		// engines cache cache
375
		var engineKeyCache pkgcache.Cache
376
		engineKeyCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Permission.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Permission.Cache.MaxCost))
377
		if err != nil {
378
			slog.Error(err.Error())
379
			return err
380
		}
381
382
		watcher := storage.NewNoopWatcher()
383
		if cfg.Service.Watch.Enabled {
384
			watcher = factories.WatcherFactory(db)
385
		}
386
387
		// Initialize the storage with factory methods
388
		dataReader := factories.DataReaderFactory(db)
389
		dataWriter := factories.DataWriterFactory(db)
390
		bundleReader := factories.BundleReaderFactory(db)
391
		bundleWriter := factories.BundleWriterFactory(db)
392
		schemaReader := factories.SchemaReaderFactory(db)
393
		schemaWriter := factories.SchemaWriterFactory(db)
394
		tenantReader := factories.TenantReaderFactory(db)
395
		tenantWriter := factories.TenantWriterFactory(db)
396
397
		// Add caching to the schema reader using a decorator
398
		schemaReader = cacheDecorator.NewSchemaReader(schemaReader, schemaCache)
399
400
		dataReader = sfDecorator.NewDataReader(dataReader)
401
		schemaReader = sfDecorator.NewSchemaReader(schemaReader)
402
403
		// Check if circuit breaker should be enabled for services
404
		if cfg.Service.CircuitBreaker {
405
			var cb *gobreaker.CircuitBreaker
406
			var st gobreaker.Settings
407
			st.Name = "storage"
408
			st.ReadyToTrip = func(counts gobreaker.Counts) bool {
409
				failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
410
				return counts.Requests >= 10 && failureRatio >= 0.6
411
			}
412
413
			cb = gobreaker.NewCircuitBreaker(st)
414
415
			// Add circuit breaker to the relationship reader using decorator
416
			dataReader = cbDecorator.NewDataReader(dataReader, cb)
417
418
			// Add circuit breaker to the bundle reader using decorators
419
			bundleReader = cbDecorator.NewBundleReader(bundleReader, cb)
420
421
			// Add circuit breaker to the schema reader using decorator
422
			schemaReader = cbDecorator.NewSchemaReader(schemaReader, cb)
423
424
			// Add circuit breaker to the tenant reader using decorator
425
			tenantReader = cbDecorator.NewTenantReader(tenantReader, cb)
426
		}
427
428
		// Initialize the engines using the key manager, schema reader, and relationship reader
429
		checkEngine := engines.NewCheckEngine(schemaReader, dataReader, engines.CheckConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit))
430
		expandEngine := engines.NewExpandEngine(schemaReader, dataReader)
431
432
		// Declare a variable `checker` of type `invoke.Check`.
433
		var checker invoke.Check
434
435
		// Create the checker either with load balancing or caching capabilities.
436
		if cfg.Distributed.Enabled {
437
438
			if cfg.Authn.Enabled && cfg.Authn.Method == "oidc" {
439
				return errors.New("OIDC authentication method cannot be used in distributed mode. Please check your configuration")
440
			}
441
442
			checker, err = balancer.NewCheckEngineWithBalancer(
443
				ctx,
444
				checkEngine,
445
				schemaReader,
446
				&cfg.Distributed,
447
				&cfg.Server.GRPC,
448
				&cfg.Authn,
449
			)
450
			// Handle potential error during checker creation.
451
			if err != nil {
452
				return err
453
			}
454
			checker = cache.NewCheckEngineWithCache(
455
				checker,
456
				schemaReader,
457
				engineKeyCache,
458
			)
459
		} else {
460
			checker = cache.NewCheckEngineWithCache(
461
				checkEngine,
462
				schemaReader,
463
				engineKeyCache,
464
			)
465
		}
466
467
		// Create a localChecker which directly checks without considering distributed setup.
468
		// This also includes caching capabilities.
469
		localChecker := cache.NewCheckEngineWithCache(
470
			checkEngine,
471
			schemaReader,
472
			engineKeyCache,
473
		)
474
475
		// Initialize the lookupEngine, which is responsible for looking up certain entities or values.
476
		lookupEngine := engines.NewLookupEngine(
477
			checker,
478
			schemaReader,
479
			dataReader,
480
			// Set concurrency limit based on the configuration.
481
			engines.LookupConcurrencyLimit(cfg.Service.Permission.BulkLimit),
482
		)
483
484
		// Initialize the subjectPermissionEngine, responsible for handling subject permissions.
485
		subjectPermissionEngine := engines.NewSubjectPermission(
486
			checker,
487
			schemaReader,
488
			// Set concurrency limit for the subject permission checks.
489
			engines.SubjectPermissionConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit),
490
		)
491
492
		// Create a new invoker that is used to directly call various functions or engines.
493
		// It encompasses the schema, data, checker, and other engines.
494
		invoker := invoke.NewDirectInvoker(
495
			schemaReader,
496
			dataReader,
497
			checker,
498
			expandEngine,
499
			lookupEngine,
500
			subjectPermissionEngine,
501
		)
502
503
		// Associate the invoker with the checkEngine.
504
		checkEngine.SetInvoker(invoker)
505
506
		// Create a local invoker for local operations.
507
		localInvoker := invoke.NewDirectInvoker(
508
			schemaReader,
509
			dataReader,
510
			localChecker,
511
			expandEngine,
512
			lookupEngine,
513
			subjectPermissionEngine,
514
		)
515
516
		// Initialize the container which brings together multiple components such as the invoker, data readers/writers, and schema handlers.
517
		container := servers.NewContainer(
518
			invoker,
519
			dataReader,
520
			dataWriter,
521
			bundleReader,
522
			bundleWriter,
523
			schemaReader,
524
			schemaWriter,
525
			tenantReader,
526
			tenantWriter,
527
			watcher,
528
		)
529
530
		// Create an error group with the provided context
531
		var g *errgroup.Group
532
		g, ctx = errgroup.WithContext(ctx)
533
534
		// Add the container.Run function to the error group
535
		g.Go(func() error {
536
			return container.Run(
537
				ctx,
538
				&cfg.Server,
539
				logger,
540
				&cfg.Distributed,
541
				&cfg.Authn,
542
				&cfg.Profiler,
543
				localInvoker,
544
			)
545
		})
546
547
		// Wait for the error group to finish and log any errors
548
		if err = g.Wait(); err != nil {
549
			slog.Error(err.Error())
550
		}
551
552
		return nil
553
	}
554
}
555
556
// getLogLevel converts a string representation of log level to its corresponding slog.Level value.
557
func getLogLevel(level string) slog.Level {
558
	switch level {
559
	case "info":
560
		return slog.LevelInfo // Return Info level
561
	case "warn":
562
		return slog.LevelWarn // Return Warning level
563
	case "error":
564
		return slog.LevelError // Return Error level
565
	case "debug":
566
		return slog.LevelDebug // Return Debug level
567
	default:
568
		return slog.LevelInfo // Default to Info level if unrecognized
569
	}
570
}
571