Passed
Pull Request — master (#1681)
by
unknown
04:00
created

invoke.*DirectInvoker.BulkCheck   A

Complexity

Conditions 3

Size

Total Lines 29
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 18
nop 2
dl 0
loc 29
rs 9.5
c 0
b 0
f 0
1
package invoke
2
3
import (
4
	"context"
5
	"fmt"
6
	"time"
7
8
	"go.opentelemetry.io/otel"
9
	"go.opentelemetry.io/otel/attribute"
10
	otelCodes "go.opentelemetry.io/otel/codes"
11
	api "go.opentelemetry.io/otel/metric"
12
	"go.opentelemetry.io/otel/trace"
13
14
	"github.com/Permify/permify/internal/storage"
15
	base "github.com/Permify/permify/pkg/pb/base/v1"
16
	"github.com/Permify/permify/pkg/telemetry"
17
	"github.com/Permify/permify/pkg/token"
18
	"github.com/Permify/permify/pkg/tuple"
19
)
20
21
var (
22
	tracer = otel.Tracer("invoke")
23
	meter  = otel.Meter("invoke")
24
)
25
26
// Invoker is an interface that groups multiple permission-related interfaces.
27
// It is used to define a common contract for invoking various permission operations.
28
type Invoker interface {
29
	Check
30
	Expand
31
	Lookup
32
	SubjectPermission
33
}
34
35
// Check is an interface that defines a method for checking permissions.
36
// It requires an implementation of InvokeCheck that takes a context and a PermissionCheckRequest,
37
// and returns a PermissionCheckResponse and an error if any.
38
type Check interface {
39
	Check(ctx context.Context, request *base.PermissionCheckRequest) (response *base.PermissionCheckResponse, err error)
40
	BulkCheck(ctx context.Context, request *base.BulkPermissionCheckRequest) (response *base.BulkPermissionCheckResponse, err error)
41
}
42
43
// Expand is an interface that defines a method for expanding permissions.
44
// It requires an implementation of InvokeExpand that takes a context and a PermissionExpandRequest,
45
// and returns a PermissionExpandResponse and an error if any.
46
type Expand interface {
47
	Expand(ctx context.Context, request *base.PermissionExpandRequest) (response *base.PermissionExpandResponse, err error)
48
}
49
50
type Lookup interface {
51
	LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error)
52
	LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error)
53
	LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error)
