Passed
Pull Request — master (#1465)
by
unknown
02:46
created

engines.*LookupEngine.LookupEntitiesStream   B

Complexity

Conditions 8

Size

Total Lines 68
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 41
nop 3
dl 0
loc 68
rs 7.0293
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
package engines
2
3
import (
4
	"context"
5
	"errors"
6
	"sync"
7
8
	"github.com/Permify/permify/internal/invoke"
9
	"github.com/Permify/permify/internal/schema"
10
	"github.com/Permify/permify/internal/storage"
11
	base "github.com/Permify/permify/pkg/pb/base/v1"
12
)
13
14
type LookupEngine struct {
15
	// schemaReader is responsible for reading schema information
16
	schemaReader storage.SchemaReader
17
	// schemaReader is responsible for reading data
18
	dataReader storage.DataReader
19
	// checkEngine is responsible for performing permission checks
20
	checkEngine invoke.Check
21
	// schemaMap is a map that keeps track of schema versions
22
	schemaMap sync.Map
23
	// concurrencyLimit is the maximum number of concurrent permission checks allowed
24
	concurrencyLimit int
25
}
26
27
func NewLookupEngine(
28
	check invoke.Check,
29
	schemaReader storage.SchemaReader,
30
	dataReader storage.DataReader,
31
	opts ...LookupOption,
32
) *LookupEngine {
33
	engine := &LookupEngine{
34
		schemaReader:     schemaReader,
35
		checkEngine:      check,
36
		dataReader:       dataReader,
37
		schemaMap:        sync.Map{},
38
		concurrencyLimit: _defaultConcurrencyLimit,
39
	}
40
41
	// options
42
	for _, opt := range opts {
43
		opt(engine)
44
	}
45
46
	return engine
47
}
48
49
// LookupEntity performs a permission check on a set of entities and returns a response
50
// containing the IDs of the entities that have the requested permission.
51
func (engine *LookupEngine) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) {
52
	// A mutex and slice are declared to safely store entity IDs from concurrent callbacks
53
	var mu sync.Mutex
54
	var entityIDs []string
55
	var ct string
56
57
	size := request.GetPageSize()
58
	if size == 0 {
59
		size = 1000
60
	}
61
62
	// Callback function which is called for each entity. If the entity passes the permission check,
63
	// the entity ID is appended to the entityIDs slice.
64
	callback := func(entityID, permission, token string) {
65
		mu.Lock()         // Safeguard access to the shared slice with a mutex
66
		defer mu.Unlock() // Ensure the lock is released after appending the ID
67
		entityIDs = append(entityIDs, entityID)
68
		ct = token
69
	}
70
71
	// Create and start BulkChecker. It performs permission checks in parallel.
72
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
73
74
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
75
	publisher := NewBulkEntityPublisher(ctx, ConvertToPermissionsLookupEntityRequest(request), checker)
76
77
	// Retrieve the schema of the entity based on the tenantId and schema version
78
	var sc *base.SchemaDefinition
79
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
80
	if err != nil {
81
		return nil, err
82
	}
83
84
	// Create a map to keep track of visited entities
85
	visits := &VisitsMap{}
86
87
	permissionChecks := &VisitsMap{}
88
89
	// Perform an entity filter operation based on the permission request
90
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
91
		TenantId: request.GetTenantId(),
92
		Metadata: &base.PermissionEntityFilterRequestMetadata{
93
			SnapToken:     request.GetMetadata().GetSnapToken(),
94
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
95
			Depth:         request.GetMetadata().GetDepth(),
96
		},
97
		Entrances: []*base.Entrance{
98
			{
99
				Type:  request.GetEntityType(),
100
				Value: request.GetPermission(),
101
			},
102
		},
103
		Subject: request.GetSubject(),
104
		Context: request.GetContext(),
105
		Scope:   request.GetScope(),
106
		Cursor:  request.GetContinuousToken(),
107
	}, visits, publisher, permissionChecks)
108
	if err != nil {
109
		return nil, err
110
	}
111
112
	// At this point, the BulkChecker has collected and sorted requests
113
	err = checker.ExecuteRequests(size) // Execute the collected requests in parallel
114
	if err != nil {
115
		return nil, err
116
	}
117
118
	// Return response containing allowed entity IDs
