Passed
Pull Request — master (#1465)
by
unknown
02:47
created

engines.NewLookupEngine   A

Complexity

Conditions 2

Size

Total Lines 20
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 15
nop 4
dl 0
loc 20
rs 9.65
c 0
b 0
f 0
1
package engines
2
3
import (
4
	"context"
5
	"errors"
6
	"sync"
7
8
	"github.com/Permify/permify/internal/invoke"
9
	"github.com/Permify/permify/internal/schema"
10
	"github.com/Permify/permify/internal/storage"
11
	base "github.com/Permify/permify/pkg/pb/base/v1"
12
)
13
14
type LookupEngine struct {
15
	// schemaReader is responsible for reading schema information
16
	schemaReader storage.SchemaReader
17
	// schemaReader is responsible for reading data
18
	dataReader storage.DataReader
19
	// checkEngine is responsible for performing permission checks
20
	checkEngine invoke.Check
21
	// schemaMap is a map that keeps track of schema versions
22
	schemaMap sync.Map
23
	// concurrencyLimit is the maximum number of concurrent permission checks allowed
24
	concurrencyLimit int
25
}
26
27
func NewLookupEngine(
28
	check invoke.Check,
29
	schemaReader storage.SchemaReader,
30
	dataReader storage.DataReader,
31
	opts ...LookupOption,
32
) *LookupEngine {
33
	engine := &LookupEngine{
34
		schemaReader:     schemaReader,
35
		checkEngine:      check,
36
		dataReader:       dataReader,
37
		schemaMap:        sync.Map{},
38
		concurrencyLimit: _defaultConcurrencyLimit,
39
	}
40
41
	// options
42
	for _, opt := range opts {
43
		opt(engine)
44
	}
45
46
	return engine
47
}
48
49
// LookupEntity performs a permission check on a set of entities and returns a response
50
// containing the IDs of the entities that have the requested permission.
51
func (engine *LookupEngine) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) {
52
	// A mutex and slice are declared to safely store entity IDs from concurrent callbacks
53
	var mu sync.Mutex
54
	var entityIDs []string
55
	var ct string
56
57
	size := request.GetPageSize()
58
	if size == 0 {
59
		size = 1000
60
	}
61
62
	// Callback function which is called for each entity. If the entity passes the permission check,
63
	// the entity ID is appended to the entityIDs slice.
64
	callback := func(entityID, permission, token string) {
65
		mu.Lock()         // Safeguard access to the shared slice with a mutex
66
		defer mu.Unlock() // Ensure the lock is released after appending the ID
67
		if _, exists := entityIDsByPermission[permission]; !exists {
68
			// If not, initialize it with an empty EntityIds struct
69
			entityIDsByPermission[permission] = &base.EntityIds{Ids: []string{}}
70
		}
71
		entityIDsByPermission[permission].Ids = append(entityIDsByPermission[permission].Ids, entityID)
72
		ct = token
73
	}
74
75
	// Create and start BulkChecker. It performs permission checks in parallel.
76
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
77
78
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
79
	publisher := NewBulkEntityPublisher(ctx, ConvertToPermissionsLookupEntityRequest(request), checker)
80
81
	// Retrieve the schema of the entity based on the tenantId and schema version
82
	var sc *base.SchemaDefinition
83
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
84
	if err != nil {
85
		return nil, err
86
	}
87
88
	// Create a map to keep track of visited entities
89
	visits := &VisitsMap{}
90
91
	permissionChecks := &VisitsMap{}
92
93
	// Perform an entity filter operation based on the permission request
94
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
95
		TenantId: request.GetTenantId(),
96
		Metadata: &base.PermissionEntityFilterRequestMetadata{
97
			SnapToken:     request.GetMetadata().GetSnapToken(),
98
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
99
			Depth:         request.GetMetadata().GetDepth(),
100
		},
101
		Entrances: []*base.Entrance{
102
			{
103
				Type:  request.GetEntityType(),
104
				Value: request.GetPermission(),
105
			},
106
		},
107
		Subject: request.GetSubject(),
108
		Context: request.GetContext(),
109
		Scope:   request.GetScope(),
110
		Cursor:  request.GetContinuousToken(),
111
	}, visits, publisher, permissionChecks)
112
	if err != nil {
113
		return nil, err
114
	}
115
116
	// At this point, the BulkChecker has collected and sorted requests
