1 | package athena |
||
2 | |||
3 | import ( |
||
4 | "context" |
||
5 | "fmt" |
||
6 | "math" |
||
7 | "math/rand" |
||
8 | "time" |
||
9 | ) |
||
10 | |||
11 | const ( |
||
12 | defaultMaxRetry = 5 |
||
13 | ) |
||
14 | |||
// defaultWaitFunc is the waiting strategy used when a request does not supply
// its own: exponential backoff with full jitter. For the attempt index i it
// sleeps for a uniformly random duration in [0, 10s * 2^i).
var defaultWaitFunc = func(i int) {
	const initialWait = 10 * time.Second

	// Work in float64 nanoseconds so the jitter keeps sub-second resolution
	// instead of being truncated to whole seconds before scaling.
	ceiling := math.Pow(2, float64(i)) * float64(initialWait)
	wait := rand.Float64() * ceiling // #nosec G404

	// Converting an out-of-range (or NaN) float64 to an integer is
	// implementation-defined, so saturate at the largest Duration rather than
	// letting a very large i wrap around to an arbitrary sleep.
	d := time.Duration(math.MaxInt64)
	if wait < float64(math.MaxInt64) {
		d = time.Duration(wait)
	}
	time.Sleep(d)
}
||
20 | |||
21 | // XQueryResults executes a query and waits for fetching complete results. |
||
22 | func (svc *Athena) XQueryResults(ctx context.Context, r XQueryResultsRequest) (*GetQueryResultsResult, error) { |
||
23 | out, err := svc.StartQueryExecution(ctx, StartQueryExecutionRequest{ |
||
24 | QueryString: r.QueryString, |
||
25 | ClientRequestToken: r.ClientRequestToken, |
||
26 | QueryExecutionContext: r.QueryExecutionContext, |
||
27 | ResultConfiguration: r.ResultConfiguration, |
||
28 | WorkGroup: r.WorkGroup, |
||
29 | }) |
||
30 | if err != nil { |
||
31 | return nil, err |
||
32 | } |
||
33 | |||
34 | maxRetry := r.MaxRetry |
||
35 | if maxRetry == 0 { |
||
36 | maxRetry = defaultMaxRetry |
||
37 | } |
||
38 | waitFn := r.WaitFunc |
||
39 | if waitFn == nil { |
||
40 | waitFn = defaultWaitFunc |
||
41 | } |
||
42 | |||
43 | return svc.waitQueryResult(ctx, out.QueryExecutionID, maxRetry, waitFn) |
||
44 | } |
||
45 | |||
46 | type XQueryResultsRequest struct { |
||
0 ignored issues
–
show
introduced
by
![]() |
|||
47 | QueryString string |
||
48 | |||
49 | // optional |
||
50 | ClientRequestToken string |
||
51 | QueryExecutionContext QueryExecutionContext |
||
52 | ResultConfiguration ResultConfiguration |
||
53 | WorkGroup string |
||
54 | |||
55 | // extension |
||
56 | MaxRetry int // default=5 |
||
57 | WaitFunc func(i int) // waiting strategy, default=exponential backoff with full jitter from 10sec |
||
58 | } |
||
59 | |||
60 | func (svc *Athena) waitQueryResult(ctx context.Context, queryID string, maxRetry int, waitFn func(int)) (*GetQueryResultsResult, error) { |
||
61 | for i := 0; i < maxRetry; i++ { |
||
62 | waitFn(i) // wait |
||
63 | |||
64 | res, err := svc.GetQueryExecution(ctx, GetQueryExecutionRequest{ |
||
65 | QueryExecutionID: queryID, |
||
66 | }) |
||
67 | if err != nil { |
||
68 | return nil, err |
||
69 | } |
||
70 | |||
71 | s := res.Status.State |
||
72 | switch { |
||
73 | case s.IsSucceeded(): |
||
74 | return svc.XGetQueryResultsAll(ctx, queryID) |
||
75 | case s.IsRunning(), |
||
76 | s.IsQueued(): |
||
77 | // do nothing; continue loop |
||
78 | default: |
||
79 | return nil, fmt.Errorf("Invalid query status: id:[%s], status:[%s]", queryID, string(s)) |
||
80 | } |
||
81 | } |
||
82 | return nil, fmt.Errorf("Timeout error: id:[%s]", queryID) |
||
83 | } |
||
84 | |||
85 | func (svc *Athena) XGetQueryResultsAll(ctx context.Context, queryID string) (*GetQueryResultsResult, error) { |
||
0 ignored issues
–
show
|
|||
86 | out, err := svc.GetQueryResults(ctx, GetQueryResultsRequest{ |
||
87 | QueryExecutionID: queryID, |
||
88 | }) |
||
89 | if err != nil { |
||
90 | return nil, err |
||
91 | } |
||
92 | |||
93 | rows := out.ResultSet.Rows |
||
94 | for out.NextToken != "" { |
||
95 | out, err = svc.GetQueryResults(ctx, GetQueryResultsRequest{ |
||
96 | QueryExecutionID: queryID, |
||
97 | NextToken: out.NextToken, |
||
98 | }) |
||
99 | if err != nil { |
||
100 | return nil, err |
||
101 | } |
||
102 | rows = append(rows, out.ResultSet.Rows...) |
||
103 | } |
||
104 | out.ResultSet.Rows = rows |
||
105 | return out, nil |
||
106 | } |
||
107 |