Passed
Push — master ( 03c495...6dd4e6 )
by Tolga
01:24 queued 13s
created

postgres.*DataReader.QueryAttributes   D

Complexity

Conditions 12

Size

Total Lines 88
Code Lines 50

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
eloc 50
nop 5
dl 0
loc 88
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like postgres.*DataReader.QueryAttributes often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package postgres
2
3
import (
4
	"context"
5
	"errors"
6
	"log/slog"
7
	"strconv"
8
	"strings"
9
10
	"github.com/jackc/pgx/v5"
11
12
	"github.com/Masterminds/squirrel"
13
	"github.com/golang/protobuf/jsonpb"
14
	"google.golang.org/protobuf/types/known/anypb"
15
16
	"github.com/Permify/permify/internal"
17
	"github.com/Permify/permify/internal/storage"
18
	"github.com/Permify/permify/internal/storage/postgres/snapshot"
19
	"github.com/Permify/permify/internal/storage/postgres/types"
20
	"github.com/Permify/permify/internal/storage/postgres/utils"
21
	"github.com/Permify/permify/pkg/database"
22
	db "github.com/Permify/permify/pkg/database/postgres"
23
	base "github.com/Permify/permify/pkg/pb/base/v1"
24
	"github.com/Permify/permify/pkg/token"
25
)
26
27
// DataReader is a struct which holds a reference to the database, transaction options and a logger.
28
// It is responsible for reading data from the database.
29
type DataReader struct {
30
	database  *db.Postgres  // database is an instance of the PostgreSQL database
31
	txOptions pgx.TxOptions // txOptions specifies the isolation level for database transaction and sets it as read only
32
}
33
34
// NewDataReader is a constructor function for DataReader.
35
// It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level.
36
func NewDataReader(database *db.Postgres) *DataReader {
37
	return &DataReader{
38
		database:  database,                                                             // Set the database to the passed in PostgreSQL instance
39
		txOptions: pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly}, // Set the transaction options
40
	}
41
}
42
43
// QueryRelationships reads relation tuples from the storage based on the given filter.
44
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.CursorPagination) (it *database.TupleIterator, err error) {
45
	// Start a new trace span and end it when the function exits.
46
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-relationships")
47
	defer span.End()
48
49
	slog.DebugContext(ctx, "querying relationships for tenant_id", slog.String("tenant_id", tenantID))
50
51
	// Decode the snapshot value.
52
	var st token.SnapToken
53
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
54
	if err != nil {
55
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
56
	}
57
58
	// Build the relationships query based on the provided filter and snapshot value.
59
	var args []interface{}
60
	builder := r.database.Builder.Select("entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
61
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
62
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
63
64
	if pagination.Cursor() != "" {
65
		var t database.ContinuousToken
66
		t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
67
		if err != nil {
68
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
69
		}
70
		builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value})
71
	}
72
73
	if pagination.Sort() != "" {
74
		builder = builder.OrderBy(pagination.Sort())
75
	}
76
77
	// Apply limit if specified in pagination
78
	limit := pagination.Limit()
79
	if limit > 0 {
80
		builder = builder.Limit(uint64(limit))
81
	}
82
83
	// Generate the SQL query and arguments.
84
	var query string
85
	query, args, err = builder.ToSql()
86
	if err != nil {
87
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
88
	}
89
90
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
91
92
	// Execute the SQL query and retrieve the result rows.
93
	var rows pgx.Rows
94
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
95
	if err != nil {
96
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
97
	}
98
	defer rows.Close()
99
100
	// Process the result rows and store the relationships in a TupleCollection.
101
	collection := database.NewTupleCollection()
102
	for rows.Next() {
103
		rt := storage.RelationTuple{}
104
		err = rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
105
		if err != nil {
106
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
107
		}
108
		collection.Add(rt.ToTuple())
109
	}
110
	if err = rows.Err(); err != nil {
111
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
112
	}
113
114
	slog.DebugContext(ctx, "successfully retrieved relation tuples from the database")
115
116
	// Return a TupleIterator created from the TupleCollection.
117
	return collection.CreateTupleIterator(), nil
118
}
119
120
// ReadRelationships reads relation tuples from the storage based on the given filter and pagination.
121
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
122
	// Start a new trace span and end it when the function exits.
123
	ctx, span := internal.Tracer.Start(ctx, "data-reader.read-relationships")
124
	defer span.End()
