mirror of
https://github.com/isar/rusqlite.git
synced 2025-08-21 05:41:06 +08:00
Merge remote-tracking branch 'jgallagher/master' into vtab
This commit is contained in:
160
src/busy.rs
Normal file
160
src/busy.rs
Normal file
@@ -0,0 +1,160 @@
|
||||
///! Busy handler (when the database is locked)
|
||||
use std::time::Duration;
|
||||
use std::mem;
|
||||
use std::os::raw::{c_int, c_void};
|
||||
use std::ptr;
|
||||
|
||||
use ffi;
|
||||
use {Connection, InnerConnection, Result};
|
||||
|
||||
impl Connection {
|
||||
/// Set a busy handler that sleeps for a specified amount of time when a table is locked.
|
||||
/// The handler will sleep multiple times until at least "ms" milliseconds of sleeping have accumulated.
|
||||
///
|
||||
/// Calling this routine with an argument equal to zero turns off all busy handlers.
|
||||
//
|
||||
/// There can only be a single busy handler for a particular database connection at any given moment.
|
||||
/// If another busy handler was defined (using `busy_handler`) prior to calling this routine, that other busy handler is cleared.
|
||||
pub fn busy_timeout(&self, timeout: Duration) -> Result<()> {
|
||||
let ms = timeout
|
||||
.as_secs()
|
||||
.checked_mul(1000)
|
||||
.and_then(|t| t.checked_add(timeout.subsec_millis().into()))
|
||||
.expect("too big");
|
||||
self.db.borrow_mut().busy_timeout(ms as i32)
|
||||
}
|
||||
|
||||
/// Register a callback to handle `SQLITE_BUSY` errors.
|
||||
///
|
||||
/// If the busy callback is `None`, then `SQLITE_BUSY is returned immediately upon encountering the lock.`
|
||||
/// The argument to the busy handler callback is the number of times that the busy handler has been invoked previously for the same locking event.
|
||||
/// If the busy callback returns `false`, then no additional attempts are made to access the database and `SQLITE_BUSY` is returned to the application.
|
||||
/// If the callback returns `true`, then another attempt is made to access the database and the cycle repeats.
|
||||
///
|
||||
/// There can only be a single busy handler defined for each database connection.
|
||||
/// Setting a new busy handler clears any previously set handler.
|
||||
/// Note that calling `busy_timeout()` or evaluating `PRAGMA busy_timeout=N` will change the busy handler and thus clear any previously set busy handler.
|
||||
pub fn busy_handler(&self, callback: Option<fn(i32) -> bool>) -> Result<()> {
|
||||
unsafe extern "C" fn busy_handler_callback(p_arg: *mut c_void, count: c_int) -> c_int {
|
||||
let handler_fn: fn(i32) -> bool = mem::transmute(p_arg);
|
||||
if handler_fn(count) {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
}
|
||||
}
|
||||
let mut c = self.db.borrow_mut();
|
||||
let r = match callback {
|
||||
Some(f) => unsafe {
|
||||
ffi::sqlite3_busy_handler(c.db(), Some(busy_handler_callback), mem::transmute(f))
|
||||
},
|
||||
None => unsafe { ffi::sqlite3_busy_handler(c.db(), None, ptr::null_mut()) },
|
||||
};
|
||||
c.decode_result(r)
|
||||
}
|
||||
}
|
||||
|
||||
impl InnerConnection {
|
||||
fn busy_timeout(&mut self, timeout: c_int) -> Result<()> {
|
||||
let r = unsafe { ffi::sqlite3_busy_timeout(self.db, timeout) };
|
||||
self.decode_result(r)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
extern crate tempdir;
|
||||
use self::tempdir::TempDir;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::sync_channel;
|
||||
use std::time::Duration;
|
||||
use std::thread;
|
||||
|
||||
use {Connection, Error, ErrorCode, TransactionBehavior};
|
||||
|
||||
#[test]
|
||||
fn test_default_busy() {
|
||||
let temp_dir = TempDir::new("test_default_busy").unwrap();
|
||||
let path = temp_dir.path().join("test.db3");
|
||||
|
||||
let mut db1 = Connection::open(&path).unwrap();
|
||||
let tx1 = db1
|
||||
.transaction_with_behavior(TransactionBehavior::Exclusive)
|
||||
.unwrap();
|
||||
let db2 = Connection::open(&path).unwrap();
|
||||
let r = db2.query_row("PRAGMA schema_version", &[], |_| unreachable!());
|
||||
match r.unwrap_err() {
|
||||
Error::SqliteFailure(err, _) => {
|
||||
assert_eq!(err.code, ErrorCode::DatabaseBusy);
|
||||
}
|
||||
err => panic!("Unexpected error {}", err),
|
||||
}
|
||||
tx1.rollback().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_busy_timeout() {
|
||||
let temp_dir = TempDir::new("test_busy_timeout").unwrap();
|
||||
let path = temp_dir.path().join("test.db3");
|
||||
|
||||
let db2 = Connection::open(&path).unwrap();
|
||||
db2.busy_timeout(Duration::from_secs(1)).unwrap();
|
||||
|
||||
let (rx, tx) = sync_channel(0);
|
||||
let child = thread::spawn(move || {
|
||||
let mut db1 = Connection::open(&path).unwrap();
|
||||
let tx1 = db1
|
||||
.transaction_with_behavior(TransactionBehavior::Exclusive)
|
||||
.unwrap();
|
||||
rx.send(1).unwrap();
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
tx1.rollback().unwrap();
|
||||
});
|
||||
|
||||
assert_eq!(tx.recv().unwrap(), 1);
|
||||
let _ =
|
||||
db2.query_row("PRAGMA schema_version", &[], |row| {
|
||||
row.get_checked::<_, i32>(0)
|
||||
}).expect("unexpected error");
|
||||
|
||||
child.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_busy_handler() {
|
||||
lazy_static! {
|
||||
static ref CALLED: AtomicBool = AtomicBool::new(false);
|
||||
}
|
||||
fn busy_handler(_: i32) -> bool {
|
||||
CALLED.store(true, Ordering::Relaxed);
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
true
|
||||
}
|
||||
|
||||
let temp_dir = TempDir::new("test_busy_handler").unwrap();
|
||||
let path = temp_dir.path().join("test.db3");
|
||||
|
||||
let db2 = Connection::open(&path).unwrap();
|
||||
db2.busy_handler(Some(busy_handler)).unwrap();
|
||||
|
||||
let (rx, tx) = sync_channel(0);
|
||||
let child = thread::spawn(move || {
|
||||
let mut db1 = Connection::open(&path).unwrap();
|
||||
let tx1 = db1
|
||||
.transaction_with_behavior(TransactionBehavior::Exclusive)
|
||||
.unwrap();
|
||||
rx.send(1).unwrap();
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
tx1.rollback().unwrap();
|
||||
});
|
||||
|
||||
assert_eq!(tx.recv().unwrap(), 1);
|
||||
let _ =
|
||||
db2.query_row("PRAGMA schema_version", &[], |row| {
|
||||
row.get_checked::<_, i32>(0)
|
||||
}).expect("unexpected error");
|
||||
assert_eq!(CALLED.load(Ordering::Relaxed), true);
|
||||
|
||||
child.join().unwrap();
|
||||
}
|
||||
}
|
40
src/lib.rs
40
src/lib.rs
@@ -56,7 +56,7 @@ extern crate libsqlite3_sys as ffi;
|
||||
extern crate lru_cache;
|
||||
#[macro_use]
|
||||
extern crate bitflags;
|
||||
#[cfg(any(all(test, feature = "trace"), feature = "vtab"))]
|
||||
#[cfg(any(test, feature = "vtab"))]
|
||||
#[macro_use]
|
||||
extern crate lazy_static;
|
||||
|
||||
@@ -126,6 +126,7 @@ mod hooks;
|
||||
#[cfg(feature = "hooks")]
|
||||
pub use hooks::*;
|
||||
mod unlock_notify;
|
||||
mod busy;
|
||||
#[cfg(feature = "vtab")]
|
||||
pub mod vtab;
|
||||
#[cfg(any(feature = "functions", feature = "vtab"))]
|
||||
@@ -627,7 +628,7 @@ bitflags! {
|
||||
|
||||
impl Default for OpenFlags {
|
||||
fn default() -> OpenFlags {
|
||||
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE |
|
||||
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE |
|
||||
OpenFlags::SQLITE_OPEN_NO_MUTEX | OpenFlags::SQLITE_OPEN_URI
|
||||
}
|
||||
}
|
||||
@@ -994,6 +995,41 @@ mod test {
|
||||
Connection::open_in_memory().unwrap()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_concurrent_transactions_busy_commit() {
|
||||
let tmp = TempDir::new("locked").unwrap();
|
||||
let path = tmp.path().join("transactions.db3");
|
||||
|
||||
Connection::open(&path).expect("create temp db").execute_batch("
|
||||
BEGIN; CREATE TABLE foo(x INTEGER);
|
||||
INSERT INTO foo VALUES(42); END;")
|
||||
.expect("create temp db");
|
||||
|
||||
let mut db1 = Connection::open(&path).unwrap();
|
||||
let mut db2 = Connection::open(&path).unwrap();
|
||||
|
||||
db1.execute_batch("PRAGMA busy_timeout = 0;").unwrap();
|
||||
db2.execute_batch("PRAGMA busy_timeout = 0;").unwrap();
|
||||
|
||||
{
|
||||
let tx1 = db1.transaction().unwrap();
|
||||
let tx2 = db2.transaction().unwrap();
|
||||
|
||||
// SELECT first makes sqlite lock with a shared lock
|
||||
let _ = tx1.query_row("SELECT x FROM foo LIMIT 1", &[], |_| ()).unwrap();
|
||||
let _ = tx2.query_row("SELECT x FROM foo LIMIT 1", &[], |_| ()).unwrap();
|
||||
|
||||
tx1.execute("INSERT INTO foo VALUES(?1)", &[&1]).unwrap();
|
||||
let _ = tx2.execute("INSERT INTO foo VALUES(?1)", &[&2]);
|
||||
|
||||
let _ = tx1.commit();
|
||||
let _ = tx2.commit();
|
||||
}
|
||||
|
||||
let _ = db1.transaction().expect("commit should have closed transaction");
|
||||
let _ = db2.transaction().expect("commit should have closed transaction");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(rustfmt, rustfmt_skip)]
|
||||
fn test_persistence() {
|
||||
|
@@ -167,8 +167,9 @@ impl<'conn> Transaction<'conn> {
|
||||
}
|
||||
|
||||
fn commit_(&mut self) -> Result<()> {
|
||||
self.conn.execute_batch("COMMIT")?;
|
||||
self.committed = true;
|
||||
self.conn.execute_batch("COMMIT")
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// A convenience method which consumes and rolls back a transaction.
|
||||
@@ -177,8 +178,9 @@ impl<'conn> Transaction<'conn> {
|
||||
}
|
||||
|
||||
fn rollback_(&mut self) -> Result<()> {
|
||||
self.conn.execute_batch("ROLLBACK")?;
|
||||
self.committed = true;
|
||||
self.conn.execute_batch("ROLLBACK")
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Consumes the transaction, committing or rolling back according to the current setting
|
||||
@@ -195,7 +197,7 @@ impl<'conn> Transaction<'conn> {
|
||||
return Ok(());
|
||||
}
|
||||
match self.drop_behavior() {
|
||||
DropBehavior::Commit => self.commit_(),
|
||||
DropBehavior::Commit => self.commit_().or_else(|_| self.rollback_()),
|
||||
DropBehavior::Rollback => self.rollback_(),
|
||||
DropBehavior::Ignore => Ok(()),
|
||||
DropBehavior::Panic => panic!("Transaction dropped unexpectedly."),
|
||||
@@ -277,9 +279,10 @@ impl<'conn> Savepoint<'conn> {
|
||||
}
|
||||
|
||||
fn commit_(&mut self) -> Result<()> {
|
||||
self.committed = true;
|
||||
self.conn
|
||||
.execute_batch(&format!("RELEASE {}", self.name))
|
||||
.execute_batch(&format!("RELEASE {}", self.name))?;
|
||||
self.committed = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// A convenience method which rolls back a savepoint.
|
||||
@@ -307,7 +310,7 @@ impl<'conn> Savepoint<'conn> {
|
||||
return Ok(());
|
||||
}
|
||||
match self.drop_behavior() {
|
||||
DropBehavior::Commit => self.commit_(),
|
||||
DropBehavior::Commit => self.commit_().or_else(|_| self.rollback()),
|
||||
DropBehavior::Rollback => self.rollback(),
|
||||
DropBehavior::Ignore => Ok(()),
|
||||
DropBehavior::Panic => panic!("Savepoint dropped unexpectedly."),
|
||||
|
@@ -1,3 +1,4 @@
|
||||
use std::borrow::Cow;
|
||||
use super::{Null, Value, ValueRef};
|
||||
#[cfg(feature = "array")]
|
||||
use vtab::array::Array;
|
||||
@@ -156,6 +157,12 @@ impl<T: ToSql> ToSql for Option<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ToSql for Cow<'a, str> {
|
||||
fn to_sql(&self) -> Result<ToSqlOutput> {
|
||||
Ok(ToSqlOutput::from(self.as_ref()))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::ToSql;
|
||||
@@ -172,4 +179,16 @@ mod test {
|
||||
is_to_sql::<u16>();
|
||||
is_to_sql::<u32>();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cow_str() {
|
||||
use std::borrow::Cow;
|
||||
let s = "str";
|
||||
let cow = Cow::Borrowed(s);
|
||||
let r = cow.to_sql();
|
||||
assert!(r.is_ok());
|
||||
let cow = Cow::Owned::<str>(String::from(s));
|
||||
let r = cow.to_sql();
|
||||
assert!(r.is_ok());
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user