Skip to content

Commit 55074d2

Browse files
authored
Merge pull request #370 from bootjp/feature/rolling-update
Feature/rolling update
2 parents 0ae8389 + 46038ac commit 55074d2

File tree

8 files changed

+1234
-1
lines changed

8 files changed

+1234
-1
lines changed

README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,32 @@ To start the server, use the following command:
7878
go run cmd/server/demo.go
7979
```
8080

81+
### Migrating Legacy BoltDB Raft Storage
82+
83+
Recent versions store Raft logs and stable state in Pebble (`raft.db`) instead of
84+
the legacy BoltDB files (`logs.dat` and `stable.dat`). If startup fails with:
85+
86+
```text
87+
legacy boltdb Raft storage "logs.dat" found in ...
88+
```
89+
90+
stop the node and run the offline migrator against the directory shown in the
91+
error:
92+
93+
```bash
94+
go run ./cmd/raft-migrate --dir /var/lib/elastickv/n1
95+
mv /var/lib/elastickv/n1/logs.dat /var/lib/elastickv/n1/logs.dat.bak
96+
mv /var/lib/elastickv/n1/stable.dat /var/lib/elastickv/n1/stable.dat.bak
97+
```
98+
99+
For multi-group layouts, pass the exact group directory from the error message
100+
(for example `/var/lib/elastickv/n1/group-1`).
101+
102+
After that, start Elastickv normally. The migrator leaves the legacy files in
103+
place as a backup, but they must be moved or removed before startup because the
104+
server intentionally refuses to run while `logs.dat` or `stable.dat` are still
105+
present.
106+
81107
To expose metrics on a dedicated port:
82108
```bash
83109
go run . \

