Skip to content

Commit d17e9ce

Browse files
committed
feat: Replace Consul with NATS KV for key-value storage and add migration script
1 parent 878c8df commit d17e9ce

25 files changed

Lines changed: 553 additions & 545 deletions

cmd/mpcium-cli/register-peers.go

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/fystack/mpcium/pkg/config"
1111
"github.com/fystack/mpcium/pkg/infra"
1212
"github.com/fystack/mpcium/pkg/logger"
13-
"github.com/hashicorp/consul/api"
13+
"github.com/nats-io/nats.go/jetstream"
1414
"github.com/urfave/cli/v3"
1515
)
1616

@@ -23,8 +23,8 @@ func registerPeers(ctx context.Context, c *cli.Command) error {
2323
inputPath = "peers.json"
2424
}
2525

26-
// Hardcoded prefix for MPC peers in Consul
27-
prefix := "mpc_peers/"
26+
// Prefix for MPC peers in NATS KV (empty as bucket namespace is sufficient)
27+
prefix := ""
2828

2929
// Validate the input file path for security
3030
if err := pathutil.ValidateFilePath(inputPath); err != nil {
@@ -57,41 +57,54 @@ func registerPeers(ctx context.Context, c *cli.Command) error {
5757

5858
// Initialize config and logger
5959
config.InitViperConfig(c.String("config"))
60+
appConfig := config.LoadConfig()
6061
logger.Init(environment, true)
6162

62-
// Connect to Consul
63-
client := infra.GetConsulClient(environment)
64-
kv := client.KV()
63+
// Connect to NATS
64+
// reusing getNATSConnection from benchmark.go which is in the same package
65+
nc, err := getNATSConnection(environment, appConfig)
66+
if err != nil {
67+
return fmt.Errorf("failed to connect to NATS: %w", err)
68+
}
69+
defer nc.Close()
70+
71+
js, err := jetstream.New(nc)
72+
if err != nil {
73+
return fmt.Errorf("failed to get JetStream context: %w", err)
74+
}
6575

66-
// Register peers in Consul
76+
peersKV, err := infra.NewNatsKVStore(js, "mpc-peers")
77+
if err != nil {
78+
return fmt.Errorf("failed to init mpc-peers KV bucket: %w", err)
79+
}
80+
81+
// Register peers in NATS KV
6782
for nodeName, nodeID := range peerMap {
6883
key := prefix + nodeName
6984

7085
// Check if the key already exists
71-
existing, _, err := kv.Get(key, nil)
86+
existing, err := peersKV.Get(key)
7287
if err != nil {
7388
return fmt.Errorf("failed to check existing key %s: %w", key, err)
7489
}
7590

7691
if existing != nil {
77-
existingID := string(existing.Value)
92+
existingID := string(existing)
7893
if existingID != nodeID {
7994
return fmt.Errorf("conflict detected: peer %s already exists with ID %s, but trying to register with different ID %s", nodeName, existingID, nodeID)
8095
}
8196
fmt.Printf("Peer %s already registered with same ID %s, skipping\n", nodeName, nodeID)
8297
continue
8398
}
8499

85-
p := &api.KVPair{Key: key, Value: []byte(nodeID)}
86-
87100
// Store the key-value pair
88-
_, err = kv.Put(p, nil)
101+
err = peersKV.Put(key, []byte(nodeID))
89102
if err != nil {
90103
return fmt.Errorf("failed to store key %s: %w", key, err)
91104
}
92-
fmt.Printf("Registered peer %s with ID %s to Consul\n", nodeName, nodeID)
105+
fmt.Printf("Registered peer %s with ID %s to NATS KV\n", nodeName, nodeID)
93106
}
94107

95-
logger.Info("Successfully registered peers to Consul", "peers", peerMap, "prefix", prefix)
108+
logger.Info("Successfully registered peers to NATS KV", "peers", peerMap, "bucket", "mpc-peers")
96109
return nil
97110
}

cmd/mpcium/main.go

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import (
2424
"github.com/fystack/mpcium/pkg/messaging"
2525
"github.com/fystack/mpcium/pkg/mpc"
2626
"github.com/fystack/mpcium/pkg/security"
27-
"github.com/hashicorp/consul/api"
2827
"github.com/nats-io/nats.go"
28+
"github.com/nats-io/nats.go/jetstream"
2929
"github.com/spf13/viper"
3030
"github.com/urfave/cli/v3"
3131
"golang.org/x/term"
@@ -38,11 +38,11 @@ const (
3838

3939
func printBanner() {
4040
banner := fmt.Sprintf(`
41-
╔══════════════════════════════════════════════════════════════╗
42-
║ MPCIUM v%s ║
43-
║ Multi-Party Computation Threshold Signatures Node ║
44-
╚══════════════════════════════════════════════════════════════╝
45-
`, Version)
41+
41: ╔══════════════════════════════════════════════════════════════╗
42+
42: ║ MPCIUM v%s ║
43+
43: ║ Multi-Party Computation Threshold Signatures Node ║
44+
44: ╚══════════════════════════════════════════════════════════════╝
45+
45: `, Version)
4646
fmt.Print(banner)
4747
}
4848

@@ -144,9 +144,33 @@ func runNode(ctx context.Context, c *cli.Command) error {
144144
// Validate the config values
145145
checkRequiredConfigValues(appConfig)
146146

147-
consulClient := infra.GetConsulClient(environment)
148-
keyinfoStore := keyinfo.NewStore(consulClient.KV())
149-
peers := LoadPeersFromConsul(consulClient)
147+
natsConn, err := GetNATSConnection(environment, appConfig)
148+
if err != nil {
149+
logger.Fatal("Failed to connect to NATS", err)
150+
}
151+
152+
js, err := jetstream.New(natsConn)
153+
if err != nil {
154+
logger.Fatal("Failed to get JetStream context", err)
155+
}
156+
157+
readyKV, err := infra.NewNatsKVStore(js, "mpc-ready")
158+
if err != nil {
159+
logger.Fatal("Failed to init mpc-ready KV bucket", err)
160+
}
161+
162+
keyinfoKV, err := infra.NewNatsKVStore(js, "mpc-keyinfo")
163+
if err != nil {
164+
logger.Fatal("Failed to init mpc-keyinfo KV bucket", err)
165+
}
166+
167+
peersKV, err := infra.NewNatsKVStore(js, "mpc-peers")
168+
if err != nil {
169+
logger.Fatal("Failed to init mpc-peers KV bucket", err)
170+
}
171+
172+
keyinfoStore := keyinfo.NewStore(keyinfoKV)
173+
peers := LoadPeersFromNatsKV(peersKV)
150174
nodeID := GetIDFromName(nodeName, peers)
151175

152176
badgerKV := NewBadgerKV(nodeName, nodeID, appConfig)
@@ -165,11 +189,6 @@ func runNode(ctx context.Context, c *cli.Command) error {
165189
logger.Fatal("Failed to create identity store", err)
166190
}
167191

168-
natsConn, err := GetNATSConnection(environment, appConfig)
169-
if err != nil {
170-
logger.Fatal("Failed to connect to NATS", err)
171-
}
172-
173192
pubsub := messaging.NewNATSPubSub(natsConn)
174193
keygenBroker, err := messaging.NewJetStreamBroker(ctx, natsConn, event.KeygenBrokerStream, []string{
175194
event.KeygenRequestTopic,
@@ -201,7 +220,7 @@ func runNode(ctx context.Context, c *cli.Command) error {
201220
logger.Info("Starting mpcium node", "version", Version, "ID", nodeID, "name", nodeName)
202221

203222
peerNodeIDs := GetPeerIDs(peers)
204-
peerRegistry := mpc.NewRegistry(nodeID, peerNodeIDs, consulClient.KV(), directMessaging, pubsub, identityStore)
223+
peerRegistry := mpc.NewRegistry(nodeID, peerNodeIDs, readyKV, directMessaging, pubsub, identityStore)
205224

206225
chainCodeHex := viper.GetString("chain_code")
207226
ckd, err := mpc.NewCKDFromHex(chainCodeHex)
@@ -256,7 +275,7 @@ func runNode(ctx context.Context, c *cli.Command) error {
256275
if healthAddr == "" {
257276
healthAddr = ":8080"
258277
}
259-
healthServer = healthcheck.NewServer(healthAddr, peerRegistry, natsConn, consulClient)
278+
healthServer = healthcheck.NewServer(healthAddr, peerRegistry, natsConn)
260279
go func() {
261280
if err := healthServer.Start(); err != nil {
262281
logger.Error("Health check server error", err)
@@ -460,25 +479,12 @@ func checkRequiredConfigValues(appConfig *config.AppConfig) {
460479
}
461480
}
462481

463-
func NewConsulClient(addr string) *api.Client {
464-
// Create a new Consul client
465-
consulConfig := api.DefaultConfig()
466-
consulConfig.Address = addr
467-
consulClient, err := api.NewClient(consulConfig)
468-
if err != nil {
469-
logger.Fatal("Failed to create consul client", err)
470-
}
471-
logger.Info("Connected to consul!")
472-
return consulClient
473-
}
474-
475-
func LoadPeersFromConsul(consulClient *api.Client) []config.Peer { // Create a Consul Key-Value store client
476-
kv := consulClient.KV()
477-
peers, err := config.LoadPeersFromConsul(kv, "mpc_peers/")
482+
func LoadPeersFromNatsKV(peersKV infra.NatsKV) []config.Peer {
483+
peers, err := config.LoadPeersFromNatsKV(peersKV)
478484
if err != nil {
479-
logger.Fatal("Failed to load peers from Consul", err)
485+
logger.Fatal("Failed to load peers from NATS KV", err)
480486
}
481-
logger.Info("Loaded peers from consul", "peers", peers)
487+
logger.Info("Loaded peers from NATS KV", "peers", peers)
482488

483489
return peers
484490
}

e2e/base_test.go

Lines changed: 33 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ import (
1717
"github.com/dgraph-io/badger/v4/options"
1818
"github.com/fystack/mpcium/pkg/client"
1919
"github.com/fystack/mpcium/pkg/event"
20+
"github.com/fystack/mpcium/pkg/infra"
2021
"github.com/fystack/mpcium/pkg/kvstore"
2122
"github.com/fystack/mpcium/pkg/types"
22-
"github.com/hashicorp/consul/api"
2323
"github.com/nats-io/nats.go"
24+
"github.com/nats-io/nats.go/jetstream"
2425
"github.com/stretchr/testify/require"
2526
"gopkg.in/yaml.v2"
2627
)
@@ -35,9 +36,6 @@ type TestConfig struct {
3536
Nats struct {
3637
URL string `yaml:"url"`
3738
} `yaml:"nats"`
38-
Consul struct {
39-
Address string `yaml:"address"`
40-
} `yaml:"consul"`
4139
MPCThreshold int `yaml:"mpc_threshold"`
4240
Environment string `yaml:"environment"`
4341
BadgerPassword string `yaml:"badger_password"`
@@ -49,7 +47,7 @@ type TestConfig struct {
4947

5048
type E2ETestSuite struct {
5149
ctx context.Context
52-
consulClient *api.Client
50+
cancel context.CancelFunc
5351
natsConn *nats.Conn
5452
mpcClient client.MPCClient
5553
testDir string
@@ -62,9 +60,10 @@ type E2ETestSuite struct {
6260
}
6361

6462
func NewE2ETestSuite(testDir string) *E2ETestSuite {
65-
ctx, _ := context.WithCancel(context.Background())
63+
ctx, cancel := context.WithCancel(context.Background())
6664
return &E2ETestSuite{
6765
ctx: ctx,
66+
cancel: cancel,
6867
testDir: testDir,
6968
walletIDs: make([]string, 0),
7069
keygenResults: make(map[string]*event.KeygenResultEvent),
@@ -148,18 +147,7 @@ func (s *E2ETestSuite) setupClients(t *testing.T) {
148147
var err error
149148

150149
// Use the fixed ports from docker-compose.test.yaml
151-
consulPort := 8501 // consul-test service maps 8501:8500
152-
natsPort := 4223 // nats-server-test service maps 4223:4222
153-
154-
// Setup Consul client
155-
consulConfig := api.DefaultConfig()
156-
consulConfig.Address = fmt.Sprintf("localhost:%d", consulPort)
157-
s.consulClient, err = api.NewClient(consulConfig)
158-
require.NoError(t, err, "Failed to create Consul client")
159-
160-
// Test Consul connection
161-
_, err = s.consulClient.Agent().Self()
162-
require.NoError(t, err, "Failed to connect to Consul")
150+
natsPort := 4223 // nats-server-test service maps 4223:4222
163151

164152
// Setup NATS client
165153
natsConn, err := nats.Connect(fmt.Sprintf("nats://localhost:%d", natsPort))
@@ -216,13 +204,7 @@ func (s *E2ETestSuite) SetupTestNodes(t *testing.T) {
216204
}
217205

218206
func (s *E2ETestSuite) RegisterPeers(t *testing.T) {
219-
t.Log("Registering peers in Consul...")
220-
221-
// Check Consul health before proceeding
222-
t.Log("Checking Consul health...")
223-
_, err := s.consulClient.Status().Leader()
224-
require.NoError(t, err, "Consul is not healthy")
225-
t.Log("Consul is healthy")
207+
t.Log("Registering peers in NATS KV...")
226208

227209
// Use mpcium register-peers command instead of manual registration
228210
t.Log("Running mpcium-cli register-peers...")
@@ -237,20 +219,24 @@ func (s *E2ETestSuite) RegisterPeers(t *testing.T) {
237219
require.NoError(t, err, "Failed to register peers")
238220
}
239221

240-
t.Log("Peers registered in Consul")
222+
t.Log("Peers registered in NATS KV")
241223

242224
// List current peers to verify registration
243-
t.Log("Listing current peers in Consul...")
244-
kv := s.consulClient.KV()
225+
t.Log("Listing current peers in NATS KV...")
226+
js, err := jetstream.New(s.natsConn)
227+
require.NoError(t, err, "Failed to get JetStream context")
228+
229+
peersKV, err := infra.NewNatsKVStore(js, "mpc-peers")
230+
require.NoError(t, err, "Failed to init mpc-peers KV bucket")
245231

246-
// Get all keys under the mpc_peers/ prefix (matches register-peers command)
247-
pairs, _, err := kv.List("mpc_peers/", nil)
232+
// Get all keys (empty prefix)
233+
pairs, err := peersKV.List("")
248234
if err != nil {
249235
t.Logf("Failed to list peers: %v", err)
250236
} else {
251-
t.Logf("Found %d peer entries in Consul under 'mpc_peers/':", len(pairs))
252-
for _, pair := range pairs {
253-
t.Logf(" - Key: %s, Value: %s", pair.Key, string(pair.Value))
237+
t.Logf("Found %d peer entries in NATS KV under 'mpc_peers/':", len(pairs))
238+
for k, v := range pairs {
239+
t.Logf(" - Key: %s, Value: %s", k, string(v))
254240
}
255241
}
256242

@@ -267,13 +253,12 @@ func (s *E2ETestSuite) RegisterPeers(t *testing.T) {
267253
func (s *E2ETestSuite) StartNodes(t *testing.T) {
268254
t.Log("Starting MPC nodes...")
269255

270-
// Double-check that Consul is still accessible before starting nodes
271-
t.Log("Verifying Consul is still accessible...")
272-
_, err := s.consulClient.Status().Leader()
273-
if err != nil {
274-
t.Logf("Consul connection test failed: %v", err)
256+
// Double-check that NATS is still accessible before starting nodes
257+
t.Log("Verifying NATS is still accessible...")
258+
if !s.natsConn.IsConnected() {
259+
t.Log("NATS connection lost")
275260
} else {
276-
t.Log("Consul is still accessible")
261+
t.Log("NATS is still accessible")
277262
}
278263

279264
s.mpciumProcesses = make([]*exec.Cmd, numNodes)
@@ -318,12 +303,11 @@ func (s *E2ETestSuite) StartNodes(t *testing.T) {
318303
time.Sleep(5 * time.Second)
319304

320305
// Verify containers are still accessible
321-
t.Log("Final verification that Consul is still accessible...")
322-
_, err = s.consulClient.Status().Leader()
323-
if err != nil {
324-
t.Logf("Consul connection test failed after starting nodes: %v", err)
306+
t.Log("Final verification that NATS is still accessible...")
307+
if !s.natsConn.IsConnected() {
308+
t.Log("NATS connection lost after starting nodes")
325309
} else {
326-
t.Log("Consul is still accessible after starting nodes")
310+
t.Log("NATS is still accessible after starting nodes")
327311
}
328312

329313
// Show recent logs from each node
@@ -562,6 +546,11 @@ func (s *E2ETestSuite) Cleanup(t *testing.T) {
562546
s.natsConn.Close()
563547
}
564548

549+
// Cancel the context to release resources
550+
if s.cancel != nil {
551+
s.cancel()
552+
}
553+
565554
// Stop Docker Compose stack
566555
t.Log("Stopping Docker Compose stack...")
567556
cmd := exec.Command("docker", "compose", "-f", "docker-compose.test.yaml", "down", "-v")

e2e/docker-compose.test.yaml

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,3 @@ services:
99
- "6223:6222"
1010
tty: true
1111
restart: always
12-
13-
consul-test:
14-
image: consul:1.15.4
15-
container_name: consul-test
16-
ports:
17-
- "8501:8500"
18-
- "8602:8600/udp"
19-
command: "agent -server -ui -node=server-1 -bootstrap-expect=1 -client=0.0.0.0"
20-
restart: always

0 commit comments

Comments
 (0)