Issues (58)

pkg/cmd/serve.go (4 issues)

Severity
1
package cmd
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"log/slog"
8
	"os"
9
	"os/signal"
10
	"strings"
11
	"syscall"
12
	"time"
13
14
	slogmulti "github.com/samber/slog-multi" // Multi-handler logger
15
	"github.com/sony/gobreaker"
16
	"github.com/spf13/cobra"
17
	"github.com/spf13/viper"
18
	"go.opentelemetry.io/otel/sdk/metric"
19
20
	"github.com/Permify/permify/internal/engines/balancer"
21
	"github.com/Permify/permify/internal/engines/cache"
22
	"github.com/Permify/permify/internal/invoke"
23
	"github.com/Permify/permify/internal/storage/postgres/gc"
24
	cacheproxy "github.com/Permify/permify/internal/storage/proxies/cache"
25
	cbproxy "github.com/Permify/permify/internal/storage/proxies/circuitbreaker"
26
	sfproxy "github.com/Permify/permify/internal/storage/proxies/singleflight"
27
	"github.com/Permify/permify/pkg/cmd/flags"
28
	PQDatabase "github.com/Permify/permify/pkg/database/postgres"
29
30
	"go.opentelemetry.io/otel/sdk/trace"
31
	"golang.org/x/sync/errgroup"
32
33
	"github.com/Permify/permify/internal"
34
	"github.com/Permify/permify/internal/config"
35
	"github.com/Permify/permify/internal/engines"
36
	"github.com/Permify/permify/internal/factories"
37
	"github.com/Permify/permify/internal/servers"
38
	"github.com/Permify/permify/internal/storage"
39
	pkgcache "github.com/Permify/permify/pkg/cache"
40
	"github.com/Permify/permify/pkg/cache/ristretto"
41
	"github.com/Permify/permify/pkg/telemetry"
42
	"github.com/Permify/permify/pkg/telemetry/meterexporters"
43
	"github.com/Permify/permify/pkg/telemetry/tracerexporters"
