Passed
Push — master ( 753a57...fc258c )
by Tolga
01:23 queued 18s
created

postgres.*DataReader.ReadRelationships   D

Complexity

Conditions 13

Size

Total Lines 96
Code Lines 56

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 13
eloc 56
nop 5
dl 0
loc 96
rs 4.2
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex classes like postgres.*DataReader.ReadRelationships often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
package postgres
2
3
import (
4
	"context"
5
	"database/sql"
6
	"errors"
7
	"fmt"
8
	"log/slog"
9
	"strconv"
10
	"strings"
11
12
	"github.com/Masterminds/squirrel"
13
	"github.com/golang/protobuf/jsonpb"
14
	"google.golang.org/protobuf/types/known/anypb"
15
16
	"github.com/Permify/permify/internal/storage"
17
	"github.com/Permify/permify/internal/storage/postgres/snapshot"
18
	"github.com/Permify/permify/internal/storage/postgres/types"
19
	"github.com/Permify/permify/internal/storage/postgres/utils"
20
	"github.com/Permify/permify/pkg/database"
21
	db "github.com/Permify/permify/pkg/database/postgres"
22
	base "github.com/Permify/permify/pkg/pb/base/v1"
23
	"github.com/Permify/permify/pkg/token"
24
)
25
26
// DataReader is a struct which holds a reference to the database, transaction options and a logger.
27
// It is responsible for reading data from the database.
28
type DataReader struct {
29
	database  *db.Postgres  // database is an instance of the PostgreSQL database
30
	txOptions sql.TxOptions // txOptions specifies the isolation level for database transaction and sets it as read only
31
}
32
33
// NewDataReader is a constructor function for DataReader.
34
// It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level.
35
func NewDataReader(database *db.Postgres) *DataReader {
36
	return &DataReader{
37
		database:  database,                                                          // Set the database to the passed in PostgreSQL instance
38
		txOptions: sql.TxOptions{Isolation: sql.LevelRepeatableRead, ReadOnly: true}, // Set the transaction options
39
	}
40
}
41
42
// QueryRelationships reads relation tuples from the storage based on the given filter.
43
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string) (it *database.TupleIterator, err error) {
44
	// Start a new trace span and end it when the function exits.
45
	ctx, span := tracer.Start(ctx, "data-reader.query-relationships")
46
	defer span.End()
47
48
	slog.Info("Querying relationships for tenantID: ", slog.String("tenant_id", tenantID))
49
50
	// Decode the snapshot value.
51
	var st token.SnapToken
52
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
53
	if err != nil {
54
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
55
	}
56
57
	// Begin a new read-only transaction with the specified isolation level.
58
	var tx *sql.Tx
59
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
60
	if err != nil {
61
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
62
	}
63
64
	// Rollback the transaction in case of any error.
65
	defer utils.Rollback(tx)
66
67
	// Build the relationships query based on the provided filter and snapshot value.
68
	var args []interface{}
69
	builder := r.database.Builder.Select("entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
70
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
71
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
72
73
	// Generate the SQL query and arguments.
74
	var query string
75
	query, args, err = builder.ToSql()
76
	if err != nil {
77
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
78
	}
79
80
	slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
81
82
	// Execute the SQL query and retrieve the result rows.
83
	var rows *sql.Rows
84
	rows, err = tx.QueryContext(ctx, query, args...)
85
	if err != nil {
86
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
87
	}
88
	defer rows.Close()
89
90
	// Process the result rows and store the relationships in a TupleCollection.
91
	collection := database.NewTupleCollection()
92
	for rows.Next() {
93
		rt := storage.RelationTuple{}
94
		err = rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
95
		if err != nil {
96
			return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
97
		}
98
		collection.Add(rt.ToTuple())
99
	}
100
	if err = rows.Err(); err != nil {
101
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
102
	}
103
104
	// Commit the transaction.
105
	err = tx.Commit()
106
	if err != nil {
107
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
108
	}
109
110
	slog.Info("Successfully retrieved relationship tuples from the database.")
111
112
	// Return a TupleIterator created from the TupleCollection.
113
	return collection.CreateTupleIterator(), nil
114
}
115
116
// ReadRelationships reads relation tuples from the storage based on the given filter and pagination.
117
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
118
	// Start a new trace span and end it when the function exits.
119
	ctx, span := tracer.Start(ctx, "data-reader.read-relationships")
120
	defer span.End()
121
122
	slog.Info("Reading relationships for tenantID: ", slog.String("tenant_id", tenantID))
123
124
	// Decode the snapshot value.
125
	var st token.SnapToken
126
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
127
	if err != nil {
128
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
129
	}
130
131
	// Begin a new read-only transaction with the specified isolation level.
132
	var tx *sql.Tx
133
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
134
	if err != nil {
135
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
136
	}
137
138
	// Rollback the transaction in case of any error.
139
	defer utils.Rollback(tx)
140
141
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
142
	builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
143
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
144
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
145
146
	// Apply the pagination token and limit to the query.
147
	if pagination.Token() != "" {
148
		var t database.ContinuousToken
149
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
150
		if err != nil {
151
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
152
		}
153
		var v uint64
154
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
155
		if err != nil {
156
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
157
		}
158
		builder = builder.Where(squirrel.GtOrEq{"id": v})
159
	}
160
161
	builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
162
163
	// Generate the SQL query and arguments.
164
	var query string
165
	var args []interface{}
166
	query, args, err = builder.ToSql()
167
	if err != nil {
168
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
169
	}
170
171
	slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
172
173
	// Execute the query and retrieve the rows.
174
	var rows *sql.Rows
175
	rows, err = tx.QueryContext(ctx, query, args...)
176
	if err != nil {
177
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
178
	}
179
	defer rows.Close()
180
181
	var lastID uint64
182
183
	// Iterate through the rows and scan the result into a RelationTuple struct.
184
	tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
185
	for rows.Next() {
186
		rt := storage.RelationTuple{}
187
		err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
188
		if err != nil {
189
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
190
		}
191
		lastID = rt.ID
192
		tuples = append(tuples, rt.ToTuple())
193
	}
194
	// Check for any errors during iteration.
195
	if err = rows.Err(); err != nil {
196
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
197
	}
198
199
	// Commit the transaction.
200
	err = tx.Commit()
201
	if err != nil {
202
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
203
	}
204
205
	slog.Info("Successfully read relationships from database.")
206
207
	// Return the results and encoded continuous token for pagination.
208
	if len(tuples) > int(pagination.PageSize()) {
209
		return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
210
	}
211
212
	return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil
213
}
214
215
// QuerySingleAttribute retrieves a single attribute from the storage based on the given filter.
216
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (attribute *base.Attribute, err error) {
217
	// Start a new trace span and end it when the function exits.
218
	ctx, span := tracer.Start(ctx, "data-reader.query-single-attribute")
219
	defer span.End()
220
	slog.Info("Querying single attribute for tenantID: ", slog.String("tenant_id", tenantID))
221
222
	// Decode the snapshot value.
223
	var st token.SnapToken
224
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
225
	if err != nil {
226
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
227
	}
228
229
	// Begin a new read-only transaction with the specified isolation level.
230
	var tx *sql.Tx
231
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
232
	if err != nil {
233
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
234
	}
235
236
	// Rollback the transaction in case of any error.
237
	defer utils.Rollback(tx)
238
239
	// Build the relationships query based on the provided filter and snapshot value.
240
	var args []interface{}
241
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
242
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
243
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
244
245
	// Generate the SQL query and arguments.
246
	var query string
247
	query, args, err = builder.ToSql()
248
	if err != nil {
249
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
250
	}
251
252
	slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
253
254
	row := tx.QueryRowContext(ctx, query, args...)
255
256
	rt := storage.Attribute{}
257
258
	// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
259
	var valueStr string
260
261
	// Scan the row from the database into the fields of `rt` and `valueStr`.
262
	err = row.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
263
	if err != nil {
264
		if errors.Is(err, sql.ErrNoRows) {
265
			return nil, nil
266
		} else {
267
			return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
268
		}
269
	}
270
271
	// Unmarshal the JSON data from `valueStr` into `rt.Value`.
272
	rt.Value = &anypb.Any{}
273
	unmarshaler := &jsonpb.Unmarshaler{}
274
	err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
275
	if err != nil {
276
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
277
	}
278
279
	// Commit the transaction.
280
	err = tx.Commit()
281
	if err != nil {
282
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
283
	}
284
285
	slog.Info("Successfully retrieved Single attribute from the database.")
286
287
	return rt.ToAttribute(), nil
288
}
289
290
// QueryAttributes reads multiple attributes from the storage based on the given filter.
291
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (it *database.AttributeIterator, err error) {
292
	// Start a new trace span and end it when the function exits.
293
	ctx, span := tracer.Start(ctx, "data-reader.query-attributes")
294
	defer span.End()
295
296
	slog.Info("Querying Attributes for tenantID: ", slog.String("tenant_id", tenantID))
297
298
	// Decode the snapshot value.
299
	var st token.SnapToken
300
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
301
	if err != nil {
302
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
303
	}
304
305
	// Begin a new read-only transaction with the specified isolation level.
306
	var tx *sql.Tx
307
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
308
	if err != nil {
309
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
310
	}
311
312
	// Rollback the transaction in case of any error.
313
	defer utils.Rollback(tx)
314
315
	// Build the relationships query based on the provided filter and snapshot value.
316
	var args []interface{}
317
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
318
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
319
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
320
321
	// Generate the SQL query and arguments.
322
	var query string
323
	query, args, err = builder.ToSql()
324
	if err != nil {
325
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
326
	}
327
328
	slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
329
330
	// Execute the SQL query and retrieve the result rows.
331
	var rows *sql.Rows
332
	rows, err = tx.QueryContext(ctx, query, args...)
333
	if err != nil {
334
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
335
	}
336
	defer rows.Close()
337
338
	// Process the result rows and store the relationships in a TupleCollection.
339
	collection := database.NewAttributeCollection()
340
	for rows.Next() {
341
		rt := storage.Attribute{}
342
343
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
344
		var valueStr string
345
346
		// Scan the row from the database into the fields of `rt` and `valueStr`.
347
		err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
348
		if err != nil {
349
			return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
350
		}
351
352
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
353
		rt.Value = &anypb.Any{}
354
		unmarshaler := &jsonpb.Unmarshaler{}
355
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
356
		if err != nil {
357
			return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
358
		}
359
360
		collection.Add(rt.ToAttribute())
361
	}
362
	if err = rows.Err(); err != nil {
363
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
364
	}
365
366
	// Commit the transaction.
367
	err = tx.Commit()
368
	if err != nil {
369
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
370
	}
371
372
	slog.Info("Successfully retrieved attributes tuples from the database.")
373
374
	// Return a TupleIterator created from the TupleCollection.
375
	return collection.CreateAttributeIterator(), nil
376
}
377
378
// ReadAttributes reads multiple attributes from the storage based on the given filter and pagination.
379
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
380
	// Start a new trace span and end it when the function exits.