125
126
	slog.DebugContext(ctx, "reading relationships for tenant_id", slog.String("tenant_id", tenantID))
127
128
	// Decode the snapshot value.
129
	var st token.SnapToken
130
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
131
	if err != nil {
132
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
133
	}
134
135
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
136
	builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
137
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
138
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
139
140
	// Apply the pagination token and limit to the query.
141
	if pagination.Token() != "" {
142
		var t database.ContinuousToken
143
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
144
		if err != nil {
145
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
146
		}
147
		var v uint64
148
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
149
		if err != nil {
150
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
151
		}
152
		builder = builder.Where(squirrel.GtOrEq{"id": v})
153
	}
154
155
	builder = builder.OrderBy("id")
156
157
	if pagination.PageSize() != 0 {
158
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
159
	}
160
161
	// Generate the SQL query and arguments.
162
	var query string
163
	var args []interface{}
164
	query, args, err = builder.ToSql()
165
	if err != nil {
166
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
167
	}
168
169
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
170
171
	// Execute the query and retrieve the rows.
172
	var rows pgx.Rows
173
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
174
	if err != nil {
175
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
176
	}
177
	defer rows.Close()
178
179
	var lastID uint64
180
181
	// Iterate through the rows and scan the result into a RelationTuple struct.
182
	tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
183
	for rows.Next() {
184
		rt := storage.RelationTuple{}
185
		err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
186
		if err != nil {
187
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
188
		}
189
		lastID = rt.ID
190
		tuples = append(tuples, rt.ToTuple())
191
	}
192
	// Check for any errors during iteration.
193
	if err = rows.Err(); err != nil {
194
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
195
	}
196
197
	slog.DebugContext(ctx, "successfully read relation tuples from database")
198
199
	// Return the results and encoded continuous token for pagination.
200
	if pagination.PageSize() != 0 && len(tuples) > int(pagination.PageSize()) {
201
		return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
202
	}
203
204
	return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil
205
}
206
207
// QuerySingleAttribute retrieves a single attribute from the storage based on the given filter.
208
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (attribute *base.Attribute, err error) {
209
	// Start a new trace span and end it when the function exits.
210
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-single-attribute")
211
	defer span.End()
212
213
	slog.DebugContext(ctx, "querying single attribute for tenant_id", slog.String("tenant_id", tenantID))
214
215
	// Decode the snapshot value.
216
	var st token.SnapToken
217
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
218
	if err != nil {
219
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
220
	}
221
222
	// Build the relationships query based on the provided filter and snapshot value.
223
	var args []interface{}
224
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
225
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
226
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
227
228
	// Generate the SQL query and arguments.
229
	var query string
230
	query, args, err = builder.ToSql()
231
	if err != nil {
232
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
233
	}
234
235
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
236
237
	row := r.database.ReadPool.QueryRow(ctx, query, args...)
238
239
	rt := storage.Attribute{}
240
241
	// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
242
	var valueStr string
243
244
	// Scan the row from the database into the fields of `rt` and `valueStr`.
245
	err = row.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
246
	if err != nil {
247
		if errors.Is(err, pgx.ErrNoRows) {
248
			return nil, nil
249
		} else {
250
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
251
		}
252
	}
253
254
	// Unmarshal the JSON data from `valueStr` into `rt.Value`.
255
	rt.Value = &anypb.Any{}
256
	unmarshaler := &jsonpb.Unmarshaler{}
257
	err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
258
	if err != nil {
259
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
260
	}
261
262
	slog.DebugContext(ctx, "successfully retrieved Single attribute from the database")
263
264
	return rt.ToAttribute(), nil
265
}
266
267
// QueryAttributes reads multiple attributes from the storage based on the given filter.
268
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.CursorPagination) (it *database.AttributeIterator, err error) {
269
	// Start a new trace span and end it when the function exits.
270
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-attributes")
271
	defer span.End()
272
273
	slog.DebugContext(ctx, "querying Attributes for tenant_id", slog.String("tenant_id", tenantID))
274
275
	// Decode the snapshot value.
276
	var st token.SnapToken
277
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
278
	if err != nil {
279
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
280
	}
281
282
	// Build the attributes query based on the provided filter and snapshot value.
283
	var args []interface{}
284
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
285
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
286
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
287
288
	if pagination.Cursor() != "" {
289
		var t database.ContinuousToken
290
		t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
291
		if err != nil {
292
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
293
		}
294
		builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value})
295
	}
