Passed
Pull Request — master (#1691)
by
unknown
03:16
created

invoke.*DirectInvoker.LookupEntitiesStream   B

Complexity

Conditions 5

Size

Total Lines 42
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 27
nop 3
dl 0
loc 42
rs 8.7653
c 0
b 0
f 0
1
package invoke
2
3
import (
4
	"context"
5
	"time"
6
7
	"go.opentelemetry.io/otel"
8
	"go.opentelemetry.io/otel/attribute"
9
	otelCodes "go.opentelemetry.io/otel/codes"
10
	api "go.opentelemetry.io/otel/metric"
11
	"go.opentelemetry.io/otel/trace"
12
13
	"github.com/Permify/permify/internal/storage"
14
	base "github.com/Permify/permify/pkg/pb/base/v1"
15
	"github.com/Permify/permify/pkg/telemetry"
16
	"github.com/Permify/permify/pkg/token"
17
	"github.com/Permify/permify/pkg/tuple"
18
)
19
20
var (
21
	tracer = otel.Tracer("invoke")
22
	meter  = otel.Meter("invoke")
23
)
24
25
// Invoker is an interface that groups multiple permission-related interfaces.
26
// It is used to define a common contract for invoking various permission operations.
27
type Invoker interface {
28
	Check
29
	Expand
30
	Lookup
31
	SubjectPermission
32
}
33
34
// Check is an interface that defines a method for checking permissions.
35
// It requires an implementation of InvokeCheck that takes a context and a PermissionCheckRequest,
36
// and returns a PermissionCheckResponse and an error if any.
37
type Check interface {
38
	Check(ctx context.Context, request *base.PermissionCheckRequest) (response *base.PermissionCheckResponse, err error)
39
}
40
41
// Expand is an interface that defines a method for expanding permissions.
42
// It requires an implementation of InvokeExpand that takes a context and a PermissionExpandRequest,
43
// and returns a PermissionExpandResponse and an error if any.
44
type Expand interface {
45
	Expand(ctx context.Context, request *base.PermissionExpandRequest) (response *base.PermissionExpandResponse, err error)
46
}
47
48
type Lookup interface {
49
	LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error)
50
	LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error)
51
	LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error)
52
	LookupEntities(ctx context.Context, request *base.PermissionsLookupEntityRequest) (response *base.PermissionsLookupEntityResponse, err error)
53
	LookupEntitiesStream(ctx context.Context, request *base.PermissionsLookupEntityRequest, server base.Permission_LookupEntitiesStreamServer) (err error)