381
	ctx, span := tracer.Start(ctx, "data-reader.read-attributes")
382
	defer span.End()
383
384
	slog.Info("Reading attributes for tenantID: ", slog.String("tenant_id", tenantID))
385
386
	// Decode the snapshot value.
387
	var st token.SnapToken
388
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
389
	if err != nil {
390
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
391
	}
392
393
	// Begin a new read-only transaction with the specified isolation level.
394
	var tx *sql.Tx
395
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
396
	if err != nil {
397
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
398
	}
399
400
	// Rollback the transaction in case of any error.
401
	defer utils.Rollback(tx)
402
403
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
404
	builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
405
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
406
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
407
408
	// Apply the pagination token and limit to the query.
409
	if pagination.Token() != "" {
410
		var t database.ContinuousToken
411
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
412
		if err != nil {
413
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
414
		}
415
		var v uint64
416
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
417
		if err != nil {
418
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
419
		}
420
		builder = builder.Where(squirrel.GtOrEq{"id": v})
421
	}
422
423
	builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
424
425
	// Generate the SQL query and arguments.
426
	var query string
427
	var args []interface{}
428
	query, args, err = builder.ToSql()
429
	if err != nil {
430
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
431
	}
432
433
	slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
434
435
	// Execute the query and retrieve the rows.