44
)
45
46
// NewServeCommand returns a new Cobra command that can be used to run the "permify serve" command.
47
// The command takes no arguments and runs the serve() function to start the Permify service.
48
// The command has a short description of what it does.
49
func NewServeCommand() *cobra.Command {
50
	command := &cobra.Command{
51
		Use:   "serve",
52
		Short: "serve the Permify server",
53
		RunE:  serve(),
54
		Args:  cobra.NoArgs,
55
	}
56
57
	conf := config.DefaultConfig()
58
	f := command.Flags()
59
	f.StringP("config", "c", "", "config file (default is $HOME/.permify.yaml)")
60
	f.Bool("http-enabled", conf.Server.HTTP.Enabled, "switch option for HTTP server")
61
	f.String("account-id", conf.AccountID, "account id")
62
	f.Int64("server-rate-limit", conf.Server.RateLimit, "the maximum number of requests the server should handle per second")
63
	f.String("server-name-override", conf.Server.NameOverride, "server name override")
64
	f.String("grpc-port", conf.Server.GRPC.Port, "port that GRPC server run on")
65
	f.Bool("grpc-tls-enabled", conf.Server.GRPC.TLSConfig.Enabled, "switch option for GRPC tls server")
66
	f.String("grpc-tls-key-path", conf.Server.GRPC.TLSConfig.KeyPath, "GRPC tls key path")
67
	f.String("grpc-tls-cert-path", conf.Server.GRPC.TLSConfig.CertPath, "GRPC tls certificate path")
68
	f.String("http-port", conf.Server.HTTP.Port, "HTTP port address")
69
	f.Bool("http-tls-enabled", conf.Server.HTTP.TLSConfig.Enabled, "switch option for HTTP tls server")
70
	f.String("http-tls-key-path", conf.Server.HTTP.TLSConfig.KeyPath, "HTTP tls key path")
71
	f.String("http-tls-cert-path", conf.Server.HTTP.TLSConfig.CertPath, "HTTP tls certificate path")
72
	f.StringSlice("http-cors-allowed-origins", conf.Server.HTTP.CORSAllowedOrigins, "CORS allowed origins for http gateway")
73
	f.StringSlice("http-cors-allowed-headers", conf.Server.HTTP.CORSAllowedHeaders, "CORS allowed headers for http gateway")
74
	f.Bool("profiler-enabled", conf.Profiler.Enabled, "switch option for profiler")
75
	f.String("profiler-port", conf.Profiler.Port, "profiler port address")
76
	f.String("log-level", conf.Log.Level, "set log verbosity ('info', 'debug', 'error', 'warning')")
77
	f.String("log-output", conf.Log.Output, "logger output valid values json, text")
78
	f.Bool("log-enabled", conf.Log.Enabled, "logger exporter enabled") // Log exporter toggle
79
	f.String("log-exporter", conf.Log.Exporter, "can be; otlp. (integrated metric tools)")
80
	f.String("log-endpoint", conf.Log.Endpoint, "export uri for logs")
81
	f.Bool("log-insecure", conf.Log.Insecure, "use https or http for logs")
82
	f.String("log-urlpath", conf.Log.Urlpath, "allow to set url path for otlp exporter")
83
	f.StringSlice("log-headers", conf.Log.Headers, "allows setting custom headers for the log exporter in key-value pairs")
84
	f.String("log-protocol", conf.Log.Protocol, "allows setting the communication protocol for the log exporter, with options http or grpc")
85
	f.Bool("authn-enabled", conf.Authn.Enabled, "enable server authentication")
86
	f.String("authn-method", conf.Authn.Method, "server authentication method")
87
	f.StringSlice("authn-preshared-keys", conf.Authn.Preshared.Keys, "preshared key/keys for server authentication")
88
	f.String("authn-oidc-issuer", conf.Authn.Oidc.Issuer, "issuer identifier of the OpenID Connect Provider")
89
	f.String("authn-oidc-audience", conf.Authn.Oidc.Audience, "intended audience of the OpenID Connect token")
90
	f.Duration("authn-oidc-refresh-interval", conf.Authn.Oidc.RefreshInterval, "refresh interval for the OpenID Connect configuration")
91
	f.Duration("authn-oidc-backoff-interval", conf.Authn.Oidc.BackoffInterval, "backoff interval for the OpenID Connect configuration")
92
	f.Duration("authn-oidc-backoff-frequency", conf.Authn.Oidc.BackoffFrequency, "backoff frequency for the OpenID Connect configuration")
93
	f.Int("authn-oidc-backoff-max-retries", conf.Authn.Oidc.BackoffMaxRetries, "defines the maximum number of retries for the OpenID Connect configuration")
94
	f.StringSlice("authn-oidc-valid-methods", conf.Authn.Oidc.ValidMethods, "list of valid JWT signing methods for OpenID Connect")
95
	f.Bool("tracer-enabled", conf.Tracer.Enabled, "switch option for tracing")
96
	f.String("tracer-exporter", conf.Tracer.Exporter, "can be; jaeger, signoz, zipkin or otlp. (integrated tracing tools)")
97
	f.String("tracer-endpoint", conf.Tracer.Endpoint, "export uri for tracing data")
98
	f.Bool("tracer-insecure", conf.Tracer.Insecure, "use https or http for tracer data, only used for otlp exporter or signoz")
99
	f.String("tracer-urlpath", conf.Tracer.Urlpath, "allow to set url path for otlp exporter")
100
	f.StringSlice("tracer-headers", conf.Tracer.Headers, "allows setting custom headers for the tracer exporter in key-value pairs")
101
	f.String("tracer-protocol", conf.Tracer.Protocol, "allows setting the communication protocol for the tracer exporter, with options http or grpc")
102
	f.Bool("meter-enabled", conf.Meter.Enabled, "switch option for metric")
103
	f.String("meter-exporter", conf.Meter.Exporter, "can be; otlp. (integrated metric tools)")
104
	f.String("meter-endpoint", conf.Meter.Endpoint, "export uri for metric data")
105
	f.Bool("meter-insecure", conf.Meter.Insecure, "use https or http for metric data")
106
	f.String("meter-urlpath", conf.Meter.Urlpath, "allow to set url path for otlp exporter")
107
	f.StringSlice("meter-headers", conf.Meter.Headers, "allows setting custom headers for the metric exporter in key-value pairs")
108
	f.Int("meter-interval", conf.Meter.Interval, "allows to set metrics to be pushed in certain time interval")
109
	f.String("meter-protocol", conf.Meter.Protocol, "allows setting the communication protocol for the meter exporter, with options http or grpc")
110
	f.Bool("service-circuit-breaker", conf.Service.CircuitBreaker, "switch option for service circuit breaker")
111
	f.Bool("service-watch-enabled", conf.Service.Watch.Enabled, "switch option for watch service")
112
	f.Int64("service-schema-cache-number-of-counters", conf.Service.Schema.Cache.NumberOfCounters, "schema service cache number of counters")
113
	f.String("service-schema-cache-max-cost", conf.Service.Schema.Cache.MaxCost, "schema service cache max cost")
114
	f.Int("service-permission-bulk-limit", conf.Service.Permission.BulkLimit, "bulk operations limit")
115
	f.Int("service-permission-concurrency-limit", conf.Service.Permission.ConcurrencyLimit, "concurrency limit")
116
	f.Int64("service-permission-cache-number-of-counters", conf.Service.Permission.Cache.NumberOfCounters, "permission service cache number of counters")
117
	f.String("service-permission-cache-max-cost", conf.Service.Permission.Cache.MaxCost, "permission service cache max cost")
118
	f.String("database-engine", conf.Database.Engine, "data source. e.g. postgres, memory")
119
	f.String("database-uri", conf.Database.URI, "uri of your data source to store relation tuples and schema")
120
	f.String("database-writer-uri", conf.Database.Writer.URI, "writer uri of your data source to store relation tuples and schema")
121
	f.String("database-reader-uri", conf.Database.Reader.URI, "reader uri of your data source to store relation tuples and schema")
122
	f.Bool("database-auto-migrate", conf.Database.AutoMigrate, "auto migrate database tables")
123
	f.Int("database-max-connections", conf.Database.MaxConnections, "maximum number of connections in the pool")
124
	f.Int("database-max-open-connections", conf.Database.MaxOpenConnections, "deprecated: use database-max-connections instead. maximum number of parallel connections that can be made to the database at any time")
125
	f.Int("database-max-idle-connections", conf.Database.MaxIdleConnections, "deprecated: use database-min-connections instead. maximum number of idle connections that can be made to the database at any time")
126
	f.Int("database-min-connections", conf.Database.MinConnections, "minimum number of connections in the pool")
127
	f.Int("database-min-idle-connections", conf.Database.MinIdleConnections, "minimum number of idle connections in the pool")
128
	f.Duration("database-max-connection-lifetime", conf.Database.MaxConnectionLifetime, "maximum amount of time a connection may be reused")
129
	f.Duration("database-max-connection-idle-time", conf.Database.MaxConnectionIdleTime, "maximum amount of time a connection may be idle")
130
	f.Duration("database-health-check-period", conf.Database.HealthCheckPeriod, "period between health checks on idle connections")
131
	f.Duration("database-max-connection-lifetime-jitter", conf.Database.MaxConnectionLifetimeJitter, "jitter added to max_connection_lifetime to prevent all connections from expiring at once")
132
	f.Duration("database-connect-timeout", conf.Database.ConnectTimeout, "maximum time to wait when establishing a new connection")
133
	f.Int("database-max-data-per-write", conf.Database.MaxDataPerWrite, "sets the maximum amount of data per write operation to the database")
134
	f.Int("database-max-retries", conf.Database.MaxRetries, "defines the maximum number of retries for database operations in case of failure")
135
	f.Int("database-watch-buffer-size", conf.Database.WatchBufferSize, "specifies the buffer size for database watch operations, impacting how many changes can be queued")
136
	f.Bool("database-garbage-collection-enabled", conf.Database.GarbageCollection.Enabled, "use database garbage collection for expired relationships and attributes")
137
	f.Duration("database-garbage-collection-interval", conf.Database.GarbageCollection.Interval, "interval for database garbage collection")
138
	f.Duration("database-garbage-collection-timeout", conf.Database.GarbageCollection.Timeout, "timeout for database garbage collection")
139
	f.Duration("database-garbage-collection-window", conf.Database.GarbageCollection.Window, "window for database garbage collection")
140
	f.Bool("distributed-enabled", conf.Distributed.Enabled, "enable distributed")
141
	f.String("distributed-address", conf.Distributed.Address, "distributed address")
142
	f.String("distributed-port", conf.Distributed.Port, "distributed port")
143
	f.Int("distributed-partition-count", conf.Distributed.PartitionCount, "number of partitions for distributed hashing")
144
	f.Int("distributed-replication-factor", conf.Distributed.ReplicationFactor, "number of replicas for distributed hashing")
145
	f.Float64("distributed-load", conf.Distributed.Load, "load factor for distributed hashing")
146
	f.Int("distributed-picker-width", conf.Distributed.PickerWidth, "picker width for distributed hashing")
147
	// Silence usage on error
148
	command.SilenceUsage = true // Suppress usage on errors
149
	// Register flags
150
	command.PreRun = func(cmd *cobra.Command, args []string) {
151
		flags.RegisterServeFlags(f)
152
	}
153
154
	return command
155
}
156
157
// serve is the main function for the "permify serve" command. It starts the Permify service by configuring and starting the necessary components.
158
// It initializes the configuration, logger, database, tracing and metering components, and creates instances of the necessary engines, services, and decorators.
159
// It then creates a ServiceContainer and runs it with the given configuration.
160
// The function uses errgroup to manage the goroutines and gracefully shuts down the service upon receiving a termination signal.
161
// It returns an error if there is an issue with any of the components or if any goroutine fails.
162
func serve() func(cmd *cobra.Command, args []string) error {
163
	return func(cmd *cobra.Command, args []string) error {
164
		// Load configuration
165
		var cfg *config.Config
166
		var err error
167
		cfgFile := viper.GetString("config.file")
168
		if cfgFile != "" {
169
			cfg, err = config.NewConfigWithFile(cfgFile)
170
			if err != nil {
171
				return fmt.Errorf("failed to create new config: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
172
			}
173
			// Unmarshal config from file
174
			if err = viper.Unmarshal(cfg); err != nil {
175
				return fmt.Errorf("failed to unmarshal config: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
176
			}
177
		} else {
178
			// Load configuration from environment
179
			cfg, err = config.NewConfig()
180
			if err != nil {
181
				return fmt.Errorf("failed to create new config: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
182
			}
183
			// Unmarshal config from environment
184
			if err = viper.Unmarshal(cfg); err != nil {
185
				return fmt.Errorf("failed to unmarshal config: %w", err)
0 ignored issues
show
unrecognized printf verb 'w'
Loading history...
186
			}
187
		}
188
189
		// Print banner and initialize logger
190
		internal.PrintBanner()
191
192
		// Set up context and signal handling
193
		ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
194
		defer stop()
195
196
		internal.Identifier = cfg.AccountID
197
198
		var logger *slog.Logger
199
		var handler slog.Handler
200
		switch cfg.Log.Output {
201
		case "json":
202
			handler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
203
				Level: getLogLevel(cfg.Log.Level),
204
			})
205
		case "text":
206
			handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
207
				Level: getLogLevel(cfg.Log.Level),
208
			})
209
		default:
210
			handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
211
				Level: getLogLevel(cfg.Log.Level),
212
			})
