internal/engines/bulk.go   A
last analyzed

Size/Duplication

Total Lines 348
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 42
eloc 173
dl 0
loc 348
rs 9.0399
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A engines.IsContextRelatedError 0 2 2
A engines.*BulkChecker.CollectAndSortRequests 0 16 5
A engines.*BulkChecker.prepareRequestList 0 6 1
A engines.NewBulkSubjectPublisher 0 5 1
A engines.NewBulkEntityPublisher 0 5 1
A engines.*BulkChecker.sortRequests 0 8 5
A engines.*BulkEntityPublisher.Publish 0 11 1
A engines.*BulkSubjectPublisher.Publish 0 11 1
A engines.*BulkChecker.StopCollectingRequests 0 5 1
F engines.*BulkChecker.ExecuteRequests 0 119 22
A engines.NewBulkChecker 0 24 2
1
package engines
2
3
import (
4
	"context"
5
	"sort"
6
	"sync"
7
	"sync/atomic"
8
9
	"github.com/pkg/errors"
10
11
	"golang.org/x/sync/errgroup"
12
	"golang.org/x/sync/semaphore"
13
14
	"github.com/Permify/permify/internal/invoke"
15
	"github.com/Permify/permify/internal/storage/memory/utils"
16
	base "github.com/Permify/permify/pkg/pb/base/v1"
17
)
18
19
type BulkCheckerType string
20
21
const (
22
	BULK_SUBJECT BulkCheckerType = "subject"
23
	BULK_ENTITY  BulkCheckerType = "entity"
24
)
25
26
// BulkCheckerRequest is a struct for a permission check request and the channel to send the result.
27
type BulkCheckerRequest struct {
28
	Request *base.PermissionCheckRequest
29
	Result  base.CheckResult
30
}
31
32
// BulkChecker is a struct for checking permissions in bulk.
33
// It processes permission check requests concurrently and maintains a sorted list of these requests.
34
type BulkChecker struct {
35
	// typ defines the type of bulk checking being performed.
36
	// It distinguishes between different modes of operation within the BulkChecker,
37
	// such as whether the check is focused on entities, subjects, or another criterion.
38
	typ BulkCheckerType
39
40
	checker invoke.Check
41
	// RequestChan is the input queue for permission check requests.
42
	// Incoming requests are received on this channel and processed by the BulkChecker.
43
	RequestChan chan BulkCheckerRequest
44
45
	// ctx is the context used to manage goroutines and handle cancellation.
46
	// It allows for graceful shutdown of all goroutines when the context is canceled.
47
	ctx context.Context
48
49
	// g is an errgroup used for managing multiple goroutines.
50
	// It allows BulkChecker to wait for all goroutines to finish and to capture any errors they may return.
51
	g *errgroup.Group
52
53
	// concurrencyLimit is the maximum number of concurrent permission checks that can be processed at one time.
54
	// It controls the level of parallelism within the BulkChecker.
55
	concurrencyLimit int
56
57
	// callback is a function that handles the result of each permission check.
58
	// It is called with the entity ID and the result of the permission check (e.g., allowed or denied).
59
	callback func(entityID, ct string)
60
61
	// sortedList is a slice that stores BulkCheckerRequest objects.
62
	// This list is maintained in a sorted order based on some criteria, such as the entity ID.
63
	list []BulkCheckerRequest
64
65
	// mu is a mutex used for thread-safe access to the sortedList.
66
	// It ensures that only one goroutine can modify the sortedList at a time, preventing data races.
67
	mu sync.Mutex
68
69
	// wg is a WaitGroup used to coordinate the completion of request collection.
70
	// It ensures that all requests are collected and processed before ExecuteRequests begins execution.
71
	// The WaitGroup helps to synchronize the collection of requests with the execution of those requests,
72
	// preventing race conditions where the execution starts before all requests are collected.
73
	wg *sync.WaitGroup
74
}
75
76
// NewBulkChecker creates a new BulkChecker instance.
77
// ctx: context for managing goroutines and cancellation
78
// engine: the CheckEngine to use for permission checks
79
// callback: a callback function that handles the result of each permission check
80
// concurrencyLimit: the maximum number of concurrent permission checks
81
func NewBulkChecker(ctx context.Context, checker invoke.Check, typ BulkCheckerType, callback func(entityID, ct string), concurrencyLimit int) *BulkChecker {
82
	bc := &BulkChecker{
83
		RequestChan:      make(chan BulkCheckerRequest),
84
		checker:          checker,
85
		g:                &errgroup.Group{},
86
		concurrencyLimit: concurrencyLimit,
87
		ctx:              ctx,
88
		callback:         callback,
89
		typ:              typ,
90
		wg:               &sync.WaitGroup{},
91
	}
92
93
	// Start processing requests in a separate goroutine
94
	// Use a WaitGroup to ensure all requests are collected before proceeding
95
	var wg sync.WaitGroup
96
	wg.Add(1)
97
	go func() {
98
		defer wg.Done() // Signal that the request collection is finished
99
		bc.CollectAndSortRequests()
100
	}()
101
102
	bc.wg = &wg // Store the WaitGroup for future use
103
104
	return bc
105
}
106
107
// CollectAndSortRequests processes incoming requests and maintains a sorted list.
108
func (bc *BulkChecker) CollectAndSortRequests() {
109
	for {
110
		select {
111
		case req, ok := <-bc.RequestChan:
112
			if !ok {
113
				// Channel closed, process remaining requests
114
				return
115
			}
116
117
			bc.mu.Lock()
118
			bc.list = append(bc.list, req)
119
			// Optionally, you could sort here or later in ExecuteRequests
120
			bc.mu.Unlock()
121
122
		case <-bc.ctx.Done():
123
			return
124
		}
125
	}
126
}
127
128
// prepareRequestList sorts and prepares a copy of the request list.
129
func (bc *BulkChecker) prepareRequestList() []BulkCheckerRequest {
130
	bc.mu.Lock()
131
	defer bc.mu.Unlock()
132
133
	bc.sortRequests()                                 // Sort requests based on ID
134
	return append([]BulkCheckerRequest{}, bc.list...) // Return a copy of the list to avoid modifying the original
135
}
136
137
// StopCollectingRequests Signal to stop collecting requests and close the channel
138
func (bc *BulkChecker) StopCollectingRequests() {
139
	bc.mu.Lock()
140
	defer bc.mu.Unlock()
141
	// Close the channel to signal no more requests will be sent
142
	close(bc.RequestChan)
143
}
144
145
// sortRequests sorts the sortedList based on the type (entity or subject).
146
func (bc *BulkChecker) sortRequests() {
147
	if bc.typ == BULK_ENTITY {
148
		sort.Slice(bc.list, func(i, j int) bool {
149
			return bc.list[i].Request.GetEntity().GetId() < bc.list[j].Request.GetEntity().GetId()
150
		})
151
	} else if bc.typ == BULK_SUBJECT {
152
		sort.Slice(bc.list, func(i, j int) bool {
153
			return bc.list[i].Request.GetSubject().GetId() < bc.list[j].Request.GetSubject().GetId()
154
		})
155
	}
156
}
157
158
// ExecuteRequests begins processing permission check requests from the sorted list.
159
func (bc *BulkChecker) ExecuteRequests(size uint32) error {
160
	// Stop collecting new requests and close the RequestChan to ensure no more requests are added
161
	bc.StopCollectingRequests()
162
163
	// Wait for request collection to complete before proceeding
164
	bc.wg.Wait()
165
166
	// Create a context with cancellation that will be used to stop further processing
167
	ctx, cancel := context.WithCancel(bc.ctx)
168
	defer cancel() // Ensure the context is canceled when done
169
170
	// Track the number of successful permission checks
171
	successCount := int64(0)
172
	// Semaphore to control the maximum number of concurrent permission checks
173
	sem := semaphore.NewWeighted(int64(bc.concurrencyLimit))
174
	var mu sync.Mutex
175
176
	// Sort and copy the list of requests
177
	listCopy := bc.prepareRequestList()
178
179
	// Pre-allocate a slice to store the results of the permission checks
180
	results := make([]base.CheckResult, len(listCopy))
181
	// Track the index of the last processed request to ensure results are processed in order
182
	processedIndex := 0
183
184
	// Loop through each request in the copied list
185
	for i, currentRequest := range listCopy {
186
		// If we've reached the success limit, stop processing further requests
187
		if atomic.LoadInt64(&successCount) >= int64(size) {
188
			cancel() // Cancel the context to stop further goroutines
189
			break
190
		}
191
192
		index := i
193
		req := currentRequest
194
195
		// Use errgroup to manage the goroutines, which allows for error handling and synchronization
196
		bc.g.Go(func() error {
197
			// Check if the context has been canceled, and return without error if so
198
			if err := ctx.Err(); err == context.Canceled {
199
				return nil // Gracefully exit the goroutine if context is canceled
200
			}
201
202
			// Acquire a slot in the semaphore to control concurrency
203
			if err := sem.Acquire(ctx, 1); err != nil {
204
				// Return nil instead of error if the context was canceled during semaphore acquisition
205
				if IsContextRelatedError(ctx, err) {
206
					return nil
207
				}
208
				return err // Return an error if semaphore acquisition fails for other reasons
209
			}
210
			defer sem.Release(1) // Ensure the semaphore slot is released after processing
211
212
			var result base.CheckResult
213
			if req.Result == base.CheckResult_CHECK_RESULT_UNSPECIFIED {
214
				// Perform the permission check if the result is not already specified
215
				cr, err := bc.checker.Check(ctx, req.Request)
216
				if err != nil {
217
					// Handle context cancellation error here
218
					if IsContextRelatedError(ctx, err) {
219
						return nil // Ignore the cancellation error
220
					}
221
					return err // Return the actual error if it's not due to cancellation
222
				}
223
				result = cr.GetCan() // Get the result from the check
224
			} else {
225
				// Use the already specified result
226
				result = req.Result
227
			}
228
229
			// Lock the mutex to safely update shared resources
230
			mu.Lock()
231
			results[index] = result // Store the result in the pre-allocated slice
232
233
			// Process the results in order, starting from the current processed index
234
			for processedIndex < len(listCopy) && results[processedIndex] != base.CheckResult_CHECK_RESULT_UNSPECIFIED {
235
				// If the result at the processed index is allowed, call the callback function
236
				if results[processedIndex] == base.CheckResult_CHECK_RESULT_ALLOWED {
237
					if atomic.AddInt64(&successCount, 1) <= int64(size) {
238
						ct := ""
239
						if processedIndex+1 < len(listCopy) {
240
							// If there is a next item, create a continuous token with the next ID
241
							if bc.typ == BULK_ENTITY {
242
								ct = utils.NewContinuousToken(listCopy[processedIndex+1].Request.GetEntity().GetId()).Encode().String()
243
							} else if bc.typ == BULK_SUBJECT {
244
								ct = utils.NewContinuousToken(listCopy[processedIndex+1].Request.GetSubject().GetId()).Encode().String()
245
							}
246
						}
247
						// Depending on the type of check (entity or subject), call the appropriate callback
248
						if bc.typ == BULK_ENTITY {
249
							bc.callback(listCopy[processedIndex].Request.GetEntity().GetId(), ct)
250
						} else if bc.typ == BULK_SUBJECT {
251
							bc.callback(listCopy[processedIndex].Request.GetSubject().GetId(), ct)
252
						}
253
					}
254
				}
255
				processedIndex++ // Move to the next index for processing
256
			}
257
			mu.Unlock() // Unlock the mutex after updating the results and processed index
258
259
			// If the success limit has been reached, cancel the context to stop further processing
260
			if atomic.LoadInt64(&successCount) >= int64(size) {
261
				cancel()
262
			}
263
264
			return nil // Return nil to indicate successful processing
265
		})
266
	}
267
268
	// Wait for all goroutines to complete and check for any errors
269
	if err := bc.g.Wait(); err != nil {
270
		// Ignore context cancellation as an error
271
		if IsContextRelatedError(ctx, err) {
272
			return nil
273
		}
274
		return err // Return other errors if any goroutine returned an error
275
	}
276
277
	return nil // Return nil if all processing completed successfully
278
}
279
280
// BulkEntityPublisher is a struct for streaming permission check results.
281
type BulkEntityPublisher struct {
282
	bulkChecker *BulkChecker
283
284
	request *base.PermissionLookupEntityRequest
285
	// context to manage goroutines and cancellation
286
	ctx context.Context
287
}
288
289
// NewBulkEntityPublisher creates a new BulkStreamer instance.
290
func NewBulkEntityPublisher(ctx context.Context, request *base.PermissionLookupEntityRequest, bulkChecker *BulkChecker) *BulkEntityPublisher {
291
	return &BulkEntityPublisher{
292
		bulkChecker: bulkChecker,
293
		request:     request,
294
		ctx:         ctx,
295
	}
296
}
297
298
// Publish publishes a permission check request to the BulkChecker.
299
func (s *BulkEntityPublisher) Publish(entity *base.Entity, metadata *base.PermissionCheckRequestMetadata, context *base.Context, result base.CheckResult) {
300
	s.bulkChecker.RequestChan <- BulkCheckerRequest{
301
		Request: &base.PermissionCheckRequest{
302
			TenantId:   s.request.GetTenantId(),
303
			Metadata:   metadata,
304
			Entity:     entity,
305
			Permission: s.request.GetPermission(),
306
			Subject:    s.request.GetSubject(),
307
			Context:    context,
308
		},
309
		Result: result,
310
	}
311
}
312
313
// BulkSubjectPublisher is a struct for streaming permission check results.
314
type BulkSubjectPublisher struct {
315
	bulkChecker *BulkChecker
316
317
	request *base.PermissionLookupSubjectRequest
318
	// context to manage goroutines and cancellation
319
	ctx context.Context
320
}
321
322
// NewBulkSubjectPublisher creates a new BulkStreamer instance.
323
func NewBulkSubjectPublisher(ctx context.Context, request *base.PermissionLookupSubjectRequest, bulkChecker *BulkChecker) *BulkSubjectPublisher {
324
	return &BulkSubjectPublisher{
325
		bulkChecker: bulkChecker,
326
		request:     request,
327
		ctx:         ctx,
328
	}
329
}
330
331
// Publish publishes a permission check request to the BulkChecker.
332
func (s *BulkSubjectPublisher) Publish(subject *base.Subject, metadata *base.PermissionCheckRequestMetadata, context *base.Context, result base.CheckResult) {
333
	s.bulkChecker.RequestChan <- BulkCheckerRequest{
334
		Request: &base.PermissionCheckRequest{
335
			TenantId:   s.request.GetTenantId(),
336
			Metadata:   metadata,
337
			Entity:     s.request.GetEntity(),
338
			Permission: s.request.GetPermission(),
339
			Subject:    subject,
340
			Context:    context,
341
		},
342
		Result: result,
343
	}
344
}
345
346
// IsContextRelatedError checks if the error is due to context cancellation, deadline exceedance, or closed connection
347
func IsContextRelatedError(ctx context.Context, err error) bool {
348
	return errors.Is(ctx.Err(), context.Canceled) || errors.Is(ctx.Err(), context.DeadlineExceeded)
349
}
350