Passed
Push — master ( 7a86d8...54e0e1 )
by Tolga
01:02 queued 13s
created

cmd.serve   F

Complexity

Conditions 42

Size

Total Lines 345
Code Lines 210

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 42
eloc 210
nop 0
dl 0
loc 345
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like cmd.serve often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package cmd
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"log/slog"
8
	"os"
9
	"os/signal"
10
	"syscall"
11
	"time"
12
13
	"github.com/davecgh/go-spew/spew"
14
15
	"github.com/sony/gobreaker"
16
	"github.com/spf13/cobra"
17
	"github.com/spf13/viper"
18
	"go.opentelemetry.io/otel/sdk/metric"
19
20
	"github.com/Permify/permify/internal/engines/balancer"
21
	"github.com/Permify/permify/internal/engines/cache"
22
	"github.com/Permify/permify/internal/invoke"
23
	cacheDecorator "github.com/Permify/permify/internal/storage/decorators/cache"
24
	cbDecorator "github.com/Permify/permify/internal/storage/decorators/circuitBreaker"
25
	sfDecorator "github.com/Permify/permify/internal/storage/decorators/singleflight"
26
	"github.com/Permify/permify/internal/storage/postgres/gc"
27
	"github.com/Permify/permify/pkg/cmd/flags"
28
	PQDatabase "github.com/Permify/permify/pkg/database/postgres"
29
30
	"go.opentelemetry.io/otel/sdk/trace"
31
	"golang.org/x/sync/errgroup"
32
33
	"github.com/Permify/permify/internal"
34
	"github.com/Permify/permify/internal/config"
35
	"github.com/Permify/permify/internal/engines"
36
	"github.com/Permify/permify/internal/factories"
37
	"github.com/Permify/permify/internal/servers"
38
	"github.com/Permify/permify/internal/storage"
39
	pkgcache "github.com/Permify/permify/pkg/cache"
40
	"github.com/Permify/permify/pkg/cache/ristretto"
41
	"github.com/Permify/permify/pkg/telemetry"
42
	"github.com/Permify/permify/pkg/telemetry/meterexporters"
43
	"github.com/Permify/permify/pkg/telemetry/tracerexporters"
