Passed
Pull Request — master (#1691)
by
unknown
03:16
created

engines.*LookupEngine.LookupEntities   C

Complexity

Conditions 8

Size

Total Lines 79
Code Lines 48

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 48
nop 2
dl 0
loc 79
rs 6.8351
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

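Commonly applied refactorings include Extract Method: move the commented or otherwise self-contained part of the body into its own well-named method.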

1
package engines
2
3
import (
4
	"context"
5
	"errors"
6
	"github.com/Permify/permify/internal/invoke"
7
	"github.com/Permify/permify/internal/schema"
8
	"github.com/Permify/permify/internal/storage"
9
	base "github.com/Permify/permify/pkg/pb/base/v1"
10
	"sync"
11
)
12
13
// LookupEngine bundles the schema reader, data reader, permission-check invoker,
// schema cache and concurrency limit used by the lookup methods defined on it.
type LookupEngine struct {
14
	// schemaReader is responsible for reading schema information
15
	schemaReader storage.SchemaReader
16
	// dataReader is responsible for reading data
17
	dataReader storage.DataReader
18
	// checkEngine is responsible for performing permission checks
19
	checkEngine invoke.Check
20
	// schemaMap caches schema definitions; readSchema keys it by "tenantID|schemaVersion"
21
	schemaMap sync.Map
22
	// concurrencyLimit is the maximum number of concurrent permission checks allowed
23
	concurrencyLimit int
24
}
25
26
func NewLookupEngine(
27
	check invoke.Check,
28
	schemaReader storage.SchemaReader,
29
	dataReader storage.DataReader,
30
	opts ...LookupOption,
31
) *LookupEngine {
32
	engine := &LookupEngine{
33
		schemaReader:     schemaReader,
34
		checkEngine:      check,
35
		dataReader:       dataReader,
36
		schemaMap:        sync.Map{},
37
		concurrencyLimit: _defaultConcurrencyLimit,
38
	}
39
40
	// options
41
	for _, opt := range opts {
42
		opt(engine)
43
	}
44
45
	return engine
46
}
47
48
// LookupEntity performs a permission check on a set of entities and returns a response
49
// containing the IDs of the entities that have the requested permission.
50
func (engine *LookupEngine) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) {
51
	// A mutex and slice are declared to safely store entity IDs from concurrent callbacks
52
	var mu sync.Mutex
53
	var entityIDs []string
54
	var ct string
55
56
	size := request.GetPageSize()
57
	if size == 0 {
58
		size = 1000
59
	}
60
61
	// Callback function which is called for each entity. If the entity passes the permission check,
62
	// the entity ID is appended to the entityIDs slice.
63
	callback := func(entityID, permission, token string) {
64
		mu.Lock()         // Safeguard access to the shared slice with a mutex
65
		defer mu.Unlock() // Ensure the lock is released after appending the ID
66
		entityIDs = append(entityIDs, entityID)
67
		ct = token
68
	}
69
70
	// Create and start BulkChecker. It performs permission checks in parallel.
71
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
72
73
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
74
	publisher := NewBulkEntityPublisher(ctx, ConvertToPermissionsLookupEntityRequest(request), checker)
75
76
	// Retrieve the schema of the entity based on the tenantId and schema version
77
	var sc *base.SchemaDefinition
78
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
79
	if err != nil {
80
		return nil, err
81
	}
82
83
	// Create a map to keep track of visited entities
84
	visits := &VisitsMap{}
85
86
	permissionChecks := &VisitsMap{}
87
88
	// Perform an entity filter operation based on the permission request
89
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
90
		TenantId: request.GetTenantId(),
91
		Metadata: &base.PermissionEntityFilterRequestMetadata{
92
			SnapToken:     request.GetMetadata().GetSnapToken(),
93
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
94
			Depth:         request.GetMetadata().GetDepth(),
95
		},
96
		Entrances: []*base.Entrance{
97
			{
98
				Type:  request.GetEntityType(),
99
				Value: request.GetPermission(),
100
			},
101
		},
102
		Subject: request.GetSubject(),
103
		Context: request.GetContext(),
104
		Scope:   request.GetScope(),
105
		Cursor:  request.GetContinuousToken(),
106
	}, visits, publisher, permissionChecks)
107
	if err != nil {
108
		return nil, err
109
	}
110
111
	// At this point, the BulkChecker has collected and sorted requests
112
	err = checker.ExecuteRequests(size) // Execute the collected requests in parallel
113
	if err != nil {
114
		return nil, err
115
	}
116
117
	// Return response containing allowed entity IDs
118
	return &base.PermissionLookupEntityResponse{
119
		EntityIds:       entityIDs,
120
		ContinuousToken: ct,
121
	}, nil
122
}
123
124
// LookupEntities performs a permission check on a set of entities and returns a response
125
// containing the IDs of the entities that have the requested permissions.
126
func (engine *LookupEngine) LookupEntities(ctx context.Context, request *base.PermissionsLookupEntityRequest) (response *base.PermissionsLookupEntityResponse, err error) {
127
	// A mutex and slice are declared to safely store entity IDs from concurrent callbacks
128
	var mu sync.Mutex
129
	entityIDsByPermission := make(map[string]*base.EntityIds)
130
	var ct string
131
132
	size := request.GetPageSize()
133
	if size == 0 {
134
		size = 1000
135
	}
136
137
	// Callback function which is called for each entity. If the entity passes the permission check,
138
	// the entity ID is appended to the entityIDs slice.
139
	callback := func(entityID, permission, token string) {
140
		mu.Lock()         // Safeguard access to the shared slice with a mutex
141
		defer mu.Unlock() // Ensure the lock is released after appending the ID
142
		if _, exists := entityIDsByPermission[permission]; !exists {
143
			// If not, initialize it with an empty EntityIds struct
144
			entityIDsByPermission[permission] = &base.EntityIds{Ids: []string{}}
145
		}
146
		entityIDsByPermission[permission].Ids = append(entityIDsByPermission[permission].Ids, entityID)
147
		ct = token
148
	}
149
150
	// Create and start BulkChecker. It performs permission checks in parallel.
151
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
152
153
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
154
	publisher := NewBulkEntityPublisher(ctx, request, checker)
155
156
	// Retrieve the schema of the entity based on the tenantId and schema version
157
	var sc *base.SchemaDefinition
158
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
159
	if err != nil {
160
		return nil, err
161
	}
162
163
	// Create a map to keep track of visited entities
164
	visits := &VisitsMap{}
165
	permissionChecks := &VisitsMap{}
166
167
	entrances := make([]*base.Entrance, 0)
168
169
	for _, permission := range request.GetPermissions() {
170
		entrances = append(entrances, &base.Entrance{
171
			Type:  request.GetEntityType(),
172
			Value: permission,
173
		})
174
	}
175
176
	// Perform an entity filter operation based on the permissions request
177
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
178
		TenantId: request.GetTenantId(),
179
		Metadata: &base.PermissionEntityFilterRequestMetadata{
180
			SnapToken:     request.GetMetadata().GetSnapToken(),
181
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
182
			Depth:         request.GetMetadata().GetDepth(),
183
		},
184
		Entrances: entrances,
185
		Subject:   request.GetSubject(),
186
		Context:   request.GetContext(),
187
		Scope:     request.GetScope(),
188
		Cursor:    request.GetContinuousToken(),
189
	}, visits, publisher, permissionChecks)
190
	if err != nil {
191
		return nil, err
192
	}
193
194
	// At this point, the BulkChecker has collected and sorted requests
195
	err = checker.ExecuteRequests(size) // Execute the collected requests in parallel
196
	if err != nil {
197
		return nil, err
198
	}
199
200
	// Return response containing allowed entity IDs
201
	return &base.PermissionsLookupEntityResponse{
202
		EntityIds:       entityIDsByPermission,
203
		ContinuousToken: ct,
204
	}, nil
205
}
206
207
// LookupEntityStream performs a permission check on a set of entities and streams the results
208
// containing the IDs of the entities that have the requested permission.
209
func (engine *LookupEngine) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) {
210
	size := request.GetPageSize()
211
	if size == 0 {
212
		size = 1000
213
	}
214
215
	// Define a callback function that will be called for each entity that passes the permission check.
216
	// If the check result is allowed, it sends the entity ID to the server stream.
217
	callback := func(entityID, permission, token string) {
218
		err := server.Send(&base.PermissionLookupEntityStreamResponse{
219
			EntityId:        entityID,
220
			ContinuousToken: token,
221
		})
222
		// If there is an error in sending the response, the function will return
223
		if err != nil {
224
			return
225
		}
226
	}
227
228
	// Create and start BulkChecker. It performs permission checks concurrently.
229
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
230
231
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
232
	publisher := NewBulkEntityPublisher(ctx, ConvertToPermissionsLookupEntityRequest(request), checker)
233
234
	// Retrieve the entity definition schema based on the tenantId and schema version
235
	var sc *base.SchemaDefinition
236
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
237
	if err != nil {
238
		return err
239
	}
240
241
	visits := &VisitsMap{}
242
	permissionChecks := &VisitsMap{}
243
244
	// Perform an entity filter operation based on the permission request
245
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
246
		TenantId: request.GetTenantId(),
247
		Metadata: &base.PermissionEntityFilterRequestMetadata{
248
			SnapToken:     request.GetMetadata().GetSnapToken(),
249
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
250
			Depth:         request.GetMetadata().GetDepth(),
251
		},
252
		Entrances: []*base.Entrance{
253
			{
254
				Type:  request.GetEntityType(),
255
				Value: request.GetPermission(),
256
			},
257
		},
258
		Subject: request.GetSubject(),
259
		Context: request.GetContext(),
260
		Cursor:  request.GetContinuousToken(),
261
	}, visits, publisher, permissionChecks)
262
	if err != nil {
263
		return err
264
	}
265
266
	err = checker.ExecuteRequests(size)
267
	if err != nil {
268
		return err
269
	}
270
271
	return nil
272
}
273
274
// LookupEntitiesStream performs a permission check on a set of entities and streams the results
275
// containing the IDs of the entities that have the requested permissions.
276
func (engine *LookupEngine) LookupEntitiesStream(ctx context.Context, request *base.PermissionsLookupEntityRequest, server base.Permission_LookupEntitiesStreamServer) (err error) {
277
	size := request.GetPageSize()
278
	if size == 0 {
279
		size = 1000
280
	}
281
282
	// Define a callback function that will be called for each entity that passes the permission check.
283
	// If the check result is allowed, it sends the entity ID to the server stream.
284
	callback := func(entityID, permission, token string) {
285
		err := server.Send(&base.PermissionsLookupEntityStreamResponse{
286
			EntityId:        entityID,
287
			Permission:      permission,
288
			ContinuousToken: token,
289
		})
290
		// If there is an error in sending the response, the function will return
291
		if err != nil {
292
			return
293
		}
294
	}
295
296
	// Create and start BulkChecker. It performs permission checks concurrently.
297
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
298
299
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
300
	publisher := NewBulkEntityPublisher(ctx, request, checker)
301
302
	// Retrieve the entity definition schema based on the tenantId and schema version
303
	var sc *base.SchemaDefinition
304
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
305
	if err != nil {
306
		return err
307
	}
308
309
	visits := &VisitsMap{}
310
	permissionChecks := &VisitsMap{}
311
312
	entrances := make([]*base.Entrance, 0)
313
314
	for _, permission := range request.GetPermissions() {
315
		entrances = append(entrances, &base.Entrance{
316
			Type:  request.GetEntityType(),
317
			Value: permission,
318
		})
319
	}
320
321
	// Perform an entity filter operation based on the permission request
322
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
323
		TenantId: request.GetTenantId(),
324
		Metadata: &base.PermissionEntityFilterRequestMetadata{
325
			SnapToken:     request.GetMetadata().GetSnapToken(),
326
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
327
			Depth:         request.GetMetadata().GetDepth(),
328
		},
329
		Entrances: entrances,
330
		Subject:   request.GetSubject(),
331
		Context:   request.GetContext(),
332
		Cursor:    request.GetContinuousToken(),
333
	}, visits, publisher, permissionChecks)
334
	if err != nil {
335
		return err
336
	}
337
338
	err = checker.ExecuteRequests(size)
339
	if err != nil {
340
		return err
341
	}
342
343
	return nil
344
}
345
346
// LookupSubject checks if a subject has a particular permission based on the schema and version.
347
// It returns a list of subjects that have the given permission.
348
func (engine *LookupEngine) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) {
349
	size := request.GetPageSize()
350
	if size == 0 {
351
		size = 1000
352
	}
353
354
	// Use a mutex to protect concurrent writes to the subjectIDs slice.
355
	var mu sync.Mutex
356
	var subjectIDs []string
357
	var ct string
358
359
	// Callback function to handle the results of permission checks.
360
	// If an entity passes the permission check, its ID is stored in the subjectIDs slice.
361
	callback := func(subjectID, permission, token string) {
362
		mu.Lock()         // Lock to prevent concurrent modification of the slice.
363
		defer mu.Unlock() // Unlock after the ID is appended.
364
		subjectIDs = append(subjectIDs, subjectID)
365
		ct = token
366
	}
367
368
	// Create and initiate a BulkChecker to perform permission checks in parallel.
369
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_SUBJECT, callback, engine.concurrencyLimit)
370
371
	// Create and start a BulkPublisher to provide entities to the BulkChecker.
372
	publisher := NewBulkSubjectPublisher(ctx, request, checker)
373
374
	// Retrieve the schema of the entity based on the provided tenantId and schema version.
375
	var sc *base.SchemaDefinition
376
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
377
	if err != nil {
378
		// Return an error if there was an issue retrieving the schema.
379
		return nil, err
380
	}
381
382
	// Walk the entity schema to perform a permission check.
383
	err = schema.NewWalker(sc).Walk(request.GetEntity().GetType(), request.GetPermission())
384
	if err != nil {
385
		// If the error indicates the schema walk is unimplemented, handle it with a MassEntityFilter.
386
		if errors.Is(err, schema.ErrUnimplemented) {
387
			err = NewMassSubjectFilter(engine.dataReader).SubjectFilter(ctx, request, publisher)
388
			if err != nil {
389
				// Return an error if there was an issue with the subject filter.
390
				return nil, err
391
			}
392
		} else { // For other errors, simply return the error
393
			return nil, err
394
		}
395
	} else {
396
		// Use the schema-based subject filter to get the list of subjects with the requested permission.
397
		ids, err := NewSchemaBasedSubjectFilter(engine.schemaReader, engine.dataReader, SchemaBaseSubjectFilterConcurrencyLimit(engine.concurrencyLimit)).SubjectFilter(ctx, request)
398
		if err != nil {
399
			return nil, err
400
		}
401
402
		for _, id := range ids {
403
			publisher.Publish(&base.Subject{
404
				Type:     request.GetSubjectReference().GetType(),
405
				Id:       id,
406
				Relation: request.GetSubjectReference().GetRelation(),
407
			}, &base.PermissionCheckRequestMetadata{
408
				SnapToken:     request.GetMetadata().GetSnapToken(),
409
				SchemaVersion: request.GetMetadata().GetSchemaVersion(),
410
				Depth:         request.GetMetadata().GetDepth(),
411
			}, request.GetContext(), base.CheckResult_CHECK_RESULT_ALLOWED)
412
		}
413
	}
414
415
	err = checker.ExecuteRequests(size)
416
	if err != nil {
417
		// Return an error if there was an issue with the subject filter.
418
		return nil, err
419
	}
420
421
	// Return the list of entity IDs that have the required permission.
422
	return &base.PermissionLookupSubjectResponse{
423
		SubjectIds:      subjectIDs,
424
		ContinuousToken: ct,
425
	}, nil
426
}
427
428
// readSchema retrieves a SchemaDefinition for a given tenantID and schemaVersion.
429
// It first checks a cache (schemaMap) for the schema, and if not found, reads it using the schemaReader.
430
func (engine *LookupEngine) readSchema(ctx context.Context, tenantID, schemaVersion string) (*base.SchemaDefinition, error) {
431
	// Create a unique cache key by combining the tenantID and schemaVersion.
432
	// This ensures that different combinations of tenantID and schemaVersion get their own cache entries.
433
	cacheKey := tenantID + "|" + schemaVersion
434
435
	// Attempt to retrieve the schema from the cache (schemaMap) using the generated cacheKey.
436
	if sch, ok := engine.schemaMap.Load(cacheKey); ok {
437
		// If the schema is present in the cache, cast it to its correct type and return.
438
		return sch.(*base.SchemaDefinition), nil
439
	}
440
441
	// If the schema is not present in the cache, use the schemaReader to read it from the source (e.g., a database or file).
442
	sch, err := engine.schemaReader.ReadSchema(ctx, tenantID, schemaVersion)
443
	if err != nil {
444
		// If there's an error reading the schema (e.g., schema not found or database connection issue), return the error.
445
		return nil, err
446
	}
447
448
	// Cache the newly read schema in schemaMap so that subsequent reads can be faster.
449
	engine.schemaMap.Store(cacheKey, sch)
450
451
	// Return the freshly read schema.
452
	return sch, nil
453
}
454