Passed
Push — master ( 2d3f7e...154da4 )
by Tolga
01:14 queued 13s
created

servers.*DataServer.WriteRelationships   B

Complexity

Conditions 8

Size

Total Lines 51
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 36
nop 2
dl 0
loc 51
rs 7.1493
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

1
package servers
2
3
import (
4
	"log/slog"
5
6
	"google.golang.org/grpc/status"
7
8
	otelCodes "go.opentelemetry.io/otel/codes"
9
	"golang.org/x/net/context"
10
11
	"github.com/Permify/permify/internal/storage"
12
	"github.com/Permify/permify/internal/validation"
13
	"github.com/Permify/permify/pkg/database"
14
	v1 "github.com/Permify/permify/pkg/pb/base/v1"
15
)
16
17
// DataServer - Structure for Data Server
18
type DataServer struct {
19
	v1.UnimplementedDataServer
20
21
	sr storage.SchemaReader
22
	dr storage.DataReader
23
	br storage.BundleReader
24
	dw storage.DataWriter
25
}
26
27
// NewDataServer - Creates new Data Server
28
func NewDataServer(
29
	dr storage.DataReader,
30
	dw storage.DataWriter,
31
	br storage.BundleReader,
32
	sr storage.SchemaReader,
33
) *DataServer {
34
	return &DataServer{
35
		dr: dr,
36
		dw: dw,
37
		br: br,
38
		sr: sr,
39
	}
40
}
41
42
// ReadRelationships - Allows directly querying the stored engines data to display and filter stored relational tuples
43
func (r *DataServer) ReadRelationships(ctx context.Context, request *v1.RelationshipReadRequest) (*v1.RelationshipReadResponse, error) {
44
	ctx, span := tracer.Start(ctx, "data.read.relationships")
45
	defer span.End()
46
47
	v := request.Validate()
48
	if v != nil {
49
		return nil, v
50
	}
51
52
	snap := request.GetMetadata().GetSnapToken()
53
	if snap == "" {
54
		st, err := r.dr.HeadSnapshot(ctx, request.GetTenantId())
55
		if err != nil {
56
			return nil, err
57
		}
58
		snap = st.Encode().String()
59
	}
60
61
	collection, ct, err := r.dr.ReadRelationships(
62
		ctx,
63
		request.GetTenantId(),
64
		request.GetFilter(),
65
		snap,
66
		database.NewPagination(
67
			database.Size(request.GetPageSize()),
68
			database.Token(request.GetContinuousToken()),
69
		),
70
	)
71
	if err != nil {
72
		span.RecordError(err)
73
		span.SetStatus(otelCodes.Error, err.Error())
74
		slog.Error(err.Error())
75
		return nil, status.Error(GetStatus(err), err.Error())
76
	}
77
78
	return &v1.RelationshipReadResponse{
79
		Tuples:          collection.GetTuples(),
80
		ContinuousToken: ct.String(),
81
	}, nil
82
}
83
84
// ReadAttributes - Allows directly querying the stored engines data to display and filter stored attribute tuples
85
func (r *DataServer) ReadAttributes(ctx context.Context, request *v1.AttributeReadRequest) (*v1.AttributeReadResponse, error) {
86
	ctx, span := tracer.Start(ctx, "data.read.attributes")
87
	defer span.End()
88
89
	v := request.Validate()
90
	if v != nil {
91
		return nil, v
92
	}
93
94
	snap := request.GetMetadata().GetSnapToken()
95
	if snap == "" {
96
		st, err := r.dr.HeadSnapshot(ctx, request.GetTenantId())
97
		if err != nil {
98
			return nil, err
99
		}
100
		snap = st.Encode().String()
101
	}
102
103
	collection, ct, err := r.dr.ReadAttributes(
104
		ctx,
105
		request.GetTenantId(),
106
		request.GetFilter(),
107
		snap,
108
		database.NewPagination(
109
			database.Size(request.GetPageSize()),
110
			database.Token(request.GetContinuousToken()),
111
		),
112
	)
113
	if err != nil {
114
		span.RecordError(err)
115
		span.SetStatus(otelCodes.Error, err.Error())
116
		slog.Error(err.Error())
117
		return nil, status.Error(GetStatus(err), err.Error())
118
	}
119
120
	return &v1.AttributeReadResponse{
121
		Attributes:      collection.GetAttributes(),
122
		ContinuousToken: ct.String(),
123
	}, nil
124
}
125
126
// Write - Write relationships and attributes to writeDB
127
func (r *DataServer) Write(ctx context.Context, request *v1.DataWriteRequest) (*v1.DataWriteResponse, error) {
128
	ctx, span := tracer.Start(ctx, "data.write")
129
	defer span.End()
130
131
	v := request.Validate()
132
	if v != nil {
133
		return nil, v
134
	}
135
136
	version := request.GetMetadata().GetSchemaVersion()
137
	if version == "" {
138
		v, err := r.sr.HeadVersion(ctx, request.GetTenantId())
139
		if err != nil {
140
			span.RecordError(err)
141
			span.SetStatus(otelCodes.Error, err.Error())
142
			return nil, err
143
		}
144
		version = v
145
	}
146
147
	relationships := make([]*v1.Tuple, 0, len(request.GetTuples()))
148
149
	for _, tup := range request.GetTuples() {
150
		definition, _, err := r.sr.ReadEntityDefinition(ctx, request.GetTenantId(), tup.GetEntity().GetType(), version)
151
		if err != nil {
152
			span.RecordError(err)
153
			span.SetStatus(otelCodes.Error, err.Error())
154
			return nil, err
155
		}
156
157
		err = validation.ValidateTuple(definition, tup)
158
		if err != nil {
159
			span.RecordError(err)
160
			span.SetStatus(otelCodes.Error, err.Error())
161
			return nil, err
162
		}
163
164
		relationships = append(relationships, tup)
165
	}
166
167
	attributes := make([]*v1.Attribute, 0, len(request.GetAttributes()))
168
169
	for _, attribute := range request.GetAttributes() {
170
		definition, _, err := r.sr.ReadEntityDefinition(ctx, request.GetTenantId(), attribute.GetEntity().GetType(), version)
171
		if err != nil {
172
			span.RecordError(err)
173
			span.SetStatus(otelCodes.Error, err.Error())
174
			return nil, err
175
		}
176
177
		err = validation.ValidateAttribute(definition, attribute)
178
		if err != nil {
179
			span.RecordError(err)
180
			span.SetStatus(otelCodes.Error, err.Error())
181
			return nil, err
182
		}
183
184
		attributes = append(attributes, attribute)
185
	}
186
187
	snap, err := r.dw.Write(ctx, request.GetTenantId(), database.NewTupleCollection(relationships...), database.NewAttributeCollection(attributes...))
188
	if err != nil {
189
		span.RecordError(err)
190
		span.SetStatus(otelCodes.Error, err.Error())
191
		slog.Error(err.Error())
192
		return nil, status.Error(GetStatus(err), err.Error())
193
	}
194
195
	return &v1.DataWriteResponse{
196
		SnapToken: snap.String(),
197
	}, nil
198
}
199
200
// WriteRelationships - Write relation tuples to writeDB
201
func (r *DataServer) WriteRelationships(ctx context.Context, request *v1.RelationshipWriteRequest) (*v1.RelationshipWriteResponse, error) {
202
	ctx, span := tracer.Start(ctx, "relationships.write")
203
	defer span.End()
204
205
	v := request.Validate()
206
	if v != nil {
207
		return nil, v
208
	}
209
210
	version := request.GetMetadata().GetSchemaVersion()
211
	if version == "" {
212
		v, err := r.sr.HeadVersion(ctx, request.GetTenantId())
213
		if err != nil {
214
			span.RecordError(err)
215
			span.SetStatus(otelCodes.Error, err.Error())
216
			return nil, err
217
		}
218
		version = v
219
	}
220
221
	relationships := make([]*v1.Tuple, 0, len(request.GetTuples()))
222
223
	for _, tup := range request.GetTuples() {
224
		definition, _, err := r.sr.ReadEntityDefinition(ctx, request.GetTenantId(), tup.GetEntity().GetType(), version)
225
		if err != nil {
226
			span.RecordError(err)
227
			span.SetStatus(otelCodes.Error, err.Error())
228
			return nil, err
229
		}
230
231
		err = validation.ValidateTuple(definition, tup)
232
		if err != nil {
233
			span.RecordError(err)
234
			span.SetStatus(otelCodes.Error, err.Error())
235
			return nil, err
236
		}
237
238
		relationships = append(relationships, tup)
239
	}
240
241
	snap, err := r.dw.Write(ctx, request.GetTenantId(), database.NewTupleCollection(relationships...), database.NewAttributeCollection())
242
	if err != nil {
243
		span.RecordError(err)
244
		span.SetStatus(otelCodes.Error, err.Error())
245
		slog.Error(err.Error())
246
		return nil, status.Error(GetStatus(err), err.Error())
247
	}
248
249
	return &v1.RelationshipWriteResponse{
250
		SnapToken: snap.String(),
251
	}, nil
252
}
253
254
// Delete - Delete relationships and attributes from writeDB
255
func (r *DataServer) Delete(ctx context.Context, request *v1.DataDeleteRequest) (*v1.DataDeleteResponse, error) {
256
	ctx, span := tracer.Start(ctx, "data.delete")
257
	defer span.End()
258
259
	v := request.Validate()
260
	if v != nil {
261
		return nil, v
262
	}
263
264
	err := validation.ValidateFilters(request.GetTupleFilter(), request.GetAttributeFilter())
265
	if err != nil {
266
		return nil, v
267
	}
268
269
	snap, err := r.dw.Delete(ctx, request.GetTenantId(), request.GetTupleFilter(), request.GetAttributeFilter())
270
	if err != nil {
271
		span.RecordError(err)
272
		span.SetStatus(otelCodes.Error, err.Error())
273
		slog.Error(err.Error())
274
		return nil, status.Error(GetStatus(err), err.Error())
275
	}
276
277
	return &v1.DataDeleteResponse{
278
		SnapToken: snap.String(),
279
	}, nil
280
}
281
282
// DeleteRelationships - Delete relationships from writeDB
283
func (r *DataServer) DeleteRelationships(ctx context.Context, request *v1.RelationshipDeleteRequest) (*v1.RelationshipDeleteResponse, error) {
284
	ctx, span := tracer.Start(ctx, "relationships.delete")
285
	defer span.End()
286
287
	v := request.Validate()
288
	if v != nil {
289
		return nil, v
290
	}
291
292
	err := validation.ValidateTupleFilter(request.GetFilter())
293
	if err != nil {
294
		return nil, v
295
	}
296
297
	snap, err := r.dw.Delete(ctx, request.GetTenantId(), request.GetFilter(), &v1.AttributeFilter{})
298
	if err != nil {
299
		span.RecordError(err)
300
		span.SetStatus(otelCodes.Error, err.Error())
301
		slog.Error(err.Error())
302
		return nil, status.Error(GetStatus(err), err.Error())
303
	}
304
305
	return &v1.RelationshipDeleteResponse{
306
		SnapToken: snap.String(),
307
	}, nil
308
}
309
310
// RunBundle executes a bundle and returns its snapshot token.
311
func (r *DataServer) RunBundle(ctx context.Context, request *v1.BundleRunRequest) (*v1.BundleRunResponse, error) {
312
	ctx, span := tracer.Start(ctx, "bundle.run")
313
	defer span.End()
314
315
	v := request.Validate()
316
	if v != nil {
317
		return nil, v
318
	}
319
320
	bundle, err := r.br.Read(ctx, request.GetTenantId(), request.GetName())
321
	if err != nil {
322
		span.RecordError(err)
323
		span.SetStatus(otelCodes.Error, err.Error())
324
		slog.Error(err.Error())
325
		return nil, status.Error(GetStatus(err), err.Error())
326
	}
327
328
	err = validation.ValidateBundleArguments(bundle.GetArguments(), request.GetArguments())
329
	if err != nil {
330
		return nil, err
331
	}
332
333
	snap, err := r.dw.RunBundle(ctx, request.GetTenantId(), request.GetArguments(), bundle)
334
	if err != nil {
335
		span.RecordError(err)
336
		span.SetStatus(otelCodes.Error, err.Error())
337
		slog.Error(err.Error())
338
		return nil, status.Error(GetStatus(err), err.Error())
339
	}
340
341
	return &v1.BundleRunResponse{
342
		SnapToken: snap.String(),
343
	}, nil
344
}
345