Passed
Pull Request — master (#1349)
by
unknown
02:37
created

servers.*DataServer.RunBundle   B

Complexity

Conditions 5

Size

Total Lines 37
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 27
nop 2
dl 0
loc 37
rs 8.7653
c 0
b 0
f 0
1
package servers
2
3
import (
4
	"log/slog"
5
	"time"
6
7
	otelCodes "go.opentelemetry.io/otel/codes"
8
	api "go.opentelemetry.io/otel/metric"
9
	"golang.org/x/net/context"
10
	"google.golang.org/grpc/status"
11
12
	"github.com/Permify/permify/internal/storage"
13
	"github.com/Permify/permify/internal/validation"
14
	"github.com/Permify/permify/pkg/attribute"
15
	"github.com/Permify/permify/pkg/database"
16
	v1 "github.com/Permify/permify/pkg/pb/base/v1"
17
	"github.com/Permify/permify/pkg/telemetry"
18
	"github.com/Permify/permify/pkg/tuple"
19
)
20
21
// DataServer - Structure for Data Server
22
type DataServer struct {
23
	v1.UnimplementedDataServer
24
25
	sr                           storage.SchemaReader
26
	dr                           storage.DataReader
27
	br                           storage.BundleReader
28
	dw                           storage.DataWriter
29
	writeDataHistogram           api.Int64Histogram
30
	deleteDataHistogram          api.Int64Histogram
31
	readAttributesHistogram      api.Int64Histogram
32
	readRelationshipsHistogram   api.Int64Histogram
33
	writeRelationshipsHistogram  api.Int64Histogram
34
	deleteRelationshipsHistogram api.Int64Histogram
35
	runBundleHistogram           api.Int64Histogram
36
}
37
38
// NewDataServer - Creates new Data Server
39
func NewDataServer(
40
	dr storage.DataReader,
41
	dw storage.DataWriter,
42
	br storage.BundleReader,
43
	sr storage.SchemaReader,
44
) *DataServer {
45
46
	return &DataServer{
47
		dr:                           dr,
48
		dw:                           dw,
49
		br:                           br,
50
		sr:                           sr,
51
		writeDataHistogram:           telemetry.NewHistogram(meter, "write_data", "microseconds", "Duration of writing data in microseconds"),
52
		deleteDataHistogram:          telemetry.NewHistogram(meter, "delete_data", "microseconds", "Duration of deleting data in microseconds"),
53
		readAttributesHistogram:      telemetry.NewHistogram(meter, "read_attributes", "microseconds", "Duration of reading attributes in microseconds"),
54
		readRelationshipsHistogram:   telemetry.NewHistogram(meter, "read_relationships", "microseconds", "Duration of reading relationships in microseconds"),
55
		writeRelationshipsHistogram:  telemetry.NewHistogram(meter, "write_relationships", "microseconds", "Duration of writing relationships in microseconds"),
56
		deleteRelationshipsHistogram: telemetry.NewHistogram(meter, "delete_relationships", "microseconds", "Duration of deleting relationships in microseconds"),
57
		runBundleHistogram:           telemetry.NewHistogram(meter, "delete_relationships", "run_bundle", "Duration of running bunble in microseconds"),
58
	}
59
}
60
61
// ReadRelationships - Allows directly querying the stored engines data to display and filter stored relational tuples
62
func (r *DataServer) ReadRelationships(ctx context.Context, request *v1.RelationshipReadRequest) (*v1.RelationshipReadResponse, error) {
63
	ctx, span := tracer.Start(ctx, "data.read.relationships")
64
	defer span.End()
65
	start := time.Now()
66
67
	v := request.Validate()
68
	if v != nil {
69
		return nil, status.Error(GetStatus(v), v.Error())
70
	}
71
72
	snap := request.GetMetadata().GetSnapToken()
73
	if snap == "" {
74
		st, err := r.dr.HeadSnapshot(ctx, request.GetTenantId())
75
		if err != nil {
76
			return nil, status.Error(GetStatus(err), err.Error())
77
		}
78
		snap = st.Encode().String()
79
	}
80
81
	collection, ct, err := r.dr.ReadRelationships(
82
		ctx,
83
		request.GetTenantId(),
84
		request.GetFilter(),
85
		snap,
86
		database.NewPagination(
87
			database.Size(request.GetPageSize()),
88
			database.Token(request.GetContinuousToken()),
89
		),
90
	)
91
	if err != nil {
92
		span.RecordError(err)
93
		span.SetStatus(otelCodes.Error, err.Error())
94
		slog.Error(err.Error())
95
		return nil, status.Error(GetStatus(err), err.Error())
96
	}
97
98
	duration := time.Now().Sub(start)
99
	r.readRelationshipsHistogram.Record(ctx, duration.Microseconds())
100
101
	return &v1.RelationshipReadResponse{
102
		Tuples:          collection.GetTuples(),
103
		ContinuousToken: ct.String(),
104
	}, nil
105
}
106
107
// ReadAttributes - Allows directly querying the stored engines data to display and filter stored attribute tuples
108
func (r *DataServer) ReadAttributes(ctx context.Context, request *v1.AttributeReadRequest) (*v1.AttributeReadResponse, error) {
109
	ctx, span := tracer.Start(ctx, "data.read.attributes")
110
	defer span.End()
111
	start := time.Now()
112
113
	v := request.Validate()
114
	if v != nil {
115
		return nil, status.Error(GetStatus(v), v.Error())
116
	}
117
118
	snap := request.GetMetadata().GetSnapToken()
119
	if snap == "" {
120
		st, err := r.dr.HeadSnapshot(ctx, request.GetTenantId())
121
		if err != nil {
122
			return nil, status.Error(GetStatus(err), err.Error())
123
		}
124
		snap = st.Encode().String()
125
	}
126
127
	collection, ct, err := r.dr.ReadAttributes(
128
		ctx,
129
		request.GetTenantId(),
130
		request.GetFilter(),
131
		snap,
132
		database.NewPagination(
133
			database.Size(request.GetPageSize()),
134
			database.Token(request.GetContinuousToken()),
135
		),
136
	)
137
	if err != nil {
138
		span.RecordError(err)
139
		span.SetStatus(otelCodes.Error, err.Error())
140
		slog.Error(err.Error())
141
		return nil, status.Error(GetStatus(err), err.Error())
142
	}
143
144
	duration := time.Now().Sub(start)
145
	r.readAttributesHistogram.Record(ctx, duration.Microseconds())
146
147
	return &v1.AttributeReadResponse{
148
		Attributes:      collection.GetAttributes(),
149
		ContinuousToken: ct.String(),
150
	}, nil
151
}
152
153
// Write - Write relationships and attributes to writeDB
154
func (r *DataServer) Write(ctx context.Context, request *v1.DataWriteRequest) (*v1.DataWriteResponse, error) {
155
	ctx, span := tracer.Start(ctx, "data.write")
156
	defer span.End()
157
	start := time.Now()
158
159
	v := request.Validate()
160
	if v != nil {
161
		return nil, status.Error(GetStatus(v), v.Error())
162
	}
163
164
	version := request.GetMetadata().GetSchemaVersion()
165
	if version == "" {
166
		v, err := r.sr.HeadVersion(ctx, request.GetTenantId())
167
		if err != nil {
168
			span.RecordError(err)
169
			span.SetStatus(otelCodes.Error, err.Error())
170
			return nil, status.Error(GetStatus(err), err.Error())
171
		}
172
		version = v
173
	}
174
175
	relationships := make([]*v1.Tuple, 0, len(request.GetTuples()))
176
177
	relationshipsMap := map[string]struct{}{}
178
179
	for _, tup := range request.GetTuples() {
180
181
		key := tuple.ToString(tup)
182
183
		if _, ok := relationshipsMap[key]; ok {
184
			continue
185
		}
186
187
		relationshipsMap[key] = struct{}{}
188
189
		definition, _, err := r.sr.ReadEntityDefinition(ctx, request.GetTenantId(), tup.GetEntity().GetType(), version)
190
		if err != nil {
191
			span.RecordError(err)
192
			span.SetStatus(otelCodes.Error, err.Error())
193
			return nil, status.Error(GetStatus(err), err.Error())
194
		}
195
196
		err = validation.ValidateTuple(definition, tup)
197
		if err != nil {
198
			span.RecordError(err)
199
			span.SetStatus(otelCodes.Error, err.Error())
200
			return nil, status.Error(GetStatus(err), err.Error())
201
		}
202
203
		relationships = append(relationships, tup)
204
	}
205
206
	attrs := make([]*v1.Attribute, 0, len(request.GetAttributes()))
207
208
	attributesMap := map[string]struct{}{}
209
210
	for _, attr := range request.GetAttributes() {
211
212
		key := attribute.EntityAndAttributeToString(attr.GetEntity(), attr.GetAttribute())
213
214
		if _, ok := attributesMap[key]; ok {
215
			continue
216
		}
217
218
		attributesMap[key] = struct{}{}
219
220
		definition, _, err := r.sr.ReadEntityDefinition(ctx, request.GetTenantId(), attr.GetEntity().GetType(), version)
221
		if err != nil {
222
			span.RecordError(err)
223
			span.SetStatus(otelCodes.Error, err.Error())
224
			return nil, status.Error(GetStatus(err), err.Error())
225
		}
226
227
		err = validation.ValidateAttribute(definition, attr)
228
		if err != nil {
229
			span.RecordError(err)
230
			span.SetStatus(otelCodes.Error, err.Error())
231
			return nil, status.Error(GetStatus(err), err.Error())
232
		}
233
234
		attrs = append(attrs, attr)
235
	}
236
237
	snap, err := r.dw.Write(ctx, request.GetTenantId(), database.NewTupleCollection(relationships...), database.NewAttributeCollection(attrs...))
238
	if err != nil {
239
		span.RecordError(err)
240
		span.SetStatus(otelCodes.Error, err.Error())
241
		slog.Error(err.Error())
242
		return nil, status.Error(GetStatus(err), err.Error())
243
	}
244
245
	duration := time.Now().Sub(start)
246
	r.writeDataHistogram.Record(ctx, duration.Microseconds())
247
248
	return &v1.DataWriteResponse{
249
		SnapToken: snap.String(),
250
	}, nil
251
}
252
253
// WriteRelationships - Write relation tuples to writeDB
254
func (r *DataServer) WriteRelationships(ctx context.Context, request *v1.RelationshipWriteRequest) (*v1.RelationshipWriteResponse, error) {
255
	ctx, span := tracer.Start(ctx, "relationships.write")
256
	defer span.End()
257
	start := time.Now()
258
259
	v := request.Validate()
260
	if v != nil {
261
		return nil, status.Error(GetStatus(v), v.Error())
262
	}
263
264
	version := request.GetMetadata().GetSchemaVersion()
265
	if version == "" {
266
		v, err := r.sr.HeadVersion(ctx, request.GetTenantId())
267
		if err != nil {
268
			span.RecordError(err)
269
			span.SetStatus(otelCodes.Error, err.Error())
270
			return nil, status.Error(GetStatus(err), err.Error())
271
		}
272
		version = v
273
	}
274
275
	relationships := make([]*v1.Tuple, 0, len(request.GetTuples()))
276
277
	relationshipsMap := map[string]struct{}{}
278
279
	for _, tup := range request.GetTuples() {
280
281
		key := tuple.ToString(tup)
282
283
		if _, ok := relationshipsMap[key]; ok {
284
			continue
285
		}
286
287
		relationshipsMap[key] = struct{}{}
288
289
		definition, _, err := r.sr.ReadEntityDefinition(ctx, request.GetTenantId(), tup.GetEntity().GetType(), version)
290
		if err != nil {
291
			span.RecordError(err)
292
			span.SetStatus(otelCodes.Error, err.Error())
293
			return nil, status.Error(GetStatus(err), err.Error())
294
		}
295
296
		err = validation.ValidateTuple(definition, tup)
297
		if err != nil {
298
			span.RecordError(err)
299
			span.SetStatus(otelCodes.Error, err.Error())
300
			return nil, status.Error(GetStatus(err), err.Error())
301
		}
302
303
		relationships = append(relationships, tup)
304
	}
305
306
	snap, err := r.dw.Write(ctx, request.GetTenantId(), database.NewTupleCollection(relationships...), database.NewAttributeCollection())
307
	if err != nil {
308
		span.RecordError(err)
309
		span.SetStatus(otelCodes.Error, err.Error())
310
		slog.Error(err.Error())
311
		return nil, status.Error(GetStatus(err), err.Error())
312
	}
313
314
	duration := time.Now().Sub(start)
315
	r.writeRelationshipsHistogram.Record(ctx, duration.Microseconds())
316
317
	return &v1.RelationshipWriteResponse{
318
		SnapToken: snap.String(),
319
	}, nil
320
}
321
322
// Delete - Delete relationships and attributes from writeDB
323
func (r *DataServer) Delete(ctx context.Context, request *v1.DataDeleteRequest) (*v1.DataDeleteResponse, error) {
324
	ctx, span := tracer.Start(ctx, "data.delete")
325
	defer span.End()
326
	start := time.Now()
327
328
	v := request.Validate()
329
	if v != nil {
330
		return nil, status.Error(GetStatus(v), v.Error())
331
	}
332
333
	err := validation.ValidateFilters(request.GetTupleFilter(), request.GetAttributeFilter())
334
	if err != nil {
335
		return nil, status.Error(GetStatus(v), v.Error())
336
	}
337
338
	snap, err := r.dw.Delete(ctx, request.GetTenantId(), request.GetTupleFilter(), request.GetAttributeFilter())
339
	if err != nil {
340
		span.RecordError(err)
341
		span.SetStatus(otelCodes.Error, err.Error())
342
		slog.Error(err.Error())
343
		return nil, status.Error(GetStatus(err), err.Error())
344
	}
345
346
	duration := time.Now().Sub(start)
347
	r.deleteDataHistogram.Record(ctx, duration.Microseconds())
348
349
	return &v1.DataDeleteResponse{
350
		SnapToken: snap.String(),
351
	}, nil
352
}
353
354
// DeleteRelationships - Delete relationships from writeDB
355
func (r *DataServer) DeleteRelationships(ctx context.Context, request *v1.RelationshipDeleteRequest) (*v1.RelationshipDeleteResponse, error) {
356
	ctx, span := tracer.Start(ctx, "relationships.delete")
357
	defer span.End()
358
	start := time.Now()
359
360
	v := request.Validate()
361
	if v != nil {
362
		return nil, status.Error(GetStatus(v), v.Error())
363
	}
364
365
	err := validation.ValidateTupleFilter(request.GetFilter())
366
	if err != nil {
367
		return nil, status.Error(GetStatus(v), v.Error())
368
	}
369
370
	snap, err := r.dw.Delete(ctx, request.GetTenantId(), request.GetFilter(), &v1.AttributeFilter{})
371
	if err != nil {
372
		span.RecordError(err)
373
		span.SetStatus(otelCodes.Error, err.Error())
374
		slog.Error(err.Error())
375
		return nil, status.Error(GetStatus(err), err.Error())
376
	}
377
378
	duration := time.Now().Sub(start)
379
	r.deleteRelationshipsHistogram.Record(ctx, duration.Microseconds())
380
381
	return &v1.RelationshipDeleteResponse{
382
		SnapToken: snap.String(),
383
	}, nil
384
}
385
386
// RunBundle executes a bundle and returns its snapshot token.
387
func (r *DataServer) RunBundle(ctx context.Context, request *v1.BundleRunRequest) (*v1.BundleRunResponse, error) {
388
	ctx, span := tracer.Start(ctx, "bundle.run")
389
	defer span.End()
390
	start := time.Now()
391
392
	v := request.Validate()
393
	if v != nil {
394
		return nil, status.Error(GetStatus(v), v.Error())
395
	}
396
397
	bundle, err := r.br.Read(ctx, request.GetTenantId(), request.GetName())
398
	if err != nil {
399
		span.RecordError(err)
400
		span.SetStatus(otelCodes.Error, err.Error())
401
		slog.Error(err.Error())
402
		return nil, status.Error(GetStatus(err), err.Error())
403
	}
404
405
	err = validation.ValidateBundleArguments(bundle.GetArguments(), request.GetArguments())
406
	if err != nil {
407
		return nil, status.Error(GetStatus(err), err.Error())
408
	}
409
410
	snap, err := r.dw.RunBundle(ctx, request.GetTenantId(), request.GetArguments(), bundle)
411
	if err != nil {
412
		span.RecordError(err)
413
		span.SetStatus(otelCodes.Error, err.Error())
414
		slog.Error(err.Error())
415
		return nil, status.Error(GetStatus(err), err.Error())
416
	}
417
418
	duration := time.Now().Sub(start)
419
	r.runBundleHistogram.Record(ctx, duration.Microseconds())
420
421
	return &v1.BundleRunResponse{
422
		SnapToken: snap.String(),
423
	}, nil
424
}
425