@@ -2,7 +2,9 @@ package main
22
33import (
44 "context"
5+ "errors"
56 "log"
7+ "os"
68 "time"
79
810 "go.etcd.io/raft/v3"
@@ -43,7 +45,13 @@ type raftNode struct {
4345 id uint64
4446 peers []raft.Peer
4547 fsm FSM
46- t transport
48+
49+ ss snapshotStorage
50+ confState raftpb.ConfState
51+ snapshotIndex uint64
52+ appliedIndex uint64
53+ snapCount uint64
54+ t transport
4755
4856 // When serveChannels is done, `err` is set to any error and then
4957 // `done` is closed.
@@ -59,7 +67,9 @@ type raftNode struct {
5967 // httpdonec chan struct{} // signals http server shutdown complete
6068}
6169
62- func newRaftNode (id uint64 , peers []uint64 , fsm FSM , nw * network , proposeC <- chan string , confChangeC <- chan raftpb.ConfChange ) * raftNode {
70+ var defaultSnapshotCount uint64 = 10000
71+
72+ func newRaftNode (id uint64 , peers []uint64 , fsm FSM , ss snapshotStorage , nw * network , proposeC <- chan string , confChangeC <- chan raftpb.ConfChange ) * raftNode {
6373 commitC := make (chan * commit )
6474 errorC := make (chan error )
6575
@@ -79,13 +89,15 @@ func newRaftNode(id uint64, peers []uint64, fsm FSM, nw *network, proposeC <-cha
7989 id : id ,
8090 peers : rpeers ,
8191 fsm : fsm ,
92+ ss : ss ,
93+ snapCount : defaultSnapshotCount ,
8294 t : t ,
8395 stopc : make (chan struct {}),
8496 // httpstopc: make(chan struct{}),
8597 // httpdonec: make(chan struct{}),
8698 }
8799
88- // TODO: load and apply snapshots here
100+ rc . loadAndApplySnapshot ()
89101
90102 go rc .startRaft ()
91103 return rc
@@ -112,7 +124,92 @@ func (rc *raftNode) startRaft() {
112124 go rc .serveChannels ()
113125}
114126
127+ // loadAndApplySnapshot loads the most recent snapshot from the
128+ // snapshot storage (if any) and applies it to the current state.
129+ func (rc * raftNode ) loadAndApplySnapshot () {
130+ snapshot , err := rc .ss .load ()
131+ if err != nil {
132+ if errors .Is (err , os .ErrNotExist ) {
133+ // No snapshots available; do nothing.
134+ return
135+ }
136+ log .Panic (err )
137+ }
138+
139+ log .Printf ("loading snapshot at term %d and index %d" , snapshot .Metadata .Term , snapshot .Metadata .Index )
140+ if err := rc .fsm .RestoreSnapshot (snapshot .Data ); err != nil {
141+ log .Panic (err )
142+ }
143+ }
144+
145+ func (rc * raftNode ) publishSnapshot (snapshotToSave raftpb.Snapshot ) {
146+ if raft .IsEmptySnap (snapshotToSave ) {
147+ return
148+ }
149+
150+ log .Printf ("publishing snapshot at index %d" , rc .snapshotIndex )
151+ defer log .Printf ("finished publishing snapshot at index %d" , rc .snapshotIndex )
152+
153+ if snapshotToSave .Metadata .Index <= rc .appliedIndex {
154+ log .Fatalf ("snapshot index [%d] should > progress.appliedIndex [%d]" , snapshotToSave .Metadata .Index , rc .appliedIndex )
155+ }
156+ rc .commitC <- nil // trigger kvstore to load snapshot
157+
158+ rc .confState = snapshotToSave .Metadata .ConfState
159+ rc .snapshotIndex = snapshotToSave .Metadata .Index
160+ rc .appliedIndex = snapshotToSave .Metadata .Index
161+ }
162+
163+ var snapshotCatchUpEntriesN uint64 = 10000
164+
165+ func (rc * raftNode ) maybeTriggerSnapshot (applyDoneC <- chan struct {}) {
166+ if rc .appliedIndex - rc .snapshotIndex <= rc .snapCount {
167+ return
168+ }
169+
170+ // wait until all committed entries are applied (or server is closed)
171+ if applyDoneC != nil {
172+ select {
173+ case <- applyDoneC :
174+ case <- rc .stopc :
175+ return
176+ }
177+ }
178+
179+ log .Printf ("start snapshot [applied index: %d | last snapshot index: %d]" , rc .appliedIndex , rc .snapshotIndex )
180+ data , err := rc .fsm .TakeSnapshot ()
181+ if err != nil {
182+ log .Panic (err )
183+ }
184+ snap , err := rc .raftStorage .CreateSnapshot (rc .appliedIndex , & rc .confState , data )
185+ if err != nil {
186+ panic (err )
187+ }
188+ if err := rc .saveSnap (snap ); err != nil {
189+ panic (err )
190+ }
191+
192+ compactIndex := uint64 (1 )
193+ if rc .appliedIndex > snapshotCatchUpEntriesN {
194+ compactIndex = rc .appliedIndex - snapshotCatchUpEntriesN
195+ }
196+ if err := rc .raftStorage .Compact (compactIndex ); err != nil {
197+ panic (err )
198+ }
199+
200+ log .Printf ("compacted log at index %d" , compactIndex )
201+ rc .snapshotIndex = rc .appliedIndex
202+ }
203+
115204func (rc * raftNode ) serveChannels () {
205+ snap , err := rc .raftStorage .Snapshot ()
206+ if err != nil {
207+ panic (err )
208+ }
209+ rc .confState = snap .Metadata .ConfState
210+ rc .snapshotIndex = snap .Metadata .Index
211+ rc .appliedIndex = snap .Metadata .Index
212+
116213 ticker := time .NewTicker (100 * time .Millisecond )
117214 defer ticker .Stop ()
118215
@@ -151,6 +248,11 @@ func (rc *raftNode) serveChannels() {
151248 // store raft entries, then publish over commit channel
152249 case rd := <- rc .node .Ready ():
153250 // saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
251+ if ! raft .IsEmptySnap (rd .Snapshot ) {
252+ log .Printf ("node %d: not empty snapshot\n " , rc .id )
253+ rc .saveSnap (rd .Snapshot )
254+ rc .raftStorage .ApplySnapshot (rd .Snapshot )
255+ }
154256 rc .raftStorage .Append (rd .Entries )
155257
156258 rc .t .send (rd .Messages )
@@ -194,7 +296,11 @@ func (rc *raftNode) serveChannels() {
194296 return
195297 }
196298 }
197- // TODO: after commit, update appliedIndex
299+ // after commit, update appliedIndex
300+ if len (rd .CommittedEntries ) > 0 {
301+ rc .appliedIndex = rd .CommittedEntries [len (rd .CommittedEntries )- 1 ].Index
302+ }
303+ rc .maybeTriggerSnapshot (applyDoneC )
198304 rc .node .Advance ()
199305 case <- rc .stopc :
200306 log .Println ("stopping at serveChannels" )
@@ -204,10 +310,18 @@ func (rc *raftNode) serveChannels() {
204310 }
205311}
206312
313+ func (rc * raftNode ) saveSnap (snap raftpb.Snapshot ) error {
314+ if err := rc .ss .saveSnap (snap ); err != nil {
315+ return err
316+ }
317+ return nil
318+ }
319+
207320func (rc * raftNode ) processCommits () error {
208321 for commit := range rc .commitC {
209322 if commit == nil {
210- // TODO: load snapshot
323+ // a request to load snapshot
324+ rc .loadAndApplySnapshot ()
211325 continue
212326 }
213327 if err := rc .fsm .ApplyCommits (commit ); err != nil {
0 commit comments