Passed
Pull Request — master (#1470)
by Tolga
02:39
created

postgres.*DataReader.QueryUniqueSubjectReferences   D

Complexity

Conditions 12

Size

Total Lines 84
Code Lines 50

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
eloc 50
nop 5
dl 0
loc 84
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like postgres.*DataReader.QueryUniqueSubjectReferences often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package postgres
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"log/slog"
8
	"strconv"
9
	"strings"
10
11
	"github.com/jackc/pgx/v5"
12
13
	"github.com/Masterminds/squirrel"
14
	"github.com/golang/protobuf/jsonpb"
15
	"google.golang.org/protobuf/types/known/anypb"
16
17
	"github.com/Permify/permify/internal/storage"
18
	"github.com/Permify/permify/internal/storage/postgres/snapshot"
19
	"github.com/Permify/permify/internal/storage/postgres/types"
20
	"github.com/Permify/permify/internal/storage/postgres/utils"
21
	"github.com/Permify/permify/pkg/database"
22
	db "github.com/Permify/permify/pkg/database/postgres"
23
	base "github.com/Permify/permify/pkg/pb/base/v1"
24
	"github.com/Permify/permify/pkg/token"
25
)
26
27
// DataReader is a struct which holds a reference to the database, transaction options and a logger.
28
// It is responsible for reading data from the database.
29
type DataReader struct {
30
	database  *db.Postgres  // database is an instance of the PostgreSQL database
31
	txOptions pgx.TxOptions // txOptions specifies the isolation level for database transaction and sets it as read only
32
}
33
34
// NewDataReader is a constructor function for DataReader.
35
// It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level.
36
func NewDataReader(database *db.Postgres) *DataReader {
37
	return &DataReader{
38
		database:  database,                                                             // Set the database to the passed in PostgreSQL instance
39
		txOptions: pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly}, // Set the transaction options
40
	}
41
}
42
43
// QueryRelationships reads relation tuples from the storage based on the given filter.
44
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.CursorPagination) (it *database.TupleIterator, err error) {
45
	// Start a new trace span and end it when the function exits.
46
	ctx, span := tracer.Start(ctx, "data-reader.query-relationships")
47
	defer span.End()
48
49
	slog.DebugContext(ctx, "querying relationships for tenant_id", slog.String("tenant_id", tenantID))
50
51
	// Decode the snapshot value.
52
	var st token.SnapToken
53
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
54
	if err != nil {
55
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
56
	}
57
58
	// Build the relationships query based on the provided filter and snapshot value.
59
	var args []interface{}
60
	builder := r.database.Builder.Select("entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
61
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
62
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
63
64
	if pagination.Cursor() != "" {
65
		var t database.ContinuousToken
66
		t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
67
		if err != nil {
68
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
69
		}
70
		builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value})
71
	}
72
73
	if pagination.Sort() != "" {
74
		builder = builder.OrderBy(pagination.Sort())
75
	}
76
77
	// Generate the SQL query and arguments.
78
	var query string
79
	query, args, err = builder.ToSql()
80
	if err != nil {
81
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
82
	}
83
84
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
85
86
	// Execute the SQL query and retrieve the result rows.
87
	var rows pgx.Rows
88
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
89
	if err != nil {
90
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
91
	}
92
	defer rows.Close()
93
94
	// Process the result rows and store the relationships in a TupleCollection.
95
	collection := database.NewTupleCollection()
96
	for rows.Next() {
97
		rt := storage.RelationTuple{}
98
		err = rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
99
		if err != nil {
100
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
101
		}
102
		collection.Add(rt.ToTuple())
103
	}
104
	if err = rows.Err(); err != nil {
105
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
106
	}
107
108
	slog.DebugContext(ctx, "successfully retrieved relation tuples from the database")
109
110
	// Return a TupleIterator created from the TupleCollection.
111
	return collection.CreateTupleIterator(), nil
112
}
113
114
// ReadRelationships reads relation tuples from the storage based on the given filter and pagination.
115
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
116
	// Start a new trace span and end it when the function exits.
117
	ctx, span := tracer.Start(ctx, "data-reader.read-relationships")
118
	defer span.End()
119
120
	slog.DebugContext(ctx, "reading relationships for tenant_id", slog.String("tenant_id", tenantID))
121
122
	// Decode the snapshot value.
123
	var st token.SnapToken
124
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
125
	if err != nil {
126
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
127
	}
128
129
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
130
	builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
131
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
132
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
133
134
	// Apply the pagination token and limit to the query.
135
	if pagination.Token() != "" {
136
		var t database.ContinuousToken
137
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
138
		if err != nil {
139
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
140
		}
141
		var v uint64
142
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
143
		if err != nil {
144
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
145
		}
146
		builder = builder.Where(squirrel.GtOrEq{"id": v})
147
	}
