Passed
Pull Request — master (#1153)
by Tolga
02:32
created

servers.*SchemaServer.PartialWrite   F

Complexity

Conditions 18

Size

Total Lines 108
Code Lines 67

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 18
eloc 67
nop 2
dl 0
loc 108
rs 1.2
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

Complexity

Complex classes like servers.*SchemaServer.PartialWrite often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package servers
2
3
import (
4
	"log/slog"
5
	"strings"
6
7
	"github.com/rs/xid"
8
	"google.golang.org/grpc/status"
9
10
	otelCodes "go.opentelemetry.io/otel/codes"
11
	"golang.org/x/net/context"
12
13
	"github.com/Permify/permify/internal/storage"
14
	"github.com/Permify/permify/pkg/database"
15
	"github.com/Permify/permify/pkg/dsl/compiler"
16
	"github.com/Permify/permify/pkg/dsl/parser"
17
	v1 "github.com/Permify/permify/pkg/pb/base/v1"
18
)
19
20
// SchemaServer - Structure for Schema Server
21
type SchemaServer struct {
22
	v1.UnimplementedSchemaServer
23
24
	sw storage.SchemaWriter
25
	sr storage.SchemaReader
26
}
27
28
// NewSchemaServer - Creates new Schema Server
29
func NewSchemaServer(sw storage.SchemaWriter, sr storage.SchemaReader) *SchemaServer {
30
	return &SchemaServer{
31
		sw: sw,
32
		sr: sr,
33
	}
34
}
35
36
// Write - Configure new Permify Schema to Permify
37
func (r *SchemaServer) Write(ctx context.Context, request *v1.SchemaWriteRequest) (*v1.SchemaWriteResponse, error) {
38
	ctx, span := tracer.Start(ctx, "schemas.write")
39
	defer span.End()
40
41
	sch, err := parser.NewParser(request.GetSchema()).Parse()
42
	if err != nil {
43
		span.RecordError(err)
44
		span.SetStatus(otelCodes.Error, err.Error())
45
		return nil, status.Error(GetStatus(err), err.Error())
46
	}
47
48
	_, _, err = compiler.NewCompiler(true, sch).Compile()
49
	if err != nil {
50
		span.RecordError(err)
51
		span.SetStatus(otelCodes.Error, err.Error())
52
		return nil, status.Error(GetStatus(err), err.Error())
53
	}
54
55
	version := xid.New().String()
56
57
	cnf := make([]storage.SchemaDefinition, 0, len(sch.Statements))
58
	for _, st := range sch.Statements {
59
		cnf = append(cnf, storage.SchemaDefinition{
60
			TenantID:             request.GetTenantId(),
61
			Version:              version,
62
			Name:                 st.GetName(),
63
			SerializedDefinition: []byte(st.String()),
64
		})
65
	}
66
67
	err = r.sw.WriteSchema(ctx, cnf)
68
	if err != nil {
69
		span.RecordError(err)
70
		span.SetStatus(otelCodes.Error, err.Error())
71
		slog.Error(err.Error())
72
		return nil, status.Error(GetStatus(err), err.Error())
73
	}
74
75
	return &v1.SchemaWriteResponse{
76
		SchemaVersion: version,
77
	}, nil
78
}
79
80
// PartialWrite applies incremental updates to the schema of a specific tenant based on the provided request.
81
func (r *SchemaServer) PartialWrite(ctx context.Context, request *v1.SchemaPartialWriteRequest) (*v1.SchemaPartialWriteResponse, error) {
82
	// Start a new tracing span for monitoring and observability.
83
	ctx, span := tracer.Start(ctx, "schemas.partial-write")
84
	defer span.End() // Ensure the span is closed at the end of the function.
85
86
	// Retrieve or default the schema version from the request.
87
	version := request.GetMetadata().GetSchemaVersion()
88
	if version == "" { // If not provided, fetch the latest version.
89
		ver, err := r.sr.HeadVersion(ctx, request.GetTenantId())
90
		if err != nil {
91
			return nil, status.Error(GetStatus(err), err.Error()) // Return gRPC status error on failure.
92
		}
93
		version = ver
94
	}
95
96
	// Fetch the current schema definition as a string.
97
	definitions, err := r.sr.ReadSchemaString(ctx, request.GetTenantId(), version)
98
	if err != nil {
99
		span.RecordError(err) // Log and record the error.
100
		return nil, status.Error(GetStatus(err), err.Error())
101
	}
102
103
	// Parse the schema definitions into a structured format.
104
	p := parser.NewParser(strings.Join(definitions, "\n"))
105
	schema, err := p.Parse()
106
	if err != nil {
107
		span.RecordError(err) // Log and record the error.
108
		return nil, status.Error(GetStatus(err), err.Error())
109
	}
110
111
	// Iterate through each partial update in the request and apply changes.
112
	for entityName, partials := range request.GetPartials() {
113
		for _, write := range partials.GetWrite() { // Handle new schema statements.
114
			pr := parser.NewParser(write)
115
			stmt, err := pr.ParsePartial(entityName)
116
			if err != nil {
117
				span.RecordError(err)
118
				return nil, status.Error(GetStatus(err), err.Error())
119
			}
120
			err = schema.AddStatement(entityName, stmt)
121
			if err != nil {
122
				span.RecordError(err)
123
				return nil, status.Error(GetStatus(err), err.Error())
124
			}
125
		}
126
127
		for _, update := range partials.GetUpdate() { // Handle schema updates.
128
			pr := parser.NewParser(update)
129
			stmt, err := pr.ParsePartial(entityName)
130
			if err != nil {
131
				span.RecordError(err)
132
				return nil, status.Error(GetStatus(err), err.Error())
133
			}
134
			err = schema.UpdateStatement(entityName, stmt)
135
			if err != nil {
136
				span.RecordError(err)
137
				return nil, status.Error(GetStatus(err), err.Error())
138
			}
139
		}
140
141
		for _, del := range partials.GetDelete() { // Handle schema deletions.
142
			err = schema.DeleteStatement(entityName, del)
143
			if err != nil {
144
				span.RecordError(err)
145
				return nil, status.Error(GetStatus(err), err.Error())
146
			}
147
		}
148
	}
149
150
	// Re-parse the updated schema to ensure consistency.
151
	sch, err := parser.NewParser(schema.String()).Parse()
152
	if err != nil {
153
		span.RecordError(err)
154
		return nil, status.Error(GetStatus(err), err.Error())
155
	}
156
157
	// Compile the new schema to validate its correctness.
158
	_, _, err = compiler.NewCompiler(true, sch).Compile()
159
	if err != nil {
160
		span.RecordError(err)
161
		return nil, status.Error(GetStatus(err), err.Error())
162
	}
163
164
	// Generate a new version ID for the updated schema.
165
	newVersion := xid.New().String()
166
167
	// Prepare the new schema definition for storage.
168
	cnf := make([]storage.SchemaDefinition, 0, len(sch.Statements))
169
	for _, st := range sch.Statements {
170
		cnf = append(cnf, storage.SchemaDefinition{
171
			TenantID:             request.GetTenantId(),
172
			Version:              newVersion,
173
			Name:                 st.GetName(),
174
			SerializedDefinition: []byte(st.String()),
175
		})
176
	}
177
178
	// Write the updated schema to storage.
179
	err = r.sw.WriteSchema(ctx, cnf)
180
	if err != nil {
181
		span.RecordError(err)
182
		return nil, status.Error(GetStatus(err), err.Error())
183
	}
184
185
	// Return the response with the new schema version.
186
	return &v1.SchemaPartialWriteResponse{
187
		SchemaVersion: newVersion,
188
	}, nil
189
}
190
191
// Read - Read created Schema
192
func (r *SchemaServer) Read(ctx context.Context, request *v1.SchemaReadRequest) (*v1.SchemaReadResponse, error) {
193
	ctx, span := tracer.Start(ctx, "schemas.read")
194
	defer span.End()
195
196
	version := request.GetMetadata().GetSchemaVersion()
197
	if version == "" {
198
		ver, err := r.sr.HeadVersion(ctx, request.GetTenantId())
199
		if err != nil {
200
			return nil, status.Error(GetStatus(err), err.Error())
201
		}
202
		version = ver
203
	}
204
205
	response, err := r.sr.ReadSchema(ctx, request.GetTenantId(), version)
206
	if err != nil {
207
		span.RecordError(err)
208
		span.SetStatus(otelCodes.Error, err.Error())
209
		slog.Error(err.Error())
210
		return nil, status.Error(GetStatus(err), err.Error())
211
	}
212
213
	return &v1.SchemaReadResponse{
214
		Schema: response,
215
	}, nil
216
}
217
218
// List - List Schemas
219
func (r *SchemaServer) List(ctx context.Context, request *v1.SchemaListRequest) (*v1.SchemaListResponse, error) {
220
	ctx, span := tracer.Start(ctx, "schemas.list")
221
	defer span.End()
222
223
	schemas, ct, err := r.sr.ListSchemas(ctx, request.GetTenantId(), database.NewPagination(database.Size(request.GetPageSize()), database.Token(request.GetContinuousToken())))
224
	if err != nil {
225
		span.RecordError(err)
226
		span.SetStatus(otelCodes.Error, err.Error())
227
		slog.Error(err.Error())
228
		return nil, status.Error(GetStatus(err), err.Error())
229
	}
230
231
	head, err := r.sr.HeadVersion(ctx, request.GetTenantId())
232
	if err != nil {
233
		return nil, status.Error(GetStatus(err), err.Error())
234
	}
235
236
	return &v1.SchemaListResponse{
237
		Head:            head,
238
		Schemas:         schemas,
239
		ContinuousToken: ct.String(),
240
	}, nil
241
}
242