mdbx: add MDBX_ALLDUPS & MDBX_UPSERT, rework handling of others.

Change-Id: I27d437540d883935d78242e4fc7e28951ab9f496
This commit is contained in:
Leonid Yuriev
2020-08-27 22:24:07 +03:00
parent aa7695c513
commit 04a77d3bf3
3 changed files with 177 additions and 105 deletions

View File

@@ -10688,6 +10688,12 @@ static int __hot cmp_lenfast(const MDBX_val *a, const MDBX_val *b) {
return likely(diff) ? diff : memcmp(a->iov_base, b->iov_base, a->iov_len);
}
static bool unsure_equal(MDBX_cmp_func cmp, const MDBX_val *a,
const MDBX_val *b) {
return cmp == cmp_lenfast || cmp == cmp_lexical || cmp == cmp_reverse ||
cmp == cmp_int_unaligned || cmp_lenfast(a, b) == 0;
}
/* Search for key within a page, using binary search.
* Returns the smallest entry larger or equal to the key.
* If exactp is non-null, stores whether the found entry was an exact match
@@ -12077,7 +12083,9 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
/* Check this first so counter will always be zero on any early failures. */
size_t mcount = 0, dcount = 0;
if (flags & MDBX_MULTIPLE) {
if (unlikely(flags & MDBX_MULTIPLE)) {
if (unlikely(flags & MDBX_RESERVE))
return MDBX_EINVAL;
if (unlikely(!F_ISSET(mc->mc_db->md_flags, MDBX_DUPFIXED)))
return MDBX_INCOMPATIBLE;
dcount = data[1].iov_len;
@@ -12181,6 +12189,8 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
int dupdata_flag = 0;
if ((flags & MDBX_CURRENT) != 0 && (mc->mc_flags & C_SUB) == 0) {
if (unlikely(flags & (MDBX_APPEND | MDBX_NOOVERWRITE)))
return MDBX_EINVAL;
/* Опция MDBX_CURRENT означает, что запрошено обновление текущей записи,
* на которой сейчас стоит курсор. Проверяем что переданный ключ совпадает
* со значением в текущей позиции курсора.
@@ -12193,6 +12203,9 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
if (mc->mc_dbx->md_cmp(key, &current_key) != 0)
return MDBX_EKEYMISMATCH;
if (unlikely((flags & MDBX_MULTIPLE)))
goto drop_current;
if (F_ISSET(mc->mc_db->md_flags, MDBX_DUPSORT)) {
MDBX_node *node = page_node(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (F_ISSET(node_flags(node), F_DUPDATA)) {
@@ -12200,24 +12213,31 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
mc->mc_xcursor != NULL &&
(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED));
/* Если за ключом более одного значения, либо если размер данных
* отличается, то вместо inplace обновления требуется удаление и
* отличается, то вместо обновления требуется удаление и
* последующая вставка. */
if (mc->mc_xcursor->mx_db.md_entries > 1 ||
current_data.iov_len != data->iov_len) {
rc = mdbx_cursor_del(mc, 0);
if (rc != MDBX_SUCCESS)
drop_current:
rc = mdbx_cursor_del(mc, flags & MDBX_ALLDUPS);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
flags -= MDBX_CURRENT;
goto skip_check_samedata;
}
} else if (unlikely(node_size(key, data) >
/* See note inside leaf_size() */
env->me_branch_nodemax)) {
rc = mdbx_cursor_del(mc, 0);
if (rc != MDBX_SUCCESS)
if (unlikely(rc != MDBX_SUCCESS))
return rc;
flags -= MDBX_CURRENT;
goto skip_check_samedata;
}
}
if (!(flags & MDBX_RESERVE) &&
unlikely(cmp_lenfast(&current_data, data) == 0))
return MDBX_SUCCESS /* the same data, nothing to update */;
skip_check_samedata:;
}
if (mc->mc_db->md_root == P_INVALID) {
@@ -12227,38 +12247,65 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
mc->mc_flags &= ~C_INITIALIZED;
rc = MDBX_NO_ROOT;
} else if ((flags & MDBX_CURRENT) == 0) {
int exact = 0;
if ((flags & MDBX_APPEND) != 0 && mc->mc_db->md_entries > 0) {
int exact = false;
if ((flags & MDBX_APPEND) && mc->mc_db->md_entries > 0) {
rc = mdbx_cursor_last(mc, &dkey, &olddata);
if (rc == 0) {
if (likely(rc == MDBX_SUCCESS)) {
rc = mc->mc_dbx->md_cmp(key, &dkey);
if (rc > 0) {
if (likely(rc > 0)) {
mc->mc_ki[mc->mc_top]++; /* step forward for appending */
rc = MDBX_NOTFOUND;
mc->mc_ki[mc->mc_top]++;
} else if (unlikely(rc < 0 || (flags & MDBX_APPENDDUP) == 0)) {
/* new key is <= last key */
rc = MDBX_EKEYMISMATCH;
} else {
if (unlikely(rc != 0 || !(flags & MDBX_APPENDDUP)))
/* new-key < last-key
* or new-key == last-key without MDBX_APPENDDUP */
return MDBX_EKEYMISMATCH;
exact = true;
}
}
} else {
rc = mdbx_cursor_set(mc, (MDBX_val *)key, &olddata, MDBX_SET, &exact);
}
if ((flags & MDBX_NOOVERWRITE) &&
(rc == MDBX_SUCCESS || rc == MDBX_EKEYMISMATCH)) {
mdbx_debug("duplicate key [%s]", DKEY(key));
*data = olddata;
return MDBX_KEYEXIST;
}
if (likely(rc == MDBX_SUCCESS)) {
if (exact) {
if (mc->mc_flags & C_SUB) {
mdbx_assert(env, data->iov_len == 0);
return (flags & MDBX_NODUPDATA) ? MDBX_KEYEXIST : MDBX_SUCCESS;
if (unlikely(flags & MDBX_NOOVERWRITE)) {
mdbx_debug("duplicate key [%s]", DKEY(key));
*data = olddata;
return MDBX_KEYEXIST;
}
if (unlikely(mc->mc_flags & C_SUB)) {
/* nested subtree of DUPSORT-database with the same key,
* nothing to update */
mdbx_assert(env, data->iov_len == 0 && olddata.iov_len == 0);
return MDBX_SUCCESS;
}
if (unlikely(flags & MDBX_ALLDUPS) && mc->mc_xcursor &&
(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)) {
rc = mdbx_cursor_del(mc, MDBX_ALLDUPS);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
flags -= MDBX_ALLDUPS;
rc = MDBX_NOTFOUND;
exact = false;
} else /* checking for early exit without dirtying pages */
if (!(flags & (MDBX_RESERVE | MDBX_MULTIPLE)) &&
unlikely(mc->mc_dbx->md_dcmp(data, &olddata) == 0)) {
if (!mc->mc_xcursor)
/* the same data, nothing to update */
return MDBX_SUCCESS;
if (flags & MDBX_NODUPDATA)
return MDBX_KEYEXIST;
if (flags & MDBX_APPENDDUP)
return MDBX_EKEYMISMATCH;
if (likely(unsure_equal(mc->mc_dbx->md_dcmp, data, &olddata)))
/* data is match exactly byte-to-byte, nothing to update */
return MDBX_SUCCESS;
else {
/* The data has differences, but the user-provided comparator
* considers them equal. So continue update since called without.
* Continue to update since was called without MDBX_NODUPDATA. */
}
}
if (!(flags & MDBX_RESERVE) &&
unlikely(mc->mc_dbx->md_dcmp(data, &olddata) == 0))
return ((flags & MDBX_NODUPDATA) && mc->mc_xcursor) ? MDBX_KEYEXIST
: MDBX_SUCCESS;
}
} else if (unlikely(rc != MDBX_NOTFOUND))
return rc;
@@ -12268,7 +12315,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
/* Cursor is positioned, check for room in the dirty list */
if (!nospill) {
if (flags & MDBX_MULTIPLE) {
if (unlikely(flags & MDBX_MULTIPLE)) {
rdata = &xdata;
xdata.iov_len = data->iov_len * dcount;
} else {
@@ -12476,8 +12523,25 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
/* Was a single item before, must convert now */
if (!F_ISSET(node_flags(node), F_DUPDATA)) {
/* Just overwrite the current item */
if (flags & MDBX_CURRENT) {
/* does data match? */
const int cmp = mc->mc_dbx->md_dcmp(data, &olddata);
if ((flags & MDBX_APPENDDUP) && unlikely(cmp <= 0))
return MDBX_EKEYMISMATCH;
if (cmp == 0) {
if (flags & MDBX_NODUPDATA)
return MDBX_KEYEXIST;
if (likely(unsure_equal(mc->mc_dbx->md_dcmp, data, &olddata))) {
/* data is match exactly byte-to-byte, nothing to update */
if (unlikely(flags & MDBX_MULTIPLE)) {
rc = MDBX_SUCCESS;
goto continue_multiple;
}
return MDBX_SUCCESS;
} else {
/* The data has differences, but the user-provided comparator
* considers them equal. So continue update since called without.
* Continue to update since was called without MDBX_NODUPDATA. */
}
mdbx_cassert(
mc,
node_size(key, data) <=
@@ -12485,11 +12549,8 @@ int mdbx_cursor_put(MDBX_cursor *mc, const MDBX_val *key, MDBX_val *data,
goto current;
}
/* does data match? */
if (!mc->mc_dbx->md_dcmp(data, &olddata)) {
if (unlikely(flags & (MDBX_NODUPDATA | MDBX_APPENDDUP)))
return MDBX_KEYEXIST;
/* overwrite it */
/* Just overwrite the current item */
if (flags & MDBX_CURRENT) {
mdbx_cassert(
mc,
node_size(key, data) <=
@@ -12702,16 +12763,18 @@ new_sub:;
xdata.iov_len = 0;
xdata.iov_base = nullptr;
MDBX_node *node = page_node(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (flags & MDBX_CURRENT) {
xflags = (flags & MDBX_NODUPDATA)
? MDBX_CURRENT | MDBX_NOOVERWRITE | MDBX_NOSPILL
: MDBX_CURRENT | MDBX_NOSPILL;
} else {
#define SHIFT_MDBX_NODUPDATA_TO_MDBX_NOOVERWRITE 1
STATIC_ASSERT(
(MDBX_NODUPDATA >> SHIFT_MDBX_NODUPDATA_TO_MDBX_NOOVERWRITE) ==
MDBX_NOOVERWRITE);
xflags = MDBX_CURRENT | MDBX_NOSPILL |
((flags & MDBX_NODUPDATA) >>
SHIFT_MDBX_NODUPDATA_TO_MDBX_NOOVERWRITE);
if ((flags & MDBX_CURRENT) == 0) {
xflags -= MDBX_CURRENT;
rc2 = mdbx_xcursor_init1(mc, node);
if (unlikely(rc2 != MDBX_SUCCESS))
return rc2;
xflags = (flags & MDBX_NODUPDATA) ? MDBX_NOOVERWRITE | MDBX_NOSPILL
: MDBX_NOSPILL;
}
if (sub_root)
mc->mc_xcursor->mx_cursor.mc_pg[0] = sub_root;
@@ -12773,6 +12836,7 @@ new_sub:;
}
if (flags & MDBX_MULTIPLE) {
if (!rc) {
continue_multiple:
mcount++;
/* let caller know how many succeeded, if any */
data[1].iov_len = mcount;
@@ -12828,7 +12892,7 @@ int mdbx_cursor_del(MDBX_cursor *mc, MDBX_put_flags_t flags) {
MDBX_node *node = page_node(mp, mc->mc_ki[mc->mc_top]);
if (F_ISSET(node_flags(node), F_DUPDATA)) {
if (flags & MDBX_NODUPDATA) {
if (flags & (MDBX_ALLDUPS | /* for compatibility */ MDBX_NODUPDATA)) {
/* mdbx_cursor_del0() will subtract the final entry */
mc->mc_db->md_entries -= mc->mc_xcursor->mx_db.md_entries - 1;
mc->mc_xcursor->mx_cursor.mc_flags &= ~C_INITIALIZED;
@@ -14964,7 +15028,7 @@ static int mdbx_del0(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key,
data = &rdata;
} else {
op = MDBX_SET;
flags |= MDBX_NODUPDATA;
flags |= MDBX_ALLDUPS;
}
rc =
mdbx_cursor_set(&cx.outer, (MDBX_val *)key, (MDBX_val *)data, op, &exact);
@@ -15479,8 +15543,9 @@ int mdbx_put(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key, MDBX_val *data,
if (unlikely(!mdbx_txn_dbi_exists(txn, dbi, DBI_USRVALID)))
return MDBX_BAD_DBI;
if (unlikely(flags & ~(MDBX_NOOVERWRITE | MDBX_NODUPDATA | MDBX_RESERVE |
MDBX_APPEND | MDBX_APPENDDUP | MDBX_CURRENT)))
if (unlikely(flags & ~(MDBX_NOOVERWRITE | MDBX_NODUPDATA | MDBX_ALLDUPS |
MDBX_ALLDUPS | MDBX_RESERVE | MDBX_APPEND |
MDBX_APPENDDUP | MDBX_CURRENT | MDBX_MULTIPLE)))
return MDBX_EINVAL;
if (unlikely(txn->mt_flags & (MDBX_TXN_RDONLY | MDBX_TXN_BLOCKED)))
@@ -15497,7 +15562,8 @@ int mdbx_put(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key, MDBX_val *data,
if (flags & MDBX_CURRENT) {
rc = mdbx_cursor_get(&cx.outer, (MDBX_val *)key, NULL, MDBX_SET);
if (likely(rc == MDBX_SUCCESS) &&
(txn->mt_dbs[dbi].md_flags & MDBX_DUPSORT)) {
(txn->mt_dbs[dbi].md_flags & MDBX_DUPSORT) &&
(flags & MDBX_ALLDUPS) == 0) {
/* LY: allows update (explicit overwrite) only for unique keys */
MDBX_node *node = page_node(cx.outer.mc_pg[cx.outer.mc_top],
cx.outer.mc_ki[cx.outer.mc_top]);
@@ -18276,8 +18342,9 @@ int mdbx_replace_ex(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key,
if (unlikely(!mdbx_txn_dbi_exists(txn, dbi, DBI_USRVALID)))
return MDBX_BAD_DBI;
if (unlikely(flags & ~(MDBX_NOOVERWRITE | MDBX_NODUPDATA | MDBX_RESERVE |
MDBX_APPEND | MDBX_APPENDDUP | MDBX_CURRENT)))
if (unlikely(flags &
~(MDBX_NOOVERWRITE | MDBX_NODUPDATA | MDBX_ALLDUPS |
MDBX_RESERVE | MDBX_APPEND | MDBX_APPENDDUP | MDBX_CURRENT)))
return MDBX_EINVAL;
MDBX_cursor_couple cx;
@@ -18301,16 +18368,6 @@ int mdbx_replace_ex(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key,
rc = mdbx_cursor_get(&cx.outer, &present_key, old_data, MDBX_GET_BOTH);
if (rc != MDBX_SUCCESS)
goto bailout;
if (new_data) {
/* обновление конкретного дубликата */
/* (!!!) We can skip update only when a data exactly the same, but user's
* md_dcmp() may returns zero even data is NOT matches byte-to-byte.
* So to skip update the cmp_len fast() should be used. */
if (cmp_lenfast(old_data, new_data) == 0)
/* если данные совпадают, то ничего делать не надо */
goto bailout;
}
} else {
/* в old_data буфер для сохранения предыдущего значения */
if (unlikely(new_data && old_data->iov_base == new_data->iov_base))
@@ -18340,33 +18397,19 @@ int mdbx_replace_ex(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key,
goto bailout;
}
}
/* если данные совпадают, то ничего делать не надо */
if (new_data && /* the cmp_lenfast() must be used, see above */
cmp_lenfast(&present_data, new_data) == 0) {
*old_data = *new_data;
goto bailout;
}
/* В оригинальной LMDB флажок MDBX_CURRENT здесь приведет
* к замене данных без учета MDBX_DUPSORT сортировки,
* но здесь это в любом случае допустимо, так как мы
* проверили что для ключа есть только одно значение. */
} else if ((flags & MDBX_NODUPDATA) &&
cx.outer.mc_dbx->md_dcmp(&present_data, new_data) == 0) {
/* если данные совпадают и установлен MDBX_NODUPDATA */
rc = MDBX_KEYEXIST;
goto bailout;
}
} else {
/* если данные совпадают, то ничего делать не надо */
if (new_data && /* the cmp_lenfast() must be used, see comment above */
cmp_lenfast(&present_data, new_data) == 0) {
*old_data = *new_data;
goto bailout;
}
flags |= MDBX_CURRENT;
}
if (IS_DIRTY(page)) {
if (new_data && cmp_lenfast(&present_data, new_data) == 0) {
/* если данные совпадают, то ничего делать не надо */
*old_data = *new_data;
goto bailout;
}
rc = preserver ? preserver(preserver_context, old_data,
present_data.iov_base, present_data.iov_len)
: MDBX_SUCCESS;
@@ -18375,13 +18418,14 @@ int mdbx_replace_ex(MDBX_txn *txn, MDBX_dbi dbi, const MDBX_val *key,
} else {
*old_data = present_data;
}
flags |= MDBX_CURRENT;
}
}
if (likely(new_data))
rc = mdbx_cursor_put(&cx.outer, key, new_data, flags);
else
rc = mdbx_cursor_del(&cx.outer, 0);
rc = mdbx_cursor_del(&cx.outer, flags & MDBX_ALLDUPS);
bailout:
txn->mt_cursors[dbi] = cx.outer.mc_next;