Passed
Push — master ( 529ae6...944870 )
by Tolga
06:00 queued 02:43
created

internal/storage/postgres/data_reader.go   F

Size/Duplication

Total Lines 588
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 73
eloc 326
dl 0
loc 588
rs 2.56
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
D postgres.*DataReader.QueryUniqueSubjectReferences 0 98 13
D postgres.*DataReader.ReadAttributes 0 97 13
A postgres.*DataReader.HeadSnapshot 0 33 4
D postgres.*DataReader.QueryAttributes 0 87 12
B postgres.*DataReader.QuerySingleAttribute 0 56 6
A postgres.NewDataReader 0 4 1
C postgres.*DataReader.QueryRelationships 0 74 11
D postgres.*DataReader.ReadRelationships 0 84 13
1
package postgres
2
3
import (
4
	"context"
5
	"errors"
6
	"log/slog"
7
	"strconv"
8
9
	"github.com/jackc/pgx/v5"
10
11
	"github.com/Masterminds/squirrel"
12
	"google.golang.org/protobuf/encoding/protojson"
13
	"google.golang.org/protobuf/types/known/anypb"
14
15
	"github.com/Permify/permify/internal"
16
	"github.com/Permify/permify/internal/storage"
17
	"github.com/Permify/permify/internal/storage/postgres/snapshot"
18
	"github.com/Permify/permify/internal/storage/postgres/utils"
19
	"github.com/Permify/permify/pkg/database"
20
	db "github.com/Permify/permify/pkg/database/postgres"
21
	base "github.com/Permify/permify/pkg/pb/base/v1"
22
	"github.com/Permify/permify/pkg/token"
23
)
24
25
// DataReader is a struct which holds a reference to the database, transaction options and a logger.
26
// It is responsible for reading data from the database.
27
type DataReader struct {
28
	database  *db.Postgres  // database is an instance of the PostgreSQL database
29
	txOptions pgx.TxOptions // txOptions specifies the isolation level for database transaction and sets it as read only
30
}
31
32
// NewDataReader is a constructor function for DataReader.
33
// It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level.
34
func NewDataReader(database *db.Postgres) *DataReader {
35
	return &DataReader{
36
		database:  database,                                                             // Set the database to the passed in PostgreSQL instance
37
		txOptions: pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly}, // Set the transaction options
38
	}
39
}
40
41
// QueryRelationships reads relation tuples from the storage based on the given filter.
42
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.CursorPagination) (it *database.TupleIterator, err error) {
43
	// Start a new trace span and end it when the function exits.
44
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-relationships")
45
	defer span.End()
46
47
	slog.DebugContext(ctx, "querying relationships for tenant_id", slog.String("tenant_id", tenantID))
48
49
	// Decode the snapshot value.
50
	var st token.SnapToken
51
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
52
	if err != nil {
53
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
54
	}
55
56
	// Build the relationships query based on the provided filter and snapshot value.
57
	var args []interface{}
58
	builder := r.database.Builder.Select("entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
59
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
60
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
61
62
	if pagination.Cursor() != "" {
63
		var t database.ContinuousToken
64
		t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
65
		if err != nil {
66
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
67
		}
68
		builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value})
69
	}
70
71
	if pagination.Sort() != "" {
72
		builder = builder.OrderBy(pagination.Sort())
73
	}
74
75
	// Apply limit if specified in pagination
76
	limit := pagination.Limit()
77
	if limit > 0 {
78
		builder = builder.Limit(uint64(limit))
79
	}
80
81
	// Generate the SQL query and arguments.
82
	var query string
83
	query, args, err = builder.ToSql()
84
	if err != nil {
85
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
86
	}
87
88
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
89
90
	// Execute the SQL query and retrieve the result rows.
91
	var rows pgx.Rows
92
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
93
	if err != nil {
94
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
95
	}
96
	defer rows.Close()
97
98
	// Process the result rows and store the relationships in a TupleCollection.
99
	collection := database.NewTupleCollection()
100
	for rows.Next() {
101
		rt := storage.RelationTuple{}
102
		err = rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
103
		if err != nil {
104
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
105
		}
106
		collection.Add(rt.ToTuple())
107
	}
108
	if err = rows.Err(); err != nil {
109
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
110
	}
111
112
	slog.DebugContext(ctx, "successfully retrieved relation tuples from the database")
113
114
	// Return a TupleIterator created from the TupleCollection.
115
	return collection.CreateTupleIterator(), nil