54
}
55
56
// SubjectPermission -
57
type SubjectPermission interface {
58
	SubjectPermission(ctx context.Context, request *base.PermissionSubjectPermissionRequest) (response *base.PermissionSubjectPermissionResponse, err error)
59
}
60
61
// DirectInvoker is a struct that implements the Invoker interface.
62
// It holds references to various engines needed for permission-related operations.
63
type DirectInvoker struct {
64
	// schemaReader is responsible for reading schema information
65
	schemaReader storage.SchemaReader
66
	// relationshipReader is responsible for reading relationship information
67
	dataReader storage.DataReader
68
	// Check engine for permission checks
69
	cc Check
70
	// Expand engine for expanding permissions
71
	ec Expand
72
	// LookupEntity engine for looking up entities with permissions
73
	lo Lookup
74
	// LookupSubject
75
	sp SubjectPermission
76
77
	// Metrics
78
	checkCounter             api.Int64Counter
79
	lookupEntityCounter      api.Int64Counter
80
	lookupSubjectCounter     api.Int64Counter
81
	subjectPermissionCounter api.Int64Counter
82
83
	checkDurationHistogram             api.Int64Histogram
84
	lookupEntityDurationHistogram      api.Int64Histogram
85
	lookupSubjectDurationHistogram     api.Int64Histogram
86
	subjectPermissionDurationHistogram api.Int64Histogram
87
}
88
89
// NewDirectInvoker is a constructor for DirectInvoker.
90
// It takes pointers to CheckEngine, ExpandEngine, LookupSchemaEngine, and LookupEntityEngine as arguments
91
// and returns an Invoker instance.
92
func NewDirectInvoker(
93
	schemaReader storage.SchemaReader,
94
	dataReader storage.DataReader,
95
	cc Check,
96
	ec Expand,
97
	lo Lookup,
98
	sp SubjectPermission,
99
) *DirectInvoker {
100
	return &DirectInvoker{
101
		schemaReader:                       schemaReader,
102
		dataReader:                         dataReader,
103
		cc:                                 cc,
104
		ec:                                 ec,
105
		lo:                                 lo,
106
		sp:                                 sp,
107
		checkCounter:                       telemetry.NewCounter(meter, "check_count", "Number of permission checks performed"),
108
		lookupEntityCounter:                telemetry.NewCounter(meter, "lookup_entity_count", "Number of permission lookup entity performed"),
109
		lookupSubjectCounter:               telemetry.NewCounter(meter, "lookup_subject_count", "Number of permission lookup subject performed"),
110
		subjectPermissionCounter:           telemetry.NewCounter(meter, "subject_permission_count", "Number of subject permission performed"),
111
		checkDurationHistogram:             telemetry.NewHistogram(meter, "check_duration", "microseconds", "Duration of checks in microseconds"),
112
		lookupEntityDurationHistogram:      telemetry.NewHistogram(meter, "lookup_entity_duration", "microseconds", "Duration of lookup entity duration in microseconds"),
113
		lookupSubjectDurationHistogram:     telemetry.NewHistogram(meter, "lookup_subject_duration", "microseconds", "Duration of lookup subject duration in microseconds"),
114
		subjectPermissionDurationHistogram: telemetry.NewHistogram(meter, "subject_permission_duration", "microseconds", "Duration of subject permission duration in microseconds"),
115
	}
116
}
117
118
// Check is a method that implements the Check interface.
119
// It calls the Run method of the CheckEngine with the provided context and PermissionCheckRequest,
120
// and returns a PermissionCheckResponse and an error if any.
121
func (invoker *DirectInvoker) Check(ctx context.Context, request *base.PermissionCheckRequest) (response *base.PermissionCheckResponse, err error) {
122
	ctx, span := tracer.Start(ctx, "check", trace.WithAttributes(
123
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
124
		attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
125
		attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
126
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
127
	))
128
	defer span.End()
129
130
	start := time.Now()
131
132
	// Validate the depth of the request.
133
	err = checkDepth(request)
134
	if err != nil {
135
		span.RecordError(err)
136
		span.SetStatus(otelCodes.Error, err.Error())
137
		span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(base.CheckResult_CHECK_RESULT_DENIED.String())})
138
		return &base.PermissionCheckResponse{
139
			Can: base.CheckResult_CHECK_RESULT_DENIED,
140
			Metadata: &base.PermissionCheckResponseMetadata{
141
				CheckCount: 0,
142
			},
143
		}, err
144
	}
145
146
	// Set the SnapToken if it's not provided in the request.
147
	if request.GetMetadata().GetSnapToken() == "" {
148
		var st token.SnapToken
149
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId())
150
		if err != nil {
151
			span.RecordError(err)
152
			span.SetStatus(otelCodes.Error, err.Error())
153
			span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(base.CheckResult_CHECK_RESULT_DENIED.String())})
154
			return &base.PermissionCheckResponse{
155
				Can: base.CheckResult_CHECK_RESULT_DENIED,
156
				Metadata: &base.PermissionCheckResponseMetadata{
157
					CheckCount: 0,
158
				},
159
			}, err
160
		}
161
		request.Metadata.SnapToken = st.Encode().String()
162
	}
163
164
	// Set the SchemaVersion if it's not provided in the request.
165
	if request.GetMetadata().GetSchemaVersion() == "" {
166
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId())
167
		if err != nil {
168
			span.RecordError(err)
169
			span.SetStatus(otelCodes.Error, err.Error())
170
			span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(base.CheckResult_CHECK_RESULT_DENIED.String())})
