internal/engines/lookup.go   A
last analyzed

Size/Duplication

Total Lines 311
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 30
eloc 176
dl 0
loc 311
rs 10
c 0
b 0
f 0

5 Methods

Rating   Name   Duplication   Size   Complexity  
B engines.*LookupEngine.LookupEntityStream 0 70 8
C engines.*LookupEngine.LookupSubject 0 74 10
B engines.*LookupEngine.LookupEntity 0 78 7
A engines.NewLookupEngine 0 20 2
A engines.*LookupEngine.readSchema 0 23 3
1
package engines
2
3
import (
4
	"context"
5
	"fmt"
6
	"slices"
7
	"sort"
8
	"strings"
9
	"sync"
10
11
	"github.com/Permify/permify/internal/invoke"
12
	"github.com/Permify/permify/internal/storage"
13
	"github.com/Permify/permify/internal/storage/context/utils"
14
	"github.com/Permify/permify/pkg/database"
15
	base "github.com/Permify/permify/pkg/pb/base/v1"
16
)
17
18
type LookupEngine struct {
19
	// schemaReader is responsible for reading schema information
20
	schemaReader storage.SchemaReader
21
	// schemaReader is responsible for reading data
22
	dataReader storage.DataReader
23
	// checkEngine is responsible for performing permission checks
24
	checkEngine invoke.Check
25
	// schemaMap is a map that keeps track of schema versions
26
	schemaMap sync.Map
27
	// concurrencyLimit is the maximum number of concurrent permission checks allowed
28
	concurrencyLimit int
29
}
30
31
func NewLookupEngine(
32
	check invoke.Check,
33
	schemaReader storage.SchemaReader,
34
	dataReader storage.DataReader,
35
	opts ...LookupOption,
36
) *LookupEngine {
37
	engine := &LookupEngine{
38
		schemaReader:     schemaReader,
39
		checkEngine:      check,
40
		dataReader:       dataReader,
41
		schemaMap:        sync.Map{},
42
		concurrencyLimit: _defaultConcurrencyLimit,
43
	}
44
45
	// options
46
	for _, opt := range opts {
47
		opt(engine)
48
	}
49
50
	return engine
51
}
52
53
// LookupEntity performs a permission check on a set of entities and returns a response
54
// containing the IDs of the entities that have the requested permission.
55
func (engine *LookupEngine) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) {
56
	// A mutex and slice are declared to safely store entity IDs from concurrent callbacks
57
	var mu sync.Mutex
58
	var entityIDs []string
59
	var ct string
60
61
	size := request.GetPageSize()
62
	if size == 0 {
63
		size = 1000
64
	}
65
66
	// Callback function which is called for each entity. If the entity passes the permission check,
67
	// the entity ID is appended to the entityIDs slice.
68
	callback := func(entityID, token string) {
69
		mu.Lock()         // Safeguard access to the shared slice with a mutex
70
		defer mu.Unlock() // Ensure the lock is released after appending the ID
71
		entityIDs = append(entityIDs, entityID)
72
		ct = token
73
	}
74
75
	// Create configuration for BulkChecker
76
	config := BulkCheckerConfig{
77
		ConcurrencyLimit: engine.concurrencyLimit,
78
		BufferSize:       1000,
79
	}
80
81
	// Create and start BulkChecker. It performs permission checks in parallel.
82
	checker, err := NewBulkChecker(ctx, engine.checkEngine, BulkCheckerTypeEntity, callback, config)
83
	if err != nil {
84
		return nil, fmt.Errorf("failed to create bulk checker: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
85
	}
86
	defer checker.Close()
87
88
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
89
	publisher := NewBulkEntityPublisher(ctx, request, checker)
90
91
	// Retrieve the schema of the entity based on the tenantId and schema version
92
	var sc *base.SchemaDefinition
93
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
94
	if err != nil {
95
		return nil, err
96
	}
97
98
	// Create a map to keep track of visited entities
99
	visits := &VisitsMap{}
100
101
	// Perform an entity filter operation based on the permission request
102
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
103
		TenantId: request.GetTenantId(),
104
		Metadata: &base.PermissionEntityFilterRequestMetadata{
105
			SnapToken:     request.GetMetadata().GetSnapToken(),
106
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
107
			Depth:         request.GetMetadata().GetDepth(),
108
		},
109
		Entrance: &base.Entrance{
110
			Type:  request.GetEntityType(),
111
			Value: request.GetPermission(),
112
		},
113
		Subject: request.GetSubject(),
114
		Context: request.GetContext(),
115
		Scope:   request.GetScope(),
116
		Cursor:  request.GetContinuousToken(),
117
	}, visits, publisher)
