Passed
Pull Request — master (#1349)
by
unknown
02:37
created

invoke.NewDirectInvoker   A

Complexity

Conditions 1

Size

Total Lines 24
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 23
nop 6
dl 0
loc 24
rs 9.328
c 0
b 0
f 0
1
package invoke
2
3
import (
4
	"context"
5
	"time"
6
7
	"go.opentelemetry.io/otel"
8
	"go.opentelemetry.io/otel/attribute"
9
	otelCodes "go.opentelemetry.io/otel/codes"
10
	api "go.opentelemetry.io/otel/metric"
11
	"go.opentelemetry.io/otel/trace"
12
13
	"github.com/Permify/permify/internal/storage"
14
	base "github.com/Permify/permify/pkg/pb/base/v1"
15
	"github.com/Permify/permify/pkg/telemetry"
16
	"github.com/Permify/permify/pkg/token"
17
	"github.com/Permify/permify/pkg/tuple"
18
)
19
20
var (
21
	tracer = otel.Tracer("invoke")
22
	meter  = otel.Meter("invoke")
23
)
24
25
// Invoker is an interface that groups multiple permission-related interfaces.
26
// It is used to define a common contract for invoking various permission operations.
27
type Invoker interface {
28
	Check
29
	Expand
30
	Lookup
31
	SubjectPermission
32
}
33
34
// Check is an interface that defines a method for checking permissions.
35
// It requires an implementation of InvokeCheck that takes a context and a PermissionCheckRequest,
36
// and returns a PermissionCheckResponse and an error if any.
37
type Check interface {
38
	Check(ctx context.Context, request *base.PermissionCheckRequest) (response *base.PermissionCheckResponse, err error)
39
}
40
41
// Expand is an interface that defines a method for expanding permissions.
42
// It requires an implementation of InvokeExpand that takes a context and a PermissionExpandRequest,
43
// and returns a PermissionExpandResponse and an error if any.
44
type Expand interface {
45
	Expand(ctx context.Context, request *base.PermissionExpandRequest) (response *base.PermissionExpandResponse, err error)
46
}
47
48
type Lookup interface {
49
	LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error)
50
	LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error)
51
	LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error)
52
}
53
54
// SubjectPermission -
55
type SubjectPermission interface {
56
	SubjectPermission(ctx context.Context, request *base.PermissionSubjectPermissionRequest) (response *base.PermissionSubjectPermissionResponse, err error)
57
}
58
59
// DirectInvoker is a struct that implements the Invoker interface.
60
// It holds references to various engines needed for permission-related operations.
61
type DirectInvoker struct {
62
	// schemaReader is responsible for reading schema information
63
	schemaReader storage.SchemaReader
64
	// relationshipReader is responsible for reading relationship information
65
	dataReader storage.DataReader
66
	// Check engine for permission checks
67
	cc Check
68
	// Expand engine for expanding permissions
69
	ec Expand
70
	// LookupEntity engine for looking up entities with permissions
71
	lo Lookup
72
	// LookupSubject
73
	sp SubjectPermission
74
75
	// Metrics
76
	checkCounter             api.Int64Counter
77
	lookupEntityCounter      api.Int64Counter
78
	lookupSubjectCounter     api.Int64Counter
79
	subjectPermissionCounter api.Int64Counter
80
81
	checkDurationHistogram             api.Int64Histogram
82
	lookupEntityDurationHistogram      api.Int64Histogram
83
	lookupSubjectDurationHistogram     api.Int64Histogram
84
	subjectPermissionDurationHistogram api.Int64Histogram
85
}
86
87
// NewDirectInvoker is a constructor for DirectInvoker.
88
// It takes pointers to CheckEngine, ExpandEngine, LookupSchemaEngine, and LookupEntityEngine as arguments
89
// and returns an Invoker instance.
90
func NewDirectInvoker(
91
	schemaReader storage.SchemaReader,
92
	dataReader storage.DataReader,
93
	cc Check,
94
	ec Expand,
95
	lo Lookup,
96
	sp SubjectPermission,
97
) *DirectInvoker {
98
99
	return &DirectInvoker{
100
		schemaReader:                       schemaReader,
101
		dataReader:                         dataReader,
102
		cc:                                 cc,
103
		ec:                                 ec,
104
		lo:                                 lo,
105
		sp:                                 sp,
106
		checkCounter:                       telemetry.NewCounter(meter, "check_count", "Number of permission checks performed"),
107
		lookupEntityCounter:                telemetry.NewCounter(meter, "lookup_entity_count", "Number of permission lookup entity performed"),
108
		lookupSubjectCounter:               telemetry.NewCounter(meter, "lookup_subject_count", "Number of permission lookup subject performed"),
109
		subjectPermissionCounter:           telemetry.NewCounter(meter, "subject_permission_count", "Number of subject permission performed"),
110
		checkDurationHistogram:             telemetry.NewHistogram(meter, "check_duration", "microseconds", "Duration of checks in microseconds"),
111
		lookupEntityDurationHistogram:      telemetry.NewHistogram(meter, "lookup_entity_duration", "microseconds", "Duration of lookup entity duration in microseconds"),
112
		lookupSubjectDurationHistogram:     telemetry.NewHistogram(meter, "lookup_subject_duration", "microseconds", "Duration of lookup subject duration in microseconds"),
113
		subjectPermissionDurationHistogram: telemetry.NewHistogram(meter, "subject_permission_duration", "microseconds", "Duration of subject permission duration in microseconds"),
114
	}
115
}
116
117
// Check is a method that implements the Check interface.
118
// It calls the Run method of the CheckEngine with the provided context and PermissionCheckRequest,
119
// and returns a PermissionCheckResponse and an error if any.
120
func (invoker *DirectInvoker) Check(ctx context.Context, request *base.PermissionCheckRequest) (response *base.PermissionCheckResponse, err error) {
121
	ctx, span := tracer.Start(ctx, "check", trace.WithAttributes(
122
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
123
		attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
124
		attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
125
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
126
	))
127
	defer span.End()
128
129
	start := time.Now()
130
131
	// Validate the depth of the request.
132
	err = checkDepth(request)
133
	if err != nil {
134
		span.RecordError(err)
135
		span.SetStatus(otelCodes.Error, err.Error())
136
		span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(base.CheckResult_CHECK_RESULT_DENIED.String())})