44
)
45
46
// NewServeCommand returns a new Cobra command that can be used to run the "permify serve" command.
47
// The command takes no arguments and runs the serve() function to start the Permify service.
48
// The command has a short description of what it does.
49
func NewServeCommand() *cobra.Command {
50
	command := &cobra.Command{
51
		Use:   "serve",
52
		Short: "serve the Permify server",
53
		RunE:  serve(),
54
		Args:  cobra.NoArgs,
55
	}
56
57
	conf := config.DefaultConfig()
58
	f := command.Flags()
59
	f.StringP("config", "c", "", "config file (default is $HOME/.permify.yaml)")
60
	f.Bool("http-enabled", conf.Server.HTTP.Enabled, "switch option for HTTP server")
61
	f.String("account-id", conf.AccountID, "account id")
62
	f.Int64("server-rate-limit", conf.Server.RateLimit, "the maximum number of requests the server should handle per second")
63
	f.String("grpc-port", conf.Server.GRPC.Port, "port that GRPC server run on")
64
	f.Bool("grpc-tls-enabled", conf.Server.GRPC.TLSConfig.Enabled, "switch option for GRPC tls server")
65
	f.String("grpc-tls-key-path", conf.Server.GRPC.TLSConfig.KeyPath, "GRPC tls key path")
66
	f.String("grpc-tls-cert-path", conf.Server.GRPC.TLSConfig.CertPath, "GRPC tls certificate path")
67
	f.String("http-port", conf.Server.HTTP.Port, "HTTP port address")
68
	f.Bool("http-tls-enabled", conf.Server.HTTP.TLSConfig.Enabled, "switch option for HTTP tls server")
69
	f.String("http-tls-key-path", conf.Server.HTTP.TLSConfig.KeyPath, "HTTP tls key path")
70
	f.String("http-tls-cert-path", conf.Server.HTTP.TLSConfig.CertPath, "HTTP tls certificate path")
71
	f.StringSlice("http-cors-allowed-origins", conf.Server.HTTP.CORSAllowedOrigins, "CORS allowed origins for http gateway")
72
	f.StringSlice("http-cors-allowed-headers", conf.Server.HTTP.CORSAllowedHeaders, "CORS allowed headers for http gateway")
73
	f.Bool("profiler-enabled", conf.Profiler.Enabled, "switch option for profiler")
74
	f.String("profiler-port", conf.Profiler.Port, "profiler port address")
75
	f.String("log-level", conf.Log.Level, "set log verbosity ('info', 'debug', 'error', 'warning')")
76
	f.String("log-output", conf.Log.Output, "logger output valid values json, text")
77
	f.Bool("authn-enabled", conf.Authn.Enabled, "enable server authentication")
78
	f.String("authn-method", conf.Authn.Method, "server authentication method")
79
	f.StringSlice("authn-preshared-keys", conf.Authn.Preshared.Keys, "preshared key/keys for server authentication")
80
	f.String("authn-oidc-issuer", conf.Authn.Oidc.Issuer, "issuer identifier of the OpenID Connect Provider")
81
	f.String("authn-oidc-audience", conf.Authn.Oidc.Audience, "intended audience of the OpenID Connect token")
82
	f.Duration("authn-oidc-refresh-interval", conf.Authn.Oidc.RefreshInterval, "refresh interval for the OpenID Connect configuration")
83
	f.StringSlice("authn-oidc-valid-methods", conf.Authn.Oidc.ValidMethods, "list of valid JWT signing methods for OpenID Connect")
84
	f.Bool("tracer-enabled", conf.Tracer.Enabled, "switch option for tracing")
85
	f.String("tracer-exporter", conf.Tracer.Exporter, "can be; jaeger, signoz, zipkin or otlp. (integrated tracing tools)")
86
	f.String("tracer-endpoint", conf.Tracer.Endpoint, "export uri for tracing data")
87
	f.Bool("tracer-insecure", conf.Tracer.Insecure, "use https or http for tracer data, only used for otlp exporter or signoz")
88
	f.String("tracer-urlpath", conf.Tracer.URLPath, "allow to set url path for otlp exporter")
89
	f.Bool("meter-enabled", conf.Meter.Enabled, "switch option for metric")
90
	f.String("meter-exporter", conf.Meter.Exporter, "can be; otlp. (integrated metric tools)")
91
	f.String("meter-endpoint", conf.Meter.Endpoint, "export uri for metric data")
92
	f.Bool("meter-insecure", conf.Meter.Insecure, "use https or http for metric data")
93
	f.String("meter-urlpath", conf.Meter.URLPath, "allow to set url path for otlp exporter")
94
	f.Bool("service-circuit-breaker", conf.Service.CircuitBreaker, "switch option for service circuit breaker")
95
	f.Bool("service-watch-enabled", conf.Service.Watch.Enabled, "switch option for watch service")
96
	f.Int64("service-schema-cache-number-of-counters", conf.Service.Schema.Cache.NumberOfCounters, "schema service cache number of counters")
97
	f.String("service-schema-cache-max-cost", conf.Service.Schema.Cache.MaxCost, "schema service cache max cost")
98
	f.Int("service-permission-bulk-limit", conf.Service.Permission.BulkLimit, "bulk operations limit")
99
	f.Int("service-permission-concurrency-limit", conf.Service.Permission.ConcurrencyLimit, "concurrency limit")
100
	f.Int64("service-permission-cache-number-of-counters", conf.Service.Permission.Cache.NumberOfCounters, "permission service cache number of counters")
101
	f.String("service-permission-cache-max-cost", conf.Service.Permission.Cache.MaxCost, "permission service cache max cost")
102
	f.String("database-engine", conf.Database.Engine, "data source. e.g. postgres, memory")
103
	f.String("database-uri", conf.Database.URI, "uri of your data source to store relation tuples and schema")
104
	f.Bool("database-auto-migrate", conf.Database.AutoMigrate, "auto migrate database tables")
105
	f.Int("database-max-open-connections", conf.Database.MaxOpenConnections, "maximum number of parallel connections that can be made to the database at any time")
106
	f.Int("database-max-idle-connections", conf.Database.MaxIdleConnections, "maximum number of idle connections that can be made to the database at any time")
107
	f.Duration("database-max-connection-lifetime", conf.Database.MaxConnectionLifetime, "maximum amount of time a connection may be reused")
108
	f.Duration("database-max-connection-idle-time", conf.Database.MaxConnectionIdleTime, "maximum amount of time a connection may be idle")
109
	f.Int("database-max-data-per-write", conf.Database.MaxDataPerWrite, "sets the maximum amount of data per write operation to the database")
110
	f.Int("database-max-retries", conf.Database.MaxRetries, "defines the maximum number of retries for database operations in case of failure")
111
	f.Int("database-watch-buffer-size", conf.Database.WatchBufferSize, "specifies the buffer size for database watch operations, impacting how many changes can be queued")
112
	f.Bool("database-garbage-collection-enabled", conf.Database.GarbageCollection.Enabled, "use database garbage collection for expired relationships and attributes")
113
	f.Duration("database-garbage-collection-interval", conf.Database.GarbageCollection.Interval, "interval for database garbage collection")
114
	f.Duration("database-garbage-collection-timeout", conf.Database.GarbageCollection.Timeout, "timeout for database garbage collection")
115
	f.Duration("database-garbage-collection-window", conf.Database.GarbageCollection.Window, "window for database garbage collection")
116
	f.Bool("distributed-enabled", conf.Distributed.Enabled, "enable distributed")
117
	f.String("distributed-address", conf.Distributed.Address, "distributed address")
118
	f.String("distributed-port", conf.Distributed.Port, "distributed port")
119
120
	// SilenceUsage is set to true to suppress usage when an error occurs
121
	command.SilenceUsage = true
122
123
	command.PreRun = func(cmd *cobra.Command, args []string) {
124
		flags.RegisterServeFlags(f)
125
	}
126
127
	return command
128
}
129
130
// serve is the main function for the "permify serve" command. It starts the Permify service by configuring and starting the necessary components.
131
// It initializes the configuration, logger, database, tracing and metering components, and creates instances of the necessary engines, services, and decorators.
132
// It then creates a ServiceContainer and runs it with the given configuration.
133
// The function uses errgroup to manage the goroutines and gracefully shuts down the service upon receiving a termination signal.
134
// It returns an error if there is an issue with any of the components or if any goroutine fails.
135
func serve() func(cmd *cobra.Command, args []string) error {
136
	return func(cmd *cobra.Command, args []string) error {
137
		var cfg *config.Config
138
		var err error
139
		cfgFile := viper.GetString("config.file")
140
		if cfgFile != "" {
141
			cfg, err = config.NewConfigWithFile(cfgFile)
142
			if err != nil {
143
				return fmt.Errorf("failed to create new config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
144
			}
145
146
			if err = viper.Unmarshal(cfg); err != nil {
147
				return fmt.Errorf("failed to unmarshal config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
148
			}
149
		} else {
150
			// Load configuration
151
			cfg, err = config.NewConfig()
152
			if err != nil {
153
				return fmt.Errorf("failed to create new config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
154
			}
155
156
			if err = viper.Unmarshal(cfg); err != nil {
157
				return fmt.Errorf("failed to unmarshal config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
158
			}
159
		}
160
161
		spew.Dump(cfg)
162
163
		// Print banner and initialize logger
164
		internal.PrintBanner()
165
166
		var handler slog.Handler
167
		switch cfg.Log.Output {
168
		case "json":
169
			handler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
170
				Level: getLogLevel(cfg.Log.Level),
171
			})
172
		case "text":
173
			handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
174
				Level: getLogLevel(cfg.Log.Level),
175
			})
176
		default:
177
			handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
178
				Level: getLogLevel(cfg.Log.Level),
179
			})
180
		}