148
149
	builder = builder.OrderBy("id")
150
151
	if pagination.PageSize() != 0 {
152
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
153
	}
154
155
	// Generate the SQL query and arguments.
156
	var query string
157
	var args []interface{}
158
	query, args, err = builder.ToSql()
159
	if err != nil {
160
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
161
	}
162
163
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
164
165
	// Execute the query and retrieve the rows.
166
	var rows pgx.Rows
167
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
168
	if err != nil {
169
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
170
	}
171
	defer rows.Close()
172
173
	var lastID uint64
174
175
	// Iterate through the rows and scan the result into a RelationTuple struct.
176
	tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
177
	for rows.Next() {
178
		rt := storage.RelationTuple{}
179
		err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
180
		if err != nil {
181
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
182
		}
183
		lastID = rt.ID
184
		tuples = append(tuples, rt.ToTuple())
185
	}
186
	// Check for any errors during iteration.
187
	if err = rows.Err(); err != nil {
188
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
189
	}
190
191
	slog.DebugContext(ctx, "successfully read relation tuples from database")
192
193
	// Return the results and encoded continuous token for pagination.
194
	if pagination.PageSize() != 0 && len(tuples) > int(pagination.PageSize()) {
195
		return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
196
	}
197
198
	return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil
199
}
200
201
// QuerySingleAttribute retrieves a single attribute from the storage based on the given filter.
202
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (attribute *base.Attribute, err error) {
203
	// Start a new trace span and end it when the function exits.
204
	ctx, span := tracer.Start(ctx, "data-reader.query-single-attribute")
205
	defer span.End()
206
207
	slog.DebugContext(ctx, "querying single attribute for tenant_id", slog.String("tenant_id", tenantID))
208
209
	// Decode the snapshot value.
210
	var st token.SnapToken
211
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
212
	if err != nil {
213
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
214
	}
215
216
	// Build the relationships query based on the provided filter and snapshot value.
217
	var args []interface{}
218
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
219
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
220
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
221
222
	// Generate the SQL query and arguments.
223
	var query string
224
	query, args, err = builder.ToSql()
225
	if err != nil {
226
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
227
	}
228
229
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
230
231
	row := r.database.ReadPool.QueryRow(ctx, query, args...)
232
233
	rt := storage.Attribute{}
234
235
	// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
236
	var valueStr string
237
238
	// Scan the row from the database into the fields of `rt` and `valueStr`.
239
	err = row.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
240
	if err != nil {
241
		if errors.Is(err, pgx.ErrNoRows) {
242
			return nil, nil
243
		} else {
244
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
245
		}
246
	}
247
248
	// Unmarshal the JSON data from `valueStr` into `rt.Value`.
249
	rt.Value = &anypb.Any{}
250
	unmarshaler := &jsonpb.Unmarshaler{}
251
	err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
252
	if err != nil {
253
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
254
	}
255
256
	slog.DebugContext(ctx, "successfully retrieved Single attribute from the database")
257
258
	return rt.ToAttribute(), nil
259
}
260
261
// QueryAttributes reads multiple attributes from the storage based on the given filter.
262
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (it *database.AttributeIterator, err error) {
263
	// Start a new trace span and end it when the function exits.
264
	ctx, span := tracer.Start(ctx, "data-reader.query-attributes")
265
	defer span.End()
266
267
	slog.DebugContext(ctx, "querying Attributes for tenant_id", slog.String("tenant_id", tenantID))
268
269
	// Decode the snapshot value.
270
	var st token.SnapToken
271
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
272
	if err != nil {
273
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
274
	}
275
276
	// Build the relationships query based on the provided filter and snapshot value.
277
	var args []interface{}
278
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
279
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
280
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
281
282
	// Generate the SQL query and arguments.
283
	var query string
284
	query, args, err = builder.ToSql()
285
	if err != nil {
286
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
287
	}
288
289
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
290
291
	// Execute the SQL query and retrieve the result rows.
292
	var rows pgx.Rows
293
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
294
	if err != nil {
295
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
296
	}
297
	defer rows.Close()
298
299
	// Process the result rows and store the relationships in a TupleCollection.
300
	collection := database.NewAttributeCollection()
301
	for rows.Next() {
302
		rt := storage.Attribute{}
303
304
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
305
		var valueStr string
306
307
		// Scan the row from the database into the fields of `rt` and `valueStr`.
308
		err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
309
		if err != nil {
310
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
311
		}
312
313
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
314
		rt.Value = &anypb.Any{}
315
		unmarshaler := &jsonpb.Unmarshaler{}
316
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
317
		if err != nil {
318
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
319
		}
320
321
		collection.Add(rt.ToAttribute())
322
	}
323
	if err = rows.Err(); err != nil {
324
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
325
	}
326
327
	slog.DebugContext(ctx, "successfully retrieved attributes tuples from the database")
328
329
	// Return a TupleIterator created from the TupleCollection.
330
	return collection.CreateAttributeIterator(), nil
331
}
332
333
// ReadAttributes reads multiple attributes from the storage based on the given filter and pagination.
334
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
335
	// Start a new trace span and end it when the function exits.