436
	var rows *sql.Rows
437
	rows, err = tx.QueryContext(ctx, query, args...)
438
	if err != nil {
439
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
440
	}
441
	defer rows.Close()
442
443
	var lastID uint64
444
445
	// Iterate through the rows and scan the result into a RelationTuple struct.
446
	attributes := make([]*base.Attribute, 0, pagination.PageSize()+1)
447
	for rows.Next() {
448
		rt := storage.Attribute{}
449
450
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
451
		var valueStr string
452
453
		// Scan the row from the database into the fields of `rt` and `valueStr`.
454
		err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
455
		if err != nil {
456
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
457
		}
458
		lastID = rt.ID
459
460
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
461
		rt.Value = &anypb.Any{}
462
		unmarshaler := &jsonpb.Unmarshaler{}
463
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
464
		if err != nil {
465
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
466
		}
467
468
		attributes = append(attributes, rt.ToAttribute())
469
	}
470
	// Check for any errors during iteration.
471
	if err = rows.Err(); err != nil {
472
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
473
	}
474
475
	// Commit the transaction.
476
	err = tx.Commit()
477
	if err != nil {
478
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
479
	}
480
481
	slog.Info("Successfully read attributes from the database.")
482
483
	// Return the results and encoded continuous token for pagination.
484
	if len(attributes) > int(pagination.PageSize()) {
485
		return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
486
	}
