mdbx: alter mdbx_cursor_put() paths.

Change-Id: I1836f8237162ffa34ce432038131fb6171fb7104
This commit is contained in:
Leonid Yuriev 2019-10-04 10:53:29 +03:00
parent 9e92ea2372
commit 39d43a5b57

View File

@ -9449,9 +9449,92 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
} }
more:; more:;
if (mdbx_audit_enabled()) {
int err = mdbx_cursor_check(mc, false);
if (unlikely(err != MDBX_SUCCESS))
return err;
}
MDBX_node *leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); MDBX_node *leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
/* overflow page overwrites need special handling */
if (unlikely(F_ISSET(leaf->mn_flags, F_BIGDATA))) {
MDBX_page *omp;
pgno_t pg;
int level, ovpages,
dpages = (LEAFSIZE(key, data) > env->me_nodemax)
? OVPAGES(env, data->iov_len)
: 0;
memcpy(&pg, NODEDATA(leaf), sizeof(pg));
if (unlikely((rc2 = mdbx_page_get(mc, pg, &omp, &level)) != 0))
return rc2;
ovpages = omp->mp_pages;
/* Is the ov page large enough? */
if (unlikely(mc->mc_flags & C_GCFREEZE)
? ovpages >= dpages
: ovpages ==
/* LY: add configurable threshold to keep reserve space */
dpages) {
if (!IS_DIRTY(omp) && (level || (env->me_flags & MDBX_WRITEMAP))) {
rc = mdbx_page_unspill(mc->mc_txn, omp, &omp);
if (unlikely(rc))
return rc;
level = 0; /* dirty in this txn or clean */
}
/* Is it dirty? */
if (IS_DIRTY(omp)) {
/* yes, overwrite it. Note in this case we don't
* bother to try shrinking the page if the new data
* is smaller than the overflow threshold. */
if (unlikely(level > 1)) {
/* It is writable only in a parent txn */
MDBX_page *np = mdbx_page_malloc(mc->mc_txn, ovpages);
if (unlikely(!np))
return MDBX_ENOMEM;
/* Note - this page is already counted in parent's dirtyroom */
rc2 = mdbx_dpl_append(mc->mc_txn->mt_rw_dirtylist, pg, np);
if (unlikely(rc2 != MDBX_SUCCESS)) {
rc = rc2;
mdbx_dpage_free(env, np, ovpages);
goto fail;
}
/* Currently we make the page look as with put() in the
* parent txn, in case the user peeks at MDBX_RESERVEd
* or unused parts. Some users treat ovpages specially. */
const size_t whole = pgno2bytes(env, ovpages);
/* Skip the part where MDBX will put *data.
* Copy end of page, adjusting alignment so
* compiler may copy words instead of bytes. */
const size_t off =
(PAGEHDRSZ + data->iov_len) & -(intptr_t)sizeof(size_t);
memcpy((size_t *)((char *)np + off), (size_t *)((char *)omp + off),
whole - off);
memcpy(np, omp, PAGEHDRSZ); /* Copy header of page */
omp = np;
}
SETDSZ(leaf, data->iov_len);
if (F_ISSET(flags, MDBX_RESERVE))
data->iov_base = PAGEDATA(omp);
else
memcpy(PAGEDATA(omp), data->iov_base, data->iov_len);
if (mdbx_audit_enabled()) {
int err = mdbx_cursor_check(mc, false);
if (unlikely(err != MDBX_SUCCESS))
return err;
}
return MDBX_SUCCESS;
}
}
if ((rc2 = mdbx_ovpage_free(mc, omp)) != MDBX_SUCCESS)
return rc2;
} else {
olddata.iov_len = NODEDSZ(leaf); olddata.iov_len = NODEDSZ(leaf);
olddata.iov_base = NODEDATA(leaf); olddata.iov_base = NODEDATA(leaf);
mdbx_cassert(mc, (char *)olddata.iov_base + olddata.iov_len <=
(char *)(mc->mc_pg[mc->mc_top]) + env->me_psize);
/* DB has dups? */ /* DB has dups? */
if (F_ISSET(mc->mc_db->md_flags, MDBX_DUPSORT)) { if (F_ISSET(mc->mc_db->md_flags, MDBX_DUPSORT)) {
@ -9568,7 +9651,8 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
mdbx_cassert(mc, fp->mp_upper + offset <= UINT16_MAX); mdbx_cassert(mc, fp->mp_upper + offset <= UINT16_MAX);
mp->mp_upper = (indx_t)(fp->mp_upper + offset); mp->mp_upper = (indx_t)(fp->mp_upper + offset);
if (unlikely(fp_flags & P_LEAF2)) { if (unlikely(fp_flags & P_LEAF2)) {
memcpy(PAGEDATA(mp), PAGEDATA(fp), NUMKEYS(fp) * fp->mp_leaf2_ksize); memcpy(PAGEDATA(mp), PAGEDATA(fp),
NUMKEYS(fp) * fp->mp_leaf2_ksize);
} else { } else {
memcpy((char *)mp + mp->mp_upper + PAGEHDRSZ, memcpy((char *)mp + mp->mp_upper + PAGEHDRSZ,
(char *)fp + fp->mp_upper + PAGEHDRSZ, (char *)fp + fp->mp_upper + PAGEHDRSZ,
@ -9589,85 +9673,13 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
mdbx_node_del(mc, 0); mdbx_node_del(mc, 0);
goto new_sub; goto new_sub;
} }
current:
/* MDBX passes F_SUBDATA in 'flags' to write a DB record */ /* MDBX passes F_SUBDATA in 'flags' to write a DB record */
if (unlikely((leaf->mn_flags ^ flags) & F_SUBDATA)) if (unlikely((leaf->mn_flags ^ flags) & F_SUBDATA))
return MDBX_INCOMPATIBLE; return MDBX_INCOMPATIBLE;
/* overflow page overwrites need special handling */
if (F_ISSET(leaf->mn_flags, F_BIGDATA)) {
MDBX_page *omp;
pgno_t pg;
int level, ovpages,
dpages = (LEAFSIZE(key, data) > env->me_nodemax)
? OVPAGES(env, data->iov_len)
: 0;
memcpy(&pg, olddata.iov_base, sizeof(pg)); current:
if (unlikely((rc2 = mdbx_page_get(mc, pg, &omp, &level)) != 0)) if (data->iov_len == olddata.iov_len) {
return rc2;
ovpages = omp->mp_pages;
/* Is the ov page large enough? */
if (unlikely(mc->mc_flags & C_GCFREEZE)
? ovpages >= dpages
: ovpages ==
/* LY: add configurable threshold to keep reserve space */
dpages) {
if (!IS_DIRTY(omp) && (level || (env->me_flags & MDBX_WRITEMAP))) {
rc = mdbx_page_unspill(mc->mc_txn, omp, &omp);
if (unlikely(rc))
return rc;
level = 0; /* dirty in this txn or clean */
}
/* Is it dirty? */
if (IS_DIRTY(omp)) {
/* yes, overwrite it. Note in this case we don't
* bother to try shrinking the page if the new data
* is smaller than the overflow threshold. */
if (unlikely(level > 1)) {
/* It is writable only in a parent txn */
MDBX_page *np = mdbx_page_malloc(mc->mc_txn, ovpages);
if (unlikely(!np))
return MDBX_ENOMEM;
/* Note - this page is already counted in parent's dirtyroom */
rc2 = mdbx_dpl_append(mc->mc_txn->mt_rw_dirtylist, pg, np);
if (unlikely(rc2 != MDBX_SUCCESS)) {
rc = rc2;
mdbx_dpage_free(env, np, ovpages);
goto fail;
}
/* Currently we make the page look as with put() in the
* parent txn, in case the user peeks at MDBX_RESERVEd
* or unused parts. Some users treat ovpages specially. */
const size_t whole = pgno2bytes(env, ovpages);
/* Skip the part where MDBX will put *data.
* Copy end of page, adjusting alignment so
* compiler may copy words instead of bytes. */
const size_t off =
(PAGEHDRSZ + data->iov_len) & -(intptr_t)sizeof(size_t);
memcpy((size_t *)((char *)np + off), (size_t *)((char *)omp + off),
whole - off);
memcpy(np, omp, PAGEHDRSZ); /* Copy header of page */
omp = np;
}
SETDSZ(leaf, data->iov_len);
if (F_ISSET(flags, MDBX_RESERVE))
data->iov_base = PAGEDATA(omp);
else
memcpy(PAGEDATA(omp), data->iov_base, data->iov_len);
if (mdbx_audit_enabled()) {
int err = mdbx_cursor_check(mc, false);
if (unlikely(err != MDBX_SUCCESS))
return err;
}
return MDBX_SUCCESS;
}
}
if ((rc2 = mdbx_ovpage_free(mc, omp)) != MDBX_SUCCESS)
return rc2;
} else if (data->iov_len == olddata.iov_len) {
mdbx_cassert(mc, EVEN(key->iov_len) == EVEN(leaf->mn_ksize)); mdbx_cassert(mc, EVEN(key->iov_len) == EVEN(leaf->mn_ksize));
/* same size, just replace it. Note that we could /* same size, just replace it. Note that we could
* also reuse this node if the new data is smaller, * also reuse this node if the new data is smaller,
@ -9696,6 +9708,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
} }
return MDBX_SUCCESS; return MDBX_SUCCESS;
} }
}
mdbx_node_del(mc, 0); mdbx_node_del(mc, 0);
} }