Passed
Push — master ( 64abf6...0f1b55 )
by Tolga
01:31 queued 15s
created

postgres.*DataReader.QueryAttributes   C

Complexity

Conditions 11

Size

Total Lines 82
Code Lines 47

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 11
eloc 47
nop 5
dl 0
loc 82
rs 5.4
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like postgres.*DataReader.QueryAttributes often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package postgres
2
3
import (
4
	"context"
5
	"errors"
6
	"log/slog"
7
	"strconv"
8
	"strings"
9
10
	"github.com/jackc/pgx/v5"
11
12
	"github.com/Masterminds/squirrel"
13
	"github.com/golang/protobuf/jsonpb"
14
	"google.golang.org/protobuf/types/known/anypb"
15
16
	"github.com/Permify/permify/internal/storage"
17
	"github.com/Permify/permify/internal/storage/postgres/snapshot"
18
	"github.com/Permify/permify/internal/storage/postgres/types"
19
	"github.com/Permify/permify/internal/storage/postgres/utils"
20
	"github.com/Permify/permify/pkg/database"
21
	db "github.com/Permify/permify/pkg/database/postgres"
22
	base "github.com/Permify/permify/pkg/pb/base/v1"
23
	"github.com/Permify/permify/pkg/token"
24
)
25
26
// DataReader is a struct which holds a reference to the database, transaction options and a logger.
27
// It is responsible for reading data from the database.
28
type DataReader struct {
29
	database  *db.Postgres  // database is an instance of the PostgreSQL database
30
	txOptions pgx.TxOptions // txOptions specifies the isolation level for database transaction and sets it as read only
31
}
32
33
// NewDataReader is a constructor function for DataReader.
34
// It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level.
35
func NewDataReader(database *db.Postgres) *DataReader {
36
	return &DataReader{
37
		database:  database,                                                             // Set the database to the passed in PostgreSQL instance
38
		txOptions: pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly}, // Set the transaction options
39
	}
40
}
41
42
// QueryRelationships reads relation tuples from the storage based on the given filter.
43
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.CursorPagination) (it *database.TupleIterator, err error) {
44
	// Start a new trace span and end it when the function exits.
45
	ctx, span := tracer.Start(ctx, "data-reader.query-relationships")
46
	defer span.End()
47
48
	slog.DebugContext(ctx, "querying relationships for tenant_id", slog.String("tenant_id", tenantID))
49
50
	// Decode the snapshot value.
51
	var st token.SnapToken
52
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
53
	if err != nil {
54
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
55
	}
56
57
	// Build the relationships query based on the provided filter and snapshot value.
58
	var args []interface{}
59
	builder := r.database.Builder.Select("entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
60
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
61
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
62
63
	if pagination.Cursor() != "" {
64
		var t database.ContinuousToken
65
		t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
66
		if err != nil {
67
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
68
		}
69
		builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value})
70
	}
71
72
	if pagination.Sort() != "" {
73
		builder = builder.OrderBy(pagination.Sort())
74
	}
75
76
	// Generate the SQL query and arguments.
77
	var query string
78
	query, args, err = builder.ToSql()
79
	if err != nil {
80
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
81
	}
82
83
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
84
85
	// Execute the SQL query and retrieve the result rows.
86
	var rows pgx.Rows
87
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
88
	if err != nil {
89
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
90
	}
91
	defer rows.Close()
92
93
	// Process the result rows and store the relationships in a TupleCollection.
94
	collection := database.NewTupleCollection()
95
	for rows.Next() {
96
		rt := storage.RelationTuple{}
97
		err = rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
98
		if err != nil {
99
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
100
		}
101
		collection.Add(rt.ToTuple())
102
	}
103
	if err = rows.Err(); err != nil {
104
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
105
	}
106
107
	slog.DebugContext(ctx, "successfully retrieved relation tuples from the database")
108
109
	// Return a TupleIterator created from the TupleCollection.
110
	return collection.CreateTupleIterator(), nil
111
}
112
113
// ReadRelationships reads relation tuples from the storage based on the given filter and pagination.
114
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
115
	// Start a new trace span and end it when the function exits.
116
	ctx, span := tracer.Start(ctx, "data-reader.read-relationships")
117
	defer span.End()
