Passed
Push — master ( 136f25...d79624 )
by Tolga
01:26 queued 27s
created

postgres.*SchemaReader.ListSchemas   C

Complexity

Conditions 10

Size

Total Lines 61
Code Lines 42

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 42
nop 3
dl 0
loc 61
rs 5.9999
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: moving a cohesive part of the body into a new, well-named method.

Complexity

Complex classes like postgres.*SchemaReader.ListSchemas often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package postgres
2
3
import (
4
	"context"
5
	"database/sql"
6
	"errors"
7
	"log/slog"
8
9
	"github.com/Masterminds/squirrel"
10
	"github.com/rs/xid"
11
12
	"github.com/Permify/permify/internal/schema"
13
	"github.com/Permify/permify/internal/storage"
14
	"github.com/Permify/permify/internal/storage/postgres/utils"
15
	"github.com/Permify/permify/pkg/database"
16
	db "github.com/Permify/permify/pkg/database/postgres"
17
	base "github.com/Permify/permify/pkg/pb/base/v1"
18
)
19
20
// SchemaReader - Structure for SchemaReader
21
type SchemaReader struct {
22
	database *db.Postgres
23
	// options
24
	txOptions sql.TxOptions
25
}
26
27
// NewSchemaReader - Creates a new SchemaReader
28
func NewSchemaReader(database *db.Postgres) *SchemaReader {
29
	return &SchemaReader{
30
		database:  database,
31
		txOptions: sql.TxOptions{Isolation: sql.LevelReadCommitted, ReadOnly: true},
32
	}
33
}
34
35
// ReadSchema - Reads entity config from the repository.
36
func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) (sch *base.SchemaDefinition, err error) {
37
	ctx, span := tracer.Start(ctx, "schema-reader.read-schema")
38
	defer span.End()
39
40
	slog.Debug("reading schema", slog.Any("tenant_id", tenantID), slog.Any("version", version))
41
42
	builder := r.database.Builder.Select("name, serialized_definition, version").From(SchemaDefinitionTable).Where(squirrel.Eq{"version": version, "tenant_id": tenantID})
43
44
	var query string
45
	var args []interface{}
46
47
	query, args, err = builder.ToSql()
48
	if err != nil {
49
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
50
	}
51
52
	slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args))
53
54
	var rows *sql.Rows
55
	rows, err = r.database.DB.QueryContext(ctx, query, args...)
56
	if err != nil {
57
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
58
	}
59
	defer rows.Close()
60
61
	var definitions []string
62
	for rows.Next() {
63
		sd := storage.SchemaDefinition{}
64
		err = rows.Scan(&sd.Name, &sd.SerializedDefinition, &sd.Version)
65
		if err != nil {
66
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
67
		}
68
		definitions = append(definitions, sd.Serialized())
69
	}
70
	if err = rows.Err(); err != nil {
71
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
72
	}
73
74
	slog.Debug("successfully retrieved", slog.Any("schema definitions", len(definitions)))
75
76
	sch, err = schema.NewSchemaFromStringDefinitions(false, definitions...)
77
	if err != nil {
78
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
79
	}
80
81
	slog.Debug("successfully created schema")
82
83
	return sch, err
84
}
85
86
// ReadEntityDefinition - Reads entity config from the repository.
87
func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, name, version string) (definition *base.EntityDefinition, v string, err error) {
88
	ctx, span := tracer.Start(ctx, "schema-reader.read-entity-definition")
89
	defer span.End()
90
91
	slog.Debug("reading entity definition", slog.Any("tenant_id", tenantID), slog.Any("version", version))
92
93
	builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1)
94
95
	var query string
96
	var args []interface{}
97
98
	query, args, err = builder.ToSql()
99
	if err != nil {
100
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
101
	}
102
103
	slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args))
104
105
	var def storage.SchemaDefinition
106
	row := r.database.DB.QueryRowContext(ctx, query, args...)
107
	if err = row.Err(); err != nil {
108
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
109
	}
110
111
	if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil {
112
		if errors.Is(err, sql.ErrNoRows) {
113
			return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND)
114
		}
115
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
116
	}
117
118
	var sch *base.SchemaDefinition
119
	sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized())
120
	if err != nil {
121
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
122
	}
123
124
	definition, err = schema.GetEntityByName(sch, name)
125
126
	slog.Debug("successfully retrieved", slog.Any("schema definition", definition))
127
128
	return definition, def.Version, err
