Better TransactionLog with multiread

This commit is contained in:
Antonio Andelic 2022-09-26 09:31:27 +00:00
parent cc3719e463
commit 56cc3c7137
2 changed files with 48 additions and 57 deletions

View File

@ -11,7 +11,6 @@
#include <Core/ServerUUID.h> #include <Core/ServerUUID.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/noexcept_scope.h> #include <Common/noexcept_scope.h>
#include "Coordination/KeeperConstants.h"
namespace DB namespace DB
@ -115,53 +114,58 @@ String TransactionLog::serializeTID(const TransactionID & tid)
return buf.str(); return buf.str();
} }
struct IZooKeeperEntryData
{
virtual Coordination::GetResponse operator[](size_t index) = 0;
virtual ~IZooKeeperEntryData() = default;
};
template <bool with_multiread> template <bool with_multiread>
struct ZooKeeperEntryData : public IZooKeeperEntryData std::vector<std::pair<TIDHash, TransactionLog::CSNEntry>>
{ TransactionLog::getLoaded(Strings::const_iterator beg, Strings::const_iterator end)
public:
ZooKeeperEntryData(Strings::const_iterator beg, Strings::const_iterator end, const auto & zookeeper, const auto & zookeeper_path_log)
{ {
using FutureGetResponses = std::vector<std::future<Coordination::GetResponse>>;
using ResponsesType = std::conditional_t<with_multiread, Coordination::Responses, FutureGetResponses>;
size_t entries_count = std::distance(beg, end); size_t entries_count = std::distance(beg, end);
responses.reserve(entries_count); ResponsesType entries_data;
entries_data.reserve(entries_count);
if constexpr (with_multiread) if constexpr (with_multiread)
{ {
Coordination::Requests requests; Coordination::Requests requests;
requests.reserve(entries_count);
for (auto it = beg; it != end; ++it) for (auto it = beg; it != end; ++it)
{
requests.emplace_back(zkutil::makeGetRequest(fs::path(zookeeper_path_log) / *it)); requests.emplace_back(zkutil::makeGetRequest(fs::path(zookeeper_path_log) / *it));
}
zookeeper->multi(requests); entries_data = TSA_READ_ONE_THREAD(zookeeper)->multi(requests);
} }
else else
{ {
for (auto it = beg; it != end; ++it) for (auto it = beg; it != end; ++it)
{ entries_data.emplace_back(TSA_READ_ONE_THREAD(zookeeper)->asyncGet(fs::path(zookeeper_path_log) / *it));
responses.emplace_back(TSA_READ_ONE_THREAD(zookeeper)->asyncGet(fs::path(zookeeper_path_log) / *it));
}
}
} }
Coordination::GetResponse operator[](size_t index) override auto it = beg;
std::vector<std::pair<TIDHash, CSNEntry>> loaded;
loaded.reserve(entries_count);
for (size_t i = 0; i < entries_count; ++i, ++it)
{
auto res = [&]
{ {
if constexpr (with_multiread) if constexpr (with_multiread)
return std::move(dynamic_cast<Coordination::GetResponse &>(*responses[index])); return std::move(dynamic_cast<Coordination::GetResponse &>(*entries_data[i]));
else else
return responses[index].get(); return entries_data[i].get();
}();
CSN csn = deserializeCSN(*it);
TransactionID tid = deserializeTID(res.data);
loaded.emplace_back(tid.getHash(), CSNEntry{csn, tid});
LOG_TEST(log, "Got entry {} -> {}", tid, csn);
} }
private: return loaded;
using FutureListResponses = std::vector<std::future<Coordination::GetResponse>>; }
using ResponsesType = std::conditional_t<with_multiread, Coordination::Responses, FutureListResponses>;
ResponsesType responses; template std::vector<std::pair<TIDHash, TransactionLog::CSNEntry>>
}; TransactionLog::getLoaded<true>(Strings::const_iterator beg, Strings::const_iterator end);
template std::vector<std::pair<TIDHash, TransactionLog::CSNEntry>>
TransactionLog::getLoaded<false>(Strings::const_iterator beg, Strings::const_iterator end);
void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_iterator end) void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_iterator end)
{ {
@ -171,25 +175,9 @@ void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_ite
String last_entry = *std::prev(end); String last_entry = *std::prev(end);
LOG_TRACE(log, "Loading {} entries from {}: {}..{}", entries_count, zookeeper_path_log, *beg, last_entry); LOG_TRACE(log, "Loading {} entries from {}: {}..{}", entries_count, zookeeper_path_log, *beg, last_entry);
std::unique_ptr<IZooKeeperEntryData> entries_data_ptr;
if (TSA_READ_ONE_THREAD(zookeeper)->getApiVersion() >= KeeperApiVersion::WITH_MULTI_READ)
entries_data_ptr = std::make_unique<ZooKeeperEntryData<true>>(beg, end, TSA_READ_ONE_THREAD(zookeeper), zookeeper_path_log);
else
entries_data_ptr = std::make_unique<ZooKeeperEntryData<false>>(beg, end, TSA_READ_ONE_THREAD(zookeeper), zookeeper_path_log);
auto & entries = *entries_data_ptr; auto loaded = TSA_READ_ONE_THREAD(zookeeper)->getApiVersion() >= KeeperApiVersion::WITH_MULTI_READ ? getLoaded<true>(beg, end)
: getLoaded<false>(beg, end);
std::vector<std::pair<TIDHash, CSNEntry>> loaded;
loaded.reserve(entries_count);
auto it = beg;
for (size_t i = 0; i < entries_count; ++i, ++it)
{
auto res = entries[i];
CSN csn = deserializeCSN(*it);
TransactionID tid = deserializeTID(res.data);
loaded.emplace_back(tid.getHash(), CSNEntry{csn, tid});
LOG_TEST(log, "Got entry {} -> {}", tid, csn);
}
NOEXCEPT_SCOPE_STRICT({ NOEXCEPT_SCOPE_STRICT({
std::lock_guard lock{mutex}; std::lock_guard lock{mutex};

View File

@ -169,6 +169,9 @@ private:
using TIDMap = std::unordered_map<TIDHash, CSNEntry>; using TIDMap = std::unordered_map<TIDHash, CSNEntry>;
TIDMap tid_to_csn TSA_GUARDED_BY(mutex); TIDMap tid_to_csn TSA_GUARDED_BY(mutex);
template<bool with_multiread>
std::vector<std::pair<TIDHash, CSNEntry>> getLoaded(Strings::const_iterator beg, Strings::const_iterator end);
mutable std::mutex running_list_mutex; mutable std::mutex running_list_mutex;
/// Transactions that are currently processed /// Transactions that are currently processed
TransactionsList running_list TSA_GUARDED_BY(running_list_mutex); TransactionsList running_list TSA_GUARDED_BY(running_list_mutex);