119
	return &base.PermissionLookupEntityResponse{
120
		EntityIds:       entityIDs,
121
		ContinuousToken: ct,
122
	}, nil
123
}
124
125
// LookupEntity performs a permission check on a set of entities and returns a response
126
// containing the IDs of the entities that have the requested permission.
127
func (engine *LookupEngine) LookupEntities(ctx context.Context, request *base.PermissionsLookupEntityRequest) (response *base.PermissionsLookupEntityResponse, err error) {
128
	// A mutex and slice are declared to safely store entity IDs from concurrent callbacks
129
	var mu sync.Mutex
130
	entityIDsByPermission := make(map[string]*base.EntityIds)
131
	var ct string
132
133
	size := request.GetPageSize()
134
	if size == 0 {
135
		size = 1000
136
	}
137
138
	// Callback function which is called for each entity. If the entity passes the permission check,
139
	// the entity ID is appended to the entityIDs slice.
140
	callback := func(entityID, permission, token string) {
141
		mu.Lock()         // Safeguard access to the shared slice with a mutex
142
		defer mu.Unlock() // Ensure the lock is released after appending the ID
143
		if _, exists := entityIDsByPermission[permission]; !exists {
144
			// If not, initialize it with an empty EntityIds struct
145
			entityIDsByPermission[permission] = &base.EntityIds{Ids: []string{}}
146
		}
147
		entityIDsByPermission[permission].Ids = append(entityIDsByPermission[permission].Ids, entityID)
148
		ct = token
149
	}
150
151
	// Create and start BulkChecker. It performs permission checks in parallel.
152
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
153
154
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
155
	publisher := NewBulkEntityPublisher(ctx, request, checker)
156
157
	// Retrieve the schema of the entity based on the tenantId and schema version
158
	var sc *base.SchemaDefinition
159
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
160
	if err != nil {
161
		return nil, err
162
	}
163
164
	// Create a map to keep track of visited entities
165
	visits := &VisitsMap{}
166
	permissionChecks := &VisitsMap{}
167
168
	entrances := make([]*base.Entrance, 0)
169
170
	for _, permission := range request.GetPermissions() {
171
		entrances = append(entrances, &base.Entrance{
172
			Type:  request.GetEntityType(),
173
			Value: permission,
174
		})
175
	}
176
177
	// Perform an entity filter operation based on the permission request
178
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
179
		TenantId: request.GetTenantId(),
180
		Metadata: &base.PermissionEntityFilterRequestMetadata{
181
			SnapToken:     request.GetMetadata().GetSnapToken(),
182
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
183
			Depth:         request.GetMetadata().GetDepth(),
184
		},
185
		Entrances: entrances,
186
		Subject:   request.GetSubject(),
187
		Context:   request.GetContext(),
188
		Scope:     request.GetScope(),
189
		Cursor:    request.GetContinuousToken(),
190
	}, visits, publisher, permissionChecks)
191
	if err != nil {
192
		return nil, err
193
	}
194
195
	// At this point, the BulkChecker has collected and sorted requests
196
	err = checker.ExecuteRequests(size) // Execute the collected requests in parallel
197
	if err != nil {
198
		return nil, err
199
	}
200
201
	// Return response containing allowed entity IDs
202
	return &base.PermissionsLookupEntityResponse{
203
		EntityIds:       entityIDsByPermission,
204
		ContinuousToken: ct,
205
	}, nil