181
182
		logger := slog.New(handler)
183
184
		slog.SetDefault(logger)
185
186
		slog.Info("🚀 starting permify service...")
187
188
		internal.Identifier = cfg.AccountID
189
		if internal.Identifier == "" {
190
			message := "Account ID is not set. Please fill in the Account ID for better support. Get your Account ID from https://permify.co/account"
191
			slog.Error(message)
192
193
			ticker := time.NewTicker(24 * time.Hour)
194
			defer ticker.Stop()
195
196
			go func() {
197
				for range ticker.C {
198
					slog.Error(message)
199
				}
200
			}()
201
		}
202
203
		// Set up context and signal handling
204
		ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
205
		defer stop()
206
207
		// Run database migration if enabled
208
		if cfg.Database.AutoMigrate {
209
			err = storage.Migrate(cfg.Database)
210
			if err != nil {
211
				slog.Error("failed to migrate database", slog.Any("error", err))
212
			}
213
		}
214
215
		// Initialize database
216
		db, err := factories.DatabaseFactory(cfg.Database)
217
		if err != nil {
218
			slog.Error("failed to initialize database", slog.Any("error", err))
219
			return err
220
		}
221
		defer func() {
222
			if err = db.Close(); err != nil {
223
				slog.Error("failed to close database", slog.Any("error", err))
224
			}
225
		}()
226
227
		// Tracing