137
		return &base.PermissionCheckResponse{
138
			Can: base.CheckResult_CHECK_RESULT_DENIED,
139
			Metadata: &base.PermissionCheckResponseMetadata{
140
				CheckCount: 0,
141
			},
142
		}, err
143
	}
144
145
	// Set the SnapToken if it's not provided in the request.
146
	if request.GetMetadata().GetSnapToken() == "" {
147
		var st token.SnapToken
148
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId())
149
		if err != nil {
150
			span.RecordError(err)
151
			span.SetStatus(otelCodes.Error, err.Error())
152
			span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(base.CheckResult_CHECK_RESULT_DENIED.String())})
153
			return &base.PermissionCheckResponse{
154
				Can: base.CheckResult_CHECK_RESULT_DENIED,
155
				Metadata: &base.PermissionCheckResponseMetadata{
156
					CheckCount: 0,
157
				},
158
			}, err
159
		}
160
		request.Metadata.SnapToken = st.Encode().String()
161
	}
162
163
	// Set the SchemaVersion if it's not provided in the request.
164
	if request.GetMetadata().GetSchemaVersion() == "" {
165
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId())
166
		if err != nil {
167
			span.RecordError(err)
168
			span.SetStatus(otelCodes.Error, err.Error())
169
			span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(base.CheckResult_CHECK_RESULT_DENIED.String())})
170
			return &base.PermissionCheckResponse{
171
				Can: base.CheckResult_CHECK_RESULT_DENIED,
172
				Metadata: &base.PermissionCheckResponseMetadata{
173
					CheckCount: 0,
174
				},
175
			}, err
176
		}
177
	}
178
179
	// Decrease the depth of the request metadata.
180
	request.Metadata = decreaseDepth(request.GetMetadata())
181
182
	// Perform the actual permission check using the provided request.
183
	response, err = invoker.cc.Check(ctx, request)
184
	if err != nil {
185
		span.RecordError(err)
186
		span.SetStatus(otelCodes.Error, err.Error())
187
		span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(base.CheckResult_CHECK_RESULT_DENIED.String())})
188
		return &base.PermissionCheckResponse{
189
			Can: base.CheckResult_CHECK_RESULT_DENIED,
190
			Metadata: &base.PermissionCheckResponseMetadata{
191
				CheckCount: 0,
192
			},
193
		}, err
194
	}
195
	duration := time.Now().Sub(start)
196
	invoker.checkDurationHistogram.Record(ctx, duration.Microseconds())
197
198
	// Increase the check count in the response metadata.
199
	response.Metadata = increaseCheckCount(response.Metadata)
200
201
	// Increase the check count in the metrics.
202
	invoker.checkCounter.Add(ctx, 1)
203
204
	span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(response.GetCan().String())})
205
	return
