Passed
Pull Request — master (#885)
by Tolga
02:43
created

balancer.NewCheckEngineWithBalancer   C

Complexity

Conditions 9

Size

Total Lines 56
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 40
nop 6
dl 0
loc 56
rs 6.5866
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
package balancer
2
3
import (
4
	"context"
5
	"encoding/hex"
6
	"fmt"
7
	"log/slog"
8
	"time"
9
10
	"github.com/cespare/xxhash/v2"
11
	"google.golang.org/grpc"
12
	"google.golang.org/grpc/credentials"
13
	"google.golang.org/grpc/credentials/insecure"
14
15
	"github.com/Permify/permify/internal/config"
16
	"github.com/Permify/permify/internal/engines"
17
	"github.com/Permify/permify/internal/invoke"
18
	"github.com/Permify/permify/internal/storage"
19
20
	"github.com/Permify/permify/pkg/balancer"
21
	base "github.com/Permify/permify/pkg/pb/base/v1"
22
)
23
24
var grpcServicePolicy = fmt.Sprintf(`{
25
		"loadBalancingPolicy": "%s"
26
	}`, balancer.Policy)
27
28
// Balancer is a wrapper around the balancer hash implementation that
29
type Balancer struct {
30
	schemaReader storage.SchemaReader
31
	checker      invoke.Check
32
	client       base.PermissionClient
33
	options      []grpc.DialOption
34
}
35
36
// NewCheckEngineWithBalancer creates a new check engine with a load balancer.
37
// It takes a Check interface, SchemaReader, distributed config, gRPC config, and authn config as input.
38
// It returns a Check interface and an error if any.
39
func NewCheckEngineWithBalancer(
40
	ctx context.Context,
41
	checker invoke.Check,
42
	schemaReader storage.SchemaReader,
43
	dst *config.Distributed,
44
	srv *config.GRPC,
45
	authn *config.Authn,
46
) (invoke.Check, error) {
47
	var (
48
		creds    credentials.TransportCredentials
49
		options  []grpc.DialOption
50
		isSecure bool
51
		err      error
52
	)
53
54
	// Set up TLS credentials if paths are provided
55
	if srv.TLSConfig.CertPath != "" && srv.TLSConfig.KeyPath != "" {
56
		isSecure = true
57
		creds, err = credentials.NewClientTLSFromFile(srv.TLSConfig.CertPath, srv.TLSConfig.KeyPath)
58
		if err != nil {
59
			return nil, fmt.Errorf("could not load TLS certificate: %s", err)
60
		}
61
	} else {
62
		creds = insecure.NewCredentials()
63
	}
64
65
	// Append common options
66
	options = append(
67
		options,
68
		grpc.WithDefaultServiceConfig(grpcServicePolicy),
69
		grpc.WithTransportCredentials(creds),
70
	)
71
72
	// Handle authentication if enabled
73
	if authn != nil && authn.Enabled {
74
		token, err := setupAuthn(ctx, authn)
75
		if err != nil {
76
			return nil, err
77
		}
78
		if isSecure {
79
			options = append(options, grpc.WithPerRPCCredentials(secureTokenCredentials{"authorization": "Bearer " + token}))
80
		} else {
81
			options = append(options, grpc.WithPerRPCCredentials(nonSecureTokenCredentials{"authorization": "Bearer " + token}))
82
		}
83
	}
84
85
	conn, err := grpc.Dial(dst.Address, options...)
86
	if err != nil {
87
		return nil, err
88
	}
89
90
	return &Balancer{
91
		schemaReader: schemaReader,
92
		checker:      checker,
93
		client:       base.NewPermissionClient(conn),
94
	}, nil
95
}
96
97
// Check performs a permission check using the schema reader to obtain
98
// entity definitions, then distributes the request based on a generated key.
99
func (c *Balancer) Check(ctx context.Context, request *base.PermissionCheckRequest) (*base.PermissionCheckResponse, error) {
100
	// Fetch the EntityDefinition for the given tenant, entity type, and schema version.
101
	en, _, err := c.schemaReader.ReadEntityDefinition(ctx, request.GetTenantId(), request.GetEntity().GetType(), request.GetMetadata().GetSchemaVersion())
102
	if err != nil {
103
		slog.Error(err.Error())
104
		// If an error occurs while reading the entity definition, deny permission and return the error.
105
		return &base.PermissionCheckResponse{
106
			Can: base.CheckResult_CHECK_RESULT_DENIED,
107
			Metadata: &base.PermissionCheckResponseMetadata{
108
				CheckCount: 0,
109
			},
110
		}, err
111
	}
112
113
	isRelational := engines.IsRelational(en, request.GetPermission())
114
115
	// Create a new xxhash instance.
116
	h := xxhash.New()
117
118
	// Generate a unique key for the request based on its relational state.
119
	// This key helps in distributing the request.
120
	_, err = h.Write([]byte(engines.GenerateKey(request, isRelational)))
121
	if err != nil {
122
		slog.Error(err.Error())
123
		return &base.PermissionCheckResponse{
124
			Can: base.CheckResult_CHECK_RESULT_DENIED,
125
			Metadata: &base.PermissionCheckResponseMetadata{
126
				CheckCount: 0,
127
			},
128
		}, err
129
	}
130
	k := hex.EncodeToString(h.Sum(nil))
131
132
	// Add a timeout of 2 seconds to the context and also set the generated key as a value.
133
	withTimeout, cancel := context.WithTimeout(context.WithValue(ctx, balancer.Key, k), 4*time.Second)
134
	defer cancel()
135
136
	// Logging the intention to forward the request to the underlying client.
137
	slog.Debug("Forwarding request with key to the underlying client", slog.String("key", k))
138
139
	// Perform the actual permission check by making a call to the underlying client.
140
	response, err := c.client.Check(withTimeout, request)
141
	if err != nil {
142
		// Log the error and return it.
143
		slog.Error(err.Error())
144
		return &base.PermissionCheckResponse{
145
			Can: base.CheckResult_CHECK_RESULT_DENIED,
146
			Metadata: &base.PermissionCheckResponseMetadata{
147
				CheckCount: 0,
148
			},
149
		}, err
150
	}
151
152
	// Return the response received from the client.
153
	return response, nil
154
}
155