Session extension (#459)

Session extension bindings
This commit is contained in:
gwenn 2019-01-13 12:46:19 +01:00 committed by GitHub
parent f6a4feecea
commit dbc4eef657
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 996 additions and 116 deletions

View File

@ -43,6 +43,8 @@ vtab_v3 = ["vtab"]
csvtab = ["csv", "vtab"] csvtab = ["csv", "vtab"]
# pointer passing interfaces: 3.20.0 # pointer passing interfaces: 3.20.0
array = ["vtab"] array = ["vtab"]
# session extension: 3.13.0
session = ["libsqlite3-sys/session", "hooks", "fallible-streaming-iterator"]
[dependencies] [dependencies]
time = "0.1.0" time = "0.1.0"
@ -53,6 +55,7 @@ serde_json = { version = "1.0", optional = true }
csv = { version = "1.0", optional = true } csv = { version = "1.0", optional = true }
lazy_static = { version = "1.0", optional = true } lazy_static = { version = "1.0", optional = true }
byteorder = { version = "1.2", features = ["i128"], optional = true } byteorder = { version = "1.2", features = ["i128"], optional = true }
fallible-streaming-iterator = { version = "0.1", optional = true }
[dev-dependencies] [dev-dependencies]
tempdir = "0.3" tempdir = "0.3"

View File

@ -103,6 +103,7 @@ features](https://doc.rust-lang.org/cargo/reference/manifest.html#the-features-s
* [`csvtab`](https://sqlite.org/csv.html), CSV virtual table written in Rust. * [`csvtab`](https://sqlite.org/csv.html), CSV virtual table written in Rust.
* [`array`](https://sqlite.org/carray.html), The `rarray()` Table-Valued Function. * [`array`](https://sqlite.org/carray.html), The `rarray()` Table-Valued Function.
* `i128_blob` allows storing values of type `i128` type in SQLite databases. Internally, the data is stored as a 16 byte big-endian blob, with the most significant bit flipped, which allows ordering and comparison between different blobs storing i128s to work as expected. * `i128_blob` allows storing values of type `i128` type in SQLite databases. Internally, the data is stored as a 16 byte big-endian blob, with the most significant bit flipped, which allows ordering and comparison between different blobs storing i128s to work as expected.
* [`session`](https://sqlite.org/sessionintro.html), Session module extension.
## Notes on building rusqlite and libsqlite3-sys ## Notes on building rusqlite and libsqlite3-sys

View File

@ -22,6 +22,10 @@ min_sqlite_version_3_7_7 = ["pkg-config", "vcpkg"]
min_sqlite_version_3_7_16 = ["pkg-config", "vcpkg"] min_sqlite_version_3_7_16 = ["pkg-config", "vcpkg"]
# sqlite3_unlock_notify >= 3.6.12 # sqlite3_unlock_notify >= 3.6.12
unlock_notify = [] unlock_notify = []
# 3.13.0
preupdate_hook = []
# 3.13.0
session = ["preupdate_hook"]
[build-dependencies] [build-dependencies]
bindgen = { version = "0.46", optional = true } bindgen = { version = "0.46", optional = true }

View File

@ -54,6 +54,12 @@ mod build {
if cfg!(feature = "unlock_notify") { if cfg!(feature = "unlock_notify") {
cfg.flag("-DSQLITE_ENABLE_UNLOCK_NOTIFY"); cfg.flag("-DSQLITE_ENABLE_UNLOCK_NOTIFY");
} }
if cfg!(feature = "preupdate_hook") {
cfg.flag("-DSQLITE_ENABLE_PREUPDATE_HOOK");
}
if cfg!(feature = "session") {
cfg.flag("-DSQLITE_ENABLE_SESSION");
}
cfg.compile("libsqlite3.a"); cfg.compile("libsqlite3.a");
println!("cargo:lib_dir={}", out_dir); println!("cargo:lib_dir={}", out_dir);
@ -237,10 +243,22 @@ mod bindings {
pub fn write_to_out_dir(header: HeaderLocation, out_path: &Path) { pub fn write_to_out_dir(header: HeaderLocation, out_path: &Path) {
let header: String = header.into(); let header: String = header.into();
let mut output = Vec::new(); let mut output = Vec::new();
bindgen::builder() let mut bindings = bindgen::builder()
.header(header.clone()) .header(header.clone())
.parse_callbacks(Box::new(SqliteTypeChooser)) .parse_callbacks(Box::new(SqliteTypeChooser))
.rustfmt_bindings(true) .rustfmt_bindings(true);
if cfg!(feature = "unlock_notify") {
bindings = bindings.clang_arg("-DSQLITE_ENABLE_UNLOCK_NOTIFY");
}
if cfg!(feature = "preupdate_hook") {
bindings = bindings.clang_arg("-DSQLITE_ENABLE_PREUPDATE_HOOK");
}
if cfg!(feature = "session") {
bindings = bindings.clang_arg("-DSQLITE_ENABLE_SESSION");
}
bindings
.generate() .generate()
.expect(&format!("could not run bindgen on header {}", header)) .expect(&format!("could not run bindgen on header {}", header))
.write(Box::new(&mut output)) .write(Box::new(&mut output))

View File

@ -14,55 +14,6 @@ use crate::types::{ToSqlOutput, ValueRef};
#[cfg(feature = "array")] #[cfg(feature = "array")]
use crate::vtab::array::{free_array, ARRAY_TYPE}; use crate::vtab::array::{free_array, ARRAY_TYPE};
impl<'a> ValueRef<'a> {
pub(crate) unsafe fn from_value(value: *mut sqlite3_value) -> ValueRef<'a> {
use std::slice::from_raw_parts;
match ffi::sqlite3_value_type(value) {
ffi::SQLITE_NULL => ValueRef::Null,
ffi::SQLITE_INTEGER => ValueRef::Integer(ffi::sqlite3_value_int64(value)),
ffi::SQLITE_FLOAT => ValueRef::Real(ffi::sqlite3_value_double(value)),
ffi::SQLITE_TEXT => {
let text = ffi::sqlite3_value_text(value);
assert!(
!text.is_null(),
"unexpected SQLITE_TEXT value type with NULL data"
);
let s = CStr::from_ptr(text as *const c_char);
// sqlite3_value_text returns UTF8 data, so our unwrap here should be fine.
let s = s
.to_str()
.expect("sqlite3_value_text returned invalid UTF-8");
ValueRef::Text(s)
}
ffi::SQLITE_BLOB => {
let (blob, len) = (
ffi::sqlite3_value_blob(value),
ffi::sqlite3_value_bytes(value),
);
assert!(
len >= 0,
"unexpected negative return from sqlite3_value_bytes"
);
if len > 0 {
assert!(
!blob.is_null(),
"unexpected SQLITE_BLOB value type with NULL data"
);
ValueRef::Blob(from_raw_parts(blob as *const u8, len as usize))
} else {
// The return value from sqlite3_value_blob() for a zero-length BLOB
// is a NULL pointer.
ValueRef::Blob(&[])
}
}
_ => unreachable!("sqlite3_value_type returned invalid value"),
}
}
}
pub(crate) unsafe fn set_result<'a>(ctx: *mut sqlite3_context, result: &ToSqlOutput<'a>) { pub(crate) unsafe fn set_result<'a>(ctx: *mut sqlite3_context, result: &ToSqlOutput<'a>) {
let value = match *result { let value = match *result {
ToSqlOutput::Borrowed(v) => v, ToSqlOutput::Borrowed(v) => v,

View File

@ -290,3 +290,12 @@ pub fn error_from_handle(db: *mut ffi::sqlite3, code: c_int) -> Error {
}; };
error_from_sqlite_code(code, message) error_from_sqlite_code(code, message)
} }
macro_rules! check {
($funcall:expr) => {{
let rc = $funcall;
if rc != crate::ffi::SQLITE_OK {
Err(crate::error::error_from_sqlite_code(rc, None))?;
}
}};
}

View File

@ -9,83 +9,21 @@ use crate::ffi;
use crate::{Connection, InnerConnection}; use crate::{Connection, InnerConnection};
/// Authorizer Action Codes /// Action Codes
#[derive(Debug, PartialEq)] #[derive(Clone, Copy, Debug, PartialEq)]
pub enum Action { pub enum Action {
UNKNOWN = -1, UNKNOWN = -1,
SQLITE_CREATE_INDEX = ffi::SQLITE_CREATE_INDEX as isize,
SQLITE_CREATE_TABLE = ffi::SQLITE_CREATE_TABLE as isize,
SQLITE_CREATE_TEMP_INDEX = ffi::SQLITE_CREATE_TEMP_INDEX as isize,
SQLITE_CREATE_TEMP_TABLE = ffi::SQLITE_CREATE_TEMP_TABLE as isize,
SQLITE_CREATE_TEMP_TRIGGER = ffi::SQLITE_CREATE_TEMP_TRIGGER as isize,
SQLITE_CREATE_TEMP_VIEW = ffi::SQLITE_CREATE_TEMP_VIEW as isize,
SQLITE_CREATE_TRIGGER = ffi::SQLITE_CREATE_TRIGGER as isize,
SQLITE_CREATE_VIEW = ffi::SQLITE_CREATE_VIEW as isize,
SQLITE_DELETE = ffi::SQLITE_DELETE as isize, SQLITE_DELETE = ffi::SQLITE_DELETE as isize,
SQLITE_DROP_INDEX = ffi::SQLITE_DROP_INDEX as isize,
SQLITE_DROP_TABLE = ffi::SQLITE_DROP_TABLE as isize,
SQLITE_DROP_TEMP_INDEX = ffi::SQLITE_DROP_TEMP_INDEX as isize,
SQLITE_DROP_TEMP_TABLE = ffi::SQLITE_DROP_TEMP_TABLE as isize,
SQLITE_DROP_TEMP_TRIGGER = ffi::SQLITE_DROP_TEMP_TRIGGER as isize,
SQLITE_DROP_TEMP_VIEW = ffi::SQLITE_DROP_TEMP_VIEW as isize,
SQLITE_DROP_TRIGGER = ffi::SQLITE_DROP_TRIGGER as isize,
SQLITE_DROP_VIEW = ffi::SQLITE_DROP_VIEW as isize,
SQLITE_INSERT = ffi::SQLITE_INSERT as isize, SQLITE_INSERT = ffi::SQLITE_INSERT as isize,
SQLITE_PRAGMA = ffi::SQLITE_PRAGMA as isize,
SQLITE_READ = ffi::SQLITE_READ as isize,
SQLITE_SELECT = ffi::SQLITE_SELECT as isize,
SQLITE_TRANSACTION = ffi::SQLITE_TRANSACTION as isize,
SQLITE_UPDATE = ffi::SQLITE_UPDATE as isize, SQLITE_UPDATE = ffi::SQLITE_UPDATE as isize,
SQLITE_ATTACH = ffi::SQLITE_ATTACH as isize,
SQLITE_DETACH = ffi::SQLITE_DETACH as isize,
SQLITE_ALTER_TABLE = ffi::SQLITE_ALTER_TABLE as isize,
SQLITE_REINDEX = ffi::SQLITE_REINDEX as isize,
SQLITE_ANALYZE = ffi::SQLITE_ANALYZE as isize,
SQLITE_CREATE_VTABLE = ffi::SQLITE_CREATE_VTABLE as isize,
SQLITE_DROP_VTABLE = ffi::SQLITE_DROP_VTABLE as isize,
SQLITE_FUNCTION = ffi::SQLITE_FUNCTION as isize,
SQLITE_SAVEPOINT = ffi::SQLITE_SAVEPOINT as isize,
SQLITE_COPY = ffi::SQLITE_COPY as isize,
SQLITE_RECURSIVE = 33,
} }
impl From<i32> for Action { impl From<i32> for Action {
fn from(code: i32) -> Action { fn from(code: i32) -> Action {
match code { match code {
ffi::SQLITE_CREATE_INDEX => Action::SQLITE_CREATE_INDEX,
ffi::SQLITE_CREATE_TABLE => Action::SQLITE_CREATE_TABLE,
ffi::SQLITE_CREATE_TEMP_INDEX => Action::SQLITE_CREATE_TEMP_INDEX,
ffi::SQLITE_CREATE_TEMP_TABLE => Action::SQLITE_CREATE_TEMP_TABLE,
ffi::SQLITE_CREATE_TEMP_TRIGGER => Action::SQLITE_CREATE_TEMP_TRIGGER,
ffi::SQLITE_CREATE_TEMP_VIEW => Action::SQLITE_CREATE_TEMP_VIEW,
ffi::SQLITE_CREATE_TRIGGER => Action::SQLITE_CREATE_TRIGGER,
ffi::SQLITE_CREATE_VIEW => Action::SQLITE_CREATE_VIEW,
ffi::SQLITE_DELETE => Action::SQLITE_DELETE, ffi::SQLITE_DELETE => Action::SQLITE_DELETE,
ffi::SQLITE_DROP_INDEX => Action::SQLITE_DROP_INDEX,
ffi::SQLITE_DROP_TABLE => Action::SQLITE_DROP_TABLE,
ffi::SQLITE_DROP_TEMP_INDEX => Action::SQLITE_DROP_TEMP_INDEX,
ffi::SQLITE_DROP_TEMP_TABLE => Action::SQLITE_DROP_TEMP_TABLE,
ffi::SQLITE_DROP_TEMP_TRIGGER => Action::SQLITE_DROP_TEMP_TRIGGER,
ffi::SQLITE_DROP_TEMP_VIEW => Action::SQLITE_DROP_TEMP_VIEW,
ffi::SQLITE_DROP_TRIGGER => Action::SQLITE_DROP_TRIGGER,
ffi::SQLITE_DROP_VIEW => Action::SQLITE_DROP_VIEW,
ffi::SQLITE_INSERT => Action::SQLITE_INSERT, ffi::SQLITE_INSERT => Action::SQLITE_INSERT,
ffi::SQLITE_PRAGMA => Action::SQLITE_PRAGMA,
ffi::SQLITE_READ => Action::SQLITE_READ,
ffi::SQLITE_SELECT => Action::SQLITE_SELECT,
ffi::SQLITE_TRANSACTION => Action::SQLITE_TRANSACTION,
ffi::SQLITE_UPDATE => Action::SQLITE_UPDATE, ffi::SQLITE_UPDATE => Action::SQLITE_UPDATE,
ffi::SQLITE_ATTACH => Action::SQLITE_ATTACH,
ffi::SQLITE_DETACH => Action::SQLITE_DETACH,
ffi::SQLITE_ALTER_TABLE => Action::SQLITE_ALTER_TABLE,
ffi::SQLITE_REINDEX => Action::SQLITE_REINDEX,
ffi::SQLITE_ANALYZE => Action::SQLITE_ANALYZE,
ffi::SQLITE_CREATE_VTABLE => Action::SQLITE_CREATE_VTABLE,
ffi::SQLITE_DROP_VTABLE => Action::SQLITE_DROP_VTABLE,
ffi::SQLITE_FUNCTION => Action::SQLITE_FUNCTION,
ffi::SQLITE_SAVEPOINT => Action::SQLITE_SAVEPOINT,
ffi::SQLITE_COPY => Action::SQLITE_COPY,
33 => Action::SQLITE_RECURSIVE,
_ => Action::UNKNOWN, _ => Action::UNKNOWN,
} }
} }

View File

@ -115,6 +115,7 @@ mod busy;
mod cache; mod cache;
#[cfg(any(feature = "functions", feature = "vtab"))] #[cfg(any(feature = "functions", feature = "vtab"))]
mod context; mod context;
#[macro_use]
mod error; mod error;
#[cfg(feature = "functions")] #[cfg(feature = "functions")]
pub mod functions; pub mod functions;
@ -126,6 +127,8 @@ pub mod limits;
mod load_extension_guard; mod load_extension_guard;
mod raw_statement; mod raw_statement;
mod row; mod row;
#[cfg(feature = "session")]
pub mod session;
mod statement; mod statement;
#[cfg(feature = "trace")] #[cfg(feature = "trace")]
pub mod trace; pub mod trace;
@ -193,7 +196,7 @@ pub enum DatabaseName<'a> {
// Currently DatabaseName is only used by the backup and blob mods, so hide // Currently DatabaseName is only used by the backup and blob mods, so hide
// this (private) impl to avoid dead code warnings. // this (private) impl to avoid dead code warnings.
#[cfg(any(feature = "backup", feature = "blob"))] #[cfg(any(feature = "backup", feature = "blob", feature = "session"))]
impl<'a> DatabaseName<'a> { impl<'a> DatabaseName<'a> {
fn to_cstring(&self) -> Result<CString> { fn to_cstring(&self) -> Result<CString> {
use self::DatabaseName::{Attached, Main, Temp}; use self::DatabaseName::{Attached, Main, Temp};

888
src/session.rs Normal file
View File

@ -0,0 +1,888 @@
//! [Session Extension](https://sqlite.org/sessionintro.html)
#![allow(non_camel_case_types)]
use std::ffi::CStr;
use std::io::{Read, Write};
use std::marker::PhantomData;
use std::mem;
use std::os::raw::{c_char, c_int, c_uchar, c_void};
use std::panic::{catch_unwind, RefUnwindSafe};
use std::ptr;
use std::slice::{from_raw_parts, from_raw_parts_mut};
use fallible_streaming_iterator::FallibleStreamingIterator;
use crate::error::error_from_sqlite_code;
use crate::ffi;
use crate::hooks::Action;
use crate::types::ValueRef;
use crate::{errmsg_to_string, str_to_cstring, Connection, DatabaseName, Result};
// https://sqlite.org/session.html
/// An instance of this object is a session that can be used to record changes
/// to a database.
pub struct Session<'conn> {
phantom: PhantomData<&'conn ()>,
s: *mut ffi::sqlite3_session,
filter: Option<Box<dyn Fn(&str) -> bool>>,
}
impl<'conn> Session<'conn> {
/// Create a new session object
pub fn new(db: &'conn Connection) -> Result<Session<'conn>> {
Session::new_with_name(db, DatabaseName::Main)
}
/// Create a new session object
pub fn new_with_name(db: &'conn Connection, name: DatabaseName<'_>) -> Result<Session<'conn>> {
let name = name.to_cstring()?;
let db = db.db.borrow_mut().db;
let mut s: *mut ffi::sqlite3_session = unsafe { mem::uninitialized() };
check!(unsafe { ffi::sqlite3session_create(db, name.as_ptr(), &mut s) });
Ok(Session {
phantom: PhantomData,
s,
filter: None,
})
}
/// Set a table filter
pub fn table_filter<F>(&mut self, filter: Option<F>)
where
F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
{
unsafe extern "C" fn call_boxed_closure<F>(
p_arg: *mut c_void,
tbl_str: *const c_char,
) -> c_int
where
F: Fn(&str) -> bool + RefUnwindSafe,
{
use std::ffi::CStr;
use std::str;
let boxed_filter: *mut F = p_arg as *mut F;
let tbl_name = {
let c_slice = CStr::from_ptr(tbl_str).to_bytes();
str::from_utf8_unchecked(c_slice)
};
if let Ok(true) = catch_unwind(|| (*boxed_filter)(tbl_name)) {
1
} else {
0
}
}
match filter {
Some(filter) => {
let boxed_filter = Box::new(filter);
unsafe {
ffi::sqlite3session_table_filter(
self.s,
Some(call_boxed_closure::<F>),
&*boxed_filter as *const F as *mut _,
);
}
self.filter = Some(boxed_filter);
}
_ => {
unsafe { ffi::sqlite3session_table_filter(self.s, None, ptr::null_mut()) }
self.filter = None;
}
};
}
/// Attach a table. `None` means all tables.
pub fn attach(&mut self, table: Option<&str>) -> Result<()> {
let table = if let Some(table) = table {
str_to_cstring(table)?.as_ptr()
} else {
ptr::null()
};
unsafe { check!(ffi::sqlite3session_attach(self.s, table)) };
Ok(())
}
/// Generate a Changeset
pub fn changeset(&mut self) -> Result<Changeset> {
let mut n = 0;
let mut cs: *mut c_void = unsafe { mem::uninitialized() };
check!(unsafe { ffi::sqlite3session_changeset(self.s, &mut n, &mut cs) });
Ok(Changeset { cs, n })
}
/// Write the set of changes represented by this session to `output`.
pub fn changeset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
let output_ref = &output;
check!(unsafe {
ffi::sqlite3session_changeset_strm(
self.s,
Some(x_output),
output_ref as *const &mut dyn Write as *mut c_void,
)
});
Ok(())
}
/// Generate a Patchset
pub fn patchset(&mut self) -> Result<Changeset> {
let mut n = 0;
let mut ps: *mut c_void = unsafe { mem::uninitialized() };
check!(unsafe { ffi::sqlite3session_patchset(self.s, &mut n, &mut ps) });
// TODO Validate: same struct
Ok(Changeset { cs: ps, n })
}
/// Write the set of patches represented by this session to `output`.
pub fn patchset_strm(&mut self, output: &mut dyn Write) -> Result<()> {
let output_ref = &output;
check!(unsafe {
ffi::sqlite3session_patchset_strm(
self.s,
Some(x_output),
output_ref as *const &mut dyn Write as *mut c_void,
)
});
Ok(())
}
/// Load the difference between tables.
pub fn diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()> {
let from = from.to_cstring()?;
let table = str_to_cstring(table)?.as_ptr();
unsafe {
let mut errmsg: *mut c_char = mem::uninitialized();
let r = ffi::sqlite3session_diff(self.s, from.as_ptr(), table, &mut errmsg);
if r != ffi::SQLITE_OK {
let message = errmsg_to_string(&*errmsg);
ffi::sqlite3_free(errmsg as *mut ::std::os::raw::c_void);
return Err(error_from_sqlite_code(r, Some(message)));
}
}
Ok(())
}
/// Test if a changeset has recorded any changes
pub fn is_empty(&self) -> bool {
unsafe { ffi::sqlite3session_isempty(self.s) != 0 }
}
/// Query the current state of the session
pub fn is_enabled(&self) -> bool {
unsafe { ffi::sqlite3session_enable(self.s, -1) != 0 }
}
/// Enable or disable the recording of changes
pub fn set_enabled(&mut self, enabled: bool) {
unsafe {
ffi::sqlite3session_enable(self.s, if enabled { 1 } else { 0 });
}
}
/// Query the current state of the indirect flag
pub fn is_indirect(&self) -> bool {
unsafe { ffi::sqlite3session_indirect(self.s, -1) != 0 }
}
/// Set or clear the indirect change flag
pub fn set_indirect(&mut self, indirect: bool) {
unsafe {
ffi::sqlite3session_indirect(self.s, if indirect { 1 } else { 0 });
}
}
}
impl<'conn> Drop for Session<'conn> {
fn drop(&mut self) {
if self.filter.is_some() {
self.table_filter(None::<fn(&str) -> bool>);
}
unsafe { ffi::sqlite3session_delete(self.s) };
}
}
/// Invert a changeset
pub fn invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()> {
let input_ref = &input;
let output_ref = &output;
check!(unsafe {
ffi::sqlite3changeset_invert_strm(
Some(x_input),
input_ref as *const &mut dyn Read as *mut c_void,
Some(x_output),
output_ref as *const &mut dyn Write as *mut c_void,
)
});
Ok(())
}
/// Combine two changesets
pub fn concat_strm(
input_a: &mut dyn Read,
input_b: &mut dyn Read,
output: &mut dyn Write,
) -> Result<()> {
let input_a_ref = &input_a;
let input_b_ref = &input_b;
let output_ref = &output;
check!(unsafe {
ffi::sqlite3changeset_concat_strm(
Some(x_input),
input_a_ref as *const &mut dyn Read as *mut c_void,
Some(x_input),
input_b_ref as *const &mut dyn Read as *mut c_void,
Some(x_output),
output_ref as *const &mut dyn Write as *mut c_void,
)
});
Ok(())
}
/// Changeset or Patchset
pub struct Changeset {
cs: *mut c_void,
n: c_int,
}
impl Changeset {
/// Invert a changeset
pub fn invert(&self) -> Result<Changeset> {
let mut n = 0;
let mut cs: *mut c_void = unsafe { mem::uninitialized() };
check!(unsafe { ffi::sqlite3changeset_invert(self.n, self.cs, &mut n, &mut cs) });
Ok(Changeset { cs, n })
}
/// Create an iterator to traverse a changeset
pub fn iter<'changeset>(&'changeset self) -> Result<ChangesetIter<'changeset>> {
let mut it: *mut ffi::sqlite3_changeset_iter = unsafe { mem::uninitialized() };
check!(unsafe { ffi::sqlite3changeset_start(&mut it, self.n, self.cs) });
Ok(ChangesetIter {
phantom: PhantomData,
it,
item: None,
})
}
/// Concatenate two changeset objects
pub fn concat(a: &Changeset, b: &Changeset) -> Result<Changeset> {
let mut n = 0;
let mut cs: *mut c_void = unsafe { mem::uninitialized() };
check!(unsafe { ffi::sqlite3changeset_concat(a.n, a.cs, b.n, b.cs, &mut n, &mut cs) });
Ok(Changeset { cs, n })
}
}
impl Drop for Changeset {
fn drop(&mut self) {
unsafe {
ffi::sqlite3_free(self.cs);
}
}
}
/// Cursor for iterating over the elements of a changeset or patchset.
pub struct ChangesetIter<'changeset> {
phantom: PhantomData<&'changeset ()>,
it: *mut ffi::sqlite3_changeset_iter,
item: Option<ChangesetItem>,
}
impl<'changeset> ChangesetIter<'changeset> {
/// Create an iterator on `input`
pub fn start_strm<'input>(input: &'input mut dyn Read) -> Result<ChangesetIter<'input>> {
let input_ref = &input;
let mut it: *mut ffi::sqlite3_changeset_iter = unsafe { mem::uninitialized() };
check!(unsafe {
ffi::sqlite3changeset_start_strm(
&mut it,
Some(x_input),
input_ref as *const &mut dyn Read as *mut c_void,
)
});
Ok(ChangesetIter {
phantom: PhantomData,
it,
item: None,
})
}
}
impl<'changeset> FallibleStreamingIterator for ChangesetIter<'changeset> {
type Error = crate::error::Error;
type Item = ChangesetItem;
fn advance(&mut self) -> Result<()> {
let rc = unsafe { ffi::sqlite3changeset_next(self.it) };
match rc {
ffi::SQLITE_ROW => {
self.item = Some(ChangesetItem { it: self.it });
Ok(())
}
ffi::SQLITE_DONE => {
self.item = None;
Ok(())
}
code => Err(error_from_sqlite_code(code, None)),
}
}
fn get(&self) -> Option<&ChangesetItem> {
self.item.as_ref()
}
}
pub struct Operation<'item> {
table_name: &'item str,
number_of_columns: i32,
code: Action,
indirect: bool,
}
impl<'item> Operation<'item> {
pub fn table_name(&self) -> &str {
self.table_name
}
pub fn number_of_columns(&self) -> i32 {
self.number_of_columns
}
pub fn code(&self) -> Action {
self.code
}
pub fn indirect(&self) -> bool {
self.indirect
}
}
impl<'changeset> Drop for ChangesetIter<'changeset> {
fn drop(&mut self) {
unsafe {
ffi::sqlite3changeset_finalize(self.it);
}
}
}
/// An item passed to a conflict-handler by `Connection::apply`,
/// or an item generated by `ChangesetIter::next`.
// TODO enum ? Delete, Insert, Update, ...
pub struct ChangesetItem {
it: *mut ffi::sqlite3_changeset_iter,
}
impl ChangesetItem {
/// Obtain conflicting row values
///
/// May only be called with an `SQLITE_CHANGESET_DATA` or
/// `SQLITE_CHANGESET_CONFLICT` conflict handler callback.
pub fn conflict(&self, col: usize) -> Result<ValueRef<'_>> {
unsafe {
let mut p_value: *mut ffi::sqlite3_value = mem::uninitialized();
check!(ffi::sqlite3changeset_conflict(
self.it,
col as i32,
&mut p_value
));
Ok(ValueRef::from_value(p_value))
}
}
/// Determine the number of foreign key constraint violations
///
/// May only be called with an `SQLITE_CHANGESET_FOREIGN_KEY` conflict
/// handler callback.
pub fn fk_conflicts(&self) -> Result<i32> {
unsafe {
let mut p_out = 0;
check!(ffi::sqlite3changeset_fk_conflicts(self.it, &mut p_out));
Ok(p_out)
}
}
/// Obtain new.* Values
///
/// May only be called if the type of change is either `SQLITE_UPDATE` or
/// `SQLITE_INSERT`.
pub fn new_value(&self, col: usize) -> Result<ValueRef<'_>> {
unsafe {
let mut p_value: *mut ffi::sqlite3_value = mem::uninitialized();
check!(ffi::sqlite3changeset_new(self.it, col as i32, &mut p_value));
Ok(ValueRef::from_value(p_value))
}
}
/// Obtain old.* Values
///
/// May only be called if the type of change is either `SQLITE_DELETE` or
/// `SQLITE_UPDATE`.
pub fn old_value(&self, col: usize) -> Result<ValueRef<'_>> {
unsafe {
let mut p_value: *mut ffi::sqlite3_value = mem::uninitialized();
check!(ffi::sqlite3changeset_old(self.it, col as i32, &mut p_value));
Ok(ValueRef::from_value(p_value))
}
}
/// Obtain the current operation
pub fn op(&self) -> Result<Operation<'_>> {
let mut number_of_columns = 0;
let mut code = 0;
let mut indirect = 0;
let tab = unsafe {
let mut pz_tab: *const c_char = mem::uninitialized();
check!(ffi::sqlite3changeset_op(
self.it,
&mut pz_tab,
&mut number_of_columns,
&mut code,
&mut indirect
));
CStr::from_ptr(pz_tab)
};
let table_name = tab.to_str()?;
Ok(Operation {
table_name,
number_of_columns,
code: Action::from(code),
indirect: indirect != 0,
})
}
/// Obtain the primary key definition of a table
pub fn pk(&self) -> Result<&[u8]> {
let mut number_of_columns = 0;
unsafe {
let mut pks: *mut c_uchar = mem::uninitialized();
check!(ffi::sqlite3changeset_pk(
self.it,
&mut pks,
&mut number_of_columns
));
Ok(from_raw_parts(pks, number_of_columns as usize))
}
}
}
/// Used to combine two or more changesets or
/// patchsets
pub struct Changegroup {
cg: *mut ffi::sqlite3_changegroup,
}
impl Changegroup {
pub fn new() -> Result<Self> {
let mut cg: *mut ffi::sqlite3_changegroup = unsafe { mem::uninitialized() };
check!(unsafe { ffi::sqlite3changegroup_new(&mut cg) });
Ok(Changegroup { cg })
}
/// Add a changeset
pub fn add(&mut self, cs: &Changeset) -> Result<()> {
check!(unsafe { ffi::sqlite3changegroup_add(self.cg, cs.n, cs.cs) });
Ok(())
}
/// Add a changeset read from `input` to this change group.
pub fn add_stream(&mut self, input: &mut dyn Read) -> Result<()> {
let input_ref = &input;
check!(unsafe {
ffi::sqlite3changegroup_add_strm(
self.cg,
Some(x_input),
input_ref as *const &mut dyn Read as *mut c_void,
)
});
Ok(())
}
/// Obtain a composite Changeset
pub fn output(&mut self) -> Result<Changeset> {
let mut n = 0;
let mut output: *mut c_void = unsafe { mem::uninitialized() };
check!(unsafe { ffi::sqlite3changegroup_output(self.cg, &mut n, &mut output) });
Ok(Changeset { cs: output, n })
}
/// Write the combined set of changes to `output`.
pub fn output_strm(&mut self, output: &mut dyn Write) -> Result<()> {
let output_ref = &output;
check!(unsafe {
ffi::sqlite3changegroup_output_strm(
self.cg,
Some(x_output),
output_ref as *const &mut dyn Write as *mut c_void,
)
});
Ok(())
}
}
impl Drop for Changegroup {
fn drop(&mut self) {
unsafe {
ffi::sqlite3changegroup_delete(self.cg);
}
}
}
impl Connection {
/// Apply a changeset to a database
pub fn apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()>
where
F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
{
let db = self.db.borrow_mut().db;
let filtered = filter.is_some();
let tuple = &mut (filter, conflict);
check!(unsafe {
if filtered {
ffi::sqlite3changeset_apply(
db,
cs.n,
cs.cs,
Some(call_filter::<F, C>),
Some(call_conflict::<F, C>),
tuple as *mut (Option<F>, C) as *mut c_void,
)
} else {
ffi::sqlite3changeset_apply(
db,
cs.n,
cs.cs,
None,
Some(call_conflict::<F, C>),
tuple as *mut (Option<F>, C) as *mut c_void,
)
}
});
Ok(())
}
/// Apply a changeset to a database
pub fn apply_strm<F, C>(
&self,
input: &mut dyn Read,
filter: Option<F>,
conflict: C,
) -> Result<()>
where
F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
{
let input_ref = &input;
let db = self.db.borrow_mut().db;
let filtered = filter.is_some();
let tuple = &mut (filter, conflict);
check!(unsafe {
if filtered {
ffi::sqlite3changeset_apply_strm(
db,
Some(x_input),
input_ref as *const &mut dyn Read as *mut c_void,
Some(call_filter::<F, C>),
Some(call_conflict::<F, C>),
tuple as *mut (Option<F>, C) as *mut c_void,
)
} else {
ffi::sqlite3changeset_apply_strm(
db,
Some(x_input),
input_ref as *const &mut dyn Read as *mut c_void,
None,
Some(call_conflict::<F, C>),
tuple as *mut (Option<F>, C) as *mut c_void,
)
}
});
Ok(())
}
}
/// Constants passed to the conflict handler
#[derive(Debug, PartialEq)]
pub enum ConflictType {
UNKNOWN = -1,
SQLITE_CHANGESET_DATA = ffi::SQLITE_CHANGESET_DATA as isize,
SQLITE_CHANGESET_NOTFOUND = ffi::SQLITE_CHANGESET_NOTFOUND as isize,
SQLITE_CHANGESET_CONFLICT = ffi::SQLITE_CHANGESET_CONFLICT as isize,
SQLITE_CHANGESET_CONSTRAINT = ffi::SQLITE_CHANGESET_CONSTRAINT as isize,
SQLITE_CHANGESET_FOREIGN_KEY = ffi::SQLITE_CHANGESET_FOREIGN_KEY as isize,
}
impl From<i32> for ConflictType {
fn from(code: i32) -> ConflictType {
match code {
ffi::SQLITE_CHANGESET_DATA => ConflictType::SQLITE_CHANGESET_DATA,
ffi::SQLITE_CHANGESET_NOTFOUND => ConflictType::SQLITE_CHANGESET_NOTFOUND,
ffi::SQLITE_CHANGESET_CONFLICT => ConflictType::SQLITE_CHANGESET_CONFLICT,
ffi::SQLITE_CHANGESET_CONSTRAINT => ConflictType::SQLITE_CHANGESET_CONSTRAINT,
ffi::SQLITE_CHANGESET_FOREIGN_KEY => ConflictType::SQLITE_CHANGESET_FOREIGN_KEY,
_ => ConflictType::UNKNOWN,
}
}
}
/// Constants returned by the conflict handler
#[derive(Debug, PartialEq)]
pub enum ConflictAction {
SQLITE_CHANGESET_OMIT = ffi::SQLITE_CHANGESET_OMIT as isize,
SQLITE_CHANGESET_REPLACE = ffi::SQLITE_CHANGESET_REPLACE as isize,
SQLITE_CHANGESET_ABORT = ffi::SQLITE_CHANGESET_ABORT as isize,
}
unsafe extern "C" fn call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int
where
F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
{
use std::ffi::CStr;
use std::str;
let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
let tbl_name = {
let c_slice = CStr::from_ptr(tbl_str).to_bytes();
str::from_utf8_unchecked(c_slice)
};
match *tuple {
(Some(ref filter), _) => {
if let Ok(true) = catch_unwind(|| filter(tbl_name)) {
1
} else {
0
}
}
_ => unimplemented!(),
}
}
unsafe extern "C" fn call_conflict<F, C>(
p_ctx: *mut c_void,
e_conflict: c_int,
p: *mut ffi::sqlite3_changeset_iter,
) -> c_int
where
F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static,
C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static,
{
let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C);
let conflict_type = ConflictType::from(e_conflict);
let item = ChangesetItem { it: p };
if let Ok(action) = catch_unwind(|| (*tuple).1(conflict_type, item)) {
action as c_int
} else {
ffi::SQLITE_CHANGESET_ABORT
}
}
unsafe extern "C" fn x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int {
if p_in.is_null() {
return ffi::SQLITE_MISUSE;
}
let bytes: &mut [u8] = from_raw_parts_mut(data as *mut u8, len as usize);
let input = p_in as *mut &mut dyn Read;
match (*input).read(bytes) {
Ok(n) => {
*len = n as i32; // TODO Validate: n = 0 may not mean the reader will always no longer be able to
// produce bytes.
ffi::SQLITE_OK
}
Err(_) => ffi::SQLITE_IOERR_READ, // TODO check if err is a (ru)sqlite Error => propagate
}
}
unsafe extern "C" fn x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int {
if p_out.is_null() {
return ffi::SQLITE_MISUSE;
}
// The sessions module never invokes an xOutput callback with the third
// parameter set to a value less than or equal to zero.
let bytes: &[u8] = from_raw_parts(data as *const u8, len as usize);
let output = p_out as *mut &mut dyn Write;
match (*output).write_all(bytes) {
Ok(_) => ffi::SQLITE_OK,
Err(_) => ffi::SQLITE_IOERR_WRITE, // TODO check if err is a (ru)sqlite Error => propagate
}
}
#[cfg(test)]
mod test {
use fallible_streaming_iterator::FallibleStreamingIterator;
use std::sync::atomic::{AtomicBool, Ordering};
use super::{Changeset, ChangesetIter, ConflictAction, ConflictType, Session};
use crate::hooks::Action;
use crate::Connection;
fn one_changeset() -> Changeset {
let db = Connection::open_in_memory().unwrap();
db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")
.unwrap();
let mut session = Session::new(&db).unwrap();
assert!(session.is_empty());
session.attach(None).unwrap();
db.execute("INSERT INTO foo (t) VALUES (?);", &["bar"])
.unwrap();
session.changeset().unwrap()
}
fn one_changeset_strm() -> Vec<u8> {
let db = Connection::open_in_memory().unwrap();
db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")
.unwrap();
let mut session = Session::new(&db).unwrap();
assert!(session.is_empty());
session.attach(None).unwrap();
db.execute("INSERT INTO foo (t) VALUES (?);", &["bar"])
.unwrap();
let mut output = Vec::new();
session.changeset_strm(&mut output).unwrap();
output
}
#[test]
fn test_changeset() {
let changeset = one_changeset();
let mut iter = changeset.iter().unwrap();
let item = iter.next().unwrap();
assert!(item.is_some());
let item = item.unwrap();
let op = item.op().unwrap();
assert_eq!("foo", op.table_name());
assert_eq!(1, op.number_of_columns());
assert_eq!(Action::SQLITE_INSERT, op.code());
assert_eq!(false, op.indirect());
let pk = item.pk().unwrap();
assert_eq!(&[1], pk);
let new_value = item.new_value(0).unwrap();
assert_eq!(Ok("bar"), new_value.as_str());
}
#[test]
fn test_changeset_strm() {
let output = one_changeset_strm();
assert!(!output.is_empty());
assert_eq!(14, output.len());
let mut input = output.as_slice();
let mut iter = ChangesetIter::start_strm(&mut input).unwrap();
let item = iter.next().unwrap();
assert!(item.is_some());
}
#[test]
fn test_changeset_apply() {
let changeset = one_changeset();
let db = Connection::open_in_memory().unwrap();
db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")
.unwrap();
lazy_static! {
static ref called: AtomicBool = AtomicBool::new(false);
}
db.apply(
&changeset,
None::<fn(&str) -> bool>,
|_conflict_type, _item| {
called.store(true, Ordering::Relaxed);
ConflictAction::SQLITE_CHANGESET_OMIT
},
)
.unwrap();
assert!(!called.load(Ordering::Relaxed));
let check = db
.query_row("SELECT 1 FROM foo WHERE t = ?", &["bar"], |row| row.get(0))
.unwrap();
assert_eq!(1, check);
// conflict expected when same changeset applied again on the same db
db.apply(
&changeset,
None::<fn(&str) -> bool>,
|conflict_type, item| {
called.store(true, Ordering::Relaxed);
assert_eq!(ConflictType::SQLITE_CHANGESET_CONFLICT, conflict_type);
let conflict = item.conflict(0).unwrap();
assert_eq!(Ok("bar"), conflict.as_str());
ConflictAction::SQLITE_CHANGESET_OMIT
},
)
.unwrap();
assert!(called.load(Ordering::Relaxed));
}
#[test]
fn test_changeset_apply_strm() {
let output = one_changeset_strm();
let db = Connection::open_in_memory().unwrap();
db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")
.unwrap();
db.apply_strm(
&mut output.as_slice(),
None::<fn(&str) -> bool>,
|_conflict_type, _item| ConflictAction::SQLITE_CHANGESET_OMIT,
)
.unwrap();
let check = db
.query_row("SELECT 1 FROM foo WHERE t = ?", &["bar"], |row| row.get(0))
.unwrap();
assert_eq!(1, check);
}
#[test]
fn test_session_empty() {
let db = Connection::open_in_memory().unwrap();
db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")
.unwrap();
let mut session = Session::new(&db).unwrap();
assert!(session.is_empty());
session.attach(None).unwrap();
db.execute("INSERT INTO foo (t) VALUES (?);", &["bar"])
.unwrap();
assert!(!session.is_empty());
}
#[test]
fn test_session_set_enabled() {
let db = Connection::open_in_memory().unwrap();
let mut session = Session::new(&db).unwrap();
assert!(session.is_enabled());
session.set_enabled(false);
assert!(!session.is_enabled());
}
#[test]
fn test_session_set_indirect() {
let db = Connection::open_in_memory().unwrap();
let mut session = Session::new(&db).unwrap();
assert!(!session.is_indirect());
session.set_indirect(true);
assert!(session.is_indirect());
}
}

