Make Rows implement FallibleStreamingIterator

This commit is contained in:
gwenn 2019-02-03 14:01:42 +01:00
parent df493bb217
commit d93dec52c0
7 changed files with 84 additions and 74 deletions

View File

@ -47,7 +47,7 @@ csvtab = ["csv", "vtab"]
# pointer passing interfaces: 3.20.0 # pointer passing interfaces: 3.20.0
array = ["vtab"] array = ["vtab"]
# session extension: 3.13.0 # session extension: 3.13.0
session = ["libsqlite3-sys/session", "hooks", "fallible-streaming-iterator"] session = ["libsqlite3-sys/session", "hooks"]
[dependencies] [dependencies]
time = "0.1.0" time = "0.1.0"
@ -58,7 +58,7 @@ serde_json = { version = "1.0", optional = true }
csv = { version = "1.0", optional = true } csv = { version = "1.0", optional = true }
lazy_static = { version = "1.0", optional = true } lazy_static = { version = "1.0", optional = true }
byteorder = { version = "1.2", features = ["i128"], optional = true } byteorder = { version = "1.2", features = ["i128"], optional = true }
fallible-streaming-iterator = { version = "0.1", optional = true } fallible-streaming-iterator = "0.1"
[dev-dependencies] [dev-dependencies]
tempdir = "0.3" tempdir = "0.3"

View File

