Passed
Push — master ( 529ae6...944870 )
by Tolga
06:00 queued 02:43
created

servers.*SchemaServer.List   A

Complexity

Conditions 3

Size

Total Lines 24
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 18
nop 2
dl 0
loc 24
rs 9.5
c 0
b 0
f 0
1
package servers
2
3
import (
4
	"log/slog"
5
	"strings"
6
7
	"github.com/rs/xid"
8
	api "go.opentelemetry.io/otel/metric"
9
	"google.golang.org/grpc/status"
10
11
	otelCodes "go.opentelemetry.io/otel/codes"
12
	"golang.org/x/net/context"
13
14
	"github.com/Permify/permify/internal"
15
	"github.com/Permify/permify/internal/storage"
16
	"github.com/Permify/permify/pkg/database"
17
	"github.com/Permify/permify/pkg/dsl/compiler"
18
	"github.com/Permify/permify/pkg/dsl/parser"
19
	v1 "github.com/Permify/permify/pkg/pb/base/v1"
20
	"github.com/Permify/permify/pkg/telemetry"
21
)
22
23
// SchemaServer - Structure for Schema Server
24
type SchemaServer struct {
25
	v1.UnimplementedSchemaServer
26
27
	sw                   storage.SchemaWriter
28
	sr                   storage.SchemaReader
29
	writeSchemaHistogram api.Int64Histogram
30
	readSchemaHistogram  api.Int64Histogram
31
	listSchemaHistogram  api.Int64Histogram
32
}
33
34
// NewSchemaServer - Creates new Schema Server
35
func NewSchemaServer(sw storage.SchemaWriter, sr storage.SchemaReader) *SchemaServer {
36
	return &SchemaServer{
37
		sw:                   sw,
38
		sr:                   sr,
39
		writeSchemaHistogram: telemetry.NewHistogram(internal.Meter, "write_schema", "amount", "Number of writing schema in"),
40
		readSchemaHistogram:  telemetry.NewHistogram(internal.Meter, "read_schema", "amount", "Number of reading schema"),
41
		listSchemaHistogram:  telemetry.NewHistogram(internal.Meter, "list_schema", "amount", "Number of listing schema"),
42
	}
43
}
44
45
// Write - Configure new Permify Schema to Permify
46
func (r *SchemaServer) Write(ctx context.Context, request *v1.SchemaWriteRequest) (*v1.SchemaWriteResponse, error) {
47
	ctx, span := internal.Tracer.Start(ctx, "schemas.write")
48
	defer span.End()
49
50
	sch, err := parser.NewParser(request.GetSchema()).Parse()
51
	if err != nil {
52
		span.RecordError(err)
53
		span.SetStatus(otelCodes.Error, err.Error())
54
		return nil, status.Error(GetStatus(err), err.Error())
55
	}
56
57
	_, _, err = compiler.NewCompiler(true, sch).Compile()
58
	if err != nil {
59
		span.RecordError(err)
60
		span.SetStatus(otelCodes.Error, err.Error())
61
		return nil, status.Error(GetStatus(err), err.Error())
62
	}
63
64
	version := xid.New().String()
65
66
	cnf := make([]storage.SchemaDefinition, 0, len(sch.Statements))
67
	for _, st := range sch.Statements {
68
		cnf = append(cnf, storage.SchemaDefinition{
69
			TenantID:             request.GetTenantId(),
70
			Version:              version,
71
			Name:                 st.GetName(),
72
			SerializedDefinition: []byte(st.String()),
73
		})
74
	}
75
76
	err = r.sw.WriteSchema(ctx, cnf)
77
	if err != nil {
78
		span.RecordError(err)
79
		span.SetStatus(otelCodes.Error, err.Error())
80
		slog.ErrorContext(ctx, err.Error())
81
		return nil, status.Error(GetStatus(err), err.Error())
82
	}
83
84
	r.writeSchemaHistogram.Record(ctx, 1)
85
86
	return &v1.SchemaWriteResponse{
87
		SchemaVersion: version,
88
	}, nil
89
}
90
91
// PartialWrite applies incremental updates to the schema of a specific tenant based on the provided request.
92
func (r *SchemaServer) PartialWrite(ctx context.Context, request *v1.SchemaPartialWriteRequest) (*v1.SchemaPartialWriteResponse, error) {
93
	// Start a new tracing span for monitoring and observability.
94
	ctx, span := internal.Tracer.Start(ctx, "schemas.partial-write")
95
	defer span.End() // Ensure the span is closed at the end of the function.
96
97
	// Retrieve or default the schema version from the request.
98
	version := request.GetMetadata().GetSchemaVersion()
99
	if version == "" { // If not provided, fetch the latest version.
100
		ver, err := r.sr.HeadVersion(ctx, request.GetTenantId())
101
		if err != nil {
102
			return nil, status.Error(GetStatus(err), err.Error()) // Return gRPC status error on failure.
103
		}
104
		version = ver
105
	}
106
107
	// Fetch the current schema definition as a string.
108
	definitions, err := r.sr.ReadSchemaString(ctx, request.GetTenantId(), version)
109
	if err != nil {
110
		span.RecordError(err) // Log and record the error.
111
		return nil, status.Error(GetStatus(err), err.Error())
112
	}
113
114
	// Parse the schema definitions into a structured format.
115
	p := parser.NewParser(strings.Join(definitions, "\n"))
116
	schema, err := p.Parse()
117
	if err != nil {
118
		span.RecordError(err) // Log and record the error.
119
		return nil, status.Error(GetStatus(err), err.Error())
120
	}
121
122
	// Iterate through each partial update in the request and apply changes.
123
	for entityName, partials := range request.GetPartials() {
124
		for _, write := range partials.GetWrite() { // Handle new schema statements.
125
			pr := parser.NewParser(write)
126
			stmt, err := pr.ParsePartial(entityName)
127
			if err != nil {
128
				span.RecordError(err)
129
				return nil, status.Error(GetStatus(err), err.Error())
130
			}
131
			err = schema.AddStatement(entityName, stmt)
132
			if err != nil {
133
				span.RecordError(err)
134
				return nil, status.Error(GetStatus(err), err.Error())
135
			}
136
		}
137
138
		for _, update := range partials.GetUpdate() { // Handle schema updates.
139
			pr := parser.NewParser(update)
140
			stmt, err := pr.ParsePartial(entityName)
141
			if err != nil {
142
				span.RecordError(err)
143
				return nil, status.Error(GetStatus(err), err.Error())
144
			}
145
			err = schema.UpdateStatement(entityName, stmt)
146
			if err != nil {
147
				span.RecordError(err)
148
				return nil, status.Error(GetStatus(err), err.Error())
149
			}
150
		}
151
152
		for _, del := range partials.GetDelete() { // Handle schema deletions.
153
			err = schema.DeleteStatement(entityName, del)
154
			if err != nil {
155
				span.RecordError(err)
156
				return nil, status.Error(GetStatus(err), err.Error())
157
			}
158
		}
159
	}
160
161
	// Re-parse the updated schema to ensure consistency.
162
	sch, err := parser.NewParser(schema.String()).Parse()
163
	if err != nil {
164
		span.RecordError(err)
165
		return nil, status.Error(GetStatus(err), err.Error())
166
	}
167
168
	// Compile the new schema to validate its correctness.
169
	_, _, err = compiler.NewCompiler(true, sch).Compile()
170
	if err != nil {
171
		span.RecordError(err)
172
		return nil, status.Error(GetStatus(err), err.Error())
173
	}
174
175
	// Generate a new version ID for the updated schema.
176
	newVersion := xid.New().String()
177
178
	// Prepare the new schema definition for storage.
179
	cnf := make([]storage.SchemaDefinition, 0, len(sch.Statements))
180
	for _, st := range sch.Statements {
181
		cnf = append(cnf, storage.SchemaDefinition{
182
			TenantID:             request.GetTenantId(),
183
			Version:              newVersion,
184
			Name:                 st.GetName(),
185
			SerializedDefinition: []byte(st.String()),
186
		})
187
	}
188
189
	// Write the updated schema to storage.
190
	err = r.sw.WriteSchema(ctx, cnf)
191
	if err != nil {
192
		span.RecordError(err)
193
		return nil, status.Error(GetStatus(err), err.Error())
194
	}
195
196
	// Return the response with the new schema version.
197
	return &v1.SchemaPartialWriteResponse{
198
		SchemaVersion: newVersion,
199
	}, nil
200
}
201
202
// Read - Read created Schema
203
func (r *SchemaServer) Read(ctx context.Context, request *v1.SchemaReadRequest) (*v1.SchemaReadResponse, error) {
204
	ctx, span := internal.Tracer.Start(ctx, "schemas.read")
205
	defer span.End()
206
207
	version := request.GetMetadata().GetSchemaVersion()
208
	if version == "" {
209
		ver, err := r.sr.HeadVersion(ctx, request.GetTenantId())
210
		if err != nil {
211
			return nil, status.Error(GetStatus(err), err.Error())
212
		}
213
		version = ver
214
	}
215
216
	response, err := r.sr.ReadSchema(ctx, request.GetTenantId(), version)
217
	if err != nil {
218
		span.RecordError(err)
219
		span.SetStatus(otelCodes.Error, err.Error())
220
		slog.ErrorContext(ctx, err.Error())
221
		return nil, status.Error(GetStatus(err), err.Error())
222
	}
223
224
	r.readSchemaHistogram.Record(ctx, 1)
225
226
	return &v1.SchemaReadResponse{
227
		Schema: response,
228
	}, nil
229
}
230
231
// List - List Schemas
232
func (r *SchemaServer) List(ctx context.Context, request *v1.SchemaListRequest) (*v1.SchemaListResponse, error) {
233
	ctx, span := internal.Tracer.Start(ctx, "schemas.list")
234
	defer span.End()
235
236
	schemas, ct, err := r.sr.ListSchemas(ctx, request.GetTenantId(), database.NewPagination(database.Size(request.GetPageSize()), database.Token(request.GetContinuousToken())))
237
	if err != nil {
238
		span.RecordError(err)
239
		span.SetStatus(otelCodes.Error, err.Error())
240
		slog.ErrorContext(ctx, err.Error())
241
		return nil, status.Error(GetStatus(err), err.Error())
242
	}
243
244
	head, err := r.sr.HeadVersion(ctx, request.GetTenantId())
245
	if err != nil {
246
		return nil, status.Error(GetStatus(err), err.Error())
247
	}
248
249
	r.listSchemaHistogram.Record(ctx, 1)
250
251
	return &v1.SchemaListResponse{
252
		Head:            head,
253
		Schemas:         schemas,
254
		ContinuousToken: ct.String(),
255
	}, nil
256
}
257