Passed
Push — master ( 136f25...d79624 )
by Tolga
01:26 queued 27s
created

circuitBreaker.*SchemaReader.ListSchemas   A

Complexity

Conditions 3

Size

Total Lines 18
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 13
nop 3
dl 0
loc 18
rs 9.75
c 0
b 0
f 0
1
package circuitBreaker
2
3
import (
4
	"context"
5
6
	"github.com/sony/gobreaker"
7
8
	"github.com/Permify/permify/internal/storage"
9
	"github.com/Permify/permify/pkg/database"
10
	base "github.com/Permify/permify/pkg/pb/base/v1"
11
)
12
13
// SchemaReader - Add circuit breaker behaviour to schema reader
14
type SchemaReader struct {
15
	delegate storage.SchemaReader
16
	cb       *gobreaker.CircuitBreaker
17
}
18
19
// NewSchemaReader - Add circuit breaker behaviour to new schema reader
20
func NewSchemaReader(delegate storage.SchemaReader, cb *gobreaker.CircuitBreaker) *SchemaReader {
21
	return &SchemaReader{delegate: delegate, cb: cb}
22
}
23
24
// ReadSchema - Read schema from repository
25
func (r *SchemaReader) ReadSchema(ctx context.Context, tenantID, version string) (*base.SchemaDefinition, error) {
26
	response, err := r.cb.Execute(func() (interface{}, error) {
27
		return r.delegate.ReadSchema(ctx, tenantID, version)
28
	})
29
	if err != nil {
30
		return nil, err
31
	}
32
	return response.(*base.SchemaDefinition), nil
33
}
34
35
// ReadEntityDefinition - Read entity definition from repository
36
func (r *SchemaReader) ReadEntityDefinition(ctx context.Context, tenantID, entityName, version string) (*base.EntityDefinition, string, error) {
37
	type circuitBreakerResponse struct {
38
		Definition *base.EntityDefinition
39
		Version    string
40
	}
41
42
	response, err := r.cb.Execute(func() (interface{}, error) {
43
		var err error
44
		var resp circuitBreakerResponse
45
		resp.Definition, resp.Version, err = r.delegate.ReadEntityDefinition(ctx, tenantID, entityName, version)
46
		return resp, err
47
	})
48
	if err != nil {
49
		return nil, "", err
50
	}
51
52
	resp := response.(circuitBreakerResponse)
53
	return resp.Definition, resp.Version, nil
54
}
55
56
// ReadRuleDefinition - Read rule definition from repository
57
func (r *SchemaReader) ReadRuleDefinition(ctx context.Context, tenantID, ruleName, version string) (*base.RuleDefinition, string, error) {
58
	type circuitBreakerResponse struct {
59
		Definition *base.RuleDefinition
60
		Version    string
61
	}
62
63
	response, err := r.cb.Execute(func() (interface{}, error) {
64
		var err error
65
		var resp circuitBreakerResponse
66
		resp.Definition, resp.Version, err = r.delegate.ReadRuleDefinition(ctx, tenantID, ruleName, version)
67
		return resp, err
68
	})
69
	if err != nil {
70
		return nil, "", err
71
	}
72
73
	resp := response.(circuitBreakerResponse)
74
	return resp.Definition, resp.Version, nil
75
}
76
77
// HeadVersion - Finds the latest version of the schema.
78
func (r *SchemaReader) HeadVersion(ctx context.Context, tenantID string) (version string, err error) {
79
	response, err := r.cb.Execute(func() (interface{}, error) {
80
		return r.delegate.HeadVersion(ctx, tenantID)
81
	})
82
	if err != nil {
83
		return "", err
84
	}
85
	return response.(string), nil
86
}
87
88
// ListSchemas - List all Schemas
89
func (r *SchemaReader) ListSchemas(ctx context.Context, tenantID string, pagination database.Pagination) (schemas []*base.SchemaList, ct database.EncodedContinuousToken, err error) {
90
	type circuitBreakerResponse struct {
91
		Schemas []*base.SchemaList
92
		Ct      database.EncodedContinuousToken
93
	}
94
95
	response, err := r.cb.Execute(func() (interface{}, error) {
96
		var err error
97
		var resp circuitBreakerResponse
98
		resp.Schemas, resp.Ct, err = r.delegate.ListSchemas(ctx, tenantID, pagination)
99
		return resp, err
100
	})
101
	if err != nil {
102
		return nil, nil, err
103
	}
104
105
	resp := response.(circuitBreakerResponse)
106
	return resp.Schemas, resp.Ct, nil
107
}
108