internal/storage/postgres/data_reader.go   F
last analyzed

Size/Duplication

Total Lines 585
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
cc 73
eloc 326
dl 0
loc 585
rs 2.56
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
A postgres.NewDataReader 0 4 1
D postgres.*DataReader.QueryUniqueSubjectReferences 0 98 13
D postgres.*DataReader.ReadAttributes 0 97 13
A postgres.*DataReader.HeadSnapshot 0 33 4
D postgres.*DataReader.QueryAttributes 0 86 12
B postgres.*DataReader.QuerySingleAttribute 0 56 6
C postgres.*DataReader.QueryRelationships 0 73 11
D postgres.*DataReader.ReadRelationships 0 83 13
1
package postgres
2
3
import (
4
	"context"
5
	"errors"
6
	"log/slog" // Structured logging
7
	"strconv"
8
9
	"github.com/jackc/pgx/v5"
10
11
	"github.com/Masterminds/squirrel"
12
	"google.golang.org/protobuf/encoding/protojson"
13
	"google.golang.org/protobuf/types/known/anypb"
14
15
	"github.com/Permify/permify/internal"
16
	"github.com/Permify/permify/internal/storage"
17
	"github.com/Permify/permify/internal/storage/postgres/snapshot"
18
	"github.com/Permify/permify/internal/storage/postgres/utils"
19
	"github.com/Permify/permify/pkg/database"
20
	db "github.com/Permify/permify/pkg/database/postgres"
21
	base "github.com/Permify/permify/pkg/pb/base/v1"
22
	"github.com/Permify/permify/pkg/token"
23
)
24
25
// DataReader is a struct which holds a reference to the database, transaction options and a logger.
26
// It is responsible for reading data from the database.
27
type DataReader struct {
28
	database  *db.Postgres  // database is an instance of the PostgreSQL database
29
	txOptions pgx.TxOptions // txOptions specifies the isolation level for database transaction and sets it as read only
30
}
31
32
// NewDataReader is a constructor function for DataReader.
33
// It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level.
34
func NewDataReader(database *db.Postgres) *DataReader {
35
	return &DataReader{
36
		database:  database,                                                             // Set the database to the passed in PostgreSQL instance
37
		txOptions: pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly}, // Set the transaction options
38
	}
39
}
40
41
// QueryRelationships reads relation tuples from the storage based on the given filter.
42
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.CursorPagination) (it *database.TupleIterator, err error) {
43
	// Start a new trace span and end it when the function exits.
44
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-relationships")
45
	defer span.End()
46
	// Log query operation
47
	slog.DebugContext(ctx, "querying relationships for tenant_id", slog.String("tenant_id", tenantID))
48
	// Decode snapshot token
49
	// Decode the snapshot value.
50
	var st token.SnapToken
51
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
52
	if err != nil {
53
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
54
	}
55
56
	// Build the relationships query based on the provided filter and snapshot value.
57
	var args []interface{}
58
	builder := r.database.Builder.Select("entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
59
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
60
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
61
62
	if pagination.Cursor() != "" {
63
		var t database.ContinuousToken
64
		t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
65
		if err != nil {
66
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
67
		}
68
		builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value})
69
	}
70
71
	if pagination.Sort() != "" {
72
		builder = builder.OrderBy(pagination.Sort())
73
	}
74
75
	// Apply limit if specified in pagination
76
	limit := pagination.Limit()
77
	if limit > 0 {
78
		builder = builder.Limit(uint64(limit))
79
	}
80
81
	// Generate the SQL query and arguments.
82
	var query string
83
	query, args, err = builder.ToSql()
84
	if err != nil {
85
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
86
	}
87
88
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
89
	// Execute query
90
	// Execute the SQL query and retrieve the result rows.
91
	var rows pgx.Rows
92
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
93
	if err != nil {
94
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
95
	}
96
	defer rows.Close()
97
98
	// Process the result rows and store the relationships in a TupleCollection.
99
	collection := database.NewTupleCollection()
100
	for rows.Next() {
101
		rt := storage.RelationTuple{}
102
		err = rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
103
		if err != nil {
104
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
105
		}
106
		collection.Add(rt.ToTuple())
107
	}
108
	if err = rows.Err(); err != nil {
109
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
110
	}
111
112
	slog.DebugContext(ctx, "successfully retrieved relation tuples from the database")
113
	// Return a TupleIterator created from the TupleCollection.
114
	return collection.CreateTupleIterator(), nil
115
}
116
117
// ReadRelationships reads relation tuples from the storage based on the given filter and pagination.
118
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
119
	// Start a new trace span and end it when the function exits.
120
	ctx, span := internal.Tracer.Start(ctx, "data-reader.read-relationships")
121
	defer span.End()
