Passed
Push — master ( 529ae6...944870 )
by Tolga
06:00 queued 02:43
created

internal/servers/data_server.go   B

Size/Duplication

Total Lines 417
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 48
eloc 277
dl 0
loc 417
rs 8.5599
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
B servers.*DataServer.RunBundle 0 35 5
A servers.*DataServer.Delete 0 27 4
C servers.*DataServer.WriteRelationships 0 64 9
D servers.*DataServer.Write 0 94 13
A servers.NewDataServer 0 18 1
A servers.*DataServer.DeleteRelationships 0 27 4
B servers.*DataServer.ReadAttributes 0 46 6
B servers.*DataServer.ReadRelationships 0 46 6
1
package servers
2
3
import (
4
	"log/slog"
5
6
	otelCodes "go.opentelemetry.io/otel/codes"
7
	api "go.opentelemetry.io/otel/metric"
8
	"golang.org/x/net/context"
9
	"google.golang.org/grpc/status"
10
11
	"github.com/Permify/permify/internal"
12
	"github.com/Permify/permify/internal/storage"
13
	"github.com/Permify/permify/internal/validation"
14
	"github.com/Permify/permify/pkg/attribute"
15
	"github.com/Permify/permify/pkg/database"
16
	v1 "github.com/Permify/permify/pkg/pb/base/v1"
17
	"github.com/Permify/permify/pkg/telemetry"
18
	"github.com/Permify/permify/pkg/tuple"
19
)
20
21
// DataServer - Structure for Data Server
22
type DataServer struct {
23
	v1.UnimplementedDataServer
24
25
	sr                           storage.SchemaReader
26
	dr                           storage.DataReader
27
	br                           storage.BundleReader
28
	dw                           storage.DataWriter
29
	writeDataHistogram           api.Int64Histogram
30
	deleteDataHistogram          api.Int64Histogram
31
	readAttributesHistogram      api.Int64Histogram
32
	readRelationshipsHistogram   api.Int64Histogram
33
	writeRelationshipsHistogram  api.Int64Histogram
34
	deleteRelationshipsHistogram api.Int64Histogram
35
	runBundleHistogram           api.Int64Histogram
36
}
37
38
// NewDataServer - Creates new Data Server
39
func NewDataServer(
40
	dr storage.DataReader,
41
	dw storage.DataWriter,
42
	br storage.BundleReader,
43
	sr storage.SchemaReader,
44
) *DataServer {
45
	return &DataServer{
46
		dr:                           dr,
47
		dw:                           dw,
48
		br:                           br,
49
		sr:                           sr,
50
		writeDataHistogram:           telemetry.NewHistogram(internal.Meter, "write_data", "amount", "Number of writing data"),
51
		deleteDataHistogram:          telemetry.NewHistogram(internal.Meter, "delete_data", "amount", "Number of deleting data"),
52
		readAttributesHistogram:      telemetry.NewHistogram(internal.Meter, "read_attributes", "amount", "Number of reading attributes"),
53
		readRelationshipsHistogram:   telemetry.NewHistogram(internal.Meter, "read_relationships", "amount", "Number of reading relationships"),
54
		writeRelationshipsHistogram:  telemetry.NewHistogram(internal.Meter, "write_relationships", "amount", "Number of writing relationships"),
55
		deleteRelationshipsHistogram: telemetry.NewHistogram(internal.Meter, "delete_relationships", "amount", "Number of deleting relationships"),
56
		runBundleHistogram:           telemetry.NewHistogram(internal.Meter, "run_bundle", "amount", "Number of running bunble"),
57
	}
58
}
59
60
// ReadRelationships - Allows directly querying the stored engines data to display and filter stored relational tuples
61
func (r *DataServer) ReadRelationships(ctx context.Context, request *v1.RelationshipReadRequest) (*v1.RelationshipReadResponse, error) {
62
	ctx, span := internal.Tracer.Start(ctx, "data.read.relationships")
63
	defer span.End()
64
65
	size := request.GetPageSize()
66
	if size == 0 {
67
		size = 50
68
	}
69
70
	v := request.Validate()
71
	if v != nil {
72
		return nil, status.Error(GetStatus(v), v.Error())
73
	}
74
75
	snap := request.GetMetadata().GetSnapToken()
76
	if snap == "" {
77
		st, err := r.dr.HeadSnapshot(ctx, request.GetTenantId())
78
		if err != nil {
79
			return nil, status.Error(GetStatus(err), err.Error())
80
		}
81
		snap = st.Encode().String()
82
	}
83
84
	collection, ct, err := r.dr.ReadRelationships(
85
		ctx,
86
		request.GetTenantId(),
87
		request.GetFilter(),
88
		snap,
89
		database.NewPagination(
90
			database.Size(size),
91
			database.Token(request.GetContinuousToken()),
92
		),
93
	)
94
	if err != nil {
95
		span.RecordError(err)
96
		span.SetStatus(otelCodes.Error, err.Error())
97
		slog.ErrorContext(ctx, err.Error())
98
		return nil, status.Error(GetStatus(err), err.Error())
99
	}
100
101
	r.readRelationshipsHistogram.Record(ctx, 1)
102
103
	return &v1.RelationshipReadResponse{
104
		Tuples:          collection.GetTuples(),
105
		ContinuousToken: ct.String(),
106
	}, nil
107
}
108
109
// ReadAttributes - Allows directly querying the stored engines data to display and filter stored attribute tuples
110
func (r *DataServer) ReadAttributes(ctx context.Context, request *v1.AttributeReadRequest) (*v1.AttributeReadResponse, error) {
111
	ctx, span := internal.Tracer.Start(ctx, "data.read.attributes")
112
	defer span.End()
113
114
	size := request.GetPageSize()
115
	if size == 0 {
116
		size = 50
117
	}
118
119
	v := request.Validate()
120
	if v != nil {
121
		return nil, status.Error(GetStatus(v), v.Error())
122
	}
123
124
	snap := request.GetMetadata().GetSnapToken()
125
	if snap == "" {
126
		st, err := r.dr.HeadSnapshot(ctx, request.GetTenantId())
127
		if err != nil {
128
			return nil, status.Error(GetStatus(err), err.Error())
129
		}
130
		snap = st.Encode().String()
131
	}
132
133
	collection, ct, err := r.dr.ReadAttributes(
134
		ctx,
135
		request.GetTenantId(),
136
		request.GetFilter(),
137
		snap,
138
		database.NewPagination(
139
			database.Size(size),
140
			database.Token(request.GetContinuousToken()),
141
		),
142
	)
143
	if err != nil {
144
		span.RecordError(err)
145
		span.SetStatus(otelCodes.Error, err.Error())
146
		slog.ErrorContext(ctx, err.Error())
147
		return nil, status.Error(GetStatus(err), err.Error())
148
	}
149
150
	r.readAttributesHistogram.Record(ctx, 1)
151
152
	return &v1.AttributeReadResponse{
153
		Attributes:      collection.GetAttributes(),
154
		ContinuousToken: ct.String(),
155
	}, nil
156
}
157
158
// Write - Write relationships and attributes to writeDB
159
func (r *DataServer) Write(ctx context.Context, request *v1.DataWriteRequest) (*v1.DataWriteResponse, error) {
160
	ctx, span := internal.Tracer.Start(ctx, "data.write")
161
	defer span.End()
162
163
	v := request.Validate()
164
	if v != nil {
165
		return nil, status.Error(GetStatus(v), v.Error())
166
	}
167
168
	version := request.GetMetadata().GetSchemaVersion()
169
	if version == "" {
170
		v, err := r.sr.HeadVersion(ctx, request.GetTenantId())
171
		if err != nil {
172
			span.RecordError(err)
173
			span.SetStatus(otelCodes.Error, err.Error())
174
			return nil, status.Error(GetStatus(err), err.Error())
175
		}
176
		version = v
177
	}
178
179
	relationships := make([]*v1.Tuple, 0, len(request.GetTuples()))
180
181
	relationshipsMap := map[string]struct{}{}
182
183
	for _, tup := range request.GetTuples() {
184
		key := tuple.ToString(tup)
185
186
		if _, ok := relationshipsMap[key]; ok {
187
			continue
188
		}
189
190
		relationshipsMap[key] = struct{}{}
191
192
		definition, _, err := r.sr.ReadEntityDefinition(ctx, request.GetTenantId(), tup.GetEntity().GetType(), version)
193
		if err != nil {
194
			span.RecordError(err)
195
			span.SetStatus(otelCodes.Error, err.Error())
196
			return nil, status.Error(GetStatus(err), err.Error())
197
		}
198
199
		err = validation.ValidateTuple(definition, tup)
200
		if err != nil {
201
			span.RecordError(err)
202
			span.SetStatus(otelCodes.Error, err.Error())
203
			return nil, status.Error(GetStatus(err), err.Error())
204
		}
205
206
		relationships = append(relationships, tup)
207
	}
208
209
	attrs := make([]*v1.Attribute, 0, len(request.GetAttributes()))
210
211
	attributesMap := map[string]struct{}{}
212
213
	for _, attr := range request.GetAttributes() {
214
215
		key := attribute.EntityAndAttributeToString(attr.GetEntity(), attr.GetAttribute())
216
217
		if _, ok := attributesMap[key]; ok {
218
			continue
219
		}
220
221
		attributesMap[key] = struct{}{}
222
223
		definition, _, err := r.sr.ReadEntityDefinition(ctx, request.GetTenantId(), attr.GetEntity().GetType(), version)
224
		if err != nil {
225
			span.RecordError(err)
226
			span.SetStatus(otelCodes.Error, err.Error())
227
			return nil, status.Error(GetStatus(err), err.Error())
228
		}
229
230
		err = validation.ValidateAttribute(definition, attr)
231
		if err != nil {
232
			span.RecordError(err)
233
			span.SetStatus(otelCodes.Error, err.Error())
234
			return nil, status.Error(GetStatus(err), err.Error())
235
		}
236
237
		attrs = append(attrs, attr)
238
	}
239
240
	snap, err := r.dw.Write(ctx, request.GetTenantId(), database.NewTupleCollection(relationships...), database.NewAttributeCollection(attrs...))
241
	if err != nil {
242
		span.RecordError(err)
243
		span.SetStatus(otelCodes.Error, err.Error())
244
		slog.ErrorContext(ctx, err.Error())
245
		return nil, status.Error(GetStatus(err), err.Error())
246
	}
247
248
	r.writeDataHistogram.Record(ctx, 1)
249
250
	return &v1.DataWriteResponse{
251
		SnapToken: snap.String(),
252
	}, nil
253
}
254
255
// WriteRelationships - Write relation tuples to writeDB
256
func (r *DataServer) WriteRelationships(ctx context.Context, request *v1.RelationshipWriteRequest) (*v1.RelationshipWriteResponse, error) {
257
	ctx, span := internal.Tracer.Start(ctx, "relationships.write")
258
	defer span.End()
259
260
	v := request.Validate()
261
	if v != nil {
262
		return nil, status.Error(GetStatus(v), v.Error())
263
	}
264
265
	version := request.GetMetadata().GetSchemaVersion()
266
	if version == "" {
267
		v, err := r.sr.HeadVersion(ctx, request.GetTenantId())
268
		if err != nil {
269
			span.RecordError(err)
270
			span.SetStatus(otelCodes.Error, err.Error())
271
			return nil, status.Error(GetStatus(err), err.Error())
272
		}
273
		version = v
274
	}
275
276
	relationships := make([]*v1.Tuple, 0, len(request.GetTuples()))
277
278
	relationshipsMap := map[string]struct{}{}
279
280
	for _, tup := range request.GetTuples() {
281
282
		key := tuple.ToString(tup)
283
284
		if _, ok := relationshipsMap[key]; ok {
285
			continue
286
		}
287
288
		relationshipsMap[key] = struct{}{}
289
290
		definition, _, err := r.sr.ReadEntityDefinition(ctx, request.GetTenantId(), tup.GetEntity().GetType(), version)
291
		if err != nil {
292
			span.RecordError(err)
293
			span.SetStatus(otelCodes.Error, err.Error())
294
			return nil, status.Error(GetStatus(err), err.Error())
295
		}
296
297
		err = validation.ValidateTuple(definition, tup)
298
		if err != nil {
299
			span.RecordError(err)
300
			span.SetStatus(otelCodes.Error, err.Error())
301
			return nil, status.Error(GetStatus(err), err.Error())
302
		}
303
304
		relationships = append(relationships, tup)
305
	}
306
307
	snap, err := r.dw.Write(ctx, request.GetTenantId(), database.NewTupleCollection(relationships...), database.NewAttributeCollection())
308
	if err != nil {
309
		span.RecordError(err)
310
		span.SetStatus(otelCodes.Error, err.Error())
311
		slog.ErrorContext(ctx, err.Error())
312
		return nil, status.Error(GetStatus(err), err.Error())
313
	}
314
315
	r.writeRelationshipsHistogram.Record(ctx, 1)
316
317
	return &v1.RelationshipWriteResponse{
318
		SnapToken: snap.String(),
319
	}, nil
320
}
321
322
// Delete - Delete relationships and attributes from writeDB
323
func (r *DataServer) Delete(ctx context.Context, request *v1.DataDeleteRequest) (*v1.DataDeleteResponse, error) {
324
	ctx, span := internal.Tracer.Start(ctx, "data.delete")
325
	defer span.End()
326
327
	v := request.Validate()
328
	if v != nil {
329
		return nil, status.Error(GetStatus(v), v.Error())
330
	}
331
332
	err := validation.ValidateFilters(request.GetTupleFilter(), request.GetAttributeFilter())
333
	if err != nil {
334
		return nil, status.Error(GetStatus(v), err.Error())
335
	}
336
337
	snap, err := r.dw.Delete(ctx, request.GetTenantId(), request.GetTupleFilter(), request.GetAttributeFilter())
338
	if err != nil {
339
		span.RecordError(err)
340
		span.SetStatus(otelCodes.Error, err.Error())
341
		slog.ErrorContext(ctx, err.Error())
342
		return nil, status.Error(GetStatus(err), err.Error())
343
	}
344
345
	r.deleteDataHistogram.Record(ctx, 1)
346
347
	return &v1.DataDeleteResponse{
348
		SnapToken: snap.String(),
349
	}, nil
350
}
351
352
// DeleteRelationships - Delete relationships from writeDB
353
func (r *DataServer) DeleteRelationships(ctx context.Context, request *v1.RelationshipDeleteRequest) (*v1.RelationshipDeleteResponse, error) {
354
	ctx, span := internal.Tracer.Start(ctx, "relationships.delete")
355
	defer span.End()
356
357
	v := request.Validate()
358
	if v != nil {
359
		return nil, status.Error(GetStatus(v), v.Error())
360
	}
361
362
	err := validation.ValidateTupleFilter(request.GetFilter())
363
	if err != nil {
364
		return nil, status.Error(GetStatus(v), err.Error())
365
	}
366
367
	snap, err := r.dw.Delete(ctx, request.GetTenantId(), request.GetFilter(), &v1.AttributeFilter{})
368
	if err != nil {
369
		span.RecordError(err)
370
		span.SetStatus(otelCodes.Error, err.Error())
371
		slog.ErrorContext(ctx, err.Error())
372
		return nil, status.Error(GetStatus(err), err.Error())
373
	}
374
375
	r.deleteRelationshipsHistogram.Record(ctx, 1)
376
377
	return &v1.RelationshipDeleteResponse{
378
		SnapToken: snap.String(),
379
	}, nil
380
}
381
382
// RunBundle executes a bundle and returns its snapshot token.
383
func (r *DataServer) RunBundle(ctx context.Context, request *v1.BundleRunRequest) (*v1.BundleRunResponse, error) {
384
	ctx, span := internal.Tracer.Start(ctx, "bundle.run")
385
	defer span.End()
386
387
	v := request.Validate()
388
	if v != nil {
389
		return nil, status.Error(GetStatus(v), v.Error())
390
	}
391
392
	bundle, err := r.br.Read(ctx, request.GetTenantId(), request.GetName())
393
	if err != nil {
394
		span.RecordError(err)
395
		span.SetStatus(otelCodes.Error, err.Error())
396
		slog.ErrorContext(ctx, err.Error())
397
		return nil, status.Error(GetStatus(err), err.Error())
398
	}
399
400
	err = validation.ValidateBundleArguments(bundle.GetArguments(), request.GetArguments())
401
	if err != nil {
402
		return nil, status.Error(GetStatus(err), err.Error())
403
	}
404
405
	snap, err := r.dw.RunBundle(ctx, request.GetTenantId(), request.GetArguments(), bundle)
406
	if err != nil {
407
		span.RecordError(err)
408
		span.SetStatus(otelCodes.Error, err.Error())
409
		slog.ErrorContext(ctx, err.Error())
410
		return nil, status.Error(GetStatus(err), err.Error())
411
	}
412
413
	r.runBundleHistogram.Record(ctx, 1)
414
415
	return &v1.BundleRunResponse{
416
		SnapToken: snap.String(),
417
	}, nil
418
}
419