Passed
Pull Request — master (#1003)
by Tolga
02:53
created

cmd.getLogLevel   B

Complexity

Conditions 6

Size

Total Lines 12
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 12
nop 1
dl 0
loc 12
rs 8.6666
c 0
b 0
f 0
1
package cmd
2
3
import (
4
	"context"
5
	"fmt"
6
	"log/slog"
7
	"os"
8
	"os/signal"
9
	"syscall"
10
11
	"github.com/spf13/viper"
12
	"go.opentelemetry.io/otel/sdk/metric"
13
14
	"github.com/Permify/permify/internal/engines/balancer"
15
	"github.com/Permify/permify/internal/engines/cache"
16
	"github.com/Permify/permify/internal/invoke"
17
	"github.com/Permify/permify/internal/storage/postgres/gc"
18
	"github.com/Permify/permify/pkg/cmd/flags"
19
	PQDatabase "github.com/Permify/permify/pkg/database/postgres"
20
21
	"github.com/fatih/color"
22
	"github.com/spf13/cobra"
23
	"go.opentelemetry.io/otel/sdk/trace"
24
	"golang.org/x/sync/errgroup"
25
26
	"github.com/Permify/permify/internal"
27
	"github.com/Permify/permify/internal/config"
28
	"github.com/Permify/permify/internal/engines"
29
	"github.com/Permify/permify/internal/factories"
30
	"github.com/Permify/permify/internal/servers"
31
	"github.com/Permify/permify/internal/storage"
32
	"github.com/Permify/permify/internal/storage/decorators"
33
	pkgcache "github.com/Permify/permify/pkg/cache"
34
	"github.com/Permify/permify/pkg/cache/ristretto"
35
	"github.com/Permify/permify/pkg/telemetry"
36
	"github.com/Permify/permify/pkg/telemetry/meterexporters"
37
	"github.com/Permify/permify/pkg/telemetry/tracerexporters"
38
)
39
40
// NewServeCommand returns a new Cobra command that can be used to run the "permify serve" command.
41
// The command takes no arguments and runs the serve() function to start the Permify service.
42
// The command has a short description of what it does.
43
func NewServeCommand() *cobra.Command {
44
	command := &cobra.Command{
45
		Use:   "serve",
46
		Short: "serve the Permify server",
47
		RunE:  serve(),
48
		Args:  cobra.NoArgs,
49
	}
50
51
	// register flags for serve
52
	flags.RegisterServeFlags(command)
53
54
	return command
55
}
56
57
// serve is the main function for the "permify serve" command. It starts the Permify service by configuring and starting the necessary components.
58
// It initializes the configuration, logger, database, tracing and metering components, and creates instances of the necessary engines, services, and decorators.
59
// It then creates a ServiceContainer and runs it with the given configuration.
60
// The function uses errgroup to manage the goroutines and gracefully shuts down the service upon receiving a termination signal.
61
// It returns an error if there is an issue with any of the components or if any goroutine fails.
62
func serve() func(cmd *cobra.Command, args []string) error {
63
	return func(cmd *cobra.Command, args []string) error {
64
		var cfg *config.Config
65
		var err error
66
		cfgFile := viper.GetString("config.file")
67
		if cfgFile != "" {
68
			cfg, err = config.NewConfigWithFile(cfgFile)
69
			if err != nil {
70
				return fmt.Errorf("failed to create new config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
71
			}
72
73
			if err = viper.Unmarshal(cfg); err != nil {
74
				return fmt.Errorf("failed to unmarshal config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
75
			}
76
		} else {
77
			// Load configuration
78
			cfg, err = config.NewConfig()
79
			if err != nil {
80
				return fmt.Errorf("failed to create new config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
81
			}
82
83
			if err = viper.Unmarshal(cfg); err != nil {
84
				return fmt.Errorf("failed to unmarshal config: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
85
			}
86
		}
87
88
		// Print banner and initialize logger
89
		red := color.New(color.FgGreen)
90
		_, _ = red.Printf(internal.Banner, internal.Version)
0 ignored issues
show
introduced by
can't check non-constant format in call to Printf
Loading history...
91
92
		internal.Identifier = cfg.AccountID
93
		if internal.Identifier == "" {
94
			slog.Warn("Account ID is not set. Please fill in the Account ID for better support. Get your Account ID from https://permify.co/account")
95
		}
96
97
		var handler slog.Handler
98
		switch cfg.Log.Output {
99
		case "json":
100
			handler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
101
				Level: getLogLevel(cfg.Log.Level),
102
			})
103
		case "text":
104
			handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
105
				Level: getLogLevel(cfg.Log.Level),
106
			})
107
		default:
108
			handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
109
				Level: getLogLevel(cfg.Log.Level),
110
			})
111
		}
112
113
		logger := slog.New(handler)
114
115
		slog.SetDefault(logger)
116
117
		slog.Info("🚀 starting permify service...")
118
119
		// Set up context and signal handling
120
		ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
121
		defer stop()
122
123
		// Run database migration if enabled
124
		if cfg.Database.AutoMigrate {
125
			err = storage.Migrate(cfg.Database)
126
			if err != nil {
127
				slog.Error("failed to migrate database: %w", err)
128
			}
129
		}
130
131
		// Initialize database
132
		db, err := factories.DatabaseFactory(cfg.Database)
133
		if err != nil {
134
			slog.Error("failed to initialize database: %w", err)
135
		}
136
		defer func() {
137
			if err = db.Close(); err != nil {
138
				slog.Error("failed to close database: %v", err)
139
			}
140
		}()
141
142
		// Tracing
143
		if cfg.Tracer.Enabled {
144
			var exporter trace.SpanExporter
145
			exporter, err = tracerexporters.ExporterFactory(
146
				cfg.Tracer.Exporter,
147
				cfg.Tracer.Endpoint,
148
				cfg.Tracer.Insecure,
149
				cfg.Tracer.URLPath,
150
			)
151
			if err != nil {
152
				slog.Error(err.Error())
153
			}
154
155
			shutdown := telemetry.NewTracer(exporter)
156
157
			defer func() {
158
				if err = shutdown(context.Background()); err != nil {
159
					slog.Error(err.Error())
160
				}
161
			}()
162
		}
163
164
		// Garbage collection
165
		if cfg.Database.GarbageCollection.Timeout > 0 && cfg.Database.GarbageCollection.Enabled && cfg.Database.Engine != "memory" {
166
			slog.Info("🗑️ starting database garbage collection...")
167
168
			garbageCollector := gc.NewGC(
169
				db.(*PQDatabase.Postgres),
170
				gc.Interval(cfg.Database.GarbageCollection.Interval),
171
				gc.Window(cfg.Database.GarbageCollection.Window),
172
				gc.Timeout(cfg.Database.GarbageCollection.Timeout),
173
			)
174
175
			go func() {
176
				err = garbageCollector.Start(ctx)
177
				if err != nil {
178
					slog.Error(err.Error())
179
				}
180
			}()
181
		}
182
183
		// Meter
184
		meter := telemetry.NewNoopMeter()
185
		if cfg.Meter.Enabled {
186
			var exporter metric.Exporter
187
			exporter, err = meterexporters.ExporterFactory(
188
				cfg.Meter.Exporter,
189
				cfg.Meter.Endpoint,
190
				cfg.Meter.Insecure,
191
				cfg.Meter.URLPath,
192
			)
193
			if err != nil {
194
				slog.Error(err.Error())
195
			}
196
197
			meter, err = telemetry.NewMeter(exporter)
198
			if err != nil {
199
				slog.Error(err.Error())
200
			}
201
		}
202
203
		// schema cache
204
		var schemaCache pkgcache.Cache
205
		schemaCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Schema.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Schema.Cache.MaxCost))