129
}
130
131
// ReadRuleDefinition - Reads rule config from the repository.
132
func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, name, version string) (definition *base.RuleDefinition, v string, err error) {
133
	ctx, span := tracer.Start(ctx, "schema-reader.read-rule-definition")
134
	defer span.End()
135
136
	slog.Debug("reading rule definition", slog.Any("tenant_id", tenantID), slog.Any("name", name), slog.Any("version", version))
137
138
	builder := r.database.Builder.Select("name, serialized_definition, version").Where(squirrel.Eq{"name": name, "version": version, "tenant_id": tenantID}).From(SchemaDefinitionTable).Limit(1)
139
140
	var query string
141
	var args []interface{}
142
143
	query, args, err = builder.ToSql()
144
	if err != nil {
145
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
146
	}
147
148
	slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args))
149
150
	var def storage.SchemaDefinition
151
	row := r.database.DB.QueryRowContext(ctx, query, args...)
152
	if err = row.Err(); err != nil {
153
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
154
	}
155
156
	if err = row.Scan(&def.Name, &def.SerializedDefinition, &def.Version); err != nil {
157
		if errors.Is(err, sql.ErrNoRows) {
158
			return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND)
159
		}
160
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
161
	}
162
163
	slog.Debug("successfully retrieved rule definition for", slog.Any("name", name))
164
165
	var sch *base.SchemaDefinition
166
	sch, err = schema.NewSchemaFromStringDefinitions(false, def.Serialized())
167
	if err != nil {
168
		return nil, "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
169
	}
170
171
	definition, err = schema.GetRuleByName(sch, name)
172
173
	slog.Debug("successfully created rule definition")
174
175
	return definition, def.Version, err
176
}
177
178
// HeadVersion - Finds the latest version of the schema.
179
func (r *SchemaReader) HeadVersion(ctx context.Context, tenantID string) (version string, err error) {
180
	ctx, span := tracer.Start(ctx, "schema-reader.head-version")
181
	defer span.End()
182
183
	slog.Debug("finding the latest version fo the schema for", slog.String("tenant_id", tenantID))
184
185
	var query string
186
	var args []interface{}
187
	query, args, err = r.database.Builder.
188
		Select("version").From(SchemaDefinitionTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("version DESC").Limit(1).
189
		ToSql()
190
	if err != nil {
191
		return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
192
	}
193
194
	slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args))
195
196
	row := r.database.DB.QueryRowContext(ctx, query, args...)
197
	err = row.Scan(&version)
198
	if err != nil {
199
		if errors.Is(err, sql.ErrNoRows) {
200
			return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCHEMA_NOT_FOUND)
201
		}
202
		return "", utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
203
	}
204
205
	slog.Debug("successfully found the latest schema version", slog.Any("version", version))
206
207
	return version, nil
208
}
209
210
// ListSchemas - List all Schemas
211
func (r *SchemaReader) ListSchemas(ctx context.Context, tenantID string, pagination database.Pagination) (schemas []*base.SchemaList, ct database.EncodedContinuousToken, err error) {
212
	ctx, span := tracer.Start(ctx, "tenant-reader.list-tenants")
213
	defer span.End()
214
	
215
	slog.Debug("listing schemas with pagination", slog.Any("pagination", pagination))
216
217
	builder := r.database.Builder.Select("DISTINCT version").From(SchemaDefinitionTable).Where(squirrel.Eq{"tenant_id": tenantID})
218
	if pagination.Token() != "" {
219
		var t database.ContinuousToken
220
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
221
		if err != nil {
222
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
223
		}
224
		builder = builder.Where(squirrel.GtOrEq{"version": t.(utils.ContinuousToken).Value})
225
	}
226
227
	builder = builder.OrderBy("version").Limit(uint64(pagination.PageSize() + 1))
228
229
	var query string
230
	var args []interface{}
231
232
	query, args, err = builder.ToSql()
233
	if err != nil {
234
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
235
	}
236
237
	slog.Debug("executing sql query", slog.Any("query", query), slog.Any("arguments", args))
238
239
	var rows *sql.Rows
240
	rows, err = r.database.DB.QueryContext(ctx, query, args...)
241
	if err != nil {
242
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
243
	}
244
	defer rows.Close()
245
246
	var lastVersion string
247
	schemas = make([]*base.SchemaList, 0, pagination.PageSize()+1)
248
	for rows.Next() {
249
		schema := &base.SchemaList{}
250
		err = rows.Scan(&schema.Version)
251
		if err != nil {
252
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
253
		}
254
		id, err := xid.FromString(schema.Version)
255
		if err != nil {
256
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
257
		}
258
		schema.CreatedAt = id.Time().String()
259
		lastVersion = schema.Version
260
		schemas = append(schemas, schema)
261
	}
262
	if err = rows.Err(); err != nil {
263
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
264
	}
265
266
	slog.Debug("successfully listed schemas", slog.Any("number_of_schemas", len(schemas)))
267
268
	if len(schemas) > int(pagination.PageSize()) {
269
		return schemas[:pagination.PageSize()], utils.NewContinuousToken(lastVersion).Encode(), nil
270
	}
271
	return schemas, database.NewNoopContinuousToken().Encode(), nil
272
}
273