Passed
Push — master ( 39bc67...636f33 )
by Tolga
01:10 queued 17s
created

postgres.*DataReader.QueryUniqueSubjectReferences   F

Complexity

Conditions 14

Size

Total Lines 100
Code Lines 60

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 14
eloc 60
nop 5
dl 0
loc 100
rs 3.6
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like postgres.*DataReader.QueryUniqueSubjectReferences often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package postgres
2
3
import (
4
	"context"
5
	"database/sql"
6
	"errors"
7
	"fmt"
8
	"log/slog"
9
	"strconv"
10
	"strings"
11
12
	"github.com/Masterminds/squirrel"
13
	"github.com/golang/protobuf/jsonpb"
14
	"google.golang.org/protobuf/types/known/anypb"
15
16
	"github.com/Permify/permify/internal/storage"
17
	"github.com/Permify/permify/internal/storage/postgres/snapshot"
18
	"github.com/Permify/permify/internal/storage/postgres/types"
19
	"github.com/Permify/permify/internal/storage/postgres/utils"
20
	"github.com/Permify/permify/pkg/database"
21
	db "github.com/Permify/permify/pkg/database/postgres"
22
	base "github.com/Permify/permify/pkg/pb/base/v1"
23
	"github.com/Permify/permify/pkg/token"
24
)
25
26
// DataReader is a struct which holds a reference to the database, transaction options and a logger.
27
// It is responsible for reading data from the database.
28
type DataReader struct {
29
	database  *db.Postgres  // database is an instance of the PostgreSQL database
30
	txOptions sql.TxOptions // txOptions specifies the isolation level for database transaction and sets it as read only
31
}
32
33
// NewDataReader is a constructor function for DataReader.
34
// It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level.
35
func NewDataReader(database *db.Postgres) *DataReader {
36
	return &DataReader{
37
		database:  database,                                                          // Set the database to the passed in PostgreSQL instance
38
		txOptions: sql.TxOptions{Isolation: sql.LevelRepeatableRead, ReadOnly: true}, // Set the transaction options
39
	}
40
}
41
42
// QueryRelationships reads relation tuples from the storage based on the given filter.
43
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string) (it *database.TupleIterator, err error) {
44
	// Start a new trace span and end it when the function exits.
45
	ctx, span := tracer.Start(ctx, "data-reader.query-relationships")
46
	defer span.End()
47
48
	slog.Info("Querying relationships for tenantID: ", slog.String("tenant_id", tenantID))
49
50
	// Decode the snapshot value.
51
	var st token.SnapToken
52
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
53
	if err != nil {
54
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
55
	}
56
57
	// Begin a new read-only transaction with the specified isolation level.
58
	var tx *sql.Tx
59
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
60
	if err != nil {
61
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
62
	}
63
64
	defer func() {
65
		_ = tx.Rollback()
66
	}()
67
68
	// Build the relationships query based on the provided filter and snapshot value.
69
	var args []interface{}
70
	builder := r.database.Builder.Select("entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
71
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
72
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
73
74
	// Generate the SQL query and arguments.
75
	var query string
76
	query, args, err = builder.ToSql()
77
	if err != nil {
78
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
79
	}
80
81
	slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
82
83
	// Execute the SQL query and retrieve the result rows.
84
	var rows *sql.Rows
85
	rows, err = tx.QueryContext(ctx, query, args...)
86
	if err != nil {
87
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
88
	}
89
	defer rows.Close()
90
91
	// Process the result rows and store the relationships in a TupleCollection.
92
	collection := database.NewTupleCollection()
93
	for rows.Next() {
94
		rt := storage.RelationTuple{}
95
		err = rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
96
		if err != nil {
97
			return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
98
		}
99
		collection.Add(rt.ToTuple())
100
	}
101
	if err = rows.Err(); err != nil {
102
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
103
	}
104
105
	// Commit the transaction.
106
	err = tx.Commit()
107
	if err != nil {
108
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
109
	}
110
111
	slog.Info("Successfully retrieved relation tuples from the database.")
112
113
	// Return a TupleIterator created from the TupleCollection.
114
	return collection.CreateTupleIterator(), nil
115
}
116
117
// ReadRelationships reads relation tuples from the storage based on the given filter and pagination.
118
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
119
	// Start a new trace span and end it when the function exits.
120
	ctx, span := tracer.Start(ctx, "data-reader.read-relationships")
121
	defer span.End()
122
123
	slog.Info("Reading relationships for tenantID: ", slog.String("tenant_id", tenantID))
124
125
	// Decode the snapshot value.
126
	var st token.SnapToken
127
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
128
	if err != nil {
129
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
130
	}
131
132
	// Begin a new read-only transaction with the specified isolation level.
133
	var tx *sql.Tx
134
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
135
	if err != nil {
136
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
137
	}
138
139
	defer func() {
140
		_ = tx.Rollback()
141
	}()
142
143
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
144
	builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
145
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
146
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
147
148
	// Apply the pagination token and limit to the query.
149
	if pagination.Token() != "" {
150
		var t database.ContinuousToken
151
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
152
		if err != nil {
153
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
154
		}
155
		var v uint64
156
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
157
		if err != nil {
158
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
159
		}
160
		builder = builder.Where(squirrel.GtOrEq{"id": v})
161
	}
162
163
	builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
164
165
	// Generate the SQL query and arguments.
166
	var query string
167
	var args []interface{}
168
	query, args, err = builder.ToSql()
169
	if err != nil {
170
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
171
	}
172
173
	slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
174
175
	// Execute the query and retrieve the rows.
176
	var rows *sql.Rows
177
	rows, err = tx.QueryContext(ctx, query, args...)
178
	if err != nil {
179
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
180
	}
181
	defer rows.Close()
182
183
	var lastID uint64
184
185
	// Iterate through the rows and scan the result into a RelationTuple struct.
186
	tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
187
	for rows.Next() {
188
		rt := storage.RelationTuple{}
189
		err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
190
		if err != nil {
191
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
192
		}
193
		lastID = rt.ID
194
		tuples = append(tuples, rt.ToTuple())
195
	}
196
	// Check for any errors during iteration.
197
	if err = rows.Err(); err != nil {
198
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
199
	}
200
201
	// Commit the transaction.
202
	err = tx.Commit()
203
	if err != nil {
204
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
205
	}
206
207
	slog.Info("Successfully read relation tuples from database.")
208
209
	// Return the results and encoded continuous token for pagination.
210
	if len(tuples) > int(pagination.PageSize()) {
211
		return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
212
	}
213
214
	return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil
215
}
216
217
// QuerySingleAttribute retrieves a single attribute from the storage based on the given filter.
218
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (attribute *base.Attribute, err error) {
219
	// Start a new trace span and end it when the function exits.
220
	ctx, span := tracer.Start(ctx, "data-reader.query-single-attribute")
221
	defer span.End()
222
	slog.Info("Querying single attribute for tenantID: ", slog.String("tenant_id", tenantID))
223
224
	// Decode the snapshot value.
225
	var st token.SnapToken
226
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
227
	if err != nil {
228
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
229
	}
230
231
	// Begin a new read-only transaction with the specified isolation level.
232
	var tx *sql.Tx
233
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
234
	if err != nil {
235
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
236
	}
237
238
	defer func() {
239
		_ = tx.Rollback()
240
	}()
241
242
	// Build the relationships query based on the provided filter and snapshot value.
243
	var args []interface{}
244
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
245
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
246
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
247
248
	// Generate the SQL query and arguments.
249
	var query string
250
	query, args, err = builder.ToSql()
251
	if err != nil {
252
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
253
	}
254
255
	slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
256
257
	row := tx.QueryRowContext(ctx, query, args...)
258
259
	rt := storage.Attribute{}
260
261
	// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
262
	var valueStr string
263
264
	// Scan the row from the database into the fields of `rt` and `valueStr`.
265
	err = row.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
266
	if err != nil {
267
		if errors.Is(err, sql.ErrNoRows) {
268
			return nil, nil
269
		} else {
270
			return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
271
		}
272
	}
273
274
	// Unmarshal the JSON data from `valueStr` into `rt.Value`.
275
	rt.Value = &anypb.Any{}
276
	unmarshaler := &jsonpb.Unmarshaler{}
277
	err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
278
	if err != nil {
279
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
280
	}
281
282
	// Commit the transaction.
283
	err = tx.Commit()
284
	if err != nil {
285
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
286
	}
287
288
	slog.Info("Successfully retrieved Single attribute from the database.")
289
290
	return rt.ToAttribute(), nil
291
}
292
293
// QueryAttributes reads multiple attributes from the storage based on the given filter.
294
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (it *database.AttributeIterator, err error) {
295
	// Start a new trace span and end it when the function exits.
296
	ctx, span := tracer.Start(ctx, "data-reader.query-attributes")
297
	defer span.End()
298
299
	slog.Info("Querying Attributes for tenantID: ", slog.String("tenant_id", tenantID))
300
301
	// Decode the snapshot value.
302
	var st token.SnapToken
303
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
304
	if err != nil {
305
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
306
	}
307
308
	// Begin a new read-only transaction with the specified isolation level.
309
	var tx *sql.Tx
310
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
311
	if err != nil {
312
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
313
	}
314
315
	defer func() {
316
		_ = tx.Rollback()
317
	}()
318
319
	// Build the relationships query based on the provided filter and snapshot value.
320
	var args []interface{}
321
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
322
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
323
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
324
325
	// Generate the SQL query and arguments.
326
	var query string
327
	query, args, err = builder.ToSql()
328
	if err != nil {
329
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
330
	}
331
332
	slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
333
334
	// Execute the SQL query and retrieve the result rows.
335
	var rows *sql.Rows
336
	rows, err = tx.QueryContext(ctx, query, args...)
337
	if err != nil {
338
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
339
	}
340
	defer rows.Close()
341
342
	// Process the result rows and store the relationships in a TupleCollection.
343
	collection := database.NewAttributeCollection()
344
	for rows.Next() {
345
		rt := storage.Attribute{}
346
347
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
348
		var valueStr string
349
350
		// Scan the row from the database into the fields of `rt` and `valueStr`.
351
		err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
352
		if err != nil {
353
			return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
354
		}
355
356
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
357
		rt.Value = &anypb.Any{}
358
		unmarshaler := &jsonpb.Unmarshaler{}
359
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
360
		if err != nil {
361
			return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
362
		}
363
364
		collection.Add(rt.ToAttribute())
365
	}
366
	if err = rows.Err(); err != nil {
367
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
368
	}
369
370
	// Commit the transaction.
371
	err = tx.Commit()
372
	if err != nil {
373
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
374
	}
375
376
	slog.Info("Successfully retrieved attributes tuples from the database.")
377
378
	// Return a TupleIterator created from the TupleCollection.
379
	return collection.CreateAttributeIterator(), nil
380
}
381
382
// ReadAttributes reads multiple attributes from the storage based on the given filter and pagination.
383
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
384
	// Start a new trace span and end it when the function exits.