206
		if err != nil {
207
			slog.Error(err.Error())
208
		}
209
210
		// engines cache cache
211
		var engineKeyCache pkgcache.Cache
212
		engineKeyCache, err = ristretto.New(ristretto.NumberOfCounters(cfg.Service.Permission.Cache.NumberOfCounters), ristretto.MaxCost(cfg.Service.Permission.Cache.MaxCost))
213
		if err != nil {
214
			slog.Error(err.Error())
215
		}
216
217
		watcher := storage.NewNoopWatcher()
218
		if cfg.Service.Watch.Enabled {
219
			watcher = factories.WatcherFactory(db)
220
		}
221
222
		// Initialize the storage with factory methods
223
		dataReader := factories.DataReaderFactory(db)
224
		dataWriter := factories.DataWriterFactory(db)
225
		bundleReader := factories.BundleReaderFactory(db)
226
		bundleWriter := factories.BundleWriterFactory(db)
227
		schemaReader := factories.SchemaReaderFactory(db)
228
		schemaWriter := factories.SchemaWriterFactory(db)
229
		tenantReader := factories.TenantReaderFactory(db)
230
		tenantWriter := factories.TenantWriterFactory(db)
231
232
		// Add caching to the schema reader using a decorator
233
		schemaReader = decorators.NewSchemaReaderWithCache(schemaReader, schemaCache)
234
235
		// Check if circuit breaker should be enabled for services
236
		if cfg.Service.CircuitBreaker {
237
			// Add circuit breaker to the relationship reader and writer using decorators
238
			dataWriter = decorators.NewDataWriterWithCircuitBreaker(dataWriter, 1000)
239
			dataReader = decorators.NewDataReaderWithCircuitBreaker(dataReader, 1000)
240
241
			// Add circuit breaker to the bundle reader and writer using decorators
242
			bundleWriter = decorators.NewBundleWriterWithCircuitBreaker(bundleWriter, 1000)
243
			bundleReader = decorators.NewBundleReaderWithCircuitBreaker(bundleReader, 1000)
244
245
			// Add circuit breaker to the schema reader and writer using decorators
246
			schemaWriter = decorators.NewSchemaWriterWithCircuitBreaker(schemaWriter, 1000)
247
			schemaReader = decorators.NewSchemaReaderWithCircuitBreaker(schemaReader, 1000)
248
249
			// Add circuit breaker to the tenant reader and writer using decorators
250
			tenantWriter = decorators.NewTenantWriterWithCircuitBreaker(tenantWriter, 1000)
251
			tenantReader = decorators.NewTenantReaderWithCircuitBreaker(tenantReader, 1000)
252
		}
