Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions payjoin-cli/src/app/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,7 @@ impl AppTrait for App {
.create_v1_post_request();
let http = http_agent(&self.config)?;
let body = String::from_utf8(req.body.clone()).unwrap();
println!("Sending fallback request to {}", &req.url);
let response = http
.post(req.url)
.header("Content-Type", req.content_type)
.body(body.clone())
.send()
.await
.with_context(|| "HTTP request failed")?;

let fallback_tx = Psbt::from_str(&body)
.map_err(|e| anyhow!("Failed to load PSBT from base64: {}", e))?
.extract_tx()?;
Expand All @@ -86,13 +79,41 @@ impl AppTrait for App {
"Sent fallback transaction hex: {:#}",
payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
);
let psbt = ctx.process_response(&response.bytes().await?).map_err(|e| {
tracing::debug!("Error processing response: {e:?}");
anyhow!("Failed to process response {e}")
})?;
println!("Sending fallback request to {}", &req.url);

self.process_pj_response(psbt)?;
Ok(())
let response = match http
.post(req.url)
.header("Content-Type", req.content_type)
.body(body.clone())
.send()
.await
{
Ok(resp) => resp,
Err(e) => {
tracing::debug!("HTTP request failed: {e:?}");
println!("Payjoin request failed: {e}. Broadcasting fallback transaction.");
let txid = self.wallet().broadcast_tx(&fallback_tx)?;
println!("Fallback transaction broadcasted. TXID: {txid}");
return Ok(());
}
};

// Try to process the payjoin response
match ctx.process_response(&response.bytes().await?) {
Ok(psbt) => {
self.process_pj_response(psbt)?;
Ok(())
}
Err(e) => {
tracing::debug!("Error processing response: {e:?}");
println!("Payjoin failed: {e}. Broadcasting fallback transaction.");

// Broadcast the fallback transaction
let txid = self.wallet().broadcast_tx(&fallback_tx)?;
println!("Fallback transaction broadcasted. TXID: {txid}");
Ok(())
}
}
}