385
	ctx, span := tracer.Start(ctx, "data-reader.read-attributes")
386
	defer span.End()
387
388
	slog.Info("Reading attributes for tenantID: ", slog.String("tenant_id", tenantID))
389
390
	// Decode the snapshot value.
391
	var st token.SnapToken
392
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
393
	if err != nil {
394
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
395
	}
396
397
	// Begin a new read-only transaction with the specified isolation level.
398
	var tx *sql.Tx
399
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
400
	if err != nil {
401
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
402
	}
403
404
	defer func() {
405
		_ = tx.Rollback()
406
	}()
407
408
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
409
	builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
410
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
411
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
412
413
	// Apply the pagination token and limit to the query.
414
	if pagination.Token() != "" {
415
		var t database.ContinuousToken
416
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
417
		if err != nil {
418
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
419
		}
420
		var v uint64
421
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
422
		if err != nil {
423
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
424
		}
425
		builder = builder.Where(squirrel.GtOrEq{"id": v})
426
	}
427
428
	builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
429
430
	// Generate the SQL query and arguments.
431
	var query string
432
	var args []interface{}
433
	query, args, err = builder.ToSql()
434
	if err != nil {
435
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
436
	}
437
438
	slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
439
440
	// Execute the query and retrieve the rows.
