Merge pull request #408 from thomcc/interrupt_handle

Add a method of interrupting a query executing on a separate thread, fixes #407
This commit is contained in:
gwenn 2018-10-08 18:37:30 +02:00 committed by GitHub
commit 7176be2d6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -80,7 +80,7 @@ use std::ptr;
use std::result; use std::result;
use std::str; use std::str;
use std::sync::atomic::{AtomicBool, Ordering, ATOMIC_BOOL_INIT}; use std::sync::atomic::{AtomicBool, Ordering, ATOMIC_BOOL_INIT};
use std::sync::{Once, ONCE_INIT}; use std::sync::{Once, ONCE_INIT, Arc, Mutex};
use cache::StatementCache; use cache::StatementCache;
use error::{error_from_handle, error_from_sqlite_code}; use error::{error_from_handle, error_from_sqlite_code};
@ -573,6 +573,12 @@ impl Connection {
self.db.borrow().db() self.db.borrow().db()
} }
/// Get access to a handle that can be used to interrupt long running queries
/// from another thread.
pub fn get_interrupt_handle(&self) -> InterruptHandle {
self.db.borrow().get_interrupt_handle()
}
fn decode_result(&self, code: c_int) -> Result<()> { fn decode_result(&self, code: c_int) -> Result<()> {
self.db.borrow_mut().decode_result(code) self.db.borrow_mut().decode_result(code)
} }
@ -604,6 +610,13 @@ impl fmt::Debug for Connection {
struct InnerConnection { struct InnerConnection {
db: *mut ffi::sqlite3, db: *mut ffi::sqlite3,
// It's unsafe to call `sqlite3_close` while another thread is performing
// a `sqlite3_interrupt`, and vice versa, so we take this mutex during
// those functions. This protects a copy of the `db` pointer (which is
// cleared on closing), however the main copy, `db`, is unprotected.
// Otherwise, a long running query would prevent calling interrupt, as
// interrupt would only acquire the lock after the query's completion.
interrupt_lock: Arc<Mutex<*mut ffi::sqlite3>>,
#[cfg(feature = "hooks")] #[cfg(feature = "hooks")]
free_commit_hook: Option<fn(*mut ::std::os::raw::c_void)>, free_commit_hook: Option<fn(*mut ::std::os::raw::c_void)>,
#[cfg(feature = "hooks")] #[cfg(feature = "hooks")]
@ -785,13 +798,17 @@ To fix this, either:
impl InnerConnection { impl InnerConnection {
#[cfg(not(feature = "hooks"))] #[cfg(not(feature = "hooks"))]
fn new(db: *mut ffi::sqlite3) -> InnerConnection { fn new(db: *mut ffi::sqlite3) -> InnerConnection {
InnerConnection { db } InnerConnection {
db,
interrupt_lock: Arc::new(Mutex::new(db)),
}
} }
#[cfg(feature = "hooks")] #[cfg(feature = "hooks")]
fn new(db: *mut ffi::sqlite3) -> InnerConnection { fn new(db: *mut ffi::sqlite3) -> InnerConnection {
InnerConnection { InnerConnection {
db, db: db,
interrupt_lock: Arc::new(Mutex::new(db)),
free_commit_hook: None, free_commit_hook: None,
free_rollback_hook: None, free_rollback_hook: None,
free_update_hook: None, free_update_hook: None,
@ -850,10 +867,14 @@ impl InnerConnection {
} }
fn decode_result(&mut self, code: c_int) -> Result<()> { fn decode_result(&mut self, code: c_int) -> Result<()> {
InnerConnection::decode_result_raw(self.db(), code)
}
fn decode_result_raw(db: *mut ffi::sqlite3, code: c_int) -> Result<()> {
if code == ffi::SQLITE_OK { if code == ffi::SQLITE_OK {
Ok(()) Ok(())
} else { } else {
Err(error_from_handle(self.db(), code)) Err(error_from_handle(db, code))
} }
} }
@ -862,16 +883,28 @@ impl InnerConnection {
return Ok(()); return Ok(());
} }
self.remove_hooks(); self.remove_hooks();
let mut shared_handle = self.interrupt_lock.lock().unwrap();
assert!(!shared_handle.is_null(),
"Bug: Somehow interrupt_lock was cleared before the DB was closed");
unsafe { unsafe {
let r = ffi::sqlite3_close(self.db()); let r = ffi::sqlite3_close(self.db);
let r = self.decode_result(r); // Need to use _raw because _guard has a reference out, and
// decode_result takes &mut self.
let r = InnerConnection::decode_result_raw(self.db, r);
if r.is_ok() { if r.is_ok() {
*shared_handle = ptr::null_mut();
self.db = ptr::null_mut(); self.db = ptr::null_mut();
} }
r r
} }
} }
fn get_interrupt_handle(&self) -> InterruptHandle {
InterruptHandle {
db_lock: Arc::clone(&self.interrupt_lock),
}
}
fn execute_batch(&mut self, sql: &str) -> Result<()> { fn execute_batch(&mut self, sql: &str) -> Result<()> {
let c_sql = try!(str_to_cstring(sql)); let c_sql = try!(str_to_cstring(sql));
unsafe { unsafe {
@ -990,6 +1023,25 @@ impl InnerConnection {
fn remove_hooks(&mut self) {} fn remove_hooks(&mut self) {}
} }
/// Allows interrupting a long-running computation.
pub struct InterruptHandle {
db_lock: Arc<Mutex<*mut ffi::sqlite3>>,
}
unsafe impl Send for InterruptHandle {}
unsafe impl Sync for InterruptHandle {}
impl InterruptHandle {
/// Interrupt the query currently executing on another thread. This will
/// cause that query to fail with a `SQLITE3_INTERRUPT` error.
pub fn interrupt(&self) {
let db_handle = self.db_lock.lock().unwrap();
if !db_handle.is_null() {
unsafe { ffi::sqlite3_interrupt(*db_handle) }
}
}
}
impl Drop for InnerConnection { impl Drop for InnerConnection {
#[allow(unused_must_use)] #[allow(unused_must_use)]
fn drop(&mut self) { fn drop(&mut self) {
@ -1032,6 +1084,12 @@ mod test {
#[allow(dead_code, unconditional_recursion)] #[allow(dead_code, unconditional_recursion)]
fn ensure_send<T: Send>() { fn ensure_send<T: Send>() {
ensure_send::<Connection>(); ensure_send::<Connection>();
ensure_send::<InterruptHandle>();
}
#[allow(dead_code, unconditional_recursion)]
fn ensure_sync<T: Sync>() {
ensure_sync::<InterruptHandle>();
} }
pub fn checked_memory_handle() -> Connection { pub fn checked_memory_handle() -> Connection {
@ -1450,6 +1508,85 @@ mod test {
assert!(version().contains(&format!("{}.{}.{}", major, minor, patch))); assert!(version().contains(&format!("{}.{}.{}", major, minor, patch)));
} }
#[test]
fn test_interrupt() {
use std::thread;
use std::time::Duration;
let tries = 15;
// Sadly, this is inherently finnicky. Even if sqlite gets the
// interrupt, it isn't guaranteed to stop. In practice, 15 tends to be
// more than enough so long as we move everything we can outside of
// the thread body, but it still is unfortunate.
for i in 0..tries {
let db = checked_memory_handle();
db.execute_batch("CREATE TABLE dummy(id)").unwrap();
let interrupt_handle = db.get_interrupt_handle();
// generate an arbitrary query which will be very slow to execute.
let sql = format!("{};",
(0..100_000).into_iter()
.map(|i| format!("INSERT INTO dummy(id) VALUES({})", i))
.collect::<Vec<_>>()
.join(";\n"));
// Do this on the main thread to minimize the amount of time spent
// when interrupt won't do anything (because we haven't started
// executing the query).
let c_sql = str_to_cstring(&sql).unwrap();
let joiner = thread::spawn(move || {
unsafe {
let raw_db = db.db.borrow().db;
let r = ffi::sqlite3_exec(
raw_db,
c_sql.as_ptr(),
None,
ptr::null_mut(),
ptr::null_mut(),
);
db.decode_result(r)
}
});
// Try a few times to make sure we don't catch it too early.
interrupt_handle.interrupt();
for &delay in &[10, 100, 1000] {
thread::sleep(Duration::from_millis(delay));
interrupt_handle.interrupt();
}
let result = joiner.join().unwrap();
if i != tries - 1 && !result.is_err() {
continue;
}
match result.unwrap_err() {
Error::SqliteFailure(err, _) => {
assert_eq!(err.code, ErrorCode::OperationInterrupted);
return;
}
err => {
panic!("Unexpected error {}", err);
}
}
}
}
#[test]
fn test_interrupt_close() {
let db = checked_memory_handle();
let handle = db.get_interrupt_handle();
handle.interrupt();
db.close().unwrap();
handle.interrupt();
// Look at it's internals to see if we cleared it out properly.
let db_guard = handle.db_lock.lock().unwrap();
assert!(db_guard.is_null());
// It would be nice to test that we properly handle close/interrupt
// running at the same time, but it seems impossible to do with any
// degree of reliability.
}
mod query_and_then_tests { mod query_and_then_tests {
extern crate libsqlite3_sys as ffi; extern crate libsqlite3_sys as ffi;
use super::*; use super::*;