116
}
117
118
// ReadRelationships reads relation tuples from the storage based on the given filter and pagination.
119
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
120
	// Start a new trace span and end it when the function exits.
121
	ctx, span := internal.Tracer.Start(ctx, "data-reader.read-relationships")
122
	defer span.End()
123
124
	slog.DebugContext(ctx, "reading relationships for tenant_id", slog.String("tenant_id", tenantID))
125
126
	// Decode the snapshot value.
127
	var st token.SnapToken
128
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
129
	if err != nil {
130
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
131
	}
132
133
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
134
	builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
135
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
136
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
137
138
	// Apply the pagination token and limit to the query.
139
	if pagination.Token() != "" {
140
		var t database.ContinuousToken
141
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
142
		if err != nil {
143
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
144
		}
145
		var v uint64
146
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
147
		if err != nil {
148
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
149
		}
150
		builder = builder.Where(squirrel.GtOrEq{"id": v})
151
	}
152
153
	builder = builder.OrderBy("id")
154
155
	if pagination.PageSize() != 0 {
156
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
157
	}
158
159
	// Generate the SQL query and arguments.
160
	var query string
161
	var args []interface{}
162
	query, args, err = builder.ToSql()
163
	if err != nil {
164
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
165
	}
166
167
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
168
169
	// Execute the query and retrieve the rows.
170
	var rows pgx.Rows
171
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
172
	if err != nil {
173
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
174
	}
175
	defer rows.Close()
176
177
	var lastID uint64
178
179
	// Iterate through the rows and scan the result into a RelationTuple struct.
180
	tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
181
	for rows.Next() {
182
		rt := storage.RelationTuple{}
183
		err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
184
		if err != nil {
185
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
186
		}
187
		lastID = rt.ID
188
		tuples = append(tuples, rt.ToTuple())
189
	}
190
	// Check for any errors during iteration.
191
	if err = rows.Err(); err != nil {
192
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
193
	}
194
195
	slog.DebugContext(ctx, "successfully read relation tuples from database")
196
197
	// Return the results and encoded continuous token for pagination.
198
	if pagination.PageSize() != 0 && len(tuples) > int(pagination.PageSize()) {
199
		return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
200
	}
201
202
	return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil
203
}
204
205
// QuerySingleAttribute retrieves a single attribute from the storage based on the given filter.
206
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (attribute *base.Attribute, err error) {
207
	// Start a new trace span and end it when the function exits.
208
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-single-attribute")
209
	defer span.End()
210
211
	slog.DebugContext(ctx, "querying single attribute for tenant_id", slog.String("tenant_id", tenantID))
212
213
	// Decode the snapshot value.
214
	var st token.SnapToken
215
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
216
	if err != nil {
217
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
218
	}
219
220
	// Build the relationships query based on the provided filter and snapshot value.
221
	var args []interface{}
222
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
223
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
224
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
225
226
	// Generate the SQL query and arguments.
227
	var query string
228
	query, args, err = builder.ToSql()
229
	if err != nil {
230
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
231
	}
232
233
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
234
235
	row := r.database.ReadPool.QueryRow(ctx, query, args...)
236
237
	rt := storage.Attribute{}
238
239
	// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
240
	var valueStr string
241
242
	// Scan the row from the database into the fields of `rt` and `valueStr`.
243
	err = row.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
244
	if err != nil {
245
		if errors.Is(err, pgx.ErrNoRows) {
246
			return nil, nil
247
		} else {
248
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
249
		}
250
	}
251
252
	// Unmarshal the JSON data from `valueStr` into `rt.Value`.
253
	rt.Value = &anypb.Any{}
254
	err = protojson.Unmarshal([]byte(valueStr), rt.Value)
255
	if err != nil {
256
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
257
	}
258
259
	slog.DebugContext(ctx, "successfully retrieved Single attribute from the database")
260
261
	return rt.ToAttribute(), nil
262
}
263
264
// QueryAttributes reads multiple attributes from the storage based on the given filter.
265
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.CursorPagination) (it *database.AttributeIterator, err error) {
266
	// Start a new trace span and end it when the function exits.
267
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-attributes")
268
	defer span.End()
269
270
	slog.DebugContext(ctx, "querying Attributes for tenant_id", slog.String("tenant_id", tenantID))
271
272
	// Decode the snapshot value.
273
	var st token.SnapToken
274
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
275
	if err != nil {
276
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
277
	}
278
279
	// Build the attributes query based on the provided filter and snapshot value.
280
	var args []interface{}
281
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
282
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
283
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
284
285
	if pagination.Cursor() != "" {
286
		var t database.ContinuousToken
287
		t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
288
		if err != nil {
289
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
290
		}
291
		builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value})
