From d93dec52c0bb7ab4826abd6b544997b6b46207c0 Mon Sep 17 00:00:00 2001 From: gwenn Date: Sun, 3 Feb 2019 14:01:42 +0100 Subject: [PATCH] Make `Rows` implement `FallibleStreamingIterator` --- Cargo.toml | 4 +- src/lib.rs | 23 ++++++----- src/row.rs | 101 ++++++++++++++++++++++++--------------------- src/session.rs | 9 ++-- src/statement.rs | 17 ++++---- src/types/mod.rs | 2 +- src/vtab/csvtab.rs | 2 +- 7 files changed, 84 insertions(+), 74 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fb288ef..293779b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ csvtab = ["csv", "vtab"] # pointer passing interfaces: 3.20.0 array = ["vtab"] # session extension: 3.13.0 -session = ["libsqlite3-sys/session", "hooks", "fallible-streaming-iterator"] +session = ["libsqlite3-sys/session", "hooks"] [dependencies] time = "0.1.0" @@ -58,7 +58,7 @@ serde_json = { version = "1.0", optional = true } csv = { version = "1.0", optional = true } lazy_static = { version = "1.0", optional = true } byteorder = { version = "1.2", features = ["i128"], optional = true } -fallible-streaming-iterator = { version = "0.1", optional = true } +fallible-streaming-iterator = "0.1" [dev-dependencies] tempdir = "0.3" diff --git a/src/lib.rs b/src/lib.rs index 76ea093..6a72e12 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,6 +80,8 @@ use std::str; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; +pub use fallible_streaming_iterator::FallibleStreamingIterator; + use crate::cache::StatementCache; use crate::inner_connection::{InnerConnection, BYPASS_SQLITE_INIT}; use crate::raw_statement::RawStatement; @@ -89,7 +91,7 @@ pub use crate::cache::CachedStatement; pub use crate::error::Error; pub use crate::ffi::ErrorCode; #[cfg(feature = "hooks")] -pub use crate::hooks::*; +pub use crate::hooks::Action; #[cfg(feature = "load_extension")] pub use crate::load_extension_guard::LoadExtensionGuard; pub use crate::row::{AndThenRows, MappedRows, Row, RowIndex, Rows}; @@ -474,7 +476,7 @@ impl Connection { where P: IntoIterator, P::Item: ToSql, - F: FnOnce(&Row<'_, '_>) -> T, + F: FnOnce(&Row<'_>) -> T, { let mut stmt = self.prepare(sql)?; stmt.query_row(params, f) @@ -496,7 +498,7 @@ impl Connection { /// or if the underlying SQLite call fails. pub fn query_row_named(&self, sql: &str, params: &[(&str, &dyn ToSql)], f: F) -> Result where - F: FnOnce(&Row<'_, '_>) -> T, + F: FnOnce(&Row<'_>) -> T, { let mut stmt = self.prepare(sql)?; let mut rows = stmt.query_named(params)?; @@ -533,7 +535,7 @@ impl Connection { where P: IntoIterator, P::Item: ToSql, - F: FnOnce(&Row<'_, '_>) -> result::Result, + F: FnOnce(&Row<'_>) -> result::Result, E: convert::From, { let mut stmt = self.prepare(sql)?; @@ -1061,8 +1063,8 @@ mod test { let mut rows = query.query(&[4i32]).unwrap(); let mut v = Vec::::new(); - while let Some(row) = rows.next() { - v.push(row.unwrap().get(0)); + while let Some(row) = rows.next().unwrap() { + v.push(row.get(0)); } assert_eq!(v, [3i32, 2, 1]); @@ -1072,8 +1074,8 @@ mod test { let mut rows = query.query(&[3i32]).unwrap(); let mut v = Vec::::new(); - while let Some(row) = rows.next() { - v.push(row.unwrap().get(0)); + while let Some(row) = rows.next().unwrap() { + v.push(row.get(0)); } assert_eq!(v, [2i32, 1]); @@ -1215,7 +1217,7 @@ mod test { { let mut rows = stmt.query(NO_PARAMS).unwrap(); assert!(!db.is_busy()); - let row = rows.next(); + let row = rows.next().unwrap(); assert!(db.is_busy()); assert!(row.is_some()); } @@ -1327,8 +1329,7 @@ mod test { let mut query = db.prepare("SELECT i, x FROM foo").unwrap(); let mut rows = query.query(NO_PARAMS).unwrap(); - while let Some(res) = rows.next() { - let row = res.unwrap(); + while let Some(row) = rows.next().unwrap() { let i = row.get_raw(0).as_i64().unwrap(); let expect = vals[i as usize]; let x = row.get_raw("x").as_str().unwrap(); diff --git a/src/row.rs b/src/row.rs index 4431a3d..0ae8f3d 100644 --- a/src/row.rs +++ b/src/row.rs @@ -1,12 +1,12 @@ -use std::marker::PhantomData; use std::{convert, result}; -use super::{Error, Result, Statement}; +use super::{Error, FallibleStreamingIterator, Result, Statement}; use crate::types::{FromSql, FromSqlError, ValueRef}; /// An handle for the resulting rows of a query. pub struct Rows<'stmt> { stmt: Option<&'stmt Statement<'stmt>>, + row: Option>, } impl<'stmt> Rows<'stmt> { @@ -15,45 +15,19 @@ impl<'stmt> Rows<'stmt> { stmt.reset(); } } - - /// Attempt to get the next row from the query. Returns `Some(Ok(Row))` if - /// there is another row, `Some(Err(...))` if there was an error - /// getting the next row, and `None` if all rows have been retrieved. - /// - /// ## Note - /// - /// This interface is not compatible with Rust's `Iterator` trait, because - /// the lifetime of the returned row is tied to the lifetime of `self`. - /// This is a "streaming iterator". For a more natural interface, - /// consider using `query_map` or `query_and_then` instead, which - /// return types that implement `Iterator`. - #[allow(clippy::should_implement_trait)] // cannot implement Iterator - pub fn next<'a>(&'a mut self) -> Option>> { - self.stmt.and_then(|stmt| match stmt.step() { - Ok(true) => Some(Ok(Row { - stmt, - phantom: PhantomData, - })), - Ok(false) => { - self.reset(); - None - } - Err(err) => { - self.reset(); - Some(Err(err)) - } - }) - } } impl<'stmt> Rows<'stmt> { pub(crate) fn new(stmt: &'stmt Statement<'stmt>) -> Rows<'stmt> { - Rows { stmt: Some(stmt) } + Rows { + stmt: Some(stmt), + row: None, + } } - pub(crate) fn get_expected_row<'a>(&'a mut self) -> Result> { - match self.next() { - Some(row) => row, + pub(crate) fn get_expected_row(&mut self) -> Result<&Row<'stmt>> { + match self.next()? { + Some(row) => Ok(row), None => Err(Error::QueryReturnedNoRows), } } @@ -73,7 +47,7 @@ pub struct MappedRows<'stmt, F> { impl<'stmt, T, F> MappedRows<'stmt, F> where - F: FnMut(&Row<'_, '_>) -> T, + F: FnMut(&Row<'_>) -> T, { pub(crate) fn new(rows: Rows<'stmt>, f: F) -> MappedRows<'stmt, F> { MappedRows { rows, map: f } @@ -82,7 +56,7 @@ where impl Iterator for MappedRows<'_, F> where - F: FnMut(&Row<'_, '_>) -> T, + F: FnMut(&Row<'_>) -> T, { type Item = Result; @@ -90,6 +64,7 @@ where let map = &mut self.map; self.rows .next() + .transpose() .map(|row_result| row_result.map(|row| (map)(&row))) } } @@ -103,7 +78,7 @@ pub struct AndThenRows<'stmt, F> { impl<'stmt, T, E, F> AndThenRows<'stmt, F> where - F: FnMut(&Row<'_, '_>) -> result::Result, + F: FnMut(&Row<'_>) -> result::Result, { pub(crate) fn new(rows: Rows<'stmt>, f: F) -> AndThenRows<'stmt, F> { AndThenRows { rows, map: f } @@ -113,7 +88,7 @@ where impl Iterator for AndThenRows<'_, F> where E: convert::From, - F: FnMut(&Row<'_, '_>) -> result::Result, + F: FnMut(&Row<'_>) -> result::Result, { type Item = result::Result; @@ -121,17 +96,51 @@ where let map = &mut self.map; self.rows .next() + .transpose() .map(|row_result| row_result.map_err(E::from).and_then(|row| (map)(&row))) } } -/// A single result row of a query. -pub struct Row<'a, 'stmt: 'a> { - stmt: &'stmt Statement<'stmt>, - phantom: PhantomData<&'a ()>, +impl<'stmt> FallibleStreamingIterator for Rows<'stmt> { + type Error = Error; + type Item = Row<'stmt>; + + fn advance(&mut self) -> Result<()> { + match self.stmt { + Some(ref stmt) => match stmt.step() { + Ok(true) => { + self.row = Some(Row { stmt }); + Ok(()) + } + Ok(false) => { + self.reset(); + self.row = None; + Ok(()) + } + Err(e) => { + self.reset(); + self.row = None; + Err(e) + } + }, + None => { + self.row = None; + Ok(()) + } + } + } + + fn get(&self) -> Option<&Row<'stmt>> { + self.row.as_ref() + } } -impl<'a, 'stmt> Row<'a, 'stmt> { +/// A single result row of a query. +pub struct Row<'stmt> { + stmt: &'stmt Statement<'stmt>, +} + +impl<'stmt> Row<'stmt> { /// Get the value of a particular column of the result row. /// /// ## Failure @@ -193,7 +202,7 @@ impl<'a, 'stmt> Row<'a, 'stmt> { /// /// Returns an `Error::InvalidColumnName` if `idx` is not a valid column /// name for this row. - pub fn get_raw_checked(&self, idx: I) -> Result> { + pub fn get_raw_checked(&self, idx: I) -> Result> { let idx = idx.idx(self.stmt)?; // Narrowing from `ValueRef<'stmt>` (which `self.stmt.value_ref(idx)` // returns) to `ValueRef<'a>` is needed because it's only valid until @@ -217,7 +226,7 @@ impl<'a, 'stmt> Row<'a, 'stmt> { /// /// * If `idx` is outside the range of columns in the returned query. /// * If `idx` is not a valid column name for this row. - pub fn get_raw(&self, idx: I) -> ValueRef<'a> { + pub fn get_raw(&self, idx: I) -> ValueRef<'_> { self.get_raw_checked(idx).unwrap() } diff --git a/src/session.rs b/src/session.rs index ac6e9f9..f56b372 100644 --- a/src/session.rs +++ b/src/session.rs @@ -10,13 +10,13 @@ use std::panic::{catch_unwind, RefUnwindSafe}; use std::ptr; use std::slice::{from_raw_parts, from_raw_parts_mut}; -use fallible_streaming_iterator::FallibleStreamingIterator; - use crate::error::error_from_sqlite_code; use crate::ffi; use crate::hooks::Action; use crate::types::ValueRef; -use crate::{errmsg_to_string, str_to_cstring, Connection, DatabaseName, Result}; +use crate::{ + errmsg_to_string, str_to_cstring, Connection, DatabaseName, FallibleStreamingIterator, Result, +}; // https://sqlite.org/session.html @@ -719,12 +719,11 @@ unsafe extern "C" fn x_output(p_out: *mut c_void, data: *const c_void, len: c_in #[cfg(test)] mod test { - use fallible_streaming_iterator::FallibleStreamingIterator; use std::sync::atomic::{AtomicBool, Ordering}; use super::{Changeset, ChangesetIter, ConflictAction, ConflictType, Session}; use crate::hooks::Action; - use crate::Connection; + use crate::{Connection, FallibleStreamingIterator}; fn one_changeset() -> Changeset { let db = Connection::open_in_memory().unwrap(); diff --git a/src/statement.rs b/src/statement.rs index a32aa28..69629f4 100644 --- a/src/statement.rs +++ b/src/statement.rs @@ -9,7 +9,8 @@ use std::{convert, fmt, mem, ptr, result, str}; use super::ffi; use super::str_to_cstring; use super::{ - AndThenRows, Connection, Error, MappedRows, RawStatement, Result, Row, Rows, ValueRef, + AndThenRows, Connection, Error, FallibleStreamingIterator, MappedRows, RawStatement, Result, + Row, Rows, ValueRef, }; use crate::types::{ToSql, ToSqlOutput}; #[cfg(feature = "array")] @@ -267,7 +268,7 @@ impl Statement<'_> { where P: IntoIterator, P::Item: ToSql, - F: FnMut(&Row<'_, '_>) -> T, + F: FnMut(&Row<'_>) -> T, { let rows = self.query(params)?; Ok(MappedRows::new(rows, f)) @@ -306,7 +307,7 @@ impl Statement<'_> { f: F, ) -> Result> where - F: FnMut(&Row<'_, '_>) -> T, + F: FnMut(&Row<'_>) -> T, { let rows = self.query_named(params)?; Ok(MappedRows::new(rows, f)) @@ -324,7 +325,7 @@ impl Statement<'_> { P: IntoIterator, P::Item: ToSql, E: convert::From, - F: FnMut(&Row<'_, '_>) -> result::Result, + F: FnMut(&Row<'_>) -> result::Result, { let rows = self.query(params)?; Ok(AndThenRows::new(rows, f)) @@ -375,7 +376,7 @@ impl Statement<'_> { ) -> Result> where E: convert::From, - F: FnMut(&Row<'_, '_>) -> result::Result, + F: FnMut(&Row<'_>) -> result::Result, { let rows = self.query_named(params)?; Ok(AndThenRows::new(rows, f)) @@ -389,7 +390,7 @@ impl Statement<'_> { P::Item: ToSql, { let mut rows = self.query(params)?; - let exists = rows.next().is_some(); + let exists = rows.next()?.is_some(); Ok(exists) } @@ -410,7 +411,7 @@ impl Statement<'_> { where P: IntoIterator, P::Item: ToSql, - F: FnOnce(&Row<'_, '_>) -> T, + F: FnOnce(&Row<'_>) -> T, { let mut rows = self.query(params)?; @@ -732,7 +733,7 @@ pub enum StatementStatus { #[cfg(test)] mod test { use crate::types::ToSql; - use crate::{Connection, Error, Result, NO_PARAMS}; + use crate::{Connection, Error, FallibleStreamingIterator, Result, NO_PARAMS}; #[test] fn test_execute_named() { diff --git a/src/types/mod.rs b/src/types/mod.rs index 508b273..c704caa 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -110,7 +110,7 @@ mod test { use time; use super::Value; - use crate::{Connection, Error, NO_PARAMS}; + use crate::{Connection, Error, FallibleStreamingIterator, NO_PARAMS}; use std::f64::EPSILON; use std::os::raw::{c_double, c_int}; diff --git a/src/vtab/csvtab.rs b/src/vtab/csvtab.rs index 67659f3..173a4c2 100644 --- a/src/vtab/csvtab.rs +++ b/src/vtab/csvtab.rs @@ -346,7 +346,7 @@ impl From for Error { #[cfg(test)] mod test { use crate::vtab::csvtab; - use crate::{Connection, Result, NO_PARAMS}; + use crate::{Connection, FallibleStreamingIterator, Result, NO_PARAMS}; #[test] fn test_csv_module() {