122
	// Log read operation
123
	slog.DebugContext(ctx, "reading relationships for tenant_id", slog.String("tenant_id", tenantID))
124
	// Decode snapshot token
125
	// Decode the snapshot value.
126
	var st token.SnapToken
127
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
128
	if err != nil {
129
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
130
	}
131
132
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
133
	builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
134
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
135
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
136
137
	// Apply the pagination token and limit to the query.
138
	if pagination.Token() != "" {
139
		var t database.ContinuousToken
140
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
141
		if err != nil {
142
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
143
		}
144
		var v uint64
145
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
146
		if err != nil {
147
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
148
		}
149
		builder = builder.Where(squirrel.GtOrEq{"id": v})
150
	}
151
152
	builder = builder.OrderBy("id")
153
154
	if pagination.PageSize() != 0 {
155
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
156
	}
157
158
	// Generate the SQL query and arguments.
159
	var query string
160
	var args []interface{}
161
	query, args, err = builder.ToSql()
162
	if err != nil {
163
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
164
	}
165
	// Log generated query
166
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
167
	// Execute query
168
	// Execute the query and retrieve the rows.
169
	var rows pgx.Rows
170
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
171
	if err != nil {
172
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
173
	}
174
	defer rows.Close()
175
176
	var lastID uint64
177
178
	// Iterate through the rows and scan the result into a RelationTuple struct.
179
	tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
180
	for rows.Next() {
181
		rt := storage.RelationTuple{}
182
		err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
183
		if err != nil {
184
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
185
		}
186
		lastID = rt.ID
187
		tuples = append(tuples, rt.ToTuple())
188
	}
189
	// Check for any errors during iteration.
190
	if err = rows.Err(); err != nil {
191
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
192
	}
193
194
	slog.DebugContext(ctx, "successfully read relation tuples from database")
195
	// Return the results and encoded continuous token for pagination.
196
	if pagination.PageSize() != 0 && len(tuples) > int(pagination.PageSize()) {
197
		return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
198
	}
199
200
	return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil
201
}
202
203
// QuerySingleAttribute retrieves a single attribute from the storage based on the given filter.
204
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (attribute *base.Attribute, err error) {
205
	// Start a new trace span and end it when the function exits.
206
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-single-attribute")
207
	defer span.End()
208
209
	slog.DebugContext(ctx, "querying single attribute for tenant_id", slog.String("tenant_id", tenantID))
210
211
	// Decode the snapshot value.
212
	var st token.SnapToken
213
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
214
	if err != nil {
215
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
216
	}
217
218
	// Build the relationships query based on the provided filter and snapshot value.
219
	var args []interface{}
220
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
221
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
222
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
223
224
	// Generate the SQL query and arguments.
225
	var query string
226
	query, args, err = builder.ToSql()
227
	if err != nil {
228
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
229
	}
230
231
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
232
	// Execute query
233
	row := r.database.ReadPool.QueryRow(ctx, query, args...)
234
235
	rt := storage.Attribute{}
236
237
	// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
238
	var valueStr string
239
240
	// Scan the row from the database into the fields of `rt` and `valueStr`.
241
	err = row.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
242
	if err != nil {
243
		if errors.Is(err, pgx.ErrNoRows) {
244
			return nil, nil
245
		} else {
246
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
247
		}
248
	}
249
250
	// Unmarshal the JSON data from `valueStr` into `rt.Value`.
251
	rt.Value = &anypb.Any{}
252
	err = protojson.Unmarshal([]byte(valueStr), rt.Value)
253
	if err != nil {
254
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
255
	}
256
257
	slog.DebugContext(ctx, "successfully retrieved Single attribute from the database")
258
	// Return attribute
259
	return rt.ToAttribute(), nil
260
}
261
262
// QueryAttributes reads multiple attributes from the storage based on the given filter.
263
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.CursorPagination) (it *database.AttributeIterator, err error) {
264
	// Start a new trace span and end it when the function exits.
265
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-attributes")
266
	defer span.End()
267
	// Log query operation
268
	slog.DebugContext(ctx, "querying Attributes for tenant_id", slog.String("tenant_id", tenantID))
269
	// Decode snapshot token
270
	// Decode the snapshot value.
271
	var st token.SnapToken
272
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
273
	if err != nil {
274
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
275
	}
276
277
	// Build the attributes query based on the provided filter and snapshot value.
278
	var args []interface{}
279
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
280
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
281
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
282
283
	if pagination.Cursor() != "" {
284
		var t database.ContinuousToken
285
		t, err = utils.EncodedContinuousToken{Value: pagination.Cursor()}.Decode()
286
		if err != nil {
287
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
288
		}
289
		builder = builder.Where(squirrel.GtOrEq{pagination.Sort(): t.(utils.ContinuousToken).Value})
290
	}
291
292
	if pagination.Sort() != "" {
293
		builder = builder.OrderBy(pagination.Sort())
294
	}
295
296
	// Apply limit if specified in pagination
297
	limit := pagination.Limit()
298
	if limit > 0 {
299
		builder = builder.Limit(uint64(limit))
300
	}
301
302
	// Generate the SQL query and arguments.
303
	var query string
304
	query, args, err = builder.ToSql()
305
	if err != nil {
306
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
307
	}
308
	// Log generated query
309
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
310
	// Execute the SQL query and retrieve the result rows.
311
	var rows pgx.Rows
312
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
313
	if err != nil {
314
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
315
	}
316
	defer rows.Close()
317
318
	// Process the result rows and store the attributes in an AttributeCollection.
319
	collection := database.NewAttributeCollection()
320
	for rows.Next() {
321
		rt := storage.Attribute{}
322
323
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
324
		var valueStr string
325
326
		// Scan the row from the database into the fields of `rt` and `valueStr`.
327
		err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
328
		if err != nil {
329
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
330
		}
331
332
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
333
		rt.Value = &anypb.Any{}
334
		err = protojson.Unmarshal([]byte(valueStr), rt.Value)
335
		if err != nil {
336
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
337
		}
338
339
		collection.Add(rt.ToAttribute())
340
	}
341
	if err = rows.Err(); err != nil {
342
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
343
	}
344
345
	slog.DebugContext(ctx, "successfully retrieved attributes tuples from the database")
346
	// Return iterator
347
	// Return an AttributeIterator created from the AttributeCollection.
348
	return collection.CreateAttributeIterator(), nil
349
}
350
351
// ReadAttributes reads multiple attributes from the storage based on the given filter and pagination.
352
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
353
	// Start a new trace span and end it when the function exits.