54
}
55
56
// SubjectPermission -
57
type SubjectPermission interface {
58
	SubjectPermission(ctx context.Context, request *base.PermissionSubjectPermissionRequest) (response *base.PermissionSubjectPermissionResponse, err error)
59
}
60
61
// DirectInvoker is a struct that implements the Invoker interface.
62
// It holds references to various engines needed for permission-related operations.
63
type DirectInvoker struct {
64
	// schemaReader is responsible for reading schema information
65
	schemaReader storage.SchemaReader
66
	// relationshipReader is responsible for reading relationship information
67
	dataReader storage.DataReader
68
	// Check engine for permission checks
69
	cc Check
70
	// Expand engine for expanding permissions
71
	ec Expand
72
	// LookupEntity engine for looking up entities with permissions
73
	lo Lookup
74
	// LookupSubject
75
	sp SubjectPermission
76
77
	// Metrics
78
	checkCounter             api.Int64Counter
79
	lookupEntityCounter      api.Int64Counter
80
	lookupSubjectCounter     api.Int64Counter
81
	subjectPermissionCounter api.Int64Counter
82
83
	checkDurationHistogram             api.Int64Histogram
84
	lookupEntityDurationHistogram      api.Int64Histogram
85
	lookupSubjectDurationHistogram     api.Int64Histogram
86
	subjectPermissionDurationHistogram api.Int64Histogram
87
}
88
89
// NewDirectInvoker is a constructor for DirectInvoker.
90
// It takes pointers to CheckEngine, ExpandEngine, LookupSchemaEngine, and LookupEntityEngine as arguments
91
// and returns an Invoker instance.
92
func NewDirectInvoker(
93
	schemaReader storage.SchemaReader,
94
	dataReader storage.DataReader,
95
	cc Check,
96
	ec Expand,
97
	lo Lookup,
98
	sp SubjectPermission,
99
) *DirectInvoker {
100
	return &DirectInvoker{
101
		schemaReader:                       schemaReader,
102
		dataReader:                         dataReader,
103
		cc:                                 cc,
104
		ec:                                 ec,
105
		lo:                                 lo,
106
		sp:                                 sp,
107
		checkCounter:                       telemetry.NewCounter(meter, "check_count", "Number of permission checks performed"),
108
		lookupEntityCounter:                telemetry.NewCounter(meter, "lookup_entity_count", "Number of permission lookup entity performed"),
109
		lookupSubjectCounter:               telemetry.NewCounter(meter, "lookup_subject_count", "Number of permission lookup subject performed"),
110
		subjectPermissionCounter:           telemetry.NewCounter(meter, "subject_permission_count", "Number of subject permission performed"),
111
		checkDurationHistogram:             telemetry.NewHistogram(meter, "check_duration", "microseconds", "Duration of checks in microseconds"),
112
		lookupEntityDurationHistogram:      telemetry.NewHistogram(meter, "lookup_entity_duration", "microseconds", "Duration of lookup entity duration in microseconds"),
113
		lookupSubjectDurationHistogram:     telemetry.NewHistogram(meter, "lookup_subject_duration", "microseconds", "Duration of lookup subject duration in microseconds"),
114
		subjectPermissionDurationHistogram: telemetry.NewHistogram(meter, "subject_permission_duration", "microseconds", "Duration of subject permission duration in microseconds"),
115
	}
116
}
117
118
// Check is a method that implements the Check interface.
119
// It calls the Run method of the CheckEngine with the provided context and PermissionCheckRequest,
120
// and returns a PermissionCheckResponse and an error if any.
121
func (invoker *DirectInvoker) Check(ctx context.Context, request *base.PermissionCheckRequest) (response *base.PermissionCheckResponse, err error) {
122
	ctx, span := tracer.Start(ctx, "check", trace.WithAttributes(
123
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
124
		attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
125
		attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
126
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
127
	))
128
	defer span.End()
129
130
	start := time.Now()
131
132
	// Validate the depth of the request.
133
	err = checkDepth(request)
134
	if err != nil {
135
		span.RecordError(err)
136
		span.SetStatus(otelCodes.Error, err.Error())
137
		span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(base.CheckResult_CHECK_RESULT_DENIED.String())})
138
		return &base.PermissionCheckResponse{
139
			Can: base.CheckResult_CHECK_RESULT_DENIED,
140
			Metadata: &base.PermissionCheckResponseMetadata{
141
				CheckCount: 0,
142
			},
143
		}, err
144
	}
145
146
	// Set the SnapToken if it's not provided in the request.
147
	if request.GetMetadata().GetSnapToken() == "" {
148
		var st token.SnapToken
149
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId())
150
		if err != nil {
151
			span.RecordError(err)
152
			span.SetStatus(otelCodes.Error, err.Error())
153
			span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(base.CheckResult_CHECK_RESULT_DENIED.String())})
154
			return &base.PermissionCheckResponse{
155
				Can: base.CheckResult_CHECK_RESULT_DENIED,
156
				Metadata: &base.PermissionCheckResponseMetadata{
157
					CheckCount: 0,
158
				},
159
			}, err
160
		}
161
		request.Metadata.SnapToken = st.Encode().String()
162
	}
163
164
	// Set the SchemaVersion if it's not provided in the request.
165
	if request.GetMetadata().GetSchemaVersion() == "" {
166
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId())
167
		if err != nil {
168
			span.RecordError(err)
169
			span.SetStatus(otelCodes.Error, err.Error())
170
			span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(base.CheckResult_CHECK_RESULT_DENIED.String())})
