From b4565f565a85f863c720924d5f4f3d58f024db36 Mon Sep 17 00:00:00 2001 From: Thom Chiovoloni Date: Wed, 26 Sep 2018 13:36:01 -0700 Subject: [PATCH] Add a method of interrupting a query executing on a separate thread, fixes #407 --- src/lib.rs | 149 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 143 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d23d86b..c8b3009 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,7 +79,7 @@ use std::ptr; use std::result; use std::str; use std::sync::atomic::{AtomicBool, Ordering, ATOMIC_BOOL_INIT}; -use std::sync::{Once, ONCE_INIT}; +use std::sync::{Once, ONCE_INIT, Arc, Mutex}; use cache::StatementCache; use error::{error_from_handle, error_from_sqlite_code}; @@ -598,6 +598,12 @@ impl Connection { self.db.borrow().db() } + /// Get access to a handle that can be used to interrupt long running queries + /// from another thread. + pub fn get_interrupt_handle(&self) -> InterruptHandle { + self.db.borrow().get_interrupt_handle() + } + fn decode_result(&self, code: c_int) -> Result<()> { self.db.borrow_mut().decode_result(code) } @@ -629,6 +635,13 @@ impl fmt::Debug for Connection { struct InnerConnection { db: *mut ffi::sqlite3, + // It's unsafe to call `sqlite3_close` while another thread is performing + // a `sqlite3_interrupt`, and vice versa, so we take this mutex during + // those functions. This protects a copy of the `db` pointer (which is + // cleared on closing), however the main copy, `db`, is unprotected. + // Otherwise, a long running query would prevent calling interrupt, as + // interrupt would only acquire the lock after the query's completion. + interrupt_lock: Arc>, #[cfg(feature = "hooks")] free_commit_hook: Option, #[cfg(feature = "hooks")] @@ -810,13 +823,17 @@ To fix this, either: impl InnerConnection { #[cfg(not(feature = "hooks"))] fn new(db: *mut ffi::sqlite3) -> InnerConnection { - InnerConnection { db } + InnerConnection { + db, + interrupt_lock: Arc::new(Mutex::new(db)), + } } #[cfg(feature = "hooks")] fn new(db: *mut ffi::sqlite3) -> InnerConnection { InnerConnection { - db, + db: db, + interrupt_lock: Arc::new(Mutex::new(db)), free_commit_hook: None, free_rollback_hook: None, free_update_hook: None, @@ -875,10 +892,14 @@ impl InnerConnection { } fn decode_result(&mut self, code: c_int) -> Result<()> { + InnerConnection::decode_result_raw(self.db(), code) + } + + fn decode_result_raw(db: *mut ffi::sqlite3, code: c_int) -> Result<()> { if code == ffi::SQLITE_OK { Ok(()) } else { - Err(error_from_handle(self.db(), code)) + Err(error_from_handle(db, code)) } } @@ -887,16 +908,28 @@ impl InnerConnection { return Ok(()); } self.remove_hooks(); + let mut shared_handle = self.interrupt_lock.lock().unwrap(); + assert!(!shared_handle.is_null(), + "Bug: Somehow interrupt_lock was cleared before the DB was closed"); unsafe { - let r = ffi::sqlite3_close(self.db()); - let r = self.decode_result(r); + let r = ffi::sqlite3_close(self.db); + // Need to use _raw because _guard has a reference out, and + // decode_result takes &mut self. + let r = InnerConnection::decode_result_raw(self.db, r); if r.is_ok() { + *shared_handle = ptr::null_mut(); self.db = ptr::null_mut(); } r } } + fn get_interrupt_handle(&self) -> InterruptHandle { + InterruptHandle { + db_lock: Arc::clone(&self.interrupt_lock), + } + } + fn execute_batch(&mut self, sql: &str) -> Result<()> { let c_sql = try!(str_to_cstring(sql)); unsafe { @@ -1015,6 +1048,25 @@ impl InnerConnection { fn remove_hooks(&mut self) {} } +/// Allows interrupting a long-running computation. +pub struct InterruptHandle { + db_lock: Arc>, +} + +unsafe impl Send for InterruptHandle {} +unsafe impl Sync for InterruptHandle {} + +impl InterruptHandle { + /// Interrupt the query currently executing on another thread. This will + /// cause that query to fail with a `SQLITE3_INTERRUPT` error. + pub fn interrupt(&self) { + let db_handle = self.db_lock.lock().unwrap(); + if !db_handle.is_null() { + unsafe { ffi::sqlite3_interrupt(*db_handle) } + } + } +} + impl Drop for InnerConnection { #[allow(unused_must_use)] fn drop(&mut self) { @@ -1057,6 +1109,12 @@ mod test { #[allow(dead_code, unconditional_recursion)] fn ensure_send() { ensure_send::(); + ensure_send::(); + } + + #[allow(dead_code, unconditional_recursion)] + fn ensure_sync() { + ensure_sync::(); } pub fn checked_memory_handle() -> Connection { @@ -1472,6 +1530,85 @@ mod test { assert!(version().contains(&format!("{}.{}.{}", major, minor, patch))); } + #[test] + fn test_interrupt() { + use std::thread; + use std::time::Duration; + let tries = 15; + // Sadly, this is inherently finnicky. Even if sqlite gets the + // interrupt, it isn't guaranteed to stop. In practice, 15 tends to be + // more than enough so long as we move everything we can outside of + // the thread body, but it still is unfortunate. + for i in 0..tries { + let db = checked_memory_handle(); + db.execute_batch("CREATE TABLE dummy(id)").unwrap(); + let interrupt_handle = db.get_interrupt_handle(); + // generate an arbitrary query which will be very slow to execute. + let sql = format!("{};", + (0..100_000).into_iter() + .map(|i| format!("INSERT INTO dummy(id) VALUES({})", i)) + .collect::>() + .join(";\n")); + + // Do this on the main thread to minimize the amount of time spent + // when interrupt won't do anything (because we haven't started + // executing the query). + let c_sql = str_to_cstring(&sql).unwrap(); + + let joiner = thread::spawn(move || { + unsafe { + let raw_db = db.db.borrow().db; + let r = ffi::sqlite3_exec( + raw_db, + c_sql.as_ptr(), + None, + ptr::null_mut(), + ptr::null_mut(), + ); + db.decode_result(r) + } + }); + + // Try a few times to make sure we don't catch it too early. + interrupt_handle.interrupt(); + for &delay in &[10, 100, 1000] { + thread::sleep(Duration::from_millis(delay)); + interrupt_handle.interrupt(); + } + let result = joiner.join().unwrap(); + + if i != tries - 1 && !result.is_err() { + continue; + } + + match result.unwrap_err() { + Error::SqliteFailure(err, _) => { + assert_eq!(err.code, ErrorCode::OperationInterrupted); + return; + } + err => { + panic!("Unexpected error {}", err); + } + } + } + } + + #[test] + fn test_interrupt_close() { + let db = checked_memory_handle(); + let handle = db.get_interrupt_handle(); + handle.interrupt(); + db.close().unwrap(); + handle.interrupt(); + + // Look at it's internals to see if we cleared it out properly. + let db_guard = handle.db_lock.lock().unwrap(); + assert!(db_guard.is_null()); + // It would be nice to test that we properly handle close/interrupt + // running at the same time, but it seems impossible to do with any + // degree of reliability. + } + mod query_and_then_tests { extern crate libsqlite3_sys as ffi; use super::*;