Passed
Pull Request — master (#1603)
by Tolga
03:57
created

engines.*LookupEngine.LookupEntity   B

Complexity

Conditions 6

Size

Total Lines 68
Code Lines 42

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 42
nop 2
dl 0
loc 68
rs 7.9386
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
package engines
2
3
import (
4
	"context"
5
	"errors"
6
	"sync"
7
8
	"github.com/Permify/permify/internal/invoke"
9
	"github.com/Permify/permify/internal/schema"
10
	"github.com/Permify/permify/internal/storage"
11
	base "github.com/Permify/permify/pkg/pb/base/v1"
12
)
13
14
type LookupEngine struct {
15
	// schemaReader is responsible for reading schema information
16
	schemaReader storage.SchemaReader
17
	// schemaReader is responsible for reading data
18
	dataReader storage.DataReader
19
	// checkEngine is responsible for performing permission checks
20
	checkEngine invoke.Check
21
	// schemaMap is a map that keeps track of schema versions
22
	schemaMap sync.Map
23
	// concurrencyLimit is the maximum number of concurrent permission checks allowed
24
	concurrencyLimit int
25
}
26
27
func NewLookupEngine(
28
	check invoke.Check,
29
	schemaReader storage.SchemaReader,
30
	dataReader storage.DataReader,
31
	opts ...LookupOption,
32
) *LookupEngine {
33
	engine := &LookupEngine{
34
		schemaReader:     schemaReader,
35
		checkEngine:      check,
36
		dataReader:       dataReader,
37
		schemaMap:        sync.Map{},
38
		concurrencyLimit: _defaultConcurrencyLimit,
39
	}
40
41
	// options
42
	for _, opt := range opts {
43
		opt(engine)
44
	}
45
46
	return engine
47
}
48
49
// LookupEntity performs a permission check on a set of entities and returns a response
50
// containing the IDs of the entities that have the requested permission.
51
func (engine *LookupEngine) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) {
52
	// A mutex and slice are declared to safely store entity IDs from concurrent callbacks
53
	var mu sync.Mutex
54
	var entityIDs []string
55
	var ct string
56
57
	size := request.GetPageSize()
58
	if size == 0 {
59
		size = 1000
60
	}
61
62
	// Callback function which is called for each entity. If the entity passes the permission check,
63
	// the entity ID is appended to the entityIDs slice.
64
	callback := func(entityID, token string) {
65
		mu.Lock()         // Safeguard access to the shared slice with a mutex
66
		defer mu.Unlock() // Ensure the lock is released after appending the ID
67
		entityIDs = append(entityIDs, entityID)
68
		ct = token
69
	}
70
71
	// Create and start BulkChecker. It performs permission checks in parallel.
72
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
73
74
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
75
	publisher := NewBulkEntityPublisher(ctx, request, checker)
76
77
	// Retrieve the schema of the entity based on the tenantId and schema version
78
	var sc *base.SchemaDefinition
79
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
80
	if err != nil {
81
		return nil, err
82
	}
83
84
	// Create a map to keep track of visited entities
85
	visits := &VisitsMap{}
86
87
	// Perform an entity filter operation based on the permission request
88
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
89
		TenantId: request.GetTenantId(),
90
		Metadata: &base.PermissionEntityFilterRequestMetadata{
91
			SnapToken:     request.GetMetadata().GetSnapToken(),
92
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
93
			Depth:         request.GetMetadata().GetDepth(),
94
		},
95
		Entrance: &base.Entrance{
96
			Type:  request.GetEntityType(),
97
			Value: request.GetPermission(),
98
		},
99
		Subject: request.GetSubject(),
100
		Context: request.GetContext(),
101
		Scope:   request.GetScope(),
102
		Cursor:  request.GetContinuousToken(),
103
	}, visits, publisher)
104
	if err != nil {
105
		return nil, err
106
	}
107
108
	// At this point, the BulkChecker has collected and sorted requests
109
	err = checker.ExecuteRequests(size) // Execute the collected requests in parallel
110
	if err != nil {
111
		return nil, err
112
	}
113
114
	// Return response containing allowed entity IDs
115
	return &base.PermissionLookupEntityResponse{
116
		EntityIds:       entityIDs,
117
		ContinuousToken: ct,
118
	}, nil
119
}
120
121
// LookupEntityStream performs a permission check on a set of entities and streams the results
122
// containing the IDs of the entities that have the requested permission.
123
func (engine *LookupEngine) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) {
124
	size := request.GetPageSize()
125
	if size == 0 {
126
		size = 1000
127
	}
128
129
	// Define a callback function that will be called for each entity that passes the permission check.
130
	// If the check result is allowed, it sends the entity ID to the server stream.
131
	callback := func(entityID, token string) {
132
		err := server.Send(&base.PermissionLookupEntityStreamResponse{
133
			EntityId:        entityID,
134
			ContinuousToken: token,
135
		})
136
		// If there is an error in sending the response, the function will return
137
		if err != nil {
138
			return
139
		}
140
	}
141
142
	// Create and start BulkChecker. It performs permission checks concurrently.
143
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)
144
145
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
146
	publisher := NewBulkEntityPublisher(ctx, request, checker)
147
148
	// Retrieve the entity definition schema based on the tenantId and schema version
149
	var sc *base.SchemaDefinition
150
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
151
	if err != nil {
152
		return err
153
	}
154
155
	visits := &VisitsMap{}
156
157
	// Perform an entity filter operation based on the permission request
158
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
159
		TenantId: request.GetTenantId(),