#[allow(clippy::incompatible_msrv)]
Expand Down
201 changes: 163 additions & 38 deletions payjoin-cli/src/app/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ impl AppTrait for App {
PjParam::V1(pj_param) => {
use std::str::FromStr;

use payjoin::send::ResponseError;

let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
let (req, ctx) = payjoin::send::v1::SenderBuilder::from_parts(
psbt,
Expand Down Expand Up @@ -196,52 +198,71 @@ impl AppTrait for App {
"Sent fallback transaction hex: {:#}",
payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
);
let psbt = ctx.process_response(&response.bytes().await?).map_err(|e| {
tracing::debug!("Error processing response: {e:?}");
anyhow!("Failed to process response {e}")
})?;

self.process_pj_response(psbt)?;
Ok(())
// Try to process the payjoin response
match ctx.process_response(&response.bytes().await?) {
Ok(psbt) => {
println!("Payjoin proposal received, processing...");
self.process_pj_response(psbt)?;
Ok(())
}
Err(e) => {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we should be falling back on any Error. We should be doing something similar to what you did below and only fallback on a FatalError -- an irrecoverable protocol deviation.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which of the error types/variant is safe for fallback broadcast .

these two ResponseError::WellKnown() | ResponseError::Unrecognized are the ideal ones IMO. however from https://github.com/bitcoin/bips/blob/master/bip-0078.mediawiki#user-content-Receivers_well_known_errors , i don't think every variant needs fallback broadcasted

Copy link
Copy Markdown
Collaborator

@arminsabouri arminsabouri Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think every variant needs fallback broadcasted

Why not? You could argue unavailable doesn't require a fallback to be broadcasted.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still think we should this @zealsham . Lets double check this against the spec

tracing::debug!("Error processing response: {e:?}");
match e {
ResponseError::WellKnown(_) | ResponseError::Validation { .. } => {
println!("Payjoin failed: {e}. Broadcasting fallback transaction.");
let txid = self.wallet().broadcast_tx(&fallback_tx)?;
println!("Fallback transaction broadcasted. TXID: {txid}");
Ok(())
}
ResponseError::Unrecognized { .. } =>
Err(anyhow!("Payjoin response validation failed: {e}")),
}
}
}
}
PjParam::V2(pj_param) => {
let receiver_pubkey = pj_param.receiver_pubkey();
let sender_state =
self.db.get_send_session_ids()?.into_iter().find_map(|session_id| {
let (sender_state, persister, fallback_tx) =
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to pass fallback_tx around the app. IIRC with the persister + session id you can always replay the session and obtain the fallback via the session history.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still think we should this @zealsham . Lets double check this against the spec

Yes, This handles V1 sender flow, How much of a reward it is depends on how much V1 payjoin transaction are occuring. Since payjoin-cli is a reference example, we should demonstrate all cases even if a bulk of payjoin-TX would be v2

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you answered a question from a different thread. Would still appreaciate an answer on this one

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohh, my mistake @arminsabouri . The fallback is gotten from replay_event just once. i could avoid that in the None branch but that would require me running the replay_session_event again. so as a trade off i kept the fallback in hand to avoid another redundant replay_event_ call.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaying is not a computationally heavy task (there are bounded number of events emitted from the API) and if it reduces code complexity I think this is worth it. To some extent we dont even need a persister object either -- thats a seperate problem.

Something like #1197 would also help in this situtation. i.e the fallback is always available on the top most sender struct. Sender.fallback_tx -> bitcoin::Transaction . For now I think its fine to just replay the events when we need to fallback.

match self.db.get_send_session_ids()?.into_iter().find_map(|session_id| {
let session_receiver_pubkey = self
.db
.get_send_session_receiver_pk(&session_id)
.expect("Receiver pubkey should exist if session id exists");
if session_receiver_pubkey == *receiver_pubkey {
let sender_persister =
SenderPersister::from_id(self.db.clone(), session_id);
let (send_session, _) = replay_sender_event_log(&sender_persister)
.map_err(|e| anyhow!("Failed to replay sender event log: {:?}", e))
.ok()?;

Some((send_session, sender_persister))
let (send_session, history) =
replay_sender_event_log(&sender_persister)
.map_err(|e| {
anyhow!("Failed to replay sender event log: {:?}", e)
})
.ok()?;

Some((send_session, sender_persister, history.fallback_tx()))
} else {
None
}
});

let (sender_state, persister) = match sender_state {
Some((sender_state, persister)) => (sender_state, persister),
None => {
let persister =
SenderPersister::new(self.db.clone(), receiver_pubkey.clone())?;
let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
let sender =
SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
.build_recommended(fee_rate)?
.save(&persister)?;

(SendSession::WithReplyKey(sender), persister)
}
};
}) {
Some((sender_state, persister, fallback_tx)) =>
(sender_state, persister, fallback_tx),
None => {
let persister =
SenderPersister::new(self.db.clone(), receiver_pubkey.clone())?;
let psbt = self.create_original_psbt(&address, amount, fee_rate)?;
let fallback_tx = psbt.clone().extract_tx().map_err(|e| {
anyhow!("Failed to extract fallback transaction: {}", e)
})?;
let sender =
SenderBuilder::from_parts(psbt, pj_param, &address, Some(amount))
.build_recommended(fee_rate)?
.save(&persister)?;

(SendSession::WithReplyKey(sender), persister, fallback_tx)
}
};
let mut interrupt = self.interrupt.clone();
tokio::select! {
_ = self.process_sender_session(sender_state, &persister) => return Ok(()),
_ = self.process_sender_session(sender_state, &persister, &fallback_tx) => return Ok(()),
_ = interrupt.changed() => {
println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
return Err(anyhow!("Interrupted"))
Expand Down Expand Up @@ -309,10 +330,13 @@ impl AppTrait for App {
for session_id in send_session_ids {
let sender_persiter = SenderPersister::from_id(self.db.clone(), session_id.clone());
match replay_sender_event_log(&sender_persiter) {
Ok((sender_state, _)) => {
Ok((sender_state, history)) => {
let fallback_tx = history.fallback_tx();
let self_clone = self.clone();
tasks.push(tokio::spawn(async move {
self_clone.process_sender_session(sender_state, &sender_persiter).await
self_clone
.process_sender_session(sender_state, &sender_persiter, &fallback_tx)
.await
}));
}
Err(e) => {
Expand Down Expand Up @@ -479,17 +503,30 @@ impl App {
&self,
session: SendSession,
persister: &SenderPersister,
fallback_tx: &payjoin::bitcoin::Transaction,
) -> Result<()> {
match session {
SendSession::WithReplyKey(context) =>
self.post_original_proposal(context, persister).await?,
SendSession::PollingForProposal(context) =>
self.get_proposed_payjoin_psbt(context, persister).await?,
SendSession::Closed(SenderSessionOutcome::Success(proposal)) => {
self.process_pj_response(proposal)?;
return Ok(());
}
_ => return Err(anyhow!("Unexpected sender state")),
SendSession::Closed(outcome) => match outcome {
SenderSessionOutcome::Failure | SenderSessionOutcome::Cancel => {
let label = if matches!(outcome, SenderSessionOutcome::Failure) {
"closed without success"
} else {
"canceled"
};
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This label seems unnecessary. Is this log read anywhere?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The label isn't logged , just printed. it should be logged, the label basically tells why the fallback tx was broadcasted, it exist to seperate cancel scenario from failure scenario

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok in that case I would just remove "success" from the closed message.Also as a developer I wouldn't really know the difference between closed / cancled from this message. A better word may be "failed". Or event better mention why it failed. e.g display the error message.

tracing::info!("Payjoin session {label}, broadcasting fallback");
let txid = self.wallet().broadcast_tx(fallback_tx)?;
tracing::info!("Fallback transaction broadcasted. TXID: {txid}");
return Ok(());
}
SenderSessionOutcome::Success(proposal) => {
self.process_pj_response(proposal)?;
return Ok(());
}
},
}
Ok(())
}
Expand Down Expand Up @@ -870,3 +907,91 @@ fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
None => anyhow!("No HTTP response: {}", e),
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use payjoin::bitcoin::{Amount, FeeRate};
use payjoin::send::v2::{SendSession, SessionOutcome as SenderSessionOutcome};
use payjoin::HpkeKeyPair;
use payjoin_test_utils::corepc_node;
use tempfile::tempdir;
use tokio::sync::watch;

use super::ohttp::RelayManager;
use super::App;
use crate::app::config::{BitcoindConfig, Config};
use crate::app::wallet::BitcoindWallet;
use crate::db::v2::SenderPersister;
use crate::db::Database;

async fn test_app(
bitcoind: &corepc_node::Node,
wallet_name: &str,
) -> (App, BitcoindWallet, tempfile::TempDir) {
let rpchost = url::Url::parse(&format!(
"http://{}/wallet/{}",
bitcoind.params.rpc_socket, wallet_name
))
.expect("valid url");
let bitcoind_config = BitcoindConfig {
rpchost: rpchost.clone(),
cookie: Some(bitcoind.params.cookie_file.clone()),
rpcuser: String::new(),
rpcpassword: String::new(),
};
let wallet = BitcoindWallet::new(&bitcoind_config).await.expect("wallet should connect");
let temp_dir = tempdir().expect("temp dir");
let db = Arc::new(Database::create(temp_dir.path().join("test.db")).expect("db created"));
let (_, interrupt_rx) = watch::channel(());
let config = Config {
db_path: temp_dir.path().to_path_buf(),
max_fee_rate: None,
bitcoind: bitcoind_config,
version: None,
#[cfg(feature = "_manual-tls")]
root_certificate: None,
#[cfg(feature = "_manual-tls")]
certificate_key: None,
};
let app = App {
config,
db,
wallet: wallet.clone(),
interrupt: interrupt_rx,
relay_manager: Arc::new(Mutex::new(RelayManager::new())),
};
(app, wallet, temp_dir)
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn sender_processes_fallback_on_failure() {
let (bitcoind, _sender, _receiver) =
payjoin_test_utils::init_bitcoind_sender_receiver(None, None)
.expect("bitcoind should start");
let (app, wallet, _temp_dir) = test_app(&bitcoind, "sender").await;

let address = wallet.get_new_address().expect("address");
let mut outputs = HashMap::new();
outputs.insert(address.to_string(), Amount::from_sat(10_000));
let psbt = wallet
.create_psbt(outputs, FeeRate::BROADCAST_MIN, false)
.expect("psbt should be created");
let fallback_tx = psbt.extract_tx_unchecked_fee_rate();

let receiver_pubkey = HpkeKeyPair::gen_keypair().1;
let persister =
SenderPersister::new(app.db.clone(), receiver_pubkey).expect("persister created");

let session = SendSession::Closed(SenderSessionOutcome::Failure);
let result = app.process_sender_session(session, &persister, &fallback_tx).await;
assert!(result.is_ok(), "process_sender_session should succeed: {result:?}");

let tx_in_wallet = wallet
.get_raw_transaction(&fallback_tx.compute_txid())
.expect("rpc call should succeed");
assert!(tx_in_wallet.is_some(), "fallback tx should be in the mempool after broadcast");
}
}
Loading
Loading