Skip to content

Commit 87c3c30

Browse files
committed
libsql: Remove lazy pulling optimization
The optimization attempts to reduce pulling when an application is writing. However, the logic is embedded in prepare(), which is totally wrong because a statement can be reused. Let's remove the optimization as incorrect. Refs: tursodatabase/turso-cloud#5
1 parent 7fbef83 commit 87c3c30

File tree

3 files changed

+29
-40
lines changed

3 files changed

+29
-40
lines changed

libsql/src/database.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,6 @@ impl Database {
719719
read_your_writes: *read_your_writes,
720720
context: db.sync_ctx.clone().unwrap(),
721721
state: std::sync::Arc::new(Mutex::new(State::Init)),
722-
needs_pull: std::sync::atomic::AtomicBool::new(false).into(),
723722
};
724723

725724
let conn = std::sync::Arc::new(synced);

libsql/src/sync/connection.rs

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@ use crate::{
88
sync::SyncContext,
99
BatchRows, Error, Result, Statement, Transaction, TransactionBehavior,
1010
};
11-
use std::sync::{
12-
atomic::{AtomicBool, Ordering},
13-
Arc,
14-
};
11+
use std::sync::Arc;
1512
use std::time::Duration;
1613
use tokio::sync::Mutex;
1714

@@ -24,7 +21,6 @@ pub struct SyncedConnection {
2421
pub read_your_writes: bool,
2522
pub context: Arc<Mutex<SyncContext>>,
2623
pub state: Arc<Mutex<State>>,
27-
pub needs_pull: Arc<AtomicBool>,
2824
}
2925

3026
impl SyncedConnection {
@@ -110,55 +106,49 @@ impl Conn for SyncedConnection {
110106

111107
async fn execute_batch(&self, sql: &str) -> Result<BatchRows> {
112108
if self.should_execute_local(sql).await? {
113-
if self.needs_pull.load(Ordering::Relaxed) {
109+
self.local.execute_batch(sql)
110+
} else {
111+
let result = self.remote.execute_batch(sql).await;
112+
if self.read_your_writes {
114113
let mut context = self.context.lock().await;
115114
crate::sync::try_pull(&mut context, &self.local).await?;
116-
self.needs_pull.store(false, Ordering::Relaxed);
117115
}
118-
self.local.execute_batch(sql)
119-
} else {
120-
self.remote.execute_batch(sql).await
116+
result
121117
}
122118
}
123119

124120
async fn execute_transactional_batch(&self, sql: &str) -> Result<BatchRows> {
125121
if self.should_execute_local(sql).await? {
126-
if self.needs_pull.load(Ordering::Relaxed) {
127-
let mut context = self.context.lock().await;
128-
crate::sync::try_pull(&mut context, &self.local).await?;
129-
self.needs_pull.store(false, Ordering::Relaxed);
130-
}
131122
self.local.execute_transactional_batch(sql)?;
132123
Ok(BatchRows::empty())
133124
} else {
134-
self.remote.execute_transactional_batch(sql).await
125+
let result = self.remote.execute_transactional_batch(sql).await;
126+
if self.read_your_writes {
127+
let mut context = self.context.lock().await;
128+
crate::sync::try_pull(&mut context, &self.local).await?;
129+
}
130+
result
135131
}
136132
}
137133

138134
async fn prepare(&self, sql: &str) -> Result<Statement> {
139135
if self.should_execute_local(sql).await? {
140-
let stmt = Statement {
136+
Ok(Statement {
141137
inner: Box::new(LibsqlStmt(self.local.prepare(sql)?)),
138+
})
139+
} else {
140+
let stmt = Statement {
141+
inner: Box::new(self.remote.prepare(sql).await?),
142142
};
143143

144144
Ok(Statement {
145145
inner: Box::new(SyncedStatement {
146146
conn: self.local.clone(),
147147
inner: stmt,
148148
context: self.context.clone(),
149-
needs_pull: self.needs_pull.clone(),
149+
pull_after: self.read_your_writes,
150150
}),
151151
})
152-
} else {
153-
let stmt = Statement {
154-
inner: Box::new(self.remote.prepare(sql).await?),
155-
};
156-
157-
if self.read_your_writes {
158-
self.needs_pull.store(true, Ordering::Relaxed);
159-
}
160-
161-
Ok(stmt)
162152
}
163153
}
164154

libsql/src/sync/statement.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ use crate::{
44
statement::Stmt,
55
sync::SyncContext, Column, Result, Rows, Statement,
66
};
7-
use std::sync::{atomic::{AtomicBool, Ordering}, Arc};
7+
use std::sync::Arc;
88
use tokio::sync::Mutex;
99

1010
pub struct SyncedStatement {
1111
pub conn: local::Connection,
1212
pub inner: Statement,
1313
pub context: Arc<Mutex<SyncContext>>,
14-
pub needs_pull: Arc<AtomicBool>,
14+
pub pull_after: bool,
1515
}
1616

1717
#[async_trait::async_trait]
@@ -21,30 +21,30 @@ impl Stmt for SyncedStatement {
2121
}
2222

2323
async fn execute(&self, params: &Params) -> Result<usize> {
24-
if self.needs_pull.load(Ordering::Relaxed) {
24+
let result = self.inner.execute(params).await;
25+
if self.pull_after {
2526
let mut context = self.context.lock().await;
2627
crate::sync::try_pull(&mut context, &self.conn).await?;
27-
self.needs_pull.store(false, Ordering::Relaxed);
2828
}
29-
self.inner.execute(params).await
29+
result
3030
}
3131

3232
async fn query(&self, params: &Params) -> Result<Rows> {
33-
if self.needs_pull.load(Ordering::Relaxed) {
33+
let result = self.inner.query(params).await;
34+
if self.pull_after {
3435
let mut context = self.context.lock().await;
3536
crate::sync::try_pull(&mut context, &self.conn).await?;
36-
self.needs_pull.store(false, Ordering::Relaxed);
3737
}
38-
self.inner.query(params).await
38+
result
3939
}
4040

4141
async fn run(&self, params: &Params) -> Result<()> {
42-
if self.needs_pull.load(Ordering::Relaxed) {
42+
let result = self.inner.run(params).await;
43+
if self.pull_after {
4344
let mut context = self.context.lock().await;
4445
crate::sync::try_pull(&mut context, &self.conn).await?;
45-
self.needs_pull.store(false, Ordering::Relaxed);
4646
}
47-
self.inner.run(params).await
47+
result
4848
}
4949

5050
fn interrupt(&self) -> Result<()> {

0 commit comments

Comments
 (0)