441
	var rows *sql.Rows
442
	rows, err = tx.QueryContext(ctx, query, args...)
443
	if err != nil {
444
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
445
	}
446
	defer rows.Close()
447
448
	var lastID uint64
449
450
	// Iterate through the rows and scan the result into a RelationTuple struct.
451
	attributes := make([]*base.Attribute, 0, pagination.PageSize()+1)
452
	for rows.Next() {
453
		rt := storage.Attribute{}
454
455
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
456
		var valueStr string
457
458
		// Scan the row from the database into the fields of `rt` and `valueStr`.
459
		err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
460
		if err != nil {
461
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
462
		}
463
		lastID = rt.ID
464
465
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
466
		rt.Value = &anypb.Any{}
467
		unmarshaler := &jsonpb.Unmarshaler{}
468
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
469
		if err != nil {
470
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
471
		}
472
473
		attributes = append(attributes, rt.ToAttribute())
474
	}
475
	// Check for any errors during iteration.
476
	if err = rows.Err(); err != nil {
477
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
478
	}
479
480
	// Commit the transaction.
481
	err = tx.Commit()
482
	if err != nil {
483
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
484
	}
485
486
	slog.Info("Successfully read attributes from the database.")
487
488
	// Return the results and encoded continuous token for pagination.
489
	if len(attributes) > int(pagination.PageSize()) {
490
		return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
491
	}