487
488
	return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil
489
}
490
491
// QueryUniqueEntities reads unique entities from the storage based on the given filter and pagination.
492
func (r *DataReader) QueryUniqueEntities(ctx context.Context, tenantID, name, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
493
	// Start a new trace span and end it when the function exits.
494
	ctx, span := tracer.Start(ctx, "data-reader.query-unique-entities")
495
	defer span.End()
496
497
	// Decode the snapshot value.
498
	var st token.SnapToken
499
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
500
	if err != nil {
501
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
502
	}
503
504
	// Begin a new read-only transaction with the specified isolation level.
505
	var tx *sql.Tx
506
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
507
	if err != nil {
508
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
509
	}
510
511
	// Rollback the transaction in case of any error.
512
	defer utils.Rollback(tx)
513
514
	query := utils.BulkEntityFilterQuery(tenantID, name, st.(snapshot.Token).Value.Uint)
515
516
	// Apply the pagination token and limit to the subQuery.
517
	if pagination.Token() != "" {
518
		var t database.ContinuousToken
519
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
520
		if err != nil {
521
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
522
		}
523
		var v uint64
524
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
525
		if err != nil {
526
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
527
		}
528
529
		query = fmt.Sprintf("%s WHERE id >= %s", query, strconv.FormatUint(v, 10))
530
	}
531
532
	// Append ORDER BY and LIMIT clauses.
533
	query = fmt.Sprintf("%s ORDER BY id LIMIT %d", query, pagination.PageSize()+1)
534
535
	// Execute the query and retrieve the rows.
536
	var rows *sql.Rows
537
	rows, err = tx.QueryContext(ctx, query)
538
	if err != nil {
539
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
540
	}
541
	defer rows.Close()
542
543
	var lastID uint64
544
545
	// Iterate through the rows and scan the result into a RelationTuple struct.
546
	entityIDs := make([]string, 0, pagination.PageSize()+1)
547
	for rows.Next() {
548
		var entityId string
549
		err = rows.Scan(&lastID, &entityId)
550
		if err != nil {
551
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
552
		}
553
554
		entityIDs = append(entityIDs, entityId)
555
	}
556
557
	// Check for any errors during iteration.
558
	if err = rows.Err(); err != nil {
559
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
560
	}
561
562
	// Commit the transaction.
563
	err = tx.Commit()
564
	if err != nil {
565
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
566
	}
567
568
	// Return the results and encoded continuous token for pagination.
569
	if len(entityIDs) > int(pagination.PageSize()) {
570
		return entityIDs[:pagination.PageSize()], utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
571
	}
572
573
	return entityIDs, database.NewNoopContinuousToken().Encode(), nil
574
}
575
576
// QueryUniqueSubjectReferences reads unique subject references from the storage based on the given filter and pagination.
577
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
578
	// Start a new trace span and end it when the function exits.
579
	ctx, span := tracer.Start(ctx, "data-reader.query-unique-subject-reference")
580
	defer span.End()
581
582
	slog.Info("Querying unique subject references for tenantID: ", slog.String("tenant_id", tenantID))