117
	err = checker.ExecuteRequests(size) // Execute the collected requests in parallel
118
	if err != nil {
119
		return nil, err
120
	}
121
122
	// Return response containing allowed entity IDs
123
	return &base.PermissionLookupEntityResponse{
124
		EntityIds:       entityIDs,
125
		ContinuousToken: ct,
126
	}, nil
127
}
128
129
// LookupEntity performs a permission check on a set of entities and returns a response
130
// containing the IDs of the entities that have the requested permission.
131
func (engine *LookupEngine) LookupEntities(ctx context.Context, request *base.PermissionsLookupEntityRequest) (response *base.PermissionsLookupEntityResponse, err error) {
132
	// A mutex and slice are declared to safely store entity IDs from concurrent callbacks
133
	var mu sync.Mutex
134
	entityIDsByPermission := make(map[string]*base.EntityIds)
135
	var ct string
136
137
	size := request.GetPageSize()
138
	if size == 0 {
139
		size = 1000
140
	}
141
142
	// Callback function which is called for each entity. If the entity passes the permission check,
143
	// the entity ID is appended to the entityIDs slice.
144
	callback := func(entityID, permission, token string) {
145
		mu.Lock()         // Safeguard access to the shared slice with a mutex
146
		defer mu.Unlock() // Ensure the lock is released after appending the ID
147
		if _, exists := entityIDsByPermission[permission]; !exists {
148
			// If not, initialize it with an empty EntityIds struct
149
			entityIDsByPermission[permission] = &base.EntityIds{Ids: []string{}}
150
		}
151
		entityIDsByPermission[permission].Ids = append(entityIDsByPermission[permission].Ids, entityID)
152
		ct = token
153
	}
154
155
	// Create and start BulkChecker. It performs permission checks in parallel.
156
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
157
158
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
159
	publisher := NewBulkEntityPublisher(ctx, request, checker)
160
161
	// Retrieve the schema of the entity based on the tenantId and schema version
162
	var sc *base.SchemaDefinition
163
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
164
	if err != nil {
165
		return nil, err
166
	}
167
168
	// Create a map to keep track of visited entities
169
	visits := &VisitsMap{}
170
	permissionChecks := &VisitsMap{}
171
172
	entrances := make([]*base.Entrance, 0)
173
174
	for _, permission := range request.GetPermissions() {
175
		entrances = append(entrances, &base.Entrance{
176
			Type:  request.GetEntityType(),
177
			Value: permission,
178
		})
179
	}
180
181
	// Perform an entity filter operation based on the permission request
182
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
183
		TenantId: request.GetTenantId(),
184
		Metadata: &base.PermissionEntityFilterRequestMetadata{
185
			SnapToken:     request.GetMetadata().GetSnapToken(),
186
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
187
			Depth:         request.GetMetadata().GetDepth(),
188
		},
189
		Entrances: entrances,
190
		Subject:   request.GetSubject(),
191
		Context:   request.GetContext(),
192
		Scope:     request.GetScope(),
193
		Cursor:    request.GetContinuousToken(),
194
	}, visits, publisher, permissionChecks)
195
	if err != nil {
196
		return nil, err
197
	}
198
199
	// At this point, the BulkChecker has collected and sorted requests
200
	err = checker.ExecuteRequests(size) // Execute the collected requests in parallel
201
	if err != nil {
202
		return nil, err
203
	}
204
205
	// Return response containing allowed entity IDs
206
	return &base.PermissionsLookupEntityResponse{
207
		EntityIds:       entityIDsByPermission,
208
		ContinuousToken: ct,
209
	}, nil
