mdbx: Merge branch 'positive' into 'devel' branch.

This commit is contained in:
Leo Yuriev 2017-01-08 14:27:31 +03:00
commit f8a14d7d54
5 changed files with 283 additions and 31 deletions

3
lmdb.h
View File

@ -354,7 +354,8 @@ typedef void (MDB_rel_func)(MDB_val *item, void *oldptr, void *newptr, void *rel
* For mdb_cursor_del: remove all duplicate data items.
*/
#define MDB_NODUPDATA 0x20
/** For mdb_cursor_put: overwrite the current key/data pair */
/** For mdb_cursor_put: overwrite the current key/data pair
* MDBX allows this flag for mdb_put() for explicit overwrite/update without insertion. */
#define MDB_CURRENT 0x40
/** For put: Just reserve space for data, don't copy it. Return a
* pointer to the reserved space.

102
mdb.c
View File

@ -782,6 +782,10 @@ typedef struct MDB_meta {
volatile uint64_t mm_datasync_sign;
#define META_IS_WEAK(meta) ((meta)->mm_datasync_sign == MDB_DATASIGN_WEAK)
#define META_IS_STEADY(meta) ((meta)->mm_datasync_sign > MDB_DATASIGN_WEAK)
#if MDBX_MODE_ENABLED
volatile mdbx_canary mm_canary;
#endif
} MDB_meta;
/** Buffer for a stack-allocated meta page.
@ -819,7 +823,7 @@ typedef struct MDB_dbx {
* Every operation requires a transaction handle.
*/
struct MDB_txn {
#define MDBX_MT_SIGNATURE (0x706C553B^MDBX_MODE_SALT)
#define MDBX_MT_SIGNATURE (0x93D53A31^MDBX_MODE_SALT)
unsigned mt_signature;
MDB_txn *mt_parent; /**< parent of a nested txn */
/** Nested txn under this txn, set together with flag #MDB_TXN_HAS_CHILD */
@ -906,6 +910,10 @@ struct MDB_txn {
* dirty_list into mt_parent after freeing hidden mt_parent pages.
*/
unsigned mt_dirty_room;
#if MDBX_MODE_ENABLED
mdbx_canary mt_canary;
#endif
};
/** Enough space for 2^32 nodes with minimum of 2 keys per node. I.e., plenty.
@ -2847,6 +2855,9 @@ mdb_txn_renew0(MDB_txn *txn, unsigned flags)
txn->mt_next_pgno = meta->mm_last_pg+1;
/* Copy the DB info and flags */
memcpy(txn->mt_dbs, meta->mm_dbs, CORE_DBS * sizeof(MDB_db));
#if MDBX_MODE_ENABLED
txn->mt_canary = meta->mm_canary;
#endif
break;
}
}
@ -2863,6 +2874,9 @@ mdb_txn_renew0(MDB_txn *txn, unsigned flags)
pthread_mutex_lock(&tsan_mutex);
#endif
MDB_meta *meta = mdb_meta_head_w(env);
#if MDBX_MODE_ENABLED
txn->mt_canary = meta->mm_canary;
#endif
txn->mt_txnid = meta->mm_txnid + 1;
txn->mt_flags = flags;
#ifdef __SANITIZE_THREAD__
@ -3917,6 +3931,9 @@ mdb_txn_commit(MDB_txn *txn)
meta.mm_dbs[MAIN_DBI] = txn->mt_dbs[MAIN_DBI];
meta.mm_last_pg = txn->mt_next_pgno - 1;
meta.mm_txnid = txn->mt_txnid;
#if MDBX_MODE_ENABLED
meta.mm_canary = txn->mt_canary;
#endif
rc = mdb_env_sync0(env, env->me_flags | txn->mt_flags, &meta);
}
@ -4153,6 +4170,9 @@ mdb_env_sync0(MDB_env *env, unsigned flags, MDB_meta *pending)
target->mm_dbs[FREE_DBI] = pending->mm_dbs[FREE_DBI];
target->mm_dbs[MAIN_DBI] = pending->mm_dbs[MAIN_DBI];
target->mm_last_pg = pending->mm_last_pg;
#if MDBX_MODE_ENABLED
target->mm_canary = pending->mm_canary;
#endif
/* LY: 'commit' the meta */
target->mm_txnid = pending->mm_txnid;
target->mm_datasync_sign = pending->mm_datasync_sign;
@ -6144,9 +6164,6 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
MDB_node *leaf = NULL;
DKBUF;
if (unlikely(key->mv_size == 0))
return MDB_BAD_VALSIZE;
if ( (mc->mc_db->md_flags & MDB_INTEGERKEY)
&& unlikely( key->mv_size != sizeof(unsigned)
&& key->mv_size != sizeof(size_t) )) {
@ -6317,7 +6334,6 @@ set1:
rc = 0;
}
*data = olddata;
} else {
if (mc->mc_xcursor)
mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF);
@ -6462,6 +6478,12 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data,
MDB_GET_KEY(leaf, key);
if (data) {
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
if (unlikely(!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED))) {
mdb_xcursor_init1(mc, leaf);
rc = mdb_cursor_first(&mc->mc_xcursor->mx_cursor, data, NULL);
if (unlikely(rc))
break;
}
rc = mdb_cursor_get(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_GET_CURRENT);
} else {
rc = mdb_node_read(mc, leaf, data);
@ -6689,7 +6711,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
if (unlikely(mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED)))
return (mc->mc_txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN;
if (unlikely(key->mv_size-1 >= ENV_MAXKEY(env)))
if (unlikely(key->mv_size > ENV_MAXKEY(env)))
return MDB_BAD_VALSIZE;
#if SIZE_MAX > MAXDATASIZE
@ -6719,7 +6741,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
dkey.mv_size = 0;
if (flags == MDB_CURRENT) {
if (flags & MDB_CURRENT) {
if (unlikely(!(mc->mc_flags & C_INITIALIZED)))
return EINVAL;
rc = MDB_SUCCESS;
@ -6910,6 +6932,7 @@ more:
break;
}
/* FALLTHRU: Big enough MDB_DUPFIXED sub-page */
case MDB_CURRENT | MDB_NODUPDATA:
case MDB_CURRENT:
fp->mp_flags |= P_DIRTY;
COPY_PGNO(fp->mp_pgno, mp->mp_pgno);
@ -7106,12 +7129,15 @@ put_sub:
xdata.mv_size = 0;
xdata.mv_data = "";
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
xflags = MDB_NOSPILL;
if (flags & MDB_NODUPDATA)
xflags |= MDB_NOOVERWRITE;
if (flags & MDB_APPENDDUP)
xflags |= MDB_APPEND;
if (flags & MDB_CURRENT) {
xflags = MDB_CURRENT|MDB_NOSPILL;
xflags |= MDB_CURRENT;
} else {
mdb_xcursor_init1(mc, leaf);
xflags = (flags & MDB_NODUPDATA) ?
MDB_NOOVERWRITE|MDB_NOSPILL : MDB_NOSPILL;
}
if (sub_root)
mc->mc_xcursor->mx_cursor.mc_pg[0] = sub_root;
@ -7145,8 +7171,6 @@ put_sub:
}
}
ecount = mc->mc_xcursor->mx_db.md_entries;
if (flags & MDB_APPENDDUP)
xflags |= MDB_APPEND;
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
if (flags & F_SUBDATA) {
void *db = NODEDATA(leaf);
@ -7832,35 +7856,51 @@ mdb_cursor_renew(MDB_txn *txn, MDB_cursor *mc)
int
mdb_cursor_count(MDB_cursor *mc, size_t *countp)
{
MDB_node *leaf;
if (unlikely(mc == NULL || countp == NULL))
return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE))
return MDB_VERSION_MISMATCH;
if (unlikely(mc->mc_xcursor == NULL))
return MDB_INCOMPATIBLE;
if (unlikely(mc->mc_txn->mt_flags & MDB_TXN_BLOCKED))
return MDB_BAD_TXN;
if (unlikely(!(mc->mc_flags & C_INITIALIZED)))
return EINVAL;
#if MDBX_MODE_ENABLED
MDB_page *mp = mc->mc_pg[mc->mc_top];
int nkeys = NUMKEYS(mp);
if (!nkeys || mc->mc_ki[mc->mc_top] >= nkeys) {
*countp = 0;
return MDB_NOTFOUND;
} else if (mc->mc_xcursor == NULL || IS_LEAF2(mp)) {
*countp = 1;
} else {
MDB_node *leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
if (!F_ISSET(leaf->mn_flags, F_DUPDATA))
*countp = 1;
else if (unlikely(!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)))
return EINVAL;
else
*countp = mc->mc_xcursor->mx_db.md_entries;
}
#else
if (unlikely(mc->mc_xcursor == NULL))
return MDB_INCOMPATIBLE;
if (unlikely(!mc->mc_snum || (mc->mc_flags & C_EOF)))
return MDB_NOTFOUND;
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
MDB_node *leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
*countp = 1;
} else {
if (unlikely(!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)))
return EINVAL;
*countp = mc->mc_xcursor->mx_db.md_entries;
}
#endif /* MDBX_MODE_ENABLED */
return MDB_SUCCESS;
}
@ -9159,7 +9199,6 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
{
MDB_cursor mc;
MDB_xcursor mx;
int rc;
if (unlikely(!key || !data || !txn))
return EINVAL;
@ -9170,7 +9209,9 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
if (unlikely(!TXN_DBI_EXIST(txn, dbi, DB_USRVALID)))
return EINVAL;
if (unlikely(flags & ~(MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND|MDB_APPENDDUP)))
if (unlikely(flags & ~(MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND|MDB_APPENDDUP
/* LY: MDB_CURRENT indicates explicit overwrite (update) for MDBX */
| (MDBX_MODE_ENABLED ? MDB_CURRENT : 0))))
return EINVAL;
if (unlikely(txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED)))
@ -9179,8 +9220,25 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
mdb_cursor_init(&mc, txn, dbi, &mx);
mc.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &mc;
rc = mdb_cursor_put(&mc, key, data, flags);
int rc = MDB_SUCCESS;
#if MDBX_MODE_ENABLED
/* LY: support for update (explicit overwrite) */
if (flags & MDB_CURRENT) {
rc = mdb_cursor_get(&mc, key, NULL, MDB_SET);
if (likely(rc == MDB_SUCCESS) && (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)) {
/* LY: allows update (explicit overwrite) only for unique keys */
MDB_node *leaf = NODEPTR(mc.mc_pg[mc.mc_top], mc.mc_ki[mc.mc_top]);
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
mdb_tassert(txn, XCURSOR_INITED(&mc) && mc.mc_xcursor->mx_db.md_entries > 1);
rc = MDB_KEYEXIST;
}
}
}
#endif /* MDBX_MODE_ENABLED */
if (likely(rc == MDB_SUCCESS))
rc = mdb_cursor_put(&mc, key, data, flags);
txn->mt_cursors[dbi] = mc.mc_next;
return rc;
}