118
	if err != nil {
119
		return nil, err
120
	}
121
122
	// At this point, the BulkChecker has collected and sorted requests
123
	err = checker.ExecuteRequests(size) // Execute the collected requests in parallel
124
	if err != nil {
125
		return nil, err
126
	}
127
128
	// Return response containing allowed entity IDs
129
	return &base.PermissionLookupEntityResponse{
130
		EntityIds:       entityIDs,
131
		ContinuousToken: ct,
132
	}, nil
133
}
134
135
// LookupEntityStream performs a permission check on a set of entities and streams the results
136
// containing the IDs of the entities that have the requested permission.
137
func (engine *LookupEngine) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) {
138
	size := request.GetPageSize()
139
	if size == 0 {
140
		size = 1000
141
	}
142
143
	// Define a callback function that will be called for each entity that passes the permission check.
144
	// If the check result is allowed, it sends the entity ID to the server stream.
145
	callback := func(entityID, token string) {
146
		err := server.Send(&base.PermissionLookupEntityStreamResponse{
147
			EntityId:        entityID,
148
			ContinuousToken: token,
149
		})
150
		// If there is an error in sending the response, the function will return
151
		if err != nil {
152
			return
153
		}
154
	}
155
156
	// Create configuration for BulkChecker
157
	config := BulkCheckerConfig{
158
		ConcurrencyLimit: engine.concurrencyLimit,
159
		BufferSize:       1000,
160
	}
161
162
	// Create and start BulkChecker. It performs permission checks concurrently.
163
	checker, err := NewBulkChecker(ctx, engine.checkEngine, BulkCheckerTypeEntity, callback, config)
164
	if err != nil {
165
		return fmt.Errorf("failed to create bulk checker: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
166
	}
167
	defer checker.Close()
168
169
	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
170
	publisher := NewBulkEntityPublisher(ctx, request, checker)
171
172
	// Retrieve the entity definition schema based on the tenantId and schema version
173
	var sc *base.SchemaDefinition
174
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
175
	if err != nil {
176
		return err
177
	}
178
179
	visits := &VisitsMap{}
180
181
	// Perform an entity filter operation based on the permission request
182
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
183
		TenantId: request.GetTenantId(),
184
		Metadata: &base.PermissionEntityFilterRequestMetadata{
185
			SnapToken:     request.GetMetadata().GetSnapToken(),
186
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
187
			Depth:         request.GetMetadata().GetDepth(),
188
		},
189
		Entrance: &base.Entrance{
190
			Type:  request.GetEntityType(),
191
			Value: request.GetPermission(),
192
		},
193
		Subject: request.GetSubject(),
194
		Context: request.GetContext(),
195
		Cursor:  request.GetContinuousToken(),
196
	}, visits, publisher)
197
	if err != nil {
198
		return err
199
	}
200
201
	err = checker.ExecuteRequests(size)
202
	if err != nil {
203
		return err
204
	}
205
206
	return nil
