mdbx: fix MDB_DUPSORT with MDB_CURRENT update bug.

This commit is contained in:
Leo Yuriev 2017-05-19 16:20:02 +03:00
parent 82d3595b76
commit d0793a1daf
2 changed files with 61 additions and 37 deletions

35
mdbx.h
View File

@ -1317,24 +1317,28 @@ LIBMDBX_API int mdbx_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
/* Store by cursor.
*
* This function stores key/data pairs into the database.
*
* The cursor is positioned at the new item, or on failure usually near it.
* Note: Earlier documentation incorrectly said errors would leave the
* state of the cursor unchanged.
* [in] cursor A cursor handle returned by mdbx_cursor_open()
* [in] key The key operated on.
* [in] data The data operated on.
* [in] flags Options for this operation. This parameter
* must be set to 0 or one of the values described here.
* - MDB_CURRENT - replace the item at the current cursor position.
* The key parameter must still be provided, and must match
*it.
* If using sorted duplicates (MDB_DUPSORT) the data item must
*still
* sort into the same place. This is intended to be used when the
* new data is the same size as the old. Otherwise it will simply
* perform a delete of the old record followed by an insert.
* - MDB_NODUPDATA - enter the new key/data pair only if it does not
* already appear in the database. This flag may only be
*
* [in] cursor A cursor handle returned by mdbx_cursor_open()
* [in] key The key operated on.
* [in] data The data operated on.
* [in] flags Options for this operation. This parameter
* must be set to 0 or one of the values described here.
*
* - MDB_CURRENT - replace the item at the current cursor position. The
* key parameter must still be provided, and must match it.
*
* If using sorted duplicates (MDB_DUPSORT) the data item
* must still sort into the same place. This is intended to
* be used when the new data is the same size as the old.
* Otherwise it will simply perform a delete of the old
* record followed by an insert.
*
* - MDB_NODUPDATA - enter the new key/data pair only if it does not already
* appear in the database. This flag may only be
*specified
* if the database was opened with MDB_DUPSORT. The function
*will
@ -1380,6 +1384,7 @@ LIBMDBX_API int mdbx_cursor_get(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
*
* Returns A non-zero error value on failure and 0 on success, some
* possible errors are:
* - MDBX_EKEYMISMATCH
* - MDB_MAP_FULL - the database is full, see mdbx_env_set_mapsize().
* - MDB_TXN_FULL - the transaction has too many dirty pages.
* - EACCES - an attempt was made to write in a read-only transaction.

View File

@ -5622,7 +5622,7 @@ static int mdbx_cursor_touch(MDB_cursor *mc) {
return rc;
}
/** Do not spill pages to disk if txn is getting full, may fail instead */
/* Do not spill pages to disk if txn is getting full, may fail instead */
#define MDB_NOSPILL 0x8000
int mdbx_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
@ -5696,15 +5696,29 @@ int mdbx_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
int dupdata_flag = 0;
if (flags & MDB_CURRENT) {
if (unlikely(!(mc->mc_flags & C_INITIALIZED)))
return MDBX_EINVAL;
/* Опция MDB_CURRENT означает, что запрошено обновление текущей записи,
* на которой сейчас стоит курсор. Проверяем что переданный ключ совпадает
* со значением в текущей позиции курсора.
* Здесь проще вызвать mdbx_cursor_get(), так как для обслуживания таблиц
* с MDB_DUPSORT также требуется текущий размер данных. */
MDB_val current_key, current_data;
rc = mdbx_cursor_get(mc, &current_key, &current_data, MDB_GET_CURRENT);
if (unlikely(rc != MDB_SUCCESS))
return rc;
if (mc->mc_dbx->md_cmp(key, &current_key) != 0)
return MDBX_EKEYMISMATCH;
if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) {
MDB_node *leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
mdbx_cassert(mc,
mc->mc_xcursor != NULL &&
(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED));
if (mc->mc_xcursor->mx_db.md_entries > 1) {
/* Если за ключом более одного значения, либо если размер данных
* отличается, то вместо inplace обновления требуется удаление и
* последующая вставка. */
if (mc->mc_xcursor->mx_db.md_entries > 1 ||
current_data.mv_size != data->mv_size) {
rc = mdbx_cursor_del(mc, 0);
if (rc != MDB_SUCCESS)
return rc;
@ -5712,14 +5726,15 @@ int mdbx_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
}
}
}
rc = MDB_SUCCESS;
} else if (mc->mc_db->md_root == P_INVALID) {
}
if (mc->mc_db->md_root == P_INVALID) {
/* new database, cursor has nothing to point to */
mc->mc_snum = 0;
mc->mc_top = 0;
mc->mc_flags &= ~C_INITIALIZED;
rc = MDB_NO_ROOT;
} else {
} else if ((flags & MDB_CURRENT) == 0) {
int exact = 0;
MDB_val d2;
if (flags & MDB_APPEND) {
@ -5790,8 +5805,7 @@ int mdbx_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
if ((mc->mc_db->md_flags & MDB_DUPSORT) &&
LEAFSIZE(key, data) > env->me_nodemax) {
/* Too big for a node, insert in sub-DB. Set up an empty
* "old sub-page" for prep_subDB to expand to a full page.
*/
* "old sub-page" for prep_subDB to expand to a full page. */
fp_flags = P_LEAF | P_DIRTY;
fp = env->me_pbuf;
fp->mp_leaf2_ksize = (uint16_t)data->mv_size; /* used if MDB_DUPFIXED */
@ -5810,8 +5824,7 @@ int mdbx_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
memcpy(ptr, key->mv_data, ksize);
fix_parent:
/* if overwriting slot 0 of leaf, need to
* update branch key if there is a parent page
*/
* update branch key if there is a parent page */
if (mc->mc_top && !mc->mc_ki[mc->mc_top]) {
unsigned dtop = 1;
mc->mc_top--;
@ -5841,20 +5854,13 @@ int mdbx_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
/* Prepare (sub-)page/sub-DB to accept the new item,
* if needed. fp: old sub-page or a header faking
* it. mp: new (sub-)page. offset: growth in page
* size. xdata: node data with new page or DB.
*/
* size. xdata: node data with new page or DB. */
unsigned i, offset = 0;
MDB_page *mp = fp = xdata.mv_data = env->me_pbuf;
mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno;
/* Was a single item before, must convert now */
if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
/* Just overwrite the current item */
if (flags & MDB_CURRENT) {
if ((flags & MDB_NODUPDATA) && !mc->mc_dbx->md_dcmp(data, &olddata))
return MDB_KEYEXIST;
goto current;
}
/* does data match? */
if (!mc->mc_dbx->md_dcmp(data, &olddata)) {
@ -5864,6 +5870,10 @@ int mdbx_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
goto current;
}
/* Just overwrite the current item */
if (flags & MDB_CURRENT)
goto current;
/* Back up original data item */
dupdata_flag = 1;
dkey.mv_size = olddata.mv_size;
@ -6029,16 +6039,25 @@ int mdbx_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
if ((rc2 = mdbx_ovpage_free(mc, omp)) != MDB_SUCCESS)
return rc2;
} else if (data->mv_size == olddata.mv_size) {
assert(EVEN(key->mv_size) == EVEN(leaf->mn_ksize));
/* same size, just replace it. Note that we could
* also reuse this node if the new data is smaller,
* but instead we opt to shrink the node in that case.
*/
* but instead we opt to shrink the node in that case. */
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = olddata.mv_data;
else if (!(mc->mc_flags & C_SUB))
memcpy(olddata.mv_data, data->mv_data, data->mv_size);
else {
memcpy(NODEKEY(leaf), key->mv_data, key->mv_size);
assert(NUMKEYS(mc->mc_pg[mc->mc_top]) == 1);
assert(mc->mc_pg[mc->mc_top]->mp_upper ==
mc->mc_pg[mc->mc_top]->mp_lower);
assert(IS_LEAF(mc->mc_pg[mc->mc_top]) &&
!IS_LEAF2(mc->mc_pg[mc->mc_top]));
assert(NODEDSZ(leaf) == 0);
assert(leaf->mn_flags == 0);
memcpy(NODEKEY(leaf), key->mv_data, leaf->mn_ksize = key->mv_size);
assert((char *)NODEDATA(leaf) + NODEDSZ(leaf) <
(char *)(mc->mc_pg[mc->mc_top]) + env->me_psize);
goto fix_parent;
}
return MDB_SUCCESS;