Passed
Push — master ( e75ac7...396e24 )
by Tolga
01:19 queued 26s
created

postgres.*DataReader.QueryRelationships   B

Complexity

Conditions 7

Size

Total Lines 58
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 36
nop 4
dl 0
loc 58
rs 7.616
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

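Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.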

1
package postgres
2
3
import (
4
	"context"
5
	"errors"
6
	"fmt"
7
	"log/slog"
8
	"strconv"
9
	"strings"
10
11
	"github.com/jackc/pgx/v5"
12
13
	"github.com/Masterminds/squirrel"
14
	"github.com/golang/protobuf/jsonpb"
15
	"google.golang.org/protobuf/types/known/anypb"
16
17
	"github.com/Permify/permify/internal/storage"
18
	"github.com/Permify/permify/internal/storage/postgres/snapshot"
19
	"github.com/Permify/permify/internal/storage/postgres/types"
20
	"github.com/Permify/permify/internal/storage/postgres/utils"
21
	"github.com/Permify/permify/pkg/database"
22
	db "github.com/Permify/permify/pkg/database/postgres"
23
	base "github.com/Permify/permify/pkg/pb/base/v1"
24
	"github.com/Permify/permify/pkg/token"
25
)
26
27
// DataReader is a struct which holds a reference to the database, transaction options and a logger.
28
// It is responsible for reading data from the database.
29
type DataReader struct {
30
	database  *db.Postgres  // database is an instance of the PostgreSQL database
31
	txOptions pgx.TxOptions // txOptions specifies the isolation level for database transaction and sets it as read only
32
}
33
34
// NewDataReader is a constructor function for DataReader.
35
// It initializes a new DataReader with a given database, a logger, and sets transaction options to be read-only with Repeatable Read isolation level.
36
func NewDataReader(database *db.Postgres) *DataReader {
37
	return &DataReader{
38
		database:  database,                                                             // Set the database to the passed in PostgreSQL instance
39
		txOptions: pgx.TxOptions{IsoLevel: pgx.ReadCommitted, AccessMode: pgx.ReadOnly}, // Set the transaction options
40
	}
41
}
42
43
// QueryRelationships reads relation tuples from the storage based on the given filter.
44
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string) (it *database.TupleIterator, err error) {
45
	// Start a new trace span and end it when the function exits.
46
	ctx, span := tracer.Start(ctx, "data-reader.query-relationships")
47
	defer span.End()
48
49
	slog.Debug("querying relationships for tenant_id", slog.String("tenant_id", tenantID))
50
51
	// Decode the snapshot value.
52
	var st token.SnapToken
53
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
54
	if err != nil {
55
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
56
	}
57
58
	// Build the relationships query based on the provided filter and snapshot value.
59
	var args []interface{}
60
	builder := r.database.Builder.Select("entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
61
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
62
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
63
64
	// Generate the SQL query and arguments.
65
	var query string
66
	query, args, err = builder.ToSql()
67
	if err != nil {
68
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
69
	}
70
71
	slog.Debug("generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
72
73
	// Execute the SQL query and retrieve the result rows.
74
	var rows pgx.Rows
75
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
76
	if err != nil {
77
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
78
	}
79
	defer rows.Close()
80
81
	// Process the result rows and store the relationships in a TupleCollection.
82
	collection := database.NewTupleCollection()
83
	for rows.Next() {
84
		rt := storage.RelationTuple{}
85
		err = rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
86
		if err != nil {
87
			fmt.Println("=-=-=-=-=-=-= QueryRelationships SCAN 1 =-=-=-=-=-=-=")
88
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
89
		}
90
		collection.Add(rt.ToTuple())
91
	}
92
	if err = rows.Err(); err != nil {
93
		fmt.Println("=-=-=-=-=-=-= QueryRelationships SCAN 2 =-=-=-=-=-=-=")
94
		fmt.Println(err)
95
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
96
	}
97
98
	slog.Debug("successfully retrieved relation tuples from the database")
99
100
	// Return a TupleIterator created from the TupleCollection.
101
	return collection.CreateTupleIterator(), nil
102
}
103
104
// ReadRelationships reads relation tuples from the storage based on the given filter and pagination.
105
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, snap string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
106
	// Start a new trace span and end it when the function exits.
107
	ctx, span := tracer.Start(ctx, "data-reader.read-relationships")
108
	defer span.End()
109
110
	slog.Debug("reading relationships for tenant_id", slog.String("tenant_id", tenantID))
111
112
	// Decode the snapshot value.
113
	var st token.SnapToken
114
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
115
	if err != nil {
116
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
117
	}
118
119
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
120
	builder := r.database.Builder.Select("id, entity_type, entity_id, relation, subject_type, subject_id, subject_relation").From(RelationTuplesTable).Where(squirrel.Eq{"tenant_id": tenantID})
121
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, filter)
122
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
123
124
	// Apply the pagination token and limit to the query.
125
	if pagination.Token() != "" {
126
		var t database.ContinuousToken
127
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
128
		if err != nil {
129
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
130
		}
131
		var v uint64
132
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
133
		if err != nil {
134
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
135
		}
136
		builder = builder.Where(squirrel.GtOrEq{"id": v})
137
	}
138
139
	builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
140
141
	// Generate the SQL query and arguments.
142
	var query string
143
	var args []interface{}
144
	query, args, err = builder.ToSql()
145
	if err != nil {
146
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
147
	}
148
149
	slog.Debug("generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
150
151
	// Execute the query and retrieve the rows.
152
	var rows pgx.Rows
153
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
154
	if err != nil {
155
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
156
	}
157
	defer rows.Close()
158
159
	var lastID uint64
160
161
	// Iterate through the rows and scan the result into a RelationTuple struct.
162
	tuples := make([]*base.Tuple, 0, pagination.PageSize()+1)
163
	for rows.Next() {
164
		rt := storage.RelationTuple{}
165
		err = rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Relation, &rt.SubjectType, &rt.SubjectID, &rt.SubjectRelation)
166
		if err != nil {
167
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
168
		}
169
		lastID = rt.ID
170
		tuples = append(tuples, rt.ToTuple())
171
	}
172
	// Check for any errors during iteration.
173
	if err = rows.Err(); err != nil {
174
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
175
	}
176
177
	slog.Debug("successfully read relation tuples from database")
178
179
	// Return the results and encoded continuous token for pagination.
180
	if len(tuples) > int(pagination.PageSize()) {
181
		return database.NewTupleCollection(tuples[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
182
	}
183
184
	return database.NewTupleCollection(tuples...), database.NewNoopContinuousToken().Encode(), nil
185
}
186
187
// QuerySingleAttribute retrieves a single attribute from the storage based on the given filter.
188
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (attribute *base.Attribute, err error) {
189
	// Start a new trace span and end it when the function exits.
190
	ctx, span := tracer.Start(ctx, "data-reader.query-single-attribute")
191
	defer span.End()
192
193
	slog.Debug("querying single attribute for tenant_id", slog.String("tenant_id", tenantID))
194
195
	// Decode the snapshot value.
196
	var st token.SnapToken
197
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
198
	if err != nil {
199
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
200
	}
201
202
	// Build the relationships query based on the provided filter and snapshot value.
203
	var args []interface{}
204
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
205
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
206
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
207
208
	// Generate the SQL query and arguments.
209
	var query string
210
	query, args, err = builder.ToSql()
211
	if err != nil {
212
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
213
	}
214
215
	slog.Debug("generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
216
217
	row := r.database.ReadPool.QueryRow(ctx, query, args...)
218
219
	rt := storage.Attribute{}
220
221
	// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
222
	var valueStr string
223
224
	// Scan the row from the database into the fields of `rt` and `valueStr`.
225
	err = row.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
226
	if err != nil {
227
		if errors.Is(err, pgx.ErrNoRows) {
228
			return nil, nil
229
		} else {
230
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
231
		}
232
	}
233
234
	// Unmarshal the JSON data from `valueStr` into `rt.Value`.
235
	rt.Value = &anypb.Any{}
236
	unmarshaler := &jsonpb.Unmarshaler{}
237
	err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
238
	if err != nil {
239
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
240
	}
241
242
	slog.Debug("successfully retrieved Single attribute from the database")
243
244
	return rt.ToAttribute(), nil
245
}
246
247
// QueryAttributes reads multiple attributes from the storage based on the given filter.
248
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string) (it *database.AttributeIterator, err error) {
249
	// Start a new trace span and end it when the function exits.
250
	ctx, span := tracer.Start(ctx, "data-reader.query-attributes")
251
	defer span.End()
252
253
	slog.Debug("querying Attributes for tenant_id", slog.String("tenant_id", tenantID))
254
255
	// Decode the snapshot value.
256
	var st token.SnapToken
257
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
258
	if err != nil {
259
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
260
	}
261
262
	// Build the relationships query based on the provided filter and snapshot value.
263
	var args []interface{}
264
	builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
265
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
266
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
267
268
	// Generate the SQL query and arguments.
269
	var query string
270
	query, args, err = builder.ToSql()
271
	if err != nil {
272
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
273
	}
274
275
	slog.Debug("generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
276
277
	// Execute the SQL query and retrieve the result rows.
278
	var rows pgx.Rows
279
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
280
	if err != nil {
281
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
282
	}
283
	defer rows.Close()
284
285
	// Process the result rows and store the relationships in a TupleCollection.
286
	collection := database.NewAttributeCollection()
287
	for rows.Next() {
288
		rt := storage.Attribute{}
289
290
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
291
		var valueStr string
292
293
		// Scan the row from the database into the fields of `rt` and `valueStr`.
294
		err := rows.Scan(&rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
295
		if err != nil {
296
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
297
		}
298
299
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
300
		rt.Value = &anypb.Any{}
301
		unmarshaler := &jsonpb.Unmarshaler{}
302
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
303
		if err != nil {
304
			return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
305
		}
306
307
		collection.Add(rt.ToAttribute())
308
	}
309
	if err = rows.Err(); err != nil {
310
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
311
	}
312
313
	slog.Debug("successfully retrieved attributes tuples from the database")
314
315
	// Return a TupleIterator created from the TupleCollection.
316
	return collection.CreateAttributeIterator(), nil
317
}
318
319
// ReadAttributes reads multiple attributes from the storage based on the given filter and pagination.
320
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, snap string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
321
	// Start a new trace span and end it when the function exits.
322
	ctx, span := tracer.Start(ctx, "data-reader.read-attributes")
323
	defer span.End()
324
325
	slog.Debug("reading attributes for tenant_id", slog.String("tenant_id", tenantID))
326
327
	// Decode the snapshot value.
328
	var st token.SnapToken
329
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
330
	if err != nil {
331
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
332
	}
333
334
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
335
	builder := r.database.Builder.Select("id, entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
336
	builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
337
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
338
339
	// Apply the pagination token and limit to the query.
340
	if pagination.Token() != "" {
341
		var t database.ContinuousToken
342
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
343
		if err != nil {
344
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
345
		}
346
		var v uint64
347
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
348
		if err != nil {
349
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
350
		}
351
		builder = builder.Where(squirrel.GtOrEq{"id": v})
352
	}
353
354
	builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
355
356
	// Generate the SQL query and arguments.
357
	var query string
358
	var args []interface{}
359
	query, args, err = builder.ToSql()
360
	if err != nil {
361
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
362
	}
363
364
	slog.Debug("generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
365
366
	// Execute the query and retrieve the rows.
367
	var rows pgx.Rows
368
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
369
	if err != nil {
370
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
371
	}
372
	defer rows.Close()
373
374
	var lastID uint64
375
376
	// Iterate through the rows and scan the result into a RelationTuple struct.
377
	attributes := make([]*base.Attribute, 0, pagination.PageSize()+1)
378
	for rows.Next() {
379
		rt := storage.Attribute{}
380
381
		// Suppose you have a struct `rt` with a field `Value` of type `*anypb.Any`.
382
		var valueStr string
383
384
		// Scan the row from the database into the fields of `rt` and `valueStr`.
385
		err := rows.Scan(&rt.ID, &rt.EntityType, &rt.EntityID, &rt.Attribute, &valueStr)
386
		if err != nil {
387
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
388
		}
389
		lastID = rt.ID
390
391
		// Unmarshal the JSON data from `valueStr` into `rt.Value`.
392
		rt.Value = &anypb.Any{}
393
		unmarshaler := &jsonpb.Unmarshaler{}
394
		err = unmarshaler.Unmarshal(strings.NewReader(valueStr), rt.Value)
395
		if err != nil {
396
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
397
		}
398
399
		attributes = append(attributes, rt.ToAttribute())
400
	}
401
	// Check for any errors during iteration.
402
	if err = rows.Err(); err != nil {
403
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
404
	}
405
406
	slog.Debug("successfully read attributes from the database")
407
408
	// Return the results and encoded continuous token for pagination.
409
	if len(attributes) > int(pagination.PageSize()) {
410
		return database.NewAttributeCollection(attributes[:pagination.PageSize()]...), utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
411
	}
412
413
	return database.NewAttributeCollection(attributes...), database.NewNoopContinuousToken().Encode(), nil
414
}
415
416
// QueryUniqueEntities reads unique entities from the storage based on the given filter and pagination.
417
func (r *DataReader) QueryUniqueEntities(ctx context.Context, tenantID, name, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
418
	// Start a new trace span and end it when the function exits.
419
	ctx, span := tracer.Start(ctx, "data-reader.query-unique-entities")
420
	defer span.End()
421
422
	slog.Debug("querying unique entities for tenant_id", slog.String("tenant_id", tenantID))
423
424
	// Decode the snapshot value.
425
	var st token.SnapToken
426
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
427
	if err != nil {
428
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
429
	}
430
431
	query := utils.BulkEntityFilterQuery(tenantID, name, st.(snapshot.Token).Value.Uint)
432
433
	// Apply the pagination token and limit to the subQuery.
434
	if pagination.Token() != "" {
435
		var t database.ContinuousToken
436
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
437
		if err != nil {
438
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
439
		}
440
		var v uint64
441
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
442
		if err != nil {
443
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
444
		}
445
446
		query = fmt.Sprintf("%s WHERE id >= %s", query, strconv.FormatUint(v, 10))
447
	}
448
449
	// Append ORDER BY and LIMIT clauses.
450
	query = fmt.Sprintf("%s ORDER BY id LIMIT %d", query, pagination.PageSize()+1)
451
452
	slog.Debug("generated sql query", slog.String("query", query))
453
454
	// Execute the query and retrieve the rows.
455
	var rows pgx.Rows
456
	rows, err = r.database.ReadPool.Query(ctx, query)
457
	if err != nil {
458
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
459
	}
460
	defer rows.Close()
461
462
	var lastID uint64
463
464
	// Iterate through the rows and scan the result into a RelationTuple struct.
465
	entityIDs := make([]string, 0, pagination.PageSize()+1)
466
	for rows.Next() {
467
		var entityId string
468
		err = rows.Scan(&lastID, &entityId)
469
		if err != nil {
470
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
471
		}
472
473
		entityIDs = append(entityIDs, entityId)
474
	}
475
476
	// Check for any errors during iteration.
477
	if err = rows.Err(); err != nil {
478
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
479
	}
480
481
	slog.Debug("successfully retrieved unique entities from the database")
482
483
	// Return the results and encoded continuous token for pagination.
484
	if len(entityIDs) > int(pagination.PageSize()) {
485
		return entityIDs[:pagination.PageSize()], utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
486
	}
487
488
	return entityIDs, database.NewNoopContinuousToken().Encode(), nil
489
}
490
491
// QueryUniqueSubjectReferences reads unique subject references from the storage based on the given filter and pagination.
492
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, snap string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
493
	// Start a new trace span and end it when the function exits.
494
	ctx, span := tracer.Start(ctx, "data-reader.query-unique-subject-reference")
495
	defer span.End()
496
497
	slog.Debug("querying unique subject references for tenant_id", slog.String("tenant_id", tenantID))
498
499
	// Decode the snapshot value.
500
	var st token.SnapToken
501
	st, err = snapshot.EncodedToken{Value: snap}.Decode()
502
	if err != nil {
503
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
504
	}
505
506
	// Build the relationships query based on the provided filter, snapshot value, and pagination settings.
507
	builder := r.database.Builder.
508
		Select("MIN(id) as id, subject_id"). // This will pick the smallest `id` for each unique `subject_id`.
509
		From(RelationTuplesTable).
510
		Where(squirrel.Eq{"tenant_id": tenantID}).
511
		GroupBy("subject_id")
512
	builder = utils.TuplesFilterQueryForSelectBuilder(builder, &base.TupleFilter{Subject: &base.SubjectFilter{Type: subjectReference.GetType(), Relation: subjectReference.GetRelation()}})
513
	builder = utils.SnapshotQuery(builder, st.(snapshot.Token).Value.Uint)
514
515
	// Apply the pagination token and limit to the query.
516
	if pagination.Token() != "" {
517
		var t database.ContinuousToken
518
		t, err = utils.EncodedContinuousToken{Value: pagination.Token()}.Decode()
519
		if err != nil {
520
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
521
		}
522
		var v uint64
523
		v, err = strconv.ParseUint(t.(utils.ContinuousToken).Value, 10, 64)
524
		if err != nil {
525
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INVALID_CONTINUOUS_TOKEN)
526
		}
527
		builder = builder.Where(squirrel.GtOrEq{"id": v})
528
	}
529
530
	builder = builder.OrderBy("id").Limit(uint64(pagination.PageSize() + 1))
531
532
	// Generate the SQL query and arguments.
533
	var query string
534
	var args []interface{}
535
	query, args, err = builder.ToSql()
536
	if err != nil {
537
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
538
	}
539
540
	slog.Debug("generated sql query", slog.String("query", query), "with args", slog.Any("arguments", args))
541
542
	// Execute the query and retrieve the rows.
543
	var rows pgx.Rows
544
	rows, err = r.database.ReadPool.Query(ctx, query, args...)
545
	if err != nil {
546
		return nil, database.NewNoopContinuousToken().Encode(), utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_EXECUTION)
547
	}
548
	defer rows.Close()
549
550
	var lastID uint64
551
552
	// Iterate through the rows and scan the result into a RelationTuple struct.
553
	subjectIDs := make([]string, 0, pagination.PageSize()+1)
554
	for rows.Next() {
555
		var subjectID string
556
		err = rows.Scan(&lastID, &subjectID)
557
		if err != nil {
558
			return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
559
		}
560
		subjectIDs = append(subjectIDs, subjectID)
561
	}
562
	// Check for any errors during iteration.
563
	if err = rows.Err(); err != nil {
564
		return nil, nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
565
	}
566
567
	slog.Debug("successfully retrieved unique subject references from the database")
568
569
	// Return the results and encoded continuous token for pagination.
570
	if len(subjectIDs) > int(pagination.PageSize()) {
571
		return subjectIDs[:pagination.PageSize()], utils.NewContinuousToken(strconv.FormatUint(lastID, 10)).Encode(), nil
572
	}
573
574
	return subjectIDs, database.NewNoopContinuousToken().Encode(), nil
575
}
576
577
// HeadSnapshot retrieves the latest snapshot token associated with the tenant.
578
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
579
	// Start a new trace span and end it when the function exits.
580
	ctx, span := tracer.Start(ctx, "data-reader.head-snapshot")
581
	defer span.End()
582
583
	slog.Debug("getting head snapshot for tenant_id", slog.String("tenant_id", tenantID))
584
585
	var xid types.XID8
586
587
	// Build the query to find the highest transaction ID associated with the tenant.
588
	builder := r.database.Builder.Select("id").From(TransactionsTable).Where(squirrel.Eq{"tenant_id": tenantID}).OrderBy("id DESC").Limit(1)
589
	query, args, err := builder.ToSql()
590
	if err != nil {
591
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SQL_BUILDER)
592
	}
593
594
	// Execute the query and retrieve the highest transaction ID.
595
	err = r.database.ReadPool.QueryRow(ctx, query, args...).Scan(&xid)
596
	if err != nil {
597
		// If no rows are found, return a snapshot token with a value of 0.
598
		if errors.Is(err, pgx.ErrNoRows) {
599
			return snapshot.Token{Value: types.XID8{Uint: 0}}, nil
600
		}
601
		fmt.Println("=-=-=-=-=-=-= HeadSnapshot SCAN 1 =-=-=-=-=-=-=")
602
		return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_SCAN)
603
	}
604
605
	slog.Debug("successfully retrieved latest snapshot token")
606
607
	// Return the latest snapshot token associated with the tenant.
608
	return snapshot.Token{Value: xid}, nil
609
}
610