492
493
	return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil
494
}
495
496
// QueryUniqueEntities reads unique entities from the storage based on the given filter and pagination.
497
func (r *DataReader) QueryUniqueEntities(ctx context.Context, tenantID, name, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
498
	// Start a new trace span and end it when the function exits.
499
	ctx, span := tracer.Start(ctx, "data-reader.query-unique-entities")
500
	defer span.End()
501
502
	// Decode the snapshot value.
503
	var st token.SnapToken
504
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
505
	if err != nil {
506
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
507
	}
508
509
	// Begin a new read-only transaction with the specified isolation level.
510
	var tx *sql.Tx
511
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
512
	if err != nil {
513
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
514
	}
515
516
	defer func() {
517
		_ = tx.Rollback()
518
	}()
519
520
	query := utils.BulkEntityFilterQuery(tenantID, name, st.(snapshot.Token).Value.Uint)
521
522
	// Apply the pagination token and limit to the subQuery.
523
	if pagination.Token() != "" {
524
		var t database.ContinuousToken
525
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
526
		if err != nil {
527
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
528
		}
529
		var v uint64
530
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
531
		if err != nil {
532
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
533
		}
534
535
		query = fmt.Sprintf("%s WHERE id >= %s", query, strconv.FormatUint(v, 10))
536
	}
537
538
	// Append ORDER BY and LIMIT clauses.
539
	query = fmt.Sprintf("%s ORDER BY id LIMIT %d", query, pagination.PageSize()+1)
540
541
	// Execute the query and retrieve the rows.
542
	var rows *sql.Rows
543
	rows, err = tx.QueryContext(ctx, query)
544
	if err != nil {
545
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
546
	}
547
	defer rows.Close()
548
549
	var lastID uint64
550
551
	// Iterate through the rows and scan the result into a RelationTuple struct.
552
	entityIDs := make([]string, 0, pagination.PageSize()+1)
553
	for rows.Next() {
554
		var entityId string
555
		err = rows.Scan(&lastID, &entityId)
556
		if err != nil {
557
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
558
		}
559
560
		entityIDs = append(entityIDs, entityId)
561
	}
562
563
	// Check for any errors during iteration.
564
	if err = rows.Err(); err != nil {
565
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
566
	}
567
568
	// Commit the transaction.
569
	err = tx.Commit()
570
	if err != nil {
571
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
572
	}
573
574
	// Return the results and encoded continuous token for pagination.
575
	if len(entityIDs) > int(pagination.PageSize()) {
576
		return entityIDs[:pagination.PageSize()], utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
577
	}
578
579
	return entityIDs, database.NewNoopContinuousToken().Encode(), nil
580
}
581
582
// QueryUniqueSubjectReferences reads unique subject references from the storage based on the given filter and pagination.
583
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
584
	// Start a new trace span and end it when the function exits.