336
	ctx, span := tracer.Start(ctx, "data-reader.read-attributes")
337
	defer span.End()
338
339
	slog.DebugContext(ctx, "reading attributes for tenant_id", slog.String("tenant_id", tenantID))
340
341
	// Decode the snapshot value.
342
	var st token.SnapToken
343
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
344
	if err != nil {
345
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
346
	}
347
348
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
349
	builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
350
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
351
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
352
353
	// Apply the pagination token and limit to the query.
354
	if pagination.Token() != "" {
355
		var t database.ContinuousToken
356
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
357
		if err != nil {
358
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
359
		}
360
		var v uint64
361
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
362
		if err != nil {
363
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
364
		}
365
		builder = builder.Where(squirrel.GtOrEq{"id": v})
366
	}
367
368
	builder = builder.OrderBy("id")
369
370
	if pagination.PageSize() != 0 {
371
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
372
	}
373
374
	// Generate the SQL query and arguments.
375
	var query string
376
	var args []interface{}
377
	query, args, err = builder.ToSql()
378
	if err != nil {
379
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
380
	}
381
382
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
383
384
	// Execute the query and retrieve the rows.
385
	var rows pgx.Rows
386
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
387
	if err != nil {
388
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
389
	}
390
	defer rows.Close()
391
392
	var lastID uint64
393
394
	// Iterate through the rows and scan the result into a RelationTuple struct.
395
	attributes := make([]*base.Attribute, 0, pagination.PageSize()+1)
396
	for rows.Next() {
397
		rt := storage.Attribute{}
398
399
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
400
		var valueStr string
401
402
		// Scan the row from the database into the fields of `rt` and `valueStr`.
403
		err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
404
		if err != nil {
405
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
406
		}
407
		lastID = rt.ID
408
409
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
410
		rt.Value = &anypb.Any{}
411
		unmarshaler := &jsonpb.Unmarshaler{}
412
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
413
		if err != nil {
414
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
415
		}
416
417
		attributes = append(attributes, rt.ToAttribute())
418
	}
419
	// Check for any errors during iteration.
420
	if err = rows.Err(); err != nil {
421
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
422
	}
423
424
	slog.DebugContext(ctx, "successfully read attributes from the database")
425
426
	// Return the results and encoded continuous token for pagination.
427
	if len(attributes) > int(pagination.PageSize()) {
428
		return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
429
	}
430
431
	return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil
432
}
433
434
// QueryUniqueEntities reads unique entities from the storage based on the given filter and pagination.
435
func (r *DataReader) QueryUniqueEntities(ctx context.Context, tenantID, name, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
436
	// Start a new trace span and end it when the function exits.
437
	ctx, span := tracer.Start(ctx, "data-reader.query-unique-entities")
438
	defer span.End()
439
440
	slog.DebugContext(ctx, "querying unique entities for tenant_id", slog.String("tenant_id", tenantID))
441
442
	// Decode the snapshot value.
443
	var st token.SnapToken
444
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
445
	if err != nil {
446
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
447
	}
448
449
	query := utils.BulkEntityFilterQuery(tenantID, name, st.(snapshot.Token).Value.Uint)
450
451
	// Apply the pagination token and limit to the subQuery.
452
	if pagination.Token() != "" {
453
		var t database.ContinuousToken
454
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
455
		if err != nil {
456
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
457
		}
458
		query = fmt.Sprintf("%s WHERE entity_id >= '%s'", query, t.(utils.ContinuousToken).Value)
459
	}
460
461
	// Append ORDER BY and LIMIT clauses.
462
	query = fmt.Sprintf("%s ORDER BY entity_id", query)
463
464
	if pagination.PageSize() != 0 {
465
		query = fmt.Sprintf("%s LIMIT %d", query, pagination.PageSize()+1)
466
	}
467
468
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query))
469
470
	// Execute the query and retrieve the rows.
471
	var rows pgx.Rows
472
	rows, err = r.database.ReadPool.Query(ctx, query)
473
	if err != nil {
474
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
475
	}
476
	defer rows.Close()
477
478
	var lastID string
479
480
	// Iterate through the rows and scan the result into a RelationTuple struct.
481
	entityIDs := make([]string, 0, pagination.PageSize()+1)
482
	for rows.Next() {
483
		var entityId string
484
		err = rows.Scan(&entityId)
485
		if err != nil {
486
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
487
		}
488
489
		entityIDs = append(entityIDs, entityId)
490
		lastID = entityId
491
	}