213
		}
214
215
		// Configure custom log exporter if enabled
216
		if cfg.Log.Enabled {
217
			headers := map[string]string{}
218
			for _, header := range cfg.Log.Headers {
219
				h := strings.Split(header, ":")
220
				if len(h) != 2 {
221
					return errors.New("invalid header format; expected 'key:value'")
222
				}
223
				headers[h[0]] = h[1]
224
			}
225
226
			customHandler, err := telemetry.HandlerFactory(
227
				cfg.Log.Exporter,
228
				cfg.Log.Endpoint,
229
				cfg.Log.Insecure,
230
				cfg.Log.Urlpath,
231
				headers,
232
				cfg.Log.Protocol,
233
				getLogLevel(cfg.Log.Level),
234
			)
235
			if err != nil {
236
				slog.Error("invalid logger exporter", slog.Any("error", err))
237
				logger = slog.New(handler)
238
			} else {
239
				logger = slog.New(slogmulti.Fanout(customHandler, handler))
240
			}
241
		} else {
242
			logger = slog.New(handler)
243
		}
244
245
		// Set default logger
246
		slog.SetDefault(logger)
247
248
		// Check account ID
249
		if internal.Identifier == "" {
250
			message := "Account ID is not set. Please fill in the Account ID for better support. Get your Account ID from https://permify.co/account"
251
			slog.Error(message)
252
253
			ticker := time.NewTicker(24 * time.Hour)
254
			defer ticker.Stop()
255
256
			go func() {
257
				for range ticker.C {
258
					slog.Error(message)
259
				}
260
			}()
261
		}
