Merge pull request #866 from phiresky/pass-context-to-aggregate

This commit is contained in:
Thom Chiovoloni 2020-12-19 12:47:13 -08:00 committed by GitHub
commit 73f59a3a38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -270,7 +270,7 @@ where
/// Initializes the aggregation context. Will be called prior to the first /// Initializes the aggregation context. Will be called prior to the first
/// call to [`step()`](Aggregate::step) to set up the context for an invocation of the /// call to [`step()`](Aggregate::step) to set up the context for an invocation of the
/// function. (Note: `init()` will not be called if there are no rows.) /// function. (Note: `init()` will not be called if there are no rows.)
fn init(&self) -> A; fn init(&self, _: &mut Context<'_>) -> Result<A>;
/// "step" function called once for each row in an aggregate group. May be /// "step" function called once for each row in an aggregate group. May be
/// called 0 times if there are no rows. /// called 0 times if there are no rows.
@ -281,7 +281,9 @@ where
/// once, will be given `Some(A)` (the same `A` as was created by /// once, will be given `Some(A)` (the same `A` as was created by
/// [`init`](Aggregate::init) and given to [`step`](Aggregate::step)); if [`step()`](Aggregate::step) was not called (because /// [`init`](Aggregate::init) and given to [`step`](Aggregate::step)); if [`step()`](Aggregate::step) was not called (because
/// the function is running against 0 rows), will be given `None`. /// the function is running against 0 rows), will be given `None`.
fn finalize(&self, _: Option<A>) -> Result<T>; ///
/// The passed context will have no arguments.
fn finalize(&self, _: &mut Context<'_>, _: Option<A>) -> Result<T>;
} }
/// `feature = "window"` WindowAggregate is the callback interface for /// `feature = "window"` WindowAggregate is the callback interface for
@ -624,13 +626,15 @@ unsafe extern "C" fn call_boxed_step<A, D, T>(
!boxed_aggr.is_null(), !boxed_aggr.is_null(),
"Internal error - null aggregate pointer" "Internal error - null aggregate pointer"
); );
if (*pac as *mut A).is_null() {
*pac = Box::into_raw(Box::new((*boxed_aggr).init()));
}
let mut ctx = Context { let mut ctx = Context {
ctx, ctx,
args: slice::from_raw_parts(argv, argc as usize), args: slice::from_raw_parts(argv, argc as usize),
}; };
if (*pac as *mut A).is_null() {
*pac = Box::into_raw(Box::new((*boxed_aggr).init(&mut ctx)?));
}
(*boxed_aggr).step(&mut ctx, &mut **pac) (*boxed_aggr).step(&mut ctx, &mut **pac)
}); });
let r = match r { let r = match r {
@ -715,7 +719,8 @@ where
!boxed_aggr.is_null(), !boxed_aggr.is_null(),
"Internal error - null aggregate pointer" "Internal error - null aggregate pointer"
); );
(*boxed_aggr).finalize(a) let mut ctx = Context { ctx, args: &mut [] };
(*boxed_aggr).finalize(&mut ctx, a)
}); });
let t = match r { let t = match r {
Err(_) => { Err(_) => {
@ -939,8 +944,8 @@ mod test {
struct Count; struct Count;
impl Aggregate<i64, Option<i64>> for Sum { impl Aggregate<i64, Option<i64>> for Sum {
fn init(&self) -> i64 { fn init(&self, _: &mut Context<'_>) -> Result<i64> {
0 Ok(0)
} }
fn step(&self, ctx: &mut Context<'_>, sum: &mut i64) -> Result<()> { fn step(&self, ctx: &mut Context<'_>, sum: &mut i64) -> Result<()> {
@ -948,14 +953,14 @@ mod test {
Ok(()) Ok(())
} }
fn finalize(&self, sum: Option<i64>) -> Result<Option<i64>> { fn finalize(&self, _: &mut Context<'_>, sum: Option<i64>) -> Result<Option<i64>> {
Ok(sum) Ok(sum)
} }
} }
impl Aggregate<i64, i64> for Count { impl Aggregate<i64, i64> for Count {
fn init(&self) -> i64 { fn init(&self, _: &mut Context<'_>) -> Result<i64> {
0 Ok(0)
} }
fn step(&self, _ctx: &mut Context<'_>, sum: &mut i64) -> Result<()> { fn step(&self, _ctx: &mut Context<'_>, sum: &mut i64) -> Result<()> {
@ -963,7 +968,7 @@ mod test {
Ok(()) Ok(())
} }
fn finalize(&self, sum: Option<i64>) -> Result<i64> { fn finalize(&self, _: &mut Context<'_>, sum: Option<i64>) -> Result<i64> {
Ok(sum.unwrap_or(0)) Ok(sum.unwrap_or(0))
} }
} }