Passed
Push — master ( 5fb140...f50c84 )
by Tolga
01:04 queued 13s
created

cmd.NewServeCommand   B

Complexity

Conditions 2

Size

Total Lines 93
Code Lines 86

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 86
nop 0
dl 0
loc 93
rs 7.4581
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
package cmd
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"log/slog"
8
	"os"
9
	"os/signal"
10
	"strings"
11
	"syscall"
12
	"time"
13
14
	"github.com/agoda-com/opentelemetry-go/otelslog"
15
	"github.com/sony/gobreaker"
16
	"github.com/spf13/cobra"
17
	"github.com/spf13/viper"
18
	"go.opentelemetry.io/otel/sdk/metric"
19
20
	"github.com/Permify/permify/internal/engines/balancer"
21
	"github.com/Permify/permify/internal/engines/cache"
22
	"github.com/Permify/permify/internal/invoke"
23
	cacheDecorator "github.com/Permify/permify/internal/storage/decorators/cache"
24
	cbDecorator "github.com/Permify/permify/internal/storage/decorators/circuitBreaker"
25
	sfDecorator "github.com/Permify/permify/internal/storage/decorators/singleflight"
26
	"github.com/Permify/permify/internal/storage/postgres/gc"
27
	"github.com/Permify/permify/pkg/cmd/flags"
28
	PQDatabase "github.com/Permify/permify/pkg/database/postgres"
29
30
	"go.opentelemetry.io/otel/sdk/trace"
31
	"golang.org/x/sync/errgroup"
32
33
	"github.com/Permify/permify/internal"
34
	"github.com/Permify/permify/internal/config"
35
	"github.com/Permify/permify/internal/engines"
36
	"github.com/Permify/permify/internal/factories"
37
	"github.com/Permify/permify/internal/servers"
38
	"github.com/Permify/permify/internal/storage"
39
	pkgcache "github.com/Permify/permify/pkg/cache"
40
	"github.com/Permify/permify/pkg/cache/ristretto"
41
	"github.com/Permify/permify/pkg/telemetry"
42
	"github.com/Permify/permify/pkg/telemetry/logexporters"
43
	"github.com/Permify/permify/pkg/telemetry/meterexporters"
44
	"github.com/Permify/permify/pkg/telemetry/tracerexporters"
