mdbx: Merge branch 'devel'.

Change-Id: I7e5916ef2a9be46aa912d7c3166a372370630b6b
This commit is contained in:
Leo Yuriev 2017-01-09 02:05:09 +03:00
commit bc0d45df09
10 changed files with 851 additions and 172 deletions

View File

@ -2,6 +2,9 @@ MDBX
Add MDB_PREV_MULTIPLE Add MDB_PREV_MULTIPLE
Add error MDB_PROBLEM, replace some MDB_CORRUPTED Add error MDB_PROBLEM, replace some MDB_CORRUPTED
LMDB 0.9.20 Release Engineering
Fix mdb_load with escaped plaintext (ITS#8558)
LMDB 0.9.19 Release (2016/12/28) LMDB 0.9.19 Release (2016/12/28)
Fix mdb_env_cwalk cursor init (ITS#8424) Fix mdb_env_cwalk cursor init (ITS#8424)
Fix robust mutexes on Solaris 10/11 (ITS#8339) Fix robust mutexes on Solaris 10/11 (ITS#8339)

View File

@ -24,8 +24,8 @@ suffix ?=
CC ?= gcc CC ?= gcc
XCFLAGS ?= -DNDEBUG=1 -DMDB_DEBUG=0 XCFLAGS ?= -DNDEBUG=1 -DMDB_DEBUG=0
CFLAGS ?= -O2 -g3 -Wall -Werror -Wextra CFLAGS ?= -O2 -g3 -Wall -Werror -Wextra -ffunction-sections
CFLAGS += -pthread $(XCFLAGS) CFLAGS += -std=gnu99 -pthread $(XCFLAGS)
# LY: for ability to built with modern glibc, # LY: for ability to built with modern glibc,
# but then run with the old # but then run with the old
@ -199,13 +199,15 @@ bench: bench-lmdb.txt bench-mdbx.txt
endif endif
ci-rule = ( CC=$$(which $1); if [ -n "$$CC" ]; then \ ci-rule = ( CC=$$(which $1); if [ -n "$$CC" ]; then \
echo -n "probe by $2 ($$CC): " && \ echo -n "probe by $2 ($$(readlink -f $$(which $$CC))): " && \
$(MAKE) clean >$1.log 2>$1.err && \ $(MAKE) clean >$1.log 2>$1.err && \
$(MAKE) CC=$$(readlink -f $$CC) XCFLAGS="-UNDEBUG -DMDB_DEBUG=2" all check 1>$1.log 2>$1.err && echo "OK" \ $(MAKE) CC=$$(readlink -f $$CC) XCFLAGS="-UNDEBUG -DMDB_DEBUG=2" all check 1>$1.log 2>$1.err && echo "OK" \
|| ( echo "FAILED"; cat $1.err >&2; exit 1 ); \ || ( echo "FAILED"; cat $1.err >&2; exit 1 ); \
else echo "no $2 ($1) for probe"; fi; ) else echo "no $2 ($1) for probe"; fi; )
ci: ci:
@if [ "$(CC)" != "gcc" ]; then \ @if [ "$$(readlink -f $$(which $(CC)))" != "$$(readlink -f $$(which gcc || echo /bin/false))" -a \
"$$(readlink -f $$(which $(CC)))" != "$$(readlink -f $$(which clang || echo /bin/false))" -a \
"$$(readlink -f $$(which $(CC)))" != "$$(readlink -f $$(which icc || echo /bin/false))" ]; then \
$(call ci-rule,$(CC),default C compiler); \ $(call ci-rule,$(CC),default C compiler); \
fi fi
@$(call ci-rule,gcc,GCC) @$(call ci-rule,gcc,GCC)

266
README.md Normal file
View File

@ -0,0 +1,266 @@
libmdbx
======================================
Extended LMDB, aka "Расширенная LMDB".
*The Future will Positive. Всё будет хорошо.*
[![Build Status](https://travis-ci.org/ReOpen/libmdbx.svg?branch=master)](https://travis-ci.org/ReOpen/libmdbx)
English version by Google [is
here](https://translate.googleusercontent.com/translate_c?act=url&depth=1&hl=ru&ie=UTF8&prev=_t&rurl=translate.google.com&sl=ru&tl=en&u=https://github.com/ReOpen/libmdbx/tree/master).
## Кратко
libmdbx - это встраиваемый key-value движок хранения со специфическим
набором возможностей, которые при правильном применении позволяют создавать
уникальные решения с чемпионской производительностью.
libmdbx является форком [Symas Lightning Memory-Mapped
Database](https://symas.com/products/lightning-memory-mapped-database/)
(известной под аббревиатурой
[LMDB](https://en.wikipedia.org/wiki/Lightning_Memory-Mapped_Database)), с
рядом существенных доработок, которые перечислены ниже.
Изначально модификация производилась в составе исходного кода проекта
[ReOpenLDAP](https://github.com/ReOpen/ReOpenLDAP). Примерно за год работы
внесенные изменения приобрели самостоятельную ценность.
Осенью 2015 доработанный движок был выделен в отдельный проект, который был
[представлен на конференции Highload++
2015](http://www.highload.ru/2015/abstracts/1831.html).
## Характеристики и ключевые особенности
### Общее для оригинальной LMDB и MDBX
* Данные хранятся в упорядоченном отображении (ordered map), ключи всегда
отсортированы, поддерживается выборка диапазонов (range lookups).
* Транзакции согласно [ACID](https://ru.wikipedia.org/wiki/ACID), посредством
[MVCC](https://ru.wikipedia.org/wiki/MVCC)
и [COW](https://ru.wikipedia.org/wiki/%D0%9A%D0%BE%D0%BF%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%D0%BD%D0%B8%D0%B5_%D0%BF%D1%80%D0%B8_%D0%B7%D0%B0%D0%BF%D0%B8%D1%81%D0%B8).
* Чтение [без
блокировок](https://ru.wikipedia.org/wiki/%D0%9D%D0%B5%D0%B1%D0%BB%D0%BE%D0%BA%D0%B8%D1%80%D1%83%D1%8E%D1%89%D0%B0%D1%8F_%D1%81%D0%B8%D0%BD%D1%85%D1%80%D0%BE%D0%BD%D0%B8%D0%B7%D0%B0%D1%86%D0%B8%D1%8F),
без [атомарных операций](https://ru.wikipedia.org/wiki/%D0%90%D1%82%D0%BE%D0%BC%D0%B0%D1%80%D0%BD%D0%B0%D1%8F_%D0%BE%D0%BF%D0%B5%D1%80%D0%B0%D1%86%D0%B8%D1%8F).
Мьютексы захватываются только при старте и завершении сеанса работы с БД.
* Читатели не конкурируют между собой, чтение масштабируется линейно по ядрам CPU.
* Изменения строго последовательны и не блокируются чтением, конфликты между
транзакциями не возможны.
* Амортизационная стоимость любой операции Olog(N),
[WAF](https://en.wikipedia.org/wiki/Write_amplification) и RAF также Olog(N).
* Нет [WAL](https://en.wikipedia.org/wiki/Write-ahead_logging) и журнала
транзакций, после сбоев не требуется восстановление.
* Не требуется компактификация или какое-либо периодическое обслуживание.
* Эффективное хранение дубликатов (ключей с несколькими значениями) с
сортировкой значений.
* Эффективная поддержка ключей фиксированной длины (uint32_t, uint64_t).
* Поддержка горячего резервного копирования.
* Файл БД отображается в память кажлого процесса, который работает с БД. К
ключам и данным обеспечивается прямой доступ (без копирования), они не
меняются до завершения транзакции чтения.
* Отсутствует какое-либо внутреннее управление памятью или кэшированием. Всё
необходимое выполняет ядро ОС.
### Недостатки и Компромиссы
1. Единовременно может выполняться не более одной транзакция изменения данных
(один писатель). Зато все изменения всегда последовательны, не может быть
конфликтов или ошибок при откате транзакций.
2. Отсутствие [WAL](https://en.wikipedia.org/wiki/Write-ahead_logging)
обуславливает относительно большой
[WAF](https://en.wikipedia.org/wiki/Write_amplification). Поэтому фиксация
изменений на диске относительно дорога и является главным ограничителем для
производительности по записи. В качестве компромисса предлагается несколько
режимов ленивой и/или периодической фиксации. В том числе режим `WRITEMAP`,
при котором изменения происходят только в памяти и асинхронно фиксируются на
диске ядром ОС.
3. [COW](https://ru.wikipedia.org/wiki/%D0%9A%D0%BE%D0%BF%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%D0%BD%D0%B8%D0%B5_%D0%BF%D1%80%D0%B8_%D0%B7%D0%B0%D0%BF%D0%B8%D1%81%D0%B8)
для реализации [MVCC](https://ru.wikipedia.org/wiki/MVCC) выполняется на
уровне страниц в [B+ дереве](https://ru.wikipedia.org/wiki/B-%D0%B4%D0%B5%D1%80%D0%B5%D0%B2%D0%BE).
Поэтому изменение данных амортизационно требует копирования Olog(N) страниц,
что расходует [пропускную способность оперативной
памяти](https://en.wikipedia.org/wiki/Memory_bandwidth) и является основным
ограничителем производительности в режиме `WRITEMAP`.
4. Проблема долгих чтений (зависших читателей), см. ниже.
5. Вероятность разрушения БД в режиме `WRITEMAP`, см ниже.
#### Проблема долгих чтений
Понимание проблемы требует некоторых пояснений, которые изложены ниже, но
могут быть сложны для быстрого восприятия. Поэтому, тезисно:
* Изменение данных на фоне долгой операции чтения может приводить к исчерпанию
места в БД.
* После чего любая попытка обновить данные будет приводить к ошибке `MAP_FULL`
до завершения долгой операции чтения.
* Характерными примерами долгих чтений являются горячее резервное копирования
и отладка клиентского приложения при активной транзакции чтения.
* В оригинальной LMDB после этого будет наблюдаться устойчивая деградация
производительности всех механизмов обратной записи на диск (в I/O контроллере,
в гипервизоре, в ядре ОС).
* В MDBX предусмотрен механизм аварийного прерывания таких операций, а также
режим `LIFO RECLAIM` устраняющий последующую деградацию производительности.
Операции чтения выполняются в контексте снимка данных (версии БД), который был
актуальным на момент старта транзакции чтения. Такой читаемый снимок
поддерживается неизменным до завершения операции. В свою очередь, это не
позволяет повторно использовать страницы БД в последующих версиях (снимках
БД).
Другими словами, если обновление данных выполняется на фоне долгой операции
чтения, то вместо повторного использования "старых" ненужных страниц будут
выделяться новые, так как "старые" страницы составляют снимок БД, который еще
используется долгой операцией чтения.
В результате, при интенсивном изменении данных и достаточно длительной
операции чтения, в БД могут быть исчерпаны свободные страницы, что не позволит
создавать новые снимки/версии БД. Такая ситуация будет сохраняться до
завершения операции чтения, которая использует старый снимок данных и
препятствует повторному использованию страниц БД.
Однако, на этом проблемы не заканчиваются. После описанной ситуации, все
дополнительные страницы, которые были выделены пока переработка старых была
невозможна, будут участвовать в цикле выделения/освобождения до конца жизни
экземпляра БД. В оригинальной LMDB этот цикл использования страниц работает по
принципу [FIFO](https://ru.wikipedia.org/wiki/FIFO). Поэтому увеличение
количества циркулирующий страниц, с точки зрения механизмов кэширования и/или
обратной записи, выглядит как увеличение рабочего набор данных. Проще говоря,
однократное попадание в ситуацию "уснувшего читателя" приводит к устойчивому
эффекту вымывания I/O кэша при всех последующих изменениях данных.
Для решения описанных проблемы в MDBX сделаны существенные доработки, см.
ниже. Иллюстрации к проблеме "долгих чтений" можно найти в [слайдах
презентации](http://www.slideshare.net/leoyuriev/lmdb). Там же приведен пример
количественной оценки прироста производительности за счет эффективной работы
[BBWC](https://en.wikipedia.org/wiki/BBWC) при включении `LIFO RECLAIM` в
MDBX.
## Доработки MDBX
1. Режим `LIFO RECLAIM`.
Для повторного использования выбираются не самые старые, а самые новые
страницы из доступных. За счет этого цикл использования страниц всегда
имеет минимальную длину и не зависит от общего числа выделенных страниц.
В результате механизмы кэширования и обратной записи работают с
максимально возможной эффективностью. В случае использования контроллера
дисков или системы хранения с [BBWC](https://en.wikipedia.org/wiki/BBWC)
возможно многократное увеличение производительности по записи
(обновлению данных).
2. Обработчик `OOM-KICK`.
Посредством `mdbx_env_set_oomfunc()` может быть
установлен внешний обработчик (callback), который будет вызван при исчерпания
свободных страниц из-за долгой операцией чтения. Обработчику будет передан PID
и pthread_id. В свою очередь обработчик может предпринять одно из действий:
* отправить сигнал kill (#9), если долгое чтение выполняется сторонним процессом;
* отменить или перезапустить проблемную операцию чтения, если операция
выполняется одним из потоков текущего процесса;
* подождать некоторое время, в расчете что проблемная операция чтения будет
штатно завершена;
* перервать текущую операцию изменения данных с возвратом кода ошибки.
3. Гарантия сохранности БД в режиме `WRITEMAP`.
При работе в режиме `WRITEMAP` запись измененных страниц выполняется ядром ОС,
что имеет ряд преимуществ. Так например, при крахе приложения, ядро ОС
сохранит все изменения.
Однако, при аварийном отключении питания или сбое в ядре ОС, на диске будет
сохранена только часть измененных страниц БД. При этом с большой вероятностью
может оказаться так, что будут сохранены мета-страницы со ссылками на страницы
с новыми версиями данных, но не сами новые данные. В этом случае БД будет
безвозвратна разрушена, даже если до аварии производилась полная синхронизация
данных (посредством `mdb_env_sync()`).
В MDBX эта проблема решена путем полной переработки пути записи данных:
* В режиме `WRITEMAP` MDBX не обновляет мета-страницы непосредственно,
а поддерживает их теневые копии с переносом изменений после фиксации
данных.
* При завершении транзакций, в зависимости от состояния
синхронности данных между диском и оперативной память, MDBX помечает
точки фиксации либо как сильные (strong), либо как слабые (weak). Так
например, в режиме `WRITEMAP` завершаемые транзакции помечаются как
слабые, а при явной синхронизации данных как сильные.
* При открытии БД
выполняется автоматический откат к последней сильной фиксации. Этим
обеспечивается гарантия сохранности БД.
К сожалению, такая гарантия надежности не дается бесплатно. Для сохранности
данных, страницы формирующие крайний снимок с сильной фиксацией, не должны
повторно использоваться (перезаписываться) до формирования следующей сильной
точки фиксации. Таким образом, крайняя точки фиксации создает описанный выше
эффект "долгого чтения", с разницей в том, что при исчерпании свободных
страниц автоматически будет сформирована новая точка сильной фиксации.
В последующих версиях MDBX будут предусмотрены средства для асинхронной записи
данных на диск с формированием сильных точек фиксации.
4. Возможность автоматического формирования контрольных точек (сброса данных
на диск) при накоплении заданного объёма изменений, устанавливаемого функцией
`mdbx_env_set_syncbytes()`.
5. Возможность получить отставание текущей транзакции чтения от последней
версии данных в БД посредством `mdbx_txn_straggler()`.
6. Утилита mdbx_chk для проверки БД и функция `mdbx_env_pgwalk()` для обхода
всех страниц БД.
7. Управление отладкой и получение отладочных сообщений посредством
`mdbx_setup_debug()`.
8. Возможность связать с каждой завершаемой транзакцией до 3 дополнительных
маркеров посредством `mdbx_canary_put()`, и прочитать их в транзакции чтения
посредством `mdbx_canary_get()`.
9. Возможность узнать есть ли за текущей позицией курсора строка данных
посредством `mdbx_cursor_eof()`.
10. Возможность явно запросить обновление существующей записи, без создания
новой посредством флажка `MDB_CURRENT` для `mdb_put()`.
11. Возможность обновить или удалить запись с получением предыдущего значения
данных посредством `mdbx_replace()`.
12. Поддержка ключей нулевого размера.
13. Исправленный вариант `mdb_cursor_count()`, возвращающий корректное
количество дубликатов для всех типов таблиц и любого положения курсора.
14. Возможность открыть БД в эксклюзивном режиме посредством
`mdbx_env_open_ex()`, например в целях её проверки.
15. Возможность закрыть БД в "грязном" состоянии (без сброса данных и
формирования сильной точки фиксации) посредством `mdbx_env_close_ex()`.
16. Возможность получить посредством `mdbx_env_info()` дополнительную
информацию, включая номер самой старой версии БД (снимка данных), который
используется одним из читателей.

7
lmdb.h
View File

@ -1,7 +1,7 @@
/** @file lmdb.h /** @file lmdb.h
* @brief Reliable Lightning memory-mapped database library * @brief Extended Lightning memory-mapped database library
* *
* @mainpage Reliable Lightning Memory-Mapped Database Manager (MDBX) * @mainpage Extended Lightning Memory-Mapped Database Manager (MDBX)
* *
* @section intro_sec Introduction * @section intro_sec Introduction
* MDBX is a Btree-based database management library modeled loosely on the * MDBX is a Btree-based database management library modeled loosely on the
@ -354,7 +354,8 @@ typedef void (MDB_rel_func)(MDB_val *item, void *oldptr, void *newptr, void *rel
* For mdb_cursor_del: remove all duplicate data items. * For mdb_cursor_del: remove all duplicate data items.
*/ */
#define MDB_NODUPDATA 0x20 #define MDB_NODUPDATA 0x20
/** For mdb_cursor_put: overwrite the current key/data pair */ /** For mdb_cursor_put: overwrite the current key/data pair
* MDBX allows this flag for mdb_put() for explicit overwrite/update without insertion. */
#define MDB_CURRENT 0x40 #define MDB_CURRENT 0x40
/** For put: Just reserve space for data, don't copy it. Return a /** For put: Just reserve space for data, don't copy it. Return a
* pointer to the reserved space. * pointer to the reserved space.

504
mdb.c
View File

@ -431,8 +431,6 @@ typedef struct MDB_rxbody {
volatile pid_t mrb_pid; volatile pid_t mrb_pid;
/** The thread ID of the thread owning this txn. */ /** The thread ID of the thread owning this txn. */
volatile pthread_t mrb_tid; volatile pthread_t mrb_tid;
/** Pointer to the context for deferred cleanup reader thread. */
struct MDB_rthc *mrb_rthc;
} MDB_rxbody; } MDB_rxbody;
/** The actual reader record, with cacheline padding. */ /** The actual reader record, with cacheline padding. */
@ -443,7 +441,6 @@ typedef struct MDB_reader {
#define mr_txnid mru.mrx.mrb_txnid #define mr_txnid mru.mrx.mrb_txnid
#define mr_pid mru.mrx.mrb_pid #define mr_pid mru.mrx.mrb_pid
#define mr_tid mru.mrx.mrb_tid #define mr_tid mru.mrx.mrb_tid
#define mr_rthc mru.mrx.mrb_rthc
/** cache line alignment */ /** cache line alignment */
char pad[(sizeof(MDB_rxbody)+CACHELINE_SIZE-1) & ~(CACHELINE_SIZE-1)]; char pad[(sizeof(MDB_rxbody)+CACHELINE_SIZE-1) & ~(CACHELINE_SIZE-1)];
} mru; } mru;
@ -785,6 +782,10 @@ typedef struct MDB_meta {
volatile uint64_t mm_datasync_sign; volatile uint64_t mm_datasync_sign;
#define META_IS_WEAK(meta) ((meta)->mm_datasync_sign == MDB_DATASIGN_WEAK) #define META_IS_WEAK(meta) ((meta)->mm_datasync_sign == MDB_DATASIGN_WEAK)
#define META_IS_STEADY(meta) ((meta)->mm_datasync_sign > MDB_DATASIGN_WEAK) #define META_IS_STEADY(meta) ((meta)->mm_datasync_sign > MDB_DATASIGN_WEAK)
#if MDBX_MODE_ENABLED
volatile mdbx_canary mm_canary;
#endif
} MDB_meta; } MDB_meta;
/** Buffer for a stack-allocated meta page. /** Buffer for a stack-allocated meta page.
@ -822,7 +823,7 @@ typedef struct MDB_dbx {
* Every operation requires a transaction handle. * Every operation requires a transaction handle.
*/ */
struct MDB_txn { struct MDB_txn {
#define MDBX_MT_SIGNATURE (0x706C553B^MDBX_MODE_SALT) #define MDBX_MT_SIGNATURE (0x93D53A31^MDBX_MODE_SALT)
unsigned mt_signature; unsigned mt_signature;
MDB_txn *mt_parent; /**< parent of a nested txn */ MDB_txn *mt_parent; /**< parent of a nested txn */
/** Nested txn under this txn, set together with flag #MDB_TXN_HAS_CHILD */ /** Nested txn under this txn, set together with flag #MDB_TXN_HAS_CHILD */
@ -909,6 +910,10 @@ struct MDB_txn {
* dirty_list into mt_parent after freeing hidden mt_parent pages. * dirty_list into mt_parent after freeing hidden mt_parent pages.
*/ */
unsigned mt_dirty_room; unsigned mt_dirty_room;
#if MDBX_MODE_ENABLED
mdbx_canary mt_canary;
#endif
}; };
/** Enough space for 2^32 nodes with minimum of 2 keys per node. I.e., plenty. /** Enough space for 2^32 nodes with minimum of 2 keys per node. I.e., plenty.
@ -1004,9 +1009,14 @@ typedef struct MDB_pgstate {
/** Context for deferred cleanup of reader's threads. /** Context for deferred cleanup of reader's threads.
* to avoid https://github.com/ReOpen/ReOpenLDAP/issues/48 */ * to avoid https://github.com/ReOpen/ReOpenLDAP/issues/48 */
struct MDB_rthc { typedef struct MDBX_rthc {
struct MDBX_rthc *rc_next;
pthread_t rc_thread;
MDB_reader *rc_reader; MDB_reader *rc_reader;
}; } MDBX_rthc;
static MDBX_rthc* mdbx_rthc_get(pthread_key_t key);
/** The database environment. */ /** The database environment. */
struct MDB_env { struct MDB_env {
#define MDBX_ME_SIGNATURE (0x9A899641^MDBX_MODE_SALT) #define MDBX_ME_SIGNATURE (0x9A899641^MDBX_MODE_SALT)
@ -1367,8 +1377,7 @@ mdb_dkey(MDB_val *key, char *buf)
if (key->mv_size > DKBUF_MAXKEYSIZE) if (key->mv_size > DKBUF_MAXKEYSIZE)
return "MDB_MAXKEYSIZE"; return "MDB_MAXKEYSIZE";
/* may want to make this a dynamic check: if the key is mostly /* may want to make this a dynamic check: if the key is mostly
* printable characters, print it as-is instead of converting to hex. * printable characters, print it as-is instead of converting to hex. */
*/
#if 1 #if 1
buf[0] = '\0'; buf[0] = '\0';
for (i=0; i<key->mv_size; i++) for (i=0; i<key->mv_size; i++)
@ -1576,8 +1585,7 @@ mdb_page_malloc(MDB_txn *txn, unsigned num)
if ((env->me_flags & MDB_NOMEMINIT) == 0) { if ((env->me_flags & MDB_NOMEMINIT) == 0) {
/* For a single page alloc, we init everything after the page header. /* For a single page alloc, we init everything after the page header.
* For multi-page, we init the final page; if the caller needed that * For multi-page, we init the final page; if the caller needed that
* many pages they will be filling in at least up to the last page. * many pages they will be filling in at least up to the last page. */
*/
size_t skip = PAGEHDRSZ; size_t skip = PAGEHDRSZ;
if (num > 1) if (num > 1)
skip += (num - 1) * env->me_psize; skip += (num - 1) * env->me_psize;
@ -1671,8 +1679,7 @@ mdb_page_loose(MDB_cursor *mc, MDB_page *mp)
if (txn->mt_parent) { if (txn->mt_parent) {
MDB_ID2 *dl = txn->mt_u.dirty_list; MDB_ID2 *dl = txn->mt_u.dirty_list;
/* If txn has a parent, make sure the page is in our /* If txn has a parent, make sure the page is in our
* dirty list. * dirty list. */
*/
if (dl[0].mid) { if (dl[0].mid) {
unsigned x = mdb_mid2l_search(dl, pgno); unsigned x = mdb_mid2l_search(dl, pgno);
if (x <= dl[0].mid && dl[x].mid == pgno) { if (x <= dl[0].mid && dl[x].mid == pgno) {
@ -1862,8 +1869,7 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data)
* turns out to be a lot of wasted effort because in a large txn many * turns out to be a lot of wasted effort because in a large txn many
* of those pages will need to be used again. So now we spill only 1/8th * of those pages will need to be used again. So now we spill only 1/8th
* of the dirty pages. Testing revealed this to be a good tradeoff, * of the dirty pages. Testing revealed this to be a good tradeoff,
* better than 1/2, 1/4, or 1/10. * better than 1/2, 1/4, or 1/10. */
*/
if (need < MDB_IDL_UM_MAX / 8) if (need < MDB_IDL_UM_MAX / 8)
need = MDB_IDL_UM_MAX / 8; need = MDB_IDL_UM_MAX / 8;
@ -1875,8 +1881,7 @@ mdb_page_spill(MDB_cursor *m0, MDB_val *key, MDB_val *data)
if (dp->mp_flags & (P_LOOSE|P_KEEP)) if (dp->mp_flags & (P_LOOSE|P_KEEP))
continue; continue;
/* Can't spill twice, make sure it's not already in a parent's /* Can't spill twice, make sure it's not already in a parent's
* spill list. * spill list. */
*/
if (txn->mt_parent) { if (txn->mt_parent) {
MDB_txn *tx2; MDB_txn *tx2;
for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) { for (tx2 = txn->mt_parent; tx2; tx2 = tx2->mt_parent) {
@ -2124,8 +2129,7 @@ mdb_page_alloc(MDB_cursor *mc, int num, MDB_page **mp, int flags)
pgno_t *idl; pgno_t *idl;
/* Seek a big enough contiguous page range. Prefer /* Seek a big enough contiguous page range. Prefer
* pages at the tail, just truncating the list. * pages at the tail, just truncating the list. */
*/
if (likely(flags & MDBX_ALLOC_CACHE) if (likely(flags & MDBX_ALLOC_CACHE)
&& mop_len > n2 && mop_len > n2
&& ( !(flags & MDBX_COALESCE) || op == MDB_FIRST)) { && ( !(flags & MDBX_COALESCE) || op == MDB_FIRST)) {
@ -2405,8 +2409,7 @@ mdb_page_copy(MDB_page *dst, MDB_page *src, unsigned psize)
indx_t upper = src->mp_upper, lower = src->mp_lower, unused = upper-lower; indx_t upper = src->mp_upper, lower = src->mp_lower, unused = upper-lower;
/* If page isn't full, just copy the used portion. Adjust /* If page isn't full, just copy the used portion. Adjust
* alignment so memcpy may copy words instead of bytes. * alignment so memcpy may copy words instead of bytes. */
*/
if ((unused &= -Align) && !IS_LEAF2(src)) { if ((unused &= -Align) && !IS_LEAF2(src)) {
upper = (upper + PAGEBASE) & -Align; upper = (upper + PAGEBASE) & -Align;
memcpy(dst, src, (lower + PAGEBASE + (Align-1)) & -Align); memcpy(dst, src, (lower + PAGEBASE + (Align-1)) & -Align);
@ -2460,8 +2463,7 @@ mdb_page_unspill(MDB_txn *txn, MDB_page *mp, MDB_page **ret)
if (tx2 == txn) { if (tx2 == txn) {
/* If in current txn, this page is no longer spilled. /* If in current txn, this page is no longer spilled.
* If it happens to be the last page, truncate the spill list. * If it happens to be the last page, truncate the spill list.
* Otherwise mark it as deleted by setting the LSB. * Otherwise mark it as deleted by setting the LSB. */
*/
if (x == txn->mt_spill_pgs[0]) if (x == txn->mt_spill_pgs[0])
txn->mt_spill_pgs[0]--; txn->mt_spill_pgs[0]--;
else else
@ -2521,8 +2523,7 @@ mdb_page_touch(MDB_cursor *mc)
MDB_ID2 mid, *dl = txn->mt_u.dirty_list; MDB_ID2 mid, *dl = txn->mt_u.dirty_list;
pgno = mp->mp_pgno; pgno = mp->mp_pgno;
/* If txn has a parent, make sure the page is in our /* If txn has a parent, make sure the page is in our
* dirty list. * dirty list. */
*/
if (dl[0].mid) { if (dl[0].mid) {
unsigned x = mdb_mid2l_search(dl, pgno); unsigned x = mdb_mid2l_search(dl, pgno);
if (x <= dl[0].mid && dl[x].mid == pgno) { if (x <= dl[0].mid && dl[x].mid == pgno) {
@ -2665,8 +2666,7 @@ mdb_cursor_shadow(MDB_txn *src, MDB_txn *dst)
mc->mc_db = &dst->mt_dbs[i]; mc->mc_db = &dst->mt_dbs[i];
/* Kill pointers into src to reduce abuse: The /* Kill pointers into src to reduce abuse: The
* user may not use mc until dst ends. But we need a valid * user may not use mc until dst ends. But we need a valid
* txn pointer here for cursor fixups to keep working. * txn pointer here for cursor fixups to keep working. */
*/
mc->mc_txn = dst; mc->mc_txn = dst;
mc->mc_dbflag = &dst->mt_dbflags[i]; mc->mc_dbflag = &dst->mt_dbflags[i];
if ((mx = mc->mc_xcursor) != NULL) { if ((mx = mc->mc_xcursor) != NULL) {
@ -2764,28 +2764,19 @@ mdb_txn_renew0(MDB_txn *txn, unsigned flags)
} }
if (flags & MDB_TXN_RDONLY) { if (flags & MDB_TXN_RDONLY) {
struct MDB_rthc *rthc = NULL; MDBX_rthc *rthc = NULL;
MDB_reader *r = NULL; MDB_reader *r = NULL;
txn->mt_flags = MDB_TXN_RDONLY; txn->mt_flags = MDB_TXN_RDONLY;
if (likely(env->me_flags & MDB_ENV_TXKEY)) { if (likely(env->me_flags & MDB_ENV_TXKEY)) {
mdb_assert(env, !(env->me_flags & MDB_NOTLS)); mdb_assert(env, !(env->me_flags & MDB_NOTLS));
rthc = pthread_getspecific(env->me_txkey); rthc = mdbx_rthc_get(env->me_txkey);
if (unlikely(! rthc)) { if (unlikely(! rthc))
rthc = calloc(1, sizeof(struct MDB_rthc)); return ENOMEM;
if (unlikely(! rthc)) if (likely(rthc->rc_reader)) {
return ENOMEM; r = rthc->rc_reader;
rc = pthread_setspecific(env->me_txkey, rthc);
if (unlikely(rc)) {
free(rthc);
return rc;
}
}
r = rthc->rc_reader;
if (r) {
mdb_assert(env, r->mr_pid == env->me_pid); mdb_assert(env, r->mr_pid == env->me_pid);
mdb_assert(env, r->mr_tid == pthread_self()); mdb_assert(env, r->mr_tid == pthread_self());
mdb_assert(env, r->mr_rthc == rthc);
} }
} else { } else {
mdb_assert(env, env->me_flags & MDB_NOTLS); mdb_assert(env, env->me_flags & MDB_NOTLS);
@ -2826,8 +2817,7 @@ mdb_txn_renew0(MDB_txn *txn, unsigned flags)
* uses the reader table un-mutexed: First reset the * uses the reader table un-mutexed: First reset the
* slot, next publish it in mti_numreaders. After * slot, next publish it in mti_numreaders. After
* that, it is safe for mdb_env_close() to touch it. * that, it is safe for mdb_env_close() to touch it.
* When it will be closed, we can finally claim it. * When it will be closed, we can finally claim it. */
*/
r->mr_pid = 0; r->mr_pid = 0;
r->mr_txnid = ~(txnid_t)0; r->mr_txnid = ~(txnid_t)0;
r->mr_tid = tid; r->mr_tid = tid;
@ -2848,7 +2838,6 @@ mdb_txn_renew0(MDB_txn *txn, unsigned flags)
new_notls = MDB_END_SLOT; new_notls = MDB_END_SLOT;
if (likely(rthc)) { if (likely(rthc)) {
rthc->rc_reader = r; rthc->rc_reader = r;
r->mr_rthc = rthc;
new_notls = 0; new_notls = 0;
} }
} }
@ -2866,6 +2855,9 @@ mdb_txn_renew0(MDB_txn *txn, unsigned flags)
txn->mt_next_pgno = meta->mm_last_pg+1; txn->mt_next_pgno = meta->mm_last_pg+1;
/* Copy the DB info and flags */ /* Copy the DB info and flags */
memcpy(txn->mt_dbs, meta->mm_dbs, CORE_DBS * sizeof(MDB_db)); memcpy(txn->mt_dbs, meta->mm_dbs, CORE_DBS * sizeof(MDB_db));
#if MDBX_MODE_ENABLED
txn->mt_canary = meta->mm_canary;
#endif
break; break;
} }
} }
@ -2882,6 +2874,9 @@ mdb_txn_renew0(MDB_txn *txn, unsigned flags)
pthread_mutex_lock(&tsan_mutex); pthread_mutex_lock(&tsan_mutex);
#endif #endif
MDB_meta *meta = mdb_meta_head_w(env); MDB_meta *meta = mdb_meta_head_w(env);
#if MDBX_MODE_ENABLED
txn->mt_canary = meta->mm_canary;
#endif
txn->mt_txnid = meta->mm_txnid + 1; txn->mt_txnid = meta->mm_txnid + 1;
txn->mt_flags = flags; txn->mt_flags = flags;
#ifdef __SANITIZE_THREAD__ #ifdef __SANITIZE_THREAD__
@ -3002,8 +2997,7 @@ mdb_txn_begin(MDB_env *env, MDB_txn *parent, unsigned flags, MDB_txn **ret)
size += tsize = sizeof(MDB_txn); size += tsize = sizeof(MDB_txn);
} else { } else {
/* Reuse preallocated write txn. However, do not touch it until /* Reuse preallocated write txn. However, do not touch it until
* mdb_txn_renew0() succeeds, since it currently may be active. * mdb_txn_renew0() succeeds, since it currently may be active. */
*/
txn = env->me_txn0; txn = env->me_txn0;
goto renew; goto renew;
} }
@ -3289,8 +3283,7 @@ mdb_freelist_save(MDB_txn *txn)
{ {
/* env->me_pghead[] can grow and shrink during this call. /* env->me_pghead[] can grow and shrink during this call.
* env->me_pglast and txn->mt_free_pgs[] can only grow. * env->me_pglast and txn->mt_free_pgs[] can only grow.
* Page numbers cannot disappear from txn->mt_free_pgs[]. * Page numbers cannot disappear from txn->mt_free_pgs[]. */
*/
MDB_cursor mc; MDB_cursor mc;
MDB_env *env = txn->mt_env; MDB_env *env = txn->mt_env;
int rc, maxfree_1pg = env->me_maxfree_1pg, more = 1; int rc, maxfree_1pg = env->me_maxfree_1pg, more = 1;
@ -3315,8 +3308,7 @@ again:
if (! lifo) { if (! lifo) {
/* If using records from freeDB which we have not yet /* If using records from freeDB which we have not yet
* deleted, delete them and any we reserved for me_pghead. * deleted, delete them and any we reserved for me_pghead. */
*/
while (pglast < env->me_pglast) { while (pglast < env->me_pglast) {
rc = mdb_cursor_first(&mc, &key, NULL); rc = mdb_cursor_first(&mc, &key, NULL);
if (unlikely(rc)) if (unlikely(rc))
@ -3358,8 +3350,7 @@ again:
if (unlikely(!env->me_pghead) && txn->mt_loose_pgs) { if (unlikely(!env->me_pghead) && txn->mt_loose_pgs) {
/* Put loose page numbers in mt_free_pgs, since /* Put loose page numbers in mt_free_pgs, since
* we may be unable to return them to me_pghead. * we may be unable to return them to me_pghead. */
*/
MDB_page *mp = txn->mt_loose_pgs; MDB_page *mp = txn->mt_loose_pgs;
if (unlikely((rc = mdb_midl_need(&txn->mt_free_pgs, txn->mt_loose_count)) != 0)) if (unlikely((rc = mdb_midl_need(&txn->mt_free_pgs, txn->mt_loose_count)) != 0))
return rc; return rc;
@ -3413,8 +3404,7 @@ again:
/* Reserve records for me_pghead[]. Split it if multi-page, /* Reserve records for me_pghead[]. Split it if multi-page,
* to avoid searching freeDB for a page range. Use keys in * to avoid searching freeDB for a page range. Use keys in
* range [1,me_pglast]: Smaller than txnid of oldest reader. * range [1,me_pglast]: Smaller than txnid of oldest reader. */
*/
if (total_room >= mop_len) { if (total_room >= mop_len) {
if (total_room == mop_len || --more < 0) if (total_room == mop_len || --more < 0)
break; break;
@ -3491,8 +3481,7 @@ again:
mdb_tassert(txn, cleanup_idx == (txn->mt_lifo_reclaimed ? txn->mt_lifo_reclaimed[0] : 0)); mdb_tassert(txn, cleanup_idx == (txn->mt_lifo_reclaimed ? txn->mt_lifo_reclaimed[0] : 0));
/* Return loose page numbers to me_pghead, though usually none are /* Return loose page numbers to me_pghead, though usually none are
* left at this point. The pages themselves remain in dirty_list. * left at this point. The pages themselves remain in dirty_list. */
*/
if (txn->mt_loose_pgs) { if (txn->mt_loose_pgs) {
MDB_page *mp = txn->mt_loose_pgs; MDB_page *mp = txn->mt_loose_pgs;
unsigned count = txn->mt_loose_count; unsigned count = txn->mt_loose_count;
@ -3766,8 +3755,7 @@ mdb_txn_commit(MDB_txn *txn)
goto fail; goto fail;
mdb_midl_free(txn->mt_free_pgs); mdb_midl_free(txn->mt_free_pgs);
/* Failures after this must either undo the changes /* Failures after this must either undo the changes
* to the parent or set MDB_TXN_ERROR in the parent. * to the parent or set MDB_TXN_ERROR in the parent. */
*/
parent->mt_next_pgno = txn->mt_next_pgno; parent->mt_next_pgno = txn->mt_next_pgno;
parent->mt_flags = txn->mt_flags; parent->mt_flags = txn->mt_flags;
@ -3943,6 +3931,9 @@ mdb_txn_commit(MDB_txn *txn)
meta.mm_dbs[MAIN_DBI] = txn->mt_dbs[MAIN_DBI]; meta.mm_dbs[MAIN_DBI] = txn->mt_dbs[MAIN_DBI];
meta.mm_last_pg = txn->mt_next_pgno - 1; meta.mm_last_pg = txn->mt_next_pgno - 1;
meta.mm_txnid = txn->mt_txnid; meta.mm_txnid = txn->mt_txnid;
#if MDBX_MODE_ENABLED
meta.mm_canary = txn->mt_canary;
#endif
rc = mdb_env_sync0(env, env->me_flags | txn->mt_flags, &meta); rc = mdb_env_sync0(env, env->me_flags | txn->mt_flags, &meta);
} }
@ -4179,6 +4170,9 @@ mdb_env_sync0(MDB_env *env, unsigned flags, MDB_meta *pending)
target->mm_dbs[FREE_DBI] = pending->mm_dbs[FREE_DBI]; target->mm_dbs[FREE_DBI] = pending->mm_dbs[FREE_DBI];
target->mm_dbs[MAIN_DBI] = pending->mm_dbs[MAIN_DBI]; target->mm_dbs[MAIN_DBI] = pending->mm_dbs[MAIN_DBI];
target->mm_last_pg = pending->mm_last_pg; target->mm_last_pg = pending->mm_last_pg;
#if MDBX_MODE_ENABLED
target->mm_canary = pending->mm_canary;
#endif
/* LY: 'commit' the meta */ /* LY: 'commit' the meta */
target->mm_txnid = pending->mm_txnid; target->mm_txnid = pending->mm_txnid;
target->mm_datasync_sign = pending->mm_datasync_sign; target->mm_datasync_sign = pending->mm_datasync_sign;
@ -4523,38 +4517,230 @@ mdb_env_open2(MDB_env *env, MDB_meta *meta)
return MDB_SUCCESS; return MDB_SUCCESS;
} }
static pthread_mutex_t mdb_rthc_lock = PTHREAD_MUTEX_INITIALIZER; /****************************************************************************/
#ifndef MDBX_USE_THREAD_ATEXIT
# if __GLIBC_PREREQ(2,18)
# define MDBX_USE_THREAD_ATEXIT 1
# else
# define MDBX_USE_THREAD_ATEXIT 0
# endif
#endif
static pthread_mutex_t mdbx_rthc_mutex = PTHREAD_MUTEX_INITIALIZER;
static MDBX_rthc *mdbx_rthc_list;
static pthread_key_t mdbx_pthread_crutch_key;
static __inline
void mdbx_rthc_lock(void) {
mdb_ensure(NULL, pthread_mutex_lock(&mdbx_rthc_mutex) == 0);
}
static __inline
void mdbx_rthc_unlock(void) {
mdb_ensure(NULL, pthread_mutex_unlock(&mdbx_rthc_mutex) == 0);
}
/** Release a reader thread's slot in the reader lock table. /** Release a reader thread's slot in the reader lock table.
* This function is called automatically when a thread exits. * This function is called automatically when a thread exits.
* @param[in] ptr This points to the MDB_rthc of a slot in the reader lock table. * @param[in] ptr This points to the MDB_rthc of a slot in the reader lock table.
*/ */
static __cold
/* LY: TODO: Yet another problem is here - segfault in case if a DSO will void mdbx_rthc_dtor(void)
* be unloaded before a thread would been finished. */
static ATTRIBUTE_NO_SANITIZE_THREAD
void mdb_env_reader_destr(void *ptr)
{ {
struct MDB_rthc* rthc = ptr; /* LY: Основная задача этого деструктора была и есть в освобождении
MDB_reader *reader; * слота таблицы читателей при завершении треда, но тут есть пара
* не очевидных сложностей:
* - Таблица читателей располагается в разделяемой памяти, поэтому
* во избежание segfault деструктор не должен что-либо делать после
* или одновременно с mdb_env_close().
* - Действительно, mdb_env_close() вызовет pthread_key_delete() и
* после этого glibc не будет вызывать деструктор.
* - ОДНАКО, это никак не решает проблему гонок между mdb_env_close()
* и завершающимися тредами. Грубо говоря, при старте mdb_env_close()
* деструктор уже может выполняться в некоторых тредах, и завершиться
* эти выполнения могут во время или после окончания mdb_env_close().
* - БОЛЕЕ ТОГО, схожая проблема возникает при выгрузке dso/dll,
* так как в текущей glibc (2.24) подсистема ld.so ничего не знает о
* TSD-деструкторах и поэтому может выгрузить lib.so до того как
* отработали все деструкторы.
* - Исходное проявление проблемы было зафиксировано
* в https://github.com/ReOpen/ReOpenLDAP/issues/48
*
* Предыдущее решение посредством выделяемого динамически MDB_rthc
* было не удачным, так как порождало либо утечку памяти,
* либо вероятностное обращение к уже освобожденной памяти
* из этого деструктора.
*
* Текущее решение достаточно "развесисто", но решает все описанные выше
* проблемы без пенальти по производительности.
*/
mdb_ensure(NULL, pthread_mutex_lock(&mdb_rthc_lock) == 0); mdbx_rthc_lock();
reader = rthc->rc_reader;
if (reader && reader->mr_pid == getpid()) { pid_t pid = getpid();
mdb_ensure(NULL, reader->mr_rthc == rthc); pthread_t thread = pthread_self();
rthc->rc_reader = NULL; for (MDBX_rthc** ref = &mdbx_rthc_list; *ref; ) {
reader->mr_rthc = NULL; MDBX_rthc* rthc = *ref;
mdbx_compiler_barrier(); if (rthc->rc_thread == thread) {
reader->mr_pid = 0; if (rthc->rc_reader && rthc->rc_reader->mr_pid == pid) {
mdbx_coherent_barrier(); rthc->rc_reader->mr_pid = 0;
mdbx_coherent_barrier();
}
*ref = rthc->rc_next;
free(rthc);
} else {
ref = &(*ref)->rc_next;
}
} }
mdb_ensure(NULL, pthread_mutex_unlock(&mdb_rthc_lock) == 0);
free(rthc); mdbx_rthc_unlock();
} }
#if MDBX_USE_THREAD_ATEXIT
extern void *__dso_handle __attribute__ ((__weak__));
extern int __cxa_thread_atexit_impl(void (*dtor)(void*), void *obj, void *dso_symbol);
static __cold
void mdbx_rthc__thread_atexit(void *ptr) {
mdb_ensure(NULL, ptr == pthread_getspecific(mdbx_pthread_crutch_key));
mdb_ensure(NULL, pthread_setspecific(mdbx_pthread_crutch_key, NULL) == 0);
mdbx_rthc_dtor();
}
static __attribute__((constructor)) __cold
void mdbx_pthread_crutch_ctor(void) {
mdb_ensure(NULL, pthread_key_create(
&mdbx_pthread_crutch_key, NULL) == 0);
}
#else /* MDBX_USE_THREAD_ATEXIT */
static __cold
void mdbx_rthc__thread_key_dtor(void *ptr) {
(void) ptr;
if (mdbx_pthread_crutch_key != (pthread_key_t) -1)
mdbx_rthc_dtor();
}
static __attribute__((constructor)) __cold
void mdbx_pthread_crutch_ctor(void) {
mdb_ensure(NULL, pthread_key_create(
&mdbx_pthread_crutch_key, mdbx_rthc__thread_key_dtor) == 0);
}
static __attribute__((destructor)) __cold
void mdbx_pthread_crutch_dtor(void)
{
pthread_key_delete(mdbx_pthread_crutch_key);
mdbx_pthread_crutch_key = -1;
/* LY: Из-за race condition в pthread_key_delete()
* деструкторы уже могли начать выполняться.
* Уступая квант времени сразу после удаления ключа
* мы даем им шанс завершиться. */
pthread_yield();
mdbx_rthc_lock();
pid_t pid = getpid();
while (mdbx_rthc_list != NULL) {
MDBX_rthc* rthc = mdbx_rthc_list;
mdbx_rthc_list = mdbx_rthc_list->rc_next;
if (rthc->rc_reader && rthc->rc_reader->mr_pid == pid) {
rthc->rc_reader->mr_pid = 0;
mdbx_coherent_barrier();
}
free(rthc);
/* LY: Каждый неудаленный элемент списка - это один
* не отработавший деструктор и потенциальный
* шанс получить segfault после выгрузки lib.so
* Поэтому на каждой итерации уступаем квант времени,
* в надежде что деструкторы успеют отработать. */
mdbx_rthc_unlock();
pthread_yield();
mdbx_rthc_lock();
}
mdbx_rthc_unlock();
pthread_yield();
}
#endif /* MDBX_USE_THREAD_ATEXIT */
static __cold
MDBX_rthc* mdbx_rthc_add(pthread_key_t key)
{
MDBX_rthc *rthc = malloc(sizeof(MDBX_rthc));
if (unlikely(rthc == NULL))
goto bailout;
rthc->rc_next = NULL;
rthc->rc_reader = NULL;
rthc->rc_thread = pthread_self();
if (unlikely(pthread_setspecific(key, rthc) != 0))
goto bailout_free;
mdbx_rthc_lock();
if (pthread_getspecific(mdbx_pthread_crutch_key) == NULL) {
#if MDBX_USE_THREAD_ATEXIT
void *dso_anchor = (&__dso_handle && __dso_handle)
? __dso_handle : (void *)mdb_version;
if (unlikely(__cxa_thread_atexit_impl(mdbx_rthc__thread_atexit, rthc, dso_anchor) != 0)) {
mdbx_rthc_unlock();
goto bailout_free;
}
#endif /* MDBX_USE_THREAD_ATEXIT */
mdb_ensure(NULL, pthread_setspecific(mdbx_pthread_crutch_key, rthc) == 0);
}
rthc->rc_next = mdbx_rthc_list;
mdbx_rthc_list = rthc;
mdbx_rthc_unlock();
return rthc;
bailout_free:
free(rthc);
bailout:
return NULL;
}
static __inline
MDBX_rthc* mdbx_rthc_get(pthread_key_t key)
{
MDBX_rthc *rthc = pthread_getspecific(key);
if (likely(rthc != NULL))
return rthc;
return mdbx_rthc_add(key);
}
static __cold
void mdbx_rthc_cleanup(MDB_env *env)
{
mdbx_rthc_lock();
MDB_reader *begin = env->me_txns->mti_readers;
MDB_reader *end = begin + env->me_close_readers;
for (MDBX_rthc** ref = &mdbx_rthc_list; *ref; ) {
MDBX_rthc* rthc = *ref;
if (rthc->rc_reader >= begin && rthc->rc_reader < end) {
if (rthc->rc_reader->mr_pid == env->me_pid) {
rthc->rc_reader->mr_pid = 0;
mdbx_coherent_barrier();
}
*ref = rthc->rc_next;
free(rthc);
} else {
ref = &(*ref)->rc_next;
}
}
mdbx_rthc_unlock();
}
/****************************************************************************/
/** Downgrade the exclusive lock on the region back to shared */ /** Downgrade the exclusive lock on the region back to shared */
static int __cold static __cold
mdb_env_share_locks(MDB_env *env, int *excl) int mdb_env_share_locks(MDB_env *env, int *excl)
{ {
struct flock lock_info; struct flock lock_info;
int rc = 0; int rc = 0;
@ -4721,7 +4907,7 @@ mdb_env_setup_locks(MDB_env *env, char *lpath, int mode, int *excl)
fcntl(env->me_lfd, F_SETFD, fdflags); fcntl(env->me_lfd, F_SETFD, fdflags);
if (!(env->me_flags & MDB_NOTLS)) { if (!(env->me_flags & MDB_NOTLS)) {
rc = pthread_key_create(&env->me_txkey, mdb_env_reader_destr); rc = pthread_key_create(&env->me_txkey, NULL);
if (rc) if (rc)
return rc; return rc;
env->me_flags |= MDB_ENV_TXKEY; env->me_flags |= MDB_ENV_TXKEY;
@ -5009,11 +5195,7 @@ mdb_env_close0(MDB_env *env)
mdb_midl_free(env->me_free_pgs); mdb_midl_free(env->me_free_pgs);
if (env->me_flags & MDB_ENV_TXKEY) { if (env->me_flags & MDB_ENV_TXKEY) {
struct MDB_rthc *rthc = pthread_getspecific(env->me_txkey); mdb_ensure(env, pthread_key_delete(env->me_txkey) == 0);
if (rthc && pthread_setspecific(env->me_txkey, NULL) == 0) {
mdb_env_reader_destr(rthc);
}
pthread_key_delete(env->me_txkey);
env->me_flags &= ~MDB_ENV_TXKEY; env->me_flags &= ~MDB_ENV_TXKEY;
} }
@ -5027,7 +5209,6 @@ mdb_env_close0(MDB_env *env)
if (env->me_fd != INVALID_HANDLE_VALUE) if (env->me_fd != INVALID_HANDLE_VALUE)
(void) close(env->me_fd); (void) close(env->me_fd);
pid_t pid = env->me_pid;
/* Clearing readers is done in this function because /* Clearing readers is done in this function because
* me_txkey with its destructor must be disabled first. * me_txkey with its destructor must be disabled first.
* *
@ -5035,26 +5216,12 @@ mdb_env_close0(MDB_env *env)
* data owned by this process (me_close_readers and * data owned by this process (me_close_readers and
* our readers), and clear each reader atomically. * our readers), and clear each reader atomically.
*/ */
if (pid == getpid()) { if (env->me_pid == getpid())
mdb_ensure(env, pthread_mutex_lock(&mdb_rthc_lock) == 0); mdbx_rthc_cleanup(env);
for (i = env->me_close_readers; --i >= 0; ) {
MDB_reader *reader = &env->me_txns->mti_readers[i];
if (reader->mr_pid == pid) {
struct MDB_rthc *rthc = reader->mr_rthc;
if (rthc) {
mdb_ensure(env, rthc->rc_reader == reader);
rthc->rc_reader = NULL;
reader->mr_rthc = NULL;
}
reader->mr_pid = 0;
}
}
mdbx_coherent_barrier();
mdb_ensure(env, pthread_mutex_unlock(&mdb_rthc_lock) == 0);
}
munmap((void *)env->me_txns, (env->me_maxreaders-1)*sizeof(MDB_reader)+sizeof(MDB_txninfo)); munmap((void *)env->me_txns, (env->me_maxreaders-1)*sizeof(MDB_reader)+sizeof(MDB_txninfo));
env->me_txns = NULL; env->me_txns = NULL;
env->me_pid = 0;
if (env->me_lfd != INVALID_HANDLE_VALUE) { if (env->me_lfd != INVALID_HANDLE_VALUE) {
(void) close(env->me_lfd); (void) close(env->me_lfd);
@ -5190,7 +5357,7 @@ mdb_cmp_int_ua(const MDB_val *a, const MDB_val *b)
do { do {
diff = *--pa - *--pb; diff = *--pa - *--pb;
if (likely(diff)) break; if (likely(diff != 0)) break;
} while(pa != a->mv_data); } while(pa != a->mv_data);
return diff; return diff;
} }
@ -5410,8 +5577,7 @@ mdb_page_get(MDB_cursor *mc, pgno_t pgno, MDB_page **ret, int *lvl)
/* Spilled pages were dirtied in this txn and flushed /* Spilled pages were dirtied in this txn and flushed
* because the dirty list got full. Bring this page * because the dirty list got full. Bring this page
* back in from the map (but don't unspill it here, * back in from the map (but don't unspill it here,
* leave that unless page_touch happens again). * leave that unless page_touch happens again). */
*/
if (tx2->mt_spill_pgs) { if (tx2->mt_spill_pgs) {
MDB_ID pn = pgno << 1; MDB_ID pn = pgno << 1;
x = mdb_midl_search(tx2->mt_spill_pgs, pn); x = mdb_midl_search(tx2->mt_spill_pgs, pn);
@ -5998,9 +6164,6 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
MDB_node *leaf = NULL; MDB_node *leaf = NULL;
DKBUF; DKBUF;
if (unlikely(key->mv_size == 0))
return MDB_BAD_VALSIZE;
if ( (mc->mc_db->md_flags & MDB_INTEGERKEY) if ( (mc->mc_db->md_flags & MDB_INTEGERKEY)
&& unlikely( key->mv_size != sizeof(unsigned) && unlikely( key->mv_size != sizeof(unsigned)
&& key->mv_size != sizeof(size_t) )) { && key->mv_size != sizeof(size_t) )) {
@ -6080,8 +6243,7 @@ mdb_cursor_set(MDB_cursor *mc, MDB_val *key, MDB_val *data,
} }
} }
/* If any parents have right-sibs, search. /* If any parents have right-sibs, search.
* Otherwise, there's nothing further. * Otherwise, there's nothing further. */
*/
for (i=0; i<mc->mc_top; i++) for (i=0; i<mc->mc_top; i++)
if (mc->mc_ki[i] < if (mc->mc_ki[i] <
NUMKEYS(mc->mc_pg[i])-1) NUMKEYS(mc->mc_pg[i])-1)
@ -6172,7 +6334,6 @@ set1:
rc = 0; rc = 0;
} }
*data = olddata; *data = olddata;
} else { } else {
if (mc->mc_xcursor) if (mc->mc_xcursor)
mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF); mc->mc_xcursor->mx_cursor.mc_flags &= ~(C_INITIALIZED|C_EOF);
@ -6317,6 +6478,12 @@ mdb_cursor_get(MDB_cursor *mc, MDB_val *key, MDB_val *data,
MDB_GET_KEY(leaf, key); MDB_GET_KEY(leaf, key);
if (data) { if (data) {
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
if (unlikely(!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED))) {
mdb_xcursor_init1(mc, leaf);
rc = mdb_cursor_first(&mc->mc_xcursor->mx_cursor, data, NULL);
if (unlikely(rc))
break;
}
rc = mdb_cursor_get(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_GET_CURRENT); rc = mdb_cursor_get(&mc->mc_xcursor->mx_cursor, data, NULL, MDB_GET_CURRENT);
} else { } else {
rc = mdb_node_read(mc, leaf, data); rc = mdb_node_read(mc, leaf, data);
@ -6544,7 +6711,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
if (unlikely(mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED))) if (unlikely(mc->mc_txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED)))
return (mc->mc_txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN; return (mc->mc_txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN;
if (unlikely(key->mv_size-1 >= ENV_MAXKEY(env))) if (unlikely(key->mv_size > ENV_MAXKEY(env)))
return MDB_BAD_VALSIZE; return MDB_BAD_VALSIZE;
#if SIZE_MAX > MAXDATASIZE #if SIZE_MAX > MAXDATASIZE
@ -6574,7 +6741,7 @@ mdb_cursor_put(MDB_cursor *mc, MDB_val *key, MDB_val *data,
dkey.mv_size = 0; dkey.mv_size = 0;
if (flags == MDB_CURRENT) { if (flags & MDB_CURRENT) {
if (unlikely(!(mc->mc_flags & C_INITIALIZED))) if (unlikely(!(mc->mc_flags & C_INITIALIZED)))
return EINVAL; return EINVAL;
rc = MDB_SUCCESS; rc = MDB_SUCCESS;
@ -6765,6 +6932,7 @@ more:
break; break;
} }
/* FALLTHRU: Big enough MDB_DUPFIXED sub-page */ /* FALLTHRU: Big enough MDB_DUPFIXED sub-page */
case MDB_CURRENT | MDB_NODUPDATA:
case MDB_CURRENT: case MDB_CURRENT:
fp->mp_flags |= P_DIRTY; fp->mp_flags |= P_DIRTY;
COPY_PGNO(fp->mp_pgno, mp->mp_pgno); COPY_PGNO(fp->mp_pgno, mp->mp_pgno);
@ -6953,8 +7121,7 @@ new_sub:
/* Now store the actual data in the child DB. Note that we're /* Now store the actual data in the child DB. Note that we're
* storing the user data in the keys field, so there are strict * storing the user data in the keys field, so there are strict
* size limits on dupdata. The actual data fields of the child * size limits on dupdata. The actual data fields of the child
* DB are all zero size. * DB are all zero size. */
*/
if (do_sub) { if (do_sub) {
int xflags, new_dupdata; int xflags, new_dupdata;
size_t ecount; size_t ecount;
@ -6962,12 +7129,15 @@ put_sub:
xdata.mv_size = 0; xdata.mv_size = 0;
xdata.mv_data = ""; xdata.mv_data = "";
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
xflags = MDB_NOSPILL;
if (flags & MDB_NODUPDATA)
xflags |= MDB_NOOVERWRITE;
if (flags & MDB_APPENDDUP)
xflags |= MDB_APPEND;
if (flags & MDB_CURRENT) { if (flags & MDB_CURRENT) {
xflags = MDB_CURRENT|MDB_NOSPILL; xflags |= MDB_CURRENT;
} else { } else {
mdb_xcursor_init1(mc, leaf); mdb_xcursor_init1(mc, leaf);
xflags = (flags & MDB_NODUPDATA) ?
MDB_NOOVERWRITE|MDB_NOSPILL : MDB_NOSPILL;
} }
if (sub_root) if (sub_root)
mc->mc_xcursor->mx_cursor.mc_pg[0] = sub_root; mc->mc_xcursor->mx_cursor.mc_pg[0] = sub_root;
@ -7001,8 +7171,6 @@ put_sub:
} }
} }
ecount = mc->mc_xcursor->mx_db.md_entries; ecount = mc->mc_xcursor->mx_db.md_entries;
if (flags & MDB_APPENDDUP)
xflags |= MDB_APPEND;
rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags); rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags);
if (flags & F_SUBDATA) { if (flags & F_SUBDATA) {
void *db = NODEDATA(leaf); void *db = NODEDATA(leaf);
@ -7018,8 +7186,7 @@ put_sub:
if (unlikely(rc)) if (unlikely(rc))
goto bad_sub; goto bad_sub;
/* If we succeeded and the key didn't exist before, /* If we succeeded and the key didn't exist before,
* make sure the cursor is marked valid. * make sure the cursor is marked valid. */
*/
mc->mc_flags |= C_INITIALIZED; mc->mc_flags |= C_INITIALIZED;
} }
if (flags & MDB_MULTIPLE) { if (flags & MDB_MULTIPLE) {
@ -7232,10 +7399,11 @@ mdb_branch_size(MDB_env *env, MDB_val *key)
size_t sz; size_t sz;
sz = INDXSIZE(key); sz = INDXSIZE(key);
if (sz > env->me_nodemax) { if (unlikely(sz > env->me_nodemax)) {
/* put on overflow page */ /* put on overflow page */
/* not implemented */ /* not implemented */
/* sz -= key->size - sizeof(pgno_t); */ mdb_assert_fail(env, "INDXSIZE(key) <= env->me_nodemax", __FUNCTION__, __LINE__);
sz -= key->mv_size - sizeof(pgno_t);
} }
return sz + sizeof(indx_t); return sz + sizeof(indx_t);
@ -7560,7 +7728,6 @@ mdb_xcursor_init1(MDB_cursor *mc, MDB_node *node)
#endif */ #endif */
} }
/** Fixup a sorted-dups cursor due to underlying update. /** Fixup a sorted-dups cursor due to underlying update.
* Sets up some fields that depend on the data from the main cursor. * Sets up some fields that depend on the data from the main cursor.
* Almost the same as init1, but skips initialization steps if the * Almost the same as init1, but skips initialization steps if the
@ -7689,35 +7856,51 @@ mdb_cursor_renew(MDB_txn *txn, MDB_cursor *mc)
int int
mdb_cursor_count(MDB_cursor *mc, size_t *countp) mdb_cursor_count(MDB_cursor *mc, size_t *countp)
{ {
MDB_node *leaf;
if (unlikely(mc == NULL || countp == NULL)) if (unlikely(mc == NULL || countp == NULL))
return EINVAL; return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE)) if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE))
return MDB_VERSION_MISMATCH; return MDB_VERSION_MISMATCH;
if (unlikely(mc->mc_xcursor == NULL))
return MDB_INCOMPATIBLE;
if (unlikely(mc->mc_txn->mt_flags & MDB_TXN_BLOCKED)) if (unlikely(mc->mc_txn->mt_flags & MDB_TXN_BLOCKED))
return MDB_BAD_TXN; return MDB_BAD_TXN;
if (unlikely(!(mc->mc_flags & C_INITIALIZED))) if (unlikely(!(mc->mc_flags & C_INITIALIZED)))
return EINVAL; return EINVAL;
#if MDBX_MODE_ENABLED
MDB_page *mp = mc->mc_pg[mc->mc_top];
int nkeys = NUMKEYS(mp);
if (!nkeys || mc->mc_ki[mc->mc_top] >= nkeys) {
*countp = 0;
return MDB_NOTFOUND;
} else if (mc->mc_xcursor == NULL || IS_LEAF2(mp)) {
*countp = 1;
} else {
MDB_node *leaf = NODEPTR(mp, mc->mc_ki[mc->mc_top]);
if (!F_ISSET(leaf->mn_flags, F_DUPDATA))
*countp = 1;
else if (unlikely(!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)))
return EINVAL;
else
*countp = mc->mc_xcursor->mx_db.md_entries;
}
#else
if (unlikely(mc->mc_xcursor == NULL))
return MDB_INCOMPATIBLE;
if (unlikely(!mc->mc_snum || (mc->mc_flags & C_EOF))) if (unlikely(!mc->mc_snum || (mc->mc_flags & C_EOF)))
return MDB_NOTFOUND; return MDB_NOTFOUND;
leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); MDB_node *leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]);
if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) {
*countp = 1; *countp = 1;
} else { } else {
if (unlikely(!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED))) if (unlikely(!(mc->mc_xcursor->mx_cursor.mc_flags & C_INITIALIZED)))
return EINVAL; return EINVAL;
*countp = mc->mc_xcursor->mx_db.md_entries; *countp = mc->mc_xcursor->mx_db.md_entries;
} }
#endif /* MDBX_MODE_ENABLED */
return MDB_SUCCESS; return MDB_SUCCESS;
} }
@ -7939,14 +8122,12 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
csrc->mc_pg[csrc->mc_top]->mp_pgno, csrc->mc_pg[csrc->mc_top]->mp_pgno,
cdst->mc_ki[cdst->mc_top], cdst->mc_pg[cdst->mc_top]->mp_pgno); cdst->mc_ki[cdst->mc_top], cdst->mc_pg[cdst->mc_top]->mp_pgno);
/* Add the node to the destination page. /* Add the node to the destination page. */
*/
rc = mdb_node_add(cdst, cdst->mc_ki[cdst->mc_top], &key, &data, srcpg, flags); rc = mdb_node_add(cdst, cdst->mc_ki[cdst->mc_top], &key, &data, srcpg, flags);
if (unlikely(rc != MDB_SUCCESS)) if (unlikely(rc != MDB_SUCCESS))
return rc; return rc;
/* Delete the node from the source page. /* Delete the node from the source page. */
*/
mdb_node_del(csrc, key.mv_size); mdb_node_del(csrc, key.mv_size);
{ {
@ -8007,8 +8188,7 @@ mdb_node_move(MDB_cursor *csrc, MDB_cursor *cdst, int fromleft)
} }
} }
/* Update the parent separators. /* Update the parent separators. */
*/
if (csrc->mc_ki[csrc->mc_top] == 0) { if (csrc->mc_ki[csrc->mc_top] == 0) {
if (csrc->mc_ki[csrc->mc_top-1] != 0) { if (csrc->mc_ki[csrc->mc_top-1] != 0) {
if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) { if (IS_LEAF2(csrc->mc_pg[csrc->mc_top])) {
@ -8807,8 +8987,7 @@ mdb_page_split(MDB_cursor *mc, MDB_val *newkey, MDB_val *newdata, pgno_t newpgno
mdb_debug("separator is %d [%s]", split_indx, DKEY(&sepkey)); mdb_debug("separator is %d [%s]", split_indx, DKEY(&sepkey));
/* Copy separator key to the parent. /* Copy separator key to the parent. */
*/
if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(env, &sepkey)) { if (SIZELEFT(mn.mc_pg[ptop]) < mdb_branch_size(env, &sepkey)) {
int snum = mc->mc_snum; int snum = mc->mc_snum;
mn.mc_snum--; mn.mc_snum--;
@ -9020,7 +9199,6 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
{ {
MDB_cursor mc; MDB_cursor mc;
MDB_xcursor mx; MDB_xcursor mx;
int rc;
if (unlikely(!key || !data || !txn)) if (unlikely(!key || !data || !txn))
return EINVAL; return EINVAL;
@ -9031,7 +9209,9 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
if (unlikely(!TXN_DBI_EXIST(txn, dbi, DB_USRVALID))) if (unlikely(!TXN_DBI_EXIST(txn, dbi, DB_USRVALID)))
return EINVAL; return EINVAL;
if (unlikely(flags & ~(MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND|MDB_APPENDDUP))) if (unlikely(flags & ~(MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND|MDB_APPENDDUP
/* LY: MDB_CURRENT indicates explicit overwrite (update) for MDBX */
| (MDBX_MODE_ENABLED ? MDB_CURRENT : 0))))
return EINVAL; return EINVAL;
if (unlikely(txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED))) if (unlikely(txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED)))
@ -9040,8 +9220,25 @@ mdb_put(MDB_txn *txn, MDB_dbi dbi,
mdb_cursor_init(&mc, txn, dbi, &mx); mdb_cursor_init(&mc, txn, dbi, &mx);
mc.mc_next = txn->mt_cursors[dbi]; mc.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &mc; txn->mt_cursors[dbi] = &mc;
rc = mdb_cursor_put(&mc, key, data, flags); int rc = MDB_SUCCESS;
#if MDBX_MODE_ENABLED
/* LY: support for update (explicit overwrite) */
if (flags & MDB_CURRENT) {
rc = mdb_cursor_get(&mc, key, NULL, MDB_SET);
if (likely(rc == MDB_SUCCESS) && (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)) {
/* LY: allows update (explicit overwrite) only for unique keys */
MDB_node *leaf = NODEPTR(mc.mc_pg[mc.mc_top], mc.mc_ki[mc.mc_top]);
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
mdb_tassert(txn, XCURSOR_INITED(&mc) && mc.mc_xcursor->mx_db.md_entries > 1);
rc = MDB_KEYEXIST;
}
}
}
#endif /* MDBX_MODE_ENABLED */
if (likely(rc == MDB_SUCCESS))
rc = mdb_cursor_put(&mc, key, data, flags);
txn->mt_cursors[dbi] = mc.mc_next; txn->mt_cursors[dbi] = mc.mc_next;
return rc; return rc;
} }
@ -10276,11 +10473,9 @@ mdb_pid_insert(pid_t *ids, pid_t pid)
if( val < 0 ) { if( val < 0 ) {
n = pivot; n = pivot;
} else if ( val > 0 ) { } else if ( val > 0 ) {
base = cursor; base = cursor;
n -= pivot + 1; n -= pivot + 1;
} else { } else {
/* found, so it's a duplicate */ /* found, so it's a duplicate */
return -1; return -1;
@ -10346,15 +10541,14 @@ mdb_reader_check0(MDB_env *env, int rlocked, int *dead)
j = rdrs; j = rdrs;
} }
} }
for (; j<rdrs; j++) for (; j < rdrs; j++) {
if (mr[j].mr_pid == pid) { if (mr[j].mr_pid == pid) {
mdb_debug("clear stale reader pid %u txn %zd", mdb_debug("clear stale reader pid %u txn %zd",
(unsigned) pid, mr[j].mr_txnid); (unsigned) pid, mr[j].mr_txnid);
mr[j].mr_rthc = NULL; mr[j].mr_pid = 0;
mdbx_compiler_barrier(); count++;
mr[j].mr_pid = 0; }
count++; }
}
if (rmutex) if (rmutex)
mdb_mutex_unlock(env, rmutex); mdb_mutex_unlock(env, rmutex);
} }

View File

@ -432,7 +432,6 @@ static int process_db(MDB_dbi dbi, char *name, visitor *handler, int silent)
fflush(NULL); fflush(NULL);
} }
skipped_subdb++; skipped_subdb++;
mdbx_dbi_close(env, dbi);
return MDB_SUCCESS; return MDB_SUCCESS;
} }
@ -444,14 +443,12 @@ static int process_db(MDB_dbi dbi, char *name, visitor *handler, int silent)
rc = mdbx_dbi_flags(txn, dbi, &flags); rc = mdbx_dbi_flags(txn, dbi, &flags);
if (rc) { if (rc) {
error(" - mdbx_dbi_flags failed, error %d %s\n", rc, mdbx_strerror(rc)); error(" - mdbx_dbi_flags failed, error %d %s\n", rc, mdbx_strerror(rc));
mdbx_dbi_close(env, dbi);
return rc; return rc;
} }
rc = mdbx_stat(txn, dbi, &ms, sizeof(ms)); rc = mdbx_stat(txn, dbi, &ms, sizeof(ms));
if (rc) { if (rc) {
error(" - mdbx_stat failed, error %d %s\n", rc, mdbx_strerror(rc)); error(" - mdbx_stat failed, error %d %s\n", rc, mdbx_strerror(rc));
mdbx_dbi_close(env, dbi);
return rc; return rc;
} }
@ -475,7 +472,6 @@ static int process_db(MDB_dbi dbi, char *name, visitor *handler, int silent)
rc = mdbx_cursor_open(txn, dbi, &mc); rc = mdbx_cursor_open(txn, dbi, &mc);
if (rc) { if (rc) {
error(" - mdbx_cursor_open failed, error %d %s\n", rc, mdbx_strerror(rc)); error(" - mdbx_cursor_open failed, error %d %s\n", rc, mdbx_strerror(rc));
mdbx_dbi_close(env, dbi);
return rc; return rc;
} }
@ -565,7 +561,6 @@ bailout:
} }
mdbx_cursor_close(mc); mdbx_cursor_close(mc);
mdbx_dbi_close(env, dbi);
return rc || problems_count; return rc || problems_count;
} }
@ -686,7 +681,11 @@ int main(int argc, char *argv[])
} }
maxkeysize = rc; maxkeysize = rc;
mdbx_env_set_maxdbs(env, 3); rc = mdbx_env_set_maxdbs(env, MAX_DBI);
if (rc < 0) {
error("mdbx_env_set_maxdbs failed, error %d %s\n", rc, mdbx_strerror(rc));
goto bailout;
}
rc = mdbx_env_open_ex(env, envname, envflags, 0664, &exclusive); rc = mdbx_env_open_ex(env, envname, envflags, 0664, &exclusive);
if (rc) { if (rc) {
@ -747,7 +746,7 @@ int main(int argc, char *argv[])
meta_lt(info.me_meta1_txnid, info.me_meta1_sign, meta_lt(info.me_meta1_txnid, info.me_meta1_sign,
info.me_meta2_txnid, info.me_meta2_sign) ? "tail" : "head"); info.me_meta2_txnid, info.me_meta2_sign) ? "tail" : "head");
if (info.me_meta1_txnid > info.base.me_last_txnid) if (info.me_meta1_txnid > info.base.me_last_txnid)
print(", rolled-back %zu (%zu >>> %zu)\n", print(", rolled-back %zu (%zu >>> %zu)",
info.me_meta1_txnid - info.base.me_last_txnid, info.me_meta1_txnid - info.base.me_last_txnid,
info.me_meta1_txnid, info.base.me_last_txnid); info.me_meta1_txnid, info.base.me_last_txnid);
print("\n"); print("\n");
@ -757,7 +756,7 @@ int main(int argc, char *argv[])
meta_lt(info.me_meta2_txnid, info.me_meta2_sign, meta_lt(info.me_meta2_txnid, info.me_meta2_sign,
info.me_meta1_txnid, info.me_meta1_sign) ? "tail" : "head"); info.me_meta1_txnid, info.me_meta1_sign) ? "tail" : "head");
if (info.me_meta2_txnid > info.base.me_last_txnid) if (info.me_meta2_txnid > info.base.me_last_txnid)
print(", rolled-back %zu (%zu >>> %zu)\n", print(", rolled-back %zu (%zu >>> %zu)",
info.me_meta2_txnid - info.base.me_last_txnid, info.me_meta2_txnid - info.base.me_last_txnid,
info.me_meta2_txnid, info.base.me_last_txnid); info.me_meta2_txnid, info.base.me_last_txnid);
print("\n"); print("\n");

View File

@ -252,7 +252,8 @@ badend:
c2 += 2; c2 += 2;
} }
} else { } else {
c1++; c2++; /* copies are redundant when no escapes were used */
*c1++ = *c2++;
} }
} }
} else { } else {

179
mdbx.c
View File

@ -325,3 +325,182 @@ mdbx_env_pgwalk(MDB_txn *txn, MDBX_pgvisitor_func* visitor, void* user)
rc = visitor(P_INVALID, 0, user, NULL, NULL, 0, 0, 0, 0); rc = visitor(P_INVALID, 0, user, NULL, NULL, 0, 0, 0, 0);
return rc; return rc;
} }
int mdbx_canary_put(MDB_txn *txn, const mdbx_canary* canary)
{
if (unlikely(!txn))
return EINVAL;
if (unlikely(txn->mt_signature != MDBX_MT_SIGNATURE))
return MDB_VERSION_MISMATCH;
if (unlikely(F_ISSET(txn->mt_flags, MDB_TXN_RDONLY)))
return EACCES;
if (likely(canary)) {
txn->mt_canary.x = canary->x;
txn->mt_canary.y = canary->y;
txn->mt_canary.z = canary->z;
}
txn->mt_canary.v = txn->mt_txnid;
return MDB_SUCCESS;
}
size_t mdbx_canary_get(MDB_txn *txn, mdbx_canary* canary)
{
if(unlikely(!txn || txn->mt_signature != MDBX_MT_SIGNATURE))
return 0;
if (likely(canary))
*canary = txn->mt_canary;
return txn->mt_txnid;
}
int mdbx_cursor_eof(MDB_cursor *mc)
{
if (unlikely(mc == NULL))
return EINVAL;
if (unlikely(mc->mc_signature != MDBX_MC_SIGNATURE))
return MDB_VERSION_MISMATCH;
return (mc->mc_flags & C_INITIALIZED) ? 0 : 1;
}
static int mdbx_is_samedata(const MDB_val* a, const MDB_val* b) {
return a->iov_len == b->iov_len
&& memcmp(a->iov_base, b->iov_base, a->iov_len) == 0;
}
/* Позволяет обновить или удалить существующую запись с получением
* в old_data предыдущего значения данных. При этом если new_data равен
* нулю, то выполняется удаление, иначе обновление/вставка.
*
* Текущее значение может находиться в уже измененной (грязной) странице.
* В этом случае страница будет перезаписана при обновлении, а само старое
* значение утрачено. Поэтому исходно в old_data должен быть передан
* дополнительный буфер для копирования старого значения.
* Если переданный буфер слишком мал, то функция вернет -1, установив
* old_data->iov_len в соответствующее значение.
*
* Для не-уникальных ключей также возможен второй сценарий использования,
* когда посредством old_data из записей с одинаковым ключом для
* удаления/обновления выбирается конкретная. Для выбора этого сценария
* во flags следует одновременно указать MDB_CURRENT и MDB_NOOVERWRITE.
*
* Функция может быть замещена соответствующими операциями с курсорами
* после двух доработок (TODO):
* - внешняя аллокация курсоров, в том числе на стеке (без malloc).
* - получения статуса страницы по адресу (знать о P_DIRTY).
*/
int mdbx_replace(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *new_data, MDB_val *old_data, unsigned flags)
{
MDB_cursor mc;
MDB_xcursor mx;
if (unlikely(!key || !old_data || !txn))
return EINVAL;
if (unlikely(txn->mt_signature != MDBX_MT_SIGNATURE))
return MDB_VERSION_MISMATCH;
if (unlikely(old_data->iov_base == NULL && old_data->iov_len))
return EINVAL;
if (unlikely(new_data == NULL && !(flags & MDB_CURRENT)))
return EINVAL;
if (unlikely(!TXN_DBI_EXIST(txn, dbi, DB_USRVALID)))
return EINVAL;
if (unlikely(flags & ~(MDB_NOOVERWRITE|MDB_NODUPDATA|MDB_RESERVE|MDB_APPEND|MDB_APPENDDUP|MDB_CURRENT)))
return EINVAL;
if (unlikely(txn->mt_flags & (MDB_TXN_RDONLY|MDB_TXN_BLOCKED)))
return (txn->mt_flags & MDB_TXN_RDONLY) ? EACCES : MDB_BAD_TXN;
mdb_cursor_init(&mc, txn, dbi, &mx);
mc.mc_next = txn->mt_cursors[dbi];
txn->mt_cursors[dbi] = &mc;
int rc;
MDB_val present_key = *key;
if (F_ISSET(flags, MDB_CURRENT | MDB_NOOVERWRITE)
&& (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT)) {
/* в old_data значение для выбора конкретного дубликата */
rc = mdbx_cursor_get(&mc, &present_key, old_data, MDB_GET_BOTH);
if (rc != MDB_SUCCESS)
goto bailout;
/* если данные совпадают, то ничего делать не надо */
if (new_data && mdbx_is_samedata(old_data, new_data))
goto bailout;
} else {
/* в old_data буфер получения предыдущего значения */
MDB_val present_data;
rc = mdbx_cursor_get(&mc, &present_key, &present_data, MDB_SET_KEY);
if (unlikely(rc != MDB_SUCCESS)) {
old_data->iov_base = NULL;
old_data->iov_len = rc;
if (rc != MDB_NOTFOUND || (flags & MDB_CURRENT))
goto bailout;
} else if (flags & MDB_NOOVERWRITE) {
rc = MDB_KEYEXIST;
*old_data = present_data;
goto bailout;
} else {
MDB_page *page = mc.mc_pg[mc.mc_top];
if (txn->mt_dbs[dbi].md_flags & MDB_DUPSORT) {
if (flags & MDB_CURRENT) {
/* для не-уникальных ключей позволяем update/delete только если ключ один */
MDB_node *leaf = NODEPTR(page, mc.mc_ki[mc.mc_top]);
if (F_ISSET(leaf->mn_flags, F_DUPDATA)) {
mdb_tassert(txn, XCURSOR_INITED(&mc) && mc.mc_xcursor->mx_db.md_entries > 1);
rc = MDB_KEYEXIST;
goto bailout;
}
/* если данные совпадают, то ничего делать не надо */
if (new_data && mdbx_is_samedata(&present_data, new_data)) {
*old_data = *new_data;
goto bailout;
}
} else if ((flags & MDB_NODUPDATA) && mdbx_is_samedata(&present_data, new_data)) {
/* если данные совпадают и установлен MDB_NODUPDATA */
rc = MDB_KEYEXIST;
goto bailout;
}
} else {
/* если данные совпадают, то ничего делать не надо */
if (new_data && mdbx_is_samedata(&present_data, new_data)) {
*old_data = *new_data;
goto bailout;
}
flags |= MDB_CURRENT;
}
if (page->mp_flags & P_DIRTY) {
if (unlikely(old_data->iov_len < present_data.iov_len)) {
old_data->iov_base = NULL;
old_data->iov_len = present_data.iov_len;
rc = -1;
goto bailout;
}
memcpy(old_data->iov_base, present_data.iov_base, present_data.iov_len);
old_data->iov_len = present_data.iov_len;
} else {
*old_data = present_data;
}
}
}
if (likely(new_data))
rc = mdbx_cursor_put(&mc, key, new_data, flags);
else
rc = mdbx_cursor_del(&mc, 0);
bailout:
txn->mt_cursors[dbi] = mc.mc_next;
return rc;
}

15
mdbx.h
View File

@ -211,6 +211,21 @@ typedef int MDBX_pgvisitor_func(size_t pgno, unsigned pgnumber, void* ctx,
const char* dbi, const char *type, int nentries, const char* dbi, const char *type, int nentries,
int payload_bytes, int header_bytes, int unused_bytes); int payload_bytes, int header_bytes, int unused_bytes);
int mdbx_env_pgwalk(MDB_txn *txn, MDBX_pgvisitor_func* visitor, void* ctx); int mdbx_env_pgwalk(MDB_txn *txn, MDBX_pgvisitor_func* visitor, void* ctx);
typedef struct mdbx_canary {
size_t x, y, z, v;
} mdbx_canary;
int mdbx_canary_put(MDB_txn *txn, const mdbx_canary* canary);
size_t mdbx_canary_get(MDB_txn *txn, mdbx_canary* canary);
/** Returns 1 when no more data available or cursor not positioned,
* 0 otherwise or less that zero in error case. */
int mdbx_cursor_eof(MDB_cursor *mc);
int mdbx_replace(MDB_txn *txn, MDB_dbi dbi,
MDB_val *key, MDB_val *new_data, MDB_val *old_data, unsigned flags);
/** @} */ /** @} */
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -23,6 +23,8 @@
#include <sys/stat.h> #include <sys/stat.h>
#include "mdbx.h" #include "mdbx.h"
#include <pthread.h>
#define E(expr) CHECK((rc = (expr)) == MDB_SUCCESS, #expr) #define E(expr) CHECK((rc = (expr)) == MDB_SUCCESS, #expr)
#define RES(err, expr) ((rc = expr) == (err) || (CHECK(!rc, #expr), 0)) #define RES(err, expr) ((rc = expr) == (err) || (CHECK(!rc, #expr), 0))
#define CHECK(test, msg) ((test) ? (void)0 : ((void)fprintf(stderr, \ #define CHECK(test, msg) ((test) ? (void)0 : ((void)fprintf(stderr, \
@ -32,6 +34,18 @@
# define DBPATH "./testdb" # define DBPATH "./testdb"
#endif #endif
void* thread_entry(void *ctx)
{
MDB_env *env = ctx;
MDB_txn *txn;
int rc;
E(mdb_txn_begin(env, NULL, MDB_RDONLY, &txn));
mdb_txn_abort(txn);
return NULL;
}
int main(int argc,char * argv[]) int main(int argc,char * argv[])
{ {
int i = 0, j = 0, rc; int i = 0, j = 0, rc;
@ -60,7 +74,7 @@ int main(int argc,char * argv[])
} }
E(mdb_env_create(&env)); E(mdb_env_create(&env));
E(mdb_env_set_maxreaders(env, 1)); E(mdb_env_set_maxreaders(env, 42));
E(mdb_env_set_mapsize(env, 10485760)); E(mdb_env_set_mapsize(env, 10485760));
E(stat("/proc/self/exe", &exe_stat)?errno:0); E(stat("/proc/self/exe", &exe_stat)?errno:0);
@ -184,6 +198,11 @@ int main(int argc,char * argv[])
mdb_cursor_close(cur2); mdb_cursor_close(cur2);
E(mdb_txn_commit(txn)); E(mdb_txn_commit(txn));
for(i = 0; i < 41; ++i) {
pthread_t thread;
pthread_create(&thread, NULL, thread_entry, env);
}
printf("Restarting cursor outside txn\n"); printf("Restarting cursor outside txn\n");
E(mdb_txn_begin(env, NULL, 0, &txn)); E(mdb_txn_begin(env, NULL, 0, &txn));
E(mdb_cursor_open(txn, dbi, &cursor)); E(mdb_cursor_open(txn, dbi, &cursor));