206
}
207
208
// Expand is a method that implements the Expand interface.
209
// It calls the Run method of the ExpandEngine with the provided context and PermissionExpandRequest,
210
// and returns a PermissionExpandResponse and an error if any.
211
func (invoker *DirectInvoker) Expand(ctx context.Context, request *base.PermissionExpandRequest) (response *base.PermissionExpandResponse, err error) {
212
	ctx, span := tracer.Start(ctx, "expand", trace.WithAttributes(
213
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
214
		attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
215
		attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
216
	))
217
	defer span.End()
218
219
	if request.GetMetadata().GetSnapToken() == "" {
220
		var st token.SnapToken
221
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId())
222
		if err != nil {
223
			span.RecordError(err)
224
			span.SetStatus(otelCodes.Error, err.Error())
225
			return response, err
226
		}
227
		request.Metadata.SnapToken = st.Encode().String()
228
	}
229
230
	if request.GetMetadata().GetSchemaVersion() == "" {
231
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId())
232
		if err != nil {
233
			span.RecordError(err)
234
			span.SetStatus(otelCodes.Error, err.Error())
235
			return response, err
236
		}
237
	}
238
239
	return invoker.ec.Expand(ctx, request)
240
}
241
242
// LookupEntity is a method that implements the LookupEntity interface.
243
// It calls the Run method of the LookupEntityEngine with the provided context and PermissionLookupEntityRequest,
244
// and returns a PermissionLookupEntityResponse and an error if any.
245
func (invoker *DirectInvoker) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) {
246
	ctx, span := tracer.Start(ctx, "lookup-entity", trace.WithAttributes(
247
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
248
		attribute.KeyValue{Key: "entity_type", Value: attribute.StringValue(request.GetEntityType())},
249
		attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
250
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
251
	))
252
	defer span.End()
253
254
	start := time.Now()
255
256
	// Set SnapToken if not provided
257
	if request.GetMetadata().GetSnapToken() == "" { // Check if the request has a SnapToken.
258
		var st token.SnapToken
259
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId()) // Retrieve the head snapshot from the relationship reader.
260
		if err != nil {
261
			span.RecordError(err)
262
			span.SetStatus(otelCodes.Error, err.Error())
263
			return response, err
264
		}
265
		request.Metadata.SnapToken = st.Encode().String() // Set the SnapToken in the request metadata.
266
	}
267
268
	// Set SchemaVersion if not provided
269
	if request.GetMetadata().GetSchemaVersion() == "" { // Check if the request has a SchemaVersion.
270
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId()) // Retrieve the head schema version from the schema reader.
271
		if err != nil {
272
			span.RecordError(err)
273
			span.SetStatus(otelCodes.Error, err.Error())
274
			return response, err
275
		}
276
	}
277
278
	resp, err := invoker.lo.LookupEntity(ctx, request)
279
280
	duration := time.Now().Sub(start)
281
	invoker.lookupEntityDurationHistogram.Record(ctx, duration.Microseconds())
282
283
	// Increase the lookup entity count in the metrics.
284
	invoker.lookupEntityCounter.Add(ctx, 1)
285
286
	return resp, err
287
}
288
289
// LookupEntityStream is a method that implements the LookupEntityStream interface.
290
// It calls the Stream method of the LookupEntityEngine with the provided context, PermissionLookupEntityRequest, and Permission_LookupEntityStreamServer,
291
// and returns an error if any.
292
func (invoker *DirectInvoker) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) {
293
	ctx, span := tracer.Start(ctx, "lookup-entity-stream", trace.WithAttributes(
294
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
295
		attribute.KeyValue{Key: "entity_type", Value: attribute.StringValue(request.GetEntityType())},
296
		attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
297
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
298
	))
299
	defer span.End()
300
301
	start := time.Now()
302
303
	// Set SnapToken if not provided
304
	if request.GetMetadata().GetSnapToken() == "" { // Check if the request has a SnapToken.
305
		var st token.SnapToken
306
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId()) // Retrieve the head snapshot from the relationship reader.
307
		if err != nil {
308
			span.RecordError(err)
309
			span.SetStatus(otelCodes.Error, err.Error())
310
			return err
311
		}
312
		request.Metadata.SnapToken = st.Encode().String() // Set the SnapToken in the request metadata.
313
	}
314
315
	// Set SchemaVersion if not provided
316
	if request.GetMetadata().GetSchemaVersion() == "" { // Check if the request has a SchemaVersion.
317
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId()) // Retrieve the head schema version from the schema reader.
318
		if err != nil {
319
			span.RecordError(err)
320
			span.SetStatus(otelCodes.Error, err.Error())
321
			return err
322
		}
323
	}
324
325
	resp := invoker.lo.LookupEntityStream(ctx, request, server)