171
			return &base.PermissionCheckResponse{
172
				Can: base.CheckResult_CHECK_RESULT_DENIED,
173
				Metadata: &base.PermissionCheckResponseMetadata{
174
					CheckCount: 0,
175
				},
176
			}, err
177
		}
178
	}
179
180
	// Decrease the depth of the request metadata.
181
	request.Metadata = decreaseDepth(request.GetMetadata())
182
183
	// Perform the actual permission check using the provided request.
184
	response, err = invoker.cc.Check(ctx, request)
185
	if err != nil {
186
		span.RecordError(err)
187
		span.SetStatus(otelCodes.Error, err.Error())
188
		span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(base.CheckResult_CHECK_RESULT_DENIED.String())})
189
		return &base.PermissionCheckResponse{
190
			Can: base.CheckResult_CHECK_RESULT_DENIED,
191
			Metadata: &base.PermissionCheckResponseMetadata{
192
				CheckCount: 0,
193
			},
194
		}, err
195
	}
196
	duration := time.Since(start)
197
	invoker.checkDurationHistogram.Record(ctx, duration.Microseconds())
198
199
	// Increase the check count in the response metadata.
200
	response.Metadata = increaseCheckCount(response.Metadata)
201
202
	// Increase the check count in the metrics.
203
	invoker.checkCounter.Add(ctx, 1)
204
205
	span.SetAttributes(attribute.KeyValue{Key: "can", Value: attribute.StringValue(response.GetCan().String())})
206
	return
207
}
208
209
// Expand is a method that implements the Expand interface.
210
// It calls the Run method of the ExpandEngine with the provided context and PermissionExpandRequest,
211
// and returns a PermissionExpandResponse and an error if any.
212
func (invoker *DirectInvoker) Expand(ctx context.Context, request *base.PermissionExpandRequest) (response *base.PermissionExpandResponse, err error) {
213
	ctx, span := tracer.Start(ctx, "expand", trace.WithAttributes(
214
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
215
		attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
216
		attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
217
	))
218
	defer span.End()
219
220
	if request.GetMetadata().GetSnapToken() == "" {
221
		var st token.SnapToken
222
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId())
223
		if err != nil {
224
			span.RecordError(err)
225
			span.SetStatus(otelCodes.Error, err.Error())
226
			return response, err
227
		}
228
		request.Metadata.SnapToken = st.Encode().String()
229
	}
230
231
	if request.GetMetadata().GetSchemaVersion() == "" {
232
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId())
233
		if err != nil {
234
			span.RecordError(err)
235
			span.SetStatus(otelCodes.Error, err.Error())
236
			return response, err
237
		}
238
	}
239
240
	return invoker.ec.Expand(ctx, request)
241
}
242
243
// LookupEntity is a method that implements the LookupEntity interface.
244
// It calls the Run method of the LookupEntityEngine with the provided context and PermissionLookupEntityRequest,
245
// and returns a PermissionLookupEntityResponse and an error if any.
246
func (invoker *DirectInvoker) LookupEntity(ctx context.Context, request *base.PermissionLookupEntityRequest) (response *base.PermissionLookupEntityResponse, err error) {
247
	ctx, span := tracer.Start(ctx, "lookup-entity", trace.WithAttributes(
248
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
249
		attribute.KeyValue{Key: "entity_type", Value: attribute.StringValue(request.GetEntityType())},
250
		attribute.KeyValue{Key: "permissions", Value: attribute.StringValue(request.GetPermission())},
251
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
252
	))
253
	defer span.End()
254
255
	start := time.Now()
256
257
	// Set SnapToken if not provided
258
	if request.GetMetadata().GetSnapToken() == "" { // Check if the request has a SnapToken.
259
		var st token.SnapToken
260
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId()) // Retrieve the head snapshot from the relationship reader.
261
		if err != nil {
262
			span.RecordError(err)
263
			span.SetStatus(otelCodes.Error, err.Error())
264
			return response, err
265
		}
266
		request.Metadata.SnapToken = st.Encode().String() // Set the SnapToken in the request metadata.
267
	}
268
269
	// Set SchemaVersion if not provided