206
}
207
208
// LookupEntityStream performs a permission check on a set of entities and streams the results
209
// containing the IDs of the entities that have the requested permission.
210
func (engine *LookupEngine) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) {
211
	size := request.GetPageSize()
212
	if size == 0 {
213
		size = 1000
214
	}
215
216
	// Define a callback function that will be called for each entity that passes the permission check.
217
	// If the check result is allowed, it sends the entity ID to the server stream.
218
	callback := func(entityID, permission, token string) {
219
		err := server.Send(&base.PermissionLookupEntityStreamResponse{
220
			EntityId:        entityID,
221
			ContinuousToken: token,
222
		})
223
		// If there is an error in sending the response, the function will return
224
		if err != nil {
225
			return
226
		}
227
	}
228
229
	// Create and start BulkChecker. It performs permission checks concurrently.
230
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
231
232
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
233
	publisher := NewBulkEntityPublisher(ctx, ConvertToPermissionsLookupEntityRequest(request), checker)
234
235
	// Retrieve the entity definition schema based on the tenantId and schema version
236
	var sc *base.SchemaDefinition
237
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
238
	if err != nil {
239
		return err
240
	}
241
242
	visits := &VisitsMap{}
243
	permissionChecks := &VisitsMap{}
244
245
	// Perform an entity filter operation based on the permission request
246
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
247
		TenantId: request.GetTenantId(),
248
		Metadata: &base.PermissionEntityFilterRequestMetadata{
249
			SnapToken:     request.GetMetadata().GetSnapToken(),
250
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
251
			Depth:         request.GetMetadata().GetDepth(),
252
		},
253
		Entrances: []*base.Entrance{
254
			{
255
				Type:  request.GetEntityType(),
256
				Value: request.GetPermission(),
257
			},
258
		},
259
		Subject: request.GetSubject(),
260
		Context: request.GetContext(),
261
		Cursor:  request.GetContinuousToken(),
262
	}, visits, publisher, permissionChecks)
263
	if err != nil {
264
		return err
265
	}
266
267
	err = checker.ExecuteRequests(size)
268
	if err != nil {
269
		return err
270
	}
271
272
	return nil
273
}
274
275
// LookupEntityStream performs a permission check on a set of entities and streams the results
276
// containing the IDs of the entities that have the requested permission.
277
func (engine *LookupEngine) LookupEntitiesStream(ctx context.Context, request *base.PermissionsLookupEntityRequest, server base.Permission_LookupEntitiesStreamServer) (err error) {
278
	size := request.GetPageSize()
279
	if size == 0 {
280
		size = 1000
281
	}
282
283
	// Define a callback function that will be called for each entity that passes the permission check.
284
	// If the check result is allowed, it sends the entity ID to the server stream.
285
	callback := func(entityID, permission, token string) {
286
		err := server.Send(&base.PermissionsLookupEntityStreamResponse{
287
			EntityId:        entityID,
288
			Permission:      permission,
289
			ContinuousToken: token,
290
		})
291
		// If there is an error in sending the response, the function will return
292
		if err != nil {
293
			return
294
		}
295
	}
296
297
	// Create and start BulkChecker. It performs permission checks concurrently.
298
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
299
300
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
301
	publisher := NewBulkEntityPublisher(ctx, request, checker)
302
303
	// Retrieve the entity definition schema based on the tenantId and schema version
304
	var sc *base.SchemaDefinition
305
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
306
	if err != nil {
307
		return err
308
	}
309
310
	visits := &VisitsMap{}
311
	permissionChecks := &VisitsMap{}
312
313
	entrances := make([]*base.Entrance, 0)
314
315
	for _, permission := range request.GetPermissions() {
316
		entrances = append(entrances, &base.Entrance{
317
			Type:  request.GetEntityType(),
318
			Value: permission,
319
		})
320
	}
321
322
	// Perform an entity filter operation based on the permission request
323
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
324
		TenantId: request.GetTenantId(),
325
		Metadata: &base.PermissionEntityFilterRequestMetadata{
326
			SnapToken:     request.GetMetadata().GetSnapToken(),
327
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
328
			Depth:         request.GetMetadata().GetDepth(),
329
		},
330
		Entrances: entrances,
331
		Subject:   request.GetSubject(),
332
		Context:   request.GetContext(),
333
		Cursor:    request.GetContinuousToken(),
334
	}, visits, publisher, permissionChecks)
335
	if err != nil {
336
		return err
337
	}
338
339
	err = checker.ExecuteRequests(size)
340
	if err != nil {
341
		return err
342
	}
343
344
	return nil
345
}
346
347
// LookupSubject checks if a subject has a particular permission based on the schema and version.
348
// It returns a list of subjects that have the given permission.
349
func (engine *LookupEngine) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) {
350
	size := request.GetPageSize()
351
	if size == 0 {
352
		size = 1000
353
	}
354
355
	// Use a mutex to protect concurrent writes to the subjectIDs slice.
356
	var mu sync.Mutex
357
	var subjectIDs []string
358
	var ct string
359
360
	// Callback function to handle the results of permission checks.
361
	// If an entity passes the permission check, its ID is stored in the subjectIDs slice.
362
	callback := func(subjectID, permission, token string) {
363
		mu.Lock()         // Lock to prevent concurrent modification of the slice.
364
		defer mu.Unlock() // Unlock after the ID is appended.
365
		subjectIDs = append(subjectIDs, subjectID)
366
		ct = token
367
	}
368
369
	// Create and initiate a BulkChecker to perform permission checks in parallel.
370
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_SUBJECT, callback, engine.concurrencyLimit)
371
372
	// Create and start a BulkPublisher to provide entities to the BulkChecker.
