Merge pull request #824 from gwenn/progress_handler

Expose query progress information
This commit is contained in:
gwenn 2020-11-03 18:14:36 +01:00 committed by GitHub
commit b9ccb252ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 269 additions and 109 deletions

View File

@ -136,7 +136,10 @@
//!
//! // Insert another BLOB, this time using a parameter passed in from
//! // rust (potentially with a dynamic size).
//! db.execute("INSERT INTO test_table (content) VALUES (?)", &[ZeroBlob(64)])?;
//! db.execute(
//! "INSERT INTO test_table (content) VALUES (?)",
//! &[ZeroBlob(64)],
//! )?;
//!
//! // given a new row ID, we can reopen the blob on that row
//! let rowid = db.last_insert_rowid();
@ -177,7 +180,10 @@
//!
//! // Insert another blob, this time using a parameter passed in from
//! // rust (potentially with a dynamic size).
//! db.execute("INSERT INTO test_table (content) VALUES (?)", &[ZeroBlob(64)])?;
//! db.execute(
//! "INSERT INTO test_table (content) VALUES (?)",
//! &[ZeroBlob(64)],
//! )?;
//!
//! // given a new row ID, we can reopen the blob on that row
//! let rowid = db.last_insert_rowid();
@ -196,8 +202,8 @@ use crate::{Connection, DatabaseName, Result};
mod pos_io;
/// `feature = "blob"` Handle to an open BLOB. See [`rusqlite::blob`](crate::blob) documentation for
/// in-depth discussion.
/// `feature = "blob"` Handle to an open BLOB. See
/// [`rusqlite::blob`](crate::blob) documentation for in-depth discussion.
pub struct Blob<'conn> {
conn: &'conn Connection,
blob: *mut ffi::sqlite3_blob,

View File