292
	}
293
294
	if pagination.Sort() != "" {
295
		builder = builder.OrderBy(pagination.Sort())
296
	}
297
298
	// Apply limit if specified in pagination
299
	limit := pagination.Limit()
300
	if limit > 0 {
301
		builder = builder.Limit(uint64(limit))
302
	}
303
304
	// Generate the SQL query and arguments.
305
	var query string
306
	query, args, err = builder.ToSql()
307
	if err != nil {
308
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
309
	}
310
311
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
312
313
	// Execute the SQL query and retrieve the result rows.
314
	var rows pgx.Rows
315
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
316
	if err != nil {
317
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
318
	}
319
	defer rows.Close()
320
321
	// Process the result rows and store the attributes in an AttributeCollection.
322
	collection := database.NewAttributeCollection()
323
	for rows.Next() {
324
		rt := storage.Attribute{}
325
326
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
327
		var valueStr string
328
329
		// Scan the row from the database into the fields of `rt` and `valueStr`.
330
		err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
331
		if err != nil {
332
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
333
		}
334
335
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
336
		rt.Value = &anypb.Any{}
337
		err = protojson.Unmarshal([]byte(valueStr), rt.Value)
338
		if err != nil {
339
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
340
		}
341
342
		collection.Add(rt.ToAttribute())
343
	}
344
	if err = rows.Err(); err != nil {
345
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
346
	}
347
348
	slog.DebugContext(ctx, "successfully retrieved attributes tuples from the database")
349
350
	// Return an AttributeIterator created from the AttributeCollection.
351
	return collection.CreateAttributeIterator(), nil
352
}
353
354
// ReadAttributes reads multiple attributes from the storage based on the given filter and pagination.
355
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
356
	// Start a new trace span and end it when the function exits.
357
	ctx, span := internal.Tracer.Start(ctx, "data-reader.read-attributes")
358
	defer span.End()
359
360
	slog.DebugContext(ctx, "reading attributes for tenant_id", slog.String("tenant_id", tenantID))
361
362
	// Decode the snapshot value.
363
	var st token.SnapToken
364
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
365
	if err != nil {
366
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
367
	}
368
369
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
370
	builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
371
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
372
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
373
374
	// Apply the pagination token and limit to the query.
375
	if pagination.Token() != "" {
376
		var t database.ContinuousToken
377
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
378
		if err != nil {
379
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
380
		}
381
		var v uint64
382
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
383
		if err != nil {
384
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
385
		}
386
		builder = builder.Where(squirrel.GtOrEq{"id": v})
387
	}
388
389
	builder = builder.OrderBy("id")
390
391
	if pagination.PageSize() != 0 {
392
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
393
	}
394
395
	// Generate the SQL query and arguments.
396
	var query string
397
	var args []interface{}
398
	query, args, err = builder.ToSql()
399
	if err != nil {
400
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
401
	}
402
403
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
404
405
	// Execute the query and retrieve the rows.
406
	var rows pgx.Rows
407
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
408
	if err != nil {
409
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
410
	}
411
	defer rows.Close()
412
413
	var lastID uint64
414
415
	// Iterate through the rows and scan the result into a RelationTuple struct.
416
	attributes := make([]*base.Attribute, 0, pagination.PageSize()+1)
417
	for rows.Next() {
418
		rt := storage.Attribute{}
419
420
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
421
		var valueStr string
422
423
		// Scan the row from the database into the fields of `rt` and `valueStr`.
424
		err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
425
		if err != nil {
426
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
427
		}
428
		lastID = rt.ID
429
430
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
431
		rt.Value = &anypb.Any{}
432
		err = protojson.Unmarshal([]byte(valueStr), rt.Value)
433
		if err != nil {
434
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
435
		}
436
437
		attributes = append(attributes, rt.ToAttribute())
438
	}
439
	// Check for any errors during iteration.
440
	if err = rows.Err(); err != nil {
441
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
442
	}
443
444
	slog.DebugContext(ctx, "successfully read attributes from the database")
445
446
	// Return the results and encoded continuous token for pagination.
447
	if len(attributes) > int(pagination.PageSize()) {
448
		return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
449
	}
450
451
	return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil
452
}
453
454
// QueryUniqueSubjectReferences reads unique subject references from the storage based on the given filter and pagination.
455
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, excluded []string, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
456
	// Start a new trace span and end it when the function exits.
