Passed
Pull Request — master (#1470)
by Tolga
02:39
created

engines.*BulkChecker.Stop   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 0
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
package engines
2
3
import (
4
	"context"
5
	"sort"
6
	"sync"
7
	"sync/atomic"
8
9
	"golang.org/x/sync/errgroup"
10
	"golang.org/x/sync/semaphore"
11
12
	"github.com/Permify/permify/internal/invoke"
13
	"github.com/Permify/permify/internal/storage/memory/utils"
14
	base "github.com/Permify/permify/pkg/pb/base/v1"
15
)
16
17
// BulkCheckerType identifies which side of a permission check a BulkChecker
// works on. It selects the ID used for sorting requests, for the callback
// argument and for continuous tokens.
type BulkCheckerType string

const (
	// BULK_SUBJECT makes the BulkChecker order and report requests by subject ID.
	BULK_SUBJECT BulkCheckerType = "subject"
	// BULK_ENTITY makes the BulkChecker order and report requests by entity ID.
	BULK_ENTITY BulkCheckerType = "entity"
)
23
24
// BulkCheckerRequest is a struct for a permission check request and the channel to send the result.
25
type BulkCheckerRequest struct {
26
	Request *base.PermissionCheckRequest
27
	Result  base.CheckResult
28
}
29
30
// BulkChecker is a struct for checking permissions in bulk.
31
// It processes permission check requests concurrently and maintains a sorted list of these requests.
32
type BulkChecker struct {
33
	// typ defines the type of bulk checking being performed.
34
	// It distinguishes between different modes of operation within the BulkChecker,
35
	// such as whether the check is focused on entities, subjects, or another criterion.
36
	typ BulkCheckerType
37
38
	checker invoke.Check
39
	// RequestChan is the input queue for permission check requests.
40
	// Incoming requests are received on this channel and processed by the BulkChecker.
41
	RequestChan chan BulkCheckerRequest
42
43
	// ctx is the context used to manage goroutines and handle cancellation.
44
	// It allows for graceful shutdown of all goroutines when the context is canceled.
45
	ctx context.Context
46
47
	// g is an errgroup used for managing multiple goroutines.
48
	// It allows BulkChecker to wait for all goroutines to finish and to capture any errors they may return.
49
	g *errgroup.Group
50
51
	// concurrencyLimit is the maximum number of concurrent permission checks that can be processed at one time.
52
	// It controls the level of parallelism within the BulkChecker.
53
	concurrencyLimit int
54
55
	// callback is a function that handles the result of each permission check.
56
	// It is called with the entity ID and the result of the permission check (e.g., allowed or denied).
57
	callback func(entityID, ct string)
58
59
	// sortedList is a slice that stores BulkCheckerRequest objects.
60
	// This list is maintained in a sorted order based on some criteria, such as the entity ID.
61
	list []BulkCheckerRequest
62
63
	// mu is a mutex used for thread-safe access to the sortedList.
64
	// It ensures that only one goroutine can modify the sortedList at a time, preventing data races.
65
	mu sync.Mutex
66
67
	// wg is a WaitGroup used to coordinate the completion of request collection.
68
	// It ensures that all requests are collected and processed before ExecuteRequests begins execution.
69
	// The WaitGroup helps to synchronize the collection of requests with the execution of those requests,
70
	// preventing race conditions where the execution starts before all requests are collected.
71
	wg *sync.WaitGroup
72
}
73
74
// NewBulkChecker creates a new BulkChecker instance.
75
// ctx: context for managing goroutines and cancellation
76
// engine: the CheckEngine to use for permission checks
77
// callback: a callback function that handles the result of each permission check
78
// concurrencyLimit: the maximum number of concurrent permission checks
79
func NewBulkChecker(ctx context.Context, checker invoke.Check, typ BulkCheckerType, callback func(entityID, ct string), concurrencyLimit int) *BulkChecker {
80
	bc := &BulkChecker{
81
		RequestChan:      make(chan BulkCheckerRequest),
82
		checker:          checker,
83
		g:                &errgroup.Group{},
84
		concurrencyLimit: concurrencyLimit,
85
		ctx:              ctx,
86
		callback:         callback,
87
		typ:              typ,
88
		wg:               &sync.WaitGroup{},
89
	}
90
91
	// Start processing requests in a separate goroutine
92
	// Use a WaitGroup to ensure all requests are collected before proceeding
93
	var wg sync.WaitGroup
94
	wg.Add(1)
95
	go func() {
96
		defer wg.Done() // Signal that the request collection is finished
97
		bc.CollectAndSortRequests()
98
	}()
99
100
	bc.wg = &wg // Store the WaitGroup for future use
101
102
	return bc
103
}
104
105
// CollectAndSortRequests processes incoming requests and maintains a sorted list.
106
func (bc *BulkChecker) CollectAndSortRequests() {
107
	for {
108
		select {
109
		case req, ok := <-bc.RequestChan:
110
			if !ok {
111
				// Channel closed, process remaining requests
112
				return
113
			}
114
115
			bc.mu.Lock()
116
			bc.list = append(bc.list, req)
117
			// Optionally, you could sort here or later in ExecuteRequests
118
			bc.mu.Unlock()
119
120
		case <-bc.ctx.Done():
121
			return
122
		}
123
	}
124
}
125
126
// Signal to stop collecting requests and close the channel
127
func (bc *BulkChecker) StopCollectingRequests() {
128
	bc.mu.Lock()
129
	defer bc.mu.Unlock()
130
	// Close the channel to signal no more requests will be sent
131
	close(bc.RequestChan)
132
}
133
134
// sortRequests sorts the sortedList based on the type (entity or subject).
135
func (bc *BulkChecker) sortRequests() {
136
	if bc.typ == BULK_ENTITY {
137
		sort.Slice(bc.list, func(i, j int) bool {
138
			return bc.list[i].Request.GetEntity().GetId() < bc.list[j].Request.GetEntity().GetId()
139
		})
140
	} else if bc.typ == BULK_SUBJECT {
141
		sort.Slice(bc.list, func(i, j int) bool {
142
			return bc.list[i].Request.GetSubject().GetId() < bc.list[j].Request.GetSubject().GetId()
143
		})
144
	}
145
}
146
147
// ExecuteRequests begins processing permission check requests from the sorted list.
148
func (c *BulkChecker) ExecuteRequests(size uint32) error {
149
	// Stop collecting new requests and close the RequestChan to ensure no more requests are added
150
	c.StopCollectingRequests()
151
152
	// Wait for request collection to complete before proceeding
153
	c.wg.Wait()
154
155
	// Track the number of successful permission checks
156
	successCount := int64(0)
157
	// Semaphore to control the maximum number of concurrent permission checks
158
	sem := semaphore.NewWeighted(int64(c.concurrencyLimit))
159
	var mu sync.Mutex
160
161
	// Lock the mutex to prevent race conditions while sorting and copying the list of requests
162
	c.mu.Lock()
163
	c.sortRequests()                                      // Sort requests based on id
164
	listCopy := append([]BulkCheckerRequest{}, c.list...) // Create a copy of the list to avoid modifying the original during processing
165
	c.mu.Unlock()                                         // Unlock the mutex after sorting and copying
166
167
	// Pre-allocate a slice to store the results of the permission checks
168
	results := make([]base.CheckResult, len(listCopy))
169
	// Track the index of the last processed request to ensure results are processed in order
170
	processedIndex := 0
171
172
	// Loop through each request in the copied list
173
	for i, currentRequest := range listCopy {
174
		// If we've reached the success limit, stop processing further requests
175
		if atomic.LoadInt64(&successCount) >= int64(size) {
176
			break
177
		}
178
179
		index := i
180
		req := currentRequest
181
182
		// Use errgroup to manage the goroutines, which allows for error handling and synchronization
183
		c.g.Go(func() error {
184
			// Acquire a slot in the semaphore to control concurrency
185
			if err := sem.Acquire(c.ctx, 1); err != nil {
186
				return err // Return an error if semaphore acquisition fails
187
			}
188
			defer sem.Release(1) // Ensure the semaphore slot is released after processing
189
190
			var result base.CheckResult
191
			if req.Result == base.CheckResult_CHECK_RESULT_UNSPECIFIED {
192
				// Perform the permission check if the result is not already specified
193
				cr, err := c.checker.Check(c.ctx, req.Request)
194
				if err != nil {
195
					return err // Return an error if the check fails
196
				}
197
				result = cr.GetCan() // Get the result from the check
198
			} else {
199
				// Use the already specified result
200
				result = req.Result
201
			}
202
203
			// Lock the mutex to safely update shared resources
204
			mu.Lock()
205
			results[index] = result // Store the result in the pre-allocated slice
206
207
			// Process the results in order, starting from the current processed index
208
			for processedIndex < len(listCopy) && results[processedIndex] != base.CheckResult_CHECK_RESULT_UNSPECIFIED {
209
				// If the result at the processed index is allowed, call the callback function
210
				if results[processedIndex] == base.CheckResult_CHECK_RESULT_ALLOWED {
211
					if atomic.AddInt64(&successCount, 1) <= int64(size) {
212
						ct := ""
213
						if processedIndex+1 < len(listCopy) {
214
							// If there is a next item, create a continuous token with the next ID
215
							if c.typ == BULK_ENTITY {
216
								ct = utils.NewContinuousToken(listCopy[processedIndex+1].Request.GetEntity().GetId()).Encode().String()
217
							} else if c.typ == BULK_SUBJECT {
218
								ct = utils.NewContinuousToken(listCopy[processedIndex+1].Request.GetSubject().GetId()).Encode().String()
219
							}
220
						}
221
						// Depending on the type of check (entity or subject), call the appropriate callback
222
						if c.typ == BULK_ENTITY {
223
							c.callback(listCopy[processedIndex].Request.GetEntity().GetId(), ct)
224
						} else if c.typ == BULK_SUBJECT {
225
							c.callback(listCopy[processedIndex].Request.GetSubject().GetId(), ct)
226
						}
227
					}
228
				}
229
				processedIndex++ // Move to the next index for processing
230
			}
231
			mu.Unlock() // Unlock the mutex after updating the results and processed index
232
233
			return nil // Return nil to indicate successful processing
234
		})
235
	}
236
237
	// Wait for all goroutines to complete and check for any errors
238
	if err := c.g.Wait(); err != nil {
239
		return err // Return the error if any goroutine returned an error
240
	}
241
242
	return nil // Return nil if all processing completed successfully
243
}
244
245
// BulkEntityPublisher is a struct for streaming permission check results.
246
type BulkEntityPublisher struct {
247
	bulkChecker *BulkChecker
248
249
	request *base.PermissionLookupEntityRequest
250
	// context to manage goroutines and cancellation
251
	ctx context.Context
252
}
253
254
// NewBulkEntityPublisher creates a new BulkStreamer instance.
255
func NewBulkEntityPublisher(ctx context.Context, request *base.PermissionLookupEntityRequest, bulkChecker *BulkChecker) *BulkEntityPublisher {
256
	return &BulkEntityPublisher{
257
		bulkChecker: bulkChecker,
258
		request:     request,
259
		ctx:         ctx,
260
	}
261
}
262
263
// Publish publishes a permission check request to the BulkChecker.
264
func (s *BulkEntityPublisher) Publish(entity *base.Entity, metadata *base.PermissionCheckRequestMetadata, context *base.Context, result base.CheckResult) {
265
	s.bulkChecker.RequestChan <- BulkCheckerRequest{
266
		Request: &base.PermissionCheckRequest{
267
			TenantId:   s.request.GetTenantId(),
268
			Metadata:   metadata,
269
			Entity:     entity,
270
			Permission: s.request.GetPermission(),
271
			Subject:    s.request.GetSubject(),
272
			Context:    context,
273
		},
274
		Result: result,
275
	}
276
}
277
278
// BulkSubjectPublisher is a struct for streaming permission check results.
279
type BulkSubjectPublisher struct {
280
	bulkChecker *BulkChecker
281
282
	request *base.PermissionLookupSubjectRequest
283
	// context to manage goroutines and cancellation
284
	ctx context.Context
285
}
286
287
// NewBulkSubjectPublisher creates a new BulkStreamer instance.
288
func NewBulkSubjectPublisher(ctx context.Context, request *base.PermissionLookupSubjectRequest, bulkChecker *BulkChecker) *BulkSubjectPublisher {
289
	return &BulkSubjectPublisher{
290
		bulkChecker: bulkChecker,
291
		request:     request,
292
		ctx:         ctx,
293
	}
294
}
295
296
// Publish publishes a permission check request to the BulkChecker.
297
func (s *BulkSubjectPublisher) Publish(subject *base.Subject, metadata *base.PermissionCheckRequestMetadata, context *base.Context, result base.CheckResult) {
298
	s.bulkChecker.RequestChan <- BulkCheckerRequest{
299
		Request: &base.PermissionCheckRequest{
300
			TenantId:   s.request.GetTenantId(),
301
			Metadata:   metadata,
302
			Entity:     s.request.GetEntity(),
303
			Permission: s.request.GetPermission(),
304
			Subject:    subject,
305
			Context:    context,
306
		},
307
		Result: result,
308
	}
309
}
310