210
}
211
212
// LookupEntityStream performs a permission check on a set of entities and streams the results
213
// containing the IDs of the entities that have the requested permission.
214
func (engine *LookupEngine) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) {
215
	size := request.GetPageSize()
216
	if size == 0 {
217
		size = 1000
218
	}
219
220
	// Define a callback function that will be called for each entity that passes the permission check.
221
	// If the check result is allowed, it sends the entity ID to the server stream.
222
	callback := func(entityID, permission, token string) {
223
		err := server.Send(&base.PermissionLookupEntityStreamResponse{
224
			EntityId:        entityID,
225
			Permission:      permission,
226
			ContinuousToken: token,
227
		})
228
		// If there is an error in sending the response, the function will return
229
		if err != nil {
230
			return
231
		}
232
	}
233
234
	// Create and start BulkChecker. It performs permission checks concurrently.
235
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
236
237
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
238
	publisher := NewBulkEntityPublisher(ctx, ConvertToPermissionsLookupEntityRequest(request), checker)
239
240
	// Retrieve the entity definition schema based on the tenantId and schema version
241
	var sc *base.SchemaDefinition
242
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
243
	if err != nil {
244
		return err
245
	}
246
247
	visits := &VisitsMap{}
248
	permissionChecks := &VisitsMap{}
249
250
	// Perform an entity filter operation based on the permission request
251
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
252
		TenantId: request.GetTenantId(),
253
		Metadata: &base.PermissionEntityFilterRequestMetadata{
254
			SnapToken:     request.GetMetadata().GetSnapToken(),
255
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
256
			Depth:         request.GetMetadata().GetDepth(),
257
		},
258
		Entrances: []*base.Entrance{
259
			{
260
				Type:  request.GetEntityType(),
261
				Value: request.GetPermission(),
262
			},
263
		},
264
		Subject: request.GetSubject(),
265
		Context: request.GetContext(),
266
		Cursor:  request.GetContinuousToken(),
267
	}, visits, publisher, permissionChecks)
268
	if err != nil {
269
		return err
270
	}
271
272
	err = checker.ExecuteRequests(size)
273
	if err != nil {
274
		return err
275
	}
276
277
	return nil
278
}
279
280
// LookupEntityStream performs a permission check on a set of entities and streams the results
281
// containing the IDs of the entities that have the requested permission.
282
func (engine *LookupEngine) LookupEntitiesStream(ctx context.Context, request *base.PermissionsLookupEntityRequest, server base.Permission_LookupEntitiesStreamServer) (err error) {
283
	size := request.GetPageSize()
284
	if size == 0 {
285
		size = 1000
286
	}
287
288
	// Define a callback function that will be called for each entity that passes the permission check.
289
	// If the check result is allowed, it sends the entity ID to the server stream.
290
	callback := func(entityID, permission, token string) {
291
		err := server.Send(&base.PermissionsLookupEntityStreamResponse{
292
			EntityId:        entityID,
293
			Permission:      permission,
294
			ContinuousToken: token,
295
		})
296
		// If there is an error in sending the response, the function will return
297
		if err != nil {
298
			return
299
		}
300
	}
301
302
	// Create and start BulkChecker. It performs permission checks concurrently.
303
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
304
305
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
306
	publisher := NewBulkEntityPublisher(ctx, request, checker)
307
308
	// Retrieve the entity definition schema based on the tenantId and schema version
309
	var sc *base.SchemaDefinition
310
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
311
	if err != nil {
312
		return err
313
	}
314
315
	visits := &VisitsMap{}
316
	permissionChecks := &VisitsMap{}
317
318
	entrances := make([]*base.Entrance, 0)
319
320
	for _, permission := range request.GetPermissions() {
321
		entrances = append(entrances, &base.Entrance{
322
			Type:  request.GetEntityType(),
323
			Value: permission,
324
		})
325
	}
326
327
	// Perform an entity filter operation based on the permission request
328
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
329
		TenantId: request.GetTenantId(),
330
		Metadata: &base.PermissionEntityFilterRequestMetadata{
331
			SnapToken:     request.GetMetadata().GetSnapToken(),
332
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
333
			Depth:         request.GetMetadata().GetDepth(),
334
		},
335
		Entrances: entrances,
336
		Subject:   request.GetSubject(),
337
		Context:   request.GetContext(),
338
		Cursor:    request.GetContinuousToken(),
339
	}, visits, publisher, permissionChecks)
340
	if err != nil {
341
		return err
342
	}
343
344
	err = checker.ExecuteRequests(size)
345
	if err != nil {
346
		return err
347
	}
348
349
	return nil
350
}
351
352
// LookupSubject checks if a subject has a particular permission based on the schema and version.
353
// It returns a list of subjects that have the given permission.
354
func (engine *LookupEngine) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) {
355
	size := request.GetPageSize()
356
	if size == 0 {
357
		size = 1000
358
	}
359
360
	// Use a mutex to protect concurrent writes to the subjectIDs slice.
361
	var mu sync.Mutex
362
	var subjectIDs []string
363
	var ct string
364
365
	// Callback function to handle the results of permission checks.
366
	// If an entity passes the permission check, its ID is stored in the subjectIDs slice.
367
	callback := func(subjectID, permission, token string) {
368
		mu.Lock()         // Lock to prevent concurrent modification of the slice.
369
		defer mu.Unlock() // Unlock after the ID is appended.
370
		subjectIDs = append(subjectIDs, subjectID)
371
		ct = token
372
	}
373
374
	// Create and initiate a BulkChecker to perform permission checks in parallel.
375
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_SUBJECT, callback, engine.concurrencyLimit)
376
377
	// Create and start a BulkPublisher to provide entities to the BulkChecker.