@ -80,6 +80,8 @@ use std::str;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
pub use fallible_streaming_iterator::FallibleStreamingIterator;
use crate::cache::StatementCache; use crate::cache::StatementCache;
use crate::inner_connection::{InnerConnection, BYPASS_SQLITE_INIT}; use crate::inner_connection::{InnerConnection, BYPASS_SQLITE_INIT};
use crate::raw_statement::RawStatement; use crate::raw_statement::RawStatement;
@ -89,7 +91,7 @@ pub use crate::cache::CachedStatement;
pub use crate::error::Error; pub use crate::error::Error;
pub use crate::ffi::ErrorCode; pub use crate::ffi::ErrorCode;
#[cfg(feature = "hooks")] #[cfg(feature = "hooks")]
pub use crate::hooks::*; pub use crate::hooks::Action;
#[cfg(feature = "load_extension")] #[cfg(feature = "load_extension")]
pub use crate::load_extension_guard::LoadExtensionGuard; pub use crate::load_extension_guard::LoadExtensionGuard;
pub use crate::row::{AndThenRows, MappedRows, Row, RowIndex, Rows}; pub use crate::row::{AndThenRows, MappedRows, Row, RowIndex, Rows};
@ -474,7 +476,7 @@ impl Connection {
where where
P: IntoIterator, P: IntoIterator,
P::Item: ToSql, P::Item: ToSql,
F: FnOnce(&Row<'_, '_>) -> T, F: FnOnce(&Row<'_>) -> T,
{ {
let mut stmt = self.prepare(sql)?; let mut stmt = self.prepare(sql)?;
stmt.query_row(params, f) stmt.query_row(params, f)
@ -496,7 +498,7 @@ impl Connection {
/// or if the underlying SQLite call fails. /// or if the underlying SQLite call fails.
pub fn query_row_named<T, F>(&self, sql: &str, params: &[(&str, &dyn ToSql)], f: F) -> Result<T> pub fn query_row_named<T, F>(&self, sql: &str, params: &[(&str, &dyn ToSql)], f: F) -> Result<T>
where where
F: FnOnce(&Row<'_, '_>) -> T, F: FnOnce(&Row<'_>) -> T,
{ {
let mut stmt = self.prepare(sql)?; let mut stmt = self.prepare(sql)?;
let mut rows = stmt.query_named(params)?; let mut rows = stmt.query_named(params)?;
@ -533,7 +535,7 @@ impl Connection {
where where
P: IntoIterator, P: IntoIterator,
P::Item: ToSql, P::Item: ToSql,
F: FnOnce(&Row<'_, '_>) -> result::Result<T, E>, F: FnOnce(&Row<'_>) -> result::Result<T, E>,
E: convert::From<Error>, E: convert::From<Error>,
{ {
let mut stmt = self.prepare(sql)?; let mut stmt = self.prepare(sql)?;
@ -1061,8 +1063,8 @@ mod test {
let mut rows = query.query(&[4i32]).unwrap(); let mut rows = query.query(&[4i32]).unwrap();
let mut v = Vec::<i32>::new(); let mut v = Vec::<i32>::new();
while let Some(row) = rows.next() { while let Some(row) = rows.next().unwrap() {
v.push(row.unwrap().get(0)); v.push(row.get(0));
} }
assert_eq!(v, [3i32, 2, 1]); assert_eq!(v, [3i32, 2, 1]);
@ -1072,8 +1074,8 @@ mod test {
let mut rows = query.query(&[3i32]).unwrap(); let mut rows = query.query(&[3i32]).unwrap();
let mut v = Vec::<i32>::new(); let mut v = Vec::<i32>::new();
while let Some(row) = rows.next() { while let Some(row) = rows.next().unwrap() {
v.push(row.unwrap().get(0)); v.push(row.get(0));
} }
assert_eq!(v, [2i32, 1]); assert_eq!(v, [2i32, 1]);
@ -1215,7 +1217,7 @@ mod test {
{ {
let mut rows = stmt.query(NO_PARAMS).unwrap(); let mut rows = stmt.query(NO_PARAMS).unwrap();
assert!(!db.is_busy()); assert!(!db.is_busy());
let row = rows.next(); let row = rows.next().unwrap();
assert!(db.is_busy()); assert!(db.is_busy());
assert!(row.is_some()); assert!(row.is_some());
} }
@ -1327,8 +1329,7 @@ mod test {
let mut query = db.prepare("SELECT i, x FROM foo").unwrap(); let mut query = db.prepare("SELECT i, x FROM foo").unwrap();
let mut rows = query.query(NO_PARAMS).unwrap(); let mut rows = query.query(NO_PARAMS).unwrap();
while let Some(res) = rows.next() { while let Some(row) = rows.next().unwrap() {
let row = res.unwrap();
let i = row.get_raw(0).as_i64().unwrap(); let i = row.get_raw(0).as_i64().unwrap();
let expect = vals[i as usize]; let expect = vals[i as usize];
let x = row.get_raw("x").as_str().unwrap(); let x = row.get_raw("x").as_str().unwrap();

View File

@ -1,12 +1,12 @@
use std::marker::PhantomData;
use std::{convert, result}; use std::{convert, result};
use super::{Error, Result, Statement}; use super::{Error, FallibleStreamingIterator, Result, Statement};
use crate::types::{FromSql, FromSqlError, ValueRef}; use crate::types::{FromSql, FromSqlError, ValueRef};
/// An handle for the resulting rows of a query. /// An handle for the resulting rows of a query.
pub struct Rows<'stmt> { pub struct Rows<'stmt> {
stmt: Option<&'stmt Statement<'stmt>>, stmt: Option<&'stmt Statement<'stmt>>,
row: Option<Row<'stmt>>,
} }
impl<'stmt> Rows<'stmt> { impl<'stmt> Rows<'stmt> {
@ -15,45 +15,19 @@ impl<'stmt> Rows<'stmt> {
stmt.reset(); stmt.reset();
} }
} }
/// Attempt to get the next row from the query. Returns `Some(Ok(Row))` if
/// there is another row, `Some(Err(...))` if there was an error
/// getting the next row, and `None` if all rows have been retrieved.
///
/// ## Note
///
/// This interface is not compatible with Rust's `Iterator` trait, because
/// the lifetime of the returned row is tied to the lifetime of `self`.
/// This is a "streaming iterator". For a more natural interface,
/// consider using `query_map` or `query_and_then` instead, which
/// return types that implement `Iterator`.
#[allow(clippy::should_implement_trait)] // cannot implement Iterator
pub fn next<'a>(&'a mut self) -> Option<Result<Row<'a, 'stmt>>> {
self.stmt.and_then(|stmt| match stmt.step() {
Ok(true) => Some(Ok(Row {
stmt,
phantom: PhantomData,
})),
Ok(false) => {
self.reset();
None
}
Err(err) => {
self.reset();
Some(Err(err))
}
})
}
} }
impl<'stmt> Rows<'stmt> { impl<'stmt> Rows<'stmt> {
pub(crate) fn new(stmt: &'stmt Statement<'stmt>) -> Rows<'stmt> { pub(crate) fn new(stmt: &'stmt Statement<'stmt>) -> Rows<'stmt> {
Rows { stmt: Some(stmt) } Rows {
stmt: Some(stmt),
row: None,
}
} }
pub(crate) fn get_expected_row<'a>(&'a mut self) -> Result<Row<'a, 'stmt>> { pub(crate) fn get_expected_row(&mut self) -> Result<&Row<'stmt>> {
match self.next() { match self.next()? {
Some(row) => row, Some(row) => Ok(row),
None => Err(Error::QueryReturnedNoRows), None => Err(Error::QueryReturnedNoRows),
} }
} }
@ -73,7 +47,7 @@ pub struct MappedRows<'stmt, F> {
impl<'stmt, T, F> MappedRows<'stmt, F> impl<'stmt, T, F> MappedRows<'stmt, F>
where where
F: FnMut(&Row<'_, '_>) -> T, F: FnMut(&Row<'_>) -> T,
{ {
pub(crate) fn new(rows: Rows<'stmt>, f: F) -> MappedRows<'stmt, F> { pub(crate) fn new(rows: Rows<'stmt>, f: F) -> MappedRows<'stmt, F> {
MappedRows { rows, map: f } MappedRows { rows, map: f }
@ -82,7 +56,7 @@ where
impl<T, F> Iterator for MappedRows<'_, F> impl<T, F> Iterator for MappedRows<'_, F>
where where
F: FnMut(&Row<'_, '_>) -> T, F: FnMut(&Row<'_>) -> T,
{ {
type Item = Result<T>; type Item = Result<T>;
@ -90,6 +64,7 @@ where
let map = &mut self.map; let map = &mut self.map;
self.rows self.rows
.next() .next()
.transpose()
.map(|row_result| row_result.map(|row| (map)(&row))) .map(|row_result| row_result.map(|row| (map)(&row)))
} }
} }
@ -103,7 +78,7 @@ pub struct AndThenRows<'stmt, F> {
impl<'stmt, T, E, F> AndThenRows<'stmt, F> impl<'stmt, T, E, F> AndThenRows<'stmt, F>
where where
F: FnMut(&Row<'_, '_>) -> result::Result<T, E>, F: FnMut(&Row<'_>) -> result::Result<T, E>,
{ {
pub(crate) fn new(rows: Rows<'stmt>, f: F) -> AndThenRows<'stmt, F> { pub(crate) fn new(rows: Rows<'stmt>, f: F) -> AndThenRows<'stmt, F> {
AndThenRows { rows, map: f } AndThenRows { rows, map: f }
@ -113,7 +88,7 @@ where
impl<T, E, F> Iterator for AndThenRows<'_, F> impl<T, E, F> Iterator for AndThenRows<'_, F>
where where
E: convert::From<Error>, E: convert::From<Error>,
F: FnMut(&Row<'_, '_>) -> result::Result<T, E>, F: FnMut(&Row<'_>) -> result::Result<T, E>,
{ {
type Item = result::Result<T, E>; type Item = result::Result<T, E>;
@ -121,17 +96,51 @@ where
let map = &mut self.map; let map = &mut self.map;
self.rows self.rows
.next() .next()
.transpose()
.map(|row_result| row_result.map_err(E::from).and_then(|row| (map)(&row))) .map(|row_result| row_result.map_err(E::from).and_then(|row| (map)(&row)))
} }
} }
/// A single result row of a query. impl<'stmt> FallibleStreamingIterator for Rows<'stmt> {
pub struct Row<'a, 'stmt: 'a> { type Error = Error;
stmt: &'stmt Statement<'stmt>, type Item = Row<'stmt>;
phantom: PhantomData<&'a ()>,
fn advance(&mut self) -> Result<()> {
match self.stmt {
Some(ref stmt) => match stmt.step() {
Ok(true) => {
self.row = Some(Row { stmt });
Ok(())
}
Ok(false) => {
self.reset();
self.row = None;
Ok(())
}
Err(e) => {
self.reset();
self.row = None;
Err(e)
}
},
None => {
self.row = None;
Ok(())
}
}
}
fn get(&self) -> Option<&Row<'stmt>> {
self.row.as_ref()
}
} }
impl<'a, 'stmt> Row<'a, 'stmt> { /// A single result row of a query.
pub struct Row<'stmt> {
stmt: &'stmt Statement<'stmt>,
}
impl<'stmt> Row<'stmt> {
/// Get the value of a particular column of the result row. /// Get the value of a particular column of the result row.
/// ///
/// ## Failure /// ## Failure
@ -193,7 +202,7 @@ impl<'a, 'stmt> Row<'a, 'stmt> {
/// ///
/// Returns an `Error::InvalidColumnName` if `idx` is not a valid column /// Returns an `Error::InvalidColumnName` if `idx` is not a valid column
/// name for this row. /// name for this row.
pub fn get_raw_checked<I: RowIndex>(&self, idx: I) -> Result<ValueRef<'a>> { pub fn get_raw_checked<I: RowIndex>(&self, idx: I) -> Result<ValueRef<'_>> {
let idx = idx.idx(self.stmt)?; let idx = idx.idx(self.stmt)?;
// Narrowing from `ValueRef<'stmt>` (which `self.stmt.value_ref(idx)` // Narrowing from `ValueRef<'stmt>` (which `self.stmt.value_ref(idx)`
// returns) to `ValueRef<'a>` is needed because it's only valid until // returns) to `ValueRef<'a>` is needed because it's only valid until
@ -217,7 +226,7 @@ impl<'a, 'stmt> Row<'a, 'stmt> {
/// ///
/// * If `idx` is outside the range of columns in the returned query. /// * If `idx` is outside the range of columns in the returned query.
/// * If `idx` is not a valid column name for this row. /// * If `idx` is not a valid column name for this row.
pub fn get_raw<I: RowIndex>(&self, idx: I) -> ValueRef<'a> { pub fn get_raw<I: RowIndex>(&self, idx: I) -> ValueRef<'_> {
self.get_raw_checked(idx).unwrap() self.get_raw_checked(idx).unwrap()
} }

View File

@ -10,13 +10,13 @@ use std::panic::{catch_unwind, RefUnwindSafe};
use std::ptr; use std::ptr;
use std::slice::{from_raw_parts, from_raw_parts_mut}; use std::slice::{from_raw_parts, from_raw_parts_mut};
use fallible_streaming_iterator::FallibleStreamingIterator;
use crate::error::error_from_sqlite_code; use crate::error::error_from_sqlite_code;
use crate::ffi; use crate::ffi;
use crate::hooks::Action; use crate::hooks::Action;
use crate::types::ValueRef; use crate::types::ValueRef;
use crate::{errmsg_to_string, str_to_cstring, Connection, DatabaseName, Result}; use crate::{
errmsg_to_string, str_to_cstring, Connection, DatabaseName, FallibleStreamingIterator, Result,
};
// https://sqlite.org/session.html // https://sqlite.org/session.html
@ -719,12 +719,11 @@ unsafe extern "C" fn x_output(p_out: *mut c_void, data: *const c_void, len: c_in
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use fallible_streaming_iterator::FallibleStreamingIterator;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use super::{Changeset, ChangesetIter, ConflictAction, ConflictType, Session}; use super::{Changeset, ChangesetIter, ConflictAction, ConflictType, Session};
use crate::hooks::Action; use crate::hooks::Action;
use crate::Connection; use crate::{Connection, FallibleStreamingIterator};
fn one_changeset() -> Changeset { fn one_changeset() -> Changeset {
let db = Connection::open_in_memory().unwrap(); let db = Connection::open_in_memory().unwrap();

View File

@ -9,7 +9,8 @@ use std::{convert, fmt, mem, ptr, result, str};
use super::ffi; use super::ffi;
use super::str_to_cstring; use super::str_to_cstring;
use super::{ use super::{
AndThenRows, Connection, Error, MappedRows, RawStatement, Result, Row, Rows, ValueRef, AndThenRows, Connection, Error, FallibleStreamingIterator, MappedRows, RawStatement, Result,
Row, Rows, ValueRef,
}; };
use crate::types::{ToSql, ToSqlOutput}; use crate::types::{ToSql, ToSqlOutput};
#[cfg(feature = "array")] #[cfg(feature = "array")]
@ -267,7 +268,7 @@ impl Statement<'_> {
where where
P: IntoIterator, P: IntoIterator,
P::Item: ToSql, P::Item: ToSql,
F: FnMut(&Row<'_, '_>) -> T, F: FnMut(&Row<'_>) -> T,
{ {
let rows = self.query(params)?; let rows = self.query(params)?;
Ok(MappedRows::new(rows, f)) Ok(MappedRows::new(rows, f))
@ -306,7 +307,7 @@ impl Statement<'_> {
f: F, f: F,
) -> Result<MappedRows<'_, F>> ) -> Result<MappedRows<'_, F>>
where where
F: FnMut(&Row<'_, '_>) -> T, F: FnMut(&Row<'_>) -> T,
{ {
let rows = self.query_named(params)?; let rows = self.query_named(params)?;
Ok(MappedRows::new(rows, f)) Ok(MappedRows::new(rows, f))
@ -324,7 +325,7 @@ impl Statement<'_> {
P: IntoIterator, P: IntoIterator,
P::Item: ToSql, P::Item: ToSql,
E: convert::From<Error>, E: convert::From<Error>,
F: FnMut(&Row<'_, '_>) -> result::Result<T, E>, F: FnMut(&Row<'_>) -> result::Result<T, E>,
{ {
let rows = self.query(params)?; let rows = self.query(params)?;
Ok(AndThenRows::new(rows, f)) Ok(AndThenRows::new(rows, f))
@ -375,7 +376,7 @@ impl Statement<'_> {
) -> Result<AndThenRows<'_, F>> ) -> Result<AndThenRows<'_, F>>
where where
E: convert::From<Error>, E: convert::From<Error>,
F: FnMut(&Row<'_, '_>) -> result::Result<T, E>, F: FnMut(&Row<'_>) -> result::Result<T, E>,
{ {
let rows = self.query_named(params)?; let rows = self.query_named(params)?;
Ok(AndThenRows::new(rows, f)) Ok(AndThenRows::new(rows, f))
@ -389,7 +390,7 @@ impl Statement<'_> {
P::Item: ToSql, P::Item: ToSql,
{ {
let mut rows = self.query(params)?; let mut rows = self.query(params)?;
let exists = rows.next().is_some(); let exists = rows.next()?.is_some();
Ok(exists) Ok(exists)
} }
@ -410,7 +411,7 @@ impl Statement<'_> {
where where
P: IntoIterator, P: IntoIterator,
P::Item: ToSql, P::Item: ToSql,
F: FnOnce(&Row<'_, '_>) -> T, F: FnOnce(&Row<'_>) -> T,
{ {
let mut rows = self.query(params)?; let mut rows = self.query(params)?;
@ -732,7 +733,7 @@ pub enum StatementStatus {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::types::ToSql; use crate::types::ToSql;
use crate::{Connection, Error, Result, NO_PARAMS}; use crate::{Connection, Error, FallibleStreamingIterator, Result, NO_PARAMS};
#[test] #[test]
fn test_execute_named() { fn test_execute_named() {

View File

@ -110,7 +110,7 @@ mod test {
use time; use time;
use super::Value; use super::Value;
use crate::{Connection, Error, NO_PARAMS}; use crate::{Connection, Error, FallibleStreamingIterator, NO_PARAMS};
use std::f64::EPSILON; use std::f64::EPSILON;
use std::os::raw::{c_double, c_int}; use std::os::raw::{c_double, c_int};

View File

@ -346,7 +346,7 @@ impl From<csv::Error> for Error {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::vtab::csvtab; use crate::vtab::csvtab;
use crate::{Connection, Result, NO_PARAMS}; use crate::{Connection, FallibleStreamingIterator, Result, NO_PARAMS};
#[test] #[test]
fn test_csv_module() { fn test_csv_module() {