Skip to content

Commit 148f5b3

Browse files
committed
Add raft migration and rollout tooling
1 parent 73987fd commit 148f5b3

File tree

8 files changed

+1219
-0
lines changed

8 files changed

+1219
-0
lines changed

README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,32 @@ To start the server, use the following command:
7878
go run cmd/server/demo.go
7979
```
8080

81+
### Migrating Legacy BoltDB Raft Storage
82+
83+
Recent versions store Raft logs and stable state in Pebble (`raft.db`) instead of
84+
the legacy BoltDB files (`logs.dat` and `stable.dat`). If startup fails with:
85+
86+
```text
87+
legacy boltdb Raft storage "logs.dat" found in ...
88+
```
89+
90+
stop the node and run the offline migrator against the directory shown in the
91+
error:
92+
93+
```bash
94+
go run ./cmd/raft-migrate --dir /var/lib/elastickv/n1
95+
mv /var/lib/elastickv/n1/logs.dat /var/lib/elastickv/n1/logs.dat.bak
96+
mv /var/lib/elastickv/n1/stable.dat /var/lib/elastickv/n1/stable.dat.bak
97+
```
98+
99+
For multi-group layouts, pass the exact group directory from the error message
100+
(for example `/var/lib/elastickv/n1/group-1`).
101+
102+
After that, start Elastickv normally. The migrator leaves the legacy files in
103+
place as a backup, but they must be moved or removed before startup because the
104+
server intentionally refuses to run while `logs.dat` or `stable.dat` is still
105+
present.
106+
81107
To expose metrics on a dedicated port:
82108
```bash
83109
go run . \