118
119
	slog.DebugContext(ctx, "reading relationships for tenant_id", slog.String("tenant_id", tenantID))
120
121
	// Decode the snapshot value.
122
	var st token.SnapToken
123
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
124
	if err != nil {
125
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
126
	}
127
128
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
129
	builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
130
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
131
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
132
133
	// Apply the pagination token and limit to the query.
134
	if pagination.Token() != "" {
135
		var t database.ContinuousToken
136
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
137
		if err != nil {
138
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
139
		}
140
		var v uint64
141
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
142
		if err != nil {
143
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
144
		}
145
		builder = builder.Where(squirrel.GtOrEq{"id": v})
146
	}
147
148
	builder = builder.OrderBy("id")
149
150
	if pagination.PageSize() != 0 {
151
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
152
	}
153
154
	// Generate the SQL query and arguments.
155
	var query string
156
	var args []interface{}
157
	query, args, err = builder.ToSql()
158
	if err != nil {
159
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
160
	}
161
162
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
163
164
	// Execute the query and retrieve the rows.
165
	var rows pgx.Rows
166
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
167
	if err != nil {
168
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
169
	}
170
	defer rows.Close()
171
172
	var lastID uint64
173
174
	// Iterate through the rows and scan the result into a RelationTuple struct.
175
	tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
176
	for rows.Next() {
177
		rt := storage.RelationTuple{}
178
		err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
179
		if err != nil {
180
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
181
		}
182
		lastID = rt.ID
183
		tuples = append(tuples, rt.ToTuple())
184
	}
185
	// Check for any errors during iteration.
186
	if err = rows.Err(); err != nil {
187
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
188
	}
189
190
	slog.DebugContext(ctx, "successfully read relation tuples from database")
191
192
	// Return the results and encoded continuous token for pagination.
193
	if pagination.PageSize() != 0 && len(tuples) > int(pagination.PageSize()) {
194
		return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
195
	}
196
197
	return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil
198
}
199
200
// QuerySingleAttribute retrieves a single attribute from the storage based on the given filter.
201
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (attribute *base.Attribute, err error) {
202
	// Start a new trace span and end it when the function exits.
203
	ctx, span := tracer.Start(ctx, "data-reader.query-single-attribute")
204
	defer span.End()
205
206
	slog.DebugContext(ctx, "querying single attribute for tenant_id", slog.String("tenant_id", tenantID))
207
208
	// Decode the snapshot value.
209
	var st token.SnapToken
210
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
211
	if err != nil {
212
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
213
	}
214
215
	// Build the relationships query based on the provided filter and snapshot value.
216
	var args []interface{}
217
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
218
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
219
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
220
221
	// Generate the SQL query and arguments.
222
	var query string
223
	query, args, err = builder.ToSql()
224
	if err != nil {
225
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
226
	}
227
228
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
229
230
	row := r.database.ReadPool.QueryRow(ctx, query, args...)
231
232
	rt := storage.Attribute{}
233
234
	// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
235
	var valueStr string
236
237
	// Scan the row from the database into the fields of `rt` and `valueStr`.
238
	err = row.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
239
	if err != nil {
240
		if errors.Is(err, pgx.ErrNoRows) {
241
			return nil, nil
242
		} else {
243
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
244
		}
245
	}
246
247
	// Unmarshal the JSON data from `valueStr` into `rt.Value`.
248
	rt.Value = &anypb.Any{}
249
	unmarshaler := &jsonpb.Unmarshaler{}
250
	err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
251
	if err != nil {
252
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
253
	}
254
255
	slog.DebugContext(ctx, "successfully retrieved Single attribute from the database")
256
257
	return rt.ToAttribute(), nil
258
}
259
260
// QueryAttributes reads multiple attributes from the storage based on the given filter.
261
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.CursorPagination) (it *database.AttributeIterator, err error) {
262
	// Start a new trace span and end it when the function exits.
263
	ctx, span := tracer.Start(ctx, "data-reader.query-attributes")
264
	defer span.End()
265
266
	slog.DebugContext(ctx, "querying Attributes for tenant_id", slog.String("tenant_id", tenantID))
267
268
	// Decode the snapshot value.
269
	var st token.SnapToken
270
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
271
	if err != nil {
272
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
273
	}
274
275
	// Build the relationships query based on the provided filter and snapshot value.
276
	var args []interface{}
277
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
278
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
279
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
280
281
	if pagination.Cursor() != "" {
282
		var t database.ContinuousToken
283
		t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
284
		if err != nil {
285
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
286
		}
287
		builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value})