262
263
		slog.Info("🚀 starting permify service...")
264
265
		// Run database migration if enabled
266
		if cfg.Database.AutoMigrate {
267
			err = storage.Migrate(cfg.Database)
268
			if err != nil {
269
				slog.Error("failed to migrate database", slog.Any("error", err))
270
				return err
271
			}
272
		}
273
274
		// Initialize database
275
		db, err := factories.DatabaseFactory(cfg.Database)
276
		if err != nil {
277
			slog.Error("failed to initialize database", slog.Any("error", err))
278
			return err
279
		}
280
		defer func() {
281
			if err = db.Close(); err != nil {
282
				slog.Error("failed to close database", slog.Any("error", err))
283
			}
284
		}()
285
286
		// Tracing
287
		if cfg.Tracer.Enabled {
288
			headers := map[string]string{}
289
			for _, header := range cfg.Tracer.Headers {
290
				h := strings.Split(header, ":")
291
				if len(h) != 2 {
292
					return errors.New("invalid header format; expected 'key:value'")
293
				}
294
				headers[h[0]] = h[1]
295
			}
296
297
			// Create tracer exporter
298
			var exporter trace.SpanExporter
299
			exporter, err = tracerexporters.ExporterFactory( // Create exporter
300
				cfg.Tracer.Exporter,
301
				cfg.Tracer.Endpoint,
302
				cfg.Tracer.Insecure,
303
				cfg.Tracer.Urlpath,
304
				headers,
305
				cfg.Tracer.Protocol,
306
			)
307
			if err != nil {
308
				slog.Error(err.Error())
309
			}
310
311
			// Initialize tracer
312
			shutdown := telemetry.NewTracer(exporter) // Create tracer
313
314
			defer func() {
315
				if err = shutdown(ctx); err != nil {
316
					slog.Error(err.Error())
317
				}
318
			}()
319
		}
