Passed
Push to master (e5b4f5...995604) by Tolga, 01:53 (queued 15s)

engines.*LookupEngine.LookupSubject   C

Complexity
    Conditions: 10

Size
    Total Lines: 74
    Code Lines: 43

Duplication
    Lines: 0
    Ratio: 0%

Importance
    Changes: 0

Metric   Value
cc       10
eloc     43
nop      2
dl       0
loc      74
rs       5.9999
c        0
b        0
f        0

How to fix

Long Method

Small methods make your code easier to understand, especially when combined with a good name. And if a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign that the commented part should be extracted into a new method, with the comment serving as a starting point for naming it.

Commonly applied refactorings include Extract Method.
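As an illustration only (not taken from this report or from the Permify codebase, all names are hypothetical), a minimal Extract Method sketch in Go: the commented step inside a function becomes a helper whose name is derived from the comment.

package example

import "strings"

// Before the refactoring, summary filtered the lines inline behind a
// "// drop blank lines" comment. After Extract Method, that comment
// becomes the name of a small helper and summary reads at the level of intent.
func summary(lines []string) string {
	return strings.Join(dropBlankLines(lines), "\n")
}

// dropBlankLines keeps only lines that contain non-whitespace characters.
func dropBlankLines(lines []string) []string {
	var kept []string
	for _, l := range lines {
		if strings.TrimSpace(l) != "" {
			kept = append(kept, l)
		}
	}
	return kept
}

The call site now states what happens rather than how, and the extracted helper can be tested on its own.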

Complexity

Complex classes like engines.*LookupEngine.LookupSubject often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
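A minimal Extract Class sketch in Go, purely illustrative (the field names are invented, not taken from this codebase): fields that share a prefix move into their own type, and the original struct keeps a single field of that type.

package example

// Before: schemaCacheKeyPrefix, schemaCacheTTL and schemaCacheStore all share
// the "schemaCache" prefix, hinting at a cohesive component hiding inside the engine.
type engineBefore struct {
	schemaCacheKeyPrefix string
	schemaCacheTTL       int
	schemaCacheStore     map[string]string
	concurrencyLimit     int
}

// After Extract Class: the prefixed fields live in their own type.
type schemaCache struct {
	keyPrefix string
	ttl       int
	store     map[string]string
}

type engineAfter struct {
	cache            *schemaCache
	concurrencyLimit int
}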

package engines

import (
	"context"
	"slices"
	"sort"
	"strings"
	"sync"

	"github.com/Permify/permify/internal/invoke"
	"github.com/Permify/permify/internal/storage"
	"github.com/Permify/permify/internal/storage/context/utils"
	"github.com/Permify/permify/pkg/database"
	base "github.com/Permify/permify/pkg/pb/base/v1"
)

type LookupEngine struct {
	// schemaReader is responsible for reading schema information
	schemaReader storage.SchemaReader
	// dataReader is responsible for reading data
	dataReader storage.DataReader
	// checkEngine is responsible for performing permission checks
	checkEngine invoke.Check
	// schemaMap is a map that keeps track of schema versions
	schemaMap sync.Map
	// concurrencyLimit is the maximum number of concurrent permission checks allowed
	concurrencyLimit int
}

func NewLookupEngine(
	check invoke.Check,
	schemaReader storage.SchemaReader,
	dataReader storage.DataReader,
	opts ...LookupOption,
) *LookupEngine {
	engine := &LookupEngine{
		schemaReader:     schemaReader,
		checkEngine:      check,
		dataReader:       dataReader,
		schemaMap:        sync.Map{},
		concurrencyLimit: _defaultConcurrencyLimit,
	}

	// options
	for _, opt := range opts {
		opt(engine)
	}

	return engine
}

// LookupEntity performs a permission check on a set of entities and returns a response
// containing the IDs of the entities that have the requested permission.
func (engine *LookupEngine) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) {
	// A mutex and slice are declared to safely store entity IDs from concurrent callbacks
	var mu sync.Mutex
	var entityIDs []string
	var ct string

	size := request.GetPageSize()
	if size == 0 {
		size = 1000
	}

	// Callback function which is called for each entity. If the entity passes the permission check,
	// the entity ID is appended to the entityIDs slice.
	callback := func(entityID, token string) {
		mu.Lock()         // Safeguard access to the shared slice with a mutex
		defer mu.Unlock() // Ensure the lock is released after appending the ID
		entityIDs = append(entityIDs, entityID)
		ct = token
	}

	// Create and start BulkChecker. It performs permission checks in parallel.
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)

	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
	publisher := NewBulkEntityPublisher(ctx, request, checker)

	// Retrieve the schema of the entity based on the tenantId and schema version
	var sc *base.SchemaDefinition
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
	if err != nil {
		return nil, err
	}

	// Create a map to keep track of visited entities
	visits := &VisitsMap{}

	// Perform an entity filter operation based on the permission request
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
		TenantId: request.GetTenantId(),
		Metadata: &base.PermissionEntityFilterRequestMetadata{
			SnapToken:     request.GetMetadata().GetSnapToken(),
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
			Depth:         request.GetMetadata().GetDepth(),
		},
		Entrance: &base.Entrance{
			Type:  request.GetEntityType(),
			Value: request.GetPermission(),
		},
		Subject: request.GetSubject(),
		Context: request.GetContext(),
		Scope:   request.GetScope(),
		Cursor:  request.GetContinuousToken(),
	}, visits, publisher)
	if err != nil {
		return nil, err
	}

	// At this point, the BulkChecker has collected and sorted requests
	err = checker.ExecuteRequests(size) // Execute the collected requests in parallel
	if err != nil {
		return nil, err
	}

	// Return response containing allowed entity IDs
	return &base.PermissionLookupEntityResponse{
		EntityIds:       entityIDs,
		ContinuousToken: ct,
	}, nil
}

// LookupEntityStream performs a permission check on a set of entities and streams the results
// containing the IDs of the entities that have the requested permission.
func (engine *LookupEngine) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) {
	size := request.GetPageSize()
	if size == 0 {
		size = 1000
	}

	// Define a callback function that will be called for each entity that passes the permission check.
	// If the check result is allowed, it sends the entity ID to the server stream.
	callback := func(entityID, token string) {
		err := server.Send(&base.PermissionLookupEntityStreamResponse{
			EntityId:        entityID,
			ContinuousToken: token,
		})
		// If there is an error in sending the response, the function will return
		if err != nil {
			return
		}
	}

	// Create and start BulkChecker. It performs permission checks concurrently.
	checker := NewBulkChecker(ctx, engine.checkEngine, BULK_ENTITY, callback, engine.concurrencyLimit)

	// Create and start BulkPublisher. It receives entities and passes them to BulkChecker.
	publisher := NewBulkEntityPublisher(ctx, request, checker)

	// Retrieve the entity definition schema based on the tenantId and schema version
	var sc *base.SchemaDefinition
	sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion())
	if err != nil {
		return err
	}

	visits := &VisitsMap{}

	// Perform an entity filter operation based on the permission request
	err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{
		TenantId: request.GetTenantId(),
		Metadata: &base.PermissionEntityFilterRequestMetadata{
			SnapToken:     request.GetMetadata().GetSnapToken(),
			SchemaVersion: request.GetMetadata().GetSchemaVersion(),
			Depth:         request.GetMetadata().GetDepth(),
		},
		Entrance: &base.Entrance{
			Type:  request.GetEntityType(),
			Value: request.GetPermission(),
		},
		Subject: request.GetSubject(),
		Context: request.GetContext(),
		Cursor:  request.GetContinuousToken(),
	}, visits, publisher)
	if err != nil {
		return err
	}

	err = checker.ExecuteRequests(size)
	if err != nil {
		return err
	}

	return nil
}

// LookupSubject checks if a subject has a particular permission based on the schema and version.
// It returns a list of subjects that have the given permission.
func (engine *LookupEngine) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) {
	size := request.GetPageSize()
	if size == 0 {
		size = 1000
	}

	var ids []string
	var ct string

	// Use the schema-based subject filter to get the list of subjects with the requested permission.
	ids, err = NewSubjectFilter(engine.schemaReader, engine.dataReader, SubjectFilterConcurrencyLimit(engine.concurrencyLimit)).SubjectFilter(ctx, request)
	if err != nil {
		return nil, err
	}

	// Initialize excludedIds to be used in the query
	var excludedIds []string

	// Check if the wildcard '<>' is present in ids or if it's formatted like "<>-1,2,3"
	for _, id := range ids {
		if id == ALL {
			// Handle '<>' case: no exclusions, include all resources
			excludedIds = nil
			break
		} else if strings.HasPrefix(id, ALL+"-") {
			// Handle '<>-1,2,3' case: parse exclusions after '-'
			excludedIds = strings.Split(strings.TrimPrefix(id, ALL+"-"), ",")
			break
		}
	}

	// If '<>' was found, query all subjects with exclusions if provided
	if excludedIds != nil || slices.Contains(ids, ALL) {
		resp, pct, err := engine.dataReader.QueryUniqueSubjectReferences(
			ctx,
			request.GetTenantId(),
			request.GetSubjectReference(),
			excludedIds, // Pass the exclusions if any
			request.GetMetadata().GetSnapToken(),
			database.NewPagination(database.Size(size), database.Token(request.GetContinuousToken())),
		)
		if err != nil {
			return nil, err
		}
		ct = pct.String()

		// Return the list of entity IDs that have the required permission.
		return &base.PermissionLookupSubjectResponse{
			SubjectIds:      resp,
			ContinuousToken: ct,
		}, nil
	}

	// Sort the IDs
	sort.Strings(ids)

	// Convert page size to int for compatibility with startIndex
	pageSize := int(size)

	// Determine the end index based on the page size and total number of IDs
	end := min(pageSize, len(ids))

	// Generate the next continuous token if there are more results
	if end < len(ids) {
		ct = utils.NewContinuousToken(ids[end]).Encode().String()
	} else {
		ct = ""
	}

	// Return the paginated list of IDs
	return &base.PermissionLookupSubjectResponse{
		SubjectIds:      ids[:end], // Slice the IDs based on pagination
		ContinuousToken: ct,        // Return the next continuous token
	}, nil
}

// readSchema retrieves a SchemaDefinition for a given tenantID and schemaVersion.
// It first checks a cache (schemaMap) for the schema, and if not found, reads it using the schemaReader.
func (engine *LookupEngine) readSchema(ctx context.Context, tenantID, schemaVersion string) (*base.SchemaDefinition, error) {
	// Create a unique cache key by combining the tenantID and schemaVersion.
	// This ensures that different combinations of tenantID and schemaVersion get their own cache entries.
	cacheKey := tenantID + "|" + schemaVersion

	// Attempt to retrieve the schema from the cache (schemaMap) using the generated cacheKey.
	if sch, ok := engine.schemaMap.Load(cacheKey); ok {
		// If the schema is present in the cache, cast it to its correct type and return.
		return sch.(*base.SchemaDefinition), nil
	}

	// If the schema is not present in the cache, use the schemaReader to read it from the source (e.g., a database or file).
	sch, err := engine.schemaReader.ReadSchema(ctx, tenantID, schemaVersion)
	if err != nil {
		// If there's an error reading the schema (e.g., schema not found or database connection issue), return the error.
		return nil, err
	}

	// Cache the newly read schema in schemaMap so that subsequent reads can be faster.
	engine.schemaMap.Store(cacheKey, sch)

	// Return the freshly read schema.
	return sch, nil
}
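As a sketch of how the advice above could be applied to the flagged method, the wildcard/exclusion scan and the in-memory pagination in LookupSubject could each be pulled into a named helper. This is only an illustration, assuming the helpers live in the same package as the file above (so ALL, utils, and the existing imports are available); the helper names are invented, not part of Permify.

// extractWildcardExclusions reports whether a '<>' wildcard (or a '<>-1,2,3'
// exclusion list) is present in ids, and returns the parsed exclusions.
func extractWildcardExclusions(ids []string) (found bool, excluded []string) {
	for _, id := range ids {
		if id == ALL {
			return true, nil
		}
		if strings.HasPrefix(id, ALL+"-") {
			return true, strings.Split(strings.TrimPrefix(id, ALL+"-"), ",")
		}
	}
	return false, nil
}

// paginateSubjectIDs sorts ids, slices out one page of results, and returns the
// continuous token pointing at the next page (empty when there is none).
func paginateSubjectIDs(ids []string, size int) ([]string, string) {
	sort.Strings(ids)
	end := min(size, len(ids))
	token := ""
	if end < len(ids) {
		token = utils.NewContinuousToken(ids[end]).Encode().String()
	}
	return ids[:end], token
}

With these two extractions, LookupSubject would shrink to the subject-filter call, a branch on the wildcard case, and the construction of the response, which addresses both the Long Method and the condition-count findings.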