45
)
46
47
// NewServeCommand returns a new Cobra command that can be used to run the "permify serve" command.
48
// The command takes no arguments and runs the serve() function to start the Permify service.
49
// The command has a short description of what it does.
50
func NewServeCommand() *cobra.Command {
51
	command := &cobra.Command{
52
		Use:   "serve",
53
		Short: "serve the Permify server",
54
		RunE:  serve(),
55
		Args:  cobra.NoArgs,
56
	}
57
58
	conf := config.DefaultConfig()
59
	f := command.Flags()
60
	f.StringP("config", "c", "", "config file (default is $HOME/.permify.yaml)")
61
	f.Bool("http-enabled", conf.Server.HTTP.Enabled, "switch option for HTTP server")
62
	f.String("account-id", conf.AccountID, "account id")
63
	f.Int64("server-rate-limit", conf.Server.RateLimit, "the maximum number of requests the server should handle per second")
64
	f.String("server-name-override", conf.Server.NameOverride, "server name override")
65
	f.String("grpc-port", conf.Server.GRPC.Port, "port that GRPC server run on")
66
	f.Bool("grpc-tls-enabled", conf.Server.GRPC.TLSConfig.Enabled, "switch option for GRPC tls server")
67
	f.String("grpc-tls-key-path", conf.Server.GRPC.TLSConfig.KeyPath, "GRPC tls key path")
68
	f.String("grpc-tls-cert-path", conf.Server.GRPC.TLSConfig.CertPath, "GRPC tls certificate path")
69
	f.String("http-port", conf.Server.HTTP.Port, "HTTP port address")
70
	f.Bool("http-tls-enabled", conf.Server.HTTP.TLSConfig.Enabled, "switch option for HTTP tls server")
71
	f.String("http-tls-key-path", conf.Server.HTTP.TLSConfig.KeyPath, "HTTP tls key path")
72
	f.String("http-tls-cert-path", conf.Server.HTTP.TLSConfig.CertPath, "HTTP tls certificate path")
73
	f.StringSlice("http-cors-allowed-origins", conf.Server.HTTP.CORSAllowedOrigins, "CORS allowed origins for http gateway")
74
	f.StringSlice("http-cors-allowed-headers", conf.Server.HTTP.CORSAllowedHeaders, "CORS allowed headers for http gateway")
75
	f.Bool("profiler-enabled", conf.Profiler.Enabled, "switch option for profiler")
76
	f.String("profiler-port", conf.Profiler.Port, "profiler port address")
77
	f.String("log-level", conf.Log.Level, "set log verbosity ('info', 'debug', 'error', 'warning')")
78
	f.String("log-output", conf.Log.Output, "logger output valid values json, text")
79
	f.Bool("log-enabled", conf.Log.Enabled, "logger exporter enabled")
80
	f.String("log-exporter", conf.Log.Exporter, "can be; otlp. (integrated metric tools)")
81
	f.String("log-endpoint", conf.Log.Endpoint, "export uri for logs")
82
	f.Bool("log-insecure", conf.Log.Insecure, "use https or http for logs")
83
	f.String("log-urlpath", conf.Log.URLPath, "allow to set url path for otlp exporter")
84
	f.StringSlice("log-headers", conf.Log.Headers, "allows setting custom headers for the log exporter in key-value pairs")
85
	f.Bool("authn-enabled", conf.Authn.Enabled, "enable server authentication")
86
	f.String("authn-method", conf.Authn.Method, "server authentication method")
87
	f.StringSlice("authn-preshared-keys", conf.Authn.Preshared.Keys, "preshared key/keys for server authentication")
88
	f.String("authn-oidc-issuer", conf.Authn.Oidc.Issuer, "issuer identifier of the OpenID Connect Provider")
89
	f.String("authn-oidc-audience", conf.Authn.Oidc.Audience, "intended audience of the OpenID Connect token")
90
	f.Duration("authn-oidc-refresh-interval", conf.Authn.Oidc.RefreshInterval, "refresh interval for the OpenID Connect configuration")
91
	f.Duration("authn-oidc-backoff-interval", conf.Authn.Oidc.BackoffInterval, "backoff interval for the OpenID Connect configuration")
92
	f.Duration("authn-oidc-backoff-frequency", conf.Authn.Oidc.BackoffFrequency, "backoff frequency for the OpenID Connect configuration")
93
	f.Int("authn-oidc-backoff-max-retries", conf.Authn.Oidc.BackoffMaxRetries, "defines the maximum number of retries for the OpenID Connect configuration")
94
	f.StringSlice("authn-oidc-valid-methods", conf.Authn.Oidc.ValidMethods, "list of valid JWT signing methods for OpenID Connect")
95
	f.Bool("tracer-enabled", conf.Tracer.Enabled, "switch option for tracing")
96
	f.String("tracer-exporter", conf.Tracer.Exporter, "can be; jaeger, signoz, zipkin or otlp. (integrated tracing tools)")
97
	f.String("tracer-endpoint", conf.Tracer.Endpoint, "export uri for tracing data")
98
	f.Bool("tracer-insecure", conf.Tracer.Insecure, "use https or http for tracer data, only used for otlp exporter or signoz")
99
	f.String("tracer-urlpath", conf.Tracer.URLPath, "allow to set url path for otlp exporter")
100
	f.StringSlice("tracer-headers", conf.Tracer.Headers, "allows setting custom headers for the tracer exporter in key-value pairs")
101
	f.Bool("meter-enabled", conf.Meter.Enabled, "switch option for metric")
102
	f.String("meter-exporter", conf.Meter.Exporter, "can be; otlp. (integrated metric tools)")
103
	f.String("meter-endpoint", conf.Meter.Endpoint, "export uri for metric data")
104
	f.Bool("meter-insecure", conf.Meter.Insecure, "use https or http for metric data")
105
	f.String("meter-urlpath", conf.Meter.URLPath, "allow to set url path for otlp exporter")
106
	f.StringSlice("meter-headers", conf.Meter.Headers, "allows setting custom headers for the metric exporter in key-value pairs")
107
	f.Bool("service-circuit-breaker", conf.Service.CircuitBreaker, "switch option for service circuit breaker")
108
	f.Bool("service-watch-enabled", conf.Service.Watch.Enabled, "switch option for watch service")
109
	f.Int64("service-schema-cache-number-of-counters", conf.Service.Schema.Cache.NumberOfCounters, "schema service cache number of counters")
110
	f.String("service-schema-cache-max-cost", conf.Service.Schema.Cache.MaxCost, "schema service cache max cost")
111
	f.Int("service-permission-bulk-limit", conf.Service.Permission.BulkLimit, "bulk operations limit")
112
	f.Int("service-permission-concurrency-limit", conf.Service.Permission.ConcurrencyLimit, "concurrency limit")
113
	f.Int64("service-permission-cache-number-of-counters", conf.Service.Permission.Cache.NumberOfCounters, "permission service cache number of counters")
114
	f.String("service-permission-cache-max-cost", conf.Service.Permission.Cache.MaxCost, "permission service cache max cost")
115
	f.String("database-engine", conf.Database.Engine, "data source. e.g. postgres, memory")
116
	f.String("database-uri", conf.Database.URI, "uri of your data source to store relation tuples and schema")
117
	f.String("database-writer-uri", conf.Database.Writer.URI, "writer uri of your data source to store relation tuples and schema")
118
	f.String("database-reader-uri", conf.Database.Reader.URI, "reader uri of your data source to store relation tuples and schema")
119
	f.Bool("database-auto-migrate", conf.Database.AutoMigrate, "auto migrate database tables")
120
	f.Int("database-max-open-connections", conf.Database.MaxOpenConnections, "maximum number of parallel connections that can be made to the database at any time")
121
	f.Int("database-max-idle-connections", conf.Database.MaxIdleConnections, "maximum number of idle connections that can be made to the database at any time")
122
	f.Duration("database-max-connection-lifetime", conf.Database.MaxConnectionLifetime, "maximum amount of time a connection may be reused")
123
	f.Duration("database-max-connection-idle-time", conf.Database.MaxConnectionIdleTime, "maximum amount of time a connection may be idle")
124
	f.Int("database-max-data-per-write", conf.Database.MaxDataPerWrite, "sets the maximum amount of data per write operation to the database")
125
	f.Int("database-max-retries", conf.Database.MaxRetries, "defines the maximum number of retries for database operations in case of failure")
126
	f.Int("database-watch-buffer-size", conf.Database.WatchBufferSize, "specifies the buffer size for database watch operations, impacting how many changes can be queued")
127
	f.Bool("database-garbage-collection-enabled", conf.Database.GarbageCollection.Enabled, "use database garbage collection for expired relationships and attributes")
128
	f.Duration("database-garbage-collection-interval", conf.Database.GarbageCollection.Interval, "interval for database garbage collection")
129
	f.Duration("database-garbage-collection-timeout", conf.Database.GarbageCollection.Timeout, "timeout for database garbage collection")
130
	f.Duration("database-garbage-collection-window", conf.Database.GarbageCollection.Window, "window for database garbage collection")
131
	f.Bool("distributed-enabled", conf.Distributed.Enabled, "enable distributed")
132
	f.String("distributed-address", conf.Distributed.Address, "distributed address")
133
	f.String("distributed-port", conf.Distributed.Port, "distributed port")
134
135
	// SilenceUsage is set to true to suppress usage when an error occurs
136
	command.SilenceUsage = true
137
138
	command.PreRun = func(cmd *cobra.Command, args []string) {
139
		flags.RegisterServeFlags(f)
140
	}
141
142
	return command
143
}
144
145
// serve is the main function for the "permify serve" command. It starts the Permify service by configuring and starting the necessary components.
146
// It initializes the configuration, logger, database, tracing and metering components, and creates instances of the necessary engines, services, and decorators.
147
// It then creates a ServiceContainer and runs it with the given configuration.
148
// The function uses errgroup to manage the goroutines and gracefully shuts down the service upon receiving a termination signal.
149
// It returns an error if there is an issue with any of the components or if any goroutine fails.
150
func serve() func(cmd *cobra.Command, args []string) error {
151
	return func(cmd *cobra.Command, args []string) error {
152
		var cfg *config.Config
153
		var err error
154
		cfgFile := viper.GetString("config.file")
155
		if cfgFile != "" {
156
			cfg, err = config.NewConfigWithFile(cfgFile)
157
			if err != nil {
158
				return fmt.Errorf("failed to create new config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
159
			}
160
161
			if err = viper.Unmarshal(cfg); err != nil {
162
				return fmt.Errorf("failed to unmarshal config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
163
			}
164
		} else {
165
			// Load configuration
166
			cfg, err = config.NewConfig()
167
			if err != nil {
168
				return fmt.Errorf("failed to create new config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
169
			}
170
171
			if err = viper.Unmarshal(cfg); err != nil {
172
				return fmt.Errorf("failed to unmarshal config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
173
			}
174
		}
175
176
		// Print banner and initialize logger
177
		internal.PrintBanner()
178
179
		var handler slog.Handler
180
181
		switch cfg.Log.Output {
182
		case "json":
183
			handler = telemetry.OtelHandler{
184
				Next: slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
185
					Level: getLogLevel(cfg.Log.Level),
186
				}),
187
			}
188
		case "text":
189
			handler = telemetry.OtelHandler{
190
				Next: slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
191
					Level: getLogLevel(cfg.Log.Level),
192
				}),
193
			}
194
		default:
195
			handler = telemetry.OtelHandler{
196
				Next: slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
197
					Level: getLogLevel(cfg.Log.Level),
198
				}),
199
			}
200
		}