320
321
		// Setup garbage collection
322
		if cfg.Database.GarbageCollection.Timeout > 0 && cfg.Database.GarbageCollection.Enabled && cfg.Database.Engine != "memory" {
323
			slog.Info("🗑️ starting database garbage collection...")
324
325
			// Create garbage collector
326
			garbageCollector := gc.NewGC(
327
				db.(*PQDatabase.Postgres),
328
				gc.Interval(cfg.Database.GarbageCollection.Interval),
329
				gc.Window(cfg.Database.GarbageCollection.Window),
330
				gc.Timeout(cfg.Database.GarbageCollection.Timeout),
331
			)
332
333
			// Start garbage collector in background
334
			go func() {
335
				err = garbageCollector.Start(ctx)
336
				if err != nil {
337
					slog.Error(err.Error())
338
				}
339
			}()
340
		}
341
342
		if cfg.Meter.Enabled {
343
			headers := map[string]string{}
344
			for _, header := range cfg.Meter.Headers {
345
				h := strings.Split(header, ":")
346
				if len(h) != 2 {
347
					return errors.New("invalid header format; expected 'key:value'")
348
				}
349
				headers[h[0]] = h[1]
350
			}
351
352
			// Create meter exporter
353
			var exporter metric.Exporter
354
			exporter, err = meterexporters.ExporterFactory(
355
				cfg.Meter.Exporter,
356
				cfg.Meter.Endpoint,
357
				cfg.Meter.Insecure,
358
				cfg.Meter.Urlpath,
359
				headers,
360
				cfg.Meter.Protocol,
361
			)
362
			if err != nil {
363
				slog.Error(err.Error())
364
			}
365
366
			// Initialize meter
367
			shutdown := telemetry.NewMeter(exporter, time.Duration(cfg.Meter.Interval)*time.Second) // Create meter
368
			// Cleanup on exit
369
			defer func() {
370
				if err = shutdown(ctx); err != nil {
371
					slog.Error(err.Error())
372
				}
373
			}()
374
		}