@ -137,9 +137,7 @@ mod test {
#[test]
#[ignore] // FIXME: unstable
fn test_busy_handler() {
lazy_static::lazy_static! {
static ref CALLED: AtomicBool = AtomicBool::new(false);
}
static CALLED: AtomicBool = AtomicBool::new(false);
fn busy_handler(_: i32) -> bool {
CALLED.store(true, Ordering::Relaxed);
thread::sleep(Duration::from_millis(100));

View File

@ -15,9 +15,9 @@ unsafe extern "C" fn free_boxed_value<T>(p: *mut c_void) {
impl Connection {
/// `feature = "collation"` Add or modify a collation.
pub fn create_collation<C>(&self, collation_name: &str, x_compare: C) -> Result<()>
pub fn create_collation<'c, C>(&'c self, collation_name: &str, x_compare: C) -> Result<()>
where
C: Fn(&str, &str) -> Ordering + Send + UnwindSafe + 'static,
C: Fn(&str, &str) -> Ordering + Send + UnwindSafe + 'c,
{
self.db
.borrow_mut()
@ -39,9 +39,9 @@ impl Connection {
}
impl InnerConnection {
fn create_collation<C>(&mut self, collation_name: &str, x_compare: C) -> Result<()>
fn create_collation<'c, C>(&'c mut self, collation_name: &str, x_compare: C) -> Result<()>
where
C: Fn(&str, &str) -> Ordering + Send + UnwindSafe + 'static,
C: Fn(&str, &str) -> Ordering + Send + UnwindSafe + 'c,
{
unsafe extern "C" fn call_boxed_closure<C>(
arg1: *mut c_void,

View File

@ -111,8 +111,9 @@ pub enum Error {
InvalidParameterCount(usize, usize),
/// Returned from various functions in the Blob IO positional API. For
/// example, [`Blob::raw_read_at_exact`](crate::blob::Blob::raw_read_at_exact)
/// will return it if the blob has insufficient data.
/// example,
/// [`Blob::raw_read_at_exact`](crate::blob::Blob::raw_read_at_exact) will
/// return it if the blob has insufficient data.
#[cfg(feature = "blob")]
BlobSizeError,
}

View File

@ -22,8 +22,7 @@
//! FunctionFlags::SQLITE_UTF8 | FunctionFlags::SQLITE_DETERMINISTIC,
//! move |ctx| {
//! assert_eq!(ctx.len(), 2, "called with unexpected number of arguments");
//! let regexp: Arc<Regex> = ctx
//! .get_or_create_aux(0, |vr| -> Result<_, BoxError> {
//! let regexp: Arc<Regex> = ctx.get_or_create_aux(0, |vr| -> Result<_, BoxError> {
//! Ok(Regex::new(vr.as_str()?)?)
//! })?;
//! let is_match = {
@ -334,15 +333,15 @@ impl Connection {
/// # Failure
///
/// Will return Err if the function could not be attached to the connection.
pub fn create_scalar_function<'a, F, T>(
&'a self,
pub fn create_scalar_function<'c, F, T>(
&'c self,
fn_name: &str,
n_arg: c_int,
flags: FunctionFlags,
x_func: F,
) -> Result<()>
where
F: FnMut(&Context<'_>) -> Result<T> + Send + UnwindSafe + 'a,
F: FnMut(&Context<'_>) -> Result<T> + Send + UnwindSafe + 'c,
T: ToSql,
{
self.db
@ -411,15 +410,15 @@ impl Connection {
}
impl InnerConnection {
fn create_scalar_function<'a, F, T>(
&'a mut self,
fn create_scalar_function<'c, F, T>(
&'c mut self,
fn_name: &str,
n_arg: c_int,
flags: FunctionFlags,
x_func: F,
) -> Result<()>
where
F: FnMut(&Context<'_>) -> Result<T> + Send + UnwindSafe + 'a,
F: FnMut(&Context<'_>) -> Result<T> + Send + UnwindSafe + 'c,
T: ToSql,
{
unsafe extern "C" fn call_boxed_closure<F, T>(

View File

@ -2,7 +2,7 @@
#![allow(non_camel_case_types)]
use std::os::raw::{c_char, c_int, c_void};
use std::panic::catch_unwind;
use std::panic::{catch_unwind, RefUnwindSafe};
use std::ptr;
use crate::ffi;
@ -40,9 +40,9 @@ impl Connection {
/// a transaction is committed.
///
/// The callback returns `true` to rollback.
pub fn commit_hook<F>(&self, hook: Option<F>)
pub fn commit_hook<'c, F>(&'c self, hook: Option<F>)
where
F: FnMut() -> bool + Send + 'static,
F: FnMut() -> bool + Send + 'c,
{
self.db.borrow_mut().commit_hook(hook);
}
@ -51,9 +51,9 @@ impl Connection {
/// a transaction is committed.
///
/// The callback returns `true` to rollback.
pub fn rollback_hook<F>(&self, hook: Option<F>)
pub fn rollback_hook<'c, F>(&'c self, hook: Option<F>)
where
F: FnMut() + Send + 'static,
F: FnMut() + Send + 'c,
{
self.db.borrow_mut().rollback_hook(hook);
}
@ -68,12 +68,27 @@ impl Connection {
/// - the name of the database ("main", "temp", ...),
/// - the name of the table that is updated,
/// - the ROWID of the row that is updated.
pub fn update_hook<F>(&self, hook: Option<F>)
pub fn update_hook<'c, F>(&'c self, hook: Option<F>)
where
F: FnMut(Action, &str, &str, i64) + Send + 'static,
F: FnMut(Action, &str, &str, i64) + Send + 'c,
{
self.db.borrow_mut().update_hook(hook);
}
/// `feature = "hooks"` Register a query progress callback.
///
/// The parameter `num_ops` is the approximate number of virtual machine
/// instructions that are evaluated between successive invocations of the
/// `handler`. If `num_ops` is less than one then the progress handler
/// is disabled.
///
/// If the progress callback returns `true`, the operation is interrupted.
pub fn progress_handler<F>(&self, num_ops: c_int, handler: Option<F>)
where
F: FnMut() -> bool + Send + RefUnwindSafe + 'static,
{
self.db.borrow_mut().progress_handler(num_ops, handler);
}
}
impl InnerConnection {
@ -81,11 +96,12 @@ impl InnerConnection {
self.update_hook(None::<fn(Action, &str, &str, i64)>);
self.commit_hook(None::<fn() -> bool>);
self.rollback_hook(None::<fn()>);
self.progress_handler(0, None::<fn() -> bool>);
}
fn commit_hook<F>(&mut self, hook: Option<F>)
fn commit_hook<'c, F>(&'c mut self, hook: Option<F>)
where
F: FnMut() -> bool + Send + 'static,
F: FnMut() -> bool + Send + 'c,
{
unsafe extern "C" fn call_boxed_closure<F>(p_arg: *mut c_void) -> c_int
where
@ -132,9 +148,9 @@ impl InnerConnection {
self.free_commit_hook = free_commit_hook;
}
fn rollback_hook<F>(&mut self, hook: Option<F>)
fn rollback_hook<'c, F>(&'c mut self, hook: Option<F>)
where
F: FnMut() + Send + 'static,
F: FnMut() + Send + 'c,
{
unsafe extern "C" fn call_boxed_closure<F>(p_arg: *mut c_void)
where
@ -173,9 +189,9 @@ impl InnerConnection {
self.free_rollback_hook = free_rollback_hook;
}
fn update_hook<F>(&mut self, hook: Option<F>)
fn update_hook<'c, F>(&'c mut self, hook: Option<F>)
where
F: FnMut(Action, &str, &str, i64) + Send + 'static,
F: FnMut(Action, &str, &str, i64) + Send + 'c,
{
unsafe extern "C" fn call_boxed_closure<F>(
p_arg: *mut c_void,
@ -236,6 +252,45 @@ impl InnerConnection {
}
self.free_update_hook = free_update_hook;
}
fn progress_handler<F>(&mut self, num_ops: c_int, handler: Option<F>)
where
F: FnMut() -> bool + Send + RefUnwindSafe + 'static,
{
unsafe extern "C" fn call_boxed_closure<F>(p_arg: *mut c_void) -> c_int
where
F: FnMut() -> bool,
{
let r = catch_unwind(|| {
let boxed_handler: *mut F = p_arg as *mut F;
(*boxed_handler)()
});
if let Ok(true) = r {
1
} else {
0
}
}
match handler {
Some(handler) => {
let boxed_handler = Box::new(handler);
unsafe {
ffi::sqlite3_progress_handler(
self.db(),
num_ops,
Some(call_boxed_closure::<F>),
&*boxed_handler as *const F as *mut _,
)
}
self.progress_handler = Some(boxed_handler);
}
_ => {
unsafe { ffi::sqlite3_progress_handler(self.db(), num_ops, None, ptr::null_mut()) }
self.progress_handler = None;
}
};
}
}
unsafe fn free_boxed_hook<F>(p: *mut c_void) {
@ -246,23 +301,20 @@ unsafe fn free_boxed_hook<F>(p: *mut c_void) {
mod test {
use super::Action;
use crate::Connection;
use lazy_static::lazy_static;
use std::sync::atomic::{AtomicBool, Ordering};
#[test]
fn test_commit_hook() {
let db = Connection::open_in_memory().unwrap();
lazy_static! {
static ref CALLED: AtomicBool = AtomicBool::new(false);
}
let mut called = false;
db.commit_hook(Some(|| {
CALLED.store(true, Ordering::Relaxed);
called = true;
false
}));
db.execute_batch("BEGIN; CREATE TABLE foo (t TEXT); COMMIT;")
.unwrap();
assert!(CALLED.load(Ordering::Relaxed));
assert!(called);
}
#[test]
@ -282,33 +334,59 @@ mod test {
fn test_rollback_hook() {
let db = Connection::open_in_memory().unwrap();
lazy_static! {
static ref CALLED: AtomicBool = AtomicBool::new(false);
}
let mut called = false;
db.rollback_hook(Some(|| {
CALLED.store(true, Ordering::Relaxed);
called = true;
}));
db.execute_batch("BEGIN; CREATE TABLE foo (t TEXT); ROLLBACK;")
.unwrap();
assert!(CALLED.load(Ordering::Relaxed));
assert!(called);
}
#[test]
fn test_update_hook() {
let db = Connection::open_in_memory().unwrap();
lazy_static! {
static ref CALLED: AtomicBool = AtomicBool::new(false);
}
let mut called = false;
db.update_hook(Some(|action, db: &str, tbl: &str, row_id| {
assert_eq!(Action::SQLITE_INSERT, action);
assert_eq!("main", db);
assert_eq!("foo", tbl);
assert_eq!(1, row_id);
CALLED.store(true, Ordering::Relaxed);
called = true;
}));
db.execute_batch("CREATE TABLE foo (t TEXT)").unwrap();
db.execute_batch("INSERT INTO foo VALUES ('lisa')").unwrap();
assert!(called);
}
#[test]
fn test_progress_handler() {
let db = Connection::open_in_memory().unwrap();
static CALLED: AtomicBool = AtomicBool::new(false);
db.progress_handler(
1,
Some(|| {
CALLED.store(true, Ordering::Relaxed);
false
}),
);
db.execute_batch("BEGIN; CREATE TABLE foo (t TEXT); COMMIT;")
.unwrap();
assert!(CALLED.load(Ordering::Relaxed));
}
#[test]
fn test_progress_handler_interrupt() {
let db = Connection::open_in_memory().unwrap();
fn handler() -> bool {
true
}
db.progress_handler(1, Some(handler));
db.execute_batch("BEGIN; CREATE TABLE foo (t TEXT); COMMIT;")
.unwrap_err();
}
}

View File

@ -31,6 +31,8 @@ pub struct InnerConnection {
pub free_rollback_hook: Option<unsafe fn(*mut ::std::os::raw::c_void)>,
#[cfg(feature = "hooks")]
pub free_update_hook: Option<unsafe fn(*mut ::std::os::raw::c_void)>,
#[cfg(feature = "hooks")]
pub progress_handler: Option<Box<dyn FnMut() -> bool + Send>>,
owned: bool,
}
@ -46,6 +48,8 @@ impl InnerConnection {
free_rollback_hook: None,
#[cfg(feature = "hooks")]
free_update_hook: None,
#[cfg(feature = "hooks")]
progress_handler: None,
owned,
}
}

View File

@ -810,6 +810,67 @@ impl fmt::Debug for Connection {
}
}
/// Batch iterator
/// ```rust
/// use rusqlite::{Batch, Connection, Result, NO_PARAMS};
///
/// fn main() -> Result<()> {
/// let conn = Connection::open_in_memory()?;
/// let sql = r"
/// CREATE TABLE tbl1 (col);
/// CREATE TABLE tbl2 (col);
/// ";
/// let mut batch = Batch::new(&conn, sql);
/// while let Some(mut stmt) = batch.next()? {
/// stmt.execute(NO_PARAMS)?;
/// }
/// Ok(())
/// }
/// ```
#[derive(Debug)]
pub struct Batch<'conn, 'sql> {
conn: &'conn Connection,
sql: &'sql str,
tail: usize,
}
impl<'conn, 'sql> Batch<'conn, 'sql> {
/// Constructor
pub fn new(conn: &'conn Connection, sql: &'sql str) -> Batch<'conn, 'sql> {
Batch { conn, sql, tail: 0 }
}
/// Iterates on each batch statements.
///
/// Returns `Ok(None)` when batch is completed.
#[allow(clippy::should_implement_trait)] // fallible iterator
pub fn next(&mut self) -> Result<Option<Statement<'conn>>> {
while self.tail < self.sql.len() {
let sql = &self.sql[self.tail..];
let next = self.conn.prepare(sql)?;
let tail = next.stmt.tail();
if tail == 0 {
self.tail = self.sql.len();
} else {
self.tail += tail;
}
if next.stmt.is_null() {
continue;
}
return Ok(Some(next));
}
Ok(None)
}
}
impl<'conn> Iterator for Batch<'conn, '_> {
type Item = Result<Statement<'conn>>;
fn next(&mut self) -> Option<Result<Statement<'conn>>> {
self.next().transpose()
}
}
bitflags::bitflags! {
/// Flags for opening SQLite database connections.
/// See [sqlite3_open_v2](http://www.sqlite.org/c3ref/open.html) for details.
@ -1749,6 +1810,7 @@ mod test {
err => panic!("Unexpected error {}", err),
}
}
}
#[test]
fn test_dynamic() {
@ -1759,7 +1821,7 @@ mod test {
END;";
db.execute_batch(sql).unwrap();
db.query_row("SELECT * FROM foo", params![], |r| {
db.query_row("SELECT * FROM foo", NO_PARAMS, |r| {
assert_eq!(2, r.column_count());
Ok(())
})
@ -1771,7 +1833,7 @@ mod test {
db.execute_batch("CREATE TABLE foo(x INTEGER);").unwrap();
let b: Box<dyn ToSql> = Box::new(5);
db.execute("INSERT INTO foo VALUES(?)", &[b]).unwrap();
db.query_row("SELECT x FROM foo", params![], |r| {
db.query_row("SELECT x FROM foo", NO_PARAMS, |r| {
assert_eq!(5, r.get_unwrap::<_, i32>(0));
Ok(())
})
@ -1788,8 +1850,8 @@ mod test {
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?;",
params![
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 1, 1, 1, 1,
],
|r| {
assert_eq!(1, r.get_unwrap::<_, i32>(0));
@ -1805,7 +1867,20 @@ mod test {
let db = checked_memory_handle();
db.execute_batch("CREATE TABLE x(t);").unwrap();
// `execute_batch` should be used but `execute` should also work
db.execute("ALTER TABLE x RENAME TO y;", params![]).unwrap();
db.execute("ALTER TABLE x RENAME TO y;", NO_PARAMS).unwrap();
}
#[test]
fn test_batch() {
let db = checked_memory_handle();
let sql = r"
CREATE TABLE tbl1 (col);
CREATE TABLE tbl2 (col);
";
let batch = Batch::new(&db, sql);
for stmt in batch {
let mut stmt = stmt.unwrap();
stmt.execute(NO_PARAMS).unwrap();
}
}
}

View File

@ -819,9 +819,7 @@ mod test {
db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")
.unwrap();
lazy_static::lazy_static! {
static ref CALLED: AtomicBool = AtomicBool::new(false);
}
static CALLED: AtomicBool = AtomicBool::new(false);
db.apply(
&changeset,
None::<fn(&str) -> bool>,

View File

@ -15,8 +15,8 @@
//! `FromSql` has different behaviour depending on the SQL and Rust types, and
//! the value.
//!
//! * `INTEGER` to integer: returns an `Error::IntegralValueOutOfRange` error
//! if the value does not fit in the Rust type.
//! * `INTEGER` to integer: returns an `Error::IntegralValueOutOfRange` error if
//! the value does not fit in the Rust type.
//! * `REAL` to integer: always returns an `Error::InvalidColumnType` error.
//! * `INTEGER` to float: casts using `as` operator. Never fails.
//! * `REAL` to float: casts using `as` operator. Never fails.
@ -32,7 +32,6 @@
//! can be parsed by SQLite's builtin
//! [datetime](https://www.sqlite.org/lang_datefunc.html) functions. If you
//! want different storage for datetimes, you can use a newtype.
//!
#![cfg_attr(
feature = "time",
doc = r##"
@ -387,7 +386,7 @@ mod test {
}
macro_rules! test_conversion {
($db_etc:ident, $insert_value:expr, $get_type:ty, expect $expected_value:expr) => {
($db_etc:ident, $insert_value:expr, $get_type:ty,expect $expected_value:expr) => {
$db_etc
.insert_statement
.execute(params![$insert_value])
@ -398,7 +397,7 @@ mod test {
assert_eq!(res.unwrap(), $expected_value);
$db_etc.delete_statement.execute(NO_PARAMS).unwrap();
};
($db_etc:ident, $insert_value:expr, $get_type:ty, expect_from_sql_error) => {
($db_etc:ident, $insert_value:expr, $get_type:ty,expect_from_sql_error) => {
$db_etc
.insert_statement
.execute(params![$insert_value])
@ -409,7 +408,7 @@ mod test {
res.unwrap_err();
$db_etc.delete_statement.execute(NO_PARAMS).unwrap();
};
($db_etc:ident, $insert_value:expr, $get_type:ty, expect_to_sql_error) => {
($db_etc:ident, $insert_value:expr, $get_type:ty,expect_to_sql_error) => {
$db_etc
.insert_statement
.execute(params![$insert_value])

View File

@ -100,6 +100,7 @@ impl std::fmt::Debug for SmallCString {
impl std::ops::Deref for SmallCString {
type Target = CStr;
#[inline]
fn deref(&self) -> &CStr {
self.as_cstr()

View File

@ -130,9 +130,10 @@ impl SqliteMallocString {
// This is safe:
// - `align` is never 0
// - `align` is always a power of 2.
// - `size` needs no realignment because it's guaranteed to be
// aligned (everything is aligned to 1)
// - `size` is also never zero, although this function doesn't actually require it now.
// - `size` needs no realignment because it's guaranteed to be aligned
// (everything is aligned to 1)
// - `size` is also never zero, although this function doesn't actually require
// it now.
let layout = Layout::from_size_align_unchecked(s.len().saturating_add(1), 1);
// Note: This call does not return.
handle_alloc_error(layout);