171
			return &base.PermissionCheckResponse{
172
				Can: base.CheckResult_CHECK_RESULT_DENIED,
173
				Metadata: &base.PermissionCheckResponseMetadata{
174
					CheckCount: 0,
175
				},
176
			}, err
177
		}
178
	}
179
180
	// Decrease the depth of the request metadata.
181
	request.Metadata = decreaseDepth(request.GetMetadata())
182
183
	// Perform the actual permission check using the provided request.
184
	response, err = invoker.cc.Check(ctx, request)
185
	if err != nil {
186
		span.RecordError(err)
187
		span.SetStatus(otelCodes.Error, err.Error())
188
		span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(base.CheckResult_CHECK_RESULT_DENIED.String())})
189
		return &base.PermissionCheckResponse{
190
			Can: base.CheckResult_CHECK_RESULT_DENIED,
191
			Metadata: &base.PermissionCheckResponseMetadata{
192
				CheckCount: 0,
193
			},
194
		}, err
195
	}
196
	duration := time.Since(start)
197
	invoker.checkDurationHistogram.Record(ctx, duration.Microseconds())
198
199
	// Increase the check count in the response metadata.
200
	response.Metadata = increaseCheckCount(response.Metadata)
201
202
	// Increase the check count in the metrics.
203
	invoker.checkCounter.Add(ctx, 1)
204
205
	span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(response.GetCan().String())})
206
	return
207
}
208
209
// BulkCheck implements Check.
210
func (invoker *DirectInvoker) BulkCheck(ctx context.Context, request *base.BulkPermissionCheckRequest) (response *base.BulkPermissionCheckResponse, err error) {
211
212
	ctx, span := tracer.Start(ctx, "bulk-check", trace.WithAttributes(
213
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
214
		attribute.KeyValue{Key: "no_checks", Value: attribute.IntValue(len(request.GetChecks()))},
215
	))
216
	defer span.End()
217
218
	start := time.Now()
219
220
	var schemaVersion string
221
	schemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId())
222
	fmt.Println(invoker.schemaReader.ReadSchema(ctx, request.GetTenantId(), schemaVersion))
223
	for i, check := range request.GetChecks() {
224
		if check.GetMetadata().GetSchemaVersion() == "" {
225
			request.Checks[i].Metadata.SchemaVersion = schemaVersion
226
		}
227
	}
228
229
	fmt.Println(request.GetChecks())
230
	resp, err := invoker.cc.BulkCheck(ctx, request)
231
232
	duration := time.Since(start)
233
	invoker.lookupEntityDurationHistogram.Record(ctx, duration.Microseconds())
234
235
	// Increase the lookup entity count in the metrics.
236
	invoker.checkCounter.Add(ctx, int64(len(request.GetChecks())))
237
238
	return resp, err
239
}
240
241
// Expand is a method that implements the Expand interface.
242
// It calls the Run method of the ExpandEngine with the provided context and PermissionExpandRequest,
243
// and returns a PermissionExpandResponse and an error if any.
244
func (invoker *DirectInvoker) Expand(ctx context.Context, request *base.PermissionExpandRequest) (response *base.PermissionExpandResponse, err error) {
245
	ctx, span := tracer.Start(ctx, "expand", trace.WithAttributes(
246
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
247
		attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
248
		attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
249
	))
250
	defer span.End()
251
252
	if request.GetMetadata().GetSnapToken() == "" {
253
		var st token.SnapToken
254
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId())
255
		if err != nil {
256
			span.RecordError(err)
257
			span.SetStatus(otelCodes.Error, err.Error())
258
			return response, err
259
		}
260
		request.Metadata.SnapToken = st.Encode().String()
261
	}
262
263
	if request.GetMetadata().GetSchemaVersion() == "" {
264
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId())
265
		if err != nil {
266
			span.RecordError(err)
267
			span.SetStatus(otelCodes.Error, err.Error())
268
			return response, err
269
		}
270
	}
271
272
	return invoker.ec.Expand(ctx, request)