585
	ctx, span := tracer.Start(ctx, "data-reader.query-unique-subject-reference")
586
	defer span.End()
587
588
	slog.Info("Querying unique subject references for tenantID: ", slog.String("tenant_id", tenantID))
589
590
	// Decode the snapshot value.
591
	var st token.SnapToken
592
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
593
	if err != nil {
594
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
595
	}
596
597
	// Begin a new read-only transaction with the specified isolation level.
598
	var tx *sql.Tx
599
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
600
	if err != nil {
601
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
602
	}
603
604
	defer func() {
605
		_ = tx.Rollback()
606
	}()
607
608
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
609
	builder := r.database.Builder.
610
		Select("MIN(id) as id, subject_id"). // This will pick the smallest `id` for each unique `subject_id`.
611
		From(RelationTuplesTable).
612
		Where(squirrel.Eq{"tenant_id": tenantID}).
613
		GroupBy("subject_id")
614
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, &base.TupleFilter{Subject: &base.SubjectFilter{Type: subjectReference.GetType(), Relation: subjectReference.GetRelation()}})
615
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
616
617
	// Apply the pagination token and limit to the query.
618
	if pagination.Token() != "" {
619
		var t database.ContinuousToken
620
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
621
		if err != nil {
622
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
623
		}
624
		var v uint64
625
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
626
		if err != nil {
627
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
628
		}
629
		builder = builder.Where(squirrel.GtOrEq{"id": v})
630
	}
631
632
	builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
633
634
	// Generate the SQL query and arguments.
635
	var query string
636
	var args []interface{}
637
	query, args, err = builder.ToSql()
638
	if err != nil {
639
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
640
	}
641
642
	slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
643
644
	// Execute the query and retrieve the rows.
645
	var rows *sql.Rows
646
	rows, err = tx.QueryContext(ctx, query, args...)
647
	if err != nil {
648
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
649
	}
650
	defer rows.Close()
651
652
	var lastID uint64
653
654
	// Iterate through the rows and scan the result into a RelationTuple struct.
655
	subjectIDs := make([]string, 0, pagination.PageSize()+1)
656
	for rows.Next() {
657
		var subjectID string
658
		err = rows.Scan(&lastID, &subjectID)
659
		if err != nil {
660
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
661
		}
662
		subjectIDs = append(subjectIDs, subjectID)
663
	}
664
	// Check for any errors during iteration.
665
	if err = rows.Err(); err != nil {
666
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
667
	}
668
669
	// Commit the transaction.
670
	err = tx.Commit()
671
	if err != nil {
672
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
673
	}
674
675
	slog.Info("Successfully retrieved unique subject references from the database.")
676
677
	// Return the results and encoded continuous token for pagination.
678
	if len(subjectIDs) > int(pagination.PageSize()) {
679
		return subjectIDs[:pagination.PageSize()], utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
680
	}
681
682
	return subjectIDs, database.NewNoopContinuousToken().Encode(), nil
683
}
684
685
// HeadSnapshot retrieves the latest snapshot token associated with the tenant.
686
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
687
	// Start a new trace span and end it when the function exits.
688
	ctx, span := tracer.Start(ctx, "data-reader.head-snapshot")
689
	defer span.End()
690
691
	slog.Info("Getting head snapshot for tenantID: ", slog.String("tenant_id", tenantID))
692
693
	var xid types.XID8
694
695
	// Build the query to find the highest transaction ID associated with the tenant.
696
	builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1)
697
	query, args, err := builder.ToSql()
698
	if err != nil {
699
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
700
	}
701
702
	// Execute the query and retrieve the highest transaction ID.
703
	err = r.database.DB.QueryRowContext(ctx, query, args...).Scan(&xid)
704
	if err != nil {
705
		// If no rows are found, return a snapshot token with a value of 0.
706
		if errors.Is(err, sql.ErrNoRows) {
707
			return snapshot.Token{Value: types.XID8{Uint: 0}}, nil
708
		}
709
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
710
	}
711
712
	slog.Info("Successfully retrieved latest snapshot token")
713
714
	// Return the latest snapshot token associated with the tenant.
715
	return snapshot.Token{Value: xid}, nil
716
}
717