Passed
Pull Request — master (#1465)
by
unknown
02:36
created

engines.*LookupEngine.LookupEntity   F

Complexity

Conditions 14

Size

Total Lines 105
Code Lines 61

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 14
eloc 61
nop 2
dl 0
loc 105
rs 3.6
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like engines.*LookupEngine.LookupEntity often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package engines
2
3
import (
4
	"context"
5
	"errors"
6
	"sync"
7
8
	"github.com/Permify/permify/internal/invoke"
9
	"github.com/Permify/permify/internal/schema"
10
	"github.com/Permify/permify/internal/storage"
11
	base "github.com/Permify/permify/pkg/pb/base/v1"
12
)
13
14
type LookupEngine struct {
15
	// schemaReader is responsible for reading schema information
16
	schemaReader storage.SchemaReader
17
	// schemaReader is responsible for reading data
18
	dataReader storage.DataReader
19
	// checkEngine is responsible for performing permission checks
20
	checkEngine invoke.Check
21
	// schemaMap is a map that keeps track of schema versions
22
	schemaMap sync.Map
23
	// concurrencyLimit is the maximum number of concurrent permission checks allowed
24
	concurrencyLimit int
25
}
26
27
func NewLookupEngine(
28
	check invoke.Check,
29
	schemaReader storage.SchemaReader,
30
	dataReader storage.DataReader,
31
	opts ...LookupOption,
32
) *LookupEngine {
33
	engine := &LookupEngine{
34
		schemaReader:     schemaReader,
35
		checkEngine:      check,
36
		dataReader:       dataReader,
37
		schemaMap:        sync.Map{},
38
		concurrencyLimit: _defaultConcurrencyLimit,
39
	}
40
41
	// options
42
	for _, opt := range opts {
43
		opt(engine)
44
	}
45
46
	return engine
47
}
48
49
// LookupEntity performs a permission check on a set of entities and returns a response
50
// containing the IDs of the entities that have the requested permission.
51
func (engine *LookupEngine) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) {
52
	// A mutex and slice are declared to safely store entity IDs from concurrent callbacks
53
	var mu sync.Mutex
54
	entityIDsByPermission := make(map[string]*base.EntityIds)
55
	var ct string
56
57
	size := request.GetPageSize()
58
	if size == 0 {
59
		size = 1000
60
	}
61
62
	// Callback function which is called for each entity. If the entity passes the permission check,
63
	// the entity ID is appended to the entityIDs slice.
64
	callback := func(entityID, permission string, token string) {
65
		mu.Lock()         // Safeguard access to the shared slice with a mutex
66
		defer mu.Unlock() // Ensure the lock is released after appending the ID
67
		if _, exists := entityIDsByPermission[permission]; !exists {
68
			// If not, initialize it with an empty EntityIds struct
69
			entityIDsByPermission[permission] = &base.EntityIds{Ids: []string{}}
70
		}
71
		entityIDsByPermission[permission].Ids = append(entityIDsByPermission[permission].Ids, entityID)
72
		ct = token
73
	}
74
75
	// Create and start BulkChecker. It performs permission checks in parallel.
76
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
77
78
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
79
	publisher := NewBulkEntityPublisher(ctx, request, checker)
80
81
	// Retrieve the schema of the entity based on the tenantId and schema version
82
	var sc *base.SchemaDefinition
83
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
84
	if err != nil {
85
		return nil, err
86
	}
87
88
	// Perform a walk of the entity schema for the permission check
89
	validPermissions := make([]string, 0)
90
91
	for _, permission := range request.GetPermissions() {
92
		err = schema.NewWalker(sc).Walk(request.GetEntityType(), permission)
93
		if err == nil {
94
			validPermissions = append(validPermissions, permission)
95
		}
96
	}
97
98
	if err != nil && len(validPermissions) == 0 {
99
		// If the error is unimplemented, handle it with a MassEntityFilter
100
		if errors.Is(err, schema.ErrUnimplemented) {
101
			err = NewMassEntityFilter(engine.dataReader).EntityFilter(ctx, request, publisher, &ERMap{})
102
			if err != nil {
103
				return nil, err
104
			}
105
		} else { // For other errors, simply return the error
106
			return nil, err
107
		}
108
	} else {
109
110
		request.Permissions = validPermissions
111
		// Create a map to keep track of visited entities
112
		visits := &ERMap{}
113
		// Create
114
115
		permissionChecks := &ERMap{}
116
117
		entityReferences := make([]*base.RelationReference, 0)
118
119
		for _, permisson := range request.GetPermissions() {
120
			entityReferences = append(entityReferences, &base.RelationReference{
121
				Type:     request.GetEntityType(),
122
				Relation: permisson,
123
			})
124
		}
125
126
		// Perform an entity filter operation based on the permission request
127
		err = NewSchemaBasedEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
128
			TenantId: request.GetTenantId(),
129
			Metadata: &base.PermissionEntityFilterRequestMetadata{
130
				SnapToken:     request.GetMetadata().GetSnapToken(),
131
				SchemaVersion: request.GetMetadata().GetSchemaVersion(),
132
				Depth:         request.GetMetadata().GetDepth(),
133
			},
134
			EntityReferences: entityReferences,
135
			Subject:          request.GetSubject(),
136
			Context:          request.GetContext(),
137
			Cursor:           request.GetContinuousToken(),
138
		}, visits, publisher, permissionChecks)