273
}
274
275
// LookupEntity is a method that implements the LookupEntity interface.
276
// It calls the Run method of the LookupEntityEngine with the provided context and PermissionLookupEntityRequest,
277
// and returns a PermissionLookupEntityResponse and an error if any.
278
func (invoker *DirectInvoker) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) {
279
	ctx, span := tracer.Start(ctx, "lookup-entity", trace.WithAttributes(
280
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
281
		attribute.KeyValue{Key: "entity_type", Value: attribute.StringValue(request.GetEntityType())},
282
		attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
283
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
284
	))
285
	defer span.End()
286
287
	start := time.Now()
288
289
	// Set SnapToken if not provided
290
	if request.GetMetadata().GetSnapToken() == "" { // Check if the request has a SnapToken.
291
		var st token.SnapToken
292
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId()) // Retrieve the head snapshot from the relationship reader.
293
		if err != nil {
294
			span.RecordError(err)
295
			span.SetStatus(otelCodes.Error, err.Error())
296
			return response, err
297
		}
298
		request.Metadata.SnapToken = st.Encode().String() // Set the SnapToken in the request metadata.
299
	}
300
301
	// Set SchemaVersion if not provided
302
	if request.GetMetadata().GetSchemaVersion() == "" { // Check if the request has a SchemaVersion.
303
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId()) // Retrieve the head schema version from the schema reader.
304
		if err != nil {
305
			span.RecordError(err)
306
			span.SetStatus(otelCodes.Error, err.Error())
307
			return response, err
308
		}
309
	}
310
311
	resp, err := invoker.lo.LookupEntity(ctx, request)
312
313
	duration := time.Since(start)
314
	invoker.lookupEntityDurationHistogram.Record(ctx, duration.Microseconds())
315
316
	// Increase the lookup entity count in the metrics.
317
	invoker.lookupEntityCounter.Add(ctx, 1)
318
319
	return resp, err
320
}
321
322
// LookupEntityStream is a method that implements the LookupEntityStream interface.
323
// It calls the Stream method of the LookupEntityEngine with the provided context, PermissionLookupEntityRequest, and Permission_LookupEntityStreamServer,
324
// and returns an error if any.
325
func (invoker *DirectInvoker) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) {
326
	ctx, span := tracer.Start(ctx, "lookup-entity-stream", trace.WithAttributes(
327
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
328
		attribute.KeyValue{Key: "entity_type", Value: attribute.StringValue(request.GetEntityType())},
329
		attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
330
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
331
	))
332
	defer span.End()
333
334
	start := time.Now()
335
336
	// Set SnapToken if not provided
337
	if request.GetMetadata().GetSnapToken() == "" { // Check if the request has a SnapToken.
338
		var st token.SnapToken
339
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId()) // Retrieve the head snapshot from the relationship reader.
340
		if err != nil {
341
			span.RecordError(err)
342
			span.SetStatus(otelCodes.Error, err.Error())
343
			return err
344
		}
345
		request.Metadata.SnapToken = st.Encode().String() // Set the SnapToken in the request metadata.
346
	}
347
348
	// Set SchemaVersion if not provided
349
	if request.GetMetadata().GetSchemaVersion() == "" { // Check if the request has a SchemaVersion.
350
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId()) // Retrieve the head schema version from the schema reader.
351
		if err != nil {
352
			span.RecordError(err)
353
			span.SetStatus(otelCodes.Error, err.Error())
354
			return err
355
		}
356
	}
357
358
	resp := invoker.lo.LookupEntityStream(ctx, request, server)
359
360
	duration := time.Since(start)
361
	invoker.lookupEntityDurationHistogram.Record(ctx, duration.Microseconds())
362
363
	// Increase the lookup entity count in the metrics.
364
	invoker.lookupEntityCounter.Add(ctx, 1)
365
366
	return resp