207
}
208
209
// LookupSubject checks if a subject has a particular permission based on the schema and version.
210
// It returns a list of subjects that have the given permission.
211
func (engine *LookupEngine) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) {
212
	size := request.GetPageSize()
213
	if size == 0 {
214
		size = 1000
215
	}
216
217
	var ids []string
218
	var ct string
219
220
	// Use the schema-based subject filter to get the list of subjects with the requested permission.
221
	ids, err = NewSubjectFilter(engine.schemaReader, engine.dataReader, SubjectFilterConcurrencyLimit(engine.concurrencyLimit)).SubjectFilter(ctx, request)
222
	if err != nil {
223
		return nil, err
224
	}
225
226
	// Initialize excludedIds to be used in the query
227
	var excludedIds []string
228
229
	// Check if the wildcard '<>' is present in the ids.Ids or if it's formatted like "<>-1,2,3"
230
	for _, id := range ids {
231
		if id == ALL {
232
			// Handle '<>' case: no exclusions, include all resources
233
			excludedIds = nil
234
			break
235
		} else if strings.HasPrefix(id, ALL+"-") {
236
			// Handle '<>-1,2,3' case: parse exclusions after '-'
237
			excludedIds = strings.Split(strings.TrimPrefix(id, ALL+"-"), ",")
238
			break
239
		}
240
	}
241
242
	// If '<>' was found, query all subjects with exclusions if provided
243
	if excludedIds != nil || slices.Contains(ids, ALL) {
244
		resp, pct, err := engine.dataReader.QueryUniqueSubjectReferences(
245
			ctx,
246
			request.GetTenantId(),
247
			request.GetSubjectReference(),
248
			excludedIds, // Pass the exclusions if any
249
			request.GetMetadata().GetSnapToken(),
250
			database.NewPagination(database.Size(size), database.Token(request.GetContinuousToken())),
251
		)
252
		if err != nil {
253
			return nil, err
254
		}
255
		ct = pct.String()
256
257
		// Return the list of entity IDs that have the required permission.
258
		return &base.PermissionLookupSubjectResponse{
259
			SubjectIds:      resp,
260
			ContinuousToken: ct,
261
		}, nil
262
	}
263
264
	// Sort the IDs
265
	sort.Strings(ids)
266
267
	// Convert page size to int for compatibility with startIndex
268
	pageSize := int(size)
269
270
	// Determine the end index based on the page size and total number of IDs
271
	end := min(pageSize, len(ids))
272
273
	// Generate the next continuous token if there are more results
274
	if end < len(ids) {
275
		ct = utils.NewContinuousToken(ids[end]).Encode().String()
276
	} else {
277
		ct = ""
278
	}
279
280
	// Return the paginated list of IDs
281
	return &base.PermissionLookupSubjectResponse{
282
		SubjectIds:      ids[:end], // Slice the IDs based on pagination
283
		ContinuousToken: ct,        // Return the next continuous token
284
	}, nil
285
}
286
287
// readSchema retrieves a SchemaDefinition for a given tenantID and schemaVersion.
288
// It first checks a cache (schemaMap) for the schema, and if not found, reads it using the schemaReader.
289
func (engine *LookupEngine) readSchema(ctx context.Context, tenantID, schemaVersion string) (*base.SchemaDefinition, error) {
290
	// Create a unique cache key by combining the tenantID and schemaVersion.
291
	// This ensures that different combinations of tenantID and schemaVersion get their own cache entries.
292
	cacheKey := tenantID + "|" + schemaVersion
293
294
	// Attempt to retrieve the schema from the cache (schemaMap) using the generated cacheKey.
295
	if sch, ok := engine.schemaMap.Load(cacheKey); ok {
296
		// If the schema is present in the cache, cast it to its correct type and return.
297
		return sch.(*base.SchemaDefinition), nil
298
	}
299
300
	// If the schema is not present in the cache, use the schemaReader to read it from the source (e.g., a database or file).
301
	sch, err := engine.schemaReader.ReadSchema(ctx, tenantID, schemaVersion)
302
	if err != nil {
303
		// If there's an error reading the schema (e.g., schema not found or database connection issue), return the error.
304
		return nil, err
305
	}
306
307
	// Cache the newly read schema in schemaMap so that subsequent reads can be faster.
308
	engine.schemaMap.Store(cacheKey, sch)
309
310
	// Return the freshly read schema.
311
	return sch, nil
312
}
313