139
140
		if err != nil {
141
			return nil, err
142
		}
143
	}
144
145
	// At this point, the BulkChecker has collected and sorted requests
146
	err = checker.ExecuteRequests(size) // Execute the collected requests in parallel
147
	if err != nil {
148
		return nil, err
149
	}
150
151
	// Return response containing allowed entity IDs
152
	return &base.PermissionLookupEntityResponse{
153
		EntityIds:       entityIDsByPermission,
154
		ContinuousToken: ct,
155
	}, nil
156
}
157
158
// LookupEntityStream performs a permission check on a set of entities and streams the results
159
// containing the IDs of the entities that have the requested permission.
160
func (engine *LookupEngine) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) {
161
	size := request.GetPageSize()
162
	if size == 0 {
163
		size = 1000
164
	}
165
166
	// Define a callback function that will be called for each entity that passes the permission check.
167
	// If the check result is allowed, it sends the entity ID to the server stream.
168
	callback := func(entityID, permission string, token string) {
169
		err := server.Send(&base.PermissionLookupEntityStreamResponse{
170
			EntityId:        entityID,
171
			Permission:      permission,
172
			ContinuousToken: token,
173
		})
174
		// If there is an error in sending the response, the function will return
175
		if err != nil {
176
			return
177
		}
178
	}
179
180
	// Create and start BulkChecker. It performs permission checks concurrently.
181
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
182
183
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
184
	publisher := NewBulkEntityPublisher(ctx, request, checker)
185
186
	// Retrieve the entity definition schema based on the tenantId and schema version
187
	var sc *base.SchemaDefinition
188
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
189
	if err != nil {
190
		return err
191
	}
192
193
	// Perform a permission check walk through the entity schema
194
	for _, permission := range request.GetPermissions() {
195
		err = schema.NewWalker(sc).Walk(request.GetEntityType(), permission)
196
		// If error exists in permission check walk
197
		if err != nil {
198
			// If the error is unimplemented, handle it with a MassEntityFilter
199
			if errors.Is(err, schema.ErrUnimplemented) {
200
				err = NewMassEntityFilter(engine.dataReader).EntityFilter(ctx, request, publisher, &ERMap{})
201
				if err != nil {
202
					return err
203
				}
204
			} else { // For other types of errors, simply return the error
205
				return err
206
			}
207
		} else { // If there was no error in permission check walk
208
			visits := &ERMap{}
209
			permissionChecks := &ERMap{}
210
211
			// Perform an entity filter operation based on the permission request
212
			err = NewSchemaBasedEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
213
				TenantId: request.GetTenantId(),
214
				Metadata: &base.PermissionEntityFilterRequestMetadata{
215
					SnapToken:     request.GetMetadata().GetSnapToken(),
216
					SchemaVersion: request.GetMetadata().GetSchemaVersion(),
217
					Depth:         request.GetMetadata().GetDepth(),
218
				},
219
				EntityReferences: []*base.RelationReference{
220
					{
221
						Type:     request.GetEntityType(),
222
						Relation: permission,
223
					},
224
				},
225
				Subject: request.GetSubject(),
226
				Context: request.GetContext(),
227
			}, visits, publisher, permissionChecks)
228
229
			if err != nil {
230
				return err
231
			}
232
			if err != nil {
233
				return err
234
			}
235
236
		}
237
	}
238
239
	err = checker.ExecuteRequests(size)
240
	if err != nil {
241
		return err
242
	}
243
244
	return nil
245
}
246
247
// LookupSubject checks if a subject has a particular permission based on the schema and version.
248
// It returns a list of subjects that have the given permission.
249
func (engine *LookupEngine) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) {
250
	size := request.GetPageSize()
251
	if size == 0 {
252
		size = 1000
253
	}
254
255
	// Use a mutex to protect concurrent writes to the subjectIDs slice.
256
	var mu sync.Mutex
257
	var subjectIDs []string
258
	var ct string
259
260
	// Callback function to handle the results of permission checks.
261
	// If an entity passes the permission check, its ID is stored in the subjectIDs slice.
262
	callback := func(subjectID, permission string, token string) {
263
		mu.Lock()         // Lock to prevent concurrent modification of the slice.
264
		defer mu.Unlock() // Unlock after the ID is appended.
265
		subjectIDs = append(subjectIDs, subjectID)
266
		ct = token
267
	}
268
269
	// Create and initiate a BulkChecker to perform permission checks in parallel.
270
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_SUBJECT, callback, engine.concurrencyLimit)
271
272
	// Create and start a BulkPublisher to provide entities to the BulkChecker.
