Use multiread

This commit is contained in:
Antonio Andelic 2022-09-26 09:07:34 +00:00
parent 99665f17ba
commit cc3719e463
2 changed files with 125 additions and 33 deletions

View File

@ -5,11 +5,13 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
#include "Common/ZooKeeper/Types.h"
#include <Common/Exception.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Core/ServerUUID.h>
#include <Common/logger_useful.h>
#include <Common/noexcept_scope.h>
#include "Coordination/KeeperConstants.h"
namespace DB
@ -113,32 +115,81 @@ String TransactionLog::serializeTID(const TransactionID & tid)
return buf.str();
}
struct IZooKeeperEntryData
{
virtual Coordination::GetResponse operator[](size_t index) = 0;
virtual ~IZooKeeperEntryData() = default;
};
template <bool with_multiread>
struct ZooKeeperEntryData : public IZooKeeperEntryData
{
public:
ZooKeeperEntryData(Strings::const_iterator beg, Strings::const_iterator end, const auto & zookeeper, const auto & zookeeper_path_log)
{
size_t entries_count = std::distance(beg, end);
responses.reserve(entries_count);
if constexpr (with_multiread)
{
Coordination::Requests requests;
for (auto it = beg; it != end; ++it)
{
requests.emplace_back(zkutil::makeGetRequest(fs::path(zookeeper_path_log) / *it));
}
zookeeper->multi(requests);
}
else
{
for (auto it = beg; it != end; ++it)
{
responses.emplace_back(TSA_READ_ONE_THREAD(zookeeper)->asyncGet(fs::path(zookeeper_path_log) / *it));
}
}
}
Coordination::GetResponse operator[](size_t index) override
{
if constexpr (with_multiread)
return std::move(dynamic_cast<Coordination::GetResponse &>(*responses[index]));
else
return responses[index].get();
}
private:
using FutureListResponses = std::vector<std::future<Coordination::GetResponse>>;
using ResponsesType = std::conditional_t<with_multiread, Coordination::Responses, FutureListResponses>;
ResponsesType responses;
};
void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_iterator end)
{
std::vector<std::future<Coordination::GetResponse>> futures;
size_t entries_count = std::distance(beg, end);
if (!entries_count)
return;
String last_entry = *std::prev(end);
LOG_TRACE(log, "Loading {} entries from {}: {}..{}", entries_count, zookeeper_path_log, *beg, last_entry);
futures.reserve(entries_count);
for (auto it = beg; it != end; ++it)
futures.emplace_back(TSA_READ_ONE_THREAD(zookeeper)->asyncGet(fs::path(zookeeper_path_log) / *it));
std::unique_ptr<IZooKeeperEntryData> entries_data_ptr;
if (TSA_READ_ONE_THREAD(zookeeper)->getApiVersion() >= KeeperApiVersion::WITH_MULTI_READ)
entries_data_ptr = std::make_unique<ZooKeeperEntryData<true>>(beg, end, TSA_READ_ONE_THREAD(zookeeper), zookeeper_path_log);
else
entries_data_ptr = std::make_unique<ZooKeeperEntryData<false>>(beg, end, TSA_READ_ONE_THREAD(zookeeper), zookeeper_path_log);
auto & entries = *entries_data_ptr;
std::vector<std::pair<TIDHash, CSNEntry>> loaded;
loaded.reserve(entries_count);
auto it = beg;
for (size_t i = 0; i < entries_count; ++i, ++it)
{
auto res = futures[i].get();
auto res = entries[i];
CSN csn = deserializeCSN(*it);
TransactionID tid = deserializeTID(res.data);
loaded.emplace_back(tid.getHash(), CSNEntry{csn, tid});
LOG_TEST(log, "Got entry {} -> {}", tid, csn);
}
futures.clear();
NOEXCEPT_SCOPE_STRICT({
std::lock_guard lock{mutex};

View File

@ -10,6 +10,7 @@
#include <Common/CurrentMetrics.h>
#include <Parsers/formatAST.h>
#include <base/sort.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
namespace DB
@ -1948,6 +1949,69 @@ std::vector<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getMutationsStatu
return result;
}
namespace
{
struct BlockInfoInZooKeeper
{
String partition;
Int64 number;
String zk_path;
std::future<Coordination::GetResponse> contents_future;
};
template<bool with_multiread>
std::vector<BlockInfoInZooKeeper> getBlockInfos(const auto & partitions, const auto & zookeeper, const auto & zookeeper_path)
{
using FutureListResponses = std::vector<std::future<Coordination::ListResponse>>;
using ResponsesType = std::conditional_t<with_multiread, Coordination::Responses, FutureListResponses>;
ResponsesType locks;
if constexpr (with_multiread)
{
Coordination::Requests requests;
for (const String & partition : partitions)
requests.push_back(zkutil::makeSimpleListRequest(fs::path(zookeeper_path) / "block_numbers" / partition));
locks = zookeeper->multi(requests);
}
else
{
for (const String & partition : partitions)
locks.push_back(zookeeper->asyncGetChildren(fs::path(zookeeper_path) / "block_numbers" / partition));
}
std::vector<BlockInfoInZooKeeper> block_infos;
for (size_t i = 0; i < partitions.size(); ++i)
{
const Strings & partition_block_numbers = [&]
{
if constexpr (with_multiread)
return dynamic_cast<const Coordination::ZooKeeperSimpleListResponse &>(*locks[i]).names;
else
return locks[i].get().names;
}();
for (const String & entry : partition_block_numbers)
{
/// TODO: cache block numbers that are abandoned.
/// We won't need to check them on the next iteration.
if (startsWith(entry, "block-"))
{
Int64 block_number = parse<Int64>(entry.substr(strlen("block-")));
String zk_path = fs::path(zookeeper_path) / "block_numbers" / partitions[i] / entry;
block_infos.emplace_back(
BlockInfoInZooKeeper{partitions[i], block_number, zk_path, zookeeper->asyncTryGet(zk_path)});
}
}
}
return block_infos;
}
}
ReplicatedMergeTreeQueue::QueueLocks ReplicatedMergeTreeQueue::lockQueue()
{
return QueueLocks(state_mutex, pull_logs_to_queue_mutex, update_mutations_mutex);
@ -1977,35 +2041,12 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
if (!lock_holder_paths.empty())
{
Strings partitions = zookeeper->getChildren(fs::path(queue.zookeeper_path) / "block_numbers");
std::vector<std::future<Coordination::ListResponse>> lock_futures;
for (const String & partition : partitions)
lock_futures.push_back(zookeeper->asyncGetChildren(fs::path(queue.zookeeper_path) / "block_numbers" / partition));
struct BlockInfoInZooKeeper
{
String partition;
Int64 number;
String zk_path;
std::future<Coordination::GetResponse> contents_future;
};
std::vector<BlockInfoInZooKeeper> block_infos;
for (size_t i = 0; i < partitions.size(); ++i)
{
Strings partition_block_numbers = lock_futures[i].get().names;
for (const String & entry : partition_block_numbers)
{
/// TODO: cache block numbers that are abandoned.
/// We won't need to check them on the next iteration.
if (startsWith(entry, "block-"))
{
Int64 block_number = parse<Int64>(entry.substr(strlen("block-")));
String zk_path = fs::path(queue.zookeeper_path) / "block_numbers" / partitions[i] / entry;
block_infos.emplace_back(
BlockInfoInZooKeeper{partitions[i], block_number, zk_path, zookeeper->asyncTryGet(zk_path)});
}
}
}
if (zookeeper->getApiVersion() >= KeeperApiVersion::WITH_MULTI_READ)
block_infos = getBlockInfos<true>(partitions, zookeeper, queue.zookeeper_path);
else
block_infos = getBlockInfos<false>(partitions, zookeeper, queue.zookeeper_path);
for (auto & block : block_infos)
{