1 | package engines |
||
2 | |||
3 | import ( |
||
4 | "context" |
||
5 | "fmt" |
||
6 | "slices" |
||
7 | "sort" |
||
8 | "strings" |
||
9 | "sync" |
||
10 | |||
11 | "github.com/Permify/permify/internal/invoke" |
||
12 | "github.com/Permify/permify/internal/storage" |
||
13 | "github.com/Permify/permify/internal/storage/context/utils" |
||
14 | "github.com/Permify/permify/pkg/database" |
||
15 | base "github.com/Permify/permify/pkg/pb/base/v1" |
||
16 | ) |
||
17 | |||
18 | type LookupEngine struct { |
||
19 | // schemaReader is responsible for reading schema information |
||
20 | schemaReader storage.SchemaReader |
||
21 | // schemaReader is responsible for reading data |
||
22 | dataReader storage.DataReader |
||
23 | // checkEngine is responsible for performing permission checks |
||
24 | checkEngine invoke.Check |
||
25 | // schemaMap is a map that keeps track of schema versions |
||
26 | schemaMap sync.Map |
||
27 | // concurrencyLimit is the maximum number of concurrent permission checks allowed |
||
28 | concurrencyLimit int |
||
29 | } |
||
30 | |||
31 | func NewLookupEngine( |
||
32 | check invoke.Check, |
||
33 | schemaReader storage.SchemaReader, |
||
34 | dataReader storage.DataReader, |
||
35 | opts ...LookupOption, |
||
36 | ) *LookupEngine { |
||
37 | engine := &LookupEngine{ |
||
38 | schemaReader: schemaReader, |
||
39 | checkEngine: check, |
||
40 | dataReader: dataReader, |
||
41 | schemaMap: sync.Map{}, |
||
42 | concurrencyLimit: _defaultConcurrencyLimit, |
||
43 | } |
||
44 | |||
45 | // options |
||
46 | for _, opt := range opts { |
||
47 | opt(engine) |
||
48 | } |
||
49 | |||
50 | return engine |
||
51 | } |
||
52 | |||
53 | // LookupEntity performs a permission check on a set of entities and returns a response |
||
54 | // containing the IDs of the entities that have the requested permission. |
||
55 | func (engine *LookupEngine) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) { |
||
56 | // A mutex and slice are declared to safely store entity IDs from concurrent callbacks |
||
57 | var mu sync.Mutex |
||
58 | var entityIDs []string |
||
59 | var ct string |
||
60 | |||
61 | size := request.GetPageSize() |
||
62 | if size == 0 { |
||
63 | size = 1000 |
||
64 | } |
||
65 | |||
66 | // Callback function which is called for each entity. If the entity passes the permission check, |
||
67 | // the entity ID is appended to the entityIDs slice. |
||
68 | callback := func(entityID, token string) { |
||
69 | mu.Lock() // Safeguard access to the shared slice with a mutex |
||
70 | defer mu.Unlock() // Ensure the lock is released after appending the ID |
||
71 | entityIDs = append(entityIDs, entityID) |
||
72 | ct = token |
||
73 | } |
||
74 | |||
75 | // Create configuration for BulkChecker |
||
76 | config := BulkCheckerConfig{ |
||
77 | ConcurrencyLimit: engine.concurrencyLimit, |
||
78 | BufferSize: 1000, |
||
79 | } |
||
80 | |||
81 | // Create and start BulkChecker. It performs permission checks in parallel. |
||
82 | checker, err := NewBulkChecker(ctx, engine.checkEngine, BulkCheckerTypeEntity, callback, config) |
||
83 | if err != nil { |
||
84 | return nil, fmt.Errorf("failed to create bulk checker: %w", err) |
||
0 ignored issues
–
show
introduced
by
![]() |
|||
85 | } |
||
86 | defer checker.Close() |
||
87 | |||
88 | // Create and start BulkPublisher. It receives entities and passes them to BulkChecker. |
||
89 | publisher := NewBulkEntityPublisher(ctx, request, checker) |
||
90 | |||
91 | // Retrieve the schema of the entity based on the tenantId and schema version |
||
92 | var sc *base.SchemaDefinition |
||
93 | sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion()) |
||
94 | if err != nil { |
||
95 | return nil, err |
||
96 | } |
||
97 | |||
98 | // Create a map to keep track of visited entities |
||
99 | visits := &VisitsMap{} |
||
100 | |||
101 | // Perform an entity filter operation based on the permission request |
||
102 | err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{ |
||
103 | TenantId: request.GetTenantId(), |
||
104 | Metadata: &base.PermissionEntityFilterRequestMetadata{ |
||
105 | SnapToken: request.GetMetadata().GetSnapToken(), |
||
106 | SchemaVersion: request.GetMetadata().GetSchemaVersion(), |
||
107 | Depth: request.GetMetadata().GetDepth(), |
||
108 | }, |
||
109 | Entrance: &base.Entrance{ |
||
110 | Type: request.GetEntityType(), |
||
111 | Value: request.GetPermission(), |
||
112 | }, |
||
113 | Subject: request.GetSubject(), |
||
114 | Context: request.GetContext(), |
||
115 | Scope: request.GetScope(), |
||
116 | Cursor: request.GetContinuousToken(), |
||
117 | }, visits, publisher) |
||
118 | if err != nil { |
||
119 | return nil, err |
||
120 | } |
||
121 | |||
122 | // At this point, the BulkChecker has collected and sorted requests |
||
123 | err = checker.ExecuteRequests(size) // Execute the collected requests in parallel |
||
124 | if err != nil { |
||
125 | return nil, err |
||
126 | } |
||
127 | |||
128 | // Return response containing allowed entity IDs |
||
129 | return &base.PermissionLookupEntityResponse{ |
||
130 | EntityIds: entityIDs, |
||
131 | ContinuousToken: ct, |
||
132 | }, nil |
||
133 | } |
||
134 | |||
135 | // LookupEntityStream performs a permission check on a set of entities and streams the results |
||
136 | // containing the IDs of the entities that have the requested permission. |
||
137 | func (engine *LookupEngine) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) { |
||
138 | size := request.GetPageSize() |
||
139 | if size == 0 { |
||
140 | size = 1000 |
||
141 | } |
||
142 | |||
143 | // Define a callback function that will be called for each entity that passes the permission check. |
||
144 | // If the check result is allowed, it sends the entity ID to the server stream. |
||
145 | callback := func(entityID, token string) { |
||
146 | err := server.Send(&base.PermissionLookupEntityStreamResponse{ |
||
147 | EntityId: entityID, |
||
148 | ContinuousToken: token, |
||
149 | }) |
||
150 | // If there is an error in sending the response, the function will return |
||
151 | if err != nil { |
||
152 | return |
||
153 | } |
||
154 | } |
||
155 | |||
156 | // Create configuration for BulkChecker |
||
157 | config := BulkCheckerConfig{ |
||
158 | ConcurrencyLimit: engine.concurrencyLimit, |
||
159 | BufferSize: 1000, |
||
160 | } |
||
161 | |||
162 | // Create and start BulkChecker. It performs permission checks concurrently. |
||
163 | checker, err := NewBulkChecker(ctx, engine.checkEngine, BulkCheckerTypeEntity, callback, config) |
||
164 | if err != nil { |
||
165 | return fmt.Errorf("failed to create bulk checker: %w", err) |
||
0 ignored issues
–
show
|
|||
166 | } |
||
167 | defer checker.Close() |
||
168 | |||
169 | // Create and start BulkPublisher. It receives entities and passes them to BulkChecker. |
||
170 | publisher := NewBulkEntityPublisher(ctx, request, checker) |
||
171 | |||
172 | // Retrieve the entity definition schema based on the tenantId and schema version |
||
173 | var sc *base.SchemaDefinition |
||
174 | sc, err = engine.readSchema(ctx, request.GetTenantId(), request.GetMetadata().GetSchemaVersion()) |
||
175 | if err != nil { |
||
176 | return err |
||
177 | } |
||
178 | |||
179 | visits := &VisitsMap{} |
||
180 | |||
181 | // Perform an entity filter operation based on the permission request |
||
182 | err = NewEntityFilter(engine.dataReader, sc).EntityFilter(ctx, &base.PermissionEntityFilterRequest{ |
||
183 | TenantId: request.GetTenantId(), |
||
184 | Metadata: &base.PermissionEntityFilterRequestMetadata{ |
||
185 | SnapToken: request.GetMetadata().GetSnapToken(), |
||
186 | SchemaVersion: request.GetMetadata().GetSchemaVersion(), |
||
187 | Depth: request.GetMetadata().GetDepth(), |
||
188 | }, |
||
189 | Entrance: &base.Entrance{ |
||
190 | Type: request.GetEntityType(), |
||
191 | Value: request.GetPermission(), |
||
192 | }, |
||
193 | Subject: request.GetSubject(), |
||
194 | Context: request.GetContext(), |
||
195 | Cursor: request.GetContinuousToken(), |
||
196 | }, visits, publisher) |
||
197 | if err != nil { |
||
198 | return err |
||
199 | } |
||
200 | |||
201 | err = checker.ExecuteRequests(size) |
||
202 | if err != nil { |
||
203 | return err |
||
204 | } |
||
205 | |||
206 | return nil |
||
207 | } |
||
208 | |||
209 | // LookupSubject checks if a subject has a particular permission based on the schema and version. |
||
210 | // It returns a list of subjects that have the given permission. |
||
211 | func (engine *LookupEngine) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) { |
||
212 | size := request.GetPageSize() |
||
213 | if size == 0 { |
||
214 | size = 1000 |
||
215 | } |
||
216 | |||
217 | var ids []string |
||
218 | var ct string |
||
219 | |||
220 | // Use the schema-based subject filter to get the list of subjects with the requested permission. |
||
221 | ids, err = NewSubjectFilter(engine.schemaReader, engine.dataReader, SubjectFilterConcurrencyLimit(engine.concurrencyLimit)).SubjectFilter(ctx, request) |
||
222 | if err != nil { |
||
223 | return nil, err |
||
224 | } |
||
225 | |||
226 | // Initialize excludedIds to be used in the query |
||
227 | var excludedIds []string |
||
228 | |||
229 | // Check if the wildcard '<>' is present in the ids.Ids or if it's formatted like "<>-1,2,3" |
||
230 | for _, id := range ids { |
||
231 | if id == ALL { |
||
232 | // Handle '<>' case: no exclusions, include all resources |
||
233 | excludedIds = nil |
||
234 | break |
||
235 | } else if strings.HasPrefix(id, ALL+"-") { |
||
236 | // Handle '<>-1,2,3' case: parse exclusions after '-' |
||
237 | excludedIds = strings.Split(strings.TrimPrefix(id, ALL+"-"), ",") |
||
238 | break |
||
239 | } |
||
240 | } |
||
241 | |||
242 | // If '<>' was found, query all subjects with exclusions if provided |
||
243 | if excludedIds != nil || slices.Contains(ids, ALL) { |
||
244 | resp, pct, err := engine.dataReader.QueryUniqueSubjectReferences( |
||
245 | ctx, |
||
246 | request.GetTenantId(), |
||
247 | request.GetSubjectReference(), |
||
248 | excludedIds, // Pass the exclusions if any |
||
249 | request.GetMetadata().GetSnapToken(), |
||
250 | database.NewPagination(database.Size(size), database.Token(request.GetContinuousToken())), |
||
251 | ) |
||
252 | if err != nil { |
||
253 | return nil, err |
||
254 | } |
||
255 | ct = pct.String() |
||
256 | |||
257 | // Return the list of entity IDs that have the required permission. |
||
258 | return &base.PermissionLookupSubjectResponse{ |
||
259 | SubjectIds: resp, |
||
260 | ContinuousToken: ct, |
||
261 | }, nil |
||
262 | } |
||
263 | |||
264 | // Sort the IDs |
||
265 | sort.Strings(ids) |
||
266 | |||
267 | // Convert page size to int for compatibility with startIndex |
||
268 | pageSize := int(size) |
||
269 | |||
270 | // Determine the end index based on the page size and total number of IDs |
||
271 | end := min(pageSize, len(ids)) |
||
272 | |||
273 | // Generate the next continuous token if there are more results |
||
274 | if end < len(ids) { |
||
275 | ct = utils.NewContinuousToken(ids[end]).Encode().String() |
||
276 | } else { |
||
277 | ct = "" |
||
278 | } |
||
279 | |||
280 | // Return the paginated list of IDs |
||
281 | return &base.PermissionLookupSubjectResponse{ |
||
282 | SubjectIds: ids[:end], // Slice the IDs based on pagination |
||
283 | ContinuousToken: ct, // Return the next continuous token |
||
284 | }, nil |
||
285 | } |
||
286 | |||
287 | // readSchema retrieves a SchemaDefinition for a given tenantID and schemaVersion. |
||
288 | // It first checks a cache (schemaMap) for the schema, and if not found, reads it using the schemaReader. |
||
289 | func (engine *LookupEngine) readSchema(ctx context.Context, tenantID, schemaVersion string) (*base.SchemaDefinition, error) { |
||
290 | // Create a unique cache key by combining the tenantID and schemaVersion. |
||
291 | // This ensures that different combinations of tenantID and schemaVersion get their own cache entries. |
||
292 | cacheKey := tenantID + "|" + schemaVersion |
||
293 | |||
294 | // Attempt to retrieve the schema from the cache (schemaMap) using the generated cacheKey. |
||
295 | if sch, ok := engine.schemaMap.Load(cacheKey); ok { |
||
296 | // If the schema is present in the cache, cast it to its correct type and return. |
||
297 | return sch.(*base.SchemaDefinition), nil |
||
298 | } |
||
299 | |||
300 | // If the schema is not present in the cache, use the schemaReader to read it from the source (e.g., a database or file). |
||
301 | sch, err := engine.schemaReader.ReadSchema(ctx, tenantID, schemaVersion) |
||
302 | if err != nil { |
||
303 | // If there's an error reading the schema (e.g., schema not found or database connection issue), return the error. |
||
304 | return nil, err |
||
305 | } |
||
306 | |||
307 | // Cache the newly read schema in schemaMap so that subsequent reads can be faster. |
||
308 | engine.schemaMap.Store(cacheKey, sch) |
||
309 | |||
310 | // Return the freshly read schema. |
||
311 | return sch, nil |
||
312 | } |
||
313 |