cmd/raft-migrate/main.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"log"
7+
"path/filepath"
8+
9+
"github.com/bootjp/elastickv/internal/raftstore"
10+
)
11+
12+
func main() {
13+
var (
14+
dir = flag.String("dir", "", "Directory containing legacy logs.dat and stable.dat")
15+
out = flag.String("out", "", "Destination Pebble raft.db directory (default: <dir>/raft.db)")
16+
)
17+
flag.Parse()
18+
19+
if *dir == "" {
20+
log.Fatal("--dir is required")
21+
}
22+
23+
dest := *out
24+
if dest == "" {
25+
dest = filepath.Join(*dir, "raft.db")
26+
}
27+
28+
stats, err := raftstore.MigrateLegacyBoltDB(
29+
filepath.Join(*dir, "logs.dat"),
30+
filepath.Join(*dir, "stable.dat"),
31+
dest,
32+
)
33+
if err != nil {
34+
log.Fatalf("migration failed: %v", err)
35+
}
36+
37+
fmt.Printf("migrated legacy raft storage to %s (logs=%d stable_keys=%d)\n", dest, stats.Logs, stats.StableKeys)
38+
fmt.Println("next: archive or remove logs.dat and stable.dat before starting elastickv")
39+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ require (
2828
github.com/tidwall/redcon v1.6.2
2929
github.com/vmihailenco/msgpack/v5 v5.4.1
3030
github.com/yuin/gopher-lua v1.1.1
31+
go.etcd.io/bbolt v1.4.3
3132
golang.org/x/sync v0.20.0
3233
golang.org/x/sys v0.42.0
3334
google.golang.org/grpc v1.79.3

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M
336336
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
337337
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
338338
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
339+
go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo=
340+
go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E=
339341
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
340342
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
341343
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=

internal/raftstore/migrate.go

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
package raftstore
2+
3+
import (
	"bytes"
	"os"
	"path/filepath"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/hashicorp/go-msgpack/v2/codec"
	"github.com/hashicorp/raft"
	"go.etcd.io/bbolt"
)
13+
14+
const (
	// legacyLogsBucket names the BoltDB bucket the log entries are read from.
	legacyLogsBucket = "logs"
	// legacyStableBucket names the BoltDB bucket the stable keys are read from.
	legacyStableBucket = "conf"

	// legacyBatchSize is the number of decoded log entries buffered before
	// they are written to the destination store in one call.
	legacyBatchSize = 1024

	// legacyBoltFileMode is the file mode passed to bbolt.Open for the
	// legacy files.
	legacyBoltFileMode = 0o600

	// legacyMigrationSuffix is appended to the destination directory to form
	// the temporary directory the migration writes into.
	legacyMigrationSuffix = ".migrating"
)
21+
22+
// MigrationStats reports how much data a legacy BoltDB migration copied.
type MigrationStats struct {
	// Logs is the number of log entries written to the destination store.
	Logs uint64
	// StableKeys is the number of stable-store keys written to the
	// destination store.
	StableKeys uint64
}
26+
27+
func MigrateLegacyBoltDB(logsPath, stablePath, destDir string) (*MigrationStats, error) {
28+
tempDir, err := prepareMigrationPaths(logsPath, stablePath, destDir)
29+
if err != nil {
30+
return nil, err
31+
}
32+
33+
logsDB, stableDB, closeSources, err := openLegacySourceDBs(logsPath, stablePath)
34+
if err != nil {
35+
return nil, err
36+
}
37+
defer closeSources()
38+
39+
stats, err := migrateLegacyBoltToTempDir(logsDB, stableDB, tempDir)
40+
if err != nil {
41+
return nil, err
42+
}
43+
if err := finalizeMigratedStore(tempDir, destDir); err != nil {
44+
return nil, err
45+
}
46+
return stats, nil
47+
}
48+
49+
func prepareMigrationPaths(logsPath, stablePath, destDir string) (string, error) {
50+
if logsPath == "" {
51+
return "", errors.New("logs path is required")
52+
}
53+
if stablePath == "" {
54+
return "", errors.New("stable path is required")
55+
}
56+
if destDir == "" {
57+
return "", errors.New("destination dir is required")
58+
}
59+
60+
if err := requireExistingFile(logsPath); err != nil {
61+
return "", err
62+
}
63+
if err := requireExistingFile(stablePath); err != nil {
64+
return "", err
65+
}
66+
if err := requireDestinationAbsent(destDir); err != nil {
67+
return "", err
68+
}
69+
70+
tempDir := destDir + legacyMigrationSuffix
71+
if err := requireDestinationAbsent(tempDir); err != nil {
72+
return "", err
73+
}
74+
return tempDir, nil
75+
}
76+
77+
func openLegacySourceDBs(logsPath, stablePath string) (logsDB *bbolt.DB, stableDB *bbolt.DB, closeFn func(), err error) {
78+
logsDB, err = openLegacyBoltReadOnly(logsPath)
79+
if err != nil {
80+
return nil, nil, nil, err
81+
}
82+
83+
stableDB, err = openLegacyBoltReadOnly(stablePath)
84+
if err != nil {
85+
_ = logsDB.Close()
86+
return nil, nil, nil, err
87+
}
88+
89+
closeFn = func() {
90+
_ = stableDB.Close()
91+
_ = logsDB.Close()
92+
}
93+
return logsDB, stableDB, closeFn, nil
94+
}
95+
96+
func migrateLegacyBoltToTempDir(logsDB, stableDB *bbolt.DB, tempDir string) (*MigrationStats, error) {
97+
store, err := NewPebbleStore(tempDir)
98+
if err != nil {
99+
return nil, err
100+
}
101+
102+
cleanupTemp := func() {
103+
_ = store.Close()
104+
_ = os.RemoveAll(tempDir)
105+
}
106+
107+
stats, err := migrateLegacyBoltData(logsDB, stableDB, store)
108+
if err != nil {
109+
cleanupTemp()
110+
return nil, err
111+
}
112+
if err := store.Close(); err != nil {
113+
_ = os.RemoveAll(tempDir)
114+
return nil, err
115+
}
116+
return stats, nil
117+
}
118+
119+
func finalizeMigratedStore(tempDir, destDir string) error {
120+
if err := os.MkdirAll(filepath.Dir(destDir), pebbleDirPerm); err != nil {
121+
_ = os.RemoveAll(tempDir)
122+
return errors.WithStack(err)
123+
}
124+
if err := os.Rename(tempDir, destDir); err != nil {
125+
_ = os.RemoveAll(tempDir)
126+
return errors.WithStack(err)
127+
}
128+
return nil
129+
}
130+
131+
func migrateLegacyBoltData(logsDB, stableDB *bbolt.DB, dest *PebbleStore) (*MigrationStats, error) {
132+
stats := &MigrationStats{}
133+
134+
if err := copyLegacyStable(stableDB, dest, stats); err != nil {
135+
return nil, err
136+
}
137+
if err := copyLegacyLogs(logsDB, dest, stats); err != nil {
138+
return nil, err
139+
}
140+
141+
return stats, nil
142+
}
143+
144+
func copyLegacyStable(stableDB *bbolt.DB, dest *PebbleStore, stats *MigrationStats) error {
145+
return errors.WithStack(stableDB.View(func(tx *bbolt.Tx) error {
146+
bucket := tx.Bucket([]byte(legacyStableBucket))
147+
if bucket == nil {
148+
return errors.Newf("legacy stable bucket %q not found", legacyStableBucket)
149+
}
150+
return bucket.ForEach(func(k, v []byte) error {
151+
if err := dest.Set(k, append([]byte(nil), v...)); err != nil {
152+
return err
153+
}
154+
stats.StableKeys++
155+
return nil
156+
})
157+
}))
158+
}
159+
160+
func copyLegacyLogs(logsDB *bbolt.DB, dest *PebbleStore, stats *MigrationStats) error {
161+
batch := make([]*raft.Log, 0, legacyBatchSize)
162+
163+
flush := func() error {
164+
if len(batch) == 0 {
165+
return nil
166+
}
167+
if err := dest.StoreLogs(batch); err != nil {
168+
return err
169+
}
170+
stats.Logs += uint64(len(batch))
171+
batch = batch[:0]
172+
return nil
173+
}
174+
175+
err := logsDB.View(func(tx *bbolt.Tx) error {
176+
bucket := tx.Bucket([]byte(legacyLogsBucket))
177+
if bucket == nil {
178+
return errors.Newf("legacy logs bucket %q not found", legacyLogsBucket)
179+
}
180+
return bucket.ForEach(func(_, v []byte) error {
181+
var entry raft.Log
182+
if err := decodeLegacyLog(v, &entry); err != nil {
183+
return err
184+
}
185+
batch = append(batch, &entry)
186+
if len(batch) < legacyBatchSize {
187+
return nil
188+
}
189+
return flush()
190+
})
191+
})
192+
if err != nil {
193+
return errors.WithStack(err)
194+
}
195+
196+
return flush()
197+
}
198+
199+
func openLegacyBoltReadOnly(path string) (*bbolt.DB, error) {
200+
db, err := bbolt.Open(path, legacyBoltFileMode, &bbolt.Options{ReadOnly: true})
201+
if err != nil {
202+
return nil, errors.WithStack(err)
203+
}
204+
return db, nil
205+
}
206+
207+
func requireExistingFile(path string) error {
208+
info, err := os.Stat(path)
209+
if err != nil {
210+
return errors.WithStack(err)
211+
}
212+
if info.IsDir() {
213+
return errors.WithStack(errors.Newf("%s is a directory, expected file", path))
214+
}
215+
return nil
216+
}
217+
218+
func requireDestinationAbsent(path string) error {
219+
if _, err := os.Stat(path); err == nil {
220+
return errors.WithStack(errors.Newf("destination already exists: %s", path))
221+
} else if !os.IsNotExist(err) {
222+
return errors.WithStack(err)
223+
}
224+
return nil
225+
}
226+
227+
func decodeLegacyLog(payload []byte, out *raft.Log) error {
228+
handle := codec.MsgpackHandle{}
229+
decoder := codec.NewDecoder(bytes.NewReader(payload), &handle)
230+
return errors.WithStack(decoder.Decode(out))
231+
}

0 commit comments

Comments
 (0)