Passed
Pull Request — master (#1153)
by Tolga
02:32
created

postgres.*SchemaReader.ListSchemas   C

Complexity

Conditions 10

Size

Total Lines 61
Code Lines 42

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 42
nop 3
dl 0
loc 61
rs 5.9999
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like postgres.*SchemaReader.ListSchemas often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package postgres
2
3
import (
4
	"context"
5
	"database/sql"
6
	"errors"
7
	"log/slog"
8
9
	"github.com/Masterminds/squirrel"
10
	"github.com/rs/xid"
11
12
	"github.com/Permify/permify/internal/schema"
13
	"github.com/Permify/permify/internal/storage"
14
	"github.com/Permify/permify/internal/storage/postgres/utils"
15
	"github.com/Permify/permify/pkg/database"
16
	db "github.com/Permify/permify/pkg/database/postgres"
17
	base "github.com/Permify/permify/pkg/pb/base/v1"
18
)
19
20
// SchemaReader - Structure for SchemaReader
21
type SchemaReader struct {
22
	database *db.Postgres
23
	// options
24
	txOptions sql.TxOptions
25
}
26
27
// NewSchemaReader - Creates a new SchemaReader
28
func NewSchemaReader(database *db.Postgres) *SchemaReader {
29
	return &SchemaReader{
30
		database:  database,
31
		txOptions: sql.TxOptions{Isolation: sql.LevelReadCommitted, ReadOnly: true},
32
	}
33
}
34
35
// ReadSchema returns the schema definition for a specific tenant and version as a structured object.
36
func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) (sch *base.SchemaDefinition, err error) {
37
	ctx, span := tracer.Start(ctx, "schema-reader.read-schema")
38
	defer span.End()
39
40
	slog.Debug("reading schema", slog.Any("tenant_id", tenantID), slog.Any("version", version))
41
42
	builder := r.database.Builder.Select("name, serialized_definition, version").From(SchemaDefinitionTable).Where(squirrel.Eq{"version": version, "tenant_id": tenantID})
43
44
	var query string
45
	var args []interface{}
46
47
	query, args, err = builder.ToSql()
48
	if err != nil {
49
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
50
	}
51
52
	slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args))
53
54
	var rows *sql.Rows
55
	rows, err = r.database.DB.QueryContext(ctx, query, args...)
56
	if err != nil {
57
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
58
	}
59
	defer rows.Close()
60
61
	var definitions []string
62
	for rows.Next() {
63
		sd := storage.SchemaDefinition{}
64
		err = rows.Scan(&sd.Name, &sd.SerializedDefinition, &sd.Version)
65
		if err != nil {
66
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
67
		}
68
		definitions = append(definitions, sd.Serialized())
69
	}
70
	if err = rows.Err(); err != nil {
71
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
72
	}
73
74
	slog.Debug("successfully retrieved", slog.Any("schema definitions", len(definitions)))
75
76
	sch, err = schema.NewSchemaFromStringDefinitions(false, definitions...)
77
	if err != nil {
78
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
79
	}
80
81
	return sch, err
82
}
83
84
// ReadSchemaString returns the schema definition for a specific tenant and version as a string.
85
func (r *SchemaReader) ReadSchemaString(ctx context.Context, tenantID, version string) (definitions []string, err error) {
86
	ctx, span := tracer.Start(ctx, "schema-reader.read-schema-string")
87
	defer span.End()
88
89
	slog.Debug("reading schema", slog.Any("tenant_id", tenantID), slog.Any("version", version))
90
91
	builder := r.database.Builder.Select("name, serialized_definition, version").From(SchemaDefinitionTable).Where(squirrel.Eq{"version": version, "tenant_id": tenantID})
92
93
	var query string
94
	var args []interface{}
95
96
	query, args, err = builder.ToSql()
97
	if err != nil {
98
		return []string{}, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
99
	}
100
101
	slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args))
102
103
	var rows *sql.Rows
104
	rows, err = r.database.DB.QueryContext(ctx, query, args...)
105
	if err != nil {
106
		return []string{}, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
107
	}
108
	defer rows.Close()
109
110
	for rows.Next() {
111
		sd := storage.SchemaDefinition{}
112
		err = rows.Scan(&sd.Name, &sd.SerializedDefinition, &sd.Version)
113
		if err != nil {
114
			return []string{}, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
115
		}
116
		definitions = append(definitions, sd.Serialized())
117
	}
118
	if err = rows.Err(); err != nil {
119
		return []string{}, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
120
	}
121
122
	slog.Debug("successfully retrieved", slog.Any("schema definitions", len(definitions)))
123
124
	return definitions, err
125
}
126
127
// ReadEntityDefinition - Reads entity config from the repository.
128
func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name, version string) (definition *base.EntityDefinition, v string, err error) {
129
	ctx, span := tracer.Start(ctx, "schema-reader.read-entity-definition")
130
	defer span.End()
131
132
	slog.Debug("reading entity definition", slog.Any("tenant_id", tenantID), slog.Any("version", version))
133
134
	builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1)
135
136
	var query string
137
	var args []interface{}
138
139
	query, args, err = builder.ToSql()
140
	if err != nil {
141
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
142
	}
143
144
	slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args))
145
146
	var def storage.SchemaDefinition
147
	row := r.database.DB.QueryRowContext(ctx, query, args...)