326
327
	duration := time.Now().Sub(start)
328
	invoker.lookupEntityDurationHistogram.Record(ctx, duration.Microseconds())
329
330
	// Increase the lookup entity count in the metrics.
331
	invoker.lookupEntityCounter.Add(ctx, 1)
332
333
	return resp
334
}
335
336
// LookupSubject is a method of the DirectInvoker structure. It handles the task of looking up subjects
337
// and returning the results in a response.
338
func (invoker *DirectInvoker) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) {
339
	ctx, span := tracer.Start(ctx, "lookup-subject", trace.WithAttributes(
340
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
341
		attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
342
		attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
343
		attribute.KeyValue{Key: "subject_reference", Value: attribute.StringValue(tuple.ReferenceToString(request.GetSubjectReference()))},
344
	))
345
	defer span.End()
346
347
	start := time.Now()
348
349
	// Check if the request has a SnapToken. If not, a SnapToken is set.
350
	if request.GetMetadata().GetSnapToken() == "" {
351
		// Create an instance of SnapToken
352
		var st token.SnapToken
353
		// Retrieve the head snapshot from the relationship reader
354
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId())
355
		// If there's an error retrieving the snapshot, return the response and the error
356
		if err != nil {
357
			span.RecordError(err)
358
			span.SetStatus(otelCodes.Error, err.Error())
359
			return response, err
360
		}
361
		// Set the SnapToken in the request metadata
362
		request.Metadata.SnapToken = st.Encode().String()
363
	}
364
365
	// Similar to SnapToken, check if the request has a SchemaVersion. If not, a SchemaVersion is set.
366
	if request.GetMetadata().GetSchemaVersion() == "" {
367
		// Retrieve the head schema version from the schema reader
368
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId())
369
		// If there's an error retrieving the schema version, return the response and the error
370
		if err != nil {
371
			span.RecordError(err)
372
			span.SetStatus(otelCodes.Error, err.Error())
373
			return response, err
374
		}
375
	}
376
377
	resp, err := invoker.lo.LookupSubject(ctx, request)
378
379
	duration := time.Now().Sub(start)
380
	invoker.lookupSubjectDurationHistogram.Record(ctx, duration.Microseconds())
381
382
	// Increase the lookup subject count in the metrics.
383
	invoker.lookupSubjectCounter.Add(ctx, 1)
384
385
	// Call the LookupSubject function of the ls field in the invoker, pass the context and request,
386
	// and return its response and error
387
	return resp, err
388
}
389
390
// SubjectPermission is a method of the DirectInvoker structure. It handles the task of subject's permissions
391
// and returning the results in a response.
392
func (invoker *DirectInvoker) SubjectPermission(ctx context.Context, request *base.PermissionSubjectPermissionRequest) (response *base.PermissionSubjectPermissionResponse, err error) {
393
	ctx, span := tracer.Start(ctx, "subject-permission", trace.WithAttributes(
394
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
395
		attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
396
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
397
	))
398
	defer span.End()
399
400
	start := time.Now()
401
402
	// Check if the request has a SnapToken. If not, a SnapToken is set.
403
	if request.GetMetadata().GetSnapToken() == "" {
404
		// Create an instance of SnapToken
405
		var st token.SnapToken
406
		// Retrieve the head snapshot from the relationship reader
407
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId())
408
		// If there's an error retrieving the snapshot, return the response and the error
409
		if err != nil {
410
			span.RecordError(err)
411
			span.SetStatus(otelCodes.Error, err.Error())
412
			return response, err
413
		}
414
		// Set the SnapToken in the request metadata
415
		request.Metadata.SnapToken = st.Encode().String()
416
	}
417
418
	// Similar to SnapToken, check if the request has a SchemaVersion. If not, a SchemaVersion is set.
419
	if request.GetMetadata().GetSchemaVersion() == "" {
420
		// Retrieve the head schema version from the schema reader
421
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId())
422
		// If there's an error retrieving the schema version, return the response and the error
423
		if err != nil {
424
			span.RecordError(err)
425
			span.SetStatus(otelCodes.Error, err.Error())
426
			return response, err
427
		}
428
	}
429
	resp, err := invoker.sp.SubjectPermission(ctx, request)
430
431
	duration := time.Now().Sub(start)
432
	invoker.subjectPermissionDurationHistogram.Record(ctx, duration.Microseconds())
433
434
	// Increase the subject permission count in the metrics.
435
	invoker.subjectPermissionCounter.Add(ctx, 1)
436
437
	// Call the SubjectPermission function of the ls field in the invoker, pass the context and request,
438
	// and return its response and error
439
	return resp, err
440
}
441