GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Passed
Push — master ( 8cec0e...e8c327 )
by Amir
13:23
created

raftproxy.*RaftProxy.Delete   A

Complexity

Conditions 3

Size

Total Lines 19
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 12
dl 0
loc 19
rs 9.8
c 0
b 0
f 0
nop 1
1
package raftproxy
2
3
import (
4
	"bytes"
5
	"encoding/json"
6
	"errors"
7
	"fmt"
8
	"github.com/arazmj/gerdu/cache"
9
	"github.com/hashicorp/raft"
10
	raftboltdb "github.com/hashicorp/raft-boltdb"
11
	log "github.com/sirupsen/logrus"
12
	"net"
13
	"net/http"
14
	"os"
15
	"path/filepath"
16
	"time"
17
)
18
19
const (
20
	raftTimeout         = 10 * time.Second
21
	retainSnapshotCount = 2
22
	tcpTimeout          = 10 * time.Second
23
)
24
25
type RaftCache interface {
26
	raft.FSM
27
	cache.UnImplementedCache
28
	OpenRaft(storage string) error
29
}
30
31
type RaftProxy struct {
32
	raft     *raft.Raft
33
	Imp      cache.UnImplementedCache
34
	raftAddr string
35
	joinAddr string
36
	localId  string
37
	RaftCache
38
}
39
40
type command struct {
41
	Op    string `json:"op,omitempty"`
42
	Key   string `json:"key,omitempty"`
43
	Value string `json:"value,omitempty"`
44
}
45
46
func NewRaftProxy(imp cache.UnImplementedCache, raftAddr, joinAddr, localId string) *RaftProxy {
47
	return &RaftProxy{
48
		Imp:      imp,
49
		raftAddr: raftAddr,
50
		joinAddr: joinAddr,
51
		localId:  localId,
52
	}
53
}
54
55
// Put updates or insert a new entry, evicts the old entry
56
// if cache size is larger than capacity
57
func (c *RaftProxy) Put(key string, value string) (created bool) {
58
	cmd := &command{
59
		Op:    "put",
60
		Key:   key,
61
		Value: value,
62
	}
63
64
	future, err := c.applyCommand(cmd)
65
66
	if err != nil {
67
		log.Errorf("Error applyCommand %v", err)
68
		return false
69
	}
70
71
	if future.Error() != nil {
72
		log.Errorf("Error in raft apply future %v", future.Error())
73
		return false
74
	}
75
76
	return future.Response().(bool)
77
}
78
79
func (c *RaftProxy) Delete(key string) (ok bool) {
80
	cmd := &command{
81
		Op:  "delete",
82
		Key: key,
83
	}
84
85
	future, err := c.applyCommand(cmd)
86
87
	if err != nil {
88
		log.Errorf("Error applyCommand %v", err)
89
		return false
90
	}
91
92
	if future.Error() != nil {
93
		log.Fatalf("Error in raft apply future %v", future.Error())
94
		return false
95
	}
96
97
	return future.Response().(bool)
98
}
99
100
func (c *RaftProxy) Get(key string) (value string, ok bool) {
101
	cmd := &command{
102
		Op:  "get",
103
		Key: key,
104
	}
105
106
	future, err := c.applyCommand(cmd)
107
108
	if err != nil {
109
		log.Errorf("Error applyCommand %v", err)
110
		return "", false
111
	}
112
113
	if future.Error() != nil {
114
		log.Fatalf("Error in raft apply future %v", future.Error())
115
		return "", false
116
	}
117
118
	response := future.Response().(getResponse)
119
	return response.value, response.ok
120
}
121
122
func (c *RaftProxy) applyCommand(cmd *command) (raft.ApplyFuture, error) {
123
	if c.raft.State() != raft.Leader {
124
		return nil, errors.New(fmt.Sprintf("not a leader but a %v %p", c.raft.State(), c.raft))
125
	}
126
127
	b, err := json.Marshal(cmd)
128
	if err != nil {
129
		return nil, err
130
	}
131
132
	return c.raft.Apply(b, raftTimeout), nil
133
}
134
135
type getResponse struct {
136
	value string
137
	ok    bool
138
}
139
140
type fsm RaftProxy
141
142
func (f *fsm) Apply(l *raft.Log) interface{} {
143
	var cmd command
144
	if err := json.Unmarshal(l.Data, &cmd); err != nil {
145
		log.Fatalf("failed to unmarshal command: %s", err.Error())
146
	}
147
148
	log.Infof("Apply command: %v", cmd)
149
	switch cmd.Op {
150
	case "get":
151
		value, ok := f.Imp.Get(cmd.Key)
152
		response := getResponse{
153
			value: value,
154
			ok:    ok,
155
		}
156
		return response
157
	case "put":
158
		return f.Imp.Put(cmd.Key, cmd.Value)
159
	case "delete":
160
		return f.Imp.Delete(cmd.Key)
161
	default:
162
		log.Fatalf("unrecognized command op: %s", cmd.Op)
163
	}
164
	return nil
165
}
166
167
func (c *RaftProxy) OpenRaft(storage string) error {
168
	// Setup Raft configuration.
169
	config := raft.DefaultConfig()
170
	config.LocalID = raft.ServerID(c.localId)
171
172
	// Setup Raft communication.
173
	addr, err := net.ResolveTCPAddr("tcp", c.raftAddr)
174
	if err != nil {
175
		return err
176
	}
177
	transport, err := raft.NewTCPTransport(c.raftAddr, addr, 3, tcpTimeout, os.Stderr)
178
	if err != nil {
179
		return err
180
	}
181
182
	// Create the snapshot store. This allows the Raft to truncate the log.
183
	snapshots, err := raft.NewFileSnapshotStore(storage, retainSnapshotCount, os.Stderr)
184
	if err != nil {
185
		return fmt.Errorf("file snapshot store: %s", err)
186
	}
187
188
	// Create the log store and stable store.
189
	var logStore raft.LogStore
190
	var stableStore raft.StableStore
191
	if storage == "" {
192
		logStore = raft.NewInmemStore()
193
		stableStore = raft.NewInmemStore()
194
	} else {
195
		boltDB, err := raftboltdb.NewBoltStore(filepath.Join(storage, "raft.db"))
196
		if err != nil {
197
			return fmt.Errorf("new bolt store: %s", err)
198
		}
199
		logStore = boltDB
200
		stableStore = boltDB
201
	}
202
203
	// Instantiate the Raft systems.
204
	ra, err := raft.NewRaft(config, (*fsm)(c), logStore, stableStore, snapshots, transport)
205
	if err != nil {
206
		return fmt.Errorf("new raft: %s", err)
207
	}
208
209
	c.raft = ra
210
211
	if c.joinAddr == "" {
212
		configuration := raft.Configuration{
213
			Servers: []raft.Server{
214
				{
215
					ID:      config.LocalID,
216
					Address: transport.LocalAddr(),
217
				},
218
			},
219
		}
220
		ra.BootstrapCluster(configuration)
221
	} else {
222
		b, err := json.Marshal(map[string]string{"addr": c.raftAddr, "id": c.localId})
223
		if err != nil {
224
			return err
225
		}
226
		resp, err := http.Post(fmt.Sprintf("http://%s/join", c.joinAddr), "", bytes.NewReader(b))
227
		if err != nil {
228
			return err
229
		}
230
		defer resp.Body.Close()
231
232
		return nil
233
234
	}
235
236
	return nil
237
}
238
239
// Join joins a node, identified by nodeID and located at addr, to this store.
240
// The node must be ready to respond to Raft communications at that address.
241
func (c *RaftProxy) Join(nodeID, addr string) error {
242
	log.Infof("received join request for remote node %s at %s", nodeID, addr)
243
244
	configFuture := c.raft.GetConfiguration()
245
	if err := configFuture.Error(); err != nil {
246
		log.Errorf("failed to get raft configuration: %v", err)
247
		return err
248
	}
249
250
	for _, srv := range configFuture.Configuration().Servers {
251
		// If a node already exists with either the joining node's ID or address,
252
		// that node may need to be removed from the config first.
253
		if srv.ID == raft.ServerID(nodeID) || srv.Address == raft.ServerAddress(addr) {
254
			// However if *both* the ID and the address are the same, then nothing -- not even
255
			// a join operation -- is needed.
256
			if srv.Address == raft.ServerAddress(addr) && srv.ID == raft.ServerID(nodeID) {
257
				log.Warnf("node %s at %s already member of cluster, ignoring join request", nodeID, addr)
258
				return nil
259
			}
260
261
			future := c.raft.RemoveServer(srv.ID, 0, 0)
262
			if err := future.Error(); err != nil {
263
				return fmt.Errorf("error removing existing node %s at %s: %s", nodeID, addr, err)
264
			}
265
		}
266
	}
267
268
	f := c.raft.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(addr), 0, 0)
269
	if f.Error() != nil {
270
		return f.Error()
271
	}
272
	log.Infof("node %s at %s joined successfully", nodeID, addr)
273
	return nil
274
}
275