Passed
Push — master ( ec79d3...ff73ef )
by Tolga
01:11 queued 13s
created

servers.NewDataServer   C

Complexity

Conditions 8

Size

Total Lines 89
Code Lines 60

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 60
nop 4
dl 0
loc 89
rs 6.4424
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
package servers
2
3
import (
4
	"log/slog"
5
	"time"
6
7
	otelCodes "go.opentelemetry.io/otel/codes"
8
	api "go.opentelemetry.io/otel/metric"
9
	"golang.org/x/net/context"
10
	"google.golang.org/grpc/status"
11
12
	"github.com/Permify/permify/internal/storage"
13
	"github.com/Permify/permify/internal/validation"
14
	"github.com/Permify/permify/pkg/attribute"
15
	"github.com/Permify/permify/pkg/database"
16
	v1 "github.com/Permify/permify/pkg/pb/base/v1"
17
	"github.com/Permify/permify/pkg/tuple"
18
)
19
20
// DataServer - Structure for Data Server
21
type DataServer struct {
22
	v1.UnimplementedDataServer
23
24
	sr                           storage.SchemaReader
25
	dr                           storage.DataReader
26
	br                           storage.BundleReader
27
	dw                           storage.DataWriter
28
	writeDataHistogram           api.Int64Histogram
29
	deleteDataHistogram          api.Int64Histogram
30
	readAttributesHistogram      api.Int64Histogram
31
	readRelationshipsHistogram   api.Int64Histogram
32
	writeRelationshipsHistogram  api.Int64Histogram
33
	deleteRelationshipsHistogram api.Int64Histogram
34
	runBundleHistogram           api.Int64Histogram
35
}
36
37
// NewDataServer - Creates new Data Server
38
func NewDataServer(
39
	dr storage.DataReader,
40
	dw storage.DataWriter,
41
	br storage.BundleReader,
42
	sr storage.SchemaReader,
43
) *DataServer {
44
45
	writeDataHistogram, err := meter.Int64Histogram(
46
		"write_data",
47
		api.WithUnit("microseconds"),
48
		api.WithDescription("Duration of writing data in microseconds"),
49
	)
50
51
	if err != nil {
52
		panic(err)
53
	}
54
55
	deleteDataHistogram, err := meter.Int64Histogram(
56
		"delete_data",
57
		api.WithUnit("microseconds"),
58
		api.WithDescription("Duration of deleting data in microseconds"),
59
	)
60
61
	if err != nil {
62
		panic(err)
63
	}
64
65
	readAttributesHistogram, err := meter.Int64Histogram(
66
		"read_attributes",
67
		api.WithUnit("microseconds"),
68
		api.WithDescription("Duration of reading attributes in microseconds"),
69
	)
70
71
	if err != nil {
72
		panic(err)
73
	}
74
75
	readRelationshipsHistogram, err := meter.Int64Histogram(
76
		"read_relationships",
77
		api.WithUnit("microseconds"),
78
		api.WithDescription("Duration of reading relationships in microseconds"),
79
	)
80
81
	if err != nil {
82
		panic(err)
83
	}
84
85
	writeRelationshipsHistogram, err := meter.Int64Histogram(
86
		"write_relationships",
87
		api.WithUnit("microseconds"),
88
		api.WithDescription("Duration of writing relationships in microseconds"),
89
	)
90
91
	if err != nil {
92
		panic(err)
93
	}
94
95
	deleteRelationshipsHistogram, err := meter.Int64Histogram(
96
		"delete_relationships",
97
		api.WithUnit("microseconds"),
98
		api.WithDescription("Duration of deleting relationships in microseconds"),
99
	)
100
101
	if err != nil {
102
		panic(err)
103
	}
104
105
	runBundleHistogram, err := meter.Int64Histogram(
106
		"run_bundle",
107
		api.WithUnit("microseconds"),
108
		api.WithDescription("Duration of running bunble in microseconds"),
109
	)
110
111
	if err != nil {
112
		panic(err)
113
	}
114
115
	return &DataServer{
116
		dr:                           dr,
117
		dw:                           dw,
118
		br:                           br,
119
		sr:                           sr,
120
		writeDataHistogram:           writeDataHistogram,
121
		deleteDataHistogram:          deleteDataHistogram,
122
		readAttributesHistogram:      readAttributesHistogram,
123
		readRelationshipsHistogram:   readRelationshipsHistogram,
124
		writeRelationshipsHistogram:  writeRelationshipsHistogram,
125
		deleteRelationshipsHistogram: deleteRelationshipsHistogram,
126
		runBundleHistogram:           runBundleHistogram,
127
	}
128
}
129
130
// ReadRelationships - Allows directly querying the stored engines data to display and filter stored relational tuples
131
func (r *DataServer) ReadRelationships(ctx context.Context, request *v1.RelationshipReadRequest) (*v1.RelationshipReadResponse, error) {
132
	ctx, span := tracer.Start(ctx, "data.read.relationships")
133
	defer span.End()
134
	start := time.Now()
135
136
	v := request.Validate()
137
	if v != nil {
138
		return nil, status.Error(GetStatus(v), v.Error())
139
	}
140
141
	snap := request.GetMetadata().GetSnapToken()
142
	if snap == "" {
143
		st, err := r.dr.HeadSnapshot(ctx, request.GetTenantId())
144
		if err != nil {
145
			return nil, status.Error(GetStatus(err), err.Error())
146
		}
147
		snap = st.Encode().String()
148
	}
149
150
	collection, ct, err := r.dr.ReadRelationships(
151
		ctx,
152
		request.GetTenantId(),
153
		request.GetFilter(),
154
		snap,
155
		database.NewPagination(
156
			database.Size(request.GetPageSize()),
157
			database.Token(request.GetContinuousToken()),
158
		),
159
	)
160
	if err != nil {
161
		span.RecordError(err)
162
		span.SetStatus(otelCodes.Error, err.Error())
163
		slog.Error(err.Error())
164
		return nil, status.Error(GetStatus(err), err.Error())
165
	}
166
167
	duration := time.Now().Sub(start)
168
	r.readRelationshipsHistogram.Record(ctx, duration.Microseconds())
169
170
	return &v1.RelationshipReadResponse{
171
		Tuples:          collection.GetTuples(),
172
		ContinuousToken: ct.String(),
173
	}, nil
174
}
175
176
// ReadAttributes - Allows directly querying the stored engines data to display and filter stored attribute tuples
177
func (r *DataServer) ReadAttributes(ctx context.Context, request *v1.AttributeReadRequest) (*v1.AttributeReadResponse, error) {
178
	ctx, span := tracer.Start(ctx, "data.read.attributes")
179
	defer span.End()
180
	start := time.Now()
181
182
	v := request.Validate()
183
	if v != nil {
184
		return nil, status.Error(GetStatus(v), v.Error())
185
	}
186
187
	snap := request.GetMetadata().GetSnapToken()
188
	if snap == "" {
189
		st, err := r.dr.HeadSnapshot(ctx, request.GetTenantId())
190
		if err != nil {
191
			return nil, status.Error(GetStatus(err), err.Error())
192
		}
193
		snap = st.Encode().String()
194
	}
195
196
	collection, ct, err := r.dr.ReadAttributes(
197
		ctx,
198
		request.GetTenantId(),
199
		request.GetFilter(),
200
		snap,
201
		database.NewPagination(
202
			database.Size(request.GetPageSize()),
203
			database.Token(request.GetContinuousToken()),
204
		),
205
	)
206
	if err != nil {
207
		span.RecordError(err)
208
		span.SetStatus(otelCodes.Error, err.Error())
209
		slog.Error(err.Error())
210
		return nil, status.Error(GetStatus(err), err.Error())
211
	}
212
213
	duration := time.Now().Sub(start)
214
	r.readAttributesHistogram.Record(ctx, duration.Microseconds())
215
216
	return &v1.AttributeReadResponse{
217
		Attributes:      collection.GetAttributes(),
218
		ContinuousToken: ct.String(),
219
	}, nil
220
}
221
222
// Write - Write relationships and attributes to writeDB
223
func (r *DataServer) Write(ctx context.Context, request *v1.DataWriteRequest) (*v1.DataWriteResponse, error) {
224
	ctx, span := tracer.Start(ctx, "data.write")
225
	defer span.End()
226
	start := time.Now()
227
228
	v := request.Validate()
229
	if v != nil {
230
		return nil, status.Error(GetStatus(v), v.Error())
231
	}
232
233
	version := request.GetMetadata().GetSchemaVersion()
234
	if version == "" {
235
		v, err := r.sr.HeadVersion(ctx, request.GetTenantId())
236
		if err != nil {
237
			span.RecordError(err)
238
			span.SetStatus(otelCodes.Error, err.Error())
239
			return nil, status.Error(GetStatus(err), err.Error())
240
		}
241
		version = v
242
	}
243
244
	relationships := make([]*v1.Tuple, 0, len(request.GetTuples()))
245
246
	relationshipsMap := map[string]struct{}{}
247
248
	for _, tup := range request.GetTuples() {
249
250
		key := tuple.ToString(tup)
251
252
		if _, ok := relationshipsMap[key]; ok {
253
			continue
254
		}
255
256
		relationshipsMap[key] = struct{}{}
257
258
		definition, _, err := r.sr.ReadEntityDefinition(ctx, request.GetTenantId(), tup.GetEntity().GetType(), version)
259
		if err != nil {
260
			span.RecordError(err)
261
			span.SetStatus(otelCodes.Error, err.Error())
262
			return nil, status.Error(GetStatus(err), err.Error())
263
		}
264
265
		err = validation.ValidateTuple(definition, tup)
266
		if err != nil {
267
			span.RecordError(err)
268
			span.SetStatus(otelCodes.Error, err.Error())
269
			return nil, status.Error(GetStatus(err), err.Error())
270
		}
271
272
		relationships = append(relationships, tup)
273
	}
274
275
	attrs := make([]*v1.Attribute, 0, len(request.GetAttributes()))
276
277
	attributesMap := map[string]struct{}{}
278
279
	for _, attr := range request.GetAttributes() {
280
281
		key := attribute.EntityAndAttributeToString(attr.GetEntity(), attr.GetAttribute())
282
283
		if _, ok := attributesMap[key]; ok {
284
			continue
285
		}
286
287
		attributesMap[key] = struct{}{}
288
289
		definition, _, err := r.sr.ReadEntityDefinition(ctx, request.GetTenantId(), attr.GetEntity().GetType(), version)
290
		if err != nil {
291
			span.RecordError(err)
292
			span.SetStatus(otelCodes.Error, err.Error())
293
			return nil, status.Error(GetStatus(err), err.Error())
294
		}
295
296
		err = validation.ValidateAttribute(definition, attr)
297
		if err != nil {
298
			span.RecordError(err)
299
			span.SetStatus(otelCodes.Error, err.Error())
300
			return nil, status.Error(GetStatus(err), err.Error())
301
		}
302
303
		attrs = append(attrs, attr)
304
	}
305
306
	snap, err := r.dw.Write(ctx, request.GetTenantId(), database.NewTupleCollection(relationships...), database.NewAttributeCollection(attrs...))
307
	if err != nil {
308
		span.RecordError(err)
309
		span.SetStatus(otelCodes.Error, err.Error())
310
		slog.Error(err.Error())
311
		return nil, status.Error(GetStatus(err), err.Error())
312
	}
313
314
	duration := time.Now().Sub(start)
315
	r.writeDataHistogram.Record(ctx, duration.Microseconds())
316
317
	return &v1.DataWriteResponse{
318
		SnapToken: snap.String(),
319
	}, nil
320
}
321
322
// WriteRelationships - Write relation tuples to writeDB
323
func (r *DataServer) WriteRelationships(ctx context.Context, request *v1.RelationshipWriteRequest) (*v1.RelationshipWriteResponse, error) {
324
	ctx, span := tracer.Start(ctx, "relationships.write")
325
	defer span.End()
326
	start := time.Now()
327
328
	v := request.Validate()
329
	if v != nil {
330
		return nil, status.Error(GetStatus(v), v.Error())
331
	}
332
333
	version := request.GetMetadata().GetSchemaVersion()
334
	if version == "" {
335
		v, err := r.sr.HeadVersion(ctx, request.GetTenantId())
336
		if err != nil {
337
			span.RecordError(err)
338
			span.SetStatus(otelCodes.Error, err.Error())
339
			return nil, status.Error(GetStatus(err), err.Error())
340
		}
341
		version = v
342
	}
343
344
	relationships := make([]*v1.Tuple, 0, len(request.GetTuples()))
345
346
	relationshipsMap := map[string]struct{}{}
347
348
	for _, tup := range request.GetTuples() {
349
350
		key := tuple.ToString(tup)
351
352
		if _, ok := relationshipsMap[key]; ok {
353
			continue
354
		}
355
356
		relationshipsMap[key] = struct{}{}
357
358
		definition, _, err := r.sr.ReadEntityDefinition(ctx, request.GetTenantId(), tup.GetEntity().GetType(), version)
359
		if err != nil {
360
			span.RecordError(err)
361
			span.SetStatus(otelCodes.Error, err.Error())
362
			return nil, status.Error(GetStatus(err), err.Error())
363
		}
364
365
		err = validation.ValidateTuple(definition, tup)
366
		if err != nil {
367
			span.RecordError(err)
368
			span.SetStatus(otelCodes.Error, err.Error())
369
			return nil, status.Error(GetStatus(err), err.Error())
370
		}
371
372
		relationships = append(relationships, tup)
373
	}
374
375
	snap, err := r.dw.Write(ctx, request.GetTenantId(), database.NewTupleCollection(relationships...), database.NewAttributeCollection())
376
	if err != nil {
377
		span.RecordError(err)
378
		span.SetStatus(otelCodes.Error, err.Error())
379
		slog.Error(err.Error())
380
		return nil, status.Error(GetStatus(err), err.Error())
381
	}
382
383
	duration := time.Now().Sub(start)
384
	r.writeRelationshipsHistogram.Record(ctx, duration.Microseconds())
385
386
	return &v1.RelationshipWriteResponse{
387
		SnapToken: snap.String(),
388
	}, nil
389
}
390
391
// Delete - Delete relationships and attributes from writeDB
392
func (r *DataServer) Delete(ctx context.Context, request *v1.DataDeleteRequest) (*v1.DataDeleteResponse, error) {
393
	ctx, span := tracer.Start(ctx, "data.delete")
394
	defer span.End()
395
	start := time.Now()
396
397
	v := request.Validate()
398
	if v != nil {
399
		return nil, status.Error(GetStatus(v), v.Error())
400
	}
401
402
	err := validation.ValidateFilters(request.GetTupleFilter(), request.GetAttributeFilter())
403
	if err != nil {
404
		return nil, status.Error(GetStatus(v), v.Error())
405
	}
406
407
	snap, err := r.dw.Delete(ctx, request.GetTenantId(), request.GetTupleFilter(), request.GetAttributeFilter())
408
	if err != nil {
409
		span.RecordError(err)
410
		span.SetStatus(otelCodes.Error, err.Error())
411
		slog.Error(err.Error())
412
		return nil, status.Error(GetStatus(err), err.Error())
413
	}
414
415
	duration := time.Now().Sub(start)
416
	r.deleteDataHistogram.Record(ctx, duration.Microseconds())
417
418
	return &v1.DataDeleteResponse{
419
		SnapToken: snap.String(),
420
	}, nil
421
}
422
423
// DeleteRelationships - Delete relationships from writeDB
424
func (r *DataServer) DeleteRelationships(ctx context.Context, request *v1.RelationshipDeleteRequest) (*v1.RelationshipDeleteResponse, error) {
425
	ctx, span := tracer.Start(ctx, "relationships.delete")
426
	defer span.End()
427
	start := time.Now()
428
429
	v := request.Validate()
430
	if v != nil {
431
		return nil, status.Error(GetStatus(v), v.Error())
432
	}
433
434
	err := validation.ValidateTupleFilter(request.GetFilter())
435
	if err != nil {
436
		return nil, status.Error(GetStatus(v), v.Error())
437
	}
438
439
	snap, err := r.dw.Delete(ctx, request.GetTenantId(), request.GetFilter(), &v1.AttributeFilter{})
440
	if err != nil {
441
		span.RecordError(err)
442
		span.SetStatus(otelCodes.Error, err.Error())
443
		slog.Error(err.Error())
444
		return nil, status.Error(GetStatus(err), err.Error())
445
	}
446
447
	duration := time.Now().Sub(start)
448
	r.deleteRelationshipsHistogram.Record(ctx, duration.Microseconds())
449
450
	return &v1.RelationshipDeleteResponse{
451
		SnapToken: snap.String(),
452
	}, nil
453
}
454
455
// RunBundle executes a bundle and returns its snapshot token.
456
func (r *DataServer) RunBundle(ctx context.Context, request *v1.BundleRunRequest) (*v1.BundleRunResponse, error) {
457
	ctx, span := tracer.Start(ctx, "bundle.run")
458
	defer span.End()
459
	start := time.Now()
460
461
	v := request.Validate()
462
	if v != nil {
463
		return nil, status.Error(GetStatus(v), v.Error())
464
	}
465
466
	bundle, err := r.br.Read(ctx, request.GetTenantId(), request.GetName())
467
	if err != nil {
468
		span.RecordError(err)
469
		span.SetStatus(otelCodes.Error, err.Error())
470
		slog.Error(err.Error())
471
		return nil, status.Error(GetStatus(err), err.Error())
472
	}
473
474
	err = validation.ValidateBundleArguments(bundle.GetArguments(), request.GetArguments())
475
	if err != nil {
476
		return nil, status.Error(GetStatus(err), err.Error())
477
	}
478
479
	snap, err := r.dw.RunBundle(ctx, request.GetTenantId(), request.GetArguments(), bundle)
480
	if err != nil {
481
		span.RecordError(err)
482
		span.SetStatus(otelCodes.Error, err.Error())
483
		slog.Error(err.Error())
484
		return nil, status.Error(GetStatus(err), err.Error())
485
	}
486
487
	duration := time.Now().Sub(start)
488
	r.runBundleHistogram.Record(ctx, duration.Microseconds())
489
490
	return &v1.BundleRunResponse{
491
		SnapToken: snap.String(),
492
	}, nil
493
}
494