Passed
Push — master ( 529ae6...944870 )
by Tolga
06:00 queued 02:43
created

internal/storage/postgres/schema_reader.go   A

Size/Duplication

Total Lines 306
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 38
eloc 189
dl 0
loc 306
rs 9.36
c 0
b 0
f 0

7 Methods

Rating   Name   Duplication   Size   Complexity  
B postgres.*SchemaReader.ReadEntityDefinition 0 38 5
A postgres.*SchemaReader.HeadVersion 0 29 4
B postgres.*SchemaReader.ReadSchema 0 46 7
B postgres.*SchemaReader.ReadRuleDefinition 0 40 5
C postgres.*SchemaReader.ListSchemas 0 61 10
B postgres.*SchemaReader.ReadSchemaString 0 40 6
A postgres.NewSchemaReader 0 4 1
1
package postgres
2
3
import (
4
	"context"
5
	"errors"
6
	"log/slog"
7
8
	"github.com/jackc/pgx/v5"
9
10
	"github.com/Masterminds/squirrel"
11
	"github.com/rs/xid"
12
13
	"github.com/Permify/permify/internal"
14
	"github.com/Permify/permify/internal/schema"
15
	"github.com/Permify/permify/internal/storage"
16
	"github.com/Permify/permify/internal/storage/postgres/utils"
17
	"github.com/Permify/permify/pkg/database"
18
	db "github.com/Permify/permify/pkg/database/postgres"
19
	base "github.com/Permify/permify/pkg/pb/base/v1"
20
)
21
22
// SchemaReader - Structure for SchemaReader
23
type SchemaReader struct {
24
	database *db.Postgres
25
	// options
26
	txOptions pgx.TxOptions
27
}
28
29
// NewSchemaReader - Creates a new SchemaReader
30
func NewSchemaReader(database *db.Postgres) *SchemaReader {
31
	return &SchemaReader{
32
		database:  database,
33
		txOptions: pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly},
34
	}
35
}
36
37
// ReadSchema returns the schema definition for a specific tenant and version as a structured object.
38
func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) (sch *base.SchemaDefinition, err error) {
39
	ctx, span := internal.Tracer.Start(ctx, "schema-reader.read-schema")
40
	defer span.End()
41
42
	slog.DebugContext(ctx, "reading schema", slog.Any("tenant_id", tenantID), slog.Any("version", version))
43
44
	builder := r.database.Builder.Select("name, serialized_definition, version").From(SchemaDefinitionTable).Where(squirrel.Eq{"version": version, "tenant_id": tenantID})
45
46
	var query string
47
	var args []interface{}
48
49
	query, args, err = builder.ToSql()
50
	if err != nil {
51
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
52
	}
53
54
	slog.DebugContext(ctx, "executing sql query", slog.Any("query", query), slog.Any("arguments", args))
55
56
	var rows pgx.Rows
57
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
58
	if err != nil {
59
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
60
	}
61
	defer rows.Close()
62
63
	var definitions []string
64
	for rows.Next() {
65
		sd := storage.SchemaDefinition{}
66
		err = rows.Scan(&sd.Name, &sd.SerializedDefinition, &sd.Version)
67
		if err != nil {
68
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
69
		}
70
		definitions = append(definitions, sd.Serialized())
71
	}
72
	if err = rows.Err(); err != nil {
73
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
74
	}
75
76
	slog.DebugContext(ctx, "successfully retrieved", slog.Any("schema definitions", len(definitions)))
77
78
	sch, err = schema.NewSchemaFromStringDefinitions(false, definitions...)
79
	if err != nil {
80
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
81
	}
82
83
	return sch, err
84
}
85
86
// ReadSchemaString returns the schema definition for a specific tenant and version as a string.
87
func (r *SchemaReader) ReadSchemaString(ctx context.Context, tenantID, version string) (definitions []string, err error) {
88
	ctx, span := internal.Tracer.Start(ctx, "schema-reader.read-schema-string")
89
	defer span.End()
90
91
	slog.DebugContext(ctx, "reading schema", slog.Any("tenant_id", tenantID), slog.Any("version", version))
92
93
	builder := r.database.Builder.Select("name, serialized_definition, version").From(SchemaDefinitionTable).Where(squirrel.Eq{"version": version, "tenant_id": tenantID})
94
95
	var query string
96
	var args []interface{}
97
98
	query, args, err = builder.ToSql()
99
	if err != nil {
100
		return []string{}, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
101
	}
102
103
	slog.DebugContext(ctx, "executing sql query", slog.Any("query", query), slog.Any("arguments", args))
104
105
	var rows pgx.Rows
106
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
107
	if err != nil {
108
		return []string{}, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
109
	}
110
	defer rows.Close()
111
112
	for rows.Next() {
113
		sd := storage.SchemaDefinition{}
114
		err = rows.Scan(&sd.Name, &sd.SerializedDefinition, &sd.Version)
115
		if err != nil {
116
			return []string{}, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
117
		}
118
		definitions = append(definitions, sd.Serialized())
119
	}
120
	if err = rows.Err(); err != nil {
121
		return []string{}, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
122
	}
123
124
	slog.DebugContext(ctx, "successfully retrieved", slog.Any("schema definitions", len(definitions)))
125
126
	return definitions, err
127
}
128
129
// ReadEntityDefinition - Reads entity config from the repository.
130
func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name, version string) (definition *base.EntityDefinition, v string, err error) {
131
	ctx, span := internal.Tracer.Start(ctx, "schema-reader.read-entity-definition")
132
	defer span.End()
133
134
	slog.DebugContext(ctx, "reading entity definition", slog.Any("tenant_id", tenantID), slog.Any("version", version))
135
136
	builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1)
