Passed
Pull Request — master (#1603)
by Tolga
03:57
created

circuitBreaker.*DataReader.QueryUniqueEntities   A

Complexity

Conditions 3

Size

Total Lines 18
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 13
nop 5
dl 0
loc 18
rs 9.75
c 0
b 0
f 0
1
package circuitBreaker
2
3
import (
4
	"context"
5
6
	"github.com/sony/gobreaker"
7
8
	"github.com/Permify/permify/internal/storage"
9
	"github.com/Permify/permify/pkg/database"
10
	base "github.com/Permify/permify/pkg/pb/base/v1"
11
	"github.com/Permify/permify/pkg/token"
12
)
13
14
// DataReader - Add circuit breaker behaviour to data reader
15
type DataReader struct {
16
	delegate storage.DataReader
17
	cb       *gobreaker.CircuitBreaker
18
}
19
20
// NewDataReader - Add circuit breaker behaviour to new data reader
21
func NewDataReader(delegate storage.DataReader, cb *gobreaker.CircuitBreaker) *DataReader {
22
	return &DataReader{delegate: delegate, cb: cb}
23
}
24
25
// QueryRelationships - Reads relation tuples from the repository
26
func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, token string, pagination database.CursorPagination) (*database.TupleIterator, error) {
27
	response, err := r.cb.Execute(func() (interface{}, error) {
28
		return r.delegate.QueryRelationships(ctx, tenantID, filter, token, pagination)
29
	})
30
	if err != nil {
31
		return nil, err
32
	}
33
	return response.(*database.TupleIterator), nil
34
}
35
36
// ReadRelationships - Reads relation tuples from the repository with different options.
37
func (r *DataReader) ReadRelationships(ctx context.Context, tenantID string, filter *base.TupleFilter, token string, pagination database.Pagination) (collection *database.TupleCollection, ct database.EncodedContinuousToken, err error) {
38
	type circuitBreakerResponse struct {
39
		Collection      *database.TupleCollection
40
		ContinuousToken database.EncodedContinuousToken
41
	}
42
43
	response, err := r.cb.Execute(func() (interface{}, error) {
44
		var err error
45
		var resp circuitBreakerResponse
46
		resp.Collection, resp.ContinuousToken, err = r.delegate.ReadRelationships(ctx, tenantID, filter, token, pagination)
47
		return resp, err
48
	})
49
	if err != nil {
50
		return nil, nil, err
51
	}
52
53
	resp := response.(circuitBreakerResponse)
54
	return resp.Collection, resp.ContinuousToken, nil
55
}
56
57
// QuerySingleAttribute - Reads a single attribute from the repository.
58
func (r *DataReader) QuerySingleAttribute(ctx context.Context, tenantID string, filter *base.AttributeFilter, token string) (*base.Attribute, error) {
59
	response, err := r.cb.Execute(func() (interface{}, error) {
60
		return r.delegate.QuerySingleAttribute(ctx, tenantID, filter, token)
61
	})
62
	if err != nil {
63
		return nil, err
64
	}
65
	return response.(*base.Attribute), nil
66
}
67
68
// QueryAttributes - Reads multiple attributes from the repository.
69
func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, token string, pagination database.CursorPagination) (*database.AttributeIterator, error) {
70
	response, err := r.cb.Execute(func() (interface{}, error) {
71
		return r.delegate.QueryAttributes(ctx, tenantID, filter, token, pagination)
72
	})
73
	if err != nil {
74
		return nil, err
75
	}
76
	return response.(*database.AttributeIterator), nil
77
}
78
79
// ReadAttributes - Reads multiple attributes from the repository with different options.
80
func (r *DataReader) ReadAttributes(ctx context.Context, tenantID string, filter *base.AttributeFilter, token string, pagination database.Pagination) (collection *database.AttributeCollection, ct database.EncodedContinuousToken, err error) {
81
	type circuitBreakerResponse struct {
82
		Collection      *database.AttributeCollection
83
		ContinuousToken database.EncodedContinuousToken
84
	}
85
86
	response, err := r.cb.Execute(func() (interface{}, error) {
87
		var err error
88
		var resp circuitBreakerResponse
89
		resp.Collection, resp.ContinuousToken, err = r.delegate.ReadAttributes(ctx, tenantID, filter, token, pagination)
90
		return resp, err
91
	})
92
	if err != nil {
93
		return nil, nil, err
94
	}
95
96
	resp := response.(circuitBreakerResponse)
97
	return resp.Collection, resp.ContinuousToken, nil
98
}
99
100
// QueryUniqueSubjectReferences - Reads unique subject references from the repository with different options.
101
func (r *DataReader) QueryUniqueSubjectReferences(ctx context.Context, tenantID string, subjectReference *base.RelationReference, token string, pagination database.Pagination) (ids []string, ct database.EncodedContinuousToken, err error) {
102
	type circuitBreakerResponse struct {
103
		IDs             []string
104
		ContinuousToken database.EncodedContinuousToken
105
	}
106
107
	response, err := r.cb.Execute(func() (interface{}, error) {
108
		var err error
109
		var resp circuitBreakerResponse
110
		resp.IDs, resp.ContinuousToken, err = r.delegate.QueryUniqueSubjectReferences(ctx, tenantID, subjectReference, token, pagination)
111
		return resp, err
112
	})
113
	if err != nil {
114
		return nil, nil, err
115
	}
116
117
	resp := response.(circuitBreakerResponse)
118
	return resp.IDs, resp.ContinuousToken, nil
119
}
120
121
// HeadSnapshot - Reads the latest version of the snapshot from the repository.
122
func (r *DataReader) HeadSnapshot(ctx context.Context, tenantID string) (token.SnapToken, error) {
123
	response, err := r.cb.Execute(func() (interface{}, error) {
124
		return r.delegate.HeadSnapshot(ctx, tenantID)
125
	})
126
	if err != nil {
127
		return nil, err
128
	}
129
	return response.(token.SnapToken), nil
130
}
131