Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions dash-spv/src/client/chainlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
.await
{
// Penalize the peer that relayed the invalid ChainLock
let reason = format!("Invalid ChainLock: {}", e);
self.network.penalize_peer_invalid_chainlock(peer_address, &reason).await;
self.network.penalize_peer_invalid_chainlock(peer_address).await;
return Err(SpvError::Validation(e));
}
}
Expand Down Expand Up @@ -110,7 +109,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
tracing::warn!("{}", reason);

// Ban the peer using the reputation system
self.network.penalize_peer_invalid_instantlock(peer_address, &reason).await;
self.network.penalize_peer_invalid_instantlock(peer_address).await;

return Err(SpvError::Validation(e));
}
Expand Down
1 change: 1 addition & 0 deletions dash-spv/src/client/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
// Update storage with chain state including sync_base_height
{
let mut storage = self.storage.lock().await;

storage
.store_headers_at_height(&[checkpoint_header], checkpoint.height)
.await?;
Expand Down
138 changes: 61 additions & 77 deletions dash-spv/src/network/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use crate::network::addrv2::AddrV2Handler;
use crate::network::constants::*;
use crate::network::discovery::DnsDiscovery;
use crate::network::pool::PeerPool;
use crate::network::reputation::{
misbehavior_scores, positive_scores, PeerReputationManager, ReputationAware,
};
use crate::network::reputation::{ChangeReason, PeerReputationManager};
use crate::network::{
HandshakeManager, Message, MessageDispatcher, MessageType, NetworkManager, Peer,
};
Expand All @@ -43,7 +41,7 @@ pub struct PeerNetworkManager {
/// Peer persistence
peer_store: Arc<PersistentPeerStorage>,
/// Peer reputation manager
reputation_manager: Arc<PeerReputationManager>,
reputation_manager: Arc<Mutex<PeerReputationManager>>,
/// Network type
network: Network,
/// Shutdown token
Expand Down Expand Up @@ -80,11 +78,7 @@ impl PeerNetworkManager {

let peer_store = PersistentPeerStorage::open(data_dir.clone()).await?;

let reputation_manager = Arc::new(PeerReputationManager::new());

if let Err(e) = reputation_manager.load_from_storage(&peer_store).await {
log::warn!("Failed to load peer reputation data: {}", e);
}
let reputation_manager = PeerReputationManager::load_or_new(&peer_store).await;

// Determine exclusive mode: either explicitly requested or peers were provided
let exclusive_mode = config.restrict_to_configured_peers || !config.peers.is_empty();
Expand All @@ -94,7 +88,7 @@ impl PeerNetworkManager {
discovery: Arc::new(discovery),
addrv2_handler: Arc::new(AddrV2Handler::new()),
peer_store: Arc::new(peer_store),
reputation_manager,
reputation_manager: Arc::new(Mutex::new(reputation_manager)),
network: config.network,
shutdown_token: CancellationToken::new(),
tasks: Arc::new(Mutex::new(JoinSet::new())),
Expand Down Expand Up @@ -175,7 +169,7 @@ impl PeerNetworkManager {
/// Connect to a specific peer
async fn connect_to_peer(&self, addr: SocketAddr) {
// Check reputation first
if !self.reputation_manager.should_connect_to_peer(&addr).await {
if !self.reputation_manager.lock().await.should_connect_to_peer(&addr).await {
log::warn!("Not connecting to {} due to bad reputation", addr);
return;
}
Expand All @@ -191,7 +185,7 @@ impl PeerNetworkManager {
}

// Record connection attempt
self.reputation_manager.record_connection_attempt(addr).await;
self.reputation_manager.lock().await.record_connection_attempt(addr).await;

let pool = self.pool.clone();
let network = self.network;
Expand All @@ -218,9 +212,6 @@ impl PeerNetworkManager {
Ok(_) => {
log::info!("Successfully connected to {}", addr);

// Record successful connection
reputation_manager.record_successful_connection(addr).await;

// Add to pool
if let Err(e) = pool.add_peer(addr, peer).await {
log::error!("Failed to add peer to pool: {}", e);
Expand Down Expand Up @@ -250,11 +241,9 @@ impl PeerNetworkManager {
log::warn!("Handshake failed with {}: {}", addr, e);
// Update reputation for handshake failure
reputation_manager
.update_reputation(
addr,
misbehavior_scores::INVALID_MESSAGE,
"Handshake failed",
)
.lock()
.await
.update_reputation(addr, ChangeReason::HandshakeFailure)
.await;
// For handshake failures, try again later
tokio::time::sleep(RECONNECT_DELAY).await;
Expand All @@ -265,11 +254,9 @@ impl PeerNetworkManager {
log::debug!("Failed to connect to {}: {}", addr, e);
// Minor reputation penalty for connection failure
reputation_manager
.update_reputation(
addr,
misbehavior_scores::TIMEOUT / 2,
"Connection failed",
)
.lock()
.await
.update_reputation(addr, ChangeReason::ConnectionFailure)
.await;
}
}
Expand All @@ -283,7 +270,7 @@ impl PeerNetworkManager {
pool: Arc<PeerPool>,
addrv2_handler: Arc<AddrV2Handler>,
shutdown_token: CancellationToken,
reputation_manager: Arc<PeerReputationManager>,
reputation_manager: Arc<Mutex<PeerReputationManager>>,
connected_peer_count: Arc<AtomicUsize>,
headers2_disabled: Arc<Mutex<HashSet<SocketAddr>>>,
message_dispatcher: Arc<Mutex<MessageDispatcher>>,
Expand Down Expand Up @@ -457,11 +444,9 @@ impl PeerNetworkManager {
headers2_disabled.lock().await.insert(addr);
// Apply reputation penalty
reputation_manager
.update_reputation(
addr,
misbehavior_scores::INVALID_MESSAGE,
"Headers2 decompression failed",
)
.lock()
.await
.update_reputation(addr, ChangeReason::InvalidHeaders2)
.await;
continue; // Don't forward corrupted message
}
Expand Down Expand Up @@ -515,11 +500,9 @@ impl PeerNetworkManager {
log::debug!("Timeout reading from {}, continuing...", addr);
// Minor reputation penalty for timeout
reputation_manager
.update_reputation(
addr,
misbehavior_scores::TIMEOUT,
"Read timeout",
)
.lock()
.await
.update_reputation(addr, ChangeReason::Timeout)
.await;
continue;
}
Expand All @@ -537,10 +520,11 @@ impl PeerNetworkManager {
);
// Reputation penalty for invalid data
reputation_manager
.lock()
.await
.update_reputation(
addr,
misbehavior_scores::INVALID_TRANSACTION,
"Invalid transaction type in block",
ChangeReason::InvalidTransaction,
)
.await;
} else if error_msg
Expand Down Expand Up @@ -599,7 +583,9 @@ impl PeerNetworkManager {
if conn_duration > Duration::from_secs(3600) {
// 1 hour
reputation_manager
.update_reputation(addr, positive_scores::LONG_UPTIME, "Long connection uptime")
.lock()
.await
.update_reputation(addr, ChangeReason::LongUptime)
.await;
}
});
Expand Down Expand Up @@ -676,7 +662,7 @@ impl PeerNetworkManager {
let known = addrv2_handler.get_known_addresses().await;
let needed = TARGET_PEERS.saturating_sub(count);
// Select best peers based on reputation
let best_peers = reputation_manager.select_best_peers(known, needed * 2).await;
let best_peers = reputation_manager.lock().await.select_best_peers(known, needed * 2).await;
let mut attempted = 0;

for addr in best_peers {
Expand Down Expand Up @@ -750,10 +736,9 @@ impl PeerNetworkManager {
if let Err(e) = peer_guard.send_ping().await {
log::error!("Failed to ping {}: {}", addr, e);
// Update reputation for ping failure
reputation_manager.update_reputation(
reputation_manager.lock().await.update_reputation(
addr,
misbehavior_scores::TIMEOUT,
"Ping failed",
ChangeReason::PingFailure,
).await;
}
}
Expand All @@ -770,7 +755,7 @@ impl PeerNetworkManager {
}

// Save reputation data periodically
if let Err(e) = reputation_manager.save_to_storage(&*peer_store).await {
if let Err(e) = reputation_manager.lock().await.save_to_storage(&*peer_store).await {
log::warn!("Failed to save reputation data: {}", e);
}
}
Expand Down Expand Up @@ -994,8 +979,9 @@ impl PeerNetworkManager {

/// Get reputation information for all peers
pub async fn get_peer_reputations(&self) -> HashMap<SocketAddr, (i32, bool)> {
let reputations = self.reputation_manager.get_all_reputations().await;
reputations.into_iter().map(|(addr, rep)| (addr, (rep.score, rep.is_banned()))).collect()
let mut lock = self.reputation_manager.lock().await;
let reputations = lock.get_all_reputations().await;
reputations.iter().map(|(addr, rep)| (*addr, (rep.score(), rep.is_banned()))).collect()
}

/// Ban a specific peer manually
Expand All @@ -1007,19 +993,17 @@ impl PeerNetworkManager {

// Update reputation to trigger ban
self.reputation_manager
.update_reputation(
*addr,
misbehavior_scores::INVALID_HEADER * 2, // Severe penalty
reason,
)
.lock()
.await
.update_reputation(*addr, ChangeReason::ManuallyBanned)
.await;

Ok(())
}

/// Unban a specific peer
pub async fn unban_peer(&self, addr: &SocketAddr) {
self.reputation_manager.unban_peer(addr).await;
self.reputation_manager.lock().await.unban_peer(addr).await;
}

/// Shutdown the network manager
Expand All @@ -1036,7 +1020,9 @@ impl PeerNetworkManager {
}

// Save reputation data before shutdown
if let Err(e) = self.reputation_manager.save_to_storage(&*self.peer_store).await {
if let Err(e) =
self.reputation_manager.lock().await.save_to_storage(&*self.peer_store).await
{
log::warn!("Failed to save reputation data on shutdown: {}", e);
}

Expand Down Expand Up @@ -1131,64 +1117,62 @@ impl NetworkManager for PeerNetworkManager {
} // end match
} // end send_message

async fn penalize_peer(&self, address: SocketAddr, score_change: i32, reason: &str) {
self.reputation_manager.update_reputation(address, score_change, reason).await;
async fn penalize_peer(&self, address: SocketAddr, reason: ChangeReason) {
self.reputation_manager.lock().await.update_reputation(address, reason).await;
}

async fn penalize_peer_invalid_chainlock(&self, address: SocketAddr, reason: &str) {
match self.disconnect_peer(&address, reason).await {
async fn penalize_peer_invalid_chainlock(&self, address: SocketAddr) {
match self.disconnect_peer(&address, &ChangeReason::InvalidChainLock.to_string()).await {
Ok(()) => {
log::warn!(
"Peer {} disconnected for invalid ChainLock enforcement: {}",
address,
reason
);
log::warn!("Peer {} disconnected for invalid ChainLock enforcement", address,);
}
Err(err) => {
log::error!(
"Failed to disconnect peer {} after invalid ChainLock enforcement ({}): {}",
"Failed to disconnect peer {} after invalid ChainLock enforcement: {}",
address,
reason,
err
);
}
}

// Apply misbehavior score and a short temporary ban
self.reputation_manager
.update_reputation(address, misbehavior_scores::INVALID_CHAINLOCK, reason)
.lock()
.await
.update_reputation(address, ChangeReason::InvalidChainLock)
.await;

// Short ban: 10 minutes for relaying invalid ChainLock
self.reputation_manager
.temporary_ban_peer(address, Duration::from_secs(10 * 60), reason)
.lock()
.await
.temporary_ban_peer(address, Duration::from_secs(10 * 60))
.await;
}

async fn penalize_peer_invalid_instantlock(&self, address: SocketAddr, reason: &str) {
async fn penalize_peer_invalid_instantlock(&self, address: SocketAddr) {
// Apply misbehavior score and a short temporary ban
self.reputation_manager
.update_reputation(address, misbehavior_scores::INVALID_INSTANTLOCK, reason)
.lock()
.await
.update_reputation(address, ChangeReason::InvalidInstantLock)
.await;

// Short ban: 10 minutes for relaying invalid InstantLock
self.reputation_manager
.temporary_ban_peer(address, Duration::from_secs(10 * 60), reason)
.lock()
.await
.temporary_ban_peer(address, Duration::from_secs(10 * 60))
.await;

match self.disconnect_peer(&address, reason).await {
match self.disconnect_peer(&address, &ChangeReason::InvalidInstantLock.to_string()).await {
Ok(()) => {
log::warn!(
"Peer {} disconnected for invalid InstantLock enforcement: {}",
address,
reason
);
log::warn!("Peer {} disconnected for invalid InstantLock enforcement", address,);
}
Err(err) => {
log::error!(
"Failed to disconnect peer {} after invalid InstantLock enforcement ({}): {}",
"Failed to disconnect peer {} after invalid InstantLock enforcement: {}",
address,
reason,
err
);
}
Expand Down
Loading
Loading