View File

@ -432,7 +432,6 @@ static int process_db(MDB_dbi dbi, char *name, visitor *handler, int silent)
fflush(NULL);
}
skipped_subdb++;
mdbx_dbi_close(env, dbi);
return MDB_SUCCESS;
}
@ -444,14 +443,12 @@ static int process_db(MDB_dbi dbi, char *name, visitor *handler, int silent)
rc = mdbx_dbi_flags(txn, dbi, &flags);
if (rc) {
error(" - mdbx_dbi_flags failed, error %d %s\n", rc, mdbx_strerror(rc));
mdbx_dbi_close(env, dbi);
return rc;
}
rc = mdbx_stat(txn, dbi, &ms, sizeof(ms));
if (rc) {
error(" - mdbx_stat failed, error %d %s\n", rc, mdbx_strerror(rc));
mdbx_dbi_close(env, dbi);
return rc;
}
@ -475,7 +472,6 @@ static int process_db(MDB_dbi dbi, char *name, visitor *handler, int silent)
rc = mdbx_cursor_open(txn, dbi, &mc);
if (rc) {
error(" - mdbx_cursor_open failed, error %d %s\n", rc, mdbx_strerror(rc));
mdbx_dbi_close(env, dbi);
return rc;
}
@ -565,7 +561,6 @@ bailout:
}
mdbx_cursor_close(mc);
mdbx_dbi_close(env, dbi);
return rc || problems_count;
}
@ -686,7 +681,11 @@ int main(int argc, char *argv[])
}
maxkeysize = rc;
mdbx_env_set_maxdbs(env, 3);
rc = mdbx_env_set_maxdbs(env, MAX_DBI);
if (rc < 0) {
error("mdbx_env_set_maxdbs failed, error %d %s\n", rc, mdbx_strerror(rc));
goto bailout;
}
rc = mdbx_env_open_ex(env, envname, envflags, 0664, &exclusive);
if (rc) {
@ -747,7 +746,7 @@ int main(int argc, char *argv[])
meta_lt(info.me_meta1_txnid, info.me_meta1_sign,
info.me_meta2_txnid, info.me_meta2_sign) ? "tail" : "head");
if (info.me_meta1_txnid > info.base.me_last_txnid)
print(", rolled-back %zu (%zu >>> %zu)\n",
print(", rolled-back %zu (%zu >>> %zu)",
info.me_meta1_txnid - info.base.me_last_txnid,
info.me_meta1_txnid, info.base.me_last_txnid);
print("\n");
@ -757,7 +756,7 @@ int main(int argc, char *argv[])
meta_lt(info.me_meta2_txnid, info.me_meta2_sign,
info.me_meta1_txnid, info.me_meta1_sign) ? "tail" : "head");
if (info.me_meta2_txnid > info.base.me_last_txnid)
print(", rolled-back %zu (%zu >>> %zu)\n",
print(", rolled-back %zu (%zu >>> %zu)",
info.me_meta2_txnid - info.base.me_last_txnid,
info.me_meta2_txnid, info.base.me_last_txnid);
print("\n");

179
mdbx.c
View File

@ -325,3 +325,182 @@ mdbx_env_pgwalk(MDB_txn *txn, MDBX_pgvisitor_func* visitor, void* user)
rc = visitor(P_INVALID, 0, user, NULL, NULL, 0, 0, 0, 0);
return rc;
}
int mdbx_canary_put(MDB_txn *txn, const mdbx_canary* canary)
{
if (unlikely(!txn))
return EINVAL;
if (unlikely(txn->mt_signature != MDBX_MT_SIGNATURE))
return MDB_VERSION_MISMATCH;
if (unlikely(F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)))
return EACCES;
if (likely(canary)) {
txn->mt_canary.x = canary->x;
txn->mt_canary.y = canary->y;
txn->mt_canary.z = canary->z;
}
txn->mt_canary.v = txn->mt_txnid;
return MDB_SUCCESS;
}
size_t mdbx_canary_get(MDB_txn *txn, mdbx_canary* canary)
{
if(unlikely(!txn || txn->mt_signature != MDBX_MT_SIGNATURE))
return 0;
if (likely(canary))
*canary = txn->mt_canary;
return txn->mt_txnid;
}
int mdbx_cursor_eof(MDB_cursor *mc)
{
if (unlikely(mc == NULL))
return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE))
return MDB_VERSION_MISMATCH;
return (mc->mc_flags & C_INITIALIZED) ? 0 : 1;
}
static int mdbx_is_samedata(const MDB_val* a, const MDB_val* b) {
return a->iov_len == b->iov_len
&& memcmp(a->iov_base, b->iov_base, a->iov_len) == 0;
}
/* Позволяет обновить или удалить существующую запись с получением
* в old_data предыдущего значения данных. При этом если new_data равен
* нулю, то выполняется удаление, иначе обновление/вставка.
*
* Текущее значение может находиться в уже измененной (грязной) странице.
* В этом случае страница будет перезаписана при обновлении, а само старое
* значение утрачено. Поэтому исходно в old_data должен быть передан
* дополнительный буфер для копирования старого значения.
* Если переданный буфер слишком мал, то функция вернет -1, установив
* old_data->iov_len в соответствующее значение.
*
* Для не-уникальных ключей также возможен второй сценарий использования,
* когда посредством old_data из записей с одинаковым ключом для
* удаления/обновления выбирается конкретная. Для выбора этого сценария
* во flags следует одновременно указать MDB_CURRENT и MDB_NOOVERWRITE.
*
* Функция может быть замещена соответствующими операциями с курсорами
* после двух доработок (TODO):
* - внешняя аллокация курсоров, в том числе на стеке (без malloc).
* - получения статуса страницы по адресу (знать о P_DIRTY).
*/
int mdbx_replace(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *new_data, MDB_val *old_data, unsigned flags)
{
MDB_cursor mc;
MDB_xcursor mx;
if (unlikely(!key || !old_data || !txn))
return EINVAL;
if (unlikely(txn->mt_signature != MDBX_MT_SIGNATURE))
return MDB_VERSION_MISMATCH;
if (unlikely(old_data->iov_base == NULL && old_data->iov_len))
return EINVAL;
if (unlikely(new_data == NULL && !(flags & MDB_CURRENT)))
return EINVAL;
if (unlikely(!TXN_DBI_EXIST(txn, dbi, DB_USRVALID)))
return EINVAL;
if (unlikely(flags & ~(MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND|MDB_APPENDDUP|MDB_CURRENT)))
return EINVAL;
if (unlikely(txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED)))
return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN;
mdb_cursor_init(&mc, txn, dbi, &mx);
mc.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &mc;
int rc;
MDB_val present_key = *key;
if (F_ISSET(flags, MDB_CURRENT | MDB_NOOVERWRITE)
&& (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)) {
/* в old_data значение для выбора конкретного дубликата */
rc = mdbx_cursor_get(&mc, &present_key, old_data, MDB_GET_BOTH);
if (rc != MDB_SUCCESS)
goto bailout;
/* если данные совпадают, то ничего делать не надо */
if (new_data && mdbx_is_samedata(old_data, new_data))
goto bailout;
} else {
/* в old_data буфер получения предыдущего значения */
MDB_val present_data;
rc = mdbx_cursor_get(&mc, &present_key, &present_data, MDB_SET_KEY);
if (unlikely(rc != MDB_SUCCESS)) {
old_data->iov_base = NULL;
old_data->iov_len = rc;
if (rc != MDB_NOTFOUND || (flags & MDB_CURRENT))
goto bailout;
} else if (flags & MDB_NOOVERWRITE) {
rc = MDB_KEYEXIST;
*old_data = present_data;
goto bailout;
} else {
MDB_page *page = mc.mc_pg[mc.mc_top];
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
if (flags & MDB_CURRENT) {
/* для не-уникальных ключей позволяем update/delete только если ключ один */
MDB_node *leaf = NODEPTR(page, mc.mc_ki[mc.mc_top]);
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
mdb_tassert(txn, XCURSOR_INITED(&mc) && mc.mc_xcursor->mx_db.md_entries > 1);
rc = MDB_KEYEXIST;
goto bailout;
}
/* если данные совпадают, то ничего делать не надо */
if (new_data && mdbx_is_samedata(&present_data, new_data)) {
*old_data = *new_data;
goto bailout;
}
} else if ((flags & MDB_NODUPDATA) && mdbx_is_samedata(&present_data, new_data)) {
/* если данные совпадают и установлен MDB_NODUPDATA */
rc = MDB_KEYEXIST;
goto bailout;
}
} else {
/* если данные совпадают, то ничего делать не надо */
if (new_data && mdbx_is_samedata(&present_data, new_data)) {
*old_data = *new_data;
goto bailout;
}
flags |= MDB_CURRENT;
}
if (page->mp_flags & P_DIRTY) {
if (unlikely(old_data->iov_len < present_data.iov_len)) {
old_data->iov_base = NULL;
old_data->iov_len = present_data.iov_len;
rc = -1;
goto bailout;
}
memcpy(old_data->iov_base, present_data.iov_base, present_data.iov_len);
old_data->iov_len = present_data.iov_len;
} else {
*old_data = present_data;
}
}
}
if (likely(new_data))
rc = mdbx_cursor_put(&mc, key, new_data, flags);
else
rc = mdbx_cursor_del(&mc, 0);
bailout:
txn->mt_cursors[dbi] = mc.mc_next;
return rc;
}

15
mdbx.h
View File

@ -211,6 +211,21 @@ typedef int MDBX_pgvisitor_func(size_t pgno, unsigned pgnumber, void* ctx,
const char* dbi, const char *type, int nentries,
int payload_bytes, int header_bytes, int unused_bytes);
int mdbx_env_pgwalk(MDB_txn *txn, MDBX_pgvisitor_func* visitor, void* ctx);
typedef struct mdbx_canary {
size_t x, y, z, v;
} mdbx_canary;
int mdbx_canary_put(MDB_txn *txn, const mdbx_canary* canary);
size_t mdbx_canary_get(MDB_txn *txn, mdbx_canary* canary);
/** Returns 1 when no more data available or cursor not positioned,
* 0 otherwise or less that zero in error case. */
int mdbx_cursor_eof(MDB_cursor *mc);
int mdbx_replace(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *new_data, MDB_val *old_data, unsigned flags);
/** @} */
#ifdef __cplusplus