Skip to content

Commit 74a719a

Browse files
committed
add basic snapshotting
Signed-off-by: Khiem Nguyen <ppkhiemnguyen@gmail.com>
1 parent e0a299a commit 74a719a

File tree

5 files changed

+211
-8
lines changed

5 files changed

+211
-8
lines changed

contrib/raftsimple/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
run:
22
rm -f raftsimple
33
go build -o raftsimple
4-
./raftsimple -nodes 3
4+
./raftsimple -nodes 3

contrib/raftsimple/main.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"flag"
5+
"fmt"
56
"log"
67
"sync"
78

@@ -27,9 +28,15 @@ func (oc *orchest) createNode(nodeID uint64, peers []uint64) bool {
2728
confChangeC := make(chan raftpb.ConfChange)
2829
// defer close(confChangeC)
2930

31+
snapdir := fmt.Sprintf("raftsimple-%d-snap", nodeID)
32+
ss, err := newSnapshotStorage(snapdir)
33+
if err != nil {
34+
log.Fatalf("raftsimple: %v", err)
35+
}
36+
3037
kvs, fsm := newKVStore(proposeC)
3138

32-
rn := newRaftNode(nodeID, peers, fsm, oc.nw, proposeC, confChangeC)
39+
rn := newRaftNode(nodeID, peers, fsm, ss, oc.nw, proposeC, confChangeC)
3340

3441
oc.nw.register(nodeID, rn)
3542

contrib/raftsimple/network.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"log"
56

67
"go.etcd.io/raft/v3/raftpb"
78
)
@@ -22,7 +23,10 @@ func (nw *network) deregister(nodeID uint64) {
2223
}
2324

2425
func (nw *network) send(m raftpb.Message) {
25-
p := nw.peers[m.To]
26+
p, ok := nw.peers[m.To]
27+
if !ok {
28+
log.Fatalf("node %d: unable to find node %d to send %s\n", m.From, m.To, m.Type.String())
29+
}
2630
_ = p.node.Step(context.TODO(), m)
2731
}
2832

contrib/raftsimple/raft.go

Lines changed: 119 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package main
22

