postgres.*DataReader.QueryUniqueSubjectReferences   D
last analyzed

Complexity

Conditions 13

Size

Total Lines 98
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 13
eloc 55
nop 6
dl 0
loc 98
rs 4.2
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

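Commonly applied refactorings include Extract Method; when a method has many parameters or temporary variables, Replace Temp with Query, Introduce Parameter Object and Preserve Whole Object are also commonly used.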

Complexity

Complex classes like postgres.*DataReader.QueryUniqueSubjectReferences often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package postgres
2
3
import (
4
	"context"
5
	"errors"
6
	"log/slog"
7
	"strconv"
8
9
	"github.com/jackc/pgx/v5"
10
11
	"github.com/Masterminds/squirrel"
12
	"google.golang.org/protobuf/encoding/protojson"
13
	"google.golang.org/protobuf/types/known/anypb"
14
15
	"github.com/Permify/permify/internal"
16
	"github.com/Permify/permify/internal/storage"
17
	"github.com/Permify/permify/internal/storage/postgres/snapshot"
18
	"github.com/Permify/permify/internal/storage/postgres/types"
19
	"github.com/Permify/permify/internal/storage/postgres/utils"
20
	"github.com/Permify/permify/pkg/database"
21
	db "github.com/Permify/permify/pkg/database/postgres"
22
	base "github.com/Permify/permify/pkg/pb/base/v1"
23
	"github.com/Permify/permify/pkg/token"
24
)
25
26
// DataReader is a struct which holds a reference to the database, transaction options and a logger.
27
// It is responsible for reading data from the database.
28
type DataReader struct {
29
	database  *db.Postgres  // database is an instance of the PostgreSQL database
30
	txOptions pgx.TxOptions // txOptions specifies the isolation level for database transaction and sets it as read only
31
}
32
33
// NewDataReader is a constructor function for DataReader.
34
// It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level.
35
func NewDataReader(database *db.Postgres) *DataReader {
36
	return &DataReader{
37
		database:  database,                                                             // Set the database to the passed in PostgreSQL instance
38
		txOptions: pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly}, // Set the transaction options
39
	}
40
}
41
42
// QueryRelationships reads relation tuples from the storage based on the given filter.
43
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.CursorPagination) (it *database.TupleIterator, err error) {
44
	// Start a new trace span and end it when the function exits.
45
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-relationships")
46
	defer span.End()
47
48
	slog.DebugContext(ctx, "querying relationships for tenant_id", slog.String("tenant_id", tenantID))
49
50
	// Decode the snapshot value.
51
	var st token.SnapToken
52
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
53
	if err != nil {
54
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
55
	}
56
57
	// Build the relationships query based on the provided filter and snapshot value.
58
	var args []interface{}
59
	builder := r.database.Builder.Select("entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
60
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
61
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
62
63
	if pagination.Cursor() != "" {
64
		var t database.ContinuousToken
65
		t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
66
		if err != nil {
67
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
68
		}
69
		builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value})
70
	}
71
72
	if pagination.Sort() != "" {
73
		builder = builder.OrderBy(pagination.Sort())
74
	}
75
76
	// Apply limit if specified in pagination
77
	limit := pagination.Limit()
78
	if limit > 0 {
79
		builder = builder.Limit(uint64(limit))
80
	}
81
82
	// Generate the SQL query and arguments.
83
	var query string
84
	query, args, err = builder.ToSql()
85
	if err != nil {
86
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
87
	}
88
89
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
90
91
	// Execute the SQL query and retrieve the result rows.
92
	var rows pgx.Rows
93
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
94
	if err != nil {
95
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
96
	}
97
	defer rows.Close()
98
99
	// Process the result rows and store the relationships in a TupleCollection.
100
	collection := database.NewTupleCollection()
101
	for rows.Next() {
102
		rt := storage.RelationTuple{}
103
		err = rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
104
		if err != nil {
105
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
106
		}
107
		collection.Add(rt.ToTuple())
108
	}
109
	if err = rows.Err(); err != nil {
110
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
111
	}
112
113
	slog.DebugContext(ctx, "successfully retrieved relation tuples from the database")
114
115
	// Return a TupleIterator created from the TupleCollection.
116
	return collection.CreateTupleIterator(), nil
117
}
118
119
// ReadRelationships reads relation tuples from the storage based on the given filter and pagination.
120
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
121
	// Start a new trace span and end it when the function exits.
122
	ctx, span := internal.Tracer.Start(ctx, "data-reader.read-relationships")
123
	defer span.End()
124
125
	slog.DebugContext(ctx, "reading relationships for tenant_id", slog.String("tenant_id", tenantID))
126
127
	// Decode the snapshot value.
128
	var st token.SnapToken
129
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
130
	if err != nil {
131
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
132
	}
133
134
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
135
	builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
136
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
137
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
138
139
	// Apply the pagination token and limit to the query.
140
	if pagination.Token() != "" {
141
		var t database.ContinuousToken
142
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
143
		if err != nil {
144
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
145
		}
146
		var v uint64
147
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
148
		if err != nil {
149
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
150
		}
151
		builder = builder.Where(squirrel.GtOrEq{"id": v})
152
	}
153
154
	builder = builder.OrderBy("id")
155
156
	if pagination.PageSize() != 0 {
157
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
158
	}
159
160
	// Generate the SQL query and arguments.
161
	var query string
162
	var args []interface{}
163
	query, args, err = builder.ToSql()
164
	if err != nil {
165
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
166
	}
167
168
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
169
170
	// Execute the query and retrieve the rows.
171
	var rows pgx.Rows
172
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
173
	if err != nil {
174
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
175
	}
176
	defer rows.Close()
177
178
	var lastID uint64
179
180
	// Iterate through the rows and scan the result into a RelationTuple struct.
181
	tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
182
	for rows.Next() {
183
		rt := storage.RelationTuple{}
184
		err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
185
		if err != nil {
186
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
187
		}
188
		lastID = rt.ID
189
		tuples = append(tuples, rt.ToTuple())
190
	}
191
	// Check for any errors during iteration.
192
	if err = rows.Err(); err != nil {
193
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
194
	}
195
196
	slog.DebugContext(ctx, "successfully read relation tuples from database")
197
198
	// Return the results and encoded continuous token for pagination.
199
	if pagination.PageSize() != 0 && len(tuples) > int(pagination.PageSize()) {
200
		return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
201
	}
202
203
	return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil
204
}
205
206
// QuerySingleAttribute retrieves a single attribute from the storage based on the given filter.
207
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (attribute *base.Attribute, err error) {
208
	// Start a new trace span and end it when the function exits.
209
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-single-attribute")
210
	defer span.End()
211
212
	slog.DebugContext(ctx, "querying single attribute for tenant_id", slog.String("tenant_id", tenantID))
213
214
	// Decode the snapshot value.
215
	var st token.SnapToken
216
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
217
	if err != nil {
218
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
219
	}
220
221
	// Build the relationships query based on the provided filter and snapshot value.
222
	var args []interface{}
223
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
224
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
225
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
226
227
	// Generate the SQL query and arguments.
228
	var query string
229
	query, args, err = builder.ToSql()
230
	if err != nil {
231
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
232
	}
233
234
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
235
236
	row := r.database.ReadPool.QueryRow(ctx, query, args...)
237
238
	rt := storage.Attribute{}
239
240
	// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
241
	var valueStr string
242
243
	// Scan the row from the database into the fields of `rt` and `valueStr`.
244
	err = row.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
245
	if err != nil {
246
		if errors.Is(err, pgx.ErrNoRows) {
247
			return nil, nil
248
		} else {
249
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
250
		}
251
	}
252
253
	// Unmarshal the JSON data from `valueStr` into `rt.Value`.
254
	rt.Value = &anypb.Any{}
255
	err = protojson.Unmarshal([]byte(valueStr), rt.Value)
256
	if err != nil {
257
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
258
	}
259
260
	slog.DebugContext(ctx, "successfully retrieved Single attribute from the database")
261
262
	return rt.ToAttribute(), nil
263
}
264
265
// QueryAttributes reads multiple attributes from the storage based on the given filter.
266
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.CursorPagination) (it *database.AttributeIterator, err error) {
267
	// Start a new trace span and end it when the function exits.
268
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-attributes")
269
	defer span.End()
270
271
	slog.DebugContext(ctx, "querying Attributes for tenant_id", slog.String("tenant_id", tenantID))
272
273
	// Decode the snapshot value.
274
	var st token.SnapToken
275
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
276
	if err != nil {
277
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
278
	}
279
280
	// Build the attributes query based on the provided filter and snapshot value.
281
	var args []interface{}
282
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
283
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
284
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
285
286
	if pagination.Cursor() != "" {
287
		var t database.ContinuousToken
288
		t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
289
		if err != nil {
290
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
291
		}
292
		builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value})
293
	}
294
295
	if pagination.Sort() != "" {
296
		builder = builder.OrderBy(pagination.Sort())
297
	}
298
299
	// Apply limit if specified in pagination
300
	limit := pagination.Limit()
301
	if limit > 0 {
302
		builder = builder.Limit(uint64(limit))
303
	}
304
305
	// Generate the SQL query and arguments.
306
	var query string
307
	query, args, err = builder.ToSql()
308
	if err != nil {
309
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
310
	}
311
312
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
313
314
	// Execute the SQL query and retrieve the result rows.
315
	var rows pgx.Rows
316
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
317
	if err != nil {
318
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
319
	}
320
	defer rows.Close()
321
322
	// Process the result rows and store the attributes in an AttributeCollection.
323
	collection := database.NewAttributeCollection()
324
	for rows.Next() {
325
		rt := storage.Attribute{}
326
327
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
328
		var valueStr string
329
330
		// Scan the row from the database into the fields of `rt` and `valueStr`.
331
		err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
332
		if err != nil {
333
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
334
		}
335
336
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
337
		rt.Value = &anypb.Any{}
338
		err = protojson.Unmarshal([]byte(valueStr), rt.Value)
339
		if err != nil {
340
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
341
		}
342
343
		collection.Add(rt.ToAttribute())
344
	}
345
	if err = rows.Err(); err != nil {
346
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
347
	}
348
349
	slog.DebugContext(ctx, "successfully retrieved attributes tuples from the database")
350
351
	// Return an AttributeIterator created from the AttributeCollection.
352
	return collection.CreateAttributeIterator(), nil
353
}
354
355
// ReadAttributes reads multiple attributes from the storage based on the given filter and pagination.
356
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
357
	// Start a new trace span and end it when the function exits.
358
	ctx, span := internal.Tracer.Start(ctx, "data-reader.read-attributes")
359
	defer span.End()
360
361
	slog.DebugContext(ctx, "reading attributes for tenant_id", slog.String("tenant_id", tenantID))
362
363
	// Decode the snapshot value.
364
	var st token.SnapToken
365
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
366
	if err != nil {
367
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
368
	}
369
370
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
371
	builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
372
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
373
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
374
375
	// Apply the pagination token and limit to the query.
376
	if pagination.Token() != "" {
377
		var t database.ContinuousToken
378
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
379
		if err != nil {
380
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
381
		}
382
		var v uint64
383
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
384
		if err != nil {
385
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
386
		}
387
		builder = builder.Where(squirrel.GtOrEq{"id": v})
388
	}
389
390
	builder = builder.OrderBy("id")
391
392
	if pagination.PageSize() != 0 {
393
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
394
	}
395
396
	// Generate the SQL query and arguments.
397
	var query string
398
	var args []interface{}
399
	query, args, err = builder.ToSql()
400
	if err != nil {
401
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
402
	}
403
404
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
405
406
	// Execute the query and retrieve the rows.
407
	var rows pgx.Rows
408
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
409
	if err != nil {
410
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
411
	}
412
	defer rows.Close()
413
414
	var lastID uint64
415
416
	// Iterate through the rows and scan the result into a RelationTuple struct.
417
	attributes := make([]*base.Attribute, 0, pagination.PageSize()+1)
418
	for rows.Next() {
419
		rt := storage.Attribute{}
420
421
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
422
		var valueStr string
423
424
		// Scan the row from the database into the fields of `rt` and `valueStr`.
425
		err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
426
		if err != nil {
427
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
428
		}
429
		lastID = rt.ID
430
431
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
432
		rt.Value = &anypb.Any{}
433
		err = protojson.Unmarshal([]byte(valueStr), rt.Value)
434
		if err != nil {
435
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
436
		}
437
438
		attributes = append(attributes, rt.ToAttribute())
439
	}
440
	// Check for any errors during iteration.
441
	if err = rows.Err(); err != nil {
442
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
443
	}
444
445
	slog.DebugContext(ctx, "successfully read attributes from the database")
446
447
	// Return the results and encoded continuous token for pagination.
448
	if len(attributes) > int(pagination.PageSize()) {
449
		return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
450
	}
451
452
	return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil
453
}
454
455
// QueryUniqueSubjectReferences reads unique subject references from the storage based on the given filter and pagination.
456
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, excluded []string, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
457
	// Start a new trace span and end it when the function exits.
458
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-unique-subject-reference")
459
	defer span.End()
460
461
	slog.DebugContext(ctx, "querying unique subject references for tenant_id", slog.String("tenant_id", tenantID))
462
463
	// Decode the snapshot value.
464
	var st token.SnapToken
465
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
466
	if err != nil {
467
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
468
	}
469
470
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
471
	builder := r.database.Builder.
472
		Select("subject_id"). // This will pick the smallest `id` for each unique `subject_id`.
473
		From(RelationTuplesTable).
474
		Where(squirrel.Eq{"tenant_id": tenantID}).
475
		GroupBy("subject_id")
476
477
	// Apply subject filter
478
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, &base.TupleFilter{
479
		Subject: &base.SubjectFilter{
480
			Type:     subjectReference.GetType(),
481
			Relation: subjectReference.GetRelation(),
482
		},
483
	})
484
485
	// Apply snapshot filter
486
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
487
488
	// Apply exclusion if the list is not empty
489
	if len(excluded) > 0 {
490
		builder = builder.Where(squirrel.NotEq{"subject_id": excluded})
491
	}
492
493
	// Apply the pagination token and limit to the query.
494
	if pagination.Token() != "" {
495
		var t database.ContinuousToken
496
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
497
		if err != nil {
498
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
499
		}
500
		builder = builder.Where(squirrel.GtOrEq{"subject_id": t.(utils.ContinuousToken).Value})
501
	}
502
503
	builder = builder.OrderBy("subject_id")
504
505
	if pagination.PageSize() != 0 {
506
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
507
	}
508
509
	// Generate the SQL query and arguments.
510
	var query string
511
	var args []interface{}
512
	query, args, err = builder.ToSql()
513
	if err != nil {
514
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
515
	}
516
517
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
518
519
	// Execute the query and retrieve the rows.
520
	var rows pgx.Rows
521
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
522
	if err != nil {
523
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
524
	}
525
	defer rows.Close()
526
527
	var lastID string
528
529
	// Iterate through the rows and scan the result into a RelationTuple struct.
530
	subjectIDs := make([]string, 0, pagination.PageSize()+1)
531
	for rows.Next() {
532
		var subjectID string
533
		err = rows.Scan(&subjectID)
534
		if err != nil {
535
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
536
		}
537
538
		subjectIDs = append(subjectIDs, subjectID)
539
		lastID = subjectID
540
	}
541
	// Check for any errors during iteration.
542
	if err = rows.Err(); err != nil {
543
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
544
	}
545
546
	slog.DebugContext(ctx, "successfully retrieved unique subject references from the database")
547
548
	// Return the results and encoded continuous token for pagination.
549
	if pagination.PageSize() != 0 && len(subjectIDs) > int(pagination.PageSize()) {
550
		return subjectIDs[:pagination.PageSize()], utils.NewContinuousToken(lastID).Encode(), nil
551
	}
552
553
	return subjectIDs, database.NewNoopContinuousToken().Encode(), nil
554
}
555
556
// HeadSnapshot retrieves the latest snapshot token associated with the tenant.
557
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
558
	// Start a new trace span and end it when the function exits.
559
	ctx, span := internal.Tracer.Start(ctx, "data-reader.head-snapshot")
560
	defer span.End()
561
562
	slog.DebugContext(ctx, "getting head snapshot for tenant_id", slog.String("tenant_id", tenantID))
563
564
	var xid types.XID8
565
566
	// Build the query to find the highest transaction ID associated with the tenant.
567
	builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1)
568
	query, args, err := builder.ToSql()
569
	if err != nil {
570
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
571
	}
572
573
	// TODO: To optimize this query, create the following index concurrently to avoid table locks:
574
	// CREATE INDEX CONCURRENTLY idx_transactions_tenant_id_id ON transactions(tenant_id, id DESC);
575
576
	// Execute the query and retrieve the highest transaction ID.
577
	err = r.database.ReadPool.QueryRow(ctx, query, args...).Scan(&xid)
578
	if err != nil {
579
		// If no rows are found, return a snapshot token with a value of 0.
580
		if errors.Is(err, pgx.ErrNoRows) {
581
			return snapshot.Token{Value: types.XID8{Uint: 0}}, nil
582
		}
583
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
584
	}
585
586
	slog.DebugContext(ctx, "successfully retrieved latest snapshot token")
587
588
	// Return the latest snapshot token associated with the tenant.
589
	return snapshot.Token{Value: xid}, nil
590
}
591