354
	ctx, span := internal.Tracer.Start(ctx, "data-reader.read-attributes")
355
	defer span.End()
356
	// Log read operation
357
	slog.DebugContext(ctx, "reading attributes for tenant_id", slog.String("tenant_id", tenantID))
358
	// Decode snapshot token
359
	// Decode the snapshot value.
360
	var st token.SnapToken
361
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
362
	if err != nil {
363
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
364
	}
365
	// Build SQL query
366
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
367
	builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
368
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
369
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
370
371
	// Apply the pagination token and limit to the query.
372
	if pagination.Token() != "" {
373
		var t database.ContinuousToken
374
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
375
		if err != nil {
376
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
377
		}
378
		var v uint64
379
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
380
		if err != nil {
381
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
382
		}
383
		builder = builder.Where(squirrel.GtOrEq{"id": v})
384
	}
385
386
	builder = builder.OrderBy("id")
387
388
	if pagination.PageSize() != 0 {
389
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
390
	}
391
392
	// Generate the SQL query and arguments.
393
	var query string
394
	var args []interface{}
395
	query, args, err = builder.ToSql()
396
	if err != nil {
397
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
398
	}
399
	// Log generated query
400
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
401
	// Execute query
402
	// Execute the query and retrieve the rows.
403
	var rows pgx.Rows
404
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
405
	if err != nil {
406
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
407
	}
408
	defer rows.Close()
409
410
	var lastID uint64
411
412
	// Iterate through the rows and scan the result into a RelationTuple struct.
413
	attributes := make([]*base.Attribute, 0, pagination.PageSize()+1)
414
	for rows.Next() {
415
		rt := storage.Attribute{}
416
417
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
418
		var valueStr string
419
420
		// Scan the row from the database into the fields of `rt` and `valueStr`.
421
		err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
422
		if err != nil {
423
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
424
		}
425
		lastID = rt.ID
426
427
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
428
		rt.Value = &anypb.Any{}
429
		err = protojson.Unmarshal([]byte(valueStr), rt.Value)
430
		if err != nil {
431
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
432
		}
433
434
		attributes = append(attributes, rt.ToAttribute())
435
	}
436
	// Check for any errors during iteration.
437
	if err = rows.Err(); err != nil {
438
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
439
	}
440
441
	slog.DebugContext(ctx, "successfully read attributes from the database")
442
	// Return results
443
	// Return the results and encoded continuous token for pagination.
444
	if len(attributes) > int(pagination.PageSize()) {
445
		return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
446
	}
447
448
	return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil
449
}
450
451
// QueryUniqueSubjectReferences reads unique subject references from the storage based on the given filter and pagination.
452
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, excluded []string, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
453
	// Start a new trace span and end it when the function exits.
454
	ctx, span := internal.Tracer.Start(ctx, "data-reader.query-unique-subject-reference")
