mdbx: support for txnid of the last modification for sub-dbs/kv-spaces.

Change-Id: Ifb6684df57608cda88aa9134b275f442358ff46d
This commit is contained in:
Leonid Yuriev 2019-10-25 13:51:21 +03:00
parent 64ecfe171a
commit 3ee269ddb6
3 changed files with 112 additions and 88 deletions

3
mdbx.h
View File

@ -1571,12 +1571,13 @@ LIBMDBX_API int mdbx_env_copy2fd(MDBX_env *env, mdbx_filehandle_t fd,
/* Statistics for a database in the environment */
typedef struct MDBX_stat {
uint32_t ms_psize; /* Size of a database page.
* This is currently the same for all databases. */
* This is the same for all databases. */
uint32_t ms_depth; /* Depth (height) of the B-tree */
uint64_t ms_branch_pages; /* Number of internal (non-leaf) pages */
uint64_t ms_leaf_pages; /* Number of leaf pages */
uint64_t ms_overflow_pages; /* Number of overflow pages */
uint64_t ms_entries; /* Number of data items */
uint64_t ms_mod_txnid; /* Transaction ID of commited last modification */
} MDBX_stat;
/* Return statistics about the MDBX environment.

View File

@ -2024,6 +2024,7 @@ static int __must_check_result mdbx_xcursor_init2(MDBX_cursor *mc,
int force);
static int __must_check_result mdbx_drop0(MDBX_cursor *mc, int subs);
static int __must_check_result mdbx_fetch_sdb(MDBX_txn *txn, MDBX_dbi dbi);
static MDBX_cmp_func mdbx_cmp_memn, mdbx_cmp_memnr, mdbx_cmp_int_align4,
mdbx_cmp_int_align2, mdbx_cmp_int_unaligned;
@ -5342,6 +5343,8 @@ static __cold int mdbx_audit_ex(MDBX_txn *txn, unsigned retired_stored,
for (unsigned j = 0; j < page_numkeys(mp); j++) {
MDBX_node *node = page_node(mp, j);
if (node_flags(node) == F_SUBDATA) {
if (unlikely(node_ds(node) < sizeof(MDBX_db)))
return MDBX_CORRUPTED;
MDBX_db db_copy, *db;
memcpy(db = &db_copy, node_data(node), sizeof(db_copy));
if ((txn->mt_flags & MDBX_RDONLY) == 0) {
@ -6500,7 +6503,9 @@ int mdbx_txn_commit(MDBX_txn *txn) {
rc = MDBX_BAD_DBI;
goto fail;
}
data.iov_base = &txn->mt_dbs[i];
MDBX_db *db = &txn->mt_dbs[i];
db->md_mod_txnid = txn->mt_txnid;
data.iov_base = db;
WITH_CURSOR_TRACKING(mc,
rc = mdbx_cursor_put(&mc, &txn->mt_dbxs[i].md_name,
&data, F_SUBDATA));
@ -6522,8 +6527,10 @@ int mdbx_txn_commit(MDBX_txn *txn) {
rc = mdbx_page_flush(txn, 0);
if (likely(rc == MDBX_SUCCESS)) {
MDBX_meta meta, *head = mdbx_meta_head(env);
if (txn->mt_dbs[MAIN_DBI].md_flags & DB_DIRTY)
txn->mt_dbs[MAIN_DBI].md_mod_txnid = txn->mt_txnid;
MDBX_meta meta, *head = mdbx_meta_head(env);
meta.mm_magic_and_version = head->mm_magic_and_version;
meta.mm_extra_flags = head->mm_extra_flags;
meta.mm_validator_id = head->mm_validator_id;
@ -8578,6 +8585,7 @@ int __cold mdbx_env_open(MDBX_env *env, const char *path, unsigned flags,
mdbx_debug("leaf pages: %" PRIaPGNO, db->md_leaf_pages);
mdbx_debug("overflow pages: %" PRIaPGNO, db->md_overflow_pages);
mdbx_debug("root: %" PRIaPGNO, db->md_root);
mdbx_debug("schema_altered: %" PRIaTXN, db->md_mod_txnid);
}
#endif
@ -9101,6 +9109,42 @@ __hot static int mdbx_page_search_root(MDBX_cursor *mc, MDBX_val *key,
return MDBX_SUCCESS;
}
static int mdbx_fetch_sdb(MDBX_txn *txn, MDBX_dbi dbi) {
MDBX_cursor mc;
if (unlikely(TXN_DBI_CHANGED(txn, dbi)))
return MDBX_BAD_DBI;
int rc = mdbx_cursor_init(&mc, txn, MAIN_DBI);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
rc = mdbx_page_search(&mc, &txn->mt_dbxs[dbi].md_name, 0);
if (unlikely(rc != MDBX_SUCCESS))
return (rc == MDBX_NOTFOUND) ? MDBX_BAD_DBI : rc;
MDBX_val data;
int exact = 0;
MDBX_node *node = mdbx_node_search(&mc, &txn->mt_dbxs[dbi].md_name, &exact);
if (unlikely(!exact))
return MDBX_BAD_DBI;
if (unlikely((node_flags(node) & (F_DUPDATA | F_SUBDATA)) != F_SUBDATA))
return MDBX_INCOMPATIBLE; /* not a named DB */
rc = mdbx_node_read(&mc, node, &data);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
if (unlikely(data.iov_len < sizeof(MDBX_db)))
return MDBX_INCOMPATIBLE; /* not a named DB */
uint16_t md_flags = UNALIGNED_PEEK_16(data.iov_base, MDBX_db, md_flags);
/* The txn may not know this DBI, or another process may
* have dropped and recreated the DB with other flags. */
if (unlikely((txn->mt_dbs[dbi].md_flags & PERSISTENT_FLAGS) != md_flags))
return MDBX_INCOMPATIBLE;
memcpy(&txn->mt_dbs[dbi], data.iov_base, sizeof(MDBX_db));
txn->mt_dbflags[dbi] &= ~DB_STALE;
return MDBX_SUCCESS;
}
/* Search for the lowest key under the current branch page.
* This just bypasses a numkeys check in the current page
* before calling mdbx_page_search_root(), because the callers
@ -9148,35 +9192,9 @@ __hot static int mdbx_page_search(MDBX_cursor *mc, MDBX_val *key, int flags) {
/* Make sure we're using an up-to-date root */
if (unlikely(*mc->mc_dbflag & DB_STALE)) {
MDBX_cursor mc2;
if (unlikely(TXN_DBI_CHANGED(mc->mc_txn, mc->mc_dbi)))
return MDBX_BAD_DBI;
rc = mdbx_cursor_init(&mc2, mc->mc_txn, MAIN_DBI);
rc = mdbx_fetch_sdb(mc->mc_txn, mc->mc_dbi);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
rc = mdbx_page_search(&mc2, &mc->mc_dbx->md_name, 0);
if (unlikely(rc != MDBX_SUCCESS))
return (rc == MDBX_NOTFOUND) ? MDBX_BAD_DBI : rc;
{
MDBX_val data;
int exact = 0;
MDBX_node *node = mdbx_node_search(&mc2, &mc->mc_dbx->md_name, &exact);
if (!exact)
return MDBX_BAD_DBI;
if (unlikely((node_flags(node) & (F_DUPDATA | F_SUBDATA)) != F_SUBDATA))
return MDBX_INCOMPATIBLE; /* not a named DB */
rc = mdbx_node_read(&mc2, node, &data);
if (rc)
return rc;
uint16_t md_flags = UNALIGNED_PEEK_16(data.iov_base, MDBX_db, md_flags);
/* The txn may not know this DBI, or another process may
* have dropped and recreated the DB with other flags. */
if (unlikely((mc->mc_db->md_flags & PERSISTENT_FLAGS) != md_flags))
return MDBX_INCOMPATIBLE;
memcpy(mc->mc_db, data.iov_base, sizeof(MDBX_db));
}
*mc->mc_dbflag &= ~DB_STALE;
}
root = mc->mc_db->md_root;
@ -10080,7 +10098,7 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
MDBX_page *fp, *sub_root = NULL;
uint16_t fp_flags;
MDBX_val xdata, *rdata, dkey, olddata;
MDBX_db dummy;
MDBX_db nested_dupdb;
unsigned mcount = 0, dcount = 0, nospill;
size_t nsize;
int rc2;
@ -10485,30 +10503,30 @@ int mdbx_cursor_put(MDBX_cursor *mc, MDBX_val *key, MDBX_val *data,
/* Too big for a sub-page, convert to sub-DB */
fp_flags &= ~P_SUBP;
prep_subDB:
dummy.md_xsize = 0;
dummy.md_flags = 0;
nested_dupdb.md_xsize = 0;
nested_dupdb.md_flags = 0;
if (mc->mc_db->md_flags & MDBX_DUPFIXED) {
fp_flags |= P_LEAF2;
dummy.md_xsize = fp->mp_leaf2_ksize;
dummy.md_flags = MDBX_DUPFIXED;
nested_dupdb.md_xsize = fp->mp_leaf2_ksize;
nested_dupdb.md_flags = MDBX_DUPFIXED;
if (mc->mc_db->md_flags & MDBX_INTEGERDUP)
dummy.md_flags |= MDBX_INTEGERKEY;
nested_dupdb.md_flags |= MDBX_INTEGERKEY;
}
dummy.md_depth = 1;
dummy.md_branch_pages = 0;
dummy.md_leaf_pages = 1;
dummy.md_overflow_pages = 0;
dummy.md_entries = page_numkeys(fp);
xdata.iov_len = sizeof(MDBX_db);
xdata.iov_base = &dummy;
nested_dupdb.md_depth = 1;
nested_dupdb.md_branch_pages = 0;
nested_dupdb.md_leaf_pages = 1;
nested_dupdb.md_overflow_pages = 0;
nested_dupdb.md_entries = page_numkeys(fp);
xdata.iov_len = sizeof(nested_dupdb);
xdata.iov_base = &nested_dupdb;
if ((rc = mdbx_page_alloc(mc, 1, &mp, MDBX_ALLOC_ALL)))
return rc;
mc->mc_db->md_leaf_pages += 1;
mdbx_cassert(mc, env->me_psize > olddata.iov_len);
offset = env->me_psize - (unsigned)olddata.iov_len;
flags |= F_DUPDATA | F_SUBDATA;
dummy.md_root = mp->mp_pgno;
dummy.md_seq = dummy.md_merkle = 0;
nested_dupdb.md_root = mp->mp_pgno;
nested_dupdb.md_seq = nested_dupdb.md_mod_txnid = 0;
sub_root = mp;
}
if (mp != fp) {
@ -11290,12 +11308,16 @@ static int mdbx_xcursor_init1(MDBX_cursor *mc, MDBX_node *node) {
return MDBX_CORRUPTED;
if (node_flags(node) & F_SUBDATA) {
if (unlikely(node_ds(node) != sizeof(MDBX_db)))
return MDBX_CORRUPTED;
memcpy(&mx->mx_db, node_data(node), sizeof(MDBX_db));
mx->mx_cursor.mc_pg[0] = 0;
mx->mx_cursor.mc_snum = 0;
mx->mx_cursor.mc_top = 0;
mx->mx_cursor.mc_flags = C_SUB;
} else {
if (unlikely(node_ds(node) <= PAGEHDRSZ))
return MDBX_CORRUPTED;
MDBX_page *fp = node_data(node);
mx->mx_db.md_xsize = 0;
mx->mx_db.md_flags = 0;
@ -11320,12 +11342,6 @@ static int mdbx_xcursor_init1(MDBX_cursor *mc, MDBX_node *node) {
mdbx_debug("Sub-db -%u root page %" PRIaPGNO, mx->mx_cursor.mc_dbi,
mx->mx_db.md_root);
mx->mx_dbflag = DB_VALID | DB_USRVALID | DB_DUPDATA;
/* FIXME: #if UINT_MAX < SIZE_MAX
if (mx->mx_dbx.md_cmp == mdbx_cmp_int && mx->mx_db.md_pad ==
sizeof(size_t))
mx->mx_dbx.md_cmp = mdbx_cmp_clong;
#endif */
return MDBX_SUCCESS;
}
@ -12402,8 +12418,8 @@ static __cold int mdbx_page_check(MDBX_env *env, const MDBX_page *const mp,
case 0 /* usual */:
break;
case F_SUBDATA /* sub-db */:
mdbx_assert(env, dsize == sizeof(MDBX_db));
if (unlikely(dsize != sizeof(MDBX_db)))
mdbx_assert(env, dsize >= sizeof(MDBX_db));
if (unlikely(dsize < sizeof(MDBX_db)))
return MDBX_CORRUPTED;
break;
case F_SUBDATA | F_DUPDATA /* dupsorted sub-tree */:
@ -13433,7 +13449,10 @@ static int __cold mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) {
toggle = my->mc_toggle;
}
} else if (node_flags(node) & F_SUBDATA) {
MDBX_db db;
if (node_ds(node) < sizeof(MDBX_db)) {
rc = MDBX_CORRUPTED;
goto done;
}
/* Need writable leaf */
if (mp != leaf) {
@ -13443,13 +13462,14 @@ static int __cold mdbx_env_cwalk(mdbx_copy *my, pgno_t *pg, int flags) {
node = page_node(mp, i);
}
memcpy(&db, node_data(node), sizeof(db));
MDBX_db db;
memcpy(&db, node_data(node), sizeof(MDBX_db));
my->mc_toggle = (short)toggle;
rc = mdbx_env_cwalk(my, &db.md_root, node_flags(node) & F_DUPDATA);
if (rc)
goto done;
toggle = my->mc_toggle;
memcpy(node_data(node), &db, sizeof(db));
memcpy(node_data(node), &db, sizeof(MDBX_db));
}
}
}
@ -13978,24 +13998,26 @@ int __cold mdbx_env_get_fd(MDBX_env *env, mdbx_filehandle_t *arg) {
* [in] db the MDBX_db record containing the stats to return.
* [out] arg the address of an MDBX_stat structure to receive the stats.
* Returns 0, this function always succeeds. */
static int __cold mdbx_stat0(const MDBX_env *env, const MDBX_db *db,
MDBX_stat *arg) {
arg->ms_psize = env->me_psize;
arg->ms_depth = db->md_depth;
arg->ms_branch_pages = db->md_branch_pages;
arg->ms_leaf_pages = db->md_leaf_pages;
arg->ms_overflow_pages = db->md_overflow_pages;
arg->ms_entries = db->md_entries;
return MDBX_SUCCESS;
static void mdbx_stat0(const MDBX_env *env, const MDBX_db *db, MDBX_stat *dest,
size_t bytes) {
dest->ms_psize = env->me_psize;
dest->ms_depth = db->md_depth;
dest->ms_branch_pages = db->md_branch_pages;
dest->ms_leaf_pages = db->md_leaf_pages;
dest->ms_overflow_pages = db->md_overflow_pages;
dest->ms_entries = db->md_entries;
if (likely(bytes >=
offsetof(MDBX_stat, ms_mod_txnid) + sizeof(dest->ms_mod_txnid)))
dest->ms_mod_txnid = db->md_mod_txnid;
}
int __cold mdbx_env_stat(MDBX_env *env, MDBX_stat *arg, size_t bytes) {
return mdbx_env_stat_ex(env, NULL, arg, bytes);
int __cold mdbx_env_stat(MDBX_env *env, MDBX_stat *dest, size_t bytes) {
return mdbx_env_stat_ex(env, NULL, dest, bytes);
}
int __cold mdbx_env_stat_ex(const MDBX_env *env, const MDBX_txn *txn,
MDBX_stat *arg, size_t bytes) {
if (unlikely((env == NULL && txn == NULL) || arg == NULL))
MDBX_stat *dest, size_t bytes) {
if (unlikely((env == NULL && txn == NULL) || dest == NULL))
return MDBX_EINVAL;
if (txn) {
@ -14010,18 +14032,19 @@ int __cold mdbx_env_stat_ex(const MDBX_env *env, const MDBX_txn *txn,
return MDBX_EINVAL;
}
if (unlikely(bytes != sizeof(MDBX_stat)))
const size_t size_before_modtxnid = offsetof(MDBX_stat, ms_mod_txnid);
if (unlikely(bytes != sizeof(MDBX_stat)) && bytes != size_before_modtxnid)
return MDBX_EINVAL;
if (txn)
return mdbx_stat0(txn->mt_env, &txn->mt_dbs[MAIN_DBI], arg);
if (txn) {
mdbx_stat0(txn->mt_env, &txn->mt_dbs[MAIN_DBI], dest, bytes);
return MDBX_SUCCESS;
}
while (1) {
const MDBX_meta *const recent_meta = mdbx_meta_head(env);
const txnid_t txnid = mdbx_meta_txnid_fluid(env, recent_meta);
const int err = mdbx_stat0(env, &recent_meta->mm_dbs[MAIN_DBI], arg);
if (unlikely(err != MDBX_SUCCESS))
return err;
mdbx_stat0(env, &recent_meta->mm_dbs[MAIN_DBI], dest, bytes);
mdbx_compiler_barrier();
if (likely(txnid == mdbx_meta_txnid_fluid(env, recent_meta) &&
recent_meta == mdbx_meta_head(env)))
@ -14292,6 +14315,8 @@ int mdbx_dbi_open_ex(MDBX_txn *txn, const char *table_name, unsigned user_flags,
MDBX_node *node = page_node(mc.mc_pg[mc.mc_top], mc.mc_ki[mc.mc_top]);
if (unlikely((node_flags(node) & (F_DUPDATA | F_SUBDATA)) != F_SUBDATA))
return MDBX_INCOMPATIBLE;
if (unlikely(data.iov_len < sizeof(MDBX_db)))
return MDBX_CORRUPTED;
}
if (rc != MDBX_SUCCESS && unlikely(txn->mt_flags & MDBX_RDONLY))
@ -14395,32 +14420,32 @@ int mdbx_dbi_open(MDBX_txn *txn, const char *table_name, unsigned table_flags,
return mdbx_dbi_open_ex(txn, table_name, table_flags, dbi, nullptr, nullptr);
}
int __cold mdbx_dbi_stat(MDBX_txn *txn, MDBX_dbi dbi, MDBX_stat *arg,
int __cold mdbx_dbi_stat(MDBX_txn *txn, MDBX_dbi dbi, MDBX_stat *dest,
size_t bytes) {
int rc = check_txn(txn, MDBX_TXN_BLOCKED);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
if (unlikely(!arg))
if (unlikely(!dest))
return MDBX_EINVAL;
if (unlikely(!TXN_DBI_EXIST(txn, dbi, DB_VALID)))
return MDBX_EINVAL;
if (unlikely(bytes != sizeof(MDBX_stat)))
const size_t size_before_modtxnid = offsetof(MDBX_stat, ms_mod_txnid);
if (unlikely(bytes != sizeof(MDBX_stat)) && bytes != size_before_modtxnid)
return MDBX_EINVAL;
if (unlikely(txn->mt_flags & MDBX_TXN_BLOCKED))
return MDBX_BAD_TXN;
if (unlikely(txn->mt_dbflags[dbi] & DB_STALE)) {
MDBX_cursor_couple cx;
/* Stale, must read the DB's root. cursor_init does it for us. */
rc = mdbx_cursor_init(&cx.outer, txn, dbi);
rc = mdbx_fetch_sdb(txn, dbi);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
}
return mdbx_stat0(txn->mt_env, &txn->mt_dbs[dbi], arg);
mdbx_stat0(txn->mt_env, &txn->mt_dbs[dbi], dest, bytes);
return MDBX_SUCCESS;
}
static int mdbx_dbi_close_locked(MDBX_env *env, MDBX_dbi dbi) {
@ -15232,9 +15257,9 @@ static int __cold mdbx_env_walk(mdbx_walk_ctx_t *ctx, const char *dbi,
case F_SUBDATA /* sub-db */: {
const size_t namelen = node_ks(node);
if (unlikely(namelen == 0 || node_ds(node) != sizeof(MDBX_db)))
if (unlikely(namelen == 0 || node_ds(node) < sizeof(MDBX_db)))
return MDBX_CORRUPTED;
payload_size += sizeof(MDBX_db);
payload_size += node_ds(node);
} break;
case F_SUBDATA | F_DUPDATA /* dupsorted sub-tree */: {
@ -16114,9 +16139,7 @@ int mdbx_dbi_sequence(MDBX_txn *txn, MDBX_dbi dbi, uint64_t *result,
return MDBX_BAD_DBI;
if (unlikely(txn->mt_dbflags[dbi] & DB_STALE)) {
MDBX_cursor_couple cx;
/* Stale, must read the DB's root. cursor_init does it for us. */
rc = mdbx_cursor_init(&cx.outer, txn, dbi);
rc = mdbx_fetch_sdb(txn, dbi);
if (unlikely(rc != MDBX_SUCCESS))
return rc;
}

View File

@ -365,7 +365,7 @@ typedef struct MDBX_db {
pgno_t md_overflow_pages; /* number of overflow pages */
uint64_t md_seq; /* table sequence counter */
uint64_t md_entries; /* number of data items */
uint64_t md_merkle; /* Merkle tree checksum */
uint64_t md_mod_txnid; /* txnid of last commited modification */
} MDBX_db;
/* database size-related parameters */