diff --git a/src/elements/core.c b/src/elements/core.c index 7d145ae8..8da58836 100644 --- a/src/elements/core.c +++ b/src/elements/core.c @@ -9449,152 +9449,15 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, } more:; - MDBX_node *leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); - olddata.iov_len = NODEDSZ(leaf); - olddata.iov_base = NODEDATA(leaf); - - /* DB has dups? */ - if (F_ISSET(mc->mc_db->md_flags, MDBX_DUPSORT)) { - /* Prepare (sub-)page/sub-DB to accept the new item, if needed. - * fp: old sub-page or a header faking it. - * mp: new (sub-)page. offset: growth in page size. - * xdata: node data with new page or DB. */ - unsigned i, offset = 0; - MDBX_page *mp = fp = xdata.iov_base = env->me_pbuf; - mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; - - /* Was a single item before, must convert now */ - if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { - - /* does data match? */ - if (!mc->mc_dbx->md_dcmp(data, &olddata)) { - if (unlikely(flags & (MDBX_NODUPDATA | MDBX_APPENDDUP))) - return MDBX_KEYEXIST; - /* overwrite it */ - goto current; - } - - /* Just overwrite the current item */ - if (flags & MDBX_CURRENT) - goto current; - - /* Back up original data item */ - dupdata_flag = 1; - dkey.iov_len = olddata.iov_len; - dkey.iov_base = memcpy(fp + 1, olddata.iov_base, olddata.iov_len); - - /* Make sub-page header for the dup items, with dummy body */ - fp->mp_flags = P_LEAF | P_DIRTY | P_SUBP; - fp->mp_lower = 0; - xdata.iov_len = PAGEHDRSZ + dkey.iov_len + data->iov_len; - if (mc->mc_db->md_flags & MDBX_DUPFIXED) { - fp->mp_flags |= P_LEAF2; - fp->mp_leaf2_ksize = (uint16_t)data->iov_len; - xdata.iov_len += 2 * data->iov_len; /* leave space for 2 more */ - mdbx_cassert(mc, xdata.iov_len <= env->me_psize); - } else { - xdata.iov_len += 2 * (sizeof(indx_t) + NODESIZE) + - (dkey.iov_len & 1) + (data->iov_len & 1); - mdbx_cassert(mc, xdata.iov_len <= env->me_psize); - } - fp->mp_upper = (uint16_t)(xdata.iov_len - PAGEHDRSZ); - olddata.iov_len = xdata.iov_len; /* pretend olddata is fp */ - } else if (leaf->mn_flags & F_SUBDATA) { - /* Data is on sub-DB, just store it */ - flags |= F_DUPDATA | F_SUBDATA; - goto put_sub; - } else { - /* Data is on sub-page */ - fp = olddata.iov_base; - switch (flags) { - default: - if (!(mc->mc_db->md_flags & MDBX_DUPFIXED)) { - offset = EVEN(NODESIZE + sizeof(indx_t) + data->iov_len); - break; - } - offset = fp->mp_leaf2_ksize; - if (SIZELEFT(fp) < offset) { - offset *= 4; /* space for 4 more */ - break; - } - /* FALLTHRU: Big enough MDBX_DUPFIXED sub-page */ - __fallthrough; - case MDBX_CURRENT | MDBX_NODUPDATA: - case MDBX_CURRENT: - fp->mp_flags |= P_DIRTY; - fp->mp_pgno = mp->mp_pgno; - mc->mc_xcursor->mx_cursor.mc_pg[0] = fp; - flags |= F_DUPDATA; - goto put_sub; - } - xdata.iov_len = olddata.iov_len + offset; - } - - fp_flags = fp->mp_flags; - if (NODESIZE + NODEKSZ(leaf) + xdata.iov_len > env->me_nodemax) { - /* Too big for a sub-page, convert to sub-DB */ - fp_flags &= ~P_SUBP; - prep_subDB: - dummy.md_xsize = 0; - dummy.md_flags = 0; - if (mc->mc_db->md_flags & MDBX_DUPFIXED) { - fp_flags |= P_LEAF2; - dummy.md_xsize = fp->mp_leaf2_ksize; - dummy.md_flags = MDBX_DUPFIXED; - if (mc->mc_db->md_flags & MDBX_INTEGERDUP) - dummy.md_flags |= MDBX_INTEGERKEY; - } - dummy.md_depth = 1; - dummy.md_branch_pages = 0; - dummy.md_leaf_pages = 1; - dummy.md_overflow_pages = 0; - dummy.md_entries = NUMKEYS(fp); - xdata.iov_len = sizeof(MDBX_db); - xdata.iov_base = &dummy; - if ((rc = mdbx_page_alloc(mc, 1, &mp, MDBX_ALLOC_ALL))) - return rc; - mc->mc_db->md_leaf_pages += 1; - mdbx_cassert(mc, env->me_psize > olddata.iov_len); - offset = env->me_psize - (unsigned)olddata.iov_len; - flags |= F_DUPDATA | F_SUBDATA; - dummy.md_root = mp->mp_pgno; - dummy.md_seq = dummy.md_merkle = 0; - sub_root = mp; - } - if (mp != fp) { - mp->mp_flags = fp_flags | P_DIRTY; - mp->mp_leaf2_ksize = fp->mp_leaf2_ksize; - mp->mp_lower = fp->mp_lower; - mdbx_cassert(mc, fp->mp_upper + offset <= UINT16_MAX); - mp->mp_upper = (indx_t)(fp->mp_upper + offset); - if (unlikely(fp_flags & P_LEAF2)) { - memcpy(PAGEDATA(mp), PAGEDATA(fp), NUMKEYS(fp) * fp->mp_leaf2_ksize); - } else { - memcpy((char *)mp + mp->mp_upper + PAGEHDRSZ, - (char *)fp + fp->mp_upper + PAGEHDRSZ, - olddata.iov_len - fp->mp_upper - PAGEHDRSZ); - memcpy((char *)(&mp->mp_ptrs), (char *)(&fp->mp_ptrs), - NUMKEYS(fp) * sizeof(mp->mp_ptrs[0])); - for (i = 0; i < NUMKEYS(fp); i++) { - mdbx_cassert(mc, mp->mp_ptrs[i] + offset <= UINT16_MAX); - mp->mp_ptrs[i] += (indx_t)offset; - } - } - } - - rdata = &xdata; - flags |= F_DUPDATA; - do_sub = true; - if (!insert_key) - mdbx_node_del(mc, 0); - goto new_sub; + if (mdbx_audit_enabled()) { + int err = mdbx_cursor_check(mc, false); + if (unlikely(err != MDBX_SUCCESS)) + return err; } - current: - /* MDBX passes F_SUBDATA in 'flags' to write a DB record */ - if (unlikely((leaf->mn_flags ^ flags) & F_SUBDATA)) - return MDBX_INCOMPATIBLE; + MDBX_node *leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + /* overflow page overwrites need special handling */ - if (F_ISSET(leaf->mn_flags, F_BIGDATA)) { + if (unlikely(F_ISSET(leaf->mn_flags, F_BIGDATA))) { MDBX_page *omp; pgno_t pg; int level, ovpages, @@ -9602,7 +9465,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, ? OVPAGES(env, data->iov_len) : 0; - memcpy(&pg, olddata.iov_base, sizeof(pg)); + memcpy(&pg, NODEDATA(leaf), sizeof(pg)); if (unlikely((rc2 = mdbx_page_get(mc, pg, &omp, &level)) != 0)) return rc2; ovpages = omp->mp_pages; @@ -9667,34 +9530,184 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data, } if ((rc2 = mdbx_ovpage_free(mc, omp)) != MDBX_SUCCESS) return rc2; - } else if (data->iov_len == olddata.iov_len) { - mdbx_cassert(mc, EVEN(key->iov_len) == EVEN(leaf->mn_ksize)); - /* same size, just replace it. Note that we could - * also reuse this node if the new data is smaller, - * but instead we opt to shrink the node in that case. */ - if (F_ISSET(flags, MDBX_RESERVE)) - data->iov_base = olddata.iov_base; - else if (!(mc->mc_flags & C_SUB)) - memcpy(olddata.iov_base, data->iov_base, data->iov_len); - else { - mdbx_cassert(mc, NUMKEYS(mc->mc_pg[mc->mc_top]) == 1); - mdbx_cassert(mc, PAGETYPE(mc->mc_pg[mc->mc_top]) == P_LEAF); - mdbx_cassert(mc, NODEDSZ(leaf) == 0); - mdbx_cassert(mc, leaf->mn_flags == 0); - mdbx_cassert(mc, key->iov_len < UINT16_MAX); - leaf->mn_ksize = (uint16_t)key->iov_len; - memcpy(NODEKEY(leaf), key->iov_base, key->iov_len); - mdbx_cassert(mc, (char *)NODEKEY(leaf) + NODEDSZ(leaf) < - (char *)(mc->mc_pg[mc->mc_top]) + env->me_psize); - goto fix_parent; + } else { + olddata.iov_len = NODEDSZ(leaf); + olddata.iov_base = NODEDATA(leaf); + mdbx_cassert(mc, (char *)olddata.iov_base + olddata.iov_len <= + (char *)(mc->mc_pg[mc->mc_top]) + env->me_psize); + + /* DB has dups? */ + if (F_ISSET(mc->mc_db->md_flags, MDBX_DUPSORT)) { + /* Prepare (sub-)page/sub-DB to accept the new item, if needed. + * fp: old sub-page or a header faking it. + * mp: new (sub-)page. offset: growth in page size. + * xdata: node data with new page or DB. */ + unsigned i, offset = 0; + MDBX_page *mp = fp = xdata.iov_base = env->me_pbuf; + mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; + + /* Was a single item before, must convert now */ + if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { + + /* does data match? */ + if (!mc->mc_dbx->md_dcmp(data, &olddata)) { + if (unlikely(flags & (MDBX_NODUPDATA | MDBX_APPENDDUP))) + return MDBX_KEYEXIST; + /* overwrite it */ + goto current; + } + + /* Just overwrite the current item */ + if (flags & MDBX_CURRENT) + goto current; + + /* Back up original data item */ + dupdata_flag = 1; + dkey.iov_len = olddata.iov_len; + dkey.iov_base = memcpy(fp + 1, olddata.iov_base, olddata.iov_len); + + /* Make sub-page header for the dup items, with dummy body */ + fp->mp_flags = P_LEAF | P_DIRTY | P_SUBP; + fp->mp_lower = 0; + xdata.iov_len = PAGEHDRSZ + dkey.iov_len + data->iov_len; + if (mc->mc_db->md_flags & MDBX_DUPFIXED) { + fp->mp_flags |= P_LEAF2; + fp->mp_leaf2_ksize = (uint16_t)data->iov_len; + xdata.iov_len += 2 * data->iov_len; /* leave space for 2 more */ + mdbx_cassert(mc, xdata.iov_len <= env->me_psize); + } else { + xdata.iov_len += 2 * (sizeof(indx_t) + NODESIZE) + + (dkey.iov_len & 1) + (data->iov_len & 1); + mdbx_cassert(mc, xdata.iov_len <= env->me_psize); + } + fp->mp_upper = (uint16_t)(xdata.iov_len - PAGEHDRSZ); + olddata.iov_len = xdata.iov_len; /* pretend olddata is fp */ + } else if (leaf->mn_flags & F_SUBDATA) { + /* Data is on sub-DB, just store it */ + flags |= F_DUPDATA | F_SUBDATA; + goto put_sub; + } else { + /* Data is on sub-page */ + fp = olddata.iov_base; + switch (flags) { + default: + if (!(mc->mc_db->md_flags & MDBX_DUPFIXED)) { + offset = EVEN(NODESIZE + sizeof(indx_t) + data->iov_len); + break; + } + offset = fp->mp_leaf2_ksize; + if (SIZELEFT(fp) < offset) { + offset *= 4; /* space for 4 more */ + break; + } + /* FALLTHRU: Big enough MDBX_DUPFIXED sub-page */ + __fallthrough; + case MDBX_CURRENT | MDBX_NODUPDATA: + case MDBX_CURRENT: + fp->mp_flags |= P_DIRTY; + fp->mp_pgno = mp->mp_pgno; + mc->mc_xcursor->mx_cursor.mc_pg[0] = fp; + flags |= F_DUPDATA; + goto put_sub; + } + xdata.iov_len = olddata.iov_len + offset; + } + + fp_flags = fp->mp_flags; + if (NODESIZE + NODEKSZ(leaf) + xdata.iov_len > env->me_nodemax) { + /* Too big for a sub-page, convert to sub-DB */ + fp_flags &= ~P_SUBP; + prep_subDB: + dummy.md_xsize = 0; + dummy.md_flags = 0; + if (mc->mc_db->md_flags & MDBX_DUPFIXED) { + fp_flags |= P_LEAF2; + dummy.md_xsize = fp->mp_leaf2_ksize; + dummy.md_flags = MDBX_DUPFIXED; + if (mc->mc_db->md_flags & MDBX_INTEGERDUP) + dummy.md_flags |= MDBX_INTEGERKEY; + } + dummy.md_depth = 1; + dummy.md_branch_pages = 0; + dummy.md_leaf_pages = 1; + dummy.md_overflow_pages = 0; + dummy.md_entries = NUMKEYS(fp); + xdata.iov_len = sizeof(MDBX_db); + xdata.iov_base = &dummy; + if ((rc = mdbx_page_alloc(mc, 1, &mp, MDBX_ALLOC_ALL))) + return rc; + mc->mc_db->md_leaf_pages += 1; + mdbx_cassert(mc, env->me_psize > olddata.iov_len); + offset = env->me_psize - (unsigned)olddata.iov_len; + flags |= F_DUPDATA | F_SUBDATA; + dummy.md_root = mp->mp_pgno; + dummy.md_seq = dummy.md_merkle = 0; + sub_root = mp; + } + if (mp != fp) { + mp->mp_flags = fp_flags | P_DIRTY; + mp->mp_leaf2_ksize = fp->mp_leaf2_ksize; + mp->mp_lower = fp->mp_lower; + mdbx_cassert(mc, fp->mp_upper + offset <= UINT16_MAX); + mp->mp_upper = (indx_t)(fp->mp_upper + offset); + if (unlikely(fp_flags & P_LEAF2)) { + memcpy(PAGEDATA(mp), PAGEDATA(fp), + NUMKEYS(fp) * fp->mp_leaf2_ksize); + } else { + memcpy((char *)mp + mp->mp_upper + PAGEHDRSZ, + (char *)fp + fp->mp_upper + PAGEHDRSZ, + olddata.iov_len - fp->mp_upper - PAGEHDRSZ); + memcpy((char *)(&mp->mp_ptrs), (char *)(&fp->mp_ptrs), + NUMKEYS(fp) * sizeof(mp->mp_ptrs[0])); + for (i = 0; i < NUMKEYS(fp); i++) { + mdbx_cassert(mc, mp->mp_ptrs[i] + offset <= UINT16_MAX); + mp->mp_ptrs[i] += (indx_t)offset; + } + } + } + + rdata = &xdata; + flags |= F_DUPDATA; + do_sub = true; + if (!insert_key) + mdbx_node_del(mc, 0); + goto new_sub; } - if (mdbx_audit_enabled()) { - int err = mdbx_cursor_check(mc, false); - if (unlikely(err != MDBX_SUCCESS)) - return err; + /* MDBX passes F_SUBDATA in 'flags' to write a DB record */ + if (unlikely((leaf->mn_flags ^ flags) & F_SUBDATA)) + return MDBX_INCOMPATIBLE; + + current: + if (data->iov_len == olddata.iov_len) { + mdbx_cassert(mc, EVEN(key->iov_len) == EVEN(leaf->mn_ksize)); + /* same size, just replace it. Note that we could + * also reuse this node if the new data is smaller, + * but instead we opt to shrink the node in that case. */ + if (F_ISSET(flags, MDBX_RESERVE)) + data->iov_base = olddata.iov_base; + else if (!(mc->mc_flags & C_SUB)) + memcpy(olddata.iov_base, data->iov_base, data->iov_len); + else { + mdbx_cassert(mc, NUMKEYS(mc->mc_pg[mc->mc_top]) == 1); + mdbx_cassert(mc, PAGETYPE(mc->mc_pg[mc->mc_top]) == P_LEAF); + mdbx_cassert(mc, NODEDSZ(leaf) == 0); + mdbx_cassert(mc, leaf->mn_flags == 0); + mdbx_cassert(mc, key->iov_len < UINT16_MAX); + leaf->mn_ksize = (uint16_t)key->iov_len; + memcpy(NODEKEY(leaf), key->iov_base, key->iov_len); + mdbx_cassert(mc, (char *)NODEKEY(leaf) + NODEDSZ(leaf) < + (char *)(mc->mc_pg[mc->mc_top]) + env->me_psize); + goto fix_parent; + } + + if (mdbx_audit_enabled()) { + int err = mdbx_cursor_check(mc, false); + if (unlikely(err != MDBX_SUCCESS)) + return err; + } + return MDBX_SUCCESS; } - return MDBX_SUCCESS; } mdbx_node_del(mc, 0); }