228
		if cfg.Tracer.Enabled {
229
			var exporter trace.SpanExporter
230
			exporter, err = tracerexporters.ExporterFactory(
231
				cfg.Tracer.Exporter,
232
				cfg.Tracer.Endpoint,
233
				cfg.Tracer.Insecure,
234
				cfg.Tracer.URLPath,
235
			)
236
			if err != nil {
237
				slog.Error(err.Error())
238
			}
239
240
			shutdown := telemetry.NewTracer(exporter)
241
242
			defer func() {
243
				if err = shutdown(context.Background()); err != nil {
244
					slog.Error(err.Error())
245
				}
246
			}()
247
		}
248
249
		// Garbage collection
250
		if cfg.Database.GarbageCollection.Timeout > 0 && cfg.Database.GarbageCollection.Enabled && cfg.Database.Engine != "memory" {
251
			slog.Info("🗑️ starting database garbage collection...")
252
253
			garbageCollector := gc.NewGC(
254
				db.(*PQDatabase.Postgres),
255
				gc.Interval(cfg.Database.GarbageCollection.Interval),
256
				gc.Window(cfg.Database.GarbageCollection.Window),
257
				gc.Timeout(cfg.Database.GarbageCollection.Timeout),
258
			)
259
260
			go func() {
261
				err = garbageCollector.Start(ctx)
262
				if err != nil {
263
					slog.Error(err.Error())
264
				}
265
			}()
266
		}
267
268
		// Meter
269
		meter := telemetry.NewNoopMeter()
270
		if cfg.Meter.Enabled {
271
			var exporter metric.Exporter
272
			exporter, err = meterexporters.ExporterFactory(
273
				cfg.Meter.Exporter,
274
				cfg.Meter.Endpoint,
275
				cfg.Meter.Insecure,
276
				cfg.Meter.URLPath,
277
			)
278
			if err != nil {
279
				slog.Error(err.Error())
280
			}
281
282
			meter, err = telemetry.NewMeter(exporter)
283
			if err != nil {
284
				slog.Error(err.Error())
285
			}
286
		}
287
288
		// schema cache
289
		var schemaCache pkgcache.Cache
290
		schemaCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Schema.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Schema.Cache.MaxCost))
291
		if err != nil {
292
			slog.Error(err.Error())
293
			return err
294
		}
295
296
		// engines cache cache
297
		var engineKeyCache pkgcache.Cache
298
		engineKeyCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Permission.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Permission.Cache.MaxCost))
299
		if err != nil {
300
			slog.Error(err.Error())
301
			return err
302
		}
303
304
		watcher := storage.NewNoopWatcher()
305
		if cfg.Service.Watch.Enabled {
306
			watcher = factories.WatcherFactory(db)
307
		}
308
309
		// Initialize the storage with factory methods
310
		dataReader := factories.DataReaderFactory(db)
311
		dataWriter := factories.DataWriterFactory(db)
312
		bundleReader := factories.BundleReaderFactory(db)
313
		bundleWriter := factories.BundleWriterFactory(db)
314
		schemaReader := factories.SchemaReaderFactory(db)
315
		schemaWriter := factories.SchemaWriterFactory(db)
316
		tenantReader := factories.TenantReaderFactory(db)
317
		tenantWriter := factories.TenantWriterFactory(db)
318
319
		// Add caching to the schema reader using a decorator
320
		schemaReader = cacheDecorator.NewSchemaReader(schemaReader, schemaCache)
321
322
		dataReader = sfDecorator.NewDataReader(dataReader)
323
		schemaReader = sfDecorator.NewSchemaReader(schemaReader)
324
325
		// Check if circuit breaker should be enabled for services
326
		if cfg.Service.CircuitBreaker {
327
			var cb *gobreaker.CircuitBreaker
328
			var st gobreaker.Settings
329
			st.Name = "storage"
330
			st.ReadyToTrip = func(counts gobreaker.Counts) bool {
331
				failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
332
				return counts.Requests >= 10 && failureRatio >= 0.6
333
			}
334
335
			cb = gobreaker.NewCircuitBreaker(st)
336
337
			// Add circuit breaker to the relationship reader using decorator
338
			dataReader = cbDecorator.NewDataReader(dataReader, cb)
339
340
			// Add circuit breaker to the bundle reader using decorators
341
			bundleReader = cbDecorator.NewBundleReader(bundleReader, cb)
342
343
			// Add circuit breaker to the schema reader using decorator
344
			schemaReader = cbDecorator.NewSchemaReader(schemaReader, cb)
345
346
			// Add circuit breaker to the tenant reader using decorator
347
			tenantReader = cbDecorator.NewTenantReader(tenantReader, cb)
348
		}
349
350
		// Initialize the engines using the key manager, schema reader, and relationship reader
351
		checkEngine := engines.NewCheckEngine(schemaReader, dataReader, engines.CheckConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit))
