Issues (1008)

athena/client_xapi.go (2 issues)

Severity
1
package athena
2
3
import (
4
	"context"
5
	"fmt"
6
	"math"
7
	"math/rand"
8
	"time"
9
)
10
11
const (
12
	defaultMaxRetry = 5
13
)
14
15
var defaultWaitFunc = func(i int) {
16
	const initialWaitSec = 10
17
	expo := math.Pow(2, float64(i)) * initialWaitSec
18
	time.Sleep(time.Duration(rand.Float64()*expo) * time.Second) // #nosec G404
19
}
20
21
// XQueryResults executes a query and waits for fetching complete results.
22
func (svc *Athena) XQueryResults(ctx context.Context, r XQueryResultsRequest) (*GetQueryResultsResult, error) {
23
	out, err := svc.StartQueryExecution(ctx, StartQueryExecutionRequest{
24
		QueryString:           r.QueryString,
25
		ClientRequestToken:    r.ClientRequestToken,
26
		QueryExecutionContext: r.QueryExecutionContext,
27
		ResultConfiguration:   r.ResultConfiguration,
28
		WorkGroup:             r.WorkGroup,
29
	})
30
	if err != nil {
31
		return nil, err
32
	}
33
34
	maxRetry := r.MaxRetry
35
	if maxRetry == 0 {
36
		maxRetry = defaultMaxRetry
37
	}
38
	waitFn := r.WaitFunc
39
	if waitFn == nil {
40
		waitFn = defaultWaitFunc
41
	}
42
43
	return svc.waitQueryResult(ctx, out.QueryExecutionID, maxRetry, waitFn)
44
}
45
46
type XQueryResultsRequest struct {
0 ignored issues
show
exported type XQueryResultsRequest should have comment or be unexported
Loading history...
47
	QueryString string
48
49
	// optional
50
	ClientRequestToken    string
51
	QueryExecutionContext QueryExecutionContext
52
	ResultConfiguration   ResultConfiguration
53
	WorkGroup             string
54
55
	// extension
56
	MaxRetry int         // default=5
57
	WaitFunc func(i int) // waiting strategy, default=exponential backoff with full jitter from 10sec
58
}
59
60
func (svc *Athena) waitQueryResult(ctx context.Context, queryID string, maxRetry int, waitFn func(int)) (*GetQueryResultsResult, error) {
61
	for i := 0; i < maxRetry; i++ {
62
		waitFn(i) // wait
63
64
		res, err := svc.GetQueryExecution(ctx, GetQueryExecutionRequest{
65
			QueryExecutionID: queryID,
66
		})
67
		if err != nil {
68
			return nil, err
69
		}
70
71
		s := res.Status.State
72
		switch {
73
		case s.IsSucceeded():
74
			return svc.XGetQueryResultsAll(ctx, queryID)
75
		case s.IsRunning(),
76
			s.IsQueued():
77
			// do nothing; continue loop
78
		default:
79
			return nil, fmt.Errorf("Invalid query status: id:[%s], status:[%s]", queryID, string(s))
80
		}
81
	}
82
	return nil, fmt.Errorf("Timeout error: id:[%s]", queryID)
83
}
84
85
func (svc *Athena) XGetQueryResultsAll(ctx context.Context, queryID string) (*GetQueryResultsResult, error) {
0 ignored issues
show
exported method Athena.XGetQueryResultsAll should have comment or be unexported
Loading history...
86
	out, err := svc.GetQueryResults(ctx, GetQueryResultsRequest{
87
		QueryExecutionID: queryID,
88
	})
89
	if err != nil {
90
		return nil, err
91
	}
92
93
	rows := out.ResultSet.Rows
94
	for out.NextToken != "" {
95
		out, err = svc.GetQueryResults(ctx, GetQueryResultsRequest{
96
			QueryExecutionID: queryID,
97
			NextToken:        out.NextToken,
98
		})
99
		if err != nil {
100
			return nil, err
101
		}
102
		rows = append(rows, out.ResultSet.Rows...)
103
	}
104
	out.ResultSet.Rows = rows
105
	return out, nil
106
}
107