Passed
Push — master ( eaef63...5dc383 )
by Tolga
02:23 queued 22s
created

engines.*LookupEngine.LookupSubject   D

Complexity

Conditions 12

Size

Total Lines 86
Code Lines 50

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
eloc 50
nop 2
dl 0
loc 86
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like engines.*LookupEngine.LookupSubject often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package engines
2
3
import (
4
	"context"
5
	"slices"
6
	"sort"
7
	"sync"
8
9
	"github.com/Permify/permify/internal/invoke"
10
	"github.com/Permify/permify/internal/storage"
11
	"github.com/Permify/permify/internal/storage/context/utils"
12
	"github.com/Permify/permify/pkg/database"
13
	base "github.com/Permify/permify/pkg/pb/base/v1"
14
)
15
16
type LookupEngine struct {
17
	// schemaReader is responsible for reading schema information
18
	schemaReader storage.SchemaReader
19
	// schemaReader is responsible for reading data
20
	dataReader storage.DataReader
21
	// checkEngine is responsible for performing permission checks
22
	checkEngine invoke.Check
23
	// schemaMap is a map that keeps track of schema versions
24
	schemaMap sync.Map
25
	// concurrencyLimit is the maximum number of concurrent permission checks allowed
26
	concurrencyLimit int
27
}
28
29
func NewLookupEngine(
30
	check invoke.Check,
31
	schemaReader storage.SchemaReader,
32
	dataReader storage.DataReader,
33
	opts ...LookupOption,
34
) *LookupEngine {
35
	engine := &LookupEngine{
36
		schemaReader:     schemaReader,
37
		checkEngine:      check,
38
		dataReader:       dataReader,
39
		schemaMap:        sync.Map{},
40
		concurrencyLimit: _defaultConcurrencyLimit,
41
	}
42
43
	// options
44
	for _, opt := range opts {
45
		opt(engine)
46
	}
47
48
	return engine
49
}
50
51
// LookupEntity performs a permission check on a set of entities and returns a response
52
// containing the IDs of the entities that have the requested permission.
53
func (engine *LookupEngine) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) {
54
	// A mutex and slice are declared to safely store entity IDs from concurrent callbacks
55
	var mu sync.Mutex
56
	var entityIDs []string
57
	var ct string
58
59
	size := request.GetPageSize()
60
	if size == 0 {
61
		size = 1000
62
	}
63
64
	// Callback function which is called for each entity. If the entity passes the permission check,
65
	// the entity ID is appended to the entityIDs slice.
66
	callback := func(entityID, token string) {
67
		mu.Lock()         // Safeguard access to the shared slice with a mutex
68
		defer mu.Unlock() // Ensure the lock is released after appending the ID
69
		entityIDs = append(entityIDs, entityID)
70
		ct = token
71
	}
72
73
	// Create and start BulkChecker. It performs permission checks in parallel.
74
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
75
76
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
77
	publisher := NewBulkEntityPublisher(ctx, request, checker)
78
79
	// Retrieve the schema of the entity based on the tenantId and schema version
80
	var sc *base.SchemaDefinition
81
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
82
	if err != nil {
83
		return nil, err
84
	}
85
86
	// Create a map to keep track of visited entities
87
	visits := &VisitsMap{}
88
89
	// Perform an entity filter operation based on the permission request
90
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
91
		TenantId: request.GetTenantId(),
92
		Metadata: &base.PermissionEntityFilterRequestMetadata{
93
			SnapToken:     request.GetMetadata().GetSnapToken(),
94
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
95
			Depth:         request.GetMetadata().GetDepth(),
96
		},
97
		Entrance: &base.Entrance{
98
			Type:  request.GetEntityType(),
99
			Value: request.GetPermission(),
100
		},
101
		Subject: request.GetSubject(),
102
		Context: request.GetContext(),
103
		Scope:   request.GetScope(),
104
		Cursor:  request.GetContinuousToken(),
105
	}, visits, publisher)
106
	if err != nil {
107
		return nil, err
108
	}
109
110
	// At this point, the BulkChecker has collected and sorted requests
111
	err = checker.ExecuteRequests(size) // Execute the collected requests in parallel
112
	if err != nil {
113
		return nil, err
114
	}
115
116
	// Return response containing allowed entity IDs
117
	return &base.PermissionLookupEntityResponse{
118
		EntityIds:       entityIDs,
119
		ContinuousToken: ct,
120
	}, nil
121
}
122
123
// LookupEntityStream performs a permission check on a set of entities and streams the results
124
// containing the IDs of the entities that have the requested permission.
125
func (engine *LookupEngine) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) {
126
	size := request.GetPageSize()
127
	if size == 0 {
128
		size = 1000
129
	}
130
131
	// Define a callback function that will be called for each entity that passes the permission check.
132
	// If the check result is allowed, it sends the entity ID to the server stream.
133
	callback := func(entityID, token string) {
134
		err := server.Send(&base.PermissionLookupEntityStreamResponse{
135
			EntityId:        entityID,
136
			ContinuousToken: token,
137
		})
138
		// If there is an error in sending the response, the function will return
139
		if err != nil {
140
			return
141
		}
142
	}
143
144
	// Create and start BulkChecker. It performs permission checks concurrently.
145
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
146
147
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
148
	publisher := NewBulkEntityPublisher(ctx, request, checker)
149
150
	// Retrieve the entity definition schema based on the tenantId and schema version
151
	var sc *base.SchemaDefinition
152
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
153
	if err != nil {
154
		return err
155
	}
156
157
	visits := &VisitsMap{}
158
159
	// Perform an entity filter operation based on the permission request
160
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
161
		TenantId: request.GetTenantId(),
162
		Metadata: &base.PermissionEntityFilterRequestMetadata{
163
			SnapToken:     request.GetMetadata().GetSnapToken(),
164
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
165
			Depth:         request.GetMetadata().GetDepth(),
166
		},
167
		Entrance: &base.Entrance{
168
			Type:  request.GetEntityType(),
169
			Value: request.GetPermission(),
170
		},
171
		Subject: request.GetSubject(),
172
		Context: request.GetContext(),
173
		Cursor:  request.GetContinuousToken(),
174
	}, visits, publisher)