296
297
	if pagination.Sort() != "" {
298
		builder = builder.OrderBy(pagination.Sort())
299
	}
300
301
	// Apply limit if specified in pagination
302
	limit := pagination.Limit()
303
	if limit > 0 {
304
		builder = builder.Limit(uint64(limit))
305
	}
306
307
	// Generate the SQL query and arguments.
308
	var query string
309
	query, args, err = builder.ToSql()
310
	if err != nil {
311
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
312
	}
313
314
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
315
316
	// Execute the SQL query and retrieve the result rows.
317
	var rows pgx.Rows
318
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
319
	if err != nil {
320
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
321
	}
322
	defer rows.Close()
323
324
	// Process the result rows and store the attributes in an AttributeCollection.
325
	collection := database.NewAttributeCollection()
326
	for rows.Next() {
327
		rt := storage.Attribute{}
328
329
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
330
		var valueStr string
331
332
		// Scan the row from the database into the fields of `rt` and `valueStr`.
333
		err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
334
		if err != nil {
335
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
336
		}
337
338
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
339
		rt.Value = &anypb.Any{}
340
		unmarshaler := &jsonpb.Unmarshaler{}
341
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
342
		if err != nil {
343
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
344
		}
345
346
		collection.Add(rt.ToAttribute())
347
	}
348
	if err = rows.Err(); err != nil {
349
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
350
	}
351
352
	slog.DebugContext(ctx, "successfully retrieved attributes tuples from the database")
353
354
	// Return an AttributeIterator created from the AttributeCollection.
355
	return collection.CreateAttributeIterator(), nil
356
}
357
358
// ReadAttributes reads multiple attributes from the storage based on the given filter and pagination.
359
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
360
	// Start a new trace span and end it when the function exits.
361
	ctx, span := internal.Tracer.Start(ctx, "data-reader.read-attributes")
362
	defer span.End()
363
364
	slog.DebugContext(ctx, "reading attributes for tenant_id", slog.String("tenant_id", tenantID))
365
366
	// Decode the snapshot value.
367
	var st token.SnapToken
368
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
369
	if err != nil {
370
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
371
	}
372
373
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
374
	builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
375
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
376
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
377
378
	// Apply the pagination token and limit to the query.
379
	if pagination.Token() != "" {
380
		var t database.ContinuousToken
381
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
382
		if err != nil {
383
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
384
		}
385
		var v uint64
386
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
387
		if err != nil {
388
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
389
		}
390
		builder = builder.Where(squirrel.GtOrEq{"id": v})
391
	}
392
393
	builder = builder.OrderBy("id")
394
395
	if pagination.PageSize() != 0 {
396
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
397
	}
398
399
	// Generate the SQL query and arguments.
400
	var query string
401
	var args []interface{}
402
	query, args, err = builder.ToSql()
403
	if err != nil {
404
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
405
	}
406
407
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
408
409
	// Execute the query and retrieve the rows.
410
	var rows pgx.Rows
411
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
412
	if err != nil {
413
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
414
	}
415
	defer rows.Close()
416
417
	var lastID uint64
418
419
	// Iterate through the rows and scan the result into a RelationTuple struct.
420
	attributes := make([]*base.Attribute, 0, pagination.PageSize()+1)
421
	for rows.Next() {
422
		rt := storage.Attribute{}
423
424
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
425
		var valueStr string
426
427
		// Scan the row from the database into the fields of `rt` and `valueStr`.
428
		err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
429
		if err != nil {
430
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
431
		}
432
		lastID = rt.ID
433
434
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
435
		rt.Value = &anypb.Any{}
436
		unmarshaler := &jsonpb.Unmarshaler{}
437
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
438
		if err != nil {
439
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
440
		}
441
442
		attributes = append(attributes, rt.ToAttribute())
443
	}
444
	// Check for any errors during iteration.
445
	if err = rows.Err(); err != nil {
446
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
447
	}
448
449
	slog.DebugContext(ctx, "successfully read attributes from the database")
450
451
	// Return the results and encoded continuous token for pagination.
452
	if len(attributes) > int(pagination.PageSize()) {
453
		return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
454
	}
455
456
	return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil
457
}
458
459
// QueryUniqueSubjectReferences reads unique subject references from the storage based on the given filter and pagination.
460
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, excluded []string, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
461
	// Start a new trace span and end it when the function exits.
462
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-unique-subject-reference")
463
	defer span.End()