33
import (
44
"context"
5+
"errors"
56
"log"
7+
"os"
68
"time"
79

810
"go.etcd.io/raft/v3"
@@ -43,7 +45,13 @@ type raftNode struct {
4345
id uint64
4446
peers []raft.Peer
4547
fsm FSM
46-
t transport
48+
49+
ss snapshotStorage
50+
confState raftpb.ConfState
51+
snapshotIndex uint64
52+
appliedIndex uint64
53+
snapCount uint64
54+
t transport
4755

4856
// When serveChannels is done, `err` is set to any error and then
4957
// `done` is closed.
@@ -59,7 +67,9 @@ type raftNode struct {
5967
// httpdonec chan struct{} // signals http server shutdown complete
6068
}
6169

62-
func newRaftNode(id uint64, peers []uint64, fsm FSM, nw *network, proposeC <-chan string, confChangeC <-chan raftpb.ConfChange) *raftNode {
70+
// defaultSnapshotCount is the snapCount that newRaftNode assigns to each
// node. maybeTriggerSnapshot only takes a snapshot once more than snapCount
// entries have been applied since the previous snapshot.
var defaultSnapshotCount uint64 = 10000
71+
72+
func newRaftNode(id uint64, peers []uint64, fsm FSM, ss snapshotStorage, nw *network, proposeC <-chan string, confChangeC <-chan raftpb.ConfChange) *raftNode {
6373
commitC := make(chan *commit)
6474
errorC := make(chan error)
6575

@@ -79,13 +89,15 @@ func newRaftNode(id uint64, peers []uint64, fsm FSM, nw *network, proposeC <-cha
7989
id: id,
8090
peers: rpeers,
8191
fsm: fsm,
92+
ss: ss,
93+
snapCount: defaultSnapshotCount,
8294
t: t,
8395
stopc: make(chan struct{}),
8496
// httpstopc: make(chan struct{}),
8597
// httpdonec: make(chan struct{}),
8698
}
8799

88-
// TODO: load and apply snapshots here
100+
rc.loadAndApplySnapshot()
89101

90102
go rc.startRaft()
91103
return rc
@@ -112,7 +124,92 @@ func (rc *raftNode) startRaft() {
112124
go rc.serveChannels()
113125
}
114126

127+
// loadAndApplySnapshot loads the most recent snapshot from the
128+
// snapshot storage (if any) and applies it to the current state.
129+
func (rc *raftNode) loadAndApplySnapshot() {
130+
snapshot, err := rc.ss.load()
131+
if err != nil {
132+
if errors.Is(err, os.ErrNotExist) {
133+
// No snapshots available; do nothing.
134+
return
135+
}
136+
log.Panic(err)
137+
}
138+
139+
log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
140+
if err := rc.fsm.RestoreSnapshot(snapshot.Data); err != nil {
141+
log.Panic(err)
142+
}
143+
}
144+
145+
func (rc *raftNode) publishSnapshot(snapshotToSave raftpb.Snapshot) {
146+
if raft.IsEmptySnap(snapshotToSave) {
147+
return
148+
}
149+
150+
log.Printf("publishing snapshot at index %d", rc.snapshotIndex)
151+
defer log.Printf("finished publishing snapshot at index %d", rc.snapshotIndex)
152+
153+
if snapshotToSave.Metadata.Index <= rc.appliedIndex {
154+
log.Fatalf("snapshot index [%d] should > progress.appliedIndex [%d]", snapshotToSave.Metadata.Index, rc.appliedIndex)
155+
}
156+
rc.commitC <- nil // trigger kvstore to load snapshot
157+
158+
rc.confState = snapshotToSave.Metadata.ConfState
159+
rc.snapshotIndex = snapshotToSave.Metadata.Index
160+
rc.appliedIndex = snapshotToSave.Metadata.Index
161+
}
162+
163+
// snapshotCatchUpEntriesN is subtracted from the applied index to pick the
// index that maybeTriggerSnapshot passes to raftStorage.Compact; when the
// applied index is not larger than this value, index 1 is used instead.
// NOTE(review): presumably this keeps recent entries in the log so slow
// followers can catch up without a full snapshot — confirm.
var snapshotCatchUpEntriesN uint64 = 10000
164+
165+
func (rc *raftNode) maybeTriggerSnapshot(applyDoneC <-chan struct{}) {
166+
if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount {
167+
return
168+
}
169+
170+
// wait until all committed entries are applied (or server is closed)
171+
if applyDoneC != nil {
172+
select {
173+
case <-applyDoneC:
174+
case <-rc.stopc:
175+
return
176+
}
177+
}
178+
179+
log.Printf("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.snapshotIndex)
180+
data, err := rc.fsm.TakeSnapshot()
181+
if err != nil {
182+
log.Panic(err)
183+
}
184+
snap, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data)
185+
if err != nil {
186+
panic(err)
187+
}
188+
if err := rc.saveSnap(snap); err != nil {
189+
panic(err)
190+
}
191+
192+
compactIndex := uint64(1)
193+
if rc.appliedIndex > snapshotCatchUpEntriesN {
194+
compactIndex = rc.appliedIndex - snapshotCatchUpEntriesN
195+
}
196+
if err := rc.raftStorage.Compact(compactIndex); err != nil {
197+
panic(err)
198+
}
199+
200+
log.Printf("compacted log at index %d", compactIndex)
201+
rc.snapshotIndex = rc.appliedIndex
202+
}
203+
115204
func (rc *raftNode) serveChannels() {
205+
snap, err := rc.raftStorage.Snapshot()
206+
if err != nil {
207+
panic(err)
208+
}
209+
rc.confState = snap.Metadata.ConfState
210+
rc.snapshotIndex = snap.Metadata.Index
211+
rc.appliedIndex = snap.Metadata.Index
212+
116213
ticker := time.NewTicker(100 * time.Millisecond)
117214
defer ticker.Stop()
118215

@@ -151,6 +248,11 @@ func (rc *raftNode) serveChannels() {
151248
// store raft entries, then publish over commit channel
152249
case rd := <-rc.node.Ready():
153250
// saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
251+
if !raft.IsEmptySnap(rd.Snapshot) {
252+
log.Printf("node %d: not empty snapshot\n", rc.id)
253+
rc.saveSnap(rd.Snapshot)
254+
rc.raftStorage.ApplySnapshot(rd.Snapshot)
255+
}
154256
rc.raftStorage.Append(rd.Entries)
155257

156258
rc.t.send(rd.Messages)
@@ -194,7 +296,11 @@ func (rc *raftNode) serveChannels() {
194296
return
195297
}
196298
}
197-
// TODO: after commit, update appliedIndex
299+
// after commit, update appliedIndex
300+
if len(rd.CommittedEntries) > 0 {
301+
rc.appliedIndex = rd.CommittedEntries[len(rd.CommittedEntries)-1].Index
302+
}
303+
rc.maybeTriggerSnapshot(applyDoneC)
198304
rc.node.Advance()
199305
case <-rc.stopc:
200306
log.Println("stopping at serveChannels")
@@ -204,10 +310,18 @@ func (rc *raftNode) serveChannels() {
204310
}
205311
}
206312

313+
func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error {
314+
if err := rc.ss.saveSnap(snap); err != nil {
315+
return err
316+
}
317+
return nil
318+
}
319+
207320
func (rc *raftNode) processCommits() error {
208321
for commit := range rc.commitC {
209322
if commit == nil {
210-
// TODO: load snapshot
323+
// a request to load snapshot
324+
rc.loadAndApplySnapshot()
211325
continue
212326
}
213327
if err := rc.fsm.ApplyCommits(commit); err != nil {

contrib/raftsimple/storage.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"os"
7+
8+
"go.etcd.io/raft/v3/raftpb"
9+
)
10+
11+
// snapshotStorage is the persistence layer a raftNode uses to save raft
// snapshots and to read one back when restoring its state machine.
type snapshotStorage interface {
12+
// saveSnap saves `snapshot` to persistent storage.
13+
saveSnap(snapshot raftpb.Snapshot) error
14+
15+
// saveHardState(hardstate raftpb.HardState) error
16+
17+
// saveEntries(entries []raftpb.Entry) error
18+
19+
// load reads and returns the newest snapshot that is
20+
// available. raftNode.loadAndApplySnapshot treats an error matching
// os.ErrNotExist as "no snapshot saved yet".
21+
load() (*raftpb.Snapshot, error)
22+
23+
// // LoadNewestAvailable loads the newest available snapshot
24+
// // whose term and index matches one of those in walSnaps.
25+
// LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error)
26+
}
27+
28+
// snapStore is a file-backed snapshotStorage: it keeps a single serialized
// snapshot in a file named "snap" inside dir.
type snapStore struct {
29+
// dir is the directory that holds the snapshot file.
dir string
30+
}
31+
32+
func (ss *snapStore) saveSnap(snapshot raftpb.Snapshot) error {
33+
log.Println("ss: saveSnap is being called")
34+
snap, err := snapshot.Marshal()
35+
if err != nil {
36+
return err
37+
}
38+
// name := fmt.Sprintf("snap_%d_%d", snapshot.Metadata.Term, snapshot.Metadata.Index)
39+
return ss.saveToFile("snap", snap)
40+
}
41+
42+
// func (ss *snapStore) saveHardState(hardstate raftpb.HardState) error {
43+
// hs, err := hardstate.Marshal()
44+
// if err != nil {
45+
// return err
46+
// }
47+
// name := fmt.Sprintf("hardstate_%d_%d", hardstate.Term, hardstate)
48+
// return ss.saveToFile(name, snap)
49+
// }
50+
51+
func (ss *snapStore) saveToFile(name string, data []byte) error {
52+
if err := os.WriteFile(fmt.Sprintf("%s/%s", ss.dir, name), data, 0644); err != nil {
53+
return err
54+
}
55+
return nil
56+
}
57+
58+
func (ss *snapStore) load() (*raftpb.Snapshot, error) {
59+
data, err := os.ReadFile(fmt.Sprintf("%s/snap", ss.dir))
60+
if err != nil {
61+
return nil, err
62+
}
63+
64+
var newSnap raftpb.Snapshot
65+
if err := newSnap.Unmarshal(data); err != nil {
66+
return nil, err
67+
}
68+
return &newSnap, nil
69+
}
70+
71+
func newSnapshotStorage(dir string) (snapshotStorage, error) {
72+
err := os.MkdirAll(dir, 0750)
73+
if err != nil {
74+
return nil, err
75+
}
76+
ss := snapStore{dir: dir}
77+
return &ss, nil
78+
}

0 commit comments

Comments
 (0)