455
	defer span.End()
456
	// Log query operation
457
	slog.DebugContext(ctx, "querying unique subject references for tenant_id", slog.String("tenant_id", tenantID))
458
	// Decode snapshot token
459
	// Decode the snapshot value.
460
	var st token.SnapToken
461
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
462
	if err != nil {
463
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
464
	}
465
466
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
467
	builder := r.database.Builder.
468
		Select("subject_id"). // This will pick the smallest `id` for each unique `subject_id`.
469
		From(RelationTuplesTable).
470
		Where(squirrel.Eq{"tenant_id": tenantID}).
471
		GroupBy("subject_id")
472
473
	// Apply subject filter
474
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, &base.TupleFilter{
475
		Subject: &base.SubjectFilter{
476
			Type:     subjectReference.GetType(),
477
			Relation: subjectReference.GetRelation(),
478
		},
479
	})
480
481
	// Apply snapshot filter
482
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
483
484
	// Apply exclusion if the list is not empty
485
	if len(excluded) > 0 {
486
		builder = builder.Where(squirrel.NotEq{"subject_id": excluded})
487
	}
488
489
	// Apply the pagination token and limit to the query.
490
	if pagination.Token() != "" {
491
		var t database.ContinuousToken
492
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
493
		if err != nil {
494
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
495
		}
496
		builder = builder.Where(squirrel.GtOrEq{"subject_id": t.(utils.ContinuousToken).Value})
497
	}
498
499
	builder = builder.OrderBy("subject_id")
500
501
	if pagination.PageSize() != 0 {
502
		builder = builder.Limit(uint64(pagination.PageSize() + 1))
503
	}
504
505
	// Generate the SQL query and arguments.
506
	var query string
507
	var args []interface{}
508
	query, args, err = builder.ToSql()
509
	if err != nil {
510
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
511
	}
512
	// Log generated query
513
	slog.DebugContext(ctx, "generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
514
	// Execute query
515
	// Execute the query and retrieve the rows.
516
	var rows pgx.Rows
517
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
518
	if err != nil {
519
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
520
	}
521
	defer rows.Close()
522
523
	var lastID string
524
525
	// Iterate through the rows and scan the result into a RelationTuple struct.
526
	subjectIDs := make([]string, 0, pagination.PageSize()+1)
527
	for rows.Next() {
528
		var subjectID string
529
		err = rows.Scan(&subjectID)
530
		if err != nil {
531
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
532
		}
533
534
		subjectIDs = append(subjectIDs, subjectID)
535
		lastID = subjectID
536
	}
537
	// Check for any errors during iteration.
538
	if err = rows.Err(); err != nil {
539
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
540
	}
541
542
	slog.DebugContext(ctx, "successfully retrieved unique subject references from the database")
543
	// Return results
544
	// Return the results and encoded continuous token for pagination.
545
	if pagination.PageSize() != 0 && len(subjectIDs) > int(pagination.PageSize()) {
546
		return subjectIDs[:pagination.PageSize()], utils.NewContinuousToken(lastID).Encode(), nil
547
	}
548
549
	return subjectIDs, database.NewNoopContinuousToken().Encode(), nil
550
}
551
552
// HeadSnapshot retrieves the latest snapshot token associated with the tenant.
553
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
554
	// Start a new trace span and end it when the function exits.
555
	ctx, span := internal.Tracer.Start(ctx, "data-reader.head-snapshot")
556
	defer span.End()
557
	// Log snapshot operation
558
	slog.DebugContext(ctx, "getting head snapshot for tenant_id", slog.String("tenant_id", tenantID))
559
	// Declare transaction ID variable
560
	var xid db.XID8
561
562
	// Build the query to find the highest transaction ID associated with the tenant.
563
	builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1)
564
	query, args, err := builder.ToSql()
565
	if err != nil {
566
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
567
	}
568
569
	// TODO: To optimize this query, create the following index concurrently to avoid table locks:
570
	// CREATE INDEX CONCURRENTLY idx_transactions_tenant_id_id ON transactions(tenant_id, id DESC);
571
572
	// Execute the query and retrieve the highest transaction ID.
573
	err = r.database.ReadPool.QueryRow(ctx, query, args...).Scan(&xid)
574
	if err != nil {
575
		// If no rows are found, return a snapshot token with a value of 0.
576
		if errors.Is(err, pgx.ErrNoRows) {
577
			return snapshot.Token{Value: db.XID8{Uint: 0}}, nil
578
		}
579
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
580
	}
581
582
	slog.DebugContext(ctx, "successfully retrieved latest snapshot token")
583
	// Return snapshot token
584
	// Return the latest snapshot token associated with the tenant.
585
	return snapshot.Token{Value: xid}, nil
586
}
587