375
376
		// schema cache
377
		var schemaCache pkgcache.Cache
378
		schemaCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Schema.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Schema.Cache.MaxCost))
379
		if err != nil {
380
			slog.Error(err.Error())
381
			return err
382
		}
383
384
		// engines cache cache
385
		var engineKeyCache pkgcache.Cache
386
		engineKeyCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Permission.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Permission.Cache.MaxCost))
387
		if err != nil {
388
			slog.Error(err.Error())
389
			return err
390
		}
391
392
		watcher := storage.NewNoopWatcher()
393
		if cfg.Service.Watch.Enabled {
394
			watcher = factories.WatcherFactory(db)
395
		}
396
397
		// Initialize the storage with factory methods
398
		dataReader := factories.DataReaderFactory(db)
399
		dataWriter := factories.DataWriterFactory(db)
400
		bundleReader := factories.BundleReaderFactory(db)
401
		bundleWriter := factories.BundleWriterFactory(db)
402
		schemaReader := factories.SchemaReaderFactory(db)
403
		schemaWriter := factories.SchemaWriterFactory(db)
404
		tenantReader := factories.TenantReaderFactory(db)
405
		tenantWriter := factories.TenantWriterFactory(db)
406
407
		// Add caching to the schema reader using a decorator
408
		schemaReader = cacheproxy.NewSchemaReader(schemaReader, schemaCache)
409
410
		dataReader = sfproxy.NewDataReader(dataReader)
411
		schemaReader = sfproxy.NewSchemaReader(schemaReader)
412
413
		// Check if circuit breaker should be enabled for services
414
		if cfg.Service.CircuitBreaker {
415
			var cb *gobreaker.CircuitBreaker
416
			var st gobreaker.Settings
417
			st.Name = "storage"
418
			st.ReadyToTrip = func(counts gobreaker.Counts) bool {
419
				failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
420
				return counts.Requests >= 10 && failureRatio >= 0.6
421
			}
422
423
			cb = gobreaker.NewCircuitBreaker(st)
424
425
			// Add circuit breaker to the relationship reader using decorator
426
			dataReader = cbproxy.NewDataReader(dataReader, cb)
427
428
			// Add circuit breaker to the bundle reader using decorators
429
			bundleReader = cbproxy.NewBundleReader(bundleReader, cb)
430
431
			// Add circuit breaker to the schema reader using decorator
432
			schemaReader = cbproxy.NewSchemaReader(schemaReader, cb)
433
434
			// Add circuit breaker to the tenant reader using decorator
435
			tenantReader = cbproxy.NewTenantReader(tenantReader, cb)
436
		}
437
438
		// Initialize the engines using the key manager, schema reader, and relationship reader
439
		checkEngine := engines.NewCheckEngine(schemaReader, dataReader, engines.CheckConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit))
440
		expandEngine := engines.NewExpandEngine(schemaReader, dataReader)
441
442
		// Declare a variable `checker` of type `invoke.Check`.
443
		var checker invoke.Check
444
445
		checker = cache.NewCheckEngineWithCache(
446
			checkEngine,
447
			schemaReader,
448
			engineKeyCache,
449
		)