270
	if request.GetMetadata().GetSchemaVersion() == "" { // Check if the request has a SchemaVersion.
271
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId()) // Retrieve the head schema version from the schema reader.
272
		if err != nil {
273
			span.RecordError(err)
274
			span.SetStatus(otelCodes.Error, err.Error())
275
			return response, err
276
		}
277
	}
278
279
	resp, err := invoker.lo.LookupEntity(ctx, request)
280
281
	duration := time.Since(start)
282
	invoker.lookupEntityDurationHistogram.Record(ctx, duration.Microseconds())
283
284
	// Increase the lookup entity count in the metrics.
285
	invoker.lookupEntityCounter.Add(ctx, 1)
286
287
	return resp, err
288
}
289
290
// LookupEntityStream is a method that implements the LookupEntityStream interface.
291
// It calls the Stream method of the LookupEntityEngine with the provided context, PermissionLookupEntityRequest, and Permission_LookupEntityStreamServer,
292
// and returns an error if any.
293
func (invoker *DirectInvoker) LookupEntityStream(ctx context.Context, request *base.PermissionLookupEntityRequest, server base.Permission_LookupEntityStreamServer) (err error) {
294
	ctx, span := tracer.Start(ctx, "lookup-entity-stream", trace.WithAttributes(
295
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
296
		attribute.KeyValue{Key: "entity_type", Value: attribute.StringValue(request.GetEntityType())},
297
		attribute.KeyValue{Key: "permissions", Value: attribute.StringValue(request.GetPermission())},
298
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
299
	))
300
	defer span.End()
301
302
	start := time.Now()
303
304
	// Set SnapToken if not provided
305
	if request.GetMetadata().GetSnapToken() == "" { // Check if the request has a SnapToken.
306
		var st token.SnapToken
307
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId()) // Retrieve the head snapshot from the relationship reader.
308
		if err != nil {
309
			span.RecordError(err)
310
			span.SetStatus(otelCodes.Error, err.Error())
311
			return err
312
		}
313
		request.Metadata.SnapToken = st.Encode().String() // Set the SnapToken in the request metadata.
314
	}
315
316
	// Set SchemaVersion if not provided
317
	if request.GetMetadata().GetSchemaVersion() == "" { // Check if the request has a SchemaVersion.
318
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId()) // Retrieve the head schema version from the schema reader.
319
		if err != nil {
320
			span.RecordError(err)
321
			span.SetStatus(otelCodes.Error, err.Error())
322
			return err
323
		}
324
	}
325
326
	resp := invoker.lo.LookupEntityStream(ctx, request, server)
327
328
	duration := time.Since(start)
329
	invoker.lookupEntityDurationHistogram.Record(ctx, duration.Microseconds())
330
331
	// Increase the lookup entity count in the metrics.
332
	invoker.lookupEntityCounter.Add(ctx, 1)
333
334
	return resp
335
}
336
337
// LookupEntities is a method that implements the LookupEntities interface.
338
// It calls the Run method of the LookupEntitiesEngine with the provided context and PermissionsLookupEntityRequest,
339
// and returns a PermissionsLookupEntityResponse and an error if any.
340
func (invoker *DirectInvoker) LookupEntities(ctx context.Context, request *base.PermissionsLookupEntityRequest) (response *base.PermissionsLookupEntityResponse, err error) {
341
	ctx, span := tracer.Start(ctx, "lookup-entities", trace.WithAttributes(
342
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
343
		attribute.KeyValue{Key: "entity_type", Value: attribute.StringValue(request.GetEntityType())},
344
		attribute.KeyValue{Key: "permissions", Value: attribute.StringSliceValue(request.GetPermissions())},
345
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
346
	))
347
	defer span.End()
348
349
	start := time.Now()
350
351
	// Set SnapToken if not provided
352
	if request.GetMetadata().GetSnapToken() == "" { // Check if the request has a SnapToken.
353
		var st token.SnapToken
354
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId()) // Retrieve the head snapshot from the relationship reader.
355
		if err != nil {
356
			span.RecordError(err)
357
			span.SetStatus(otelCodes.Error, err.Error())
358
			return response, err
359
		}
360
		request.Metadata.SnapToken = st.Encode().String() // Set the SnapToken in the request metadata.
361
	}