137
138
	var query string
139
	var args []interface{}
140
141
	query, args, err = builder.ToSql()
142
	if err != nil {
143
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
144
	}
145
146
	slog.DebugContext(ctx, "executing sql query", slog.Any("query", query), slog.Any("arguments", args))
147
148
	var def storage.SchemaDefinition
149
	row := r.database.ReadPool.QueryRow(ctx, query, args...)
150
	if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil {
151
		if errors.Is(err, pgx.ErrNoRows) {
152
			return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND)
153
		}
154
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
155
	}
156
157
	var sch *base.SchemaDefinition
158
	sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized())
159
	if err != nil {
160
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
161
	}
162
163
	definition, err = schema.GetEntityByName(sch, name)
164
165
	slog.DebugContext(ctx, "successfully retrieved", slog.Any("schema definition", definition))
166
167
	return definition, def.Version, err
168
}
169
170
// ReadRuleDefinition - Reads rule config from the repository.
171
func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, name, version string) (definition *base.RuleDefinition, v string, err error) {
172
	ctx, span := internal.Tracer.Start(ctx, "schema-reader.read-rule-definition")
173
	defer span.End()
174
175
	slog.DebugContext(ctx, "reading rule definition", slog.Any("tenant_id", tenantID), slog.Any("name", name), slog.Any("version", version))
176
177
	builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1)
178
179
	var query string
180
	var args []interface{}
181
182
	query, args, err = builder.ToSql()
183
	if err != nil {
184
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
185
	}
186
187
	slog.DebugContext(ctx, "executing sql query", slog.Any("query", query), slog.Any("arguments", args))
188
189
	var def storage.SchemaDefinition
190
	row := r.database.ReadPool.QueryRow(ctx, query, args...)
191
	if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil {
192
		if errors.Is(err, pgx.ErrNoRows) {
193
			return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND)
194
		}
195
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
196
	}
197
198
	slog.DebugContext(ctx, "successfully retrieved rule definition for", slog.Any("name", name))
199
200
	var sch *base.SchemaDefinition