160
		Metadata: &base.PermissionEntityFilterRequestMetadata{
161
			SnapToken:     request.GetMetadata().GetSnapToken(),
162
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
163
			Depth:         request.GetMetadata().GetDepth(),
164
		},
165
		Entrance: &base.Entrance{
166
			Type:  request.GetEntityType(),
167
			Value: request.GetPermission(),
168
		},
169
		Subject: request.GetSubject(),
170
		Context: request.GetContext(),
171
		Cursor:  request.GetContinuousToken(),
172
	}, visits, publisher)
173
	if err != nil {
174
		return err
175
	}
176
177
	err = checker.ExecuteRequests(size)
178
	if err != nil {
179
		return err
180
	}
181
182
	return nil
183
}
184
185
// LookupSubject checks if a subject has a particular permission based on the schema and version.
186
// It returns a list of subjects that have the given permission.
187
func (engine *LookupEngine) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) {
188
	size := request.GetPageSize()
189
	if size == 0 {
190
		size = 1000
191
	}
192
193
	// Use a mutex to protect concurrent writes to the subjectIDs slice.
194
	var mu sync.Mutex
195
	var subjectIDs []string
196
	var ct string
197
198
	// Callback function to handle the results of permission checks.
199
	// If an entity passes the permission check, its ID is stored in the subjectIDs slice.
200
	callback := func(subjectID, token string) {
201
		mu.Lock()         // Lock to prevent concurrent modification of the slice.
202
		defer mu.Unlock() // Unlock after the ID is appended.
203
		subjectIDs = append(subjectIDs, subjectID)
204
		ct = token
205
	}
206
207
	// Create and initiate a BulkChecker to perform permission checks in parallel.
208
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_SUBJECT, callback, engine.concurrencyLimit)
209
210
	// Create and start a BulkPublisher to provide entities to the BulkChecker.
211
	publisher := NewBulkSubjectPublisher(ctx, request, checker)
212
213
	// Retrieve the schema of the entity based on the provided tenantId and schema version.
214
	var sc *base.SchemaDefinition
215
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
216
	if err != nil {
217
		// Return an error if there was an issue retrieving the schema.
218
		return nil, err
219
	}
220
221
	// Walk the entity schema to perform a permission check.
222
	err = schema.NewWalker(sc).Walk(request.GetEntity().GetType(), request.GetPermission())
223
	if err != nil {
224
		// If the error indicates the schema walk is unimplemented, handle it with a MassEntityFilter.
225
		if errors.Is(err, schema.ErrUnimplemented) {
226
			err = NewMassSubjectFilter(engine.dataReader).SubjectFilter(ctx, request, publisher)
227
			if err != nil {
228
				// Return an error if there was an issue with the subject filter.
229
				return nil, err
230
			}
231
		} else { // For other errors, simply return the error
232
			return nil, err
233
		}
234
	} else {
235
		// Use the schema-based subject filter to get the list of subjects with the requested permission.
236
		ids, err := NewSchemaBasedSubjectFilter(engine.schemaReader, engine.dataReader, SchemaBaseSubjectFilterConcurrencyLimit(engine.concurrencyLimit)).SubjectFilter(ctx, request)
237
		if err != nil {
238
			return nil, err
239
		}
240
241
		for _, id := range ids {
242
			publisher.Publish(&base.Subject{
243
				Type:     request.GetSubjectReference().GetType(),
244
				Id:       id,
245
				Relation: request.GetSubjectReference().GetRelation(),
246
			}, &base.PermissionCheckRequestMetadata{
247
				SnapToken:     request.GetMetadata().GetSnapToken(),
248
				SchemaVersion: request.GetMetadata().GetSchemaVersion(),
249
				Depth:         request.GetMetadata().GetDepth(),
250
			}, request.GetContext(), base.CheckResult_CHECK_RESULT_ALLOWED)
251
		}
252
	}
253
254
	err = checker.ExecuteRequests(size)
255
	if err != nil {
256
		// Return an error if there was an issue with the subject filter.
257
		return nil, err
258
	}
259
260
	// Return the list of entity IDs that have the required permission.
261
	return &base.PermissionLookupSubjectResponse{
262
		SubjectIds:      subjectIDs,
263
		ContinuousToken: ct,
264
	}, nil
265
}
266
267
// readSchema retrieves a SchemaDefinition for a given tenantID and schemaVersion.
268
// It first checks a cache (schemaMap) for the schema, and if not found, reads it using the schemaReader.
269
func (engine *LookupEngine) readSchema(ctx context.Context, tenantID, schemaVersion string) (*base.SchemaDefinition, error) {
270
	// Create a unique cache key by combining the tenantID and schemaVersion.
271
	// This ensures that different combinations of tenantID and schemaVersion get their own cache entries.
272
	cacheKey := tenantID + "|" + schemaVersion
273
274
	// Attempt to retrieve the schema from the cache (schemaMap) using the generated cacheKey.
275
	if sch, ok := engine.schemaMap.Load(cacheKey); ok {
276
		// If the schema is present in the cache, cast it to its correct type and return.
277
		return sch.(*base.SchemaDefinition), nil
278
	}
279
280
	// If the schema is not present in the cache, use the schemaReader to read it from the source (e.g., a database or file).
281
	sch, err := engine.schemaReader.ReadSchema(ctx, tenantID, schemaVersion)
282
	if err != nil {
283
		// If there's an error reading the schema (e.g., schema not found or database connection issue), return the error.
284
		return nil, err
285
	}
286
287
	// Cache the newly read schema in schemaMap so that subsequent reads can be faster.
288
	engine.schemaMap.Store(cacheKey, sch)
289
290
	// Return the freshly read schema.
291
	return sch, nil
292
}
293