450
451
		// Create the checker either with load balancing or caching capabilities.
452
		if cfg.Distributed.Enabled {
453
			if cfg.Authn.Enabled && cfg.Authn.Method == "oidc" {
454
				return errors.New("OIDC authentication method cannot be used in distributed mode. Please check your configuration")
455
			}
456
457
			checker, err = balancer.NewCheckEngineWithBalancer(
458
				ctx,
459
				checker,
460
				schemaReader,
461
				cfg.Server.NameOverride,
462
				&cfg.Distributed,
463
				&cfg.Server.GRPC,
464
				&cfg.Authn,
465
			)
466
			// Handle potential error during checker creation.
467
			if err != nil {
468
				return err
469
			}
470
		}
471
472
		// Create a localChecker which directly checks without considering distributed setup.
473
		// This also includes caching capabilities.
474
		localChecker := cache.NewCheckEngineWithCache(
475
			checkEngine,
476
			schemaReader,
477
			engineKeyCache,
478
		)
479
480
		// Initialize the lookupEngine, which is responsible for looking up certain entities or values.
481
		lookupEngine := engines.NewLookupEngine(
482
			checker,
483
			schemaReader,
484
			dataReader,
485
			// Set concurrency limit based on the configuration.
486
			engines.LookupConcurrencyLimit(cfg.Service.Permission.BulkLimit),
487
		)
488
489
		// Initialize the subjectPermissionEngine, responsible for handling subject permissions.
490
		subjectPermissionEngine := engines.NewSubjectPermission(
491
			checker,
492
			schemaReader,
493
			// Set concurrency limit for the subject permission checks.
494
			engines.SubjectPermissionConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit),
495
		)
496
497
		// Create a new invoker that is used to directly call various functions or engines.
498
		// It encompasses the schema, data, checker, and other engines.
499
		invoker := invoke.NewDirectInvoker(
500
			schemaReader,
501
			dataReader,
502
			checker,
503
			expandEngine,
504
			lookupEngine,
505
			subjectPermissionEngine,
506
		)
507
508
		// Associate the invoker with the checkEngine.
509
		checkEngine.SetInvoker(invoker)
510
511
		// Create a local invoker for local operations.
512
		localInvoker := invoke.NewDirectInvoker(
513
			schemaReader,
514
			dataReader,
515
			localChecker,
516
			expandEngine,
517
			lookupEngine,
518
			subjectPermissionEngine,
519
		)
520
521
		// Initialize the container which brings together multiple components such as the invoker, data readers/writers, and schema handlers.
522
		container := servers.NewContainer(
523
			invoker,
524
			dataReader,
525
			dataWriter,
526
			bundleReader,
527
			bundleWriter,
528
			schemaReader,
529
			schemaWriter,
530
			tenantReader,
531
			tenantWriter,
532
			watcher,
533
		)
534
535
		// Create an error group with the provided context
536
		var g *errgroup.Group
537
		g, ctx = errgroup.WithContext(ctx)
538
539
		// Add the container.Run function to the error group
540
		g.Go(func() error {
541
			return container.Run(
542
				ctx,
543
				&cfg.Server,
544
				logger,
545
				&cfg.Distributed,
546
				&cfg.Authn,
547
				&cfg.Profiler,
548
				localInvoker,
549
			)
550
		})
551
552
		// Wait for the error group to finish and log any errors
553
		if err = g.Wait(); err != nil {
554
			slog.Error(err.Error())
555
		}
556
557
		return nil
558
	}
559
}
560
561
// getLogLevel converts a string representation of log level to its corresponding slog.Level value.
562
func getLogLevel(level string) slog.Level {
563
	switch level {
564
	case "info":
565
		return slog.LevelInfo // Return Info level
566
	case "warn":
567
		return slog.LevelWarn // Return Warning level
568
	case "error":
569
		return slog.LevelError // Return Error level
570
	case "debug":
571
		return slog.LevelDebug // Return Debug level
572
	default:
573
		return slog.LevelInfo // Default to Info level if unrecognized
574
	}
575
}
576