464
465
	slog.DebugContext(ctx, "querying unique subject references for tenant_id", slog.String("tenant_id", tenantID))
466
467
	// Decode the snapshot value.
468
	var st token.SnapToken
469
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
470
	if err != nil {
471
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
472
	}
473
474
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
475
	builder := r.database.Builder.
476
		Select("subject_id"). // This will pick the smallest `id` for each unique `subject_id`.
477
		From(RelationTuplesTable).
478
		Where(squirrel.Eq{"tenant_id": tenantID}).
479
		GroupBy("subject_id")
480
481
	// Apply subject filter
482
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, &base.TupleFilter{
483
		Subject: &base.SubjectFilter{
484
			Type:     subjectReference.GetType(),
485
			Relation: subjectReference.GetRelation(),
486
		},
487
	})
488
489
	// Apply snapshot filter
490
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
491
492
	// Apply exclusion if the list is not empty
493
	if len(excluded) > 0 {
494
		builder = builder.Where(squirrel.NotEq{"subject_id": excluded})
495
	}
496
497
	// Apply the pagination token and limit to the query.
498
	if pagination.Token() != "" {
499
		var t database.ContinuousToken
500
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
501
		if err != nil {
502
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
503
		}
504
		builder = builder.Where(squirrel.GtOrEq{"subject_id": t.(utils.ContinuousToken).Value})
505
	}
506
507
	builder = builder.OrderBy("subject_id")
508
509
	if pagination.PageSize() != 0 {
510
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
511
	}
512
513
	// Generate the SQL query and arguments.
514
	var query string
515
	var args []interface{}
516
	query, args, err = builder.ToSql()
517
	if err != nil {
518
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
519
	}
520
521
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
522
523
	// Execute the query and retrieve the rows.
524
	var rows pgx.Rows
525
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
526
	if err != nil {
527
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
528
	}
529
	defer rows.Close()
530
531
	var lastID string
532
533
	// Iterate through the rows and scan the result into a RelationTuple struct.
534
	subjectIDs := make([]string, 0, pagination.PageSize()+1)
535
	for rows.Next() {
536
		var subjectID string
537
		err = rows.Scan(&subjectID)
538
		if err != nil {
539
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
540
		}
541
542
		subjectIDs = append(subjectIDs, subjectID)
543
		lastID = subjectID
544
	}
545
	// Check for any errors during iteration.
546
	if err = rows.Err(); err != nil {
547
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
548
	}
549
550
	slog.DebugContext(ctx, "successfully retrieved unique subject references from the database")
551
552
	// Return the results and encoded continuous token for pagination.
553
	if pagination.PageSize() != 0 && len(subjectIDs) > int(pagination.PageSize()) {
554
		return subjectIDs[:pagination.PageSize()], utils.NewContinuousToken(lastID).Encode(), nil
555
	}
556
557
	return subjectIDs, database.NewNoopContinuousToken().Encode(), nil
558
}
559
560
// HeadSnapshot retrieves the latest snapshot token associated with the tenant.
561
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
562
	// Start a new trace span and end it when the function exits.
563
	ctx, span := internal.Tracer.Start(ctx, "data-reader.head-snapshot")
564
	defer span.End()
565
566
	slog.DebugContext(ctx, "getting head snapshot for tenant_id", slog.String("tenant_id", tenantID))
567
568
	var xid types.XID8
569
570
	// Build the query to find the highest transaction ID associated with the tenant.
571
	builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1)
572
	query, args, err := builder.ToSql()
573
	if err != nil {
574
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
575
	}
576
577
	// TODO: To optimize this query, create the following index concurrently to avoid table locks:
578
	// CREATE INDEX CONCURRENTLY idx_transactions_tenant_id_id ON transactions(tenant_id, id DESC);
579
580
	// Execute the query and retrieve the highest transaction ID.
581
	err = r.database.ReadPool.QueryRow(ctx, query, args...).Scan(&xid)
582
	if err != nil {
583
		// If no rows are found, return a snapshot token with a value of 0.
584
		if errors.Is(err, pgx.ErrNoRows) {
585
			return snapshot.Token{Value: types.XID8{Uint: 0}}, nil
586
		}
587
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
588
	}
589
590
	slog.DebugContext(ctx, "successfully retrieved latest snapshot token")
591
592
	// Return the latest snapshot token associated with the tenant.
593
	return snapshot.Token{Value: xid}, nil
594
}
595