492
493
	// Check for any errors during iteration.
494
	if err = rows.Err(); err != nil {
495
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
496
	}
497
498
	slog.DebugContext(ctx, "successfully retrieved unique entities from the database")
499
500
	// Return the results and encoded continuous token for pagination.
501
	if pagination.PageSize() != 0 && len(entityIDs) > int(pagination.PageSize()) {
502
		return entityIDs[:pagination.PageSize()], utils.NewContinuousToken(lastID).Encode(), nil
503
	}
504
505
	return entityIDs, database.NewNoopContinuousToken().Encode(), nil
506
}
507
508
// QueryUniqueSubjectReferences reads unique subject references from the storage based on the given filter and pagination.
509
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
510
	// Start a new trace span and end it when the function exits.
511
	ctx, span := tracer.Start(ctx, "data-reader.query-unique-subject-reference")
512
	defer span.End()
513
514
	slog.DebugContext(ctx, "querying unique subject references for tenant_id", slog.String("tenant_id", tenantID))
515
516
	// Decode the snapshot value.
517
	var st token.SnapToken
518
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
519
	if err != nil {
520
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
521
	}
522
523
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
524
	builder := r.database.Builder.
525
		Select("subject_id"). // This will pick the smallest `id` for each unique `subject_id`.
526
		From(RelationTuplesTable).
527
		Where(squirrel.Eq{"tenant_id": tenantID}).
528
		GroupBy("subject_id")
529
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, &base.TupleFilter{Subject: &base.SubjectFilter{Type: subjectReference.GetType(), Relation: subjectReference.GetRelation()}})
530
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
531
532
	// Apply the pagination token and limit to the query.
533
	if pagination.Token() != "" {
534
		var t database.ContinuousToken
535
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
536
		if err != nil {
537
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
538
		}
539
		builder = builder.Where(squirrel.GtOrEq{"subject_id": t.(utils.ContinuousToken).Value})
540
	}
541
542
	builder = builder.OrderBy("subject_id")
543
544
	if pagination.PageSize() != 0 {
545
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
546
	}
547
548
	// Generate the SQL query and arguments.
549
	var query string
550
	var args []interface{}
551
	query, args, err = builder.ToSql()
552
	if err != nil {
553
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
554
	}
555
556
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
557
558
	// Execute the query and retrieve the rows.
559
	var rows pgx.Rows
560
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
561
	if err != nil {
562
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
563
	}
564
	defer rows.Close()
565
566
	var lastID string
567
568
	// Iterate through the rows and scan the result into a RelationTuple struct.
569
	subjectIDs := make([]string, 0, pagination.PageSize()+1)
570
	for rows.Next() {
571
		var subjectID string
572
		err = rows.Scan(&subjectID)
573
		if err != nil {
574
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
575
		}
576
577
		subjectIDs = append(subjectIDs, subjectID)
578
		lastID = subjectID
579
	}
580
	// Check for any errors during iteration.
581
	if err = rows.Err(); err != nil {
582
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
583
	}
584
585
	slog.DebugContext(ctx, "successfully retrieved unique subject references from the database")
586
587
	// Return the results and encoded continuous token for pagination.
588
	if pagination.PageSize() != 0 && len(subjectIDs) > int(pagination.PageSize()) {
589
		return subjectIDs[:pagination.PageSize()], utils.NewContinuousToken(lastID).Encode(), nil
590
	}
591
592
	return subjectIDs, database.NewNoopContinuousToken().Encode(), nil
593
}
594
595
// HeadSnapshot retrieves the latest snapshot token associated with the tenant.
596
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
597
	// Start a new trace span and end it when the function exits.
598
	ctx, span := tracer.Start(ctx, "data-reader.head-snapshot")
599
	defer span.End()
600
601
	slog.DebugContext(ctx, "getting head snapshot for tenant_id", slog.String("tenant_id", tenantID))
602
603
	var xid types.XID8
604
605
	// Build the query to find the highest transaction ID associated with the tenant.
606
	builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1)
607
	query, args, err := builder.ToSql()
608
	if err != nil {
609
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
610
	}
611
612
	// Execute the query and retrieve the highest transaction ID.
613
	err = r.database.ReadPool.QueryRow(ctx, query, args...).Scan(&xid)
614
	if err != nil {
615
		// If no rows are found, return a snapshot token with a value of 0.
616
		if errors.Is(err, pgx.ErrNoRows) {
617
			return snapshot.Token{Value: types.XID8{Uint: 0}}, nil
618
		}
619
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
620
	}
621
622
	slog.DebugContext(ctx, "successfully retrieved latest snapshot token")
623
624
	// Return the latest snapshot token associated with the tenant.
625
	return snapshot.Token{Value: xid}, nil
626
}
627