362
363
	// Set SchemaVersion if not provided
364
	if request.GetMetadata().GetSchemaVersion() == "" { // Check if the request has a SchemaVersion.
365
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId()) // Retrieve the head schema version from the schema reader.
366
		if err != nil {
367
			span.RecordError(err)
368
			span.SetStatus(otelCodes.Error, err.Error())
369
			return response, err
370
		}
371
	}
372
373
	resp, err := invoker.lo.LookupEntities(ctx, request)
374
375
	duration := time.Since(start)
376
	invoker.lookupEntityDurationHistogram.Record(ctx, duration.Microseconds())
377
378
	// Increase the lookup entity count in the metrics.
379
	invoker.lookupEntityCounter.Add(ctx, 1)
380
381
	return resp, err
382
}
383
384
// LookupEntitiesStream is a method that implements the LookupEntitiesStream interface.
385
// It calls the Stream method of the LookupEntitiesEngine with the provided context, PermissionsLookupEntityRequest, and Permission_LookupEntitiesStreamServer,
386
// and returns an error if any.
387
func (invoker *DirectInvoker) LookupEntitiesStream(ctx context.Context, request *base.PermissionsLookupEntityRequest, server base.Permission_LookupEntitiesStreamServer) (err error) {
388
	ctx, span := tracer.Start(ctx, "lookup-entities-stream", trace.WithAttributes(
389
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
390
		attribute.KeyValue{Key: "entity_type", Value: attribute.StringValue(request.GetEntityType())},
391
		attribute.KeyValue{Key: "permissions", Value: attribute.StringSliceValue(request.GetPermissions())},
392
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
393
	))
394
	defer span.End()
395
396
	start := time.Now()
397
398
	// Set SnapToken if not provided
399
	if request.GetMetadata().GetSnapToken() == "" { // Check if the request has a SnapToken.
400
		var st token.SnapToken
401
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId()) // Retrieve the head snapshot from the relationship reader.
402
		if err != nil {
403
			span.RecordError(err)
404
			span.SetStatus(otelCodes.Error, err.Error())
405
			return err
406
		}
407
		request.Metadata.SnapToken = st.Encode().String() // Set the SnapToken in the request metadata.
408
	}
409
410
	// Set SchemaVersion if not provided
411
	if request.GetMetadata().GetSchemaVersion() == "" { // Check if the request has a SchemaVersion.
412
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId()) // Retrieve the head schema version from the schema reader.
413
		if err != nil {
414
			span.RecordError(err)
415
			span.SetStatus(otelCodes.Error, err.Error())
416
			return err
417
		}
418
	}
419
420
	resp := invoker.lo.LookupEntitiesStream(ctx, request, server)
421
422
	duration := time.Since(start)
423
	invoker.lookupEntityDurationHistogram.Record(ctx, duration.Microseconds())
424
425
	// Increase the lookup entity count in the metrics.
426
	invoker.lookupEntityCounter.Add(ctx, 1)
427
428
	return resp
429
}
430
431
// LookupSubject is a method of the DirectInvoker structure. It handles the task of looking up subjects
432
// and returning the results in a response.
433
func (invoker *DirectInvoker) LookupSubject(ctx context.Context, request *base.PermissionLookupSubjectRequest) (response *base.PermissionLookupSubjectResponse, err error) {
434
	ctx, span := tracer.Start(ctx, "lookup-subject", trace.WithAttributes(
435
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
436
		attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
437
		attribute.KeyValue{Key: "permission", Value: attribute.StringValue(request.GetPermission())},
438
		attribute.KeyValue{Key: "subject_reference", Value: attribute.StringValue(tuple.ReferenceToString(request.GetSubjectReference()))},
439
	))
440
	defer span.End()
441
442
	start := time.Now()
443
444
	// Check if the request has a SnapToken. If not, a SnapToken is set.
445
	if request.GetMetadata().GetSnapToken() == "" {
446
		// Create an instance of SnapToken
447
		var st token.SnapToken
448
		// Retrieve the head snapshot from the relationship reader
449
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId())
450
		// If there's an error retrieving the snapshot, return the response and the error
451
		if err != nil {
452
			span.RecordError(err)
453
			span.SetStatus(otelCodes.Error, err.Error())
454
			return response, err
455
		}
456
		// Set the SnapToken in the request metadata
457
		request.Metadata.SnapToken = st.Encode().String()
458
	}