288
	}
289
290
	if pagination.Sort() != "" {
291
		builder = builder.OrderBy(pagination.Sort())
292
	}
293
294
	// Generate the SQL query and arguments.
295
	var query string
296
	query, args, err = builder.ToSql()
297
	if err != nil {
298
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
299
	}
300
301
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
302
303
	// Execute the SQL query and retrieve the result rows.
304
	var rows pgx.Rows
305
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
306
	if err != nil {
307
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
308
	}
309
	defer rows.Close()
310
311
	// Process the result rows and store the relationships in a TupleCollection.
312
	collection := database.NewAttributeCollection()
313
	for rows.Next() {
314
		rt := storage.Attribute{}
315
316
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
317
		var valueStr string
318
319
		// Scan the row from the database into the fields of `rt` and `valueStr`.
320
		err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
321
		if err != nil {
322
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
323
		}
324
325
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
326
		rt.Value = &anypb.Any{}
327
		unmarshaler := &jsonpb.Unmarshaler{}
328
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
329
		if err != nil {
330
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
331
		}
332
333
		collection.Add(rt.ToAttribute())
334
	}
335
	if err = rows.Err(); err != nil {
336
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
337
	}
338
339
	slog.DebugContext(ctx, "successfully retrieved attributes tuples from the database")
340
341
	// Return a TupleIterator created from the TupleCollection.
342
	return collection.CreateAttributeIterator(), nil
343
}
344
345
// ReadAttributes reads multiple attributes from the storage based on the given filter and pagination.
346
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
347
	// Start a new trace span and end it when the function exits.
348
	ctx, span := tracer.Start(ctx, "data-reader.read-attributes")
349
	defer span.End()
350
351
	slog.DebugContext(ctx, "reading attributes for tenant_id", slog.String("tenant_id", tenantID))
352
353
	// Decode the snapshot value.
354
	var st token.SnapToken
355
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
356
	if err != nil {
357
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
358
	}
359
360
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
361
	builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
362
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
363
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
364
365
	// Apply the pagination token and limit to the query.
366
	if pagination.Token() != "" {
367
		var t database.ContinuousToken
368
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
369
		if err != nil {
370
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
371
		}
372
		var v uint64
373
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
374
		if err != nil {
375
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
376
		}
377
		builder = builder.Where(squirrel.GtOrEq{"id": v})
378
	}
379
380
	builder = builder.OrderBy("id")
381
382
	if pagination.PageSize() != 0 {
383
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
384
	}
385
386
	// Generate the SQL query and arguments.
387
	var query string
388
	var args []interface{}
389
	query, args, err = builder.ToSql()
390
	if err != nil {
391
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
392
	}
393
394
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
395
396
	// Execute the query and retrieve the rows.
397
	var rows pgx.Rows
398
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
399
	if err != nil {
400
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
401
	}
402
	defer rows.Close()
403
404
	var lastID uint64
405
406
	// Iterate through the rows and scan the result into a RelationTuple struct.
407
	attributes := make([]*base.Attribute, 0, pagination.PageSize()+1)
408
	for rows.Next() {
409
		rt := storage.Attribute{}
410
411
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
412
		var valueStr string
413
414
		// Scan the row from the database into the fields of `rt` and `valueStr`.
415
		err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
416
		if err != nil {
417
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
418
		}
419
		lastID = rt.ID
420
421
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
422
		rt.Value = &anypb.Any{}
423
		unmarshaler := &jsonpb.Unmarshaler{}
424
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
425
		if err != nil {
426
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
427
		}
428
429
		attributes = append(attributes, rt.ToAttribute())
430
	}
431
	// Check for any errors during iteration.
432
	if err = rows.Err(); err != nil {
433
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
434
	}
435
436
	slog.DebugContext(ctx, "successfully read attributes from the database")
437
438
	// Return the results and encoded continuous token for pagination.
439
	if len(attributes) > int(pagination.PageSize()) {
440
		return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
441
	}
