Passed
Pull Request — master (#1681)
by
unknown
04:00
created

engines.*BulkChecker.StopCollectingRequests   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 0
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
package engines
2
3
import (
4
	"context"
5
	"sort"
6
	"sync"
7
	"sync/atomic"
8
9
	"golang.org/x/sync/errgroup"
10
	"golang.org/x/sync/semaphore"
11
12
	"github.com/Permify/permify/internal/invoke"
13
	"github.com/Permify/permify/internal/storage/memory/utils"
14
	base "github.com/Permify/permify/pkg/pb/base/v1"
15
)
16
17
type BulkCheckerType string
18
19
const (
20
	BULK_SUBJECT BulkCheckerType = "subject"
21
	BULK_ENTITY  BulkCheckerType = "entity"
22
	BULK_CHECK   BulkCheckerType = "check"
23
)
24
25
type BulkSubjectCallbackParams struct {
26
	subjectID string
27
	token     string
28
}
29
30
type BulkEntityCallbackParams struct {
31
	entityID string
32
	token    string
33
}
34
35
type BulkCheckCallbackParams struct {
36
	index  int
37
	result base.CheckResult
38
}
39
40
// BulkCheckerRequest is a struct for a permission check request and the channel to send the result.
41
type BulkCheckerRequest struct {
42
	Request *base.PermissionCheckRequest
43
	Result  base.CheckResult
44
}
45
46
// BulkChecker is a struct for checking permissions in bulk.
47
// It processes permission check requests concurrently and maintains a sorted list of these requests.
48
type BulkChecker struct {
49
	// typ defines the type of bulk checking being performed.
50
	// It distinguishes between different modes of operation within the BulkChecker,
51
	// such as whether the check is focused on entities, subjects, or another criterion.
52
	typ BulkCheckerType
53
54
	checker invoke.Check
55
	// RequestChan is the input queue for permission check requests.
56
	// Incoming requests are received on this channel and processed by the BulkChecker.
57
	RequestChan chan BulkCheckerRequest
58
59
	// ctx is the context used to manage goroutines and handle cancellation.
60
	// It allows for graceful shutdown of all goroutines when the context is canceled.
61
	ctx context.Context
62
63
	// g is an errgroup used for managing multiple goroutines.
64
	// It allows BulkChecker to wait for all goroutines to finish and to capture any errors they may return.
65
	g *errgroup.Group
66
67
	// concurrencyLimit is the maximum number of concurrent permission checks that can be processed at one time.
68
	// It controls the level of parallelism within the BulkChecker.
69
	concurrencyLimit int
70
71
	// callback is a function that handles the result of each permission check.
72
	// It is called with the entity ID and the result of the permission check (e.g., allowed or denied).
73
	// callback func(entityID, ct string)
74
	callback func(params ...interface{})
75
76
	// sortedList is a slice that stores BulkCheckerRequest objects.
77
	// This list is maintained in a sorted order based on some criteria, such as the entity ID.
78
	list []BulkCheckerRequest
79
80
	// mu is a mutex used for thread-safe access to the sortedList.
81
	// It ensures that only one goroutine can modify the sortedList at a time, preventing data races.
82
	mu sync.Mutex
83
84
	// wg is a WaitGroup used to coordinate the completion of request collection.
85
	// It ensures that all requests are collected and processed before ExecuteRequests begins execution.
86
	// The WaitGroup helps to synchronize the collection of requests with the execution of those requests,
87
	// preventing race conditions where the execution starts before all requests are collected.
88
	wg *sync.WaitGroup
89
}
90
91
// NewBulkChecker creates a new BulkChecker instance.
92
// ctx: context for managing goroutines and cancellation
93
// engine: the CheckEngine to use for permission checks
94
// callback: a callback function that handles the result of each permission check
95
// concurrencyLimit: the maximum number of concurrent permission checks
96
func NewBulkChecker(ctx context.Context, checker invoke.Check, typ BulkCheckerType, callback func(params ...interface{}), concurrencyLimit int) *BulkChecker {
97
	bc := &BulkChecker{
98
		RequestChan:      make(chan BulkCheckerRequest),
99
		checker:          checker,
100
		g:                &errgroup.Group{},
101
		concurrencyLimit: concurrencyLimit,
102
		ctx:              ctx,
103
		callback:         callback,
104
		typ:              typ,
105
		wg:               &sync.WaitGroup{},
106
	}
107
108
	// Start processing requests in a separate goroutine
109
	// Use a WaitGroup to ensure all requests are collected before proceeding
110
	var wg sync.WaitGroup
111
	wg.Add(1)
112
	go func() {
113
		defer wg.Done() // Signal that the request collection is finished
114
		bc.CollectAndSortRequests()
115
	}()
116
117
	bc.wg = &wg // Store the WaitGroup for future use
118
119
	return bc
120
}
121
122
// CollectAndSortRequests processes incoming requests and maintains a sorted list.
123
func (bc *BulkChecker) CollectAndSortRequests() {
124
	for {
125
		select {
126
		case req, ok := <-bc.RequestChan:
127
			if !ok {
128
				// Channel closed, process remaining requests
129
				return
130
			}
131
132
			bc.mu.Lock()
133
			bc.list = append(bc.list, req)
134
			// Optionally, you could sort here or later in ExecuteRequests
135
			bc.mu.Unlock()
136
137
		case <-bc.ctx.Done():
138
			return
139
		}
140
	}
141
}
142
143
// StopCollectingRequests Signal to stop collecting requests and close the channel
144
func (bc *BulkChecker) StopCollectingRequests() {
145
	bc.mu.Lock()
146
	defer bc.mu.Unlock()
147
	// Close the channel to signal no more requests will be sent
148
	close(bc.RequestChan)
149
}
150
151
// sortRequests sorts the sortedList based on the type (entity or subject).
152
func (bc *BulkChecker) sortRequests() {
153
	if bc.typ == BULK_ENTITY {
154
		sort.Slice(bc.list, func(i, j int) bool {
155
			return bc.list[i].Request.GetEntity().GetId() < bc.list[j].Request.GetEntity().GetId()
156
		})
157
	} else if bc.typ == BULK_SUBJECT {
158
		sort.Slice(bc.list, func(i, j int) bool {
159
			return bc.list[i].Request.GetSubject().GetId() < bc.list[j].Request.GetSubject().GetId()
160
		})
161
	}
162
}
163
164
// processLookupResults processes the results for BULK_ENTITY or BULK_SUBJECT.
165
func (bc *BulkChecker) processLookupResults(listCopy []BulkCheckerRequest, results []base.CheckResult, processedIndex *int, successCount *int64, size uint32) {
166
	for *processedIndex < len(listCopy) && results[*processedIndex] != base.CheckResult_CHECK_RESULT_UNSPECIFIED {
167
		// If the result at the processed index is allowed, call the callback function
168
		if results[*processedIndex] == base.CheckResult_CHECK_RESULT_ALLOWED {
169
			if atomic.AddInt64(successCount, 1) <= int64(size) {
170
				ct := ""
171
				if *processedIndex+1 < len(listCopy) {
172
					// If there is a next item, create a continuous token with the next ID
173
					if bc.typ == BULK_ENTITY {
174
						ct = utils.NewContinuousToken(listCopy[*processedIndex+1].Request.GetEntity().GetId()).Encode().String()
175
					} else if bc.typ == BULK_SUBJECT {
176
						ct = utils.NewContinuousToken(listCopy[*processedIndex+1].Request.GetSubject().GetId()).Encode().String()
177
					}
178
				}
179
				// Depending on the type of check (entity or subject), call the appropriate callback
180
				if bc.typ == BULK_ENTITY {
181
					bc.callback(&BulkEntityCallbackParams{
182
						listCopy[*processedIndex].Request.GetEntity().GetId(),
183
						ct,
184
					})
185
				} else if bc.typ == BULK_SUBJECT {
186
					bc.callback(&BulkSubjectCallbackParams{
187
						listCopy[*processedIndex].Request.GetSubject().GetId(),
188
						ct,
189
					})
190
				}
191
			}
192
		}
193
		*processedIndex++ // Move to the next index for processing
194
	}
195
}
196
197
// ExecuteRequests begins processing permission check requests from the sorted list.
198
func (bc *BulkChecker) ExecuteRequests(size uint32) error {
199
	// Stop collecting new requests and close the RequestChan to ensure no more requests are added
200
	bc.StopCollectingRequests()
201
202
	// Wait for request collection to complete before proceeding
203
	bc.wg.Wait()
204
205
	// Track the number of successful permission checks
206
	successCount := int64(0)
207
	// Semaphore to control the maximum number of concurrent permission checks
208
	sem := semaphore.NewWeighted(int64(bc.concurrencyLimit))
209
	var mu sync.Mutex
210
211
	// Lock the mutex to prevent race conditions while sorting and copying the list of requests
212
	bc.mu.Lock()
213
	bc.sortRequests()                                      // Sort requests based on id
214
	listCopy := append([]BulkCheckerRequest{}, bc.list...) // Create a copy of the list to avoid modifying the original during processing
215
	bc.mu.Unlock()                                         // Unlock the mutex after sorting and copying
216
217
	// Pre-allocate a slice to store the results of the permission checks
218
	results := make([]base.CheckResult, len(listCopy))
219
	// Track the index of the last processed request to ensure results are processed in order
220
	processedIndex := 0
221
	// Loop through each request in the copied list
222
	for i, currentRequest := range listCopy {
223
		// If we've reached the success limit, stop processing further requests
224
		if atomic.LoadInt64(&successCount) >= int64(size) {
225
			break
226
		}
227
228
		index := i
229
		req := currentRequest
230
		// Use errgroup to manage the goroutines, which allows for error handling and synchronization
231
		bc.g.Go(func() error {
232
			// Acquire a slot in the semaphore to control concurrency
233
			if err := sem.Acquire(bc.ctx, 1); err != nil {
234
				return err // Return an error if semaphore acquisition fails
235
			}
236
			defer sem.Release(1) // Ensure the semaphore slot is released after processing
237
238
			var result base.CheckResult
239
			if req.Result == base.CheckResult_CHECK_RESULT_UNSPECIFIED {
240
				// Perform the permission check if the result is not already specified
241
242
				cr, err := bc.checker.Check(bc.ctx, req.Request)
243
				if err != nil {
244
					return err // Return an error if the check fails
245
				}
246
247
				result = cr.GetCan() // Get the result from the check
248
			} else {
249
				// Use the already specified result
250
				result = req.Result
251
			}
252
253
			// Lock the mutex to safely update shared resources
254
			mu.Lock()
255
256
			results[index] = result // Store the result in the pre-allocated slice
257
258
			if bc.typ == BULK_CHECK {
259
				bc.callback(&BulkCheckCallbackParams{i, result})
260
			} else {
261
				// Process the results in order, starting from the current processed index
262
				bc.processLookupResults(listCopy, results, &processedIndex, &successCount, size)
263
			}
264
265
			mu.Unlock() // Unlock the mutex after updating the results and processed index
266
267
			return nil // Return nil to indicate successful processing
268
		})
269
	}
270
271
	// Wait for all goroutines to complete and check for any errors
272
	if err := bc.g.Wait(); err != nil {
273
		return err // Return the error if any goroutine returned an error
274
	}
275
276
	return nil // Return nil if all processing completed successfully
277
}
278
279
// BulkEntityPublisher is a struct for streaming permission check results.
280
type BulkEntityPublisher struct {
281
	bulkChecker *BulkChecker
282
283
	request *base.PermissionLookupEntityRequest
284
	// context to manage goroutines and cancellation
285
	ctx context.Context
286
}
287
288
// NewBulkEntityPublisher creates a new BulkStreamer instance.
289
func NewBulkEntityPublisher(ctx context.Context, request *base.PermissionLookupEntityRequest, bulkChecker *BulkChecker) *BulkEntityPublisher {
290
	return &BulkEntityPublisher{
291
		bulkChecker: bulkChecker,
292
		request:     request,
293
		ctx:         ctx,
294
	}
295
}
296
297
// Publish publishes a permission check request to the BulkChecker.
298
func (s *BulkEntityPublisher) Publish(entity *base.Entity, metadata *base.PermissionCheckRequestMetadata, context *base.Context, result base.CheckResult) {
299
	s.bulkChecker.RequestChan <- BulkCheckerRequest{
300
		Request: &base.PermissionCheckRequest{
301
			TenantId:   s.request.GetTenantId(),
302
			Metadata:   metadata,
303
			Entity:     entity,
304
			Permission: s.request.GetPermission(),
305
			Subject:    s.request.GetSubject(),
306
			Context:    context,
307
		},
308
		Result: result,
309
	}
310
}
311
312
// BulkSubjectPublisher is a struct for streaming permission check results.
313
type BulkSubjectPublisher struct {
314
	bulkChecker *BulkChecker
315
316
	request *base.PermissionLookupSubjectRequest
317
	// context to manage goroutines and cancellation
318
	ctx context.Context
319
}
320
321
// NewBulkSubjectPublisher creates a new BulkStreamer instance.
322
func NewBulkSubjectPublisher(ctx context.Context, request *base.PermissionLookupSubjectRequest, bulkChecker *BulkChecker) *BulkSubjectPublisher {
323
	return &BulkSubjectPublisher{
324
		bulkChecker: bulkChecker,
325
		request:     request,
326
		ctx:         ctx,
327
	}
328
}
329
330
// Publish publishes a permission check request to the BulkChecker.
331
func (s *BulkSubjectPublisher) Publish(subject *base.Subject, metadata *base.PermissionCheckRequestMetadata, context *base.Context, result base.CheckResult) {
332
	s.bulkChecker.RequestChan <- BulkCheckerRequest{
333
		Request: &base.PermissionCheckRequest{
334
			TenantId:   s.request.GetTenantId(),
335
			Metadata:   metadata,
336
			Entity:     s.request.GetEntity(),
337
			Permission: s.request.GetPermission(),
338
			Subject:    subject,
339
			Context:    context,
340
		},
341
		Result: result,
342
	}
343
}
344
345
// BulkCheckPublisher is a struct for streaming permission check results.
346
type BulkCheckPublisher struct {
347
	bulkChecker *BulkChecker
348
349
	request *base.BulkPermissionCheckRequest
350
	// context to manage goroutines and cancellation
351
	ctx context.Context
352
}
353
354
// NewBulkCheckPublisher creates a new BulkStreamer instance.
355
func NewBulkCheckPublisher(ctx context.Context, request *base.BulkPermissionCheckRequest, bulkChecker *BulkChecker) *BulkCheckPublisher {
356
	return &BulkCheckPublisher{
357
		bulkChecker: bulkChecker,
358
		request:     request,
359
		ctx:         ctx,
360
	}
361
}
362
363
// Publish publishes a permission check request to the BulkChecker.
364
func (s *BulkCheckPublisher) Publish(result base.CheckResult) {
365
	// Loop through all mapping requests inside s.request.Requests
366
	for _, req := range s.request.Checks {
367
		s.bulkChecker.RequestChan <- BulkCheckerRequest{
368
			Request: &base.PermissionCheckRequest{
369
				TenantId: s.request.GetTenantId(), // Manually set tenantID from the main request
370
				Metadata: &base.PermissionCheckRequestMetadata{
371
					SnapToken:     req.GetMetadata().GetSnapToken(),
372
					SchemaVersion: req.GetMetadata().GetSchemaVersion(),
373
					Depth:         req.GetMetadata().GetDepth(),
374
				},
375
				Entity:     req.GetEntity(),     // Use the specific request's entity
376
				Permission: req.GetPermission(), // Use the specific request's permission
377
				Subject:    req.GetSubject(),    // The subject passed into the function
378
				Context:    req.GetContext(),    // The context passed into the function
379
			},
380
			Result: result, // Pass the result into the bulk checker
381
		}
382
	}
383
}
384
385
func (s *BulkCheckPublisher) ProcessResult(results []base.CheckResult) {}
386