From dbc4eef6575ac5bbd26d599a9025f0b9dc5d5c84 Mon Sep 17 00:00:00 2001 From: gwenn <45554+gwenn@users.noreply.github.com> Date: Sun, 13 Jan 2019 12:46:19 +0100 Subject: [PATCH] Session extension (#459) Session extension bindings --- Cargo.toml | 3 + README.md | 1 + libsqlite3-sys/Cargo.toml | 4 + libsqlite3-sys/build.rs | 22 +- src/context.rs | 49 --- src/error.rs | 9 + src/hooks.rs | 66 +-- src/lib.rs | 5 +- src/session.rs | 888 ++++++++++++++++++++++++++++++++++++++ src/types/from_sql.rs | 12 + src/types/value_ref.rs | 53 +++ 11 files changed, 996 insertions(+), 116 deletions(-) create mode 100644 src/session.rs diff --git a/Cargo.toml b/Cargo.toml index 0a1b6d3..504d565 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,8 @@ vtab_v3 = ["vtab"] csvtab = ["csv", "vtab"] # pointer passing interfaces: 3.20.0 array = ["vtab"] +# session extension: 3.13.0 +session = ["libsqlite3-sys/session", "hooks", "fallible-streaming-iterator"] [dependencies] time = "0.1.0" @@ -53,6 +55,7 @@ serde_json = { version = "1.0", optional = true } csv = { version = "1.0", optional = true } lazy_static = { version = "1.0", optional = true } byteorder = { version = "1.2", features = ["i128"], optional = true } +fallible-streaming-iterator = { version = "0.1", optional = true } [dev-dependencies] tempdir = "0.3" diff --git a/README.md b/README.md index 7e4472d..f2d983e 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,7 @@ features](https://doc.rust-lang.org/cargo/reference/manifest.html#the-features-s * [`csvtab`](https://sqlite.org/csv.html), CSV virtual table written in Rust. * [`array`](https://sqlite.org/carray.html), The `rarray()` Table-Valued Function. * `i128_blob` allows storing values of type `i128` type in SQLite databases. Internally, the data is stored as a 16 byte big-endian blob, with the most significant bit flipped, which allows ordering and comparison between different blobs storing i128s to work as expected. +* [`session`](https://sqlite.org/sessionintro.html), Session module extension. ## Notes on building rusqlite and libsqlite3-sys diff --git a/libsqlite3-sys/Cargo.toml b/libsqlite3-sys/Cargo.toml index bd18df4..19eeb78 100644 --- a/libsqlite3-sys/Cargo.toml +++ b/libsqlite3-sys/Cargo.toml @@ -22,6 +22,10 @@ min_sqlite_version_3_7_7 = ["pkg-config", "vcpkg"] min_sqlite_version_3_7_16 = ["pkg-config", "vcpkg"] # sqlite3_unlock_notify >= 3.6.12 unlock_notify = [] +# 3.13.0 +preupdate_hook = [] +# 3.13.0 +session = ["preupdate_hook"] [build-dependencies] bindgen = { version = "0.46", optional = true } diff --git a/libsqlite3-sys/build.rs b/libsqlite3-sys/build.rs index 8006788..87998ef 100644 --- a/libsqlite3-sys/build.rs +++ b/libsqlite3-sys/build.rs @@ -54,6 +54,12 @@ mod build { if cfg!(feature = "unlock_notify") { cfg.flag("-DSQLITE_ENABLE_UNLOCK_NOTIFY"); } + if cfg!(feature = "preupdate_hook") { + cfg.flag("-DSQLITE_ENABLE_PREUPDATE_HOOK"); + } + if cfg!(feature = "session") { + cfg.flag("-DSQLITE_ENABLE_SESSION"); + } cfg.compile("libsqlite3.a"); println!("cargo:lib_dir={}", out_dir); @@ -237,10 +243,22 @@ mod bindings { pub fn write_to_out_dir(header: HeaderLocation, out_path: &Path) { let header: String = header.into(); let mut output = Vec::new(); - bindgen::builder() + let mut bindings = bindgen::builder() .header(header.clone()) .parse_callbacks(Box::new(SqliteTypeChooser)) - .rustfmt_bindings(true) + .rustfmt_bindings(true); + + if cfg!(feature = "unlock_notify") { + bindings = bindings.clang_arg("-DSQLITE_ENABLE_UNLOCK_NOTIFY"); + } + if cfg!(feature = "preupdate_hook") { + bindings = bindings.clang_arg("-DSQLITE_ENABLE_PREUPDATE_HOOK"); + } + if cfg!(feature = "session") { + bindings = bindings.clang_arg("-DSQLITE_ENABLE_SESSION"); + } + + bindings .generate() .expect(&format!("could not run bindgen on header {}", header)) .write(Box::new(&mut output)) diff --git a/src/context.rs b/src/context.rs index 9834a7d..1b5bba4 100644 --- a/src/context.rs +++ b/src/context.rs @@ -14,55 +14,6 @@ use crate::types::{ToSqlOutput, ValueRef}; #[cfg(feature = "array")] use crate::vtab::array::{free_array, ARRAY_TYPE}; -impl<'a> ValueRef<'a> { - pub(crate) unsafe fn from_value(value: *mut sqlite3_value) -> ValueRef<'a> { - use std::slice::from_raw_parts; - - match ffi::sqlite3_value_type(value) { - ffi::SQLITE_NULL => ValueRef::Null, - ffi::SQLITE_INTEGER => ValueRef::Integer(ffi::sqlite3_value_int64(value)), - ffi::SQLITE_FLOAT => ValueRef::Real(ffi::sqlite3_value_double(value)), - ffi::SQLITE_TEXT => { - let text = ffi::sqlite3_value_text(value); - assert!( - !text.is_null(), - "unexpected SQLITE_TEXT value type with NULL data" - ); - let s = CStr::from_ptr(text as *const c_char); - - // sqlite3_value_text returns UTF8 data, so our unwrap here should be fine. - let s = s - .to_str() - .expect("sqlite3_value_text returned invalid UTF-8"); - ValueRef::Text(s) - } - ffi::SQLITE_BLOB => { - let (blob, len) = ( - ffi::sqlite3_value_blob(value), - ffi::sqlite3_value_bytes(value), - ); - - assert!( - len >= 0, - "unexpected negative return from sqlite3_value_bytes" - ); - if len > 0 { - assert!( - !blob.is_null(), - "unexpected SQLITE_BLOB value type with NULL data" - ); - ValueRef::Blob(from_raw_parts(blob as *const u8, len as usize)) - } else { - // The return value from sqlite3_value_blob() for a zero-length BLOB - // is a NULL pointer. - ValueRef::Blob(&[]) - } - } - _ => unreachable!("sqlite3_value_type returned invalid value"), - } - } -} - pub(crate) unsafe fn set_result<'a>(ctx: *mut sqlite3_context, result: &ToSqlOutput<'a>) { let value = match *result { ToSqlOutput::Borrowed(v) => v, diff --git a/src/error.rs b/src/error.rs index 432de18..2da7426 100644 --- a/src/error.rs +++ b/src/error.rs @@ -290,3 +290,12 @@ pub fn error_from_handle(db: *mut ffi::sqlite3, code: c_int) -> Error { }; error_from_sqlite_code(code, message) } + +macro_rules! check { + ($funcall:expr) => {{ + let rc = $funcall; + if rc != crate::ffi::SQLITE_OK { + Err(crate::error::error_from_sqlite_code(rc, None))?; + } + }}; +} diff --git a/src/hooks.rs b/src/hooks.rs index 7c26877..9397ca0 100644 --- a/src/hooks.rs +++ b/src/hooks.rs @@ -9,83 +9,21 @@ use crate::ffi; use crate::{Connection, InnerConnection}; -/// Authorizer Action Codes -#[derive(Debug, PartialEq)] +/// Action Codes +#[derive(Clone, Copy, Debug, PartialEq)] pub enum Action { UNKNOWN = -1, - SQLITE_CREATE_INDEX = ffi::SQLITE_CREATE_INDEX as isize, - SQLITE_CREATE_TABLE = ffi::SQLITE_CREATE_TABLE as isize, - SQLITE_CREATE_TEMP_INDEX = ffi::SQLITE_CREATE_TEMP_INDEX as isize, - SQLITE_CREATE_TEMP_TABLE = ffi::SQLITE_CREATE_TEMP_TABLE as isize, - SQLITE_CREATE_TEMP_TRIGGER = ffi::SQLITE_CREATE_TEMP_TRIGGER as isize, - SQLITE_CREATE_TEMP_VIEW = ffi::SQLITE_CREATE_TEMP_VIEW as isize, - SQLITE_CREATE_TRIGGER = ffi::SQLITE_CREATE_TRIGGER as isize, - SQLITE_CREATE_VIEW = ffi::SQLITE_CREATE_VIEW as isize, SQLITE_DELETE = ffi::SQLITE_DELETE as isize, - SQLITE_DROP_INDEX = ffi::SQLITE_DROP_INDEX as isize, - SQLITE_DROP_TABLE = ffi::SQLITE_DROP_TABLE as isize, - SQLITE_DROP_TEMP_INDEX = ffi::SQLITE_DROP_TEMP_INDEX as isize, - SQLITE_DROP_TEMP_TABLE = ffi::SQLITE_DROP_TEMP_TABLE as isize, - SQLITE_DROP_TEMP_TRIGGER = ffi::SQLITE_DROP_TEMP_TRIGGER as isize, - SQLITE_DROP_TEMP_VIEW = ffi::SQLITE_DROP_TEMP_VIEW as isize, - SQLITE_DROP_TRIGGER = ffi::SQLITE_DROP_TRIGGER as isize, - SQLITE_DROP_VIEW = ffi::SQLITE_DROP_VIEW as isize, SQLITE_INSERT = ffi::SQLITE_INSERT as isize, - SQLITE_PRAGMA = ffi::SQLITE_PRAGMA as isize, - SQLITE_READ = ffi::SQLITE_READ as isize, - SQLITE_SELECT = ffi::SQLITE_SELECT as isize, - SQLITE_TRANSACTION = ffi::SQLITE_TRANSACTION as isize, SQLITE_UPDATE = ffi::SQLITE_UPDATE as isize, - SQLITE_ATTACH = ffi::SQLITE_ATTACH as isize, - SQLITE_DETACH = ffi::SQLITE_DETACH as isize, - SQLITE_ALTER_TABLE = ffi::SQLITE_ALTER_TABLE as isize, - SQLITE_REINDEX = ffi::SQLITE_REINDEX as isize, - SQLITE_ANALYZE = ffi::SQLITE_ANALYZE as isize, - SQLITE_CREATE_VTABLE = ffi::SQLITE_CREATE_VTABLE as isize, - SQLITE_DROP_VTABLE = ffi::SQLITE_DROP_VTABLE as isize, - SQLITE_FUNCTION = ffi::SQLITE_FUNCTION as isize, - SQLITE_SAVEPOINT = ffi::SQLITE_SAVEPOINT as isize, - SQLITE_COPY = ffi::SQLITE_COPY as isize, - SQLITE_RECURSIVE = 33, } impl From for Action { fn from(code: i32) -> Action { match code { - ffi::SQLITE_CREATE_INDEX => Action::SQLITE_CREATE_INDEX, - ffi::SQLITE_CREATE_TABLE => Action::SQLITE_CREATE_TABLE, - ffi::SQLITE_CREATE_TEMP_INDEX => Action::SQLITE_CREATE_TEMP_INDEX, - ffi::SQLITE_CREATE_TEMP_TABLE => Action::SQLITE_CREATE_TEMP_TABLE, - ffi::SQLITE_CREATE_TEMP_TRIGGER => Action::SQLITE_CREATE_TEMP_TRIGGER, - ffi::SQLITE_CREATE_TEMP_VIEW => Action::SQLITE_CREATE_TEMP_VIEW, - ffi::SQLITE_CREATE_TRIGGER => Action::SQLITE_CREATE_TRIGGER, - ffi::SQLITE_CREATE_VIEW => Action::SQLITE_CREATE_VIEW, ffi::SQLITE_DELETE => Action::SQLITE_DELETE, - ffi::SQLITE_DROP_INDEX => Action::SQLITE_DROP_INDEX, - ffi::SQLITE_DROP_TABLE => Action::SQLITE_DROP_TABLE, - ffi::SQLITE_DROP_TEMP_INDEX => Action::SQLITE_DROP_TEMP_INDEX, - ffi::SQLITE_DROP_TEMP_TABLE => Action::SQLITE_DROP_TEMP_TABLE, - ffi::SQLITE_DROP_TEMP_TRIGGER => Action::SQLITE_DROP_TEMP_TRIGGER, - ffi::SQLITE_DROP_TEMP_VIEW => Action::SQLITE_DROP_TEMP_VIEW, - ffi::SQLITE_DROP_TRIGGER => Action::SQLITE_DROP_TRIGGER, - ffi::SQLITE_DROP_VIEW => Action::SQLITE_DROP_VIEW, ffi::SQLITE_INSERT => Action::SQLITE_INSERT, - ffi::SQLITE_PRAGMA => Action::SQLITE_PRAGMA, - ffi::SQLITE_READ => Action::SQLITE_READ, - ffi::SQLITE_SELECT => Action::SQLITE_SELECT, - ffi::SQLITE_TRANSACTION => Action::SQLITE_TRANSACTION, ffi::SQLITE_UPDATE => Action::SQLITE_UPDATE, - ffi::SQLITE_ATTACH => Action::SQLITE_ATTACH, - ffi::SQLITE_DETACH => Action::SQLITE_DETACH, - ffi::SQLITE_ALTER_TABLE => Action::SQLITE_ALTER_TABLE, - ffi::SQLITE_REINDEX => Action::SQLITE_REINDEX, - ffi::SQLITE_ANALYZE => Action::SQLITE_ANALYZE, - ffi::SQLITE_CREATE_VTABLE => Action::SQLITE_CREATE_VTABLE, - ffi::SQLITE_DROP_VTABLE => Action::SQLITE_DROP_VTABLE, - ffi::SQLITE_FUNCTION => Action::SQLITE_FUNCTION, - ffi::SQLITE_SAVEPOINT => Action::SQLITE_SAVEPOINT, - ffi::SQLITE_COPY => Action::SQLITE_COPY, - 33 => Action::SQLITE_RECURSIVE, _ => Action::UNKNOWN, } } diff --git a/src/lib.rs b/src/lib.rs index a4e7591..40434f9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -115,6 +115,7 @@ mod busy; mod cache; #[cfg(any(feature = "functions", feature = "vtab"))] mod context; +#[macro_use] mod error; #[cfg(feature = "functions")] pub mod functions; @@ -126,6 +127,8 @@ pub mod limits; mod load_extension_guard; mod raw_statement; mod row; +#[cfg(feature = "session")] +pub mod session; mod statement; #[cfg(feature = "trace")] pub mod trace; @@ -193,7 +196,7 @@ pub enum DatabaseName<'a> { // Currently DatabaseName is only used by the backup and blob mods, so hide // this (private) impl to avoid dead code warnings. -#[cfg(any(feature = "backup", feature = "blob"))] +#[cfg(any(feature = "backup", feature = "blob", feature = "session"))] impl<'a> DatabaseName<'a> { fn to_cstring(&self) -> Result { use self::DatabaseName::{Attached, Main, Temp}; diff --git a/src/session.rs b/src/session.rs new file mode 100644 index 0000000..a07bd00 --- /dev/null +++ b/src/session.rs @@ -0,0 +1,888 @@ +//! [Session Extension](https://sqlite.org/sessionintro.html) +#![allow(non_camel_case_types)] + +use std::ffi::CStr; +use std::io::{Read, Write}; +use std::marker::PhantomData; +use std::mem; +use std::os::raw::{c_char, c_int, c_uchar, c_void}; +use std::panic::{catch_unwind, RefUnwindSafe}; +use std::ptr; +use std::slice::{from_raw_parts, from_raw_parts_mut}; + +use fallible_streaming_iterator::FallibleStreamingIterator; + +use crate::error::error_from_sqlite_code; +use crate::ffi; +use crate::hooks::Action; +use crate::types::ValueRef; +use crate::{errmsg_to_string, str_to_cstring, Connection, DatabaseName, Result}; + +// https://sqlite.org/session.html + +/// An instance of this object is a session that can be used to record changes +/// to a database. +pub struct Session<'conn> { + phantom: PhantomData<&'conn ()>, + s: *mut ffi::sqlite3_session, + filter: Option bool>>, +} + +impl<'conn> Session<'conn> { + /// Create a new session object + pub fn new(db: &'conn Connection) -> Result> { + Session::new_with_name(db, DatabaseName::Main) + } + + /// Create a new session object + pub fn new_with_name(db: &'conn Connection, name: DatabaseName<'_>) -> Result> { + let name = name.to_cstring()?; + + let db = db.db.borrow_mut().db; + + let mut s: *mut ffi::sqlite3_session = unsafe { mem::uninitialized() }; + check!(unsafe { ffi::sqlite3session_create(db, name.as_ptr(), &mut s) }); + + Ok(Session { + phantom: PhantomData, + s, + filter: None, + }) + } + + /// Set a table filter + pub fn table_filter(&mut self, filter: Option) + where + F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, + { + unsafe extern "C" fn call_boxed_closure( + p_arg: *mut c_void, + tbl_str: *const c_char, + ) -> c_int + where + F: Fn(&str) -> bool + RefUnwindSafe, + { + use std::ffi::CStr; + use std::str; + + let boxed_filter: *mut F = p_arg as *mut F; + let tbl_name = { + let c_slice = CStr::from_ptr(tbl_str).to_bytes(); + str::from_utf8_unchecked(c_slice) + }; + if let Ok(true) = catch_unwind(|| (*boxed_filter)(tbl_name)) { + 1 + } else { + 0 + } + } + + match filter { + Some(filter) => { + let boxed_filter = Box::new(filter); + unsafe { + ffi::sqlite3session_table_filter( + self.s, + Some(call_boxed_closure::), + &*boxed_filter as *const F as *mut _, + ); + } + self.filter = Some(boxed_filter); + } + _ => { + unsafe { ffi::sqlite3session_table_filter(self.s, None, ptr::null_mut()) } + self.filter = None; + } + }; + } + + /// Attach a table. `None` means all tables. + pub fn attach(&mut self, table: Option<&str>) -> Result<()> { + let table = if let Some(table) = table { + str_to_cstring(table)?.as_ptr() + } else { + ptr::null() + }; + unsafe { check!(ffi::sqlite3session_attach(self.s, table)) }; + Ok(()) + } + + /// Generate a Changeset + pub fn changeset(&mut self) -> Result { + let mut n = 0; + let mut cs: *mut c_void = unsafe { mem::uninitialized() }; + check!(unsafe { ffi::sqlite3session_changeset(self.s, &mut n, &mut cs) }); + Ok(Changeset { cs, n }) + } + + /// Write the set of changes represented by this session to `output`. + pub fn changeset_strm(&mut self, output: &mut dyn Write) -> Result<()> { + let output_ref = &output; + check!(unsafe { + ffi::sqlite3session_changeset_strm( + self.s, + Some(x_output), + output_ref as *const &mut dyn Write as *mut c_void, + ) + }); + Ok(()) + } + + /// Generate a Patchset + pub fn patchset(&mut self) -> Result { + let mut n = 0; + let mut ps: *mut c_void = unsafe { mem::uninitialized() }; + check!(unsafe { ffi::sqlite3session_patchset(self.s, &mut n, &mut ps) }); + // TODO Validate: same struct + Ok(Changeset { cs: ps, n }) + } + + /// Write the set of patches represented by this session to `output`. + pub fn patchset_strm(&mut self, output: &mut dyn Write) -> Result<()> { + let output_ref = &output; + check!(unsafe { + ffi::sqlite3session_patchset_strm( + self.s, + Some(x_output), + output_ref as *const &mut dyn Write as *mut c_void, + ) + }); + Ok(()) + } + + /// Load the difference between tables. + pub fn diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()> { + let from = from.to_cstring()?; + let table = str_to_cstring(table)?.as_ptr(); + unsafe { + let mut errmsg: *mut c_char = mem::uninitialized(); + let r = ffi::sqlite3session_diff(self.s, from.as_ptr(), table, &mut errmsg); + if r != ffi::SQLITE_OK { + let message = errmsg_to_string(&*errmsg); + ffi::sqlite3_free(errmsg as *mut ::std::os::raw::c_void); + return Err(error_from_sqlite_code(r, Some(message))); + } + } + Ok(()) + } + + /// Test if a changeset has recorded any changes + pub fn is_empty(&self) -> bool { + unsafe { ffi::sqlite3session_isempty(self.s) != 0 } + } + + /// Query the current state of the session + pub fn is_enabled(&self) -> bool { + unsafe { ffi::sqlite3session_enable(self.s, -1) != 0 } + } + + /// Enable or disable the recording of changes + pub fn set_enabled(&mut self, enabled: bool) { + unsafe { + ffi::sqlite3session_enable(self.s, if enabled { 1 } else { 0 }); + } + } + + /// Query the current state of the indirect flag + pub fn is_indirect(&self) -> bool { + unsafe { ffi::sqlite3session_indirect(self.s, -1) != 0 } + } + + /// Set or clear the indirect change flag + pub fn set_indirect(&mut self, indirect: bool) { + unsafe { + ffi::sqlite3session_indirect(self.s, if indirect { 1 } else { 0 }); + } + } +} + +impl<'conn> Drop for Session<'conn> { + fn drop(&mut self) { + if self.filter.is_some() { + self.table_filter(None:: bool>); + } + unsafe { ffi::sqlite3session_delete(self.s) }; + } +} + +/// Invert a changeset +pub fn invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()> { + let input_ref = &input; + let output_ref = &output; + check!(unsafe { + ffi::sqlite3changeset_invert_strm( + Some(x_input), + input_ref as *const &mut dyn Read as *mut c_void, + Some(x_output), + output_ref as *const &mut dyn Write as *mut c_void, + ) + }); + Ok(()) +} + +/// Combine two changesets +pub fn concat_strm( + input_a: &mut dyn Read, + input_b: &mut dyn Read, + output: &mut dyn Write, +) -> Result<()> { + let input_a_ref = &input_a; + let input_b_ref = &input_b; + let output_ref = &output; + check!(unsafe { + ffi::sqlite3changeset_concat_strm( + Some(x_input), + input_a_ref as *const &mut dyn Read as *mut c_void, + Some(x_input), + input_b_ref as *const &mut dyn Read as *mut c_void, + Some(x_output), + output_ref as *const &mut dyn Write as *mut c_void, + ) + }); + Ok(()) +} + +/// Changeset or Patchset +pub struct Changeset { + cs: *mut c_void, + n: c_int, +} + +impl Changeset { + /// Invert a changeset + pub fn invert(&self) -> Result { + let mut n = 0; + let mut cs: *mut c_void = unsafe { mem::uninitialized() }; + check!(unsafe { ffi::sqlite3changeset_invert(self.n, self.cs, &mut n, &mut cs) }); + Ok(Changeset { cs, n }) + } + + /// Create an iterator to traverse a changeset + pub fn iter<'changeset>(&'changeset self) -> Result> { + let mut it: *mut ffi::sqlite3_changeset_iter = unsafe { mem::uninitialized() }; + check!(unsafe { ffi::sqlite3changeset_start(&mut it, self.n, self.cs) }); + Ok(ChangesetIter { + phantom: PhantomData, + it, + item: None, + }) + } + + /// Concatenate two changeset objects + pub fn concat(a: &Changeset, b: &Changeset) -> Result { + let mut n = 0; + let mut cs: *mut c_void = unsafe { mem::uninitialized() }; + check!(unsafe { ffi::sqlite3changeset_concat(a.n, a.cs, b.n, b.cs, &mut n, &mut cs) }); + Ok(Changeset { cs, n }) + } +} + +impl Drop for Changeset { + fn drop(&mut self) { + unsafe { + ffi::sqlite3_free(self.cs); + } + } +} + +/// Cursor for iterating over the elements of a changeset or patchset. +pub struct ChangesetIter<'changeset> { + phantom: PhantomData<&'changeset ()>, + it: *mut ffi::sqlite3_changeset_iter, + item: Option, +} + +impl<'changeset> ChangesetIter<'changeset> { + /// Create an iterator on `input` + pub fn start_strm<'input>(input: &'input mut dyn Read) -> Result> { + let input_ref = &input; + let mut it: *mut ffi::sqlite3_changeset_iter = unsafe { mem::uninitialized() }; + check!(unsafe { + ffi::sqlite3changeset_start_strm( + &mut it, + Some(x_input), + input_ref as *const &mut dyn Read as *mut c_void, + ) + }); + Ok(ChangesetIter { + phantom: PhantomData, + it, + item: None, + }) + } +} + +impl<'changeset> FallibleStreamingIterator for ChangesetIter<'changeset> { + type Error = crate::error::Error; + type Item = ChangesetItem; + + fn advance(&mut self) -> Result<()> { + let rc = unsafe { ffi::sqlite3changeset_next(self.it) }; + match rc { + ffi::SQLITE_ROW => { + self.item = Some(ChangesetItem { it: self.it }); + Ok(()) + } + ffi::SQLITE_DONE => { + self.item = None; + Ok(()) + } + code => Err(error_from_sqlite_code(code, None)), + } + } + + fn get(&self) -> Option<&ChangesetItem> { + self.item.as_ref() + } +} + +pub struct Operation<'item> { + table_name: &'item str, + number_of_columns: i32, + code: Action, + indirect: bool, +} + +impl<'item> Operation<'item> { + pub fn table_name(&self) -> &str { + self.table_name + } + + pub fn number_of_columns(&self) -> i32 { + self.number_of_columns + } + + pub fn code(&self) -> Action { + self.code + } + + pub fn indirect(&self) -> bool { + self.indirect + } +} + +impl<'changeset> Drop for ChangesetIter<'changeset> { + fn drop(&mut self) { + unsafe { + ffi::sqlite3changeset_finalize(self.it); + } + } +} + +/// An item passed to a conflict-handler by `Connection::apply`, +/// or an item generated by `ChangesetIter::next`. +// TODO enum ? Delete, Insert, Update, ... +pub struct ChangesetItem { + it: *mut ffi::sqlite3_changeset_iter, +} + +impl ChangesetItem { + /// Obtain conflicting row values + /// + /// May only be called with an `SQLITE_CHANGESET_DATA` or + /// `SQLITE_CHANGESET_CONFLICT` conflict handler callback. + pub fn conflict(&self, col: usize) -> Result> { + unsafe { + let mut p_value: *mut ffi::sqlite3_value = mem::uninitialized(); + check!(ffi::sqlite3changeset_conflict( + self.it, + col as i32, + &mut p_value + )); + Ok(ValueRef::from_value(p_value)) + } + } + + /// Determine the number of foreign key constraint violations + /// + /// May only be called with an `SQLITE_CHANGESET_FOREIGN_KEY` conflict + /// handler callback. + pub fn fk_conflicts(&self) -> Result { + unsafe { + let mut p_out = 0; + check!(ffi::sqlite3changeset_fk_conflicts(self.it, &mut p_out)); + Ok(p_out) + } + } + + /// Obtain new.* Values + /// + /// May only be called if the type of change is either `SQLITE_UPDATE` or + /// `SQLITE_INSERT`. + pub fn new_value(&self, col: usize) -> Result> { + unsafe { + let mut p_value: *mut ffi::sqlite3_value = mem::uninitialized(); + check!(ffi::sqlite3changeset_new(self.it, col as i32, &mut p_value)); + Ok(ValueRef::from_value(p_value)) + } + } + + /// Obtain old.* Values + /// + /// May only be called if the type of change is either `SQLITE_DELETE` or + /// `SQLITE_UPDATE`. + pub fn old_value(&self, col: usize) -> Result> { + unsafe { + let mut p_value: *mut ffi::sqlite3_value = mem::uninitialized(); + check!(ffi::sqlite3changeset_old(self.it, col as i32, &mut p_value)); + Ok(ValueRef::from_value(p_value)) + } + } + + /// Obtain the current operation + pub fn op(&self) -> Result> { + let mut number_of_columns = 0; + let mut code = 0; + let mut indirect = 0; + let tab = unsafe { + let mut pz_tab: *const c_char = mem::uninitialized(); + check!(ffi::sqlite3changeset_op( + self.it, + &mut pz_tab, + &mut number_of_columns, + &mut code, + &mut indirect + )); + CStr::from_ptr(pz_tab) + }; + let table_name = tab.to_str()?; + Ok(Operation { + table_name, + number_of_columns, + code: Action::from(code), + indirect: indirect != 0, + }) + } + + /// Obtain the primary key definition of a table + pub fn pk(&self) -> Result<&[u8]> { + let mut number_of_columns = 0; + unsafe { + let mut pks: *mut c_uchar = mem::uninitialized(); + check!(ffi::sqlite3changeset_pk( + self.it, + &mut pks, + &mut number_of_columns + )); + Ok(from_raw_parts(pks, number_of_columns as usize)) + } + } +} + +/// Used to combine two or more changesets or +/// patchsets +pub struct Changegroup { + cg: *mut ffi::sqlite3_changegroup, +} + +impl Changegroup { + pub fn new() -> Result { + let mut cg: *mut ffi::sqlite3_changegroup = unsafe { mem::uninitialized() }; + check!(unsafe { ffi::sqlite3changegroup_new(&mut cg) }); + Ok(Changegroup { cg }) + } + + /// Add a changeset + pub fn add(&mut self, cs: &Changeset) -> Result<()> { + check!(unsafe { ffi::sqlite3changegroup_add(self.cg, cs.n, cs.cs) }); + Ok(()) + } + + /// Add a changeset read from `input` to this change group. + pub fn add_stream(&mut self, input: &mut dyn Read) -> Result<()> { + let input_ref = &input; + check!(unsafe { + ffi::sqlite3changegroup_add_strm( + self.cg, + Some(x_input), + input_ref as *const &mut dyn Read as *mut c_void, + ) + }); + Ok(()) + } + + /// Obtain a composite Changeset + pub fn output(&mut self) -> Result { + let mut n = 0; + let mut output: *mut c_void = unsafe { mem::uninitialized() }; + check!(unsafe { ffi::sqlite3changegroup_output(self.cg, &mut n, &mut output) }); + Ok(Changeset { cs: output, n }) + } + + /// Write the combined set of changes to `output`. + pub fn output_strm(&mut self, output: &mut dyn Write) -> Result<()> { + let output_ref = &output; + check!(unsafe { + ffi::sqlite3changegroup_output_strm( + self.cg, + Some(x_output), + output_ref as *const &mut dyn Write as *mut c_void, + ) + }); + Ok(()) + } +} + +impl Drop for Changegroup { + fn drop(&mut self) { + unsafe { + ffi::sqlite3changegroup_delete(self.cg); + } + } +} + +impl Connection { + /// Apply a changeset to a database + pub fn apply(&self, cs: &Changeset, filter: Option, conflict: C) -> Result<()> + where + F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, + C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static, + { + let db = self.db.borrow_mut().db; + + let filtered = filter.is_some(); + let tuple = &mut (filter, conflict); + check!(unsafe { + if filtered { + ffi::sqlite3changeset_apply( + db, + cs.n, + cs.cs, + Some(call_filter::), + Some(call_conflict::), + tuple as *mut (Option, C) as *mut c_void, + ) + } else { + ffi::sqlite3changeset_apply( + db, + cs.n, + cs.cs, + None, + Some(call_conflict::), + tuple as *mut (Option, C) as *mut c_void, + ) + } + }); + Ok(()) + } + + /// Apply a changeset to a database + pub fn apply_strm( + &self, + input: &mut dyn Read, + filter: Option, + conflict: C, + ) -> Result<()> + where + F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, + C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static, + { + let input_ref = &input; + let db = self.db.borrow_mut().db; + + let filtered = filter.is_some(); + let tuple = &mut (filter, conflict); + check!(unsafe { + if filtered { + ffi::sqlite3changeset_apply_strm( + db, + Some(x_input), + input_ref as *const &mut dyn Read as *mut c_void, + Some(call_filter::), + Some(call_conflict::), + tuple as *mut (Option, C) as *mut c_void, + ) + } else { + ffi::sqlite3changeset_apply_strm( + db, + Some(x_input), + input_ref as *const &mut dyn Read as *mut c_void, + None, + Some(call_conflict::), + tuple as *mut (Option, C) as *mut c_void, + ) + } + }); + Ok(()) + } +} + +/// Constants passed to the conflict handler +#[derive(Debug, PartialEq)] +pub enum ConflictType { + UNKNOWN = -1, + SQLITE_CHANGESET_DATA = ffi::SQLITE_CHANGESET_DATA as isize, + SQLITE_CHANGESET_NOTFOUND = ffi::SQLITE_CHANGESET_NOTFOUND as isize, + SQLITE_CHANGESET_CONFLICT = ffi::SQLITE_CHANGESET_CONFLICT as isize, + SQLITE_CHANGESET_CONSTRAINT = ffi::SQLITE_CHANGESET_CONSTRAINT as isize, + SQLITE_CHANGESET_FOREIGN_KEY = ffi::SQLITE_CHANGESET_FOREIGN_KEY as isize, +} +impl From for ConflictType { + fn from(code: i32) -> ConflictType { + match code { + ffi::SQLITE_CHANGESET_DATA => ConflictType::SQLITE_CHANGESET_DATA, + ffi::SQLITE_CHANGESET_NOTFOUND => ConflictType::SQLITE_CHANGESET_NOTFOUND, + ffi::SQLITE_CHANGESET_CONFLICT => ConflictType::SQLITE_CHANGESET_CONFLICT, + ffi::SQLITE_CHANGESET_CONSTRAINT => ConflictType::SQLITE_CHANGESET_CONSTRAINT, + ffi::SQLITE_CHANGESET_FOREIGN_KEY => ConflictType::SQLITE_CHANGESET_FOREIGN_KEY, + _ => ConflictType::UNKNOWN, + } + } +} + +/// Constants returned by the conflict handler +#[derive(Debug, PartialEq)] +pub enum ConflictAction { + SQLITE_CHANGESET_OMIT = ffi::SQLITE_CHANGESET_OMIT as isize, + SQLITE_CHANGESET_REPLACE = ffi::SQLITE_CHANGESET_REPLACE as isize, + SQLITE_CHANGESET_ABORT = ffi::SQLITE_CHANGESET_ABORT as isize, +} + +unsafe extern "C" fn call_filter(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int +where + F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, + C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static, +{ + use std::ffi::CStr; + use std::str; + + let tuple: *mut (Option, C) = p_ctx as *mut (Option, C); + let tbl_name = { + let c_slice = CStr::from_ptr(tbl_str).to_bytes(); + str::from_utf8_unchecked(c_slice) + }; + match *tuple { + (Some(ref filter), _) => { + if let Ok(true) = catch_unwind(|| filter(tbl_name)) { + 1 + } else { + 0 + } + } + _ => unimplemented!(), + } +} + +unsafe extern "C" fn call_conflict( + p_ctx: *mut c_void, + e_conflict: c_int, + p: *mut ffi::sqlite3_changeset_iter, +) -> c_int +where + F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, + C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static, +{ + let tuple: *mut (Option, C) = p_ctx as *mut (Option, C); + let conflict_type = ConflictType::from(e_conflict); + let item = ChangesetItem { it: p }; + if let Ok(action) = catch_unwind(|| (*tuple).1(conflict_type, item)) { + action as c_int + } else { + ffi::SQLITE_CHANGESET_ABORT + } +} + +unsafe extern "C" fn x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int { + if p_in.is_null() { + return ffi::SQLITE_MISUSE; + } + let bytes: &mut [u8] = from_raw_parts_mut(data as *mut u8, len as usize); + let input = p_in as *mut &mut dyn Read; + match (*input).read(bytes) { + Ok(n) => { + *len = n as i32; // TODO Validate: n = 0 may not mean the reader will always no longer be able to + // produce bytes. + ffi::SQLITE_OK + } + Err(_) => ffi::SQLITE_IOERR_READ, // TODO check if err is a (ru)sqlite Error => propagate + } +} + +unsafe extern "C" fn x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int { + if p_out.is_null() { + return ffi::SQLITE_MISUSE; + } + // The sessions module never invokes an xOutput callback with the third + // parameter set to a value less than or equal to zero. + let bytes: &[u8] = from_raw_parts(data as *const u8, len as usize); + let output = p_out as *mut &mut dyn Write; + match (*output).write_all(bytes) { + Ok(_) => ffi::SQLITE_OK, + Err(_) => ffi::SQLITE_IOERR_WRITE, // TODO check if err is a (ru)sqlite Error => propagate + } +} + +#[cfg(test)] +mod test { + use fallible_streaming_iterator::FallibleStreamingIterator; + use std::sync::atomic::{AtomicBool, Ordering}; + + use super::{Changeset, ChangesetIter, ConflictAction, ConflictType, Session}; + use crate::hooks::Action; + use crate::Connection; + + fn one_changeset() -> Changeset { + let db = Connection::open_in_memory().unwrap(); + db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);") + .unwrap(); + + let mut session = Session::new(&db).unwrap(); + assert!(session.is_empty()); + + session.attach(None).unwrap(); + db.execute("INSERT INTO foo (t) VALUES (?);", &["bar"]) + .unwrap(); + + session.changeset().unwrap() + } + + fn one_changeset_strm() -> Vec { + let db = Connection::open_in_memory().unwrap(); + db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);") + .unwrap(); + + let mut session = Session::new(&db).unwrap(); + assert!(session.is_empty()); + + session.attach(None).unwrap(); + db.execute("INSERT INTO foo (t) VALUES (?);", &["bar"]) + .unwrap(); + + let mut output = Vec::new(); + session.changeset_strm(&mut output).unwrap(); + output + } + + #[test] + fn test_changeset() { + let changeset = one_changeset(); + let mut iter = changeset.iter().unwrap(); + let item = iter.next().unwrap(); + assert!(item.is_some()); + + let item = item.unwrap(); + let op = item.op().unwrap(); + assert_eq!("foo", op.table_name()); + assert_eq!(1, op.number_of_columns()); + assert_eq!(Action::SQLITE_INSERT, op.code()); + assert_eq!(false, op.indirect()); + + let pk = item.pk().unwrap(); + assert_eq!(&[1], pk); + + let new_value = item.new_value(0).unwrap(); + assert_eq!(Ok("bar"), new_value.as_str()); + } + + #[test] + fn test_changeset_strm() { + let output = one_changeset_strm(); + assert!(!output.is_empty()); + assert_eq!(14, output.len()); + + let mut input = output.as_slice(); + let mut iter = ChangesetIter::start_strm(&mut input).unwrap(); + let item = iter.next().unwrap(); + assert!(item.is_some()); + } + + #[test] + fn test_changeset_apply() { + let changeset = one_changeset(); + + let db = Connection::open_in_memory().unwrap(); + db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);") + .unwrap(); + + lazy_static! { + static ref called: AtomicBool = AtomicBool::new(false); + } + db.apply( + &changeset, + None:: bool>, + |_conflict_type, _item| { + called.store(true, Ordering::Relaxed); + ConflictAction::SQLITE_CHANGESET_OMIT + }, + ) + .unwrap(); + + assert!(!called.load(Ordering::Relaxed)); + let check = db + .query_row("SELECT 1 FROM foo WHERE t = ?", &["bar"], |row| row.get(0)) + .unwrap(); + assert_eq!(1, check); + + // conflict expected when same changeset applied again on the same db + db.apply( + &changeset, + None:: bool>, + |conflict_type, item| { + called.store(true, Ordering::Relaxed); + assert_eq!(ConflictType::SQLITE_CHANGESET_CONFLICT, conflict_type); + let conflict = item.conflict(0).unwrap(); + assert_eq!(Ok("bar"), conflict.as_str()); + ConflictAction::SQLITE_CHANGESET_OMIT + }, + ) + .unwrap(); + assert!(called.load(Ordering::Relaxed)); + } + + #[test] + fn test_changeset_apply_strm() { + let output = one_changeset_strm(); + + let db = Connection::open_in_memory().unwrap(); + db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);") + .unwrap(); + + db.apply_strm( + &mut output.as_slice(), + None:: bool>, + |_conflict_type, _item| ConflictAction::SQLITE_CHANGESET_OMIT, + ) + .unwrap(); + + let check = db + .query_row("SELECT 1 FROM foo WHERE t = ?", &["bar"], |row| row.get(0)) + .unwrap(); + assert_eq!(1, check); + } + + #[test] + fn test_session_empty() { + let db = Connection::open_in_memory().unwrap(); + db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);") + .unwrap(); + + let mut session = Session::new(&db).unwrap(); + assert!(session.is_empty()); + + session.attach(None).unwrap(); + db.execute("INSERT INTO foo (t) VALUES (?);", &["bar"]) + .unwrap(); + + assert!(!session.is_empty()); + } + + #[test] + fn test_session_set_enabled() { + let db = Connection::open_in_memory().unwrap(); + + let mut session = Session::new(&db).unwrap(); + assert!(session.is_enabled()); + session.set_enabled(false); + assert!(!session.is_enabled()); + } + + #[test] + fn test_session_set_indirect() { + let db = Connection::open_in_memory().unwrap(); + + let mut session = Session::new(&db).unwrap(); + assert!(!session.is_indirect()); + session.set_indirect(true); + assert!(session.is_indirect()); + } +} diff --git a/src/types/from_sql.rs b/src/types/from_sql.rs index b36e079..79b477c 100644 --- a/src/types/from_sql.rs +++ b/src/types/from_sql.rs @@ -22,6 +22,18 @@ pub enum FromSqlError { Other(Box), } +impl PartialEq for FromSqlError { + fn eq(&self, other: &FromSqlError) -> bool { + match (self, other) { + (FromSqlError::InvalidType, FromSqlError::InvalidType) => true, + (FromSqlError::OutOfRange(n1), FromSqlError::OutOfRange(n2)) => n1 == n2, + #[cfg(feature = "i128_blob")] + (FromSqlError::InvalidI128Size(s1), FromSqlError::InvalidI128Size(s2)) => s1 == s2, + (_, _) => false, + } + } +} + impl fmt::Display for FromSqlError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { diff --git a/src/types/value_ref.rs b/src/types/value_ref.rs index 6da2634..a701256 100644 --- a/src/types/value_ref.rs +++ b/src/types/value_ref.rs @@ -104,3 +104,56 @@ impl<'a> From<&'a Value> for ValueRef<'a> { } } } + +#[cfg(any(feature = "functions", feature = "session", feature = "vtab"))] +impl<'a> ValueRef<'a> { + pub(crate) unsafe fn from_value(value: *mut crate::ffi::sqlite3_value) -> ValueRef<'a> { + use crate::ffi; + use std::ffi::CStr; + use std::os::raw::c_char; + use std::slice::from_raw_parts; + + match ffi::sqlite3_value_type(value) { + ffi::SQLITE_NULL => ValueRef::Null, + ffi::SQLITE_INTEGER => ValueRef::Integer(ffi::sqlite3_value_int64(value)), + ffi::SQLITE_FLOAT => ValueRef::Real(ffi::sqlite3_value_double(value)), + ffi::SQLITE_TEXT => { + let text = ffi::sqlite3_value_text(value); + assert!( + !text.is_null(), + "unexpected SQLITE_TEXT value type with NULL data" + ); + let s = CStr::from_ptr(text as *const c_char); + + // sqlite3_value_text returns UTF8 data, so our unwrap here should be fine. + let s = s + .to_str() + .expect("sqlite3_value_text returned invalid UTF-8"); + ValueRef::Text(s) + } + ffi::SQLITE_BLOB => { + let (blob, len) = ( + ffi::sqlite3_value_blob(value), + ffi::sqlite3_value_bytes(value), + ); + + assert!( + len >= 0, + "unexpected negative return from sqlite3_value_bytes" + ); + if len > 0 { + assert!( + !blob.is_null(), + "unexpected SQLITE_BLOB value type with NULL data" + ); + ValueRef::Blob(from_raw_parts(blob as *const u8, len as usize)) + } else { + // The return value from sqlite3_value_blob() for a zero-length BLOB + // is a NULL pointer. + ValueRef::Blob(&[]) + } + } + _ => unreachable!("sqlite3_value_type returned invalid value"), + } + } +}