1 | package engines |
||
2 | |||
3 | import ( |
||
4 | "context" |
||
5 | "fmt" |
||
6 | "sort" |
||
7 | "sync" |
||
8 | "sync/atomic" |
||
9 | |||
10 | "github.com/pkg/errors" |
||
11 | |||
12 | "golang.org/x/sync/errgroup" |
||
13 | "golang.org/x/sync/semaphore" |
||
14 | |||
15 | "github.com/Permify/permify/internal/invoke" |
||
16 | "github.com/Permify/permify/internal/storage/memory/utils" |
||
17 | base "github.com/Permify/permify/pkg/pb/base/v1" |
||
18 | ) |
||
19 | |||
// BulkCheckerType selects which identifier a BulkChecker uses when it
// orders its requests and reports results: the subject ID or the entity ID.
type BulkCheckerType string

const (
	// BulkCheckerTypeSubject orders and reports requests by subject ID.
	BulkCheckerTypeSubject BulkCheckerType = "subject"
	// BulkCheckerTypeEntity orders and reports requests by entity ID.
	BulkCheckerTypeEntity BulkCheckerType = "entity"
)
||
30 | |||
31 | // BulkCheckerRequest represents a permission check request with optional pre-computed result. |
||
32 | // This struct encapsulates both the permission check request and an optional pre-determined result, |
||
33 | // allowing for optimization when results are already known (e.g., from caching). |
||
34 | type BulkCheckerRequest struct { |
||
35 | // Request contains the actual permission check request |
||
36 | Request *base.PermissionCheckRequest |
||
37 | // Result holds a pre-computed result if available, otherwise CHECK_RESULT_UNSPECIFIED |
||
38 | Result base.CheckResult |
||
39 | } |
||
40 | |||
// BulkCheckerConfig carries the tuning knobs of a BulkChecker.
type BulkCheckerConfig struct {
	// ConcurrencyLimit is the maximum number of permission checks that may
	// run at the same time. NewBulkChecker replaces non-positive values
	// with the default.
	ConcurrencyLimit int
	// BufferSize is the capacity of the request channel and the initial
	// capacity of the collected request slice. NewBulkChecker replaces
	// non-positive values with the default.
	BufferSize int
}
||
53 | |||
54 | // DefaultBulkCheckerConfig returns a sensible default configuration |
||
55 | // that balances performance and resource usage for most use cases. |
||
56 | func DefaultBulkCheckerConfig() BulkCheckerConfig { |
||
57 | return BulkCheckerConfig{ |
||
58 | ConcurrencyLimit: 10, // Moderate concurrency for most workloads |
||
59 | BufferSize: 1000, // Buffer for 1000 requests |
||
60 | } |
||
61 | } |
||
62 | |||
63 | // BulkChecker handles concurrent permission checks with ordered result processing. |
||
64 | // This struct implements a high-performance bulk permission checking system that: |
||
65 | // - Collects permission check requests asynchronously |
||
66 | // - Processes them concurrently with controlled parallelism |
||
67 | // - Maintains strict ordering of results based on request sorting |
||
68 | // - Provides efficient resource management and error handling |
||
69 | type BulkChecker struct { |
||
70 | // typ determines the sorting strategy and processing behavior |
||
71 | typ BulkCheckerType |
||
72 | // checker is the underlying permission checking engine |
||
73 | checker invoke.Check |
||
74 | // config holds the operational configuration |
||
75 | config BulkCheckerConfig |
||
76 | // ctx provides context for cancellation and timeout management |
||
77 | ctx context.Context |
||
78 | // cancel allows for graceful shutdown of all operations |
||
79 | cancel context.CancelFunc |
||
80 | |||
81 | // Request handling components |
||
82 | // requestChan is the input channel for receiving permission check requests |
||
83 | requestChan chan BulkCheckerRequest |
||
84 | // requests stores all collected requests before processing |
||
85 | requests []BulkCheckerRequest |
||
86 | // requestsMu provides thread-safe access to the requests slice |
||
87 | requestsMu sync.RWMutex |
||
88 | |||
89 | // Execution state management |
||
90 | // executionState tracks the progress and results of request processing |
||
91 | executionState *executionState |
||
92 | // collectionDone signals when request collection has completed |
||
93 | collectionDone chan struct{} |
||
94 | |||
95 | // Callback for processing results |
||
96 | // callback is invoked for each successful permission check with the entity/subject ID and continuous token |
||
97 | callback func(entityID, continuousToken string) |
||
98 | } |
||
99 | |||
100 | // executionState manages the execution of requests and maintains processing order. |
||
101 | // This struct ensures that results are processed in the correct sequence |
||
102 | // even when requests complete out of order due to concurrent processing. |
||
103 | type executionState struct { |
||
104 | // mu protects access to the execution state |
||
105 | mu sync.Mutex |
||
106 | // results stores the results of all requests in their original order |
||
107 | results []base.CheckResult |
||
108 | // processedIndex tracks the next result to be processed in order |
||
109 | processedIndex int |
||
110 | // successCount tracks the number of successful permission checks |
||
111 | successCount int64 |
||
112 | // limit defines the maximum number of successful results to process |
||
113 | limit int64 |
||
114 | } |
||
115 | |||
116 | // NewBulkChecker creates a new BulkChecker instance with comprehensive validation and error handling. |
||
117 | // This constructor ensures that all dependencies are properly initialized and validates |
||
118 | // configuration parameters to prevent runtime errors. |
||
119 | // |
||
120 | // Parameters: |
||
121 | // - ctx: Context for managing the lifecycle of the BulkChecker |
||
122 | // - checker: The permission checking engine to use for actual permission checks |
||
123 | // - typ: The type of bulk checking operation (entity or subject) |
||
124 | // - callback: Function called for each successful permission check |
||
125 | // - config: Configuration parameters for tuning performance and behavior |
||
126 | // |
||
127 | // Returns: |
||
128 | // - *BulkChecker: The initialized BulkChecker instance |
||
129 | // - error: Any error that occurred during initialization |
||
130 | func NewBulkChecker(ctx context.Context, checker invoke.Check, typ BulkCheckerType, callback func(entityID, ct string), config BulkCheckerConfig) (*BulkChecker, error) { |
||
131 | // Validate all required parameters |
||
132 | if ctx == nil { |
||
133 | return nil, fmt.Errorf("context cannot be nil") |
||
134 | } |
||
135 | if checker == nil { |
||
136 | return nil, fmt.Errorf("checker cannot be nil") |
||
137 | } |
||
138 | if callback == nil { |
||
139 | return nil, fmt.Errorf("callback cannot be nil") |
||
140 | } |
||
141 | |||
142 | // Apply default values for invalid configuration |
||
143 | if config.ConcurrencyLimit <= 0 { |
||
144 | config.ConcurrencyLimit = DefaultBulkCheckerConfig().ConcurrencyLimit |
||
145 | } |
||
146 | if config.BufferSize <= 0 { |
||
147 | config.BufferSize = DefaultBulkCheckerConfig().BufferSize |
||
148 | } |
||
149 | |||
150 | // Create a cancellable context for the BulkChecker |
||
151 | ctx, cancel := context.WithCancel(ctx) |
||
152 | |||
153 | // Initialize the BulkChecker with all components |
||
154 | bc := &BulkChecker{ |
||
155 | typ: typ, |
||
156 | checker: checker, |
||
157 | config: config, |
||
158 | ctx: ctx, |
||
159 | cancel: cancel, |
||
160 | requestChan: make(chan BulkCheckerRequest, config.BufferSize), |
||
161 | requests: make([]BulkCheckerRequest, 0, config.BufferSize), |
||
162 | callback: callback, |
||
163 | collectionDone: make(chan struct{}), |
||
164 | } |
||
165 | |||
166 | // Start the background request collection goroutine |
||
167 | go bc.collectRequests() |
||
168 | |||
169 | return bc, nil |
||
170 | } |
||
171 | |||
172 | // collectRequests safely collects requests until the channel is closed. |
||
173 | // This method runs in a separate goroutine and continuously processes |
||
174 | // incoming requests until either the channel is closed or the context is cancelled. |
||
175 | // It ensures thread-safe addition of requests to the internal collection. |
||
176 | func (bc *BulkChecker) collectRequests() { |
||
177 | // Signal completion when this goroutine exits |
||
178 | defer close(bc.collectionDone) |
||
179 | |||
180 | for { |
||
181 | select { |
||
182 | case req, ok := <-bc.requestChan: |
||
183 | if !ok { |
||
184 | // Channel closed, stop collecting |
||
185 | return |
||
186 | } |
||
187 | bc.addRequest(req) |
||
188 | case <-bc.ctx.Done(): |
||
189 | // Context cancelled, stop collecting |
||
190 | return |
||
191 | } |
||
192 | } |
||
193 | } |
||
194 | |||
195 | // addRequest safely adds a request to the internal list. |
||
196 | // This method uses a mutex to ensure thread-safe access to the requests slice, |
||
197 | // preventing race conditions when multiple goroutines are adding requests. |
||
198 | func (bc *BulkChecker) addRequest(req BulkCheckerRequest) { |
||
199 | bc.requestsMu.Lock() |
||
200 | defer bc.requestsMu.Unlock() |
||
201 | bc.requests = append(bc.requests, req) |
||
202 | } |
||
203 | |||
204 | // StopCollectingRequests safely stops request collection and waits for completion. |
||
205 | // This method closes the input channel and waits for the collection goroutine |
||
206 | // to finish processing any remaining requests. This ensures that no requests |
||
207 | // are lost during shutdown. |
||
208 | func (bc *BulkChecker) StopCollectingRequests() { |
||
209 | close(bc.requestChan) |
||
210 | <-bc.collectionDone // Wait for collection to complete |
||
211 | } |
||
212 | |||
213 | // getSortedRequests returns a sorted copy of requests based on the checker type. |
||
214 | // This method creates a copy of the requests to avoid modifying the original |
||
215 | // collection and sorts them according to the BulkCheckerType (entity ID or subject ID). |
||
216 | // The sorting ensures consistent and predictable result ordering. |
||
217 | func (bc *BulkChecker) getSortedRequests() []BulkCheckerRequest { |
||
218 | bc.requestsMu.RLock() |
||
219 | defer bc.requestsMu.RUnlock() |
||
220 | |||
221 | // Create a copy to avoid modifying the original |
||
222 | requests := make([]BulkCheckerRequest, len(bc.requests)) |
||
223 | copy(requests, bc.requests) |
||
224 | |||
225 | // Sort the copy based on the checker type |
||
226 | bc.sortRequests(requests) |
||
227 | return requests |
||
228 | } |
||
229 | |||
230 | // sortRequests sorts requests based on the checker type. |
||
231 | // This method implements different sorting strategies: |
||
232 | // - For entity-based checks: sorts by entity ID |
||
233 | // - For subject-based checks: sorts by subject ID |
||
234 | // The sorting ensures that results are processed in a consistent order. |
||
235 | func (bc *BulkChecker) sortRequests(requests []BulkCheckerRequest) { |
||
236 | switch bc.typ { |
||
237 | case BulkCheckerTypeEntity: |
||
238 | sort.Slice(requests, func(i, j int) bool { |
||
239 | return requests[i].Request.GetEntity().GetId() < requests[j].Request.GetEntity().GetId() |
||
240 | }) |
||
241 | case BulkCheckerTypeSubject: |
||
242 | sort.Slice(requests, func(i, j int) bool { |
||
243 | return requests[i].Request.GetSubject().GetId() < requests[j].Request.GetSubject().GetId() |
||
244 | }) |
||
245 | } |
||
246 | } |
||
247 | |||
248 | // ExecuteRequests processes requests concurrently with comprehensive error handling and resource management. |
||
249 | // This method is the main entry point for bulk permission checking. It: |
||
250 | // 1. Stops collecting new requests |
||
251 | // 2. Sorts all collected requests |
||
252 | // 3. Processes them concurrently with controlled parallelism |
||
253 | // 4. Maintains strict ordering of results |
||
254 | // 5. Handles errors gracefully and manages resources properly |
||
255 | // |
||
256 | // Parameters: |
||
257 | // - size: The maximum number of successful results to process |
||
258 | // |
||
259 | // Returns: |
||
260 | // - error: Any error that occurred during processing (context cancellation is not considered an error) |
||
261 | func (bc *BulkChecker) ExecuteRequests(size uint32) error { |
||
262 | if size == 0 { |
||
263 | return fmt.Errorf("size must be greater than 0") |
||
264 | } |
||
265 | |||
266 | // Stop collecting new requests and wait for collection to complete |
||
267 | bc.StopCollectingRequests() |
||
268 | |||
269 | // Get sorted requests for processing |
||
270 | requests := bc.getSortedRequests() |
||
271 | if len(requests) == 0 { |
||
272 | return nil // No requests to process |
||
273 | } |
||
274 | |||
275 | // Initialize execution state for tracking progress |
||
276 | bc.executionState = &executionState{ |
||
277 | results: make([]base.CheckResult, len(requests)), |
||
278 | limit: int64(size), |
||
279 | } |
||
280 | |||
281 | // Create execution context with cancellation for graceful shutdown |
||
282 | execCtx, execCancel := context.WithCancel(bc.ctx) |
||
283 | defer execCancel() |
||
284 | |||
285 | // Create semaphore to control concurrency |
||
286 | sem := semaphore.NewWeighted(int64(bc.config.ConcurrencyLimit)) |
||
287 | |||
288 | // Create error group for managing goroutines and error propagation |
||
289 | g, ctx := errgroup.WithContext(execCtx) |
||
290 | |||
291 | // Process requests concurrently |
||
292 | for i, req := range requests { |
||
293 | // Check if we've reached the success limit |
||
294 | if atomic.LoadInt64(&bc.executionState.successCount) >= int64(size) { |
||
295 | break |
||
296 | } |
||
297 | |||
298 | index := i |
||
299 | request := req |
||
300 | |||
301 | // Launch goroutine for each request |
||
302 | g.Go(func() error { |
||
303 | return bc.processRequest(ctx, sem, index, request) |
||
304 | }) |
||
305 | } |
||
306 | |||
307 | // Wait for all goroutines to complete and handle any errors |
||
308 | if err := g.Wait(); err != nil { |
||
309 | if isContextError(err) { |
||
310 | return nil // Context cancellation is not an error |
||
311 | } |
||
312 | return fmt.Errorf("bulk execution failed: %w", err) |
||
0 ignored issues
–
show
introduced
by
![]() |
|||
313 | } |
||
314 | |||
315 | return nil |
||
316 | } |
||
317 | |||
318 | // processRequest handles a single request with comprehensive error handling. |
||
319 | // This method is executed in a separate goroutine for each request and: |
||
320 | // 1. Acquires a semaphore slot to control concurrency |
||
321 | // 2. Performs the permission check or uses pre-computed result |
||
322 | // 3. Processes the result in the correct order |
||
323 | // 4. Handles context cancellation and other errors gracefully |
||
324 | // |
||
325 | // Parameters: |
||
326 | // - ctx: Context for cancellation and timeout |
||
327 | // - sem: Semaphore for concurrency control |
||
328 | // - index: The index of this request in the sorted list |
||
329 | // - req: The permission check request to process |
||
330 | // |
||
331 | // Returns: |
||
332 | // - error: Any error that occurred during processing |
||
333 | func (bc *BulkChecker) processRequest(ctx context.Context, sem *semaphore.Weighted, index int, req BulkCheckerRequest) error { |
||
334 | // Check context before acquiring semaphore |
||
335 | if err := ctx.Err(); err != nil { |
||
336 | return nil |
||
337 | } |
||
338 | |||
339 | // Acquire semaphore slot to control concurrency |
||
340 | if err := sem.Acquire(ctx, 1); err != nil { |
||
341 | if isContextError(err) { |
||
342 | return nil |
||
343 | } |
||
344 | return fmt.Errorf("failed to acquire semaphore: %w", err) |
||
0 ignored issues
–
show
|
|||
345 | } |
||
346 | defer sem.Release(1) |
||
347 | |||
348 | // Determine the result for this request |
||
349 | result, err := bc.getRequestResult(ctx, req) |
||
350 | if err != nil { |
||
351 | if isContextError(err) { |
||
352 | return nil |
||
353 | } |
||
354 | return fmt.Errorf("failed to get request result: %w", err) |
||
0 ignored issues
–
show
|
|||
355 | } |
||
356 | |||
357 | // Process the result in the correct order |
||
358 | return bc.processResult(index, result) |
||
359 | } |
||
360 | |||
361 | // getRequestResult determines the result for a request. |
||
362 | // This method either uses a pre-computed result if available, |
||
363 | // or performs the actual permission check using the underlying checker. |
||
364 | // |
||
365 | // Parameters: |
||
366 | // - ctx: Context for the permission check |
||
367 | // - req: The request to get the result for |
||
368 | // |
||
369 | // Returns: |
||
370 | // - base.CheckResult: The result of the permission check |
||
371 | // - error: Any error that occurred during the check |
||
372 | func (bc *BulkChecker) getRequestResult(ctx context.Context, req BulkCheckerRequest) (base.CheckResult, error) { |
||
373 | // Use pre-computed result if available |
||
374 | if req.Result != base.CheckResult_CHECK_RESULT_UNSPECIFIED { |
||
375 | return req.Result, nil |
||
376 | } |
||
377 | |||
378 | // Perform the actual permission check |
||
379 | response, err := bc.checker.Check(ctx, req.Request) |
||
380 | if err != nil { |
||
381 | return base.CheckResult_CHECK_RESULT_UNSPECIFIED, err |
||
382 | } |
||
383 | |||
384 | return response.GetCan(), nil |
||
385 | } |
||
386 | |||
387 | // processResult processes a single result with thread-safe state updates. |
||
388 | // This method ensures that results are processed in the correct order |
||
389 | // even when requests complete out of order due to concurrent processing. |
||
390 | // It maintains the execution state and calls the callback for successful results. |
||
391 | // |
||
392 | // Parameters: |
||
393 | // - index: The index of the result in the sorted list |
||
394 | // - result: The result of the permission check |
||
395 | // |
||
396 | // Returns: |
||
397 | // - error: Any error that occurred during processing |
||
398 | func (bc *BulkChecker) processResult(index int, result base.CheckResult) error { |
||
399 | bc.executionState.mu.Lock() |
||
400 | defer bc.executionState.mu.Unlock() |
||
401 | |||
402 | // Store the result at the correct index |
||
403 | bc.executionState.results[index] = result |
||
404 | |||
405 | // Process results in order, starting from the current processed index |
||
406 | for bc.executionState.processedIndex < len(bc.executionState.results) { |
||
407 | currentResult := bc.executionState.results[bc.executionState.processedIndex] |
||
408 | if currentResult == base.CheckResult_CHECK_RESULT_UNSPECIFIED { |
||
409 | break // Wait for this result to be computed |
||
410 | } |
||
411 | |||
412 | // Process successful results |
||
413 | if currentResult == base.CheckResult_CHECK_RESULT_ALLOWED { |
||
414 | // Check if we've reached the success limit |
||
415 | if atomic.LoadInt64(&bc.executionState.successCount) >= bc.executionState.limit { |
||
416 | return nil |
||
417 | } |
||
418 | |||
419 | // Increment success count and call callback |
||
420 | atomic.AddInt64(&bc.executionState.successCount, 1) |
||
421 | bc.callbackWithToken(bc.executionState.processedIndex) |
||
422 | } |
||
423 | |||
424 | // Move to the next result |
||
425 | bc.executionState.processedIndex++ |
||
426 | } |
||
427 | |||
428 | return nil |
||
429 | } |
||
430 | |||
431 | // callbackWithToken calls the callback with the appropriate entity/subject ID and continuous token. |
||
432 | // This method retrieves the correct request from the sorted list and generates |
||
433 | // the appropriate continuous token for pagination support. |
||
434 | // |
||
435 | // Parameters: |
||
436 | // - index: The index of the result in the sorted list |
||
437 | func (bc *BulkChecker) callbackWithToken(index int) { |
||
438 | requests := bc.getSortedRequests() |
||
439 | |||
440 | // Validate index bounds |
||
441 | if index >= len(requests) { |
||
442 | return |
||
443 | } |
||
444 | |||
445 | var id string |
||
446 | var ct string |
||
447 | |||
448 | // Extract ID and generate continuous token based on checker type |
||
449 | switch bc.typ { |
||
450 | case BulkCheckerTypeEntity: |
||
451 | id = requests[index].Request.GetEntity().GetId() |
||
452 | if index+1 < len(requests) { |
||
453 | ct = utils.NewContinuousToken(requests[index+1].Request.GetEntity().GetId()).Encode().String() |
||
454 | } |
||
455 | case BulkCheckerTypeSubject: |
||
456 | id = requests[index].Request.GetSubject().GetId() |
||
457 | if index+1 < len(requests) { |
||
458 | ct = utils.NewContinuousToken(requests[index+1].Request.GetSubject().GetId()).Encode().String() |
||
459 | } |
||
460 | } |
||
461 | |||
462 | // Call the user-provided callback |
||
463 | bc.callback(id, ct) |
||
464 | } |
||
465 | |||
466 | // Close properly cleans up resources and cancels all operations. |
||
467 | // This method should be called when the BulkChecker is no longer needed |
||
468 | // to ensure proper resource cleanup and prevent goroutine leaks. |
||
469 | // |
||
470 | // Returns: |
||
471 | // - error: Any error that occurred during cleanup |
||
472 | func (bc *BulkChecker) Close() error { |
||
473 | bc.cancel() |
||
474 | return nil |
||
475 | } |
||
476 | |||
477 | // BulkEntityPublisher handles entity-based permission check publishing. |
||
478 | // This struct provides a convenient interface for publishing entity permission |
||
479 | // check requests to a BulkChecker instance. |
||
480 | type BulkEntityPublisher struct { |
||
481 | // bulkChecker is the target BulkChecker for publishing requests |
||
482 | bulkChecker *BulkChecker |
||
483 | // request contains the base lookup request parameters |
||
484 | request *base.PermissionLookupEntityRequest |
||
485 | } |
||
486 | |||
487 | // NewBulkEntityPublisher creates a new BulkEntityPublisher instance. |
||
488 | // This constructor initializes a publisher for entity-based permission checks. |
||
489 | // |
||
490 | // Parameters: |
||
491 | // - ctx: Context for the publisher (currently unused but kept for API consistency) |
||
492 | // - request: The base lookup request containing common parameters |
||
493 | // - bulkChecker: The BulkChecker instance to publish to |
||
494 | // |
||
495 | // Returns: |
||
496 | // - *BulkEntityPublisher: The initialized publisher instance |
||
497 | func NewBulkEntityPublisher(ctx context.Context, request *base.PermissionLookupEntityRequest, bulkChecker *BulkChecker) *BulkEntityPublisher { |
||
498 | return &BulkEntityPublisher{ |
||
499 | bulkChecker: bulkChecker, |
||
500 | request: request, |
||
501 | } |
||
502 | } |
||
503 | |||
504 | // Publish sends an entity permission check request to the bulk checker. |
||
505 | // This method creates a permission check request from the provided parameters |
||
506 | // and sends it to the BulkChecker for processing. It handles context cancellation |
||
507 | // gracefully by dropping requests when the context is done. |
||
508 | // |
||
509 | // Parameters: |
||
510 | // - entity: The entity to check permissions for |
||
511 | // - metadata: Metadata for the permission check request |
||
512 | // - context: Additional context for the permission check |
||
513 | // - result: Optional pre-computed result |
||
514 | func (p *BulkEntityPublisher) Publish(entity *base.Entity, metadata *base.PermissionCheckRequestMetadata, context *base.Context, result base.CheckResult) { |
||
515 | select { |
||
516 | case p.bulkChecker.requestChan <- BulkCheckerRequest{ |
||
517 | Request: &base.PermissionCheckRequest{ |
||
518 | TenantId: p.request.GetTenantId(), |
||
519 | Metadata: metadata, |
||
520 | Entity: entity, |
||
521 | Permission: p.request.GetPermission(), |
||
522 | Subject: p.request.GetSubject(), |
||
523 | Context: context, |
||
524 | }, |
||
525 | Result: result, |
||
526 | }: |
||
527 | case <-p.bulkChecker.ctx.Done(): |
||
528 | // Context cancelled, drop the request |
||
529 | } |
||
530 | } |
||
531 | |||
532 | // BulkSubjectPublisher handles subject-based permission check publishing. |
||
533 | // This struct provides a convenient interface for publishing subject permission |
||
534 | // check requests to a BulkChecker instance. |
||
535 | type BulkSubjectPublisher struct { |
||
536 | // bulkChecker is the target BulkChecker for publishing requests |
||
537 | bulkChecker *BulkChecker |
||
538 | // request contains the base lookup request parameters |
||
539 | request *base.PermissionLookupSubjectRequest |
||
540 | } |
||
541 | |||
542 | // NewBulkSubjectPublisher creates a new BulkSubjectPublisher instance. |
||
543 | // This constructor initializes a publisher for subject-based permission checks. |
||
544 | // |
||
545 | // Parameters: |
||
546 | // - ctx: Context for the publisher (currently unused but kept for API consistency) |
||
547 | // - request: The base lookup request containing common parameters |
||
548 | // - bulkChecker: The BulkChecker instance to publish to |
||
549 | // |
||
550 | // Returns: |
||
551 | // - *BulkSubjectPublisher: The initialized publisher instance |
||
552 | func NewBulkSubjectPublisher(ctx context.Context, request *base.PermissionLookupSubjectRequest, bulkChecker *BulkChecker) *BulkSubjectPublisher { |
||
553 | return &BulkSubjectPublisher{ |
||
554 | bulkChecker: bulkChecker, |
||
555 | request: request, |
||
556 | } |
||
557 | } |
||
558 | |||
559 | // Publish sends a subject permission check request to the bulk checker. |
||
560 | // This method creates a permission check request from the provided parameters |
||
561 | // and sends it to the BulkChecker for processing. It handles context cancellation |
||
562 | // gracefully by dropping requests when the context is done. |
||
563 | // |
||
564 | // Parameters: |
||
565 | // - subject: The subject to check permissions for |
||
566 | // - metadata: Metadata for the permission check request |
||
567 | // - context: Additional context for the permission check |
||
568 | // - result: Optional pre-computed result |
||
569 | func (p *BulkSubjectPublisher) Publish(subject *base.Subject, metadata *base.PermissionCheckRequestMetadata, context *base.Context, result base.CheckResult) { |
||
570 | select { |
||
571 | case p.bulkChecker.requestChan <- BulkCheckerRequest{ |
||
572 | Request: &base.PermissionCheckRequest{ |
||
573 | TenantId: p.request.GetTenantId(), |
||
574 | Metadata: metadata, |
||
575 | Entity: p.request.GetEntity(), |
||
576 | Permission: p.request.GetPermission(), |
||
577 | Subject: subject, |
||
578 | Context: context, |
||
579 | }, |
||
580 | Result: result, |
||
581 | }: |
||
582 | case <-p.bulkChecker.ctx.Done(): |
||
583 | // Context cancelled, drop the request |
||
584 | } |
||
585 | } |
||
586 | |||
// isContextError reports whether err is, or wraps, context.Canceled or
// context.DeadlineExceeded. The bulk checker treats such errors as a
// normal stop rather than a failure.
//
// Parameters:
//   - err: the error to inspect
//
// Returns:
//   - bool: true for cancellation or deadline errors, false otherwise
func isContextError(err error) bool {
	switch {
	case errors.Is(err, context.Canceled):
		return true
	case errors.Is(err, context.DeadlineExceeded):
		return true
	default:
		return false
	}
}
||
599 | |||
600 | // IsContextRelatedError is a legacy function maintained for backward compatibility. |
||
601 | // This function provides the same functionality as isContextError but with |
||
602 | // the original signature to maintain compatibility with existing code. |
||
603 | // |
||
604 | // Parameters: |
||
605 | // - ctx: Context (unused, kept for compatibility) |
||
606 | // - err: The error to check |
||
607 | // |
||
608 | // Returns: |
||
609 | // - bool: True if the error is context-related, false otherwise |
||
610 | func IsContextRelatedError(ctx context.Context, err error) bool { |
||
611 | return isContextError(err) |
||
612 | } |
||
613 |