273
	publisher := NewBulkSubjectPublisher(ctx, request, checker)
274
275
	// Retrieve the schema of the entity based on the provided tenantId and schema version.
276
	var sc *base.SchemaDefinition
277
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
278
	if err != nil {
279
		// Return an error if there was an issue retrieving the schema.
280
		return nil, err
281
	}
282
283
	// Walk the entity schema to perform a permission check.
284
	err = schema.NewWalker(sc).Walk(request.GetEntity().GetType(), request.GetPermission())
285
	if err != nil {
286
		// If the error indicates the schema walk is unimplemented, handle it with a MassEntityFilter.
287
		if errors.Is(err, schema.ErrUnimplemented) {
288
289
			// Use a mutex to protect concurrent writes to the subjectIDs slice.
290
			var mu sync.Mutex
291
			var subjectIDs []string
292
293
			// Callback function to handle the results of permission checks.
294
			// If an entity passes the permission check, its ID is stored in the subjectIDs slice.
295
			callback := func(subjectID, permission string, result base.CheckResult) {
296
				if result == base.CheckResult_CHECK_RESULT_ALLOWED {
297
					mu.Lock()         // Lock to prevent concurrent modification of the slice.
298
					defer mu.Unlock() // Unlock after the ID is appended.
299
					subjectIDs = append(subjectIDs, subjectID)
300
				}
301
			}
302
303
			// Create and initiate a BulkChecker to perform permission checks in parallel.
304
			checker := NewBulkChecker(ctx, engine.checkEngine, callback, engine.concurrencyLimit)
305
			checker.Start(BULK_SUBJECT)
306
307
			// Create and start a BulkPublisher to provide entities to the BulkChecker.
308
			publisher := NewBulkSubjectPublisher(ctx, request, checker)
309
310
			err = NewMassSubjectFilter(engine.dataReader).SubjectFilter(ctx, request, publisher)
311
			if err != nil {
312
				// Return an error if there was an issue with the subject filter.
313
				return nil, err
314
			}
315
		} else { // For other errors, simply return the error
316
			return nil, err
317
		}
318
	} else {
319
		// Use the schema-based subject filter to get the list of subjects with the requested permission.
320
		ids, err := NewSchemaBasedSubjectFilter(engine.schemaReader, engine.dataReader, SchemaBaseSubjectFilterConcurrencyLimit(engine.concurrencyLimit)).SubjectFilter(ctx, request)
321
		if err != nil {
322
			return nil, err
323
		}
324
325
		for _, id := range ids {
326
			publisher.Publish(&base.Subject{
327
				Type:     request.GetSubjectReference().GetType(),
328
				Id:       id,
329
				Relation: request.GetSubjectReference().GetRelation(),
330
			}, &base.PermissionCheckRequestMetadata{
331
				SnapToken:     request.GetMetadata().GetSnapToken(),
332
				SchemaVersion: request.GetMetadata().GetSchemaVersion(),
333
				Depth:         request.GetMetadata().GetDepth(),
334
			}, request.GetContext(), base.CheckResult_CHECK_RESULT_ALLOWED)
335
		}
336
	}
337
338
	err = checker.ExecuteRequests(size)
339
	if err != nil {
340
		// Return an error if there was an issue with the subject filter.
341
		return nil, err
342
	}
343
344
	// Return the list of entity IDs that have the required permission.
345
	return &base.PermissionLookupSubjectResponse{
346
		SubjectIds:      subjectIDs,
347
		ContinuousToken: ct,
348
	}, nil
349
}
350
351
// readSchema retrieves a SchemaDefinition for a given tenantID and schemaVersion.
352
// It first checks a cache (schemaMap) for the schema, and if not found, reads it using the schemaReader.
353
func (engine *LookupEngine) readSchema(ctx context.Context, tenantID, schemaVersion string) (*base.SchemaDefinition, error) {
354
	// Create a unique cache key by combining the tenantID and schemaVersion.
355
	// This ensures that different combinations of tenantID and schemaVersion get their own cache entries.
356
	cacheKey := tenantID + "|" + schemaVersion
357
358
	// Attempt to retrieve the schema from the cache (schemaMap) using the generated cacheKey.
359
	if sch, ok := engine.schemaMap.Load(cacheKey); ok {
360
		// If the schema is present in the cache, cast it to its correct type and return.
361
		return sch.(*base.SchemaDefinition), nil
362
	}
363
364
	// If the schema is not present in the cache, use the schemaReader to read it from the source (e.g., a database or file).
365
	sch, err := engine.schemaReader.ReadSchema(ctx, tenantID, schemaVersion)
366
	if err != nil {
367
		// If there's an error reading the schema (e.g., schema not found or database connection issue), return the error.
368
		return nil, err
369
	}
370
371
	// Cache the newly read schema in schemaMap so that subsequent reads can be faster.
372
	engine.schemaMap.Store(cacheKey, sch)
373
374
	// Return the freshly read schema.
375
	return sch, nil
376
}
377