378
	publisher := NewBulkSubjectPublisher(ctx, request, checker)
379
380
	// Retrieve the schema of the entity based on the provided tenantId and schema version.
381
	var sc *base.SchemaDefinition
382
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
383
	if err != nil {
384
		// Return an error if there was an issue retrieving the schema.
385
		return nil, err
386
	}
387
388
	// Walk the entity schema to perform a permission check.
389
	err = schema.NewWalker(sc).Walk(request.GetEntity().GetType(), request.GetPermission())
390
	if err != nil {
391
		// If the error indicates the schema walk is unimplemented, handle it with a MassEntityFilter.
392
		if errors.Is(err, schema.ErrUnimplemented) {
393
			err = NewMassSubjectFilter(engine.dataReader).SubjectFilter(ctx, request, publisher)
394
			if err != nil {
395
				// Return an error if there was an issue with the subject filter.
396
				return nil, err
397
			}
398
		} else { // For other errors, simply return the error
399
			return nil, err
400
		}
401
	} else {
402
		// Use the schema-based subject filter to get the list of subjects with the requested permission.
403
		ids, err := NewSchemaBasedSubjectFilter(engine.schemaReader, engine.dataReader, SchemaBaseSubjectFilterConcurrencyLimit(engine.concurrencyLimit)).SubjectFilter(ctx, request)
404
		if err != nil {
405
			return nil, err
406
		}
407
408
		for _, id := range ids {
409
			publisher.Publish(&base.Subject{
410
				Type:     request.GetSubjectReference().GetType(),
411
				Id:       id,
412
				Relation: request.GetSubjectReference().GetRelation(),
413
			}, &base.PermissionCheckRequestMetadata{
414
				SnapToken:     request.GetMetadata().GetSnapToken(),
415
				SchemaVersion: request.GetMetadata().GetSchemaVersion(),
416
				Depth:         request.GetMetadata().GetDepth(),
417
			}, request.GetContext(), base.CheckResult_CHECK_RESULT_ALLOWED)
418
		}
419
	}
420
421
	err = checker.ExecuteRequests(size)
422
	if err != nil {
423
		// Return an error if there was an issue with the subject filter.
424
		return nil, err
425
	}
426
427
	// Return the list of entity IDs that have the required permission.
428
	return &base.PermissionLookupSubjectResponse{
429
		SubjectIds:      subjectIDs,
430
		ContinuousToken: ct,
431
	}, nil
432
}
433
434
// readSchema retrieves a SchemaDefinition for a given tenantID and schemaVersion.
435
// It first checks a cache (schemaMap) for the schema, and if not found, reads it using the schemaReader.
436
func (engine *LookupEngine) readSchema(ctx context.Context, tenantID, schemaVersion string) (*base.SchemaDefinition, error) {
437
	// Create a unique cache key by combining the tenantID and schemaVersion.
438
	// This ensures that different combinations of tenantID and schemaVersion get their own cache entries.
439
	cacheKey := tenantID + "|" + schemaVersion
440
441
	// Attempt to retrieve the schema from the cache (schemaMap) using the generated cacheKey.
442
	if sch, ok := engine.schemaMap.Load(cacheKey); ok {
443
		// If the schema is present in the cache, cast it to its correct type and return.
444
		return sch.(*base.SchemaDefinition), nil
445
	}
446
447
	// If the schema is not present in the cache, use the schemaReader to read it from the source (e.g., a database or file).
448
	sch, err := engine.schemaReader.ReadSchema(ctx, tenantID, schemaVersion)
449
	if err != nil {
450
		// If there's an error reading the schema (e.g., schema not found or database connection issue), return the error.
451
		return nil, err
452
	}
453
454
	// Cache the newly read schema in schemaMap so that subsequent reads can be faster.
455
	engine.schemaMap.Store(cacheKey, sch)
456
457
	// Return the freshly read schema.
458
	return sch, nil
459
}
460