457
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-unique-subject-reference")
458
	defer span.End()
459
460
	slog.DebugContext(ctx, "querying unique subject references for tenant_id", slog.String("tenant_id", tenantID))
461
462
	// Decode the snapshot value.
463
	var st token.SnapToken
464
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
465
	if err != nil {
466
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
467
	}
468
469
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
470
	builder := r.database.Builder.
471
		Select("subject_id"). // This will pick the smallest `id` for each unique `subject_id`.
472
		From(RelationTuplesTable).
473
		Where(squirrel.Eq{"tenant_id": tenantID}).
474
		GroupBy("subject_id")
475
476
	// Apply subject filter
477
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, &base.TupleFilter{
478
		Subject: &base.SubjectFilter{
479
			Type:     subjectReference.GetType(),
480
			Relation: subjectReference.GetRelation(),
481
		},
482
	})
483
484
	// Apply snapshot filter
485
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
486
487
	// Apply exclusion if the list is not empty
488
	if len(excluded) > 0 {
489
		builder = builder.Where(squirrel.NotEq{"subject_id": excluded})
490
	}
491
492
	// Apply the pagination token and limit to the query.
493
	if pagination.Token() != "" {
494
		var t database.ContinuousToken
495
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
496
		if err != nil {
497
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
498
		}
499
		builder = builder.Where(squirrel.GtOrEq{"subject_id": t.(utils.ContinuousToken).Value})
500
	}
501
502
	builder = builder.OrderBy("subject_id")
503
504
	if pagination.PageSize() != 0 {
505
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
506
	}
507
508
	// Generate the SQL query and arguments.
509
	var query string
510
	var args []interface{}
511
	query, args, err = builder.ToSql()
512
	if err != nil {
513
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
514
	}
515
516
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
517
518
	// Execute the query and retrieve the rows.
519
	var rows pgx.Rows
520
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
521
	if err != nil {
522
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
523
	}
524
	defer rows.Close()
525
526
	var lastID string
527
528
	// Iterate through the rows and scan the result into a RelationTuple struct.
529
	subjectIDs := make([]string, 0, pagination.PageSize()+1)
530
	for rows.Next() {
531
		var subjectID string
532
		err = rows.Scan(&subjectID)
533
		if err != nil {
534
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
535
		}
536
537
		subjectIDs = append(subjectIDs, subjectID)
538
		lastID = subjectID
539
	}
540
	// Check for any errors during iteration.
541
	if err = rows.Err(); err != nil {
542
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
543
	}
544
545
	slog.DebugContext(ctx, "successfully retrieved unique subject references from the database")
546
547
	// Return the results and encoded continuous token for pagination.
548
	if pagination.PageSize() != 0 && len(subjectIDs) > int(pagination.PageSize()) {
549
		return subjectIDs[:pagination.PageSize()], utils.NewContinuousToken(lastID).Encode(), nil
550
	}
551
552
	return subjectIDs, database.NewNoopContinuousToken().Encode(), nil
553
}
554
555
// HeadSnapshot retrieves the latest snapshot token associated with the tenant.
556
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
557
	// Start a new trace span and end it when the function exits.
558
	ctx, span := internal.Tracer.Start(ctx, "data-reader.head-snapshot")
559
	defer span.End()
560
561
	slog.DebugContext(ctx, "getting head snapshot for tenant_id", slog.String("tenant_id", tenantID))
562
563
	var xid db.XID8
564
565
	// Build the query to find the highest transaction ID associated with the tenant.
566
	builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1)
567
	query, args, err := builder.ToSql()
568
	if err != nil {
569
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
570
	}
571
572
	// TODO: To optimize this query, create the following index concurrently to avoid table locks:
573
	// CREATE INDEX CONCURRENTLY idx_transactions_tenant_id_id ON transactions(tenant_id, id DESC);
574
575
	// Execute the query and retrieve the highest transaction ID.
576
	err = r.database.ReadPool.QueryRow(ctx, query, args...).Scan(&xid)
577
	if err != nil {
578
		// If no rows are found, return a snapshot token with a value of 0.
579
		if errors.Is(err, pgx.ErrNoRows) {
580
			return snapshot.Token{Value: db.XID8{Uint: 0}}, nil
581
		}
582
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
583
	}
584
585
	slog.DebugContext(ctx, "successfully retrieved latest snapshot token")
586
587
	// Return the latest snapshot token associated with the tenant.
588
	return snapshot.Token{Value: xid}, nil
589
}
590