cmd/raft-migrate/main.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"log"
7+
"path/filepath"
8+
9+
"github.com/bootjp/elastickv/internal/raftstore"
10+
)
11+
12+
func main() {
13+
var (
14+
dir = flag.String("dir", "", "Directory containing legacy logs.dat and stable.dat")
15+
out = flag.String("out", "", "Destination Pebble raft.db directory (default: <dir>/raft.db)")
16+
)
17+
flag.Parse()
18+
19+
if *dir == "" {
20+
log.Fatal("--dir is required")
21+
}
22+
23+
dest := *out
24+
if dest == "" {
25+
dest = filepath.Join(*dir, "raft.db")
26+
}
27+
28+
stats, err := raftstore.MigrateLegacyBoltDB(
29+
filepath.Join(*dir, "logs.dat"),
30+
filepath.Join(*dir, "stable.dat"),
31+
dest,
32+
)
33+
if err != nil {
34+
log.Fatalf("migration failed: %v", err)
35+
}
36+
37+
fmt.Printf("migrated legacy raft storage to %s (logs=%d stable_keys=%d)\n", dest, stats.Logs, stats.StableKeys)
38+
fmt.Println("next: archive or remove logs.dat and stable.dat before starting elastickv")
39+
}

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ require (
1919
github.com/emirpasic/gods v1.18.1
2020
github.com/getsentry/sentry-go v0.27.0
2121
github.com/hashicorp/go-hclog v1.6.3
22+
github.com/hashicorp/go-msgpack/v2 v2.1.2
2223
github.com/hashicorp/raft v1.7.3
2324
github.com/pkg/errors v0.9.1
2425
github.com/prometheus/client_golang v1.23.2
@@ -28,6 +29,7 @@ require (
2829
github.com/tidwall/redcon v1.6.2
2930
github.com/vmihailenco/msgpack/v5 v5.4.1
3031
github.com/yuin/gopher-lua v1.1.1
32+
go.etcd.io/bbolt v1.4.3
3133
golang.org/x/sync v0.20.0
3234
golang.org/x/sys v0.42.0
3335
google.golang.org/grpc v1.79.3
@@ -65,7 +67,6 @@ require (
6567
github.com/hashicorp/errwrap v1.0.0 // indirect
6668
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
6769
github.com/hashicorp/go-metrics v0.5.4 // indirect
68-
github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect
6970
github.com/hashicorp/go-multierror v1.1.1 // indirect
7071
github.com/hashicorp/golang-lru v1.0.2 // indirect
7172
github.com/klauspost/compress v1.18.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M
336336
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
337337
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
338338
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
339+
go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo=
340+
go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E=
339341
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
340342
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
341343
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=

internal/raftstore/migrate.go

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
package raftstore
2+
3+
import (
4+
"bytes"
5+
"os"
6+
"path/filepath"
7+
8+
"github.com/cockroachdb/errors"
9+
"github.com/hashicorp/go-msgpack/v2/codec"
10+
"github.com/hashicorp/raft"
11+
"go.etcd.io/bbolt"
12+
)
13+
14+
const (
15+
legacyLogsBucket = "logs"
16+
legacyStableBucket = "conf"
17+
legacyBatchSize = 1024
18+
legacyBoltFileMode = 0o600
19+
legacyMigrationSuffix = ".migrating"
20+
)
21+
22+
type MigrationStats struct {
23+
Logs uint64
24+
StableKeys uint64
25+
}
26+
27+
func MigrateLegacyBoltDB(logsPath, stablePath, destDir string) (*MigrationStats, error) {
28+
tempDir, err := prepareMigrationPaths(logsPath, stablePath, destDir)
29+
if err != nil {
30+
return nil, err
31+
}
32+
33+
logsDB, stableDB, closeSources, err := openLegacySourceDBs(logsPath, stablePath)
34+
if err != nil {
35+
return nil, err
36+
}
37+
defer closeSources()
38+
39+
stats, err := migrateLegacyBoltToTempDir(logsDB, stableDB, tempDir)
40+
if err != nil {
41+
return nil, err
42+
}
43+
if err := finalizeMigratedStore(tempDir, destDir); err != nil {
44+
return nil, err
45+
}
46+
return stats, nil
47+
}
48+
49+
func prepareMigrationPaths(logsPath, stablePath, destDir string) (string, error) {
50+
if logsPath == "" {
51+
return "", errors.New("logs path is required")
52+
}
53+
if stablePath == "" {
54+
return "", errors.New("stable path is required")
55+
}
56+
if destDir == "" {
57+
return "", errors.New("destination dir is required")
58+
}
59+
60+
destDir = filepath.Clean(destDir)
61+
62+
if err := requireExistingFile(logsPath); err != nil {
63+
return "", err
64+
}
65+
if err := requireExistingFile(stablePath); err != nil {
66+
return "", err
67+
}
68+
if err := requireDestinationAbsent(destDir); err != nil {
69+
return "", err
70+
}
71+
72+
tempDir := destDir + legacyMigrationSuffix
73+
if err := requireDestinationAbsent(tempDir); err != nil {
74+
return "", err
75+
}
76+
return tempDir, nil
77+
}
78+
79+
func openLegacySourceDBs(logsPath, stablePath string) (logsDB *bbolt.DB, stableDB *bbolt.DB, closeFn func(), err error) {
80+
logsDB, err = openLegacyBoltReadOnly(logsPath)
81+
if err != nil {
82+
return nil, nil, nil, err
83+
}
84+
85+
stableDB, err = openLegacyBoltReadOnly(stablePath)
86+
if err != nil {
87+
_ = logsDB.Close()
88+
return nil, nil, nil, err
89+
}
90+
91+
closeFn = func() {
92+
_ = stableDB.Close()
93+
_ = logsDB.Close()
94+
}
95+
return logsDB, stableDB, closeFn, nil
96+
}
97+
98+
func migrateLegacyBoltToTempDir(logsDB, stableDB *bbolt.DB, tempDir string) (*MigrationStats, error) {
99+
store, err := NewPebbleStore(tempDir)
100+
if err != nil {
101+
return nil, err
102+
}
103+
104+
cleanupTemp := func() {
105+
_ = store.Close()
106+
_ = os.RemoveAll(tempDir)
107+
}
108+
109+
stats, err := migrateLegacyBoltData(logsDB, stableDB, store)
110+
if err != nil {
111+
cleanupTemp()
112+
return nil, err
113+
}
114+
if err := store.Close(); err != nil {
115+
_ = os.RemoveAll(tempDir)
116+
return nil, err
117+
}
118+
return stats, nil
119+
}
120+
121+
func finalizeMigratedStore(tempDir, destDir string) error {
122+
if err := os.MkdirAll(filepath.Dir(destDir), pebbleDirPerm); err != nil {
123+
_ = os.RemoveAll(tempDir)
124+
return errors.WithStack(err)
125+
}
126+
if err := os.Rename(tempDir, destDir); err != nil {
127+
_ = os.RemoveAll(tempDir)
128+
return errors.WithStack(err)
129+
}
130+
return nil
131+
}
132+
133+
func migrateLegacyBoltData(logsDB, stableDB *bbolt.DB, dest *PebbleStore) (*MigrationStats, error) {
134+
stats := &MigrationStats{}
135+
136+
if err := copyLegacyStable(stableDB, dest, stats); err != nil {
137+
return nil, err
138+
}
139+
if err := copyLegacyLogs(logsDB, dest, stats); err != nil {
140+
return nil, err
141+
}
142+
143+
return stats, nil
144+
}
145+
146+
func copyLegacyStable(stableDB *bbolt.DB, dest *PebbleStore, stats *MigrationStats) error {
147+
return errors.WithStack(stableDB.View(func(tx *bbolt.Tx) error {
148+
bucket := tx.Bucket([]byte(legacyStableBucket))
149+
if bucket == nil {
150+
return errors.Newf("legacy stable bucket %q not found", legacyStableBucket)
151+
}
152+
return bucket.ForEach(func(k, v []byte) error {
153+
if err := dest.Set(k, append([]byte(nil), v...)); err != nil {
154+
return err
155+
}
156+
stats.StableKeys++
157+
return nil
158+
})
159+
}))
160+
}
161+
162+
func copyLegacyLogs(logsDB *bbolt.DB, dest *PebbleStore, stats *MigrationStats) error {
163+
batch := make([]*raft.Log, 0, legacyBatchSize)
164+
165+
flush := func() error {
166+
if len(batch) == 0 {
167+
return nil
168+
}
169+
if err := dest.StoreLogs(batch); err != nil {
170+
return err
171+
}
172+
stats.Logs += uint64(len(batch))
173+
batch = batch[:0]
174+
return nil
175+
}
176+
177+
err := logsDB.View(func(tx *bbolt.Tx) error {
178+
bucket := tx.Bucket([]byte(legacyLogsBucket))
179+
if bucket == nil {
180+
return errors.Newf("legacy logs bucket %q not found", legacyLogsBucket)
181+
}
182+
return bucket.ForEach(func(_, v []byte) error {
183+
var entry raft.Log
184+
if err := decodeLegacyLog(v, &entry); err != nil {
185+
return err
186+
}
187+
batch = append(batch, &entry)
188+
if len(batch) < legacyBatchSize {
189+
return nil
190+
}
191+
return flush()
192+
})
193+
})
194+
if err != nil {
195+
return errors.WithStack(err)
196+
}
197+
198+
return flush()
199+
}
200+
201+
func openLegacyBoltReadOnly(path string) (*bbolt.DB, error) {
202+
db, err := bbolt.Open(path, legacyBoltFileMode, &bbolt.Options{ReadOnly: true})
203+
if err != nil {
204+
return nil, errors.WithStack(err)
205+
}
206+
return db, nil
207+
}
208+
209+
func requireExistingFile(path string) error {
210+
info, err := os.Stat(path)
211+
if err != nil {
212+
return errors.WithStack(err)
213+
}
214+
if info.IsDir() {
215+
return errors.WithStack(errors.Newf("%s is a directory, expected file", path))
216+
}
217+
return nil
218+
}
219+
220+
func requireDestinationAbsent(path string) error {
221+
if _, err := os.Stat(path); err == nil {
222+
return errors.WithStack(errors.Newf("destination already exists: %s", path))
223+
} else if !os.IsNotExist(err) {
224+
return errors.WithStack(err)
225+
}
226+
return nil
227+
}
228+
229+
func decodeLegacyLog(payload []byte, out *raft.Log) error {
230+
handle := codec.MsgpackHandle{}
231+
decoder := codec.NewDecoder(bytes.NewReader(payload), &handle)
232+
return errors.WithStack(decoder.Decode(out))
233+
}

0 commit comments

Comments
 (0)