367
}
368
369
// LookupSubject is a method of the DirectInvoker structure. It handles the task of looking up subjects
370
// and returning the results in a response.
371
func (invoker *DirectInvoker) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) {
372
	ctx, span := tracer.Start(ctx, "lookup-subject", trace.WithAttributes(
373
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
374
		attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
375
		attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
376
		attribute.KeyValue{Key: "subject_reference", Value: attribute.StringValue(tuple.ReferenceToString(request.GetSubjectReference()))},
377
	))
378
	defer span.End()
379
380
	start := time.Now()
381
382
	// Check if the request has a SnapToken. If not, a SnapToken is set.
383
	if request.GetMetadata().GetSnapToken() == "" {
384
		// Create an instance of SnapToken
385
		var st token.SnapToken
386
		// Retrieve the head snapshot from the relationship reader
387
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId())
388
		// If there's an error retrieving the snapshot, return the response and the error
389
		if err != nil {
390
			span.RecordError(err)
391
			span.SetStatus(otelCodes.Error, err.Error())
392
			return response, err
393
		}
394
		// Set the SnapToken in the request metadata
395
		request.Metadata.SnapToken = st.Encode().String()
396
	}
397
398
	// Similar to SnapToken, check if the request has a SchemaVersion. If not, a SchemaVersion is set.
399
	if request.GetMetadata().GetSchemaVersion() == "" {
400
		// Retrieve the head schema version from the schema reader
401
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId())
402
		// If there's an error retrieving the schema version, return the response and the error
403
		if err != nil {
404
			span.RecordError(err)
405
			span.SetStatus(otelCodes.Error, err.Error())
406
			return response, err
407
		}
408
	}
409
410
	resp, err := invoker.lo.LookupSubject(ctx, request)
411
412
	duration := time.Now().Sub(start)
413
	invoker.lookupSubjectDurationHistogram.Record(ctx, duration.Microseconds())
414
415
	// Increase the lookup subject count in the metrics.
416
	invoker.lookupSubjectCounter.Add(ctx, 1)
417
418
	// Call the LookupSubject function of the ls field in the invoker, pass the context and request,
419
	// and return its response and error
420
	return resp, err
421
}
422
423
// SubjectPermission is a method of the DirectInvoker structure. It handles the task of subject's permissions
424
// and returning the results in a response.
425
func (invoker *DirectInvoker) SubjectPermission(ctx context.Context, request *base.PermissionSubjectPermissionRequest) (response *base.PermissionSubjectPermissionResponse, err error) {
426
	ctx, span := tracer.Start(ctx, "subject-permission", trace.WithAttributes(
427
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
428
		attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
429
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
430
	))
431
	defer span.End()
432
433
	start := time.Now()
434
435
	// Check if the request has a SnapToken. If not, a SnapToken is set.
436
	if request.GetMetadata().GetSnapToken() == "" {
437
		// Create an instance of SnapToken
438
		var st token.SnapToken
439
		// Retrieve the head snapshot from the relationship reader
440
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId())
441
		// If there's an error retrieving the snapshot, return the response and the error
442
		if err != nil {
443
			span.RecordError(err)
444
			span.SetStatus(otelCodes.Error, err.Error())
445
			return response, err
446
		}
447
		// Set the SnapToken in the request metadata
448
		request.Metadata.SnapToken = st.Encode().String()
449
	}
450
451
	// Similar to SnapToken, check if the request has a SchemaVersion. If not, a SchemaVersion is set.
452
	if request.GetMetadata().GetSchemaVersion() == "" {
453
		// Retrieve the head schema version from the schema reader
454
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId())
455
		// If there's an error retrieving the schema version, return the response and the error
456
		if err != nil {
457
			span.RecordError(err)
458
			span.SetStatus(otelCodes.Error, err.Error())
459
			return response, err
460
		}
461
	}
462
	resp, err := invoker.sp.SubjectPermission(ctx, request)
463
464
	duration := time.Now().Sub(start)
465
	invoker.subjectPermissionDurationHistogram.Record(ctx, duration.Microseconds())
466
467
	// Increase the subject permission count in the metrics.
468
	invoker.subjectPermissionCounter.Add(ctx, 1)
469
470
	// Call the SubjectPermission function of the ls field in the invoker, pass the context and request,
471
	// and return its response and error
472
	return resp, err
473
}
474