583
584
	// Decode the snapshot value.
585
	var st token.SnapToken
586
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
587
	if err != nil {
588
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
589
	}
590
591
	// Begin a new read-only transaction with the specified isolation level.
592
	var tx *sql.Tx
593
	tx, err = r.database.DB.BeginTx(ctx, &r.txOptions)
594
	if err != nil {
595
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
596
	}
597
598
	// Rollback the transaction in case of any error.
599
	defer utils.Rollback(tx)
600
601
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
602
	builder := r.database.Builder.
603
		Select("MIN(id) as id, subject_id"). // This will pick the smallest `id` for each unique `subject_id`.
604
		From(RelationTuplesTable).
605
		Where(squirrel.Eq{"tenant_id": tenantID}).
606
		GroupBy("subject_id")
607
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, &base.TupleFilter{Subject: &base.SubjectFilter{Type: subjectReference.GetType(), Relation: subjectReference.GetRelation()}})
608
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
609
610
	// Apply the pagination token and limit to the query.
611
	if pagination.Token() != "" {
612
		var t database.ContinuousToken
613
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
614
		if err != nil {
615
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
616
		}
617
		var v uint64
618
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
619
		if err != nil {
620
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
621
		}
622
		builder = builder.Where(squirrel.GtOrEq{"id": v})
623
	}
624
625
	builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
626
627
	// Generate the SQL query and arguments.
628
	var query string
629
	var args []interface{}
630
	query, args, err = builder.ToSql()
631
	if err != nil {
632
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
633
	}
634
635
	slog.Debug("Generated SQL query: ", slog.String("query", query), "with args", slog.Any("arguments", args))
636
637
	// Execute the query and retrieve the rows.
638
	var rows *sql.Rows
639
	rows, err = tx.QueryContext(ctx, query, args...)
640
	if err != nil {
641
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
642
	}
643
	defer rows.Close()
644
645
	var lastID uint64
646
647
	// Iterate through the rows and scan the result into a RelationTuple struct.
648
	subjectIDs := make([]string, 0, pagination.PageSize()+1)
649
	for rows.Next() {
650
		var subjectID string
651
		err = rows.Scan(&lastID, &subjectID)
652
		if err != nil {
653
			return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
654
		}
655
		subjectIDs = append(subjectIDs, subjectID)
656
	}
657
	// Check for any errors during iteration.
658
	if err = rows.Err(); err != nil {
659
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SCAN)
660
	}
661
662
	// Commit the transaction.
663
	err = tx.Commit()
664
	if err != nil {
665
		return nil, nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
666
	}
667
668
	slog.Info("Successfully retrieved unique subject references from the database.")
669
670
	// Return the results and encoded continuous token for pagination.
671
	if len(subjectIDs) > int(pagination.PageSize()) {
672
		return subjectIDs[:pagination.PageSize()], utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
673
	}
674
675
	return subjectIDs, database.NewNoopContinuousToken().Encode(), nil
676
}
677
678
// HeadSnapshot retrieves the latest snapshot token associated with the tenant.
679
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
680
	// Start a new trace span and end it when the function exits.
681
	ctx, span := tracer.Start(ctx, "data-reader.head-snapshot")
682
	defer span.End()
683
684
	slog.Info("Getting head snapshot for tenantID: ", slog.String("tenant_id", tenantID))
685
686
	var xid types.XID8
687
688
	// Build the query to find the highest transaction ID associated with the tenant.
689
	builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1)
690
	query, args, err := builder.ToSql()
691
	if err != nil {
692
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
693
	}
694
695
	// Execute the query and retrieve the highest transaction ID.
696
	row := r.database.DB.QueryRowContext(ctx, query, args...)
697
	err = row.Scan(&xid)
698
	if err != nil {
699
		// If no rows are found, return a snapshot token with a value of 0.
700
		if errors.Is(err, sql.ErrNoRows) {
701
			return snapshot.Token{Value: types.XID8{Uint: 0}}, nil
702
		}
703
		return nil, utils.HandleError(span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
704
	}
705
706
	slog.Info("Successfully retrieved latest snapshot token")
707
708
	// Return the latest snapshot token associated with the tenant.
709
	return snapshot.Token{Value: xid}, nil
710
}
711