253
254
		// Initialize the engines using the key manager, schema reader, and relationship reader
255
		checkEngine := engines.NewCheckEngine(schemaReader, dataReader, engines.CheckConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit))
256
		expandEngine := engines.NewExpandEngine(schemaReader, dataReader)
257
258
		// Declare a variable `checker` of type `invoke.Check`.
259
		var checker invoke.Check
260
261
		// Create the checker either with load balancing or caching capabilities.
262
		if cfg.Distributed.Enabled {
263
			checker, err = balancer.NewCheckEngineWithBalancer(
264
				context.Background(),
265
				checkEngine,
266
				schemaReader,
267
				&cfg.Distributed,
268
				&cfg.Server.GRPC,
269
				&cfg.Authn,
270
			)
271
			// Handle potential error during checker creation.
272
			if err != nil {
273
				return err
274
			}
275
			checker = cache.NewCheckEngineWithCache(checker, schemaReader, engineKeyCache)
276
		} else {
277
			checker = cache.NewCheckEngineWithCache(checkEngine, schemaReader, engineKeyCache)
278
		}
279
280
		// Create a localChecker which directly checks without considering distributed setup.
281
		// This also includes caching capabilities.
282
		localChecker := cache.NewCheckEngineWithCache(
283
			checkEngine,
284
			schemaReader,
285
			engineKeyCache,
286
		)
287
288
		// Initialize the lookupEngine, which is responsible for looking up certain entities or values.
289
		lookupEngine := engines.NewLookupEngine(
290
			checker,
291
			schemaReader,
292
			dataReader,
293
			// Set concurrency limit based on the configuration.
294
			engines.LookupConcurrencyLimit(cfg.Service.Permission.BulkLimit),
295
		)
296
297
		// Initialize the subjectPermissionEngine, responsible for handling subject permissions.
298
		subjectPermissionEngine := engines.NewSubjectPermission(
299
			checker,
300
			schemaReader,
301
			// Set concurrency limit for the subject permission checks.
302
			engines.SubjectPermissionConcurrencyLimit(cfg.Service.Permission.ConcurrencyLimit),
303
		)
304
305
		// Create a new invoker that is used to directly call various functions or engines.
306
		// It encompasses the schema, data, checker, and other engines.
307
		invoker := invoke.NewDirectInvoker(
308
			schemaReader,
309
			dataReader,
310
			checker,
311
			expandEngine,
312
			lookupEngine,
313
			subjectPermissionEngine,
314
			meter,
315
		)
316
317
		// Associate the invoker with the checkEngine.
318
		checkEngine.SetInvoker(invoker)
319
320
		// Create a local invoker for local operations.
321
		localInvoker := invoke.NewDirectInvoker(
322
			schemaReader,
323
			dataReader,
324
			localChecker,
325
			expandEngine,
326
			lookupEngine,
327
			subjectPermissionEngine,
328
			meter,
329
		)
330
331
		// Initialize the container which brings together multiple components such as the invoker, data readers/writers, and schema handlers.
332
		container := servers.NewContainer(
333
			invoker,
334
			dataReader,
335
			dataWriter,
336
			bundleReader,
337
			bundleWriter,
338
			schemaReader,
339
			schemaWriter,
340
			tenantReader,
341
			tenantWriter,
342
			watcher,
343
		)
344
345
		// Create an error group with the provided context
346
		var g *errgroup.Group
347
		g, ctx = errgroup.WithContext(ctx)
348
349
		// Add the container.Run function to the error group
350
		g.Go(func() error {
351
			return container.Run(
352
				ctx,
353
				&cfg.Server,
354
				&cfg.Distributed,
355
				&cfg.Authn,
356
				&cfg.Profiler,
357
				localInvoker,
358
			)
359
		})
360
361
		// Wait for the error group to finish and log any errors
362
		if err = g.Wait(); err != nil {
363
			slog.Error(err.Error())
364
		}
365
366
		return nil
367
	}
368
}
369
370
// getLogLevel converts a textual log level into the corresponding slog.Level.
//
// Parsing is delegated to slog.Level.UnmarshalText, so the names "debug",
// "info", "warn" and "error" are matched case-insensitively and an optional
// numeric offset (for example "info+2") is accepted. An empty or unrecognized
// value yields slog.LevelInfo.
func getLogLevel(level string) slog.Level {
	var parsed slog.Level
	if err := parsed.UnmarshalText([]byte(level)); err != nil {
		// Unknown level name: fall back to Info rather than failing startup.
		return slog.LevelInfo
	}
	return parsed
}
385