148
	if err = row.Err(); err != nil {
149
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
150
	}
151
152
	if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil {
153
		if errors.Is(err, sql.ErrNoRows) {
154
			return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND)
155
		}
156
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
157
	}
158
159
	var sch *base.SchemaDefinition
160
	sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized())
161
	if err != nil {
162
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
163
	}
164
165
	definition, err = schema.GetEntityByName(sch, name)
166
167
	slog.Debug("successfully retrieved", slog.Any("schema definition", definition))
168
169
	return definition, def.Version, err
170
}
171
172
// ReadRuleDefinition - Reads rule config from the repository.
173
func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, name, version string) (definition *base.RuleDefinition, v string, err error) {
174
	ctx, span := tracer.Start(ctx, "schema-reader.read-rule-definition")
175
	defer span.End()
176
177
	slog.Debug("reading rule definition", slog.Any("tenant_id", tenantID), slog.Any("name", name), slog.Any("version", version))
178
179
	builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1)
180
181
	var query string
182
	var args []interface{}
183
184
	query, args, err = builder.ToSql()
185
	if err != nil {
186
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
187
	}
188
189
	slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args))
190
191
	var def storage.SchemaDefinition
192
	row := r.database.DB.QueryRowContext(ctx, query, args...)
193
	if err = row.Err(); err != nil {
194
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
195
	}
196
197
	if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil {
198
		if errors.Is(err, sql.ErrNoRows) {
199
			return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND)
200
		}
201
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
202
	}
203
204
	slog.Debug("successfully retrieved rule definition for", slog.Any("name", name))
205
206
	var sch *base.SchemaDefinition
207
	sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized())
208
	if err != nil {
209
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
210
	}
211
212
	definition, err = schema.GetRuleByName(sch, name)
213
214
	slog.Debug("successfully created rule definition")
215
216
	return definition, def.Version, err
217
}
218
219
// HeadVersion - Finds the latest version of the schema.
220
func (r *SchemaReader) HeadVersion(ctx context.Context, tenantID string) (version string, err error) {
221
	ctx, span := tracer.Start(ctx, "schema-reader.head-version")
222
	defer span.End()
223
224
	slog.Debug("finding the latest version fo the schema for", slog.String("tenant_id", tenantID))
225
226
	var query string
227
	var args []interface{}
228
	query, args, err = r.database.Builder.
229
		Select("version").From(SchemaDefinitionTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("version DESC").Limit(1).
230
		ToSql()
231
	if err != nil {
232
		return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
233
	}
234
235
	slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args))
236
237
	row := r.database.DB.QueryRowContext(ctx, query, args...)
238
	err = row.Scan(&version)
239
	if err != nil {
240
		if errors.Is(err, sql.ErrNoRows) {
241
			return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND)
242
		}
243
		return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
244
	}
245
246
	slog.Debug("successfully found the latest schema version", slog.Any("version", version))
247
248
	return version, nil
249
}
250
251
// ListSchemas - List all Schemas
252
func (r *SchemaReader) ListSchemas(ctx context.Context, tenantID string, pagination database.Pagination) (schemas []*base.SchemaList, ct database.EncodedContinuousToken, err error) {
253
	ctx, span := tracer.Start(ctx, "tenant-reader.list-tenants")
254
	defer span.End()
255
256
	slog.Debug("listing schemas with pagination", slog.Any("pagination", pagination))
257
258
	builder := r.database.Builder.Select("DISTINCT version").From(SchemaDefinitionTable).Where(squirrel.Eq{"tenant_id": tenantID})
259
	if pagination.Token() != "" {
260
		var t database.ContinuousToken
261
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
262
		if err != nil {
263
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
264
		}
265
		builder = builder.Where(squirrel.GtOrEq{"version": t.(utils.ContinuousToken).Value})
266
	}
267
268
	builder = builder.OrderBy("version").Limit(uint64(pagination.PageSize() + 1))
269
270
	var query string
271
	var args []interface{}
272
273
	query, args, err = builder.ToSql()
274
	if err != nil {
275
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
276
	}
277
278
	slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args))
279
280
	var rows *sql.Rows
281
	rows, err = r.database.DB.QueryContext(ctx, query, args...)
282
	if err != nil {
283
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
284
	}
285
	defer rows.Close()
286
287
	var lastVersion string
288
	schemas = make([]*base.SchemaList, 0, pagination.PageSize()+1)
289
	for rows.Next() {
290
		schema := &base.SchemaList{}
291
		err = rows.Scan(&schema.Version)
292
		if err != nil {
293
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
294
		}
295
		id, err := xid.FromString(schema.Version)
296
		if err != nil {
297
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
298
		}
299
		schema.CreatedAt = id.Time().String()
300
		lastVersion = schema.Version
301
		schemas = append(schemas, schema)
302
	}
303
	if err = rows.Err(); err != nil {
304
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
305
	}
306
307
	slog.Debug("successfully listed schemas", slog.Any("number_of_schemas", len(schemas)))
308
309
	if len(schemas) > int(pagination.PageSize()) {
310
		return schemas[:pagination.PageSize()], utils.NewContinuousToken(lastVersion).Encode(), nil
311
	}
312
	return schemas, database.NewNoopContinuousToken().Encode(), nil
313
}
314