From 0a9c9840daa77c7eb0995e9accae7f48bb9587d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9B=D0=B5=D0=BE=D0=BD=D0=B8=D0=B4=20=D0=AE=D1=80=D1=8C?= =?UTF-8?q?=D0=B5=D0=B2=20=28Leonid=20Yuriev=29?= <leo@yuriev.ru> Date: Thu, 20 Mar 2025 01:47:56 +0300 Subject: [PATCH] =?UTF-8?q?mdbx-tests:=20=D1=81=D1=83=D1=89=D0=B5=D1=81?= =?UTF-8?q?=D1=82=D0=B2=D0=B5=D0=BD=D0=BD=D0=BE=D0=B5=20=D1=80=D0=B0=D1=81?= =?UTF-8?q?=D1=88=D0=B8=D1=80=D0=B5=D0=BD=D0=B8=D0=B5=20`extra/cursor-clos?= =?UTF-8?q?ing`=20(backport).?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/extra/cursor_closing.c++ | 341 +++++++++++++++++++++++++++++++--- 1 file changed, 313 insertions(+), 28 deletions(-) diff --git a/test/extra/cursor_closing.c++ b/test/extra/cursor_closing.c++ index 045b7677..60749dbd 100644 --- a/test/extra/cursor_closing.c++ +++ b/test/extra/cursor_closing.c++ @@ -1,6 +1,13 @@ #include "mdbx.h++" +#include <chrono> +#include <deque> #include <iostream> +#include <vector> +#if defined(__cpp_lib_latch) && __cpp_lib_latch >= 201907L +#include <latch> +#include <thread> +#endif static void logger_nofmt(MDBX_log_level_t loglevel, const char *function, int line, const char *msg, unsigned length) noexcept { @@ -11,6 +18,304 @@ static void logger_nofmt(MDBX_log_level_t loglevel, const char *function, int li static char log_buffer[1024]; +//-------------------------------------------------------------------------------------------- + +bool case0(mdbx::env env) { + auto txn = env.start_write(); + auto table = txn.create_map("case0", mdbx::key_mode::usual, mdbx::value_mode::single); + auto cursor_1 = txn.open_cursor(table); + auto cursor_2 = cursor_1.clone(); + + auto nested = env.start_write(txn); + auto nested_cursor_1 = nested.open_cursor(table); + auto nested_cursor_2 = nested_cursor_1.clone(); + auto nested_cursor_3 = cursor_1.clone(); + + auto deep = env.start_write(nested); + auto deep_cursor_1 = deep.open_cursor(table); + auto deep_cursor_2 = nested_cursor_1.clone(); + auto deep_cursor_3 = cursor_1.clone(); + deep_cursor_1.close(); + deep.commit(); + deep_cursor_2.close(); + + nested_cursor_1.close(); + nested.abort(); + nested_cursor_2.close(); + + cursor_1.close(); + txn.commit(); + cursor_2.close(); + return true; +} + +//-------------------------------------------------------------------------------------------- + +/* Сценарий: + * + * 0. Создаём N таблиц, курсор для каждой таблицы и заполняем (1000 ключей, от 1 до 1000 значений в каждом ключе). + * 1. Запускаем N-1 фоновых потоков и используем текущий/основной. + * 2. В каждом потоке 100500 раз повторяем последовательность действий: + * - 100500 раз запускаем читающую транзакцию и выполняем "читающий цикл": + * - в читающей транзакции создаем 0..3 курсоров, потом подключаем заранее созданный курсор, + * потом еще 0..3 курсоров; + * - выполняем по паре поисков через каждый курсор; + * - отключаем заранее созданный курсор; + * - снова выполняем несколько поисков по каждому курсору; + * - псевдослучайно закрываем один из курсоров и один отключаем; + * - псевдослучайно выполняем один из путей: + * - закрываем все курсоры посредством mdbx_txn_release_all_cursors(); + * - отсоединяем все курсоры посредством mdbx_txn_release_all_cursors(); + * - псевдослучайно закрываем один из курсоров и один отключаем; + * - ничего не делаем; + * - завершаем читающую транзакцию псевдослучайно выбирая между commit и abort; + * - закрываем оставшиеся курсоры. + * 3. Выполняем "пишущий цикл": + * - запускаем пишущую или вложенную транзакцию; + * - из оставшихся с предыдущих итераций курсоров половину закрываем, + * половину подключаем к транзакции; + * - для каждой таблицы с вероятностью 1/2 выполняем "читающий цикл"; + * - для каждой таблицы с вероятностью 1/2 выполняем "модифицирующий" цикл: + * - подключаем курсор, либо создаем при отсутствии подходящих; + * - 100 раз выполняем поиск случайных пар ключ/значение; + * - при успешном поиске удаляем значение, иначе вставляем; + * - с вероятностью 1/2 повторяем "читающий цикл"; + * - с вероятностью 7/16 запускаем вложенную транзакцию: + * - действуем рекурсивно как с пишущей транзакцией; + * - в "читающих циклах" немного меняем поведение: + * - игнорируем ожидаемые ошибки mdbx_cursor_unbind(); + * - в 2-3 раза уменьшаем вероятность использования mdbx_txn_release_all_cursors(); + * - завершаем вложенную транзакцию псевдослучайно выбирая между commit и abort; + * - для каждой таблицы с вероятностью 1/2 выполняем "читающий цикл"; + * - завершаем транзакцию псевдослучайно выбирая между commit и abort; + * 4. Ждем завершения фоновых потоков. + * 5. Закрываем оставшиеся курсоры и закрываем БД. */ + +thread_local size_t salt; + +static size_t prng() { + salt = salt * 134775813 + 1; + return salt ^ ((salt >> 11) * 1822226723); +} + +static inline bool flipcoin() { return prng() & 1; } + +static inline size_t prng(size_t range) { return prng() % range; } + +void case1_shuffle_pool(std::vector<MDBX_cursor *> &pool) { + for (size_t n = 1; n < pool.size(); ++n) { + const auto i = prng(n); + if (i != n) + std::swap(pool[n], pool[i]); + } +} + +void case1_read_pool(std::vector<MDBX_cursor *> &pool) { + for (auto c : pool) + if (flipcoin()) + mdbx::cursor(c).find_multivalue(mdbx::slice::wrap(prng(1000)), mdbx::slice::wrap(prng(1000)), false); + for (auto c : pool) + if (flipcoin()) + mdbx::cursor(c).find_multivalue(mdbx::slice::wrap(prng(1000)), mdbx::slice::wrap(prng(1000)), false); +} + +MDBX_cursor *case1_try_unbind(MDBX_cursor *cursor) { + if (cursor) { + auto err = mdbx::error(static_cast<MDBX_error_t>(mdbx_cursor_unbind(cursor))); + if (err.code() != MDBX_EINVAL) + err.success_or_throw(); + } + return cursor; +} + +MDBX_cursor *case1_pool_remove(std::vector<MDBX_cursor *> &pool) { + switch (pool.size()) { + case 0: + return nullptr; + case 1: + if (flipcoin()) { + const auto c = pool[0]; + pool.pop_back(); + return c; + } + return nullptr; + default: + const auto i = prng(pool.size()); + const auto c = pool[i]; + pool.erase(pool.begin() + i); + return c; + } +} + +mdbx::map_handle case1_cycle_dbi(std::deque<mdbx::map_handle> &dbi) { + const auto h = dbi.front(); + dbi.pop_front(); + dbi.push_back(h); + return h; +} + +void case1_read_cycle(mdbx::txn txn, std::deque<mdbx::map_handle> &dbi, std::vector<MDBX_cursor *> &pool, + mdbx::cursor pre, bool nested = false) { + for (auto c : pool) + mdbx::cursor(c).bind(txn, case1_cycle_dbi(dbi)); + pre.bind(txn, case1_cycle_dbi(dbi)); + + for (auto n = prng(3 + dbi.size()); n > 0; --n) { + auto c = txn.open_cursor(dbi[prng(dbi.size())]); + pool.push_back(c.withdraw_handle()); + } + case1_shuffle_pool(pool); + case1_read_pool(pool); + + pool.push_back(pre); + case1_read_pool(pool); + pool.pop_back(); + + for (auto n = prng(3 + dbi.size()); n > 0; --n) { + auto c = txn.open_cursor(dbi[prng(dbi.size())]); + pool.push_back(c.withdraw_handle()); + } + pool.push_back(pre); + case1_read_pool(pool); + pool.pop_back(); + + case1_try_unbind(pre); + case1_shuffle_pool(pool); + case1_read_pool(pool); + + if (flipcoin()) { + mdbx_cursor_close(case1_pool_remove(pool)); + auto u = case1_try_unbind(case1_pool_remove(pool)); + case1_read_pool(pool); + if (u) + pool.push_back(u); + } else { + auto u = case1_try_unbind(case1_pool_remove(pool)); + mdbx_cursor_close(case1_pool_remove(pool)); + case1_read_pool(pool); + if (u) + pool.push_back(u); + } + + switch (prng(nested ? 7 : 3)) { + case 0: + for (auto i = pool.begin(); i != pool.end();) + if (mdbx_cursor_txn(*i)) + i = pool.erase(i); + else + ++i; + txn.close_all_cursors(); + break; + case 1: + txn.unbind_all_cursors(); + break; + } +} + +void case1_write_cycle(mdbx::txn_managed txn, std::deque<mdbx::map_handle> &dbi, std::vector<MDBX_cursor *> &pool, + mdbx::cursor pre, bool nested = false) { + if (flipcoin()) + case1_cycle_dbi(dbi); + if (flipcoin()) + case1_shuffle_pool(pool); + + for (auto n = prng(dbi.size() + 1); n > 1; n -= 2) { + if (!nested) + pre.unbind(); + if (!pre.txn()) + pre.bind(txn, dbi[prng(dbi.size())]); + for (auto i = 0; i < 1000; ++i) { + auto k = mdbx::default_buffer::wrap(prng(1000)); + auto v = mdbx::default_buffer::wrap(prng(1000)); + if (pre.find_multivalue(k, v, false)) + pre.erase(); + else + pre.upsert(k, v); + } + } + + if (prng(16) > 8) + case1_write_cycle(txn.start_nested(), dbi, pool, pre, true); + + if (flipcoin()) + txn.commit(); + else + txn.abort(); +} + +bool case1_thread(mdbx::env env, std::deque<mdbx::map_handle> dbi, mdbx::cursor pre) { + salt = size_t(std::chrono::high_resolution_clock::now().time_since_epoch().count()); + std::vector<MDBX_cursor *> pool; + for (auto loop = 0; loop < 333; ++loop) { + for (auto read = 0; read < 333; ++read) { + auto txn = env.start_read(); + case1_read_cycle(txn, dbi, pool, pre); + if (flipcoin()) + txn.commit(); + else + txn.abort(); + } + + case1_write_cycle(env.start_write(), dbi, pool, pre); + + for (auto c : pool) + mdbx_cursor_close(c); + pool.clear(); + } + + pre.unbind(); + return true; +} + +bool case1(mdbx::env env) { + bool ok = true; + std::deque<mdbx::map_handle> dbi; + std::vector<mdbx::cursor_managed> cursors; +#if defined(__cpp_lib_latch) && __cpp_lib_latch >= 201907L + static const auto N = 10; +#else + static const auto N = 3; +#endif + for (auto t = 0; t < N; ++t) { + auto txn = env.start_write(); + auto table = txn.create_map(std::to_string(t), mdbx::key_mode::ordinal, mdbx::value_mode::multi_samelength); + auto cursor = txn.open_cursor(table); + for (size_t i = 0; i < 10000; ++i) + cursor.upsert(mdbx::default_buffer::wrap(prng(1000)), mdbx::default_buffer::wrap(prng(1000))); + txn.commit(); + + cursors.push_back(std::move(cursor)); + dbi.push_back(table); + } + +#if defined(__cpp_lib_latch) && __cpp_lib_latch >= 201907L + std::latch s(1); + std::vector<std::thread> threads; + for (auto t = 1; t < N; ++t) { + case1_cycle_dbi(dbi); + threads.push_back(std::thread([&, t]() { + s.wait(); + if (!case1_thread(env, dbi, cursors[t])) + ok = false; + })); + } + case1_cycle_dbi(dbi); + s.count_down(); +#endif + + if (!case1_thread(env, dbi, cursors[0])) + ok = false; + +#if defined(__cpp_lib_latch) && __cpp_lib_latch >= 201907L + for (auto &t : threads) + t.join(); +#endif + + return ok; +} + +//-------------------------------------------------------------------------------------------- + int main(int argc, const char *argv[]) { (void)argc; (void)argv; @@ -23,34 +328,14 @@ int main(int argc, const char *argv[]) { mdbx::env_managed env(db_filename, mdbx::env_managed::create_parameters(), mdbx::env::operate_parameters(42, 0, mdbx::env::nested_transactions)); - { - auto txn = env.start_write(); - auto table = txn.create_map("dummy", mdbx::key_mode::usual, mdbx::value_mode::single); - auto cursor_1 = txn.open_cursor(table); - auto cursor_2 = cursor_1.clone(); + bool ok = case0(env); + ok = case1(env) && ok; - auto nested = env.start_write(txn); - auto nested_cursor_1 = nested.open_cursor(table); - auto nested_cursor_2 = nested_cursor_1.clone(); - auto nested_cursor_3 = cursor_1.clone(); - - auto deep = env.start_write(nested); - auto deep_cursor_1 = deep.open_cursor(table); - auto deep_cursor_2 = nested_cursor_1.clone(); - auto deep_cursor_3 = cursor_1.clone(); - deep_cursor_1.close(); - deep.commit(); - deep_cursor_2.close(); - - nested_cursor_1.close(); - nested.abort(); - nested_cursor_2.close(); - - cursor_1.close(); - txn.commit(); - cursor_2.close(); + if (ok) { + std::cout << "OK\n"; + return EXIT_SUCCESS; + } else { + std::cout << "FAIL!\n"; + return EXIT_FAILURE; } - - std::cout << "OK\n"; - return EXIT_SUCCESS; }