Merge pull request #42243 from ClickHouse/use-multiread-more

Use MultiRead where possible
Antonio Andelic 2022-10-20 09:04:54 +02:00 committed by GitHub
commit 77eb353839
6 changed files with 61 additions and 49 deletions
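
Every file in this diff applies the same transformation: instead of firing one asynchronous Keeper request per node and collecting a vector of futures, the code gathers all paths into a single vector and issues one batched (MultiRead) call. A minimal sketch of the before/after shape, assuming the zkutil::ZooKeeper batch interface used in the hunks below (exists()/get() over a list of paths, returning one response per path in the same order); names such as names, base_path and process are illustrative, and the snippet is not compilable outside the ClickHouse tree:

    /// Before: one round trip scheduled per path, one future per response.
    std::vector<std::future<Coordination::ExistsResponse>> futures;
    for (const auto & name : names)
        futures.emplace_back(zookeeper->asyncExists(base_path + "/" + name));
    for (auto & future : futures)
        process(future.get());

    /// After: collect the paths first, then a single batched request.
    Strings paths;
    paths.reserve(names.size());
    for (const auto & name : names)
        paths.emplace_back(base_path + "/" + name);
    auto responses = zookeeper->exists(paths);  /// one MultiRead round trip
    for (size_t i = 0; i < responses.size(); ++i)
        process(responses[i]);                  /// responses[i] corresponds to paths[i]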


@@ -419,14 +419,14 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
LOG_TRACE(log, "Checking {} blocks ({} are not cached){}", stat.numChildren, not_cached_blocks, " to clear old ones from ZooKeeper.");
}
- zkutil::AsyncResponses<Coordination::ExistsResponse> exists_futures;
+ std::vector<std::string> exists_paths;
for (const String & block : blocks)
{
auto it = cached_block_stats.find(block);
if (it == cached_block_stats.end())
{
/// New block. Fetch its stat asynchronously.
- exists_futures.emplace_back(block, zookeeper.asyncExists(storage.zookeeper_path + "/blocks/" + block));
+ exists_paths.emplace_back(storage.zookeeper_path + "/blocks/" + block);
}
else
{
@@ -436,14 +436,18 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
}
}
+ auto exists_size = exists_paths.size();
+ auto exists_results = zookeeper.exists(exists_paths);
/// Put fetched stats into the cache
- for (auto & elem : exists_futures)
+ for (size_t i = 0; i < exists_size; ++i)
{
- auto status = elem.second.get();
+ auto status = exists_results[i];
if (status.error != Coordination::Error::ZNONODE)
{
- cached_block_stats.emplace(elem.first, std::make_pair(status.stat.ctime, status.stat.version));
- timed_blocks.emplace_back(elem.first, status.stat.ctime, status.stat.version);
+ auto node_name = fs::path(exists_paths[i]).filename();
+ cached_block_stats.emplace(node_name, std::make_pair(status.stat.ctime, status.stat.version));
+ timed_blocks.emplace_back(node_name, status.stat.ctime, status.stat.version);
}
}
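
One detail in the hunk above is worth calling out: the old futures were keyed by the block name (elem.first), while the batched responses are purely positional, so the name is recovered from the request path via fs::path(...).filename(). A small standalone illustration of that recovery, using hypothetical block paths that mirror storage.zookeeper_path + "/blocks/" + block:

    #include <filesystem>
    #include <iostream>
    #include <string>
    #include <vector>

    int main()
    {
        namespace fs = std::filesystem;
        /// Hypothetical request paths, in the same order the batched responses come back.
        std::vector<std::string> exists_paths = {
            "/clickhouse/tables/test/blocks/block-0000000001",
            "/clickhouse/tables/test/blocks/block-0000000002",
        };
        for (const auto & path : exists_paths)
            std::cout << fs::path(path).filename().string() << '\n';  /// prints block-0000000001, block-0000000002
    }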


@@ -153,17 +153,19 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
::sort(children.begin(), children.end());
- zkutil::AsyncResponses<Coordination::GetResponse> futures;
- futures.reserve(children.size());
+ auto children_num = children.size();
+ std::vector<std::string> paths;
+ paths.reserve(children_num);
for (const String & child : children)
- futures.emplace_back(child, zookeeper->asyncGet(fs::path(queue_path) / child));
+ paths.emplace_back(fs::path(queue_path) / child);
- for (auto & future : futures)
+ auto results = zookeeper->get(paths);
+ for (size_t i = 0; i < children_num; ++i)
{
- Coordination::GetResponse res = future.second.get();
+ auto res = results[i];
LogEntryPtr entry = LogEntry::parse(res.data, res.stat);
- entry->znode_name = future.first;
+ entry->znode_name = children[i];
std::lock_guard lock(state_mutex);
@@ -641,11 +643,11 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
LOG_DEBUG(log, "Pulling {} entries to queue: {} - {}", (end - begin), *begin, *last);
- zkutil::AsyncResponses<Coordination::GetResponse> futures;
- futures.reserve(end - begin);
+ Strings get_paths;
+ get_paths.reserve(end - begin);
for (auto it = begin; it != end; ++it)
- futures.emplace_back(*it, zookeeper->asyncGet(fs::path(zookeeper_path) / "log" / *it));
+ get_paths.emplace_back(fs::path(zookeeper_path) / "log" / *it);
/// Simultaneously add all new entries to the queue and move the pointer to the log.
@@ -655,9 +657,11 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
std::optional<time_t> min_unprocessed_insert_time_changed;
- for (auto & future : futures)
+ auto get_results = zookeeper->get(get_paths);
+ auto get_num = get_results.size();
+ for (size_t i = 0; i < get_num; ++i)
{
- Coordination::GetResponse res = future.second.get();
+ auto res = get_results[i];
copied_entries.emplace_back(LogEntry::parse(res.data, res.stat));
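
Both loops above rely on the same invariant as the cleanup-thread change: the batched get() hands back its responses in the order of the request paths, which is what lets index i map results[i] back to children[i] (and get_results[i] back to the log entry it was requested for). A trivial standalone sketch of that positional pairing, with plain strings standing in for Coordination::GetResponse objects:

    #include <iostream>
    #include <string>
    #include <vector>

    int main()
    {
        /// Hypothetical znode names and their fetched payloads, aligned by index.
        std::vector<std::string> children = {"queue-0000000001", "queue-0000000002"};
        std::vector<std::string> results  = {"payload for entry 1", "payload for entry 2"};

        for (size_t i = 0; i < children.size(); ++i)
            std::cout << children[i] << " -> " << results[i] << '\n';
    }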


@@ -99,19 +99,22 @@ size_t ReplicatedMergeTreeSink::checkQuorumPrecondition(zkutil::ZooKeeperPtr & z
quorum_info.status_path = storage.zookeeper_path + "/quorum/status";
Strings replicas = zookeeper->getChildren(fs::path(storage.zookeeper_path) / "replicas");
- std::vector<std::future<Coordination::ExistsResponse>> replicas_status_futures;
- replicas_status_futures.reserve(replicas.size());
+ Strings exists_paths;
for (const auto & replica : replicas)
if (replica != storage.replica_name)
- replicas_status_futures.emplace_back(zookeeper->asyncExists(fs::path(storage.zookeeper_path) / "replicas" / replica / "is_active"));
+ exists_paths.emplace_back(fs::path(storage.zookeeper_path) / "replicas" / replica / "is_active");
- std::future<Coordination::GetResponse> is_active_future = zookeeper->asyncTryGet(storage.replica_path + "/is_active");
- std::future<Coordination::GetResponse> host_future = zookeeper->asyncTryGet(storage.replica_path + "/host");
+ auto exists_result = zookeeper->exists(exists_paths);
+ auto get_results = zookeeper->get(Strings{storage.replica_path + "/is_active", storage.replica_path + "/host"});
size_t active_replicas = 1; /// Assume current replica is active (will check below)
- for (auto & status : replicas_status_futures)
- if (status.get().error == Coordination::Error::ZOK)
+ for (size_t i = 0; i < exists_paths.size(); ++i)
+ {
+ auto status = exists_result[i];
+ if (status.error == Coordination::Error::ZOK)
++active_replicas;
+ }
size_t replicas_number = replicas.size();
size_t quorum_size = getQuorumSize(replicas_number);
@@ -135,8 +138,8 @@ size_t ReplicatedMergeTreeSink::checkQuorumPrecondition(zkutil::ZooKeeperPtr & z
/// Both checks are implicitly made also later (otherwise there would be a race condition).
- auto is_active = is_active_future.get();
- auto host = host_future.get();
+ auto is_active = get_results[0];
+ auto host = get_results[1];
if (is_active.error == Coordination::Error::ZNONODE || host.error == Coordination::Error::ZNONODE)
throw Exception("Replica is not active right now", ErrorCodes::READONLY);


@@ -682,24 +682,20 @@ Chunk StorageKeeperMap::getBySerializedKeys(const std::span<const std::string> k
auto client = getClient();
- std::vector<std::future<Coordination::GetResponse>> values;
- values.reserve(keys.size());
+ Strings full_key_paths;
+ full_key_paths.reserve(keys.size());
for (const auto & key : keys)
{
- const auto full_path = fullPathForKey(key);
- values.emplace_back(client->asyncTryGet(full_path));
+ full_key_paths.emplace_back(fullPathForKey(key));
}
- auto wait_until = std::chrono::system_clock::now() + std::chrono::milliseconds(Coordination::DEFAULT_OPERATION_TIMEOUT_MS);
+ auto values = client->tryGet(full_key_paths);
for (size_t i = 0; i < keys.size(); ++i)
{
- auto & value = values[i];
- if (value.wait_until(wait_until) != std::future_status::ready)
- throw DB::Exception(ErrorCodes::KEEPER_EXCEPTION, "Failed to fetch values: timeout");
- auto response = value.get();
+ auto response = values[i];
Coordination::Error code = response.error;
if (code == Coordination::Error::ZOK)


@@ -3206,16 +3206,17 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
int32_t log_version,
MergeType merge_type)
{
- std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
- exists_futures.reserve(parts.size());
+ Strings exists_paths;
+ exists_paths.reserve(parts.size());
for (const auto & part : parts)
- exists_futures.emplace_back(zookeeper->asyncExists(fs::path(replica_path) / "parts" / part->name));
+ exists_paths.emplace_back(fs::path(replica_path) / "parts" / part->name);
+ auto exists_results = zookeeper->exists(exists_paths);
bool all_in_zk = true;
for (size_t i = 0; i < parts.size(); ++i)
{
/// If there is no information about part in ZK, we will not merge it.
- if (exists_futures[i].get().error == Coordination::Error::ZNONODE)
+ if (exists_results[i].error == Coordination::Error::ZNONODE)
{
all_in_zk = false;
@@ -6228,19 +6229,20 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeperWithRetries(const Strin
auto zookeeper = getZooKeeper();
- std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
- exists_futures.reserve(part_names.size());
+ Strings exists_paths;
+ exists_paths.reserve(part_names.size());
for (const String & part_name : part_names)
{
- String part_path = fs::path(replica_path) / "parts" / part_name;
- exists_futures.emplace_back(zookeeper->asyncExists(part_path));
+ exists_paths.emplace_back(fs::path(replica_path) / "parts" / part_name);
}
+ auto exists_results = zookeeper->exists(exists_paths);
std::vector<std::future<Coordination::MultiResponse>> remove_futures;
remove_futures.reserve(part_names.size());
for (size_t i = 0; i < part_names.size(); ++i)
{
- Coordination::ExistsResponse exists_resp = exists_futures[i].get();
+ Coordination::ExistsResponse exists_resp = exists_results[i];
if (exists_resp.error == Coordination::Error::ZOK)
{
Coordination::Requests ops;
@@ -6286,9 +6288,9 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeperWithRetries(const Strin
void StorageReplicatedMergeTree::removePartsFromZooKeeper(
zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names, NameSet * parts_should_be_retried)
{
- std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
+ Strings exists_paths;
std::vector<std::future<Coordination::MultiResponse>> remove_futures;
- exists_futures.reserve(part_names.size());
+ exists_paths.reserve(part_names.size());
remove_futures.reserve(part_names.size());
try
{
@@ -6296,13 +6298,14 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(
/// if zk session will be dropped
for (const String & part_name : part_names)
{
- String part_path = fs::path(replica_path) / "parts" / part_name;
- exists_futures.emplace_back(zookeeper->asyncExists(part_path));
+ exists_paths.emplace_back(fs::path(replica_path) / "parts" / part_name);
}
+ auto exists_results = zookeeper->exists(exists_paths);
for (size_t i = 0; i < part_names.size(); ++i)
{
- Coordination::ExistsResponse exists_resp = exists_futures[i].get();
+ auto exists_resp = exists_results[i];
if (exists_resp.error == Coordination::Error::ZOK)
{
Coordination::Requests ops;


@@ -11,11 +11,13 @@ node1 = cluster.add_instance(
"node1",
main_configs=["configs/zookeeper_config.xml", "configs/remote_servers.xml"],
with_zookeeper=True,
+ use_keeper=False,
)
node2 = cluster.add_instance(
"node2",
main_configs=["configs/zookeeper_config.xml", "configs/remote_servers.xml"],
with_zookeeper=True,
+ use_keeper=False,
)