Passed — Push to master (5dc383...3ad024) by Tolga
01:26 (queued 20s)

internal/engines/lookup.go   A

Size/Duplication
Total Lines: 318
Duplicated Lines: 0%

Importance
Changes: 0

Metric   Value
cc       34
eloc     178
dl       0
loc      318
rs       9.68
c        0
b        0
f        0

5 Methods

Rating   Name                                        Duplication   Size   Complexity
B        engines.*LookupEngine.LookupEntityStream    0             60     7
F        engines.*LookupEngine.LookupSubject         0             102    16
B        engines.*LookupEngine.LookupEntity          0             68     6
A        engines.NewLookupEngine                     0             20     2
A        engines.*LookupEngine.readSchema            0             23     3
package engines

import (
	"context"
	"slices"
	"sort"
	"strings"
	"sync"

	"github.com/Permify/permify/internal/invoke"
	"github.com/Permify/permify/internal/storage"
	"github.com/Permify/permify/internal/storage/context/utils"
	"github.com/Permify/permify/pkg/database"
	base "github.com/Permify/permify/pkg/pb/base/v1"
)

type LookupEngine struct {
	// schemaReader is responsible for reading schema information
	schemaReader storage.SchemaReader
	// dataReader is responsible for reading data
	dataReader storage.DataReader
	// checkEngine is responsible for performing permission checks
	checkEngine invoke.Check
	// schemaMap is a map that keeps track of schema versions
	schemaMap sync.Map
	// concurrencyLimit is the maximum number of concurrent permission checks allowed
	concurrencyLimit int
}

func NewLookupEngine(
	check invoke.Check,
	schemaReader storage.SchemaReader,
	dataReader storage.DataReader,
	opts ...LookupOption,
) *LookupEngine {
	engine := &LookupEngine{
		schemaReader:     schemaReader,
		checkEngine:      check,
		dataReader:       dataReader,
		schemaMap:        sync.Map{},
		concurrencyLimit: _defaultConcurrencyLimit,
	}

	// Apply any provided options to customize the engine
	for _, opt := range opts {
		opt(engine)
	}

	return engine
}

// LookupEntity performs a permission check on a set of entities and returns a response
// containing the IDs of the entities that have the requested permission.
func (engine *LookupEngine) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) {
	// A mutex and slice are declared to safely store entity IDs from concurrent callbacks
	var mu sync.Mutex
	var entityIDs []string
	var ct string

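	// Determine the page size, defaulting to 1000 when the request does not specify one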
	size := request.GetPageSize()
	if size == 0 {
		size = 1000
	}

	// Callback function which is called for each entity. If the entity passes the permission check,
	// the entity ID is appended to the entityIDs slice.
	callback := func(entityID, token string) {
		mu.Lock()         // Safeguard access to the shared slice with a mutex
		defer mu.Unlock() // Ensure the lock is released after appending the ID
		entityIDs = append(entityIDs, entityID)
		ct = token
	}

	// Create and start BulkChecker. It performs permission checks in parallel.
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)

	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
	publisher := NewBulkEntityPublisher(ctx, request, checker)

	// Retrieve the schema of the entity based on the tenantId and schema version
	var sc *base.SchemaDefinition
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
	if err != nil {
		return nil, err
	}

	// Create a map to keep track of visited entities
	visits := &VisitsMap{}

	// Perform an entity filter operation based on the permission request
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
		TenantId: request.GetTenantId(),
		Metadata: &base.PermissionEntityFilterRequestMetadata{
			SnapToken:     request.GetMetadata().GetSnapToken(),
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
			Depth:         request.GetMetadata().GetDepth(),
		},
		Entrance: &base.Entrance{
			Type:  request.GetEntityType(),
			Value: request.GetPermission(),
		},
		Subject: request.GetSubject(),
		Context: request.GetContext(),
		Scope:   request.GetScope(),
		Cursor:  request.GetContinuousToken(),
	}, visits, publisher)
	if err != nil {
		return nil, err
	}

	// At this point, the BulkChecker has collected and sorted requests
	err = checker.ExecuteRequests(size) // Execute the collected requests in parallel
	if err != nil {
		return nil, err
	}

	// Return response containing allowed entity IDs
	return &base.PermissionLookupEntityResponse{
		EntityIds:       entityIDs,
		ContinuousToken: ct,
	}, nil
}

// LookupEntityStream performs a permission check on a set of entities and streams the results
// containing the IDs of the entities that have the requested permission.
func (engine *LookupEngine) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) {
	size := request.GetPageSize()
	if size == 0 {
		size = 1000
	}

	// Define a callback function that will be called for each entity that passes the permission check.
	// If the check result is allowed, it sends the entity ID to the server stream.
	callback := func(entityID, token string) {
		err := server.Send(&base.PermissionLookupEntityStreamResponse{
			EntityId:        entityID,
			ContinuousToken: token,
		})
		// If there is an error in sending the response, the function will return
		if err != nil {
			return
		}
	}

	// Create and start BulkChecker. It performs permission checks concurrently.
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)

	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
	publisher := NewBulkEntityPublisher(ctx, request, checker)

	// Retrieve the entity definition schema based on the tenantId and schema version
	var sc *base.SchemaDefinition
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
	if err != nil {
		return err
	}

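	// Create a map to keep track of visited entities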
	visits := &VisitsMap{}

	// Perform an entity filter operation based on the permission request
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
		TenantId: request.GetTenantId(),
		Metadata: &base.PermissionEntityFilterRequestMetadata{
			SnapToken:     request.GetMetadata().GetSnapToken(),
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
			Depth:         request.GetMetadata().GetDepth(),
		},
		Entrance: &base.Entrance{
			Type:  request.GetEntityType(),
			Value: request.GetPermission(),
		},
		Subject: request.GetSubject(),
		Context: request.GetContext(),
		Cursor:  request.GetContinuousToken(),
	}, visits, publisher)
	if err != nil {
		return err
	}

	err = checker.ExecuteRequests(size)
	if err != nil {
		return err
	}

	return nil
}

// LookupSubject checks if a subject has a particular permission based on the schema and version.
// It returns a list of subjects that have the given permission.
func (engine *LookupEngine) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) {
	size := request.GetPageSize()
	if size == 0 {
		size = 1000
	}

	var ids []string
	var ct string

	// Use the schema-based subject filter to get the list of subjects with the requested permission.
	ids, err = NewSubjectFilter(engine.schemaReader, engine.dataReader, SubjectFilterConcurrencyLimit(engine.concurrencyLimit)).SubjectFilter(ctx, request)
	if err != nil {
		return nil, err
	}

	// Initialize excludedIds to be used in the query
	var excludedIds []string

	// Check whether the wildcard '*' is present in ids, or whether an entry is formatted like "*-1,2,3"
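	// For example, "*-1,2,3" yields excludedIds = ["1", "2", "3"], while a bare "*" clears any exclusions.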
	for _, id := range ids {
		if id == "*" {
			// Handle '*' case: no exclusions, include all resources
			excludedIds = nil
			break
		} else if strings.HasPrefix(id, "*-") {
			// Handle '*-1,2,3' case: parse exclusions after '-'
			excludedIds = strings.Split(strings.TrimPrefix(id, "*-"), ",")
			break
		}
	}

	// If '*' was found, query all subjects with exclusions if provided
	if excludedIds != nil || slices.Contains(ids, "*") {
		resp, pct, err := engine.dataReader.QueryUniqueSubjectReferences(
			ctx,
			request.GetTenantId(),
			request.GetSubjectReference(),
			excludedIds, // Pass the exclusions if any
			request.GetMetadata().GetSnapToken(),
			database.NewPagination(database.Size(size), database.Token(request.GetContinuousToken())),
		)
		if err != nil {
			return nil, err
		}
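		// Keep the continuation token returned by the query so it can be passed back to the caller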
		ct = pct.String()

		// Return the list of entity IDs that have the required permission.
		return &base.PermissionLookupSubjectResponse{
			SubjectIds:      resp,
			ContinuousToken: ct,
		}, nil
	}

	// Sort the IDs
	sort.Strings(ids)

	// Initialize the start index as a string (to match token format)
	start := ""

	// Handle continuous token if present
	if request.GetContinuousToken() != "" {
		var t database.ContinuousToken
		t, err := utils.EncodedContinuousToken{Value: request.GetContinuousToken()}.Decode()
		if err != nil {
			return nil, err
		}
		start = t.(utils.ContinuousToken).Value
	}

	// Find the start index based on the continuous token
	startIndex := 0
	if start != "" {
		// Locate the position in the sorted list where the ID equals or exceeds the token value
		for i, id := range ids {
			if id >= start {
				startIndex = i
				break
			}
		}
	}

	// Convert size to int for compatibility with startIndex
	pageSize := int(size)

	// Calculate the end index based on the page size
	end := startIndex + pageSize
	if end > len(ids) {
		end = len(ids)
	}

	// Generate the next continuous token if there are more results
	if end < len(ids) {
		ct = utils.NewContinuousToken(ids[end]).Encode().String()
	} else {
		ct = ""
	}

	// Return the paginated and sorted list of IDs
	return &base.PermissionLookupSubjectResponse{
		SubjectIds:      ids[startIndex:end], // Slice the IDs based on pagination
		ContinuousToken: ct,                  // Return the next continuous token
	}, nil
}

// readSchema retrieves a SchemaDefinition for a given tenantID and schemaVersion.
// It first checks a cache (schemaMap) for the schema, and if not found, reads it using the schemaReader.
func (engine *LookupEngine) readSchema(ctx context.Context, tenantID, schemaVersion string) (*base.SchemaDefinition, error) {
	// Create a unique cache key by combining the tenantID and schemaVersion.
	// This ensures that different combinations of tenantID and schemaVersion get their own cache entries.
	cacheKey := tenantID + "|" + schemaVersion

	// Attempt to retrieve the schema from the cache (schemaMap) using the generated cacheKey.
	if sch, ok := engine.schemaMap.Load(cacheKey); ok {
		// If the schema is present in the cache, cast it to its correct type and return.
		return sch.(*base.SchemaDefinition), nil
	}

	// If the schema is not present in the cache, use the schemaReader to read it from the source (e.g., a database or file).
	sch, err := engine.schemaReader.ReadSchema(ctx, tenantID, schemaVersion)
	if err != nil {
		// If there's an error reading the schema (e.g., schema not found or database connection issue), return the error.
		return nil, err
	}

	// Cache the newly read schema in schemaMap so that subsequent reads can be faster.
	engine.schemaMap.Store(cacheKey, sch)

	// Return the freshly read schema.
	return sch, nil
}
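
Note on the in-memory pagination at the end of LookupSubject: it sorts the filtered IDs, seeks to the first ID at or past the value carried by the continuous token, slices one page, and emits the ID just after the page as the next token. Below is a minimal, self-contained sketch of that slicing behavior, assuming plain string tokens instead of the encoded utils.ContinuousToken used above; paginateIDs is an illustrative helper, not part of the package.

package main

import (
	"fmt"
	"sort"
)

// paginateIDs mirrors the token-based slicing in LookupSubject: it sorts the IDs,
// starts at the first ID >= the incoming token, and returns one page plus the
// token for the next page ("" when there are no more results). Illustrative only.
func paginateIDs(ids []string, token string, pageSize int) (page []string, next string) {
	sort.Strings(ids)

	// Locate the position in the sorted list where the ID equals or exceeds the token value
	startIndex := 0
	if token != "" {
		for i, id := range ids {
			if id >= token {
				startIndex = i
				break
			}
		}
	}

	// Clamp the end index to the slice length
	end := startIndex + pageSize
	if end > len(ids) {
		end = len(ids)
	}

	// The next token is the first ID that did not fit on this page
	if end < len(ids) {
		next = ids[end]
	}
	return ids[startIndex:end], next
}

func main() {
	ids := []string{"u3", "u1", "u5", "u2", "u4"}

	page1, tok := paginateIDs(ids, "", 2)
	fmt.Println(page1, tok) // [u1 u2] u3

	page2, tok2 := paginateIDs(ids, tok, 2)
	fmt.Println(page2, tok2) // [u3 u4] u5
}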