442
443
	return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil
444
}
445
446
// QueryUniqueSubjectReferences reads unique subject references from the storage based on the given filter and pagination.
447
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
448
	// Start a new trace span and end it when the function exits.
449
	ctx, span := tracer.Start(ctx, "data-reader.query-unique-subject-reference")
450
	defer span.End()
451
452
	slog.DebugContext(ctx, "querying unique subject references for tenant_id", slog.String("tenant_id", tenantID))
453
454
	// Decode the snapshot value.
455
	var st token.SnapToken
456
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
457
	if err != nil {
458
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
459
	}
460
461
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
462
	builder := r.database.Builder.
463
		Select("subject_id"). // This will pick the smallest `id` for each unique `subject_id`.
464
		From(RelationTuplesTable).
465
		Where(squirrel.Eq{"tenant_id": tenantID}).
466
		GroupBy("subject_id")
467
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, &base.TupleFilter{Subject: &base.SubjectFilter{Type: subjectReference.GetType(), Relation: subjectReference.GetRelation()}})
468
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
469
470
	// Apply the pagination token and limit to the query.
471
	if pagination.Token() != "" {
472
		var t database.ContinuousToken
473
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
474
		if err != nil {
475
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
476
		}
477
		builder = builder.Where(squirrel.GtOrEq{"subject_id": t.(utils.ContinuousToken).Value})
478
	}
479
480
	builder = builder.OrderBy("subject_id")
481
482
	if pagination.PageSize() != 0 {
483
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
484
	}
485
486
	// Generate the SQL query and arguments.
487
	var query string
488
	var args []interface{}
489
	query, args, err = builder.ToSql()
490
	if err != nil {
491
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
492
	}
493
494
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
495
496
	// Execute the query and retrieve the rows.
497
	var rows pgx.Rows
498
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
499
	if err != nil {
500
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
501
	}
502
	defer rows.Close()
503
504
	var lastID string
505
506
	// Iterate through the rows and scan the result into a RelationTuple struct.
507
	subjectIDs := make([]string, 0, pagination.PageSize()+1)
508
	for rows.Next() {
509
		var subjectID string
510
		err = rows.Scan(&subjectID)
511
		if err != nil {
512
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
513
		}
514
515
		subjectIDs = append(subjectIDs, subjectID)
516
		lastID = subjectID
517
	}
518
	// Check for any errors during iteration.
519
	if err = rows.Err(); err != nil {
520
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
521
	}
522
523
	slog.DebugContext(ctx, "successfully retrieved unique subject references from the database")
524
525
	// Return the results and encoded continuous token for pagination.
526
	if pagination.PageSize() != 0 && len(subjectIDs) > int(pagination.PageSize()) {
527
		return subjectIDs[:pagination.PageSize()], utils.NewContinuousToken(lastID).Encode(), nil
528
	}
529
530
	return subjectIDs, database.NewNoopContinuousToken().Encode(), nil
531
}
532
533
// HeadSnapshot retrieves the latest snapshot token associated with the tenant.
534
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
535
	// Start a new trace span and end it when the function exits.
536
	ctx, span := tracer.Start(ctx, "data-reader.head-snapshot")
537
	defer span.End()
538
539
	slog.DebugContext(ctx, "getting head snapshot for tenant_id", slog.String("tenant_id", tenantID))
540
541
	var xid types.XID8
542
543
	// Build the query to find the highest transaction ID associated with the tenant.
544
	builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1)
545
	query, args, err := builder.ToSql()
546
	if err != nil {
547
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
548
	}
549
550
	// Execute the query and retrieve the highest transaction ID.
551
	err = r.database.ReadPool.QueryRow(ctx, query, args...).Scan(&xid)
552
	if err != nil {
553
		// If no rows are found, return a snapshot token with a value of 0.
554
		if errors.Is(err, pgx.ErrNoRows) {
555
			return snapshot.Token{Value: types.XID8{Uint: 0}}, nil
556
		}
557
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
558
	}
559
560
	slog.DebugContext(ctx, "successfully retrieved latest snapshot token")
561
562
	// Return the latest snapshot token associated with the tenant.
563
	return snapshot.Token{Value: xid}, nil
564
}
565