175
	if err != nil {
176
		return err
177
	}
178
179
	err = checker.ExecuteRequests(size)
180
	if err != nil {
181
		return err
182
	}
183
184
	return nil
185
}
186
187
// LookupSubject checks if a subject has a particular permission based on the schema and version.
188
// It returns a list of subjects that have the given permission.
189
func (engine *LookupEngine) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) {
190
	size := request.GetPageSize()
191
	if size == 0 {
192
		size = 1000
193
	}
194
195
	var ids IdResponse
196
	var ct string
197
198
	// Use the schema-based subject filter to get the list of subjects with the requested permission.
199
	ids, err = NewSubjectFilter(engine.schemaReader, engine.dataReader, SubjectFilterConcurrencyLimit(engine.concurrencyLimit)).SubjectFilter(ctx, request)
200
	if err != nil {
201
		return nil, err
202
	}
203
204
	// Check if the wildcard '*' is present in the ids.Ids
205
	if slices.Contains(ids.Ids, "*") {
206
		resp, pct, err := engine.dataReader.QueryUniqueSubjectReferences(
207
			ctx,
208
			request.GetTenantId(),
209
			request.GetSubjectReference(),
210
			ids.ExcludedIds,
211
			request.GetMetadata().GetSnapToken(),
212
			database.NewPagination(database.Size(size), database.Token(request.GetContinuousToken())),
213
		)
214
		if err != nil {
215
			return nil, err
216
		}
217
		ct = pct.String()
218
219
		// Return the list of entity IDs that have the required permission.
220
		return &base.PermissionLookupSubjectResponse{
221
			SubjectIds:      resp,
222
			ContinuousToken: ct,
223
		}, nil
224
	}
225
226
	// Sort the IDs
227
	sort.Strings(ids.Ids)
228
229
	// Initialize the start index as a string (to match token format)
230
	start := ""
231
232
	// Handle continuous token if present
233
	if request.GetContinuousToken() != "" {
234
		var t database.ContinuousToken
235
		t, err := utils.EncodedContinuousToken{Value: request.GetContinuousToken()}.Decode()
236
		if err != nil {
237
			return nil, err
238
		}
239
		start = t.(utils.ContinuousToken).Value
240
	}
241
242
	// Find the start index based on the continuous token
243
	startIndex := 0
244
	if start != "" {
245
		// Locate the position in the sorted list where the ID equals or exceeds the token value
246
		for i, id := range ids.Ids {
247
			if id >= start {
248
				startIndex = i
249
				break
250
			}
251
		}
252
	}
253
254
	// Convert size to int for compatibility with startIndex
255
	pageSize := int(size)
256
257
	// Calculate the end index based on the page size
258
	end := startIndex + pageSize
259
	if end > len(ids.Ids) {
260
		end = len(ids.Ids)
261
	}
262
263
	// Generate the next continuous token if there are more results
264
	if end < len(ids.Ids) {
265
		ct = utils.NewContinuousToken(ids.Ids[end]).Encode().String()
266
	} else {
267
		ct = ""
268
	}
269
270
	// Return the paginated and sorted list of IDs
271
	return &base.PermissionLookupSubjectResponse{
272
		SubjectIds:      ids.Ids[startIndex:end], // Slice the IDs based on pagination
273
		ContinuousToken: ct,                      // Return the next continuous token
274
	}, nil
275
}
276
277
// readSchema retrieves a SchemaDefinition for a given tenantID and schemaVersion.
278
// It first checks a cache (schemaMap) for the schema, and if not found, reads it using the schemaReader.
279
func (engine *LookupEngine) readSchema(ctx context.Context, tenantID, schemaVersion string) (*base.SchemaDefinition, error) {
280
	// Create a unique cache key by combining the tenantID and schemaVersion.
281
	// This ensures that different combinations of tenantID and schemaVersion get their own cache entries.
282
	cacheKey := tenantID + "|" + schemaVersion
283
284
	// Attempt to retrieve the schema from the cache (schemaMap) using the generated cacheKey.
285
	if sch, ok := engine.schemaMap.Load(cacheKey); ok {
286
		// If the schema is present in the cache, cast it to its correct type and return.
287
		return sch.(*base.SchemaDefinition), nil
288
	}
289
290
	// If the schema is not present in the cache, use the schemaReader to read it from the source (e.g., a database or file).
291
	sch, err := engine.schemaReader.ReadSchema(ctx, tenantID, schemaVersion)
292
	if err != nil {
293
		// If there's an error reading the schema (e.g., schema not found or database connection issue), return the error.
294
		return nil, err
295
	}
296
297
	// Cache the newly read schema in schemaMap so that subsequent reads can be faster.
298
	engine.schemaMap.Store(cacheKey, sch)
299
300
	// Return the freshly read schema.
301
	return sch, nil
302
}
303