mdbx: rework attributes.

Change-Id: Id9d436a54ac14ed82c593710b1d5939871c89d1a
This commit is contained in:
Leo Yuriev
2016-03-29 04:50:29 +03:00
parent ce06c8df9e
commit 46b8915087
4 changed files with 353 additions and 388 deletions

314
mdb.c
View File

@@ -591,11 +591,6 @@ typedef struct MDB_node {
/** @} */
unsigned short mn_flags; /**< @ref mdb_node */
unsigned short mn_ksize; /**< key size */
#if BYTE_ORDER == LITTLE_ENDIAN
unsigned int mn_attr_lo, mn_attr_hi; /**< node attribute */
#else
unsigned int mn_attr_hi, mn_attr_lo;
#endif
char mn_data[1]; /**< key and data are appended here */
} MDB_node;
@@ -640,13 +635,6 @@ typedef struct MDB_node {
(node)->mn_lo = (size) & 0xffff; (node)->mn_hi = (size) >> 16;} while(0)
/** The size of a key in a node */
#define NODEKSZ(node) ((node)->mn_ksize)
/** The attribute of the node as uint64_t */
#define NODEATTR(node) \
((uint64_t)(node)->mn_attr_lo | ((uint64_t)(node)->mn_attr_hi << 32))
/** Set node attribute */
#define SETATTR(node,attr) do { \
(node)->mn_attr_lo = (attr) & 0xffffffffUL; \
(node)->mn_attr_hi = (attr) >> 32; } while (0)
/** Copy a page number from src to dst */
#ifdef MISALIGNED_OK
@@ -1059,7 +1047,7 @@ static int mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst);
#define MDB_SPLIT_REPLACE MDB_APPENDDUP /**< newkey is not new */
static int mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata,
pgno_t newpgno, uint64_t newattr, unsigned nflags);
pgno_t newpgno, unsigned nflags);
static int mdb_env_read_header(MDB_env *env, MDB_meta *meta);
static int mdb_env_sync0(MDB_env *env, unsigned flags, MDB_meta *pending);
@@ -1067,8 +1055,7 @@ static void mdb_env_close0(MDB_env *env);
static MDB_node *mdb_node_search(MDB_cursor *mc, MDB_val *key, int *exactp);
static int mdb_node_add(MDB_cursor *mc, indx_t indx,
MDB_val *key, MDB_val *data, pgno_t pgno,
uint64_t attr, unsigned flags);
MDB_val *key, MDB_val *data, pgno_t pgno, unsigned flags);
static void mdb_node_del(MDB_cursor *mc, int ksize);
static void mdb_node_shrink(MDB_page *mp, indx_t indx);
static int mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft);
@@ -1088,7 +1075,7 @@ static int mdb_cursor_sibling(MDB_cursor *mc, int move_right);
static int mdb_cursor_next(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op);
static int mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op);
static int mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op,
int *exactp, uint64_t *attrp);
int *exactp);
static int mdb_cursor_first(MDB_cursor *mc, MDB_val *key, MDB_val *data);
static int mdb_cursor_last(MDB_cursor *mc, MDB_val *key, MDB_val *data);
@@ -1100,7 +1087,6 @@ static void mdb_xcursor_init2(MDB_cursor *mc, MDB_xcursor *src_mx, int force);
static int mdb_drop0(MDB_cursor *mc, int subs);
static void mdb_default_cmp(MDB_txn *txn, MDB_dbi dbi);
static int mdb_reader_check0(MDB_env *env, int rlocked, int *dead);
static int mdb_cursor_touch(MDB_cursor *mc);
/** @cond */
static MDB_cmp_func mdb_cmp_memn, mdb_cmp_memnr, mdb_cmp_int_ai, mdb_cmp_int_a2, mdb_cmp_int_ua;
@@ -5779,36 +5765,7 @@ mdb_get(MDB_txn *txn, MDB_dbi dbi,
return MDB_BAD_TXN;
mdb_cursor_init(&mc, txn, dbi, &mx);
return mdb_cursor_set(&mc, key, data, MDB_SET, &exact, NULL);
}
int
mdb_cursor_get_attr(MDB_cursor *mc, MDB_val *key, MDB_val *data, uint64_t *attrp)
{
int exact = 0;
return mdb_cursor_set(mc, key, data, MDB_GET_ATTR, &exact, attrp);
}
int
mdb_get_attr(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *data, uint64_t *attrp)
{
MDB_cursor mc;
MDB_xcursor mx;
if (!key || !attrp || !TXN_DBI_EXIST(txn, dbi, DB_USRVALID))
return EINVAL;
/** TODO: implement support for DUPSORT? */
if (txn->mt_dbs[dbi].md_flags & (MDB_DUPSORT|MDB_DUPFIXED))
return ENOTSUP;
if ((txn->mt_dbs[dbi].md_flags & (MDB_DUPSORT|MDB_DUPFIXED)) && !data)
return EINVAL;
if (txn->mt_flags & MDB_TXN_ERROR)
return MDB_BAD_TXN;
mdb_cursor_init(&mc, txn, dbi, &mx);
return mdb_cursor_get_attr(&mc, key, data, attrp);
return mdb_cursor_set(&mc, key, data, MDB_SET, &exact);
}
/** Find a sibling for a page.
@@ -6033,7 +5990,7 @@ mdb_cursor_prev(MDB_cursor *mc, MDB_val *key, MDB_val *data, MDB_cursor_op op)
/** Set the cursor on a specific data item. */
static int
mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
MDB_cursor_op op, int *exactp, uint64_t *attrp)
MDB_cursor_op op, int *exactp)
{
int rc;
MDB_page *mp;
@@ -6198,7 +6155,7 @@ set1:
} else {
ex2p = NULL;
}
rc = mdb_cursor_set(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_SET_RANGE, ex2p, attrp);
rc = mdb_cursor_set(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_SET_RANGE, ex2p);
if (unlikely(rc != MDB_SUCCESS))
return rc;
}
@@ -6222,9 +6179,6 @@ set1:
}
}
if (op == MDB_GET_ATTR)
*attrp = NODEATTR(leaf);
/* The key already matches in all other cases */
if (op == MDB_SET_RANGE || op == MDB_SET_KEY)
MDB_GET_KEY(leaf, key);
@@ -6387,7 +6341,7 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data,
rc = EINVAL;
} else {
rc = mdb_cursor_set(mc, key, data, op,
op == MDB_SET_RANGE ? NULL : &exact, NULL);
op == MDB_SET_RANGE ? NULL : &exact);
}
break;
case MDB_GET_MULTIPLE:
@@ -6555,7 +6509,7 @@ mdb_cursor_touch(MDB_cursor *mc)
#define MDB_NOSPILL 0x8000
int
mdb_cursor_put_attr(MDB_cursor *mc, MDB_val *key, MDB_val *data, uint64_t attr,
mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
unsigned flags)
{
MDB_env *env;
@@ -6657,20 +6611,11 @@ mdb_cursor_put_attr(MDB_cursor *mc, MDB_val *key, MDB_val *data, uint64_t attr,
}
}
} else {
rc = mdb_cursor_set(mc, key, &d2, MDB_SET, &exact, NULL);
rc = mdb_cursor_set(mc, key, &d2, MDB_SET, &exact);
}
if ((flags & MDB_NOOVERWRITE) && rc == 0) {
mdb_debug("duplicate key [%s]", DKEY(key));
*data = d2;
if (F_ISSET(flags, MDB_SETATTR)) {
/* make sure all cursor pages are writable */
rc2 = mdb_cursor_touch(mc);
if (rc2)
return rc2;
leaf = NODEPTR(mc->mc_pg[mc->mc_top],
mc->mc_ki[mc->mc_top]);
SETATTR(leaf, attr);
}
return MDB_KEYEXIST;
}
if (rc && unlikely(rc != MDB_NOTFOUND))
@@ -6947,8 +6892,6 @@ current:
omp = np;
}
SETDSZ(leaf, data->mv_size);
if (F_ISSET(flags, MDB_SETATTR))
SETATTR(leaf, attr);
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = PAGEDATA(omp);
else
@@ -6963,8 +6906,6 @@ current:
* also reuse this node if the new data is smaller,
* but instead we opt to shrink the node in that case.
*/
if (F_ISSET(flags, MDB_SETATTR))
SETATTR(leaf, attr);
if (F_ISSET(flags, MDB_RESERVE))
data->mv_data = olddata.mv_data;
else if (!(mc->mc_flags & C_SUB))
@@ -6988,10 +6929,10 @@ new_sub:
nflags &= ~MDB_APPEND; /* sub-page may need room to grow */
if (!insert_key)
nflags |= MDB_SPLIT_REPLACE;
rc = mdb_page_split(mc, key, rdata, P_INVALID, attr, nflags);
rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags);
} else {
/* There is room already in this leaf page. */
rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, attr, nflags);
rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags);
if (likely(rc == 0)) {
/* Adjust other cursors pointing to mp */
MDB_cursor *m2, *m3;
@@ -7113,41 +7054,6 @@ bad_sub:
return rc;
}
int
mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
unsigned int flags)
{
flags &= ~MDB_SETATTR;
return mdb_cursor_put_attr(mc, key, data, 0, flags);
}
int
mdb_set_attr(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *data, uint64_t attr)
{
MDB_cursor mc;
MDB_xcursor mx;
MDB_val dummy, *rdata = data ? data : &dummy;
int rc, exact = 1;
if (!key || !TXN_DBI_EXIST(txn, dbi, DB_USRVALID))
return EINVAL;
/** TODO: implement support for DUPSORT? */
if (txn->mt_dbs[dbi].md_flags & (MDB_DUPSORT|MDB_DUPFIXED))
return ENOTSUP;
if ((txn->mt_dbs[dbi].md_flags & (MDB_DUPSORT|MDB_DUPFIXED)) && !data)
return EINVAL;
if (txn->mt_flags & MDB_TXN_ERROR)
return MDB_BAD_TXN;
mdb_cursor_init(&mc, txn, dbi, &mx);
if ((rc = mdb_cursor_set(&mc, key, rdata, MDB_SET, &exact, NULL)) != MDB_SUCCESS)
return rc;
return mdb_cursor_put_attr(&mc, key, rdata, attr,
MDB_CURRENT|MDB_SETATTR);
}
int
mdb_cursor_del(MDB_cursor *mc, unsigned flags)
{
@@ -7363,7 +7269,7 @@ mdb_branch_size(MDB_env *env, MDB_val *key)
*/
static int
mdb_node_add(MDB_cursor *mc, indx_t indx,
MDB_val *key, MDB_val *data, pgno_t pgno, uint64_t attr, unsigned flags)
MDB_val *key, MDB_val *data, pgno_t pgno, unsigned flags)
{
unsigned i;
size_t node_size = NODESIZE;
@@ -7445,10 +7351,9 @@ update:
node = NODEPTR(mp, indx);
node->mn_ksize = (key == NULL) ? 0 : key->mv_size;
node->mn_flags = flags;
if (IS_LEAF(mp)) {
if (IS_LEAF(mp))
SETDSZ(node,data->mv_size);
SETATTR(node,attr);
} else
else
SETPGNO(node,pgno);
if (key)
@@ -7906,7 +7811,7 @@ mdb_update_key(MDB_cursor *mc, MDB_val *key)
mdb_debug("Not enough room, delta = %d, splitting...", delta);
pgno = NODEPGNO(node);
mdb_node_del(mc, 0);
return mdb_page_split(mc, key, NULL, pgno, 0, MDB_SPLIT_REPLACE);
return mdb_page_split(mc, key, NULL, pgno, MDB_SPLIT_REPLACE);
}
numkeys = NUMKEYS(mp);
@@ -7959,7 +7864,6 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
{
MDB_node *srcnode;
MDB_val key, data;
uint64_t attr = 0UL;
pgno_t srcpg;
MDB_cursor mn;
int rc;
@@ -8007,7 +7911,6 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
}
data.mv_size = NODEDSZ(srcnode);
data.mv_data = NODEDATA(srcnode);
attr = NODEATTR(srcnode);
}
mn.mc_xcursor = NULL;
if (IS_BRANCH(cdst->mc_pg[cdst->mc_top]) && cdst->mc_ki[cdst->mc_top] == 0) {
@@ -8044,8 +7947,7 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
/* Add the node to the destination page.
*/
rc = mdb_node_add(cdst, cdst->mc_ki[cdst->mc_top], &key, &data, srcpg,
attr, flags);
rc = mdb_node_add(cdst, cdst->mc_ki[cdst->mc_top], &key, &data, srcpg, flags);
if (unlikely(rc != MDB_SUCCESS))
return rc;
@@ -8224,7 +8126,7 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
key.mv_size = csrc->mc_db->md_xsize;
key.mv_data = PAGEDATA(psrc);
for (i = 0; i < NUMKEYS(psrc); i++, j++) {
rc = mdb_node_add(cdst, j, &key, NULL, 0, 0, 0);
rc = mdb_node_add(cdst, j, &key, NULL, 0, 0);
if (unlikely(rc != MDB_SUCCESS))
return rc;
key.mv_data = (char *)key.mv_data + key.mv_size;
@@ -8256,8 +8158,7 @@ mdb_page_merge(MDB_cursor *csrc, MDB_cursor *cdst)
data.mv_size = NODEDSZ(srcnode);
data.mv_data = NODEDATA(srcnode);
rc = mdb_node_add(cdst, j, &key, &data, NODEPGNO(srcnode),
NODEATTR(srcnode), srcnode->mn_flags);
rc = mdb_node_add(cdst, j, &key, &data, NODEPGNO(srcnode), srcnode->mn_flags);
if (unlikely(rc != MDB_SUCCESS))
return rc;
}
@@ -8672,7 +8573,7 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi,
xdata = NULL;
flags |= MDB_NODUPDATA;
}
rc = mdb_cursor_set(&mc, key, xdata, op, &exact, NULL);
rc = mdb_cursor_set(&mc, key, xdata, op, &exact);
if (likely(rc == 0)) {
/* let mdb_page_split know about this cursor if needed:
* delete will trigger a rebalance; if it needs to move
@@ -8698,13 +8599,12 @@ mdb_del0(MDB_txn *txn, MDB_dbi dbi,
* @param[in] newkey The key for the newly inserted node.
* @param[in] newdata The data for the newly inserted node.
* @param[in] newpgno The page number, if the new node is a branch node.
* @param[in] newattr The node attr for the newly inserted node.
* @param[in] nflags The #NODE_ADD_FLAGS for the new node.
* @return 0 on success, non-zero on failure.
*/
static int
mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno,
uint64_t newattr, unsigned nflags)
unsigned nflags)
{
unsigned flags;
int rc = MDB_SUCCESS, new_root = 0, did_split = 0;
@@ -8754,7 +8654,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
new_root = mc->mc_db->md_depth++;
/* Add left (implicit) pointer. */
if (unlikely((rc = mdb_node_add(mc, 0, NULL, NULL, mp->mp_pgno, 0, 0)) != MDB_SUCCESS)) {
if (unlikely((rc = mdb_node_add(mc, 0, NULL, NULL, mp->mp_pgno, 0)) != MDB_SUCCESS)) {
/* undo the pre-push */
mc->mc_pg[0] = mc->mc_pg[1];
mc->mc_ki[0] = mc->mc_ki[1];
@@ -8918,8 +8818,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
mn.mc_top--;
did_split = 1;
/* We want other splits to find mn when doing fixups */
WITH_CURSOR_TRACKING(mn,
rc = mdb_page_split(&mn, &sepkey, NULL, rp->mp_pgno, 0, 0));
WITH_CURSOR_TRACKING(mn, rc = mdb_page_split(&mn, &sepkey, NULL, rp->mp_pgno, 0));
if (unlikely(rc != MDB_SUCCESS))
goto done;
@@ -8947,7 +8846,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
}
} else {
mn.mc_top--;
rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0, 0);
rc = mdb_node_add(&mn, mn.mc_ki[ptop], &sepkey, NULL, rp->mp_pgno, 0);
mn.mc_top++;
}
if (unlikely(rc != MDB_SUCCESS))
@@ -8955,14 +8854,13 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
if (nflags & MDB_APPEND) {
mc->mc_pg[mc->mc_top] = rp;
mc->mc_ki[mc->mc_top] = 0;
rc = mdb_node_add(mc, 0, newkey, newdata, newpgno, newattr, nflags);
rc = mdb_node_add(mc, 0, newkey, newdata, newpgno, nflags);
if (rc)
goto done;
for (i=0; i<mc->mc_top; i++)
mc->mc_ki[i] = mn.mc_ki[i];
} else if (!IS_LEAF2(mp)) {
/* Move nodes */
uint64_t rattr;
mc->mc_pg[mc->mc_top] = rp;
i = split_indx;
j = 0;
@@ -8970,7 +8868,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
if (i == newindx) {
rkey.mv_data = newkey->mv_data;
rkey.mv_size = newkey->mv_size;
rattr = newattr;
if (IS_LEAF(mp)) {
rdata = newdata;
} else
@@ -8982,7 +8879,6 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
node = (MDB_node *)((char *)mp + copy->mp_ptrs[i] + PAGEBASE);
rkey.mv_data = NODEKEY(node);
rkey.mv_size = node->mn_ksize;
rattr = NODEATTR(node);
if (IS_LEAF(mp)) {
xdata.mv_data = NODEDATA(node);
xdata.mv_size = NODEDSZ(node);
@@ -8997,7 +8893,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
rkey.mv_size = 0;
}
rc = mdb_node_add(mc, j, &rkey, rdata, pgno, rattr, flags);
rc = mdb_node_add(mc, j, &rkey, rdata, pgno, flags);
if (rc)
goto done;
if (i == nkeys) {
@@ -9153,27 +9049,6 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
return rc;
}
int
mdb_put_attr(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *data, uint64_t attr, unsigned int flags)
{
MDB_cursor mc;
MDB_xcursor mx;
if (!key || !data || dbi == FREE_DBI || !TXN_DBI_EXIST(txn, dbi, DB_USRVALID))
return EINVAL;
/** TODO: implement support for DUPSORT? */
if (txn->mt_dbs[dbi].md_flags & (MDB_DUPSORT|MDB_DUPFIXED))
return ENOTSUP;
if ((flags & (MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND|MDB_APPENDDUP)) != flags)
return EINVAL;
mdb_cursor_init(&mc, txn, dbi, &mx);
return mdb_cursor_put_attr(&mc, key, data, attr, flags | MDB_SETATTR);
}
#ifndef MDB_WBUF
#define MDB_WBUF (1024*1024)
#endif
@@ -9974,7 +9849,7 @@ int mdb_dbi_open(MDB_txn *txn, const char *name, unsigned flags, MDB_dbi *dbi)
key.mv_size = len;
key.mv_data = (void *)name;
mdb_cursor_init(&mc, txn, MAIN_DBI, NULL);
rc = mdb_cursor_set(&mc, &key, &data, MDB_SET, &exact, NULL);
rc = mdb_cursor_set(&mc, &key, &data, MDB_SET, &exact);
if (likely(rc == MDB_SUCCESS)) {
/* make sure this is actually a DB */
MDB_node *node = NODEPTR(mc.mc_pg[mc.mc_top], mc.mc_ki[mc.mc_top]);
@@ -10709,6 +10584,143 @@ mdbx_env_pgwalk(MDB_txn *txn, MDB_pgvisitor_func* visitor, void* user)
return rc;
}
/* attribute support functions for Nexenta ***********************************/
static __inline int
mdbx_attr_peek(MDB_val *data, mdbx_attr_t *attrptr)
{
if (unlikely(data->mv_size < sizeof(mdbx_attr_t)))
return MDB_INCOMPATIBLE;
if (likely(attrptr != NULL))
*attrptr = *(mdbx_attr_t*) data->mv_data;
data->mv_size -= sizeof(mdbx_attr_t);
data->mv_data = likely(data->mv_size > 0)
? ((mdbx_attr_t*) data->mv_data) + 1 : NULL;
return MDB_SUCCESS;
}
static __inline int
mdbx_attr_poke(MDB_val *reserved, MDB_val *data, mdbx_attr_t attr, unsigned flags)
{
mdbx_attr_t *space = reserved->mv_data;
if (flags & MDB_RESERVE) {
if (likely(data != NULL)) {
data->mv_data = data->mv_size ? space + 1 : NULL;
}
} else {
*space = attr;
if (likely(data != NULL)) {
memcpy(space + 1, data->mv_data, data->mv_size );
}
}
return MDB_SUCCESS;
}
int
mdbx_cursor_get_attr(MDB_cursor *mc, MDB_val *key, MDB_val *data,
mdbx_attr_t *attrptr, MDB_cursor_op op)
{
int rc = mdbx_cursor_get(mc, key, data, op);
if (unlikely(rc != MDB_SUCCESS))
return rc;
return mdbx_attr_peek(data, attrptr);
}
int
mdbx_get_attr(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *data, uint64_t *attrptr)
{
int rc = mdbx_get(txn, dbi, key, data);
if (unlikely(rc != MDB_SUCCESS))
return rc;
return mdbx_attr_peek(data, attrptr);
}
int
mdbx_put_attr(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *data, mdbx_attr_t attr, unsigned flags)
{
MDB_val reserve = {
.mv_data = NULL,
.mv_size = (data ? data->mv_size : 0) + sizeof(mdbx_attr_t)
};
int rc = mdbx_put(txn, dbi, key, &reserve, flags | MDB_RESERVE);
if (unlikely(rc != MDB_SUCCESS))
return rc;
return mdbx_attr_poke(&reserve, data, attr, flags);
}
int mdbx_cursor_put_attr(MDB_cursor *cursor, MDB_val *key, MDB_val *data,
mdbx_attr_t attr, unsigned flags)
{
MDB_val reserve = {
.mv_data = NULL,
.mv_size = (data ? data->mv_size : 0) + sizeof(mdbx_attr_t)
};
int rc = mdbx_cursor_put(cursor, key, &reserve, flags | MDB_RESERVE);
if (unlikely(rc != MDB_SUCCESS))
return rc;
return mdbx_attr_poke(&reserve, data, attr, flags);
}
int mdbx_set_attr(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *data, mdbx_attr_t attr)
{
MDB_cursor mc;
MDB_xcursor mx;
MDB_val old_data;
mdbx_attr_t old_attr;
int rc;
if (unlikely(!key || !txn))
return EINVAL;
if (unlikely(txn->mt_signature != MDBX_MT_SIGNATURE))
return MDB_VERSION_MISMATCH;
if (unlikely(!TXN_DBI_EXIST(txn, dbi, DB_USRVALID)))
return EINVAL;
if (unlikely(txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED)))
return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN;
mdb_cursor_init(&mc, txn, dbi, &mx);
rc = mdb_cursor_set(&mc, key, &old_data, MDB_SET, NULL);
if (unlikely(rc != MDB_SUCCESS)) {
if (rc == MDB_NOTFOUND && data) {
mc.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &mc;
rc = mdbx_cursor_put_attr(&mc, key, data, attr, 0);
txn->mt_cursors[dbi] = mc.mc_next;
}
return rc;
}
rc = mdbx_attr_peek(&old_data, &old_attr);
if (unlikely(rc != MDB_SUCCESS))
return rc;
if (old_attr == attr && (!data ||
(data->mv_size == old_data.mv_size
&& memcpy(data->mv_data, old_data.mv_data, old_data.mv_size) == 0)))
return MDB_SUCCESS;
mc.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &mc;
rc = mdbx_cursor_put_attr(&mc, key, data ? data : &old_data, attr, MDB_CURRENT);
txn->mt_cursors[dbi] = mc.mc_next;
return rc;
}
#endif /* MDBX_MODE_ENABLED */
/** @} */