mdbx: выделение cursor_put() для уменьшения кол-ва проверок.

This commit is contained in:
Леонид Юрьев (Leonid Yuriev) 2022-12-27 14:51:07 +03:00
parent 61d21b0a02
commit 66a5704949

View File

@ -3329,6 +3329,14 @@ static int __must_check_result audit_ex(MDBX_txn *txn, size_t retired_stored,
static int __must_check_result page_check(const MDBX_cursor *const mc,
const MDBX_page *const mp);
static int __must_check_result cursor_check(const MDBX_cursor *mc);
static int __must_check_result cursor_put_checklen(MDBX_cursor *mc,
const MDBX_val *key,
MDBX_val *data,
unsigned flags);
static int __must_check_result cursor_put_nochecklen(MDBX_cursor *mc,
const MDBX_val *key,
MDBX_val *data,
unsigned flags);
static int __must_check_result cursor_check_updating(MDBX_cursor *mc);
static int __must_check_result cursor_del(MDBX_cursor *mc);
static int __must_check_result delete(MDBX_txn *txn, MDBX_dbi dbi,
@ -10420,7 +10428,7 @@ retry:
? env->me_maxgc_ov1page
: left;
data.iov_len = (chunk + 1) * sizeof(pgno_t);
rc = mdbx_cursor_put(&ctx->cursor, &key, &data, MDBX_RESERVE);
rc = cursor_put_nochecklen(&ctx->cursor, &key, &data, MDBX_RESERVE);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
@ -10458,7 +10466,7 @@ retry:
do {
gcu_prepare_backlog(txn, ctx);
data.iov_len = MDBX_PNL_SIZEOF(txn->tw.retired_pages);
rc = mdbx_cursor_put(&ctx->cursor, &key, &data, MDBX_RESERVE);
rc = cursor_put_nochecklen(&ctx->cursor, &key, &data, MDBX_RESERVE);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
/* Retry if tw.retired_pages[] grew during the Put() */
@ -10753,8 +10761,8 @@ retry:
TRACE("%s: reserve %zu [%zu...%zu) @%" PRIaTXN, dbg_prefix_mode, chunk,
ctx->settled + 1, ctx->settled + chunk + 1, reservation_gc_id);
gcu_prepare_backlog(txn, ctx);
rc = mdbx_cursor_put(&ctx->cursor, &key, &data,
MDBX_RESERVE | MDBX_NOOVERWRITE);
rc = cursor_put_nochecklen(&ctx->cursor, &key, &data,
MDBX_RESERVE | MDBX_NOOVERWRITE);
tASSERT(txn, pnl_check_allocated(txn->tw.relist,
txn->mt_next_pgno - MDBX_ENABLE_REFUND));
if (unlikely(rc != MDBX_SUCCESS))
@ -10859,8 +10867,8 @@ retry:
}
chunk = left;
}
rc = mdbx_cursor_put(&ctx->cursor, &key, &data,
MDBX_CURRENT | MDBX_RESERVE);
rc = cursor_put_nochecklen(&ctx->cursor, &key, &data,
MDBX_CURRENT | MDBX_RESERVE);
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
gcu_clean_reserved(env, data);
@ -11634,10 +11642,10 @@ int mdbx_txn_commit_ex(MDBX_txn *txn, MDBX_commit_latency *latency) {
/* Может быть mod_txnid > front после коммита вложенных тразакций */
db->md_mod_txnid = txn->mt_txnid;
data.iov_base = db;
WITH_CURSOR_TRACKING(couple.outer,
rc = mdbx_cursor_put(&couple.outer,
&txn->mt_dbxs[i].md_name,
&data, F_SUBDATA));
WITH_CURSOR_TRACKING(
couple.outer,
rc = cursor_put_nochecklen(&couple.outer, &txn->mt_dbxs[i].md_name,
&data, F_SUBDATA));
if (unlikely(rc != MDBX_SUCCESS))
goto fail;
}
@ -16969,130 +16977,14 @@ static __hot int cursor_touch(MDBX_cursor *const mc, const MDBX_val *key,
return rc;
}
__hot int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
unsigned flags) {
MDBX_env *env;
MDBX_page *sub_root = NULL;
static __hot int cursor_put_nochecklen(MDBX_cursor *mc, const MDBX_val *key,
MDBX_val *data, unsigned flags) {
MDBX_page *sub_root = nullptr;
MDBX_val xdata, *rdata, dkey, olddata;
MDBX_db nested_dupdb;
int err;
DKBUF_DEBUG;
if (unlikely(mc == NULL || key == NULL || data == NULL))
return MDBX_EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_LIVE))
return (mc->mc_signature == MDBX_MC_READY4CLOSE) ? MDBX_EINVAL
: MDBX_EBADSIGN;
int rc = check_txn_rw(mc->mc_txn, MDBX_TXN_BLOCKED);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
if (unlikely(dbi_changed(mc->mc_txn, mc->mc_dbi)))
return MDBX_BAD_DBI;
cASSERT(mc, cursor_is_tracked(mc));
env = mc->mc_txn->mt_env;
/* Check this first so counter will always be zero on any early failures. */
size_t mcount = 0, dcount = 0;
if (unlikely(flags & MDBX_MULTIPLE)) {
if (unlikely(flags & MDBX_RESERVE))
return MDBX_EINVAL;
if (unlikely(!(mc->mc_db->md_flags & MDBX_DUPFIXED)))
return MDBX_INCOMPATIBLE;
dcount = data[1].iov_len;
if (unlikely(dcount < 2 || data->iov_len == 0))
return MDBX_BAD_VALSIZE;
if (unlikely(mc->mc_db->md_xsize != data->iov_len) && mc->mc_db->md_xsize)
return MDBX_BAD_VALSIZE;
if (unlikely(dcount > MAX_MAPSIZE / 2 /
(BRANCH_NODE_MAX(MAX_PAGESIZE) - NODESIZE))) {
/* checking for multiplication overflow */
if (unlikely(dcount > MAX_MAPSIZE / 2 / data->iov_len))
return MDBX_TOO_LARGE;
}
data[1].iov_len = 0 /* reset done item counter */;
}
if (flags & MDBX_RESERVE) {
if (unlikely(mc->mc_db->md_flags & (MDBX_DUPSORT | MDBX_REVERSEDUP |
MDBX_INTEGERDUP | MDBX_DUPFIXED)))
return MDBX_INCOMPATIBLE;
data->iov_base = nullptr;
}
if (unlikely(mc->mc_txn->mt_flags & (MDBX_TXN_RDONLY | MDBX_TXN_BLOCKED)))
return (mc->mc_txn->mt_flags & MDBX_TXN_RDONLY) ? MDBX_EACCESS
: MDBX_BAD_TXN;
uint64_t aligned_keybytes, aligned_databytes;
MDBX_val aligned_key, aligned_data;
if (likely((mc->mc_flags & C_SUB) == 0)) {
if (unlikely(key->iov_len < mc->mc_dbx->md_klen_min ||
key->iov_len > mc->mc_dbx->md_klen_max)) {
cASSERT(mc, !"Invalid key-size");
return MDBX_BAD_VALSIZE;
}
if (unlikely(data->iov_len < mc->mc_dbx->md_vlen_min ||
data->iov_len > mc->mc_dbx->md_vlen_max)) {
cASSERT(mc, !"Invalid data-size");
return MDBX_BAD_VALSIZE;
}
if (mc->mc_db->md_flags & MDBX_INTEGERKEY) {
switch (key->iov_len) {
default:
cASSERT(mc, !"key-size is invalid for MDBX_INTEGERKEY");
return MDBX_BAD_VALSIZE;
case 4:
if (unlikely(3 & (uintptr_t)key->iov_base)) {
/* copy instead of return error to avoid break compatibility */
aligned_key.iov_base =
memcpy(&aligned_keybytes, key->iov_base, aligned_key.iov_len = 4);
key = &aligned_key;
}
break;
case 8:
if (unlikely(7 & (uintptr_t)key->iov_base)) {
/* copy instead of return error to avoid break compatibility */
aligned_key.iov_base =
memcpy(&aligned_keybytes, key->iov_base, aligned_key.iov_len = 8);
key = &aligned_key;
}
break;
}
}
if (mc->mc_db->md_flags & MDBX_INTEGERDUP) {
switch (data->iov_len) {
default:
cASSERT(mc, !"data-size is invalid for MDBX_INTEGERKEY");
return MDBX_BAD_VALSIZE;
case 4:
if (unlikely(3 & (uintptr_t)data->iov_base)) {
if (unlikely(flags & MDBX_MULTIPLE))
return MDBX_BAD_VALSIZE;
/* copy instead of return error to avoid break compatibility */
aligned_data.iov_base = memcpy(&aligned_databytes, data->iov_base,
aligned_data.iov_len = 4);
data = &aligned_data;
}
break;
case 8:
if (unlikely(7 & (uintptr_t)data->iov_base)) {
if (unlikely(flags & MDBX_MULTIPLE))
return MDBX_BAD_VALSIZE;
/* copy instead of return error to avoid break compatibility */
aligned_data.iov_base = memcpy(&aligned_databytes, data->iov_base,
aligned_data.iov_len = 8);
data = &aligned_data;
}
break;
}
}
}
MDBX_env *const env = mc->mc_txn->mt_env;
DEBUG("==> put db %d key [%s], size %" PRIuPTR ", data [%s] size %" PRIuPTR,
DDBI(mc), DKEY_DEBUG(key), key->iov_len,
DVAL_DEBUG((flags & MDBX_RESERVE) ? nullptr : data), data->iov_len);
@ -17107,9 +16999,9 @@ __hot int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
* Здесь проще вызвать mdbx_cursor_get(), так как для обслуживания таблиц
* с MDBX_DUPSORT также требуется текущий размер данных. */
MDBX_val current_key, current_data;
rc = mdbx_cursor_get(mc, &current_key, &current_data, MDBX_GET_CURRENT);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
err = mdbx_cursor_get(mc, &current_key, &current_data, MDBX_GET_CURRENT);
if (unlikely(err != MDBX_SUCCESS))
return err;
if (mc->mc_dbx->md_cmp(key, &current_key) != 0)
return MDBX_EKEYMISMATCH;
@ -17127,16 +17019,16 @@ __hot int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
if (mc->mc_xcursor->mx_db.md_entries > 1 ||
current_data.iov_len != data->iov_len) {
drop_current:
rc = mdbx_cursor_del(mc, flags & MDBX_ALLDUPS);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
err = mdbx_cursor_del(mc, flags & MDBX_ALLDUPS);
if (unlikely(err != MDBX_SUCCESS))
return err;
flags -= MDBX_CURRENT;
goto skip_check_samedata;
}
} else if (unlikely(node_size(key, data) > env->me_leaf_nodemax)) {
rc = mdbx_cursor_del(mc, 0);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
err = mdbx_cursor_del(mc, 0);
if (unlikely(err != MDBX_SUCCESS))
return err;
flags -= MDBX_CURRENT;
goto skip_check_samedata;
}
@ -17147,6 +17039,7 @@ __hot int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
skip_check_samedata:;
}
int rc = MDBX_SUCCESS;
if (mc->mc_db->md_root == P_INVALID) {
/* new database, cursor has nothing to point to */
mc->mc_snum = 0;
@ -17196,9 +17089,9 @@ __hot int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
}
if (unlikely(flags & MDBX_ALLDUPS) && mc->mc_xcursor &&
(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)) {
rc = mdbx_cursor_del(mc, MDBX_ALLDUPS);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
err = mdbx_cursor_del(mc, MDBX_ALLDUPS);
if (unlikely(err != MDBX_SUCCESS))
return err;
flags -= MDBX_ALLDUPS;
rc = MDBX_NOTFOUND;
exact = false;
@ -17227,12 +17120,16 @@ __hot int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
}
mc->mc_flags &= ~C_DEL;
/* Cursor is positioned, check for room in the dirty list */
rdata = data;
size_t mcount = 0, dcount = 0;
if (unlikely(flags & MDBX_MULTIPLE)) {
dcount = data[1].iov_len;
data[1].iov_len = 0 /* reset done item counter */;
rdata = &xdata;
xdata.iov_len = data->iov_len * dcount;
}
/* Cursor is positioned, check for room in the dirty list */
err = cursor_touch(mc, key, rdata);
if (unlikely(err))
return err;
@ -17670,7 +17567,8 @@ new_sub:;
mc->mc_xcursor->mx_cursor.mc_pg[0] = sub_root;
/* converted, write the original data first */
if (dupdata_flag) {
rc = mdbx_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags);
rc = cursor_put_nochecklen(&mc->mc_xcursor->mx_cursor, &dkey, &xdata,
xflags);
if (unlikely(rc))
goto bad_sub;
/* we've done our job */
@ -17706,7 +17604,8 @@ new_sub:;
STATIC_ASSERT((MDBX_APPENDDUP >> SHIFT_MDBX_APPENDDUP_TO_MDBX_APPEND) ==
MDBX_APPEND);
xflags |= (flags & MDBX_APPENDDUP) >> SHIFT_MDBX_APPENDDUP_TO_MDBX_APPEND;
rc = mdbx_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
rc = cursor_put_nochecklen(&mc->mc_xcursor->mx_cursor, data, &xdata,
xflags);
if (flags & F_SUBDATA) {
void *db = node_data(node);
mc->mc_xcursor->mx_db.md_mod_txnid = mc->mc_txn->mt_txnid;
@ -17752,6 +17651,126 @@ new_sub:;
return rc;
}
static __hot int cursor_put_checklen(MDBX_cursor *mc, const MDBX_val *key,
MDBX_val *data, unsigned flags) {
cASSERT(mc, (mc->mc_flags & C_SUB) == 0);
uint64_t aligned_keybytes, aligned_databytes;
MDBX_val aligned_key, aligned_data;
if (unlikely(key->iov_len < mc->mc_dbx->md_klen_min ||
key->iov_len > mc->mc_dbx->md_klen_max)) {
cASSERT(mc, !"Invalid key-size");
return MDBX_BAD_VALSIZE;
}
if (unlikely(data->iov_len < mc->mc_dbx->md_vlen_min ||
data->iov_len > mc->mc_dbx->md_vlen_max)) {
cASSERT(mc, !"Invalid data-size");
return MDBX_BAD_VALSIZE;
}
if (mc->mc_db->md_flags & MDBX_INTEGERKEY) {
switch (key->iov_len) {
default:
cASSERT(mc, !"key-size is invalid for MDBX_INTEGERKEY");
return MDBX_BAD_VALSIZE;
case 4:
if (unlikely(3 & (uintptr_t)key->iov_base)) {
/* copy instead of return error to avoid break compatibility */
aligned_key.iov_base =
memcpy(&aligned_keybytes, key->iov_base, aligned_key.iov_len = 4);
key = &aligned_key;
}
break;
case 8:
if (unlikely(7 & (uintptr_t)key->iov_base)) {
/* copy instead of return error to avoid break compatibility */
aligned_key.iov_base =
memcpy(&aligned_keybytes, key->iov_base, aligned_key.iov_len = 8);
key = &aligned_key;
}
break;
}
}
if (mc->mc_db->md_flags & MDBX_INTEGERDUP) {
switch (data->iov_len) {
default:
cASSERT(mc, !"data-size is invalid for MDBX_INTEGERKEY");
return MDBX_BAD_VALSIZE;
case 4:
if (unlikely(3 & (uintptr_t)data->iov_base)) {
if (unlikely(flags & MDBX_MULTIPLE))
return MDBX_BAD_VALSIZE;
/* copy instead of return error to avoid break compatibility */
aligned_data.iov_base = memcpy(&aligned_databytes, data->iov_base,
aligned_data.iov_len = 4);
data = &aligned_data;
}
break;
case 8:
if (unlikely(7 & (uintptr_t)data->iov_base)) {
if (unlikely(flags & MDBX_MULTIPLE))
return MDBX_BAD_VALSIZE;
/* copy instead of return error to avoid break compatibility */
aligned_data.iov_base = memcpy(&aligned_databytes, data->iov_base,
aligned_data.iov_len = 8);
data = &aligned_data;
}
break;
}
}
return cursor_put_nochecklen(mc, key, data, flags);
}
int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
unsigned flags) {
if (unlikely(mc == NULL || key == NULL || data == NULL))
return MDBX_EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_LIVE))
return (mc->mc_signature == MDBX_MC_READY4CLOSE) ? MDBX_EINVAL
: MDBX_EBADSIGN;
int rc = check_txn_rw(mc->mc_txn, MDBX_TXN_BLOCKED);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
if (unlikely(dbi_changed(mc->mc_txn, mc->mc_dbi)))
return MDBX_BAD_DBI;
cASSERT(mc, cursor_is_tracked(mc));
/* Check this first so counter will always be zero on any early failures. */
if (unlikely(flags & MDBX_MULTIPLE)) {
if (unlikely(flags & MDBX_RESERVE))
return MDBX_EINVAL;
if (unlikely(!(mc->mc_db->md_flags & MDBX_DUPFIXED)))
return MDBX_INCOMPATIBLE;
const size_t dcount = data[1].iov_len;
if (unlikely(dcount < 2 || data->iov_len == 0))
return MDBX_BAD_VALSIZE;
if (unlikely(mc->mc_db->md_xsize != data->iov_len) && mc->mc_db->md_xsize)
return MDBX_BAD_VALSIZE;
if (unlikely(dcount > MAX_MAPSIZE / 2 /
(BRANCH_NODE_MAX(MAX_PAGESIZE) - NODESIZE))) {
/* checking for multiplication overflow */
if (unlikely(dcount > MAX_MAPSIZE / 2 / data->iov_len))
return MDBX_TOO_LARGE;
}
}
if (flags & MDBX_RESERVE) {
if (unlikely(mc->mc_db->md_flags & (MDBX_DUPSORT | MDBX_REVERSEDUP |
MDBX_INTEGERDUP | MDBX_DUPFIXED)))
return MDBX_INCOMPATIBLE;
data->iov_base = nullptr;
}
if (unlikely(mc->mc_txn->mt_flags & (MDBX_TXN_RDONLY | MDBX_TXN_BLOCKED)))
return (mc->mc_txn->mt_flags & MDBX_TXN_RDONLY) ? MDBX_EACCESS
: MDBX_BAD_TXN;
return cursor_put_checklen(mc, key, data, flags);
}
__hot int mdbx_cursor_del(MDBX_cursor *mc, MDBX_put_flags_t flags) {
if (unlikely(!mc))
return MDBX_EINVAL;
@ -20871,7 +20890,7 @@ int mdbx_put(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key, MDBX_val *data,
}
if (likely(rc == MDBX_SUCCESS))
rc = mdbx_cursor_put(&cx.outer, key, data, flags);
rc = cursor_put_checklen(&cx.outer, key, data, flags);
txn->mt_cursors[dbi] = cx.outer.mc_next;
return rc;
@ -22431,9 +22450,9 @@ static int dbi_open(MDBX_txn *txn, const MDBX_val *const table_name,
db_dummy.md_flags = user_flags & DB_PERSISTENT_FLAGS;
data.iov_len = sizeof(db_dummy);
data.iov_base = &db_dummy;
WITH_CURSOR_TRACKING(couple.outer,
rc = mdbx_cursor_put(&couple.outer, &key, &data,
F_SUBDATA | MDBX_NOOVERWRITE));
WITH_CURSOR_TRACKING(
couple.outer, rc = cursor_put_checklen(&couple.outer, &key, &data,
F_SUBDATA | MDBX_NOOVERWRITE));
if (unlikely(rc != MDBX_SUCCESS))
goto bailout;
@ -24079,7 +24098,7 @@ int mdbx_replace_ex(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key,
}
if (likely(new_data))
rc = mdbx_cursor_put(&cx.outer, key, new_data, flags);
rc = cursor_put_checklen(&cx.outer, key, new_data, flags);
else
rc = mdbx_cursor_del(&cx.outer, flags & MDBX_ALLDUPS);