459
460
	// Similar to SnapToken, check if the request has a SchemaVersion. If not, a SchemaVersion is set.
461
	if request.GetMetadata().GetSchemaVersion() == "" {
462
		// Retrieve the head schema version from the schema reader
463
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId())
464
		// If there's an error retrieving the schema version, return the response and the error
465
		if err != nil {
466
			span.RecordError(err)
467
			span.SetStatus(otelCodes.Error, err.Error())
468
			return response, err
469
		}
470
	}
471
472
	resp, err := invoker.lo.LookupSubject(ctx, request)
473
474
	duration := time.Now().Sub(start)
475
	invoker.lookupSubjectDurationHistogram.Record(ctx, duration.Microseconds())
476
477
	// Increase the lookup subject count in the metrics.
478
	invoker.lookupSubjectCounter.Add(ctx, 1)
479
480
	// Call the LookupSubject function of the ls field in the invoker, pass the context and request,
481
	// and return its response and error
482
	return resp, err
483
}
484
485
// SubjectPermission is a method of the DirectInvoker structure. It handles the task of subject's permissions
486
// and returning the results in a response.
487
func (invoker *DirectInvoker) SubjectPermission(ctx context.Context, request *base.PermissionSubjectPermissionRequest) (response *base.PermissionSubjectPermissionResponse, err error) {
488
	ctx, span := tracer.Start(ctx, "subject-permission", trace.WithAttributes(
489
		attribute.KeyValue{Key: "tenant_id", Value: attribute.StringValue(request.GetTenantId())},
490
		attribute.KeyValue{Key: "entity", Value: attribute.StringValue(tuple.EntityToString(request.GetEntity()))},
491
		attribute.KeyValue{Key: "subject", Value: attribute.StringValue(tuple.SubjectToString(request.GetSubject()))},
492
	))
493
	defer span.End()
494
495
	start := time.Now()
496
497
	// Check if the request has a SnapToken. If not, a SnapToken is set.
498
	if request.GetMetadata().GetSnapToken() == "" {
499
		// Create an instance of SnapToken
500
		var st token.SnapToken
501
		// Retrieve the head snapshot from the relationship reader
502
		st, err = invoker.dataReader.HeadSnapshot(ctx, request.GetTenantId())
503
		// If there's an error retrieving the snapshot, return the response and the error
504
		if err != nil {
505
			span.RecordError(err)
506
			span.SetStatus(otelCodes.Error, err.Error())
507
			return response, err
508
		}
509
		// Set the SnapToken in the request metadata
510
		request.Metadata.SnapToken = st.Encode().String()
511
	}
512
513
	// Similar to SnapToken, check if the request has a SchemaVersion. If not, a SchemaVersion is set.
514
	if request.GetMetadata().GetSchemaVersion() == "" {
515
		// Retrieve the head schema version from the schema reader
516
		request.Metadata.SchemaVersion, err = invoker.schemaReader.HeadVersion(ctx, request.GetTenantId())
517
		// If there's an error retrieving the schema version, return the response and the error
518
		if err != nil {
519
			span.RecordError(err)
520
			span.SetStatus(otelCodes.Error, err.Error())
521
			return response, err
522
		}
523
	}
524
	resp, err := invoker.sp.SubjectPermission(ctx, request)
525
526
	duration := time.Now().Sub(start)
527
	invoker.subjectPermissionDurationHistogram.Record(ctx, duration.Microseconds())
528
529
	// Increase the subject permission count in the metrics.
530
	invoker.subjectPermissionCounter.Add(ctx, 1)
531
532
	// Call the SubjectPermission function of the ls field in the invoker, pass the context and request,
533
	// and return its response and error
534
	return resp, err
535
}
536