201
	sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized())
202
	if err != nil {
203
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
204
	}
205
206
	definition, err = schema.GetRuleByName(sch, name)
207
208
	slog.DebugContext(ctx, "successfully created rule definition")
209
210
	return definition, def.Version, err
211
}
212
213
// HeadVersion - Finds the latest version of the schema.
214
func (r *SchemaReader) HeadVersion(ctx context.Context, tenantID string) (version string, err error) {
215
	ctx, span := internal.Tracer.Start(ctx, "schema-reader.head-version")
216
	defer span.End()
217
218
	slog.DebugContext(ctx, "finding the latest version fo the schema for", slog.String("tenant_id", tenantID))
219
220
	var query string
221
	var args []interface{}
222
	query, args, err = r.database.Builder.
223
		Select("version").From(SchemaDefinitionTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("version DESC").Limit(1).
224
		ToSql()
225
	if err != nil {
226
		return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
227
	}
228
229
	slog.DebugContext(ctx, "executing sql query", slog.Any("query", query), slog.Any("arguments", args))
230
231
	row := r.database.ReadPool.QueryRow(ctx, query, args...)
232
	err = row.Scan(&version)
233
	if err != nil {
234
		if errors.Is(err, pgx.ErrNoRows) {
235
			return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND)
236
		}
237
		return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
238
	}
239
240
	slog.DebugContext(ctx, "successfully found the latest schema version", slog.Any("version", version))
241
242
	return version, nil
243
}
244
245
// ListSchemas - List all Schemas
246
func (r *SchemaReader) ListSchemas(ctx context.Context, tenantID string, pagination database.Pagination) (schemas []*base.SchemaList, ct database.EncodedContinuousToken, err error) {
247
	ctx, span := internal.Tracer.Start(ctx, "tenant-reader.list-tenants")
248
	defer span.End()
249
250
	slog.DebugContext(ctx, "listing schemas with pagination", slog.Any("pagination", pagination))
251
252
	builder := r.database.Builder.Select("DISTINCT version").From(SchemaDefinitionTable).Where(squirrel.Eq{"tenant_id": tenantID})
253
	if pagination.Token() != "" {
254
		var t database.ContinuousToken
255
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
256
		if err != nil {
257
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
258
		}
259
		builder = builder.Where(squirrel.LtOrEq{"version": t.(utils.ContinuousToken).Value})
260
	}
261
262
	builder = builder.OrderBy("version DESC").Limit(uint64(pagination.PageSize() + 1))
263
264
	var query string
265
	var args []interface{}
266
267
	query, args, err = builder.ToSql()
268
	if err != nil {
269
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
270
	}
271
272
	slog.DebugContext(ctx, "executing sql query", slog.Any("query", query), slog.Any("arguments", args))
273
274
	var rows pgx.Rows
275
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
276
	if err != nil {
277
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
278
	}
279
	defer rows.Close()
280
281
	var lastVersion string
282
	schemas = make([]*base.SchemaList, 0, pagination.PageSize()+1)
283
	for rows.Next() {
284
		sch := &base.SchemaList{}
285
		err = rows.Scan(&sch.Version)
286
		if err != nil {
287
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
288
		}
289
		id, err := xid.FromString(sch.Version)
290
		if err != nil {
291
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
292
		}
293
		sch.CreatedAt = id.Time().String()
294
		lastVersion = sch.Version
295
		schemas = append(schemas, sch)
296
	}
297
	if err = rows.Err(); err != nil {
298
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
299
	}
300
301
	slog.DebugContext(ctx, "successfully listed schemas", slog.Any("number_of_schemas", len(schemas)))
302
303
	if len(schemas) > int(pagination.PageSize()) {
304
		return schemas[:pagination.PageSize()], utils.NewContinuousToken(lastVersion).Encode(), nil
305
	}
306
	return schemas, database.NewNoopContinuousToken().Encode(), nil
307
}
308