View File

@ -22,6 +22,18 @@ pub enum FromSqlError {
Other(Box<dyn Error + Send + Sync>), Other(Box<dyn Error + Send + Sync>),
} }
impl PartialEq for FromSqlError {
fn eq(&self, other: &FromSqlError) -> bool {
match (self, other) {
(FromSqlError::InvalidType, FromSqlError::InvalidType) => true,
(FromSqlError::OutOfRange(n1), FromSqlError::OutOfRange(n2)) => n1 == n2,
#[cfg(feature = "i128_blob")]
(FromSqlError::InvalidI128Size(s1), FromSqlError::InvalidI128Size(s2)) => s1 == s2,
(_, _) => false,
}
}
}
impl fmt::Display for FromSqlError { impl fmt::Display for FromSqlError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self { match *self {

View File

@ -104,3 +104,56 @@ impl<'a> From<&'a Value> for ValueRef<'a> {
} }
} }
} }
#[cfg(any(feature = "functions", feature = "session", feature = "vtab"))]
impl<'a> ValueRef<'a> {
pub(crate) unsafe fn from_value(value: *mut crate::ffi::sqlite3_value) -> ValueRef<'a> {
use crate::ffi;
use std::ffi::CStr;
use std::os::raw::c_char;
use std::slice::from_raw_parts;
match ffi::sqlite3_value_type(value) {
ffi::SQLITE_NULL => ValueRef::Null,
ffi::SQLITE_INTEGER => ValueRef::Integer(ffi::sqlite3_value_int64(value)),
ffi::SQLITE_FLOAT => ValueRef::Real(ffi::sqlite3_value_double(value)),
ffi::SQLITE_TEXT => {
let text = ffi::sqlite3_value_text(value);
assert!(
!text.is_null(),
"unexpected SQLITE_TEXT value type with NULL data"
);
let s = CStr::from_ptr(text as *const c_char);
// sqlite3_value_text returns UTF8 data, so our unwrap here should be fine.
let s = s
.to_str()
.expect("sqlite3_value_text returned invalid UTF-8");
ValueRef::Text(s)
}
ffi::SQLITE_BLOB => {
let (blob, len) = (
ffi::sqlite3_value_blob(value),
ffi::sqlite3_value_bytes(value),
);
assert!(
len >= 0,
"unexpected negative return from sqlite3_value_bytes"
);
if len > 0 {
assert!(
!blob.is_null(),
"unexpected SQLITE_BLOB value type with NULL data"
);
ValueRef::Blob(from_raw_parts(blob as *const u8, len as usize))
} else {
// The return value from sqlite3_value_blob() for a zero-length BLOB
// is a NULL pointer.
ValueRef::Blob(&[])
}
}
_ => unreachable!("sqlite3_value_type returned invalid value"),
}
}
}