373
	publisher := NewBulkSubjectPublisher(ctx, request, checker)
374
375
	// Retrieve the schema of the entity based on the provided tenantId and schema version.
376
	var sc *base.SchemaDefinition
377
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
378
	if err != nil {
379
		// Return an error if there was an issue retrieving the schema.
380
		return nil, err
381
	}
382
383
	// Walk the entity schema to perform a permission check.
384
	err = schema.NewWalker(sc).Walk(request.GetEntity().GetType(), request.GetPermission())
385
	if err != nil {
386
		// If the error indicates the schema walk is unimplemented, handle it with a MassEntityFilter.
387
		if errors.Is(err, schema.ErrUnimplemented) {
388
			err = NewMassSubjectFilter(engine.dataReader).SubjectFilter(ctx, request, publisher)
389
			if err != nil {
390
				// Return an error if there was an issue with the subject filter.
391
				return nil, err
392
			}
393
		} else { // For other errors, simply return the error
394
			return nil, err
395
		}
396
	} else {
397
		// Use the schema-based subject filter to get the list of subjects with the requested permission.
398
		ids, err := NewSchemaBasedSubjectFilter(engine.schemaReader, engine.dataReader, SchemaBaseSubjectFilterConcurrencyLimit(engine.concurrencyLimit)).SubjectFilter(ctx, request)
399
		if err != nil {
400
			return nil, err
401
		}
402
403
		for _, id := range ids {
404
			publisher.Publish(&base.Subject{
405
				Type:     request.GetSubjectReference().GetType(),
406
				Id:       id,
407
				Relation: request.GetSubjectReference().GetRelation(),
408
			}, &base.PermissionCheckRequestMetadata{
409
				SnapToken:     request.GetMetadata().GetSnapToken(),
410
				SchemaVersion: request.GetMetadata().GetSchemaVersion(),
411
				Depth:         request.GetMetadata().GetDepth(),
412
			}, request.GetContext(), base.CheckResult_CHECK_RESULT_ALLOWED)
413
		}
414
	}
415
416
	err = checker.ExecuteRequests(size)
417
	if err != nil {
418
		// Return an error if there was an issue with the subject filter.
419
		return nil, err
420
	}
421
422
	// Return the list of entity IDs that have the required permission.
423
	return &base.PermissionLookupSubjectResponse{
424
		SubjectIds:      subjectIDs,
425
		ContinuousToken: ct,
426
	}, nil
427
}
428
429
// readSchema retrieves a SchemaDefinition for a given tenantID and schemaVersion.
430
// It first checks a cache (schemaMap) for the schema, and if not found, reads it using the schemaReader.
431
func (engine *LookupEngine) readSchema(ctx context.Context, tenantID, schemaVersion string) (*base.SchemaDefinition, error) {
432
	// Create a unique cache key by combining the tenantID and schemaVersion.
433
	// This ensures that different combinations of tenantID and schemaVersion get their own cache entries.
434
	cacheKey := tenantID + "|" + schemaVersion
435
436
	// Attempt to retrieve the schema from the cache (schemaMap) using the generated cacheKey.
437
	if sch, ok := engine.schemaMap.Load(cacheKey); ok {
438
		// If the schema is present in the cache, cast it to its correct type and return.
439
		return sch.(*base.SchemaDefinition), nil
440
	}
441
442
	// If the schema is not present in the cache, use the schemaReader to read it from the source (e.g., a database or file).
443
	sch, err := engine.schemaReader.ReadSchema(ctx, tenantID, schemaVersion)
444
	if err != nil {
445
		// If there's an error reading the schema (e.g., schema not found or database connection issue), return the error.
446
		return nil, err
447
	}
448
449
	// Cache the newly read schema in schemaMap so that subsequent reads can be faster.
450
	engine.schemaMap.Store(cacheKey, sch)
451
452
	// Return the freshly read schema.
453
	return sch, nil
454
}
455