From 5e1e805b4ff2647ff555ade54af5232f7099dc4a Mon Sep 17 00:00:00 2001 From: gwenn Date: Sat, 28 Jul 2018 16:04:42 +0200 Subject: [PATCH 1/6] Use `sqlite3_get_autocommit` instead of our own flag/status --- src/transaction.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/transaction.rs b/src/transaction.rs index 104bac0..fb74fd4 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -61,7 +61,6 @@ pub type SqliteTransaction<'conn> = Transaction<'conn>; pub struct Transaction<'conn> { conn: &'conn Connection, drop_behavior: DropBehavior, - committed: bool, } /// Represents a savepoint on a database connection. @@ -111,7 +110,6 @@ impl<'conn> Transaction<'conn> { Transaction { conn, drop_behavior: DropBehavior::Rollback, - committed: false, } }) } @@ -168,7 +166,6 @@ impl<'conn> Transaction<'conn> { fn commit_(&mut self) -> Result<()> { self.conn.execute_batch("COMMIT")?; - self.committed = true; Ok(()) } @@ -179,7 +176,6 @@ impl<'conn> Transaction<'conn> { fn rollback_(&mut self) -> Result<()> { self.conn.execute_batch("ROLLBACK")?; - self.committed = true; Ok(()) } @@ -193,7 +189,7 @@ impl<'conn> Transaction<'conn> { } fn finish_(&mut self) -> Result<()> { - if self.committed { + if self.conn.is_autocommit() { return Ok(()); } match self.drop_behavior() { From 73ebce5f9801f0597ef8d5e978999ba7b03d7739 Mon Sep 17 00:00:00 2001 From: gwenn Date: Sat, 28 Jul 2018 12:10:57 +0200 Subject: [PATCH 2/6] Add binding to sqlite3_busy_timeout and sqlite3_busy_handler. --- src/busy.rs | 160 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 +- 2 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 src/busy.rs diff --git a/src/busy.rs b/src/busy.rs new file mode 100644 index 0000000..f5ad1eb --- /dev/null +++ b/src/busy.rs @@ -0,0 +1,160 @@ +///! Busy handler (when the database is locked) +use std::time::Duration; +use std::mem; +use std::os::raw::{c_int, c_void}; +use std::ptr; + +use ffi; +use {Connection, InnerConnection, Result}; + +impl Connection { + /// Set a busy handler that sleeps for a specified amount of time when a table is locked. + /// The handler will sleep multiple times until at least "ms" milliseconds of sleeping have accumulated. + /// + /// Calling this routine with an argument equal to zero turns off all busy handlers. + // + /// There can only be a single busy handler for a particular database connection at any given moment. + /// If another busy handler was defined (using `busy_handler`) prior to calling this routine, that other busy handler is cleared. + pub fn busy_timeout(&self, timeout: Duration) -> Result<()> { + let ms = timeout + .as_secs() + .checked_mul(1000) + .and_then(|t| t.checked_add(timeout.subsec_millis().into())) + .expect("too big"); + self.db.borrow_mut().busy_timeout(ms as i32) + } + + /// Register a callback to handle `SQLITE_BUSY` errors. + /// + /// If the busy callback is `None`, then `SQLITE_BUSY is returned immediately upon encountering the lock.` + /// The argument to the busy handler callback is the number of times that the busy handler has been invoked previously for the same locking event. + /// If the busy callback returns `false`, then no additional attempts are made to access the database and `SQLITE_BUSY` is returned to the application. + /// If the callback returns `true`, then another attempt is made to access the database and the cycle repeats. + /// + /// There can only be a single busy handler defined for each database connection. + /// Setting a new busy handler clears any previously set handler. + /// Note that calling `busy_timeout()` or evaluating `PRAGMA busy_timeout=N` will change the busy handler and thus clear any previously set busy handler. + pub fn busy_handler(&self, callback: Option bool>) -> Result<()> { + unsafe extern "C" fn busy_handler_callback(p_arg: *mut c_void, count: c_int) -> c_int { + let handler_fn: fn(i32) -> bool = mem::transmute(p_arg); + if handler_fn(count) { + 1 + } else { + 0 + } + } + let mut c = self.db.borrow_mut(); + let r = match callback { + Some(f) => unsafe { + ffi::sqlite3_busy_handler(c.db(), Some(busy_handler_callback), mem::transmute(f)) + }, + None => unsafe { ffi::sqlite3_busy_handler(c.db(), None, ptr::null_mut()) }, + }; + c.decode_result(r) + } +} + +impl InnerConnection { + fn busy_timeout(&mut self, timeout: c_int) -> Result<()> { + let r = unsafe { ffi::sqlite3_busy_timeout(self.db, timeout) }; + self.decode_result(r) + } +} + +#[cfg(test)] +mod test { + extern crate tempdir; + use self::tempdir::TempDir; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::mpsc::sync_channel; + use std::time::Duration; + use std::thread; + + use {Connection, Error, ErrorCode, TransactionBehavior}; + + #[test] + fn test_default_busy() { + let temp_dir = TempDir::new("test_default_busy").unwrap(); + let path = temp_dir.path().join("test.db3"); + + let mut db1 = Connection::open(&path).unwrap(); + let tx1 = db1 + .transaction_with_behavior(TransactionBehavior::Exclusive) + .unwrap(); + let db2 = Connection::open(&path).unwrap(); + let r = db2.query_row("PRAGMA schema_version", &[], |_| unreachable!()); + match r.unwrap_err() { + Error::SqliteFailure(err, _) => { + assert_eq!(err.code, ErrorCode::DatabaseBusy); + } + err => panic!("Unexpected error {}", err), + } + tx1.rollback().unwrap(); + } + + #[test] + fn test_busy_timeout() { + let temp_dir = TempDir::new("test_busy_timeout").unwrap(); + let path = temp_dir.path().join("test.db3"); + + let db2 = Connection::open(&path).unwrap(); + db2.busy_timeout(Duration::from_secs(1)).unwrap(); + + let (rx, tx) = sync_channel(0); + let child = thread::spawn(move || { + let mut db1 = Connection::open(&path).unwrap(); + let tx1 = db1 + .transaction_with_behavior(TransactionBehavior::Exclusive) + .unwrap(); + rx.send(1).unwrap(); + thread::sleep(Duration::from_millis(100)); + tx1.rollback().unwrap(); + }); + + assert_eq!(tx.recv().unwrap(), 1); + let _ = + db2.query_row("PRAGMA schema_version", &[], |row| { + row.get_checked::<_, i32>(0) + }).expect("unexpected error"); + + child.join().unwrap(); + } + + #[test] + fn test_busy_handler() { + lazy_static! { + static ref CALLED: AtomicBool = AtomicBool::new(false); + } + fn busy_handler(_: i32) -> bool { + CALLED.store(true, Ordering::Relaxed); + thread::sleep(Duration::from_millis(100)); + true + } + + let temp_dir = TempDir::new("test_busy_handler").unwrap(); + let path = temp_dir.path().join("test.db3"); + + let db2 = Connection::open(&path).unwrap(); + db2.busy_handler(Some(busy_handler)).unwrap(); + + let (rx, tx) = sync_channel(0); + let child = thread::spawn(move || { + let mut db1 = Connection::open(&path).unwrap(); + let tx1 = db1 + .transaction_with_behavior(TransactionBehavior::Exclusive) + .unwrap(); + rx.send(1).unwrap(); + thread::sleep(Duration::from_millis(100)); + tx1.rollback().unwrap(); + }); + + assert_eq!(tx.recv().unwrap(), 1); + let _ = + db2.query_row("PRAGMA schema_version", &[], |row| { + row.get_checked::<_, i32>(0) + }).expect("unexpected error"); + assert_eq!(CALLED.load(Ordering::Relaxed), true); + + child.join().unwrap(); + } +} diff --git a/src/lib.rs b/src/lib.rs index b67ef0d..aa52b4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,7 +56,7 @@ extern crate libsqlite3_sys as ffi; extern crate lru_cache; #[macro_use] extern crate bitflags; -#[cfg(all(test, feature = "trace"))] +#[cfg(test)] #[macro_use] extern crate lazy_static; @@ -126,6 +126,7 @@ mod hooks; #[cfg(feature = "hooks")] pub use hooks::*; mod unlock_notify; +mod busy; // Number of cached prepared statements we'll hold on to. const STATEMENT_CACHE_DEFAULT_CAPACITY: usize = 16; From 88ac29abfebd1e9debcfe17d4e59b42f4ff5713a Mon Sep 17 00:00:00 2001 From: gwenn Date: Sat, 28 Jul 2018 15:43:54 +0200 Subject: [PATCH 3/6] Upgrade to last stable version of Rust --- appveyor.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/appveyor.yml b/appveyor.yml index 3a7c0a7..7658d5e 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,8 +1,8 @@ environment: matrix: - - TARGET: 1.25.0-x86_64-pc-windows-gnu + - TARGET: 1.27.2-x86_64-pc-windows-gnu MSYS2_BITS: 64 - - TARGET: 1.25.0-x86_64-pc-windows-msvc + - TARGET: 1.27.2-x86_64-pc-windows-msvc VCPKG_DEFAULT_TRIPLET: x64-windows VCPKGRS_DYNAMIC: 1 - TARGET: nightly-x86_64-pc-windows-msvc From a5403eb05f11fd7aeeb6ce8577f21c7623694d5c Mon Sep 17 00:00:00 2001 From: gwenn Date: Sat, 28 Jul 2018 17:02:17 +0200 Subject: [PATCH 4/6] Replace PRAGMA call by Rust busy_timeout function --- src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index aa52b4b..367258d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -993,6 +993,7 @@ mod test { #[test] fn test_concurrent_transactions_busy_commit() { + use std::time::Duration; let tmp = TempDir::new("locked").unwrap(); let path = tmp.path().join("transactions.db3"); @@ -1004,8 +1005,8 @@ mod test { let mut db1 = Connection::open(&path).unwrap(); let mut db2 = Connection::open(&path).unwrap(); - db1.execute_batch("PRAGMA busy_timeout = 0;").unwrap(); - db2.execute_batch("PRAGMA busy_timeout = 0;").unwrap(); + db1.busy_timeout(Duration::from_millis(0)).unwrap(); + db2.busy_timeout(Duration::from_millis(0)).unwrap(); { let tx1 = db1.transaction().unwrap(); From 7310cac6f580a2aadb7fedc843b516201ca6fd71 Mon Sep 17 00:00:00 2001 From: gwenn Date: Tue, 31 Jul 2018 22:17:17 +0200 Subject: [PATCH 5/6] Use pub(crate) --- src/cache.rs | 1 - src/lib.rs | 2 -- src/row.rs | 34 +++++++--------------------------- src/statement.rs | 20 +++++--------------- 4 files changed, 12 insertions(+), 45 deletions(-) diff --git a/src/cache.rs b/src/cache.rs index cef1aca..82cd8ca 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -5,7 +5,6 @@ use std::ops::{Deref, DerefMut}; use lru_cache::LruCache; use {Result, Connection, Statement}; use raw_statement::RawStatement; -use statement::StatementCrateImpl; impl Connection { /// Prepare a SQL statement for execution, returning a previously prepared (but diff --git a/src/lib.rs b/src/lib.rs index 367258d..598520b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,10 +80,8 @@ use raw_statement::RawStatement; use cache::StatementCache; pub use statement::Statement; -use statement::StatementCrateImpl; pub use row::{Row, Rows, MappedRows, AndThenRows, RowIndex}; -use row::RowsCrateImpl; #[allow(deprecated)] pub use transaction::{SqliteTransaction, SqliteTransactionBehavior}; diff --git a/src/row.rs b/src/row.rs index 0e0c5d3..bbcee78 100644 --- a/src/row.rs +++ b/src/row.rs @@ -3,7 +3,6 @@ use std::marker::PhantomData; use super::{Statement, Error, Result}; use types::{FromSql, FromSqlError}; -use statement::StatementCrateImpl; /// An handle for the resulting rows of a query. pub struct Rows<'stmt> { @@ -49,19 +48,12 @@ impl<'stmt> Rows<'stmt> { } } -// TODO: This trait lets us have "pub(crate)" visibility on some methods. Remove this -// once pub(crate) is stable. -pub trait RowsCrateImpl<'stmt> { - fn new(stmt: &'stmt Statement<'stmt>) -> Rows<'stmt>; - fn get_expected_row<'a>(&'a mut self) -> Result>; -} - -impl<'stmt> RowsCrateImpl<'stmt> for Rows<'stmt> { - fn new(stmt: &'stmt Statement<'stmt>) -> Rows<'stmt> { +impl<'stmt> Rows<'stmt> { + pub(crate) fn new(stmt: &'stmt Statement<'stmt>) -> Rows<'stmt> { Rows { stmt: Some(stmt) } } - fn get_expected_row<'a>(&'a mut self) -> Result> { + pub(crate) fn get_expected_row<'a>(&'a mut self) -> Result> { match self.next() { Some(row) => row, None => Err(Error::QueryReturnedNoRows), @@ -81,16 +73,10 @@ pub struct MappedRows<'stmt, F> { map: F, } -// TODO: This trait lets us have "pub(crate)" visibility on some methods. Remove this -// once pub(crate) is stable. -pub trait MappedRowsCrateImpl<'stmt, T, F> { - fn new(rows: Rows<'stmt>, f: F) -> MappedRows<'stmt, F>; -} - -impl<'stmt, T, F> MappedRowsCrateImpl<'stmt, T, F> for MappedRows<'stmt, F> +impl<'stmt, T, F> MappedRows<'stmt, F> where F: FnMut(&Row) -> T { - fn new(rows: Rows<'stmt>, f: F) -> MappedRows<'stmt, F> { + pub(crate) fn new(rows: Rows<'stmt>, f: F) -> MappedRows<'stmt, F> { MappedRows { rows, map: f } } } @@ -115,16 +101,10 @@ pub struct AndThenRows<'stmt, F> { map: F, } -// TODO: This trait lets us have "pub(crate)" visibility on some methods. Remove this -// once pub(crate) is stable. -pub trait AndThenRowsCrateImpl<'stmt, T, E, F> { - fn new(rows: Rows<'stmt>, f: F) -> AndThenRows<'stmt, F>; -} - -impl<'stmt, T, E, F> AndThenRowsCrateImpl<'stmt, T, E, F> for AndThenRows<'stmt, F> +impl<'stmt, T, E, F> AndThenRows<'stmt, F> where F: FnMut(&Row) -> result::Result { - fn new(rows: Rows<'stmt>, f: F) -> AndThenRows<'stmt, F> { + pub(crate) fn new(rows: Rows<'stmt>, f: F) -> AndThenRows<'stmt, F> { AndThenRows { rows, map: f } } } diff --git a/src/statement.rs b/src/statement.rs index 2d44a57..5545617 100644 --- a/src/statement.rs +++ b/src/statement.rs @@ -7,7 +7,6 @@ use super::ffi; use super::{Connection, RawStatement, Result, Error, ValueRef, Row, Rows, AndThenRows, MappedRows}; use super::str_to_cstring; use types::{ToSql, ToSqlOutput}; -use row::{RowsCrateImpl, MappedRowsCrateImpl, AndThenRowsCrateImpl}; /// A prepared statement. pub struct Statement<'conn> { @@ -494,24 +493,15 @@ impl<'conn> Drop for Statement<'conn> { } } -// TODO: This trait lets us have "pub(crate)" visibility on some Statement methods. Remove this -// once pub(crate) is stable. -pub trait StatementCrateImpl<'conn> { - fn new(conn: &'conn Connection, stmt: RawStatement) -> Self; - fn value_ref(&self, col: usize) -> ValueRef; - fn step(&self) -> Result; - fn reset(&self) -> c_int; -} - -impl<'conn> StatementCrateImpl<'conn> for Statement<'conn> { - fn new(conn: &Connection, stmt: RawStatement) -> Statement { +impl<'conn> Statement<'conn> { + pub(crate) fn new(conn: &Connection, stmt: RawStatement) -> Statement { Statement { conn, stmt, } } - fn value_ref(&self, col: usize) -> ValueRef { + pub(crate) fn value_ref(&self, col: usize) -> ValueRef { let raw = unsafe { self.stmt.ptr() }; match self.stmt.column_type(col) { @@ -554,7 +544,7 @@ impl<'conn> StatementCrateImpl<'conn> for Statement<'conn> { } } - fn step(&self) -> Result { + pub(crate) fn step(&self) -> Result { match self.stmt.step() { ffi::SQLITE_ROW => Ok(true), ffi::SQLITE_DONE => Ok(false), @@ -562,7 +552,7 @@ impl<'conn> StatementCrateImpl<'conn> for Statement<'conn> { } } - fn reset(&self) -> c_int { + pub(crate) fn reset(&self) -> c_int { self.stmt.reset() } } From 15ab96bcebe94c044145479bc2beca31e980a53f Mon Sep 17 00:00:00 2001 From: gwenn Date: Sun, 5 Aug 2018 11:39:46 +0200 Subject: [PATCH 6/6] [ci skip] Ignore unstable busy tests by default --- src/busy.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/busy.rs b/src/busy.rs index f5ad1eb..78c29de 100644 --- a/src/busy.rs +++ b/src/busy.rs @@ -93,6 +93,7 @@ mod test { } #[test] + #[ignore] // FIXME: unstable fn test_busy_timeout() { let temp_dir = TempDir::new("test_busy_timeout").unwrap(); let path = temp_dir.path().join("test.db3"); @@ -121,6 +122,7 @@ mod test { } #[test] + #[ignore] // FIXME: unstable fn test_busy_handler() { lazy_static! { static ref CALLED: AtomicBool = AtomicBool::new(false);