201
		logger := slog.New(handler)
202
		slog.SetDefault(logger)
203
204
		internal.Identifier = cfg.AccountID
205
		if internal.Identifier == "" {
206
			message := "Account ID is not set. Please fill in the Account ID for better support. Get your Account ID from https://permify.co/account"
207
			slog.Error(message)
208
209
			ticker := time.NewTicker(24 * time.Hour)
210
			defer ticker.Stop()
211
212
			go func() {
213
				for range ticker.C {
214
					slog.Error(message)
215
				}
216
			}()
217
		}
218
219
		// Set up context and signal handling
220
		ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
221
		defer stop()
222
223
		if cfg.Log.Enabled {
224
			headers := map[string]string{}
225
			for _, header := range cfg.Log.Headers {
226
				h := strings.Split(header, ":")
227
				if len(h) != 2 {
228
					return errors.New("invalid header format; expected 'key:value'")
229
				}
230
				headers[h[0]] = h[1]
231
			}
232
233
			exporter, _ := logexporters.ExporterFactory(
234
				cfg.Log.Exporter,
235
				cfg.Log.Endpoint,
236
				cfg.Log.Insecure,
237
				cfg.Log.URLPath,
238
				headers,
239
			)
240
			lp := telemetry.NewLog(exporter)
241
242
			logger := slog.New(otelslog.NewOtelHandler(lp, &otelslog.HandlerOptions{
243
				Level: getLogLevel(cfg.Log.Level),
244
			}))
245
246
			slog.SetDefault(logger)
247
248
			defer func() {
249
				if err = lp.Shutdown(ctx); err != nil {
250
					slog.Error(err.Error())
251
				}
252
			}()
253
		}