352
		expandEngine := engines.NewExpandEngine(schemaReader, dataReader)
353
354
		// Declare a variable `checker` of type `invoke.Check`.
355
		var checker invoke.Check
356
357
		// Create the checker either with load balancing or caching capabilities.
358
		if cfg.Distributed.Enabled {
359
360
			if cfg.Authn.Enabled && cfg.Authn.Method == "oidc" {
361
				return errors.New("OIDC authentication method cannot be used in distributed mode. Please check your configuration")
362
			}
363
364
			checker, err = balancer.NewCheckEngineWithBalancer(
365
				context.Background(),
366
				checkEngine,
367
				schemaReader,
368
				&cfg.Distributed,
369
				&cfg.Server.GRPC,
370
				&cfg.Authn,
371
			)
372
			// Handle potential error during checker creation.
373
			if err != nil {
374
				return err
375
			}
376
			checker = cache.NewCheckEngineWithCache(
377
				checker,
378
				schemaReader,
379
				engineKeyCache,
380
				meter,
381
			)
382
		} else {
383
			checker = cache.NewCheckEngineWithCache(
384
				checkEngine,
385
				schemaReader,
386
				engineKeyCache,
387
				meter,
388
			)
389
		}
390
391
		// Create a localChecker which directly checks without considering distributed setup.
392
		// This also includes caching capabilities.
393
		localChecker := cache.NewCheckEngineWithCache(
394
			checkEngine,
395
			schemaReader,
396
			engineKeyCache,
397
			meter,
398
		)
399
400
		// Initialize the lookupEngine, which is responsible for looking up certain entities or values.
401
		lookupEngine := engines.NewLookupEngine(
402
			checker,
403
			schemaReader,
404
			dataReader,
405
			// Set concurrency limit based on the configuration.
406
			engines.LookupConcurrencyLimit(cfg.Service.Permission.BulkLimit),
407
		)
408
409
		// Initialize the subjectPermissionEngine, responsible for handling subject permissions.
410
		subjectPermissionEngine := engines.NewSubjectPermission(
411
			checker,
412
			schemaReader,
413
			// Set concurrency limit for the subject permission checks.
414
			engines.SubjectPermissionConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit),
415
		)
416
417
		// Create a new invoker that is used to directly call various functions or engines.
418
		// It encompasses the schema, data, checker, and other engines.
419
		invoker := invoke.NewDirectInvoker(
420
			schemaReader,
421
			dataReader,
422
			checker,
423
			expandEngine,
424
			lookupEngine,
425
			subjectPermissionEngine,
426
			meter,
427
		)
428
429
		// Associate the invoker with the checkEngine.
430
		checkEngine.SetInvoker(invoker)
431
432
		// Create a local invoker for local operations.
433
		localInvoker := invoke.NewDirectInvoker(
434
			schemaReader,
435
			dataReader,
436
			localChecker,
437
			expandEngine,
438
			lookupEngine,
439
			subjectPermissionEngine,
440
			meter,
441
		)
442
443
		// Initialize the container which brings together multiple components such as the invoker, data readers/writers, and schema handlers.
444
		container := servers.NewContainer(
445
			invoker,
446
			dataReader,
447
			dataWriter,
448
			bundleReader,
449
			bundleWriter,
450
			schemaReader,
451
			schemaWriter,
452
			tenantReader,
453
			tenantWriter,
454
			watcher,
455
		)
456
457
		// Create an error group with the provided context
458
		var g *errgroup.Group
459
		g, ctx = errgroup.WithContext(ctx)
460
461
		// Add the container.Run function to the error group
462
		g.Go(func() error {
463
			return container.Run(
464
				ctx,
465
				&cfg.Server,
466
				logger,
467
				&cfg.Distributed,
468
				&cfg.Authn,
469
				&cfg.Profiler,
470
				localInvoker,
471
			)
472
		})
473
474
		// Wait for the error group to finish and log any errors
475
		if err = g.Wait(); err != nil {
476
			slog.Error(err.Error())
477
		}
478
479
		return nil
480
	}
481
}
482
483
// getLogLevel converts a string representation of log level to its corresponding slog.Level value.
484
func getLogLevel(level string) slog.Level {
485
	switch level {
486
	case "info":
487
		return slog.LevelInfo // Return Info level
488
	case "warn":
489
		return slog.LevelWarn // Return Warning level
490
	case "error":
491
		return slog.LevelError // Return Error level
492
	case "debug":
493
		return slog.LevelDebug // Return Debug level
494
	default:
495
		return slog.LevelInfo // Default to Info level if unrecognized
496
	}
497
}
498