Passed
Push — master ( 8c3de8...fb82f9 )
by Tolga
01:29 queued 14s
created

engines.isContextError   A

Complexity

Conditions 2

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
package engines
2
3
import (
4
	"context"
5
	"fmt"
6
	"sort"
7
	"sync"
8
	"sync/atomic"
9
10
	"github.com/pkg/errors"
11
12
	"golang.org/x/sync/errgroup"
13
	"golang.org/x/sync/semaphore"
14
15
	"github.com/Permify/permify/internal/invoke"
16
	"github.com/Permify/permify/internal/storage/memory/utils"
17
	base "github.com/Permify/permify/pkg/pb/base/v1"
18
)
19
20
// BulkCheckerType names the key a bulk check is organised around.
// sortRequests and callbackWithToken switch on this value to pick either the
// entity ID or the subject ID of each request.
type BulkCheckerType string

// Supported BulkCheckerType values.
const (
	// BulkCheckerTypeSubject selects the subject ID as the ordering key.
	BulkCheckerTypeSubject BulkCheckerType = "subject"
	// BulkCheckerTypeEntity selects the entity ID as the ordering key.
	BulkCheckerTypeEntity BulkCheckerType = "entity"
)
30
31
// BulkCheckerRequest represents a permission check request with optional pre-computed result.
32
// This struct encapsulates both the permission check request and an optional pre-determined result,
33
// allowing for optimization when results are already known (e.g., from caching).
34
type BulkCheckerRequest struct {
35
	// Request contains the actual permission check request
36
	Request *base.PermissionCheckRequest
37
	// Result holds a pre-computed result if available, otherwise CHECK_RESULT_UNSPECIFIED
38
	Result base.CheckResult
39
}
40
41
// BulkCheckerConfig carries the tuning knobs of a BulkChecker.
//
// NewBulkChecker replaces any non-positive field with the value from
// DefaultBulkCheckerConfig.
type BulkCheckerConfig struct {
	// ConcurrencyLimit is the weight of the semaphore that ExecuteRequests
	// creates, i.e. how many permission checks may run at the same time.
	ConcurrencyLimit int
	// BufferSize is the capacity of the request channel and the initial
	// capacity of the collected-requests slice.
	BufferSize int
}
53
54
// DefaultBulkCheckerConfig returns a sensible default configuration
55
// that balances performance and resource usage for most use cases.
56
func DefaultBulkCheckerConfig() BulkCheckerConfig {
57
	return BulkCheckerConfig{
58
		ConcurrencyLimit: 10,   // Moderate concurrency for most workloads
59
		BufferSize:       1000, // Buffer for 1000 requests
60
	}
61
}
62
63
// BulkChecker handles concurrent permission checks with ordered result processing.
64
// This struct implements a high-performance bulk permission checking system that:
65
// - Collects permission check requests asynchronously
66
// - Processes them concurrently with controlled parallelism
67
// - Maintains strict ordering of results based on request sorting
68
// - Provides efficient resource management and error handling
69
type BulkChecker struct {
70
	// typ determines the sorting strategy and processing behavior
71
	typ BulkCheckerType
72
	// checker is the underlying permission checking engine
73
	checker invoke.Check
74
	// config holds the operational configuration
75
	config BulkCheckerConfig
76
	// ctx provides context for cancellation and timeout management
77
	ctx context.Context
78
	// cancel allows for graceful shutdown of all operations
79
	cancel context.CancelFunc
80
81
	// Request handling components
82
	// requestChan is the input channel for receiving permission check requests
83
	requestChan chan BulkCheckerRequest
84
	// requests stores all collected requests before processing
85
	requests []BulkCheckerRequest
86
	// requestsMu provides thread-safe access to the requests slice
87
	requestsMu sync.RWMutex
88
89
	// Execution state management
90
	// executionState tracks the progress and results of request processing
91
	executionState *executionState
92
	// collectionDone signals when request collection has completed
93
	collectionDone chan struct{}
94
95
	// Callback for processing results
96
	// callback is invoked for each successful permission check with the entity/subject ID and continuous token
97
	callback func(entityID, continuousToken string)
98
}
99
100
// executionState manages the execution of requests and maintains processing order.
101
// This struct ensures that results are processed in the correct sequence
102
// even when requests complete out of order due to concurrent processing.
103
type executionState struct {
104
	// mu protects access to the execution state
105
	mu sync.Mutex
106
	// results stores the results of all requests in their original order
107
	results []base.CheckResult
108
	// processedIndex tracks the next result to be processed in order
109
	processedIndex int
110
	// successCount tracks the number of successful permission checks
111
	successCount int64
112
	// limit defines the maximum number of successful results to process
113
	limit int64
114
}
115
116
// NewBulkChecker creates a new BulkChecker instance with comprehensive validation and error handling.
117
// This constructor ensures that all dependencies are properly initialized and validates
118
// configuration parameters to prevent runtime errors.
119
//
120
// Parameters:
121
//   - ctx: Context for managing the lifecycle of the BulkChecker
122
//   - checker: The permission checking engine to use for actual permission checks
123
//   - typ: The type of bulk checking operation (entity or subject)
124
//   - callback: Function called for each successful permission check
125
//   - config: Configuration parameters for tuning performance and behavior
126
//
127
// Returns:
128
//   - *BulkChecker: The initialized BulkChecker instance
129
//   - error: Any error that occurred during initialization
130
func NewBulkChecker(ctx context.Context, checker invoke.Check, typ BulkCheckerType, callback func(entityID, ct string), config BulkCheckerConfig) (*BulkChecker, error) {
131
	// Validate all required parameters
132
	if ctx == nil {
133
		return nil, fmt.Errorf("context cannot be nil")
134
	}
135
	if checker == nil {
136
		return nil, fmt.Errorf("checker cannot be nil")
137
	}
138
	if callback == nil {
139
		return nil, fmt.Errorf("callback cannot be nil")
140
	}
141
142
	// Apply default values for invalid configuration
143
	if config.ConcurrencyLimit <= 0 {
144
		config.ConcurrencyLimit = DefaultBulkCheckerConfig().ConcurrencyLimit
145
	}
146
	if config.BufferSize <= 0 {
147
		config.BufferSize = DefaultBulkCheckerConfig().BufferSize
148
	}
149
150
	// Create a cancellable context for the BulkChecker
151
	ctx, cancel := context.WithCancel(ctx)
152
153
	// Initialize the BulkChecker with all components
154
	bc := &BulkChecker{
155
		typ:            typ,
156
		checker:        checker,
157
		config:         config,
158
		ctx:            ctx,
159
		cancel:         cancel,
160
		requestChan:    make(chan BulkCheckerRequest, config.BufferSize),
161
		requests:       make([]BulkCheckerRequest, 0, config.BufferSize),
162
		callback:       callback,
163
		collectionDone: make(chan struct{}),
164
	}
165
166
	// Start the background request collection goroutine
167
	go bc.collectRequests()
168
169
	return bc, nil
170
}
171
172
// collectRequests safely collects requests until the channel is closed.
173
// This method runs in a separate goroutine and continuously processes
174
// incoming requests until either the channel is closed or the context is cancelled.
175
// It ensures thread-safe addition of requests to the internal collection.
176
func (bc *BulkChecker) collectRequests() {
177
	// Signal completion when this goroutine exits
178
	defer close(bc.collectionDone)
179
180
	for {
181
		select {
182
		case req, ok := <-bc.requestChan:
183
			if !ok {
184
				// Channel closed, stop collecting
185
				return
186
			}
187
			bc.addRequest(req)
188
		case <-bc.ctx.Done():
189
			// Context cancelled, stop collecting
190
			return
191
		}
192
	}
193
}
194
195
// addRequest safely adds a request to the internal list.
196
// This method uses a mutex to ensure thread-safe access to the requests slice,
197
// preventing race conditions when multiple goroutines are adding requests.
198
func (bc *BulkChecker) addRequest(req BulkCheckerRequest) {
199
	bc.requestsMu.Lock()
200
	defer bc.requestsMu.Unlock()
201
	bc.requests = append(bc.requests, req)
202
}
203
204
// StopCollectingRequests safely stops request collection and waits for completion.
205
// This method closes the input channel and waits for the collection goroutine
206
// to finish processing any remaining requests. This ensures that no requests
207
// are lost during shutdown.
208
func (bc *BulkChecker) StopCollectingRequests() {
209
	close(bc.requestChan)
210
	<-bc.collectionDone // Wait for collection to complete
211
}
212
213
// getSortedRequests returns a sorted copy of requests based on the checker type.
214
// This method creates a copy of the requests to avoid modifying the original
215
// collection and sorts them according to the BulkCheckerType (entity ID or subject ID).
216
// The sorting ensures consistent and predictable result ordering.
217
func (bc *BulkChecker) getSortedRequests() []BulkCheckerRequest {
218
	bc.requestsMu.RLock()
219
	defer bc.requestsMu.RUnlock()
220
221
	// Create a copy to avoid modifying the original
222
	requests := make([]BulkCheckerRequest, len(bc.requests))
223
	copy(requests, bc.requests)
224
225
	// Sort the copy based on the checker type
226
	bc.sortRequests(requests)
227
	return requests
228
}
229
230
// sortRequests sorts requests based on the checker type.
231
// This method implements different sorting strategies:
232
// - For entity-based checks: sorts by entity ID
233
// - For subject-based checks: sorts by subject ID
234
// The sorting ensures that results are processed in a consistent order.
235
func (bc *BulkChecker) sortRequests(requests []BulkCheckerRequest) {
236
	switch bc.typ {
237
	case BulkCheckerTypeEntity:
238
		sort.Slice(requests, func(i, j int) bool {
239
			return requests[i].Request.GetEntity().GetId() < requests[j].Request.GetEntity().GetId()
240
		})
241
	case BulkCheckerTypeSubject:
242
		sort.Slice(requests, func(i, j int) bool {
243
			return requests[i].Request.GetSubject().GetId() < requests[j].Request.GetSubject().GetId()
244
		})
245
	}
246
}
247
248
// ExecuteRequests processes requests concurrently with comprehensive error handling and resource management.
249
// This method is the main entry point for bulk permission checking. It:
250
// 1. Stops collecting new requests
251
// 2. Sorts all collected requests
252
// 3. Processes them concurrently with controlled parallelism
253
// 4. Maintains strict ordering of results
254
// 5. Handles errors gracefully and manages resources properly
255
//
256
// Parameters:
257
//   - size: The maximum number of successful results to process
258
//
259
// Returns:
260
//   - error: Any error that occurred during processing (context cancellation is not considered an error)
261
func (bc *BulkChecker) ExecuteRequests(size uint32) error {
262
	if size == 0 {
263
		return fmt.Errorf("size must be greater than 0")
264
	}
265
266
	// Stop collecting new requests and wait for collection to complete
267
	bc.StopCollectingRequests()
268
269
	// Get sorted requests for processing
270
	requests := bc.getSortedRequests()
271
	if len(requests) == 0 {
272
		return nil // No requests to process
273
	}
274
275
	// Initialize execution state for tracking progress
276
	bc.executionState = &executionState{
277
		results: make([]base.CheckResult, len(requests)),
278
		limit:   int64(size),
279
	}
280
281
	// Create execution context with cancellation for graceful shutdown
282
	execCtx, execCancel := context.WithCancel(bc.ctx)
283
	defer execCancel()
284
285
	// Create semaphore to control concurrency
286
	sem := semaphore.NewWeighted(int64(bc.config.ConcurrencyLimit))
287
288
	// Create error group for managing goroutines and error propagation
289
	g, ctx := errgroup.WithContext(execCtx)
290
291
	// Process requests concurrently
292
	for i, req := range requests {
293
		// Check if we've reached the success limit
294
		if atomic.LoadInt64(&bc.executionState.successCount) >= int64(size) {
295
			break
296
		}
297
298
		index := i
299
		request := req
300
301
		// Launch goroutine for each request
302
		g.Go(func() error {
303
			return bc.processRequest(ctx, sem, index, request)
304
		})
305
	}
306
307
	// Wait for all goroutines to complete and handle any errors
308
	if err := g.Wait(); err != nil {
309
		if isContextError(err) {
310
			return nil // Context cancellation is not an error
311
		}
312
		return fmt.Errorf("bulk execution failed: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
313
	}
314
315
	return nil
316
}
317
318
// processRequest handles a single request with comprehensive error handling.
319
// This method is executed in a separate goroutine for each request and:
320
// 1. Acquires a semaphore slot to control concurrency
321
// 2. Performs the permission check or uses pre-computed result
322
// 3. Processes the result in the correct order
323
// 4. Handles context cancellation and other errors gracefully
324
//
325
// Parameters:
326
//   - ctx: Context for cancellation and timeout
327
//   - sem: Semaphore for concurrency control
328
//   - index: The index of this request in the sorted list
329
//   - req: The permission check request to process
330
//
331
// Returns:
332
//   - error: Any error that occurred during processing
333
func (bc *BulkChecker) processRequest(ctx context.Context, sem *semaphore.Weighted, index int, req BulkCheckerRequest) error {
334
	// Check context before acquiring semaphore
335
	if err := ctx.Err(); err != nil {
336
		return nil
337
	}
338
339
	// Acquire semaphore slot to control concurrency
340
	if err := sem.Acquire(ctx, 1); err != nil {
341
		if isContextError(err) {
342
			return nil
343
		}
344
		return fmt.Errorf("failed to acquire semaphore: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
345
	}
346
	defer sem.Release(1)
347
348
	// Determine the result for this request
349
	result, err := bc.getRequestResult(ctx, req)
350
	if err != nil {
351
		if isContextError(err) {
352
			return nil
353
		}
354
		return fmt.Errorf("failed to get request result: %w", err)
0 ignored issues
show
introduced by
unrecognized printf verb 'w'
Loading history...
355
	}
356
357
	// Process the result in the correct order
358
	return bc.processResult(index, result)
359
}
360
361
// getRequestResult determines the result for a request.
362
// This method either uses a pre-computed result if available,
363
// or performs the actual permission check using the underlying checker.
364
//
365
// Parameters:
366
//   - ctx: Context for the permission check
367
//   - req: The request to get the result for
368
//
369
// Returns:
370
//   - base.CheckResult: The result of the permission check
371
//   - error: Any error that occurred during the check
372
func (bc *BulkChecker) getRequestResult(ctx context.Context, req BulkCheckerRequest) (base.CheckResult, error) {
373
	// Use pre-computed result if available
374
	if req.Result != base.CheckResult_CHECK_RESULT_UNSPECIFIED {
375
		return req.Result, nil
376
	}
377
378
	// Perform the actual permission check
379
	response, err := bc.checker.Check(ctx, req.Request)
380
	if err != nil {
381
		return base.CheckResult_CHECK_RESULT_UNSPECIFIED, err
382
	}
383
384
	return response.GetCan(), nil
385
}
386
387
// processResult processes a single result with thread-safe state updates.
388
// This method ensures that results are processed in the correct order
389
// even when requests complete out of order due to concurrent processing.
390
// It maintains the execution state and calls the callback for successful results.
391
//
392
// Parameters:
393
//   - index: The index of the result in the sorted list
394
//   - result: The result of the permission check
395
//
396
// Returns:
397
//   - error: Any error that occurred during processing
398
func (bc *BulkChecker) processResult(index int, result base.CheckResult) error {
399
	bc.executionState.mu.Lock()
400
	defer bc.executionState.mu.Unlock()
401
402
	// Store the result at the correct index
403
	bc.executionState.results[index] = result
404
405
	// Process results in order, starting from the current processed index
406
	for bc.executionState.processedIndex < len(bc.executionState.results) {
407
		currentResult := bc.executionState.results[bc.executionState.processedIndex]
408
		if currentResult == base.CheckResult_CHECK_RESULT_UNSPECIFIED {
409
			break // Wait for this result to be computed
410
		}
411
412
		// Process successful results
413
		if currentResult == base.CheckResult_CHECK_RESULT_ALLOWED {
414
			// Check if we've reached the success limit
415
			if atomic.LoadInt64(&bc.executionState.successCount) >= bc.executionState.limit {
416
				return nil
417
			}
418
419
			// Increment success count and call callback
420
			atomic.AddInt64(&bc.executionState.successCount, 1)
421
			bc.callbackWithToken(bc.executionState.processedIndex)
422
		}
423
424
		// Move to the next result
425
		bc.executionState.processedIndex++
426
	}
427
428
	return nil
429
}
430
431
// callbackWithToken calls the callback with the appropriate entity/subject ID and continuous token.
432
// This method retrieves the correct request from the sorted list and generates
433
// the appropriate continuous token for pagination support.
434
//
435
// Parameters:
436
//   - index: The index of the result in the sorted list
437
func (bc *BulkChecker) callbackWithToken(index int) {
438
	requests := bc.getSortedRequests()
439
440
	// Validate index bounds
441
	if index >= len(requests) {
442
		return
443
	}
444
445
	var id string
446
	var ct string
447
448
	// Extract ID and generate continuous token based on checker type
449
	switch bc.typ {
450
	case BulkCheckerTypeEntity:
451
		id = requests[index].Request.GetEntity().GetId()
452
		if index+1 < len(requests) {
453
			ct = utils.NewContinuousToken(requests[index+1].Request.GetEntity().GetId()).Encode().String()
454
		}
455
	case BulkCheckerTypeSubject:
456
		id = requests[index].Request.GetSubject().GetId()
457
		if index+1 < len(requests) {
458
			ct = utils.NewContinuousToken(requests[index+1].Request.GetSubject().GetId()).Encode().String()
459
		}
460
	}
461
462
	// Call the user-provided callback
463
	bc.callback(id, ct)
464
}
465
466
// Close properly cleans up resources and cancels all operations.
467
// This method should be called when the BulkChecker is no longer needed
468
// to ensure proper resource cleanup and prevent goroutine leaks.
469
//
470
// Returns:
471
//   - error: Any error that occurred during cleanup
472
func (bc *BulkChecker) Close() error {
473
	bc.cancel()
474
	return nil
475
}
476
477
// BulkEntityPublisher handles entity-based permission check publishing.
478
// This struct provides a convenient interface for publishing entity permission
479
// check requests to a BulkChecker instance.
480
type BulkEntityPublisher struct {
481
	// bulkChecker is the target BulkChecker for publishing requests
482
	bulkChecker *BulkChecker
483
	// request contains the base lookup request parameters
484
	request *base.PermissionLookupEntityRequest
485
}
486
487
// NewBulkEntityPublisher creates a new BulkEntityPublisher instance.
488
// This constructor initializes a publisher for entity-based permission checks.
489
//
490
// Parameters:
491
//   - ctx: Context for the publisher (currently unused but kept for API consistency)
492
//   - request: The base lookup request containing common parameters
493
//   - bulkChecker: The BulkChecker instance to publish to
494
//
495
// Returns:
496
//   - *BulkEntityPublisher: The initialized publisher instance
497
func NewBulkEntityPublisher(ctx context.Context, request *base.PermissionLookupEntityRequest, bulkChecker *BulkChecker) *BulkEntityPublisher {
498
	return &BulkEntityPublisher{
499
		bulkChecker: bulkChecker,
500
		request:     request,
501
	}
502
}
503
504
// Publish sends an entity permission check request to the bulk checker.
505
// This method creates a permission check request from the provided parameters
506
// and sends it to the BulkChecker for processing. It handles context cancellation
507
// gracefully by dropping requests when the context is done.
508
//
509
// Parameters:
510
//   - entity: The entity to check permissions for
511
//   - metadata: Metadata for the permission check request
512
//   - context: Additional context for the permission check
513
//   - result: Optional pre-computed result
514
func (p *BulkEntityPublisher) Publish(entity *base.Entity, metadata *base.PermissionCheckRequestMetadata, context *base.Context, result base.CheckResult) {
515
	select {
516
	case p.bulkChecker.requestChan <- BulkCheckerRequest{
517
		Request: &base.PermissionCheckRequest{
518
			TenantId:   p.request.GetTenantId(),
519
			Metadata:   metadata,
520
			Entity:     entity,
521
			Permission: p.request.GetPermission(),
522
			Subject:    p.request.GetSubject(),
523
			Context:    context,
524
		},
525
		Result: result,
526
	}:
527
	case <-p.bulkChecker.ctx.Done():
528
		// Context cancelled, drop the request
529
	}
530
}
531
532
// BulkSubjectPublisher handles subject-based permission check publishing.
533
// This struct provides a convenient interface for publishing subject permission
534
// check requests to a BulkChecker instance.
535
type BulkSubjectPublisher struct {
536
	// bulkChecker is the target BulkChecker for publishing requests
537
	bulkChecker *BulkChecker
538
	// request contains the base lookup request parameters
539
	request *base.PermissionLookupSubjectRequest
540
}
541
542
// NewBulkSubjectPublisher creates a new BulkSubjectPublisher instance.
543
// This constructor initializes a publisher for subject-based permission checks.
544
//
545
// Parameters:
546
//   - ctx: Context for the publisher (currently unused but kept for API consistency)
547
//   - request: The base lookup request containing common parameters
548
//   - bulkChecker: The BulkChecker instance to publish to
549
//
550
// Returns:
551
//   - *BulkSubjectPublisher: The initialized publisher instance
552
func NewBulkSubjectPublisher(ctx context.Context, request *base.PermissionLookupSubjectRequest, bulkChecker *BulkChecker) *BulkSubjectPublisher {
553
	return &BulkSubjectPublisher{
554
		bulkChecker: bulkChecker,
555
		request:     request,
556
	}
557
}
558
559
// Publish sends a subject permission check request to the bulk checker.
560
// This method creates a permission check request from the provided parameters
561
// and sends it to the BulkChecker for processing. It handles context cancellation
562
// gracefully by dropping requests when the context is done.
563
//
564
// Parameters:
565
//   - subject: The subject to check permissions for
566
//   - metadata: Metadata for the permission check request
567
//   - context: Additional context for the permission check
568
//   - result: Optional pre-computed result
569
func (p *BulkSubjectPublisher) Publish(subject *base.Subject, metadata *base.PermissionCheckRequestMetadata, context *base.Context, result base.CheckResult) {
570
	select {
571
	case p.bulkChecker.requestChan <- BulkCheckerRequest{
572
		Request: &base.PermissionCheckRequest{
573
			TenantId:   p.request.GetTenantId(),
574
			Metadata:   metadata,
575
			Entity:     p.request.GetEntity(),
576
			Permission: p.request.GetPermission(),
577
			Subject:    subject,
578
			Context:    context,
579
		},
580
		Result: result,
581
	}:
582
	case <-p.bulkChecker.ctx.Done():
583
		// Context cancelled, drop the request
584
	}
585
}
586
587
// isContextError reports whether err is, or wraps, context.Canceled or
// context.DeadlineExceeded. The bulk checker treats such errors as a normal
// stop rather than a failure.
//
// Parameters:
//   - err: the error to classify
//
// Returns:
//   - bool: true for cancellation/deadline errors, false otherwise
func isContextError(err error) bool {
	if errors.Is(err, context.Canceled) {
		return true
	}
	return errors.Is(err, context.DeadlineExceeded)
}
599
600
// IsContextRelatedError is a legacy function maintained for backward compatibility.
601
// This function provides the same functionality as isContextError but with
602
// the original signature to maintain compatibility with existing code.
603
//
604
// Parameters:
605
//   - ctx: Context (unused, kept for compatibility)
606
//   - err: The error to check
607
//
608
// Returns:
609
//   - bool: True if the error is context-related, false otherwise
610
func IsContextRelatedError(ctx context.Context, err error) bool {
611
	return isContextError(err)
612
}
613