254
255
		slog.Info("🚀 starting permify service...")
256
257
		// Run database migration if enabled
258
		if cfg.Database.AutoMigrate {
259
			err = storage.Migrate(cfg.Database)
260
			if err != nil {
261
				slog.Error("failed to migrate database", slog.Any("error", err))
262
			}
263
		}
264
265
		// Initialize database
266
		db, err := factories.DatabaseFactory(cfg.Database)
267
		if err != nil {
268
			slog.Error("failed to initialize database", slog.Any("error", err))
269
			return err
270
		}
271
		defer func() {
272
			if err = db.Close(); err != nil {
273
				slog.Error("failed to close database", slog.Any("error", err))
274
			}
275
		}()
276
277
		// Tracing
278
		if cfg.Tracer.Enabled {
279
			headers := map[string]string{}
280
			for _, header := range cfg.Tracer.Headers {
281
				h := strings.Split(header, ":")
282
				if len(h) != 2 {
283
					return errors.New("invalid header format; expected 'key:value'")
284
				}
285
				headers[h[0]] = h[1]
286
			}
287
288
			var exporter trace.SpanExporter
289
			exporter, err = tracerexporters.ExporterFactory(
290
				cfg.Tracer.Exporter,
291
				cfg.Tracer.Endpoint,
292
				cfg.Tracer.Insecure,
293
				cfg.Tracer.URLPath,
294
				headers,
295
			)
296
			if err != nil {
297
				slog.Error(err.Error())
298
			}
299
300
			shutdown := telemetry.NewTracer(exporter)
301
302
			defer func() {
303
				if err = shutdown(ctx); err != nil {
304
					slog.Error(err.Error())
305
				}
306
			}()
307
		}
308
309
		// Garbage collection
310
		if cfg.Database.GarbageCollection.Timeout > 0 && cfg.Database.GarbageCollection.Enabled && cfg.Database.Engine != "memory" {
311
			slog.Info("🗑️ starting database garbage collection...")
312
313
			garbageCollector := gc.NewGC(
314
				db.(*PQDatabase.Postgres),
315
				gc.Interval(cfg.Database.GarbageCollection.Interval),
316
				gc.Window(cfg.Database.GarbageCollection.Window),
317
				gc.Timeout(cfg.Database.GarbageCollection.Timeout),
318
			)
319
320
			go func() {
321
				err = garbageCollector.Start(ctx)
322
				if err != nil {
323
					slog.Error(err.Error())
324
				}
325
			}()
326
		}
327
328
		// Meter
329
		if cfg.Meter.Enabled {
330
			headers := map[string]string{}
331
			for _, header := range cfg.Meter.Headers {
332
				h := strings.Split(header, ":")
333
				if len(h) != 2 {
334
					return errors.New("invalid header format; expected 'key:value'")
335
				}
336
				headers[h[0]] = h[1]
337
			}
338
339
			var exporter metric.Exporter
340
			exporter, err = meterexporters.ExporterFactory(
341
				cfg.Meter.Exporter,
342
				cfg.Meter.Endpoint,
343
				cfg.Meter.Insecure,
344
				cfg.Meter.URLPath,
345
				headers,
346
			)
347
348
			if err != nil {
349
				slog.Error(err.Error())
350
			}
351
352
			shutdown := telemetry.NewMeter(exporter, time.Duration(cfg.Meter.Interval)*time.Second)
353
354
			defer func() {
355
				if err = shutdown(ctx); err != nil {
356
					slog.Error(err.Error())
357
				}
358
			}()
359
		}
360
361
		// schema cache
362
		var schemaCache pkgcache.Cache
363
		schemaCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Schema.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Schema.Cache.MaxCost))
364
		if err != nil {
365
			slog.Error(err.Error())
366
			return err
367
		}
368
369
		// engines cache cache
370
		var engineKeyCache pkgcache.Cache
371
		engineKeyCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Permission.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Permission.Cache.MaxCost))
372
		if err != nil {
373
			slog.Error(err.Error())
374
			return err
375
		}
376
377
		watcher := storage.NewNoopWatcher()
378
		if cfg.Service.Watch.Enabled {
379
			watcher = factories.WatcherFactory(db)
380
		}
381
382
		// Initialize the storage with factory methods
383
		dataReader := factories.DataReaderFactory(db)
384
		dataWriter := factories.DataWriterFactory(db)
385
		bundleReader := factories.BundleReaderFactory(db)
386
		bundleWriter := factories.BundleWriterFactory(db)
387
		schemaReader := factories.SchemaReaderFactory(db)
388
		schemaWriter := factories.SchemaWriterFactory(db)
389
		tenantReader := factories.TenantReaderFactory(db)
390
		tenantWriter := factories.TenantWriterFactory(db)
391
392
		// Add caching to the schema reader using a decorator
393
		schemaReader = cacheDecorator.NewSchemaReader(schemaReader, schemaCache)
394
395
		dataReader = sfDecorator.NewDataReader(dataReader)
396
		schemaReader = sfDecorator.NewSchemaReader(schemaReader)
397
398
		// Check if circuit breaker should be enabled for services
399
		if cfg.Service.CircuitBreaker {
400
			var cb *gobreaker.CircuitBreaker
401
			var st gobreaker.Settings
402
			st.Name = "storage"
403
			st.ReadyToTrip = func(counts gobreaker.Counts) bool {
404
				failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
405
				return counts.Requests >= 10 && failureRatio >= 0.6
406
			}
407
408
			cb = gobreaker.NewCircuitBreaker(st)
409
410
			// Add circuit breaker to the relationship reader using decorator
411
			dataReader = cbDecorator.NewDataReader(dataReader, cb)
412
413
			// Add circuit breaker to the bundle reader using decorators
414
			bundleReader = cbDecorator.NewBundleReader(bundleReader, cb)
415
416
			// Add circuit breaker to the schema reader using decorator
417
			schemaReader = cbDecorator.NewSchemaReader(schemaReader, cb)
418
419
			// Add circuit breaker to the tenant reader using decorator
420
			tenantReader = cbDecorator.NewTenantReader(tenantReader, cb)
421
		}
422
423
		// Initialize the engines using the key manager, schema reader, and relationship reader
424
		checkEngine := engines.NewCheckEngine(schemaReader, dataReader, engines.CheckConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit))
425
		expandEngine := engines.NewExpandEngine(schemaReader, dataReader)
426
427
		// Declare a variable `checker` of type `invoke.Check`.
428
		var checker invoke.Check
429
430
		// Create the checker either with load balancing or caching capabilities.
431
		if cfg.Distributed.Enabled {
432
433
			if cfg.Authn.Enabled && cfg.Authn.Method == "oidc" {
434
				return errors.New("OIDC authentication method cannot be used in distributed mode. Please check your configuration")
435
			}
436
437
			checker, err = balancer.NewCheckEngineWithBalancer(
438
				ctx,
439
				checkEngine,
440
				schemaReader,
441
				&cfg.Distributed,
442
				&cfg.Server.GRPC,
443
				&cfg.Authn,
444
			)
445
			// Handle potential error during checker creation.
446
			if err != nil {
447
				return err
448
			}
449
			checker = cache.NewCheckEngineWithCache(
450
				checker,
451
				schemaReader,
452
				engineKeyCache,
453
			)
454
		} else {
455
			checker = cache.NewCheckEngineWithCache(
456
				checkEngine,
457
				schemaReader,
458
				engineKeyCache,
459
			)
460
		}
461
462
		// Create a localChecker which directly checks without considering distributed setup.
463
		// This also includes caching capabilities.
464
		localChecker := cache.NewCheckEngineWithCache(
465
			checkEngine,
466
			schemaReader,
467
			engineKeyCache,
468
		)
469
470
		// Initialize the lookupEngine, which is responsible for looking up certain entities or values.
471
		lookupEngine := engines.NewLookupEngine(
472
			checker,
473
			schemaReader,
474
			dataReader,
475
			// Set concurrency limit based on the configuration.
476
			engines.LookupConcurrencyLimit(cfg.Service.Permission.BulkLimit),
477
		)
478
479
		// Initialize the subjectPermissionEngine, responsible for handling subject permissions.
480
		subjectPermissionEngine := engines.NewSubjectPermission(
481
			checker,
482
			schemaReader,
483
			// Set concurrency limit for the subject permission checks.
484
			engines.SubjectPermissionConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit),
485
		)
486
487
		// Create a new invoker that is used to directly call various functions or engines.
488
		// It encompasses the schema, data, checker, and other engines.
489
		invoker := invoke.NewDirectInvoker(
490
			schemaReader,
491
			dataReader,
492
			checker,
493
			expandEngine,
494
			lookupEngine,
495
			subjectPermissionEngine,
496
		)
497
498
		// Associate the invoker with the checkEngine.
499
		checkEngine.SetInvoker(invoker)
500
501
		// Create a local invoker for local operations.
502
		localInvoker := invoke.NewDirectInvoker(
503
			schemaReader,
504
			dataReader,
505
			localChecker,
506
			expandEngine,
507
			lookupEngine,
508
			subjectPermissionEngine,
509
		)
510
511
		// Initialize the container which brings together multiple components such as the invoker, data readers/writers, and schema handlers.
512
		container := servers.NewContainer(
513
			invoker,
514
			dataReader,
515
			dataWriter,
516
			bundleReader,
517
			bundleWriter,
518
			schemaReader,
519
			schemaWriter,
520
			tenantReader,
521
			tenantWriter,
522
			watcher,
523
		)
524
525
		// Create an error group with the provided context
526
		var g *errgroup.Group
527
		g, ctx = errgroup.WithContext(ctx)
528
529
		// Add the container.Run function to the error group
530
		g.Go(func() error {
531
			return container.Run(
532
				ctx,
533
				&cfg.Server,
534
				logger,
535
				&cfg.Distributed,
536
				&cfg.Authn,
537
				&cfg.Profiler,
538
				localInvoker,
539
			)
540
		})
541
542
		// Wait for the error group to finish and log any errors
543
		if err = g.Wait(); err != nil {
544
			slog.Error(err.Error())
545
		}
546
547
		return nil
548
	}
549
}
550
551
// getLogLevel converts a string representation of log level to its corresponding slog.Level value.
552
func getLogLevel(level string) slog.Level {
553
	switch level {
554
	case "info":
555
		return slog.LevelInfo // Return Info level
556
	case "warn":
557
		return slog.LevelWarn // Return Warning level
558
	case "error":
559
		return slog.LevelError // Return Error level
560
	case "debug":
561
		return slog.LevelDebug // Return Debug level
562
	default:
563
		return slog.LevelInfo // Default to Info level if unrecognized
564
	}
565
}
566