Merge pull request #55574 from ClickHouse/pr-progress-bar

Parallel replicas: progress bar
Igor Nikonov, 2023-11-05 20:28:44 +01:00 (committed by GitHub)
commit 0f76ba83e0
12 changed files with 126 additions and 34 deletions
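
In short: with this change the parallel-replicas reading coordinator learns how many rows each announced part contains and reports the total back through a progress callback, so the client progress bar also works when allow_experimental_parallel_reading_from_replicas is enabled. A minimal sketch of the callback flow, with simplified, hypothetical types standing in for the real classes touched below:

// Hypothetical, condensed model of the plumbing in this commit; the real
// signatures live in RemoteSource, RemoteQueryExecutor and
// ParallelReplicasReadingCoordinator below.
#include <cstddef>
#include <functional>
#include <iostream>
#include <utility>

struct Progress { size_t read_rows = 0, read_bytes = 0, total_rows_to_read = 0; };
using ProgressCallback = std::function<void(const Progress &)>;

struct Coordinator
{
    ProgressCallback progress_callback;
    void setProgressCallback(ProgressCallback cb) { progress_callback = std::move(cb); }

    // Fired once the coordinator knows all parts (and their row counts) to read.
    void reportTotal(size_t total_rows)
    {
        if (progress_callback)
            progress_callback(Progress{.total_rows_to_read = total_rows});
    }
};

int main()
{
    size_t total_rows_approx = 0;   // what the source side feeds into the progress bar
    Coordinator coordinator;
    // The source side installs the callback at construction time (see RemoteSource.cpp below).
    coordinator.setProgressCallback([&](const Progress & p)
    {
        if (p.total_rows_to_read)
            total_rows_approx += p.total_rows_to_read;
    });
    coordinator.reportTotal(3000);   // e.g. the 3 x 1000 rows inserted by the new test
    std::cout << "total rows to read: " << total_rows_approx << '\n';
}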


@@ -30,7 +30,7 @@
 #define DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION 1
-#define DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION 2
+#define DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION 3
 #define DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS 54453
 #define DBMS_MERGE_TREE_PART_INFO_VERSION 1


@@ -29,7 +29,7 @@ protected:
 virtual Chunk generate();
 virtual std::optional<Chunk> tryGenerate();
-virtual void progress(size_t read_rows, size_t read_bytes);
+void progress(size_t read_rows, size_t read_bytes);
 public:
 explicit ISource(Block header, bool enable_auto_progress = true);


@@ -343,7 +343,6 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
 /// We have a special logic for local replica. It has to read less data, because in some cases it should
 /// merge states of aggregate functions or do some other important stuff other than reading from Disk.
 pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(pool_settings.min_marks_for_concurrent_read * context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier);
-size_t total_rows = parts_with_range.getRowsCountAllParts();
 auto pool = std::make_shared<MergeTreeReadPoolParallelReplicas>(
 std::move(extension),
@@ -371,14 +370,6 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
 actions_settings, block_size_copy, reader_settings, virt_column_names);
 auto source = std::make_shared<MergeTreeSource>(std::move(processor));
-/// Set the approximate number of rows for the first source only
-/// In case of parallel processing on replicas do not set approximate rows at all.
-/// Because the value will be identical on every replicas and will be accounted
-/// multiple times (settings.max_parallel_replicas times more)
-if (i == 0 && !client_info.collaborate_with_initiator)
-source->addTotalRowsApprox(total_rows);
 pipes.emplace_back(std::move(source));
 }
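
The block removed above explains why, under parallel replicas, the local source never set an approximate total: every replica would report the identical value and it would be counted max_parallel_replicas times, so previously there was simply no total (and no progress bar). A tiny illustration of the overcount that coordinator-side reporting avoids, using the numbers from the test added at the end of this commit; the variable names are made up for the example:

#include <cstddef>
#include <iostream>

int main()
{
    const size_t table_rows = 3000;          // rows actually stored in the table
    const size_t max_parallel_replicas = 3;

    // If every replica set the same approximate total, the client would sum them up:
    std::cout << "per-replica totals: " << table_rows * max_parallel_replicas << " rows (overestimate)\n";
    // With this commit the coordinator reports the total exactly once:
    std::cout << "coordinator total:  " << table_rows << " rows\n";
}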


@@ -25,6 +25,16 @@ RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation
 for (auto & type : sample.getDataTypes())
 if (typeid_cast<const DataTypeAggregateFunction *>(type.get()))
 add_aggregation_info = true;
+/// Progress method will be called on Progress packet.
+query_executor->setProgressCallback([this](const Progress & value)
+{
+if (value.total_rows_to_read)
+addTotalRowsApprox(value.total_rows_to_read);
+if (value.total_bytes_to_read)
+addTotalBytes(value.total_bytes_to_read);
+progress(value.read_rows, value.read_bytes);
+});
 }
 RemoteSource::~RemoteSource() = default;
@@ -72,16 +82,6 @@ std::optional<Chunk> RemoteSource::tryGenerate()
 if (!was_query_sent)
 {
-/// Progress method will be called on Progress packet.
-query_executor->setProgressCallback([this](const Progress & value)
-{
-if (value.total_rows_to_read)
-addTotalRowsApprox(value.total_rows_to_read);
-if (value.total_bytes_to_read)
-addTotalBytes(value.total_bytes_to_read);
-progress(value.read_rows, value.read_bytes);
-});
 /// Get rows_before_limit result for remote query from ProfileInfo packet.
 query_executor->setProfileInfoCallback([this](const ProfileInfo & info)
 {


@@ -88,9 +88,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
 const String & query_, const Block & header_, ContextPtr context_,
 const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
 QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
-: header(header_), query(query_), context(context_)
-, scalars(scalars_), external_tables(external_tables_), stage(stage_)
-, extension(extension_)
+: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
 {
 create_connections = [this, connections_, throttler, extension_](AsyncCallback) mutable {
 auto res = std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
@@ -105,9 +103,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
 const String & query_, const Block & header_, ContextPtr context_,
 const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
 QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
-: header(header_), query(query_), context(context_)
-, scalars(scalars_), external_tables(external_tables_), stage(stage_)
-, extension(extension_)
+: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
 {
 create_connections = [this, pool, throttler](AsyncCallback async_callback)->std::unique_ptr<IConnections>
 {
@@ -773,4 +769,12 @@ bool RemoteQueryExecutor::hasThrownException() const
 return got_exception_from_replica || got_unknown_packet_from_replica;
 }
+void RemoteQueryExecutor::setProgressCallback(ProgressCallback callback)
+{
+progress_callback = std::move(callback);
+if (extension && extension->parallel_reading_coordinator)
+extension->parallel_reading_coordinator->setProgressCallback(progress_callback);
+}
 }
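
A side refactor in the first two hunks of this file: both RemoteQueryExecutor constructors now delegate to a common constructor instead of repeating the member-initializer list. A generic sketch of the delegating-constructor idiom (a toy class, not the real executor):

#include <string>
#include <utility>

class Executor
{
public:
    // The common constructor owns the member-initializer list.
    Executor(std::string query_, std::string header_)
        : query(std::move(query_)), header(std::move(header_))
    {
    }

    // Variant constructors delegate to it and only add what is specific to them
    // (in the real class: how connections are created).
    Executor(std::string query_, std::string header_, size_t pool_size_)
        : Executor(std::move(query_), std::move(header_))
    {
        pool_size = pool_size_;
    }

private:
    std::string query;
    std::string header;
    size_t pool_size = 0;
};

int main()
{
    Executor direct("SELECT 1", "UInt8 x");
    Executor pooled("SELECT 1", "UInt8 x", /*pool_size_=*/ 8);
}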


@@ -165,7 +165,7 @@ public:
 Block getExtremes() { return std::move(extremes); }
 /// Set callback for progress. It will be called on Progress packet.
-void setProgressCallback(ProgressCallback callback) { progress_callback = std::move(callback); }
+void setProgressCallback(ProgressCallback callback);
 /// Set callback for profile info. It will be called on ProfileInfo packet.
 void setProfileInfoCallback(ProfileInfoCallback callback) { profile_info_callback = std::move(callback); }


@@ -15,6 +15,7 @@
 #include <Common/thread_local_rng.h>
 #include <base/types.h>
 #include "IO/WriteBufferFromString.h"
+#include <IO/Progress.h>
 #include "Storages/MergeTree/RangesInDataPart.h"
 #include "Storages/MergeTree/RequestResponse.h"
 #include <Storages/MergeTree/MarkRange.h>
@@ -78,6 +79,7 @@ public:
 Stats stats;
 size_t replicas_count{0};
 size_t unavailable_replicas_count{0};
+ProgressCallback progress_callback;
 explicit ImplInterface(size_t replicas_count_)
 : stats{replicas_count_}
@@ -88,6 +90,8 @@ public:
 virtual ParallelReadResponse handleRequest(ParallelReadRequest request) = 0;
 virtual void handleInitialAllRangesAnnouncement(InitialAllRangesAnnouncement announcement) = 0;
 virtual void markReplicaAsUnavailable(size_t replica_number) = 0;
+void setProgressCallback(ProgressCallback callback) { progress_callback = std::move(callback); }
 };
 using Parts = std::set<Part>;
@@ -231,6 +235,20 @@ void DefaultCoordinator::finalizeReadingState()
 delayed_parts.pop_front();
 }
+// update progress with total rows
+if (progress_callback)
+{
+size_t total_rows_to_read = 0;
+for (const auto & part : all_parts_to_read)
+total_rows_to_read += part.description.rows;
+Progress progress;
+progress.total_rows_to_read = total_rows_to_read;
+progress_callback(progress);
+LOG_DEBUG(log, "Total rows to read: {}", total_rows_to_read);
+}
 LOG_DEBUG(log, "Reading state is fully initialized: {}", fmt::join(all_parts_to_read, "; "));
 }
@@ -361,6 +379,7 @@ public:
 void markReplicaAsUnavailable(size_t replica_number) override;
 Parts all_parts_to_read;
+size_t total_rows_to_read = 0;
 Poco::Logger * log = &Poco::Logger::get(fmt::format("{}{}", magic_enum::enum_name(mode), "Coordinator"));
 };
@@ -381,6 +400,8 @@ void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRang
 {
 LOG_TRACE(log, "Received an announcement {}", announcement.describe());
+size_t new_rows_to_read = 0;
 /// To get rid of duplicates
 for (auto && part: announcement.description)
 {
@@ -401,10 +422,23 @@ void InOrderCoordinator<mode>::handleInitialAllRangesAnnouncement(InitialAllRang
 if (covering_or_the_same_it != all_parts_to_read.end())
 continue;
+new_rows_to_read += part.rows;
 auto [inserted_it, _] = all_parts_to_read.emplace(Part{.description = std::move(part), .replicas = {announcement.replica_num}});
 auto & ranges = inserted_it->description.ranges;
 std::sort(ranges.begin(), ranges.end());
 }
+if (new_rows_to_read > 0)
+{
+Progress progress;
+progress.total_rows_to_read = new_rows_to_read;
+progress_callback(progress);
+total_rows_to_read += new_rows_to_read;
+LOG_DEBUG(log, "Updated total rows to read: added {} rows, total {} rows", new_rows_to_read, total_rows_to_read);
+}
 }
 template <CoordinationMode mode>
@@ -508,7 +542,6 @@ void ParallelReplicasReadingCoordinator::handleInitialAllRangesAnnouncement(Init
 initialize();
 }
 return pimpl->handleInitialAllRangesAnnouncement(std::move(announcement));
 }
@@ -543,18 +576,28 @@ void ParallelReplicasReadingCoordinator::initialize()
 {
 case CoordinationMode::Default:
 pimpl = std::make_unique<DefaultCoordinator>(replicas_count);
-return;
+break;
 case CoordinationMode::WithOrder:
 pimpl = std::make_unique<InOrderCoordinator<CoordinationMode::WithOrder>>(replicas_count);
-return;
+break;
 case CoordinationMode::ReverseOrder:
 pimpl = std::make_unique<InOrderCoordinator<CoordinationMode::ReverseOrder>>(replicas_count);
-return;
+break;
 }
+if (progress_callback)
+pimpl->setProgressCallback(std::move(progress_callback));
 }
 ParallelReplicasReadingCoordinator::ParallelReplicasReadingCoordinator(size_t replicas_count_) : replicas_count(replicas_count_) {}
 ParallelReplicasReadingCoordinator::~ParallelReplicasReadingCoordinator() = default;
+void ParallelReplicasReadingCoordinator::setProgressCallback(ProgressCallback callback)
+{
+// store the callback since pimpl may not be instantiated yet
+progress_callback = std::move(callback);
+if (pimpl)
+pimpl->setProgressCallback(std::move(progress_callback));
+}
 }
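
The two coordinator flavours above report totals differently: DefaultCoordinator sums the row counts of all parts once its reading state is finalized and fires the callback a single time, while InOrderCoordinator adds rows incrementally, counting only parts that were not already known so duplicates announced by several replicas are not counted twice. A compressed sketch of both strategies with hypothetical stand-in types (the real code deduplicates via covering-part checks, not a plain set):

#include <cstddef>
#include <functional>
#include <iostream>
#include <set>
#include <vector>

struct Progress { size_t total_rows_to_read = 0; };
using ProgressCallback = std::function<void(const Progress &)>;

struct PartDescription
{
    int id = 0;
    size_t rows = 0;
    bool operator<(const PartDescription & other) const { return id < other.id; }
};

// Default mode: report the grand total once, after all parts are known.
void reportOnceAfterFinalize(const std::vector<PartDescription> & all_parts_to_read, const ProgressCallback & progress_callback)
{
    size_t total_rows_to_read = 0;
    for (const auto & part : all_parts_to_read)
        total_rows_to_read += part.rows;
    if (progress_callback)
        progress_callback(Progress{.total_rows_to_read = total_rows_to_read});
}

// In-order modes: report only the rows of parts that are new in this announcement.
void reportIncrement(const std::vector<PartDescription> & announcement, std::set<PartDescription> & all_parts_to_read, const ProgressCallback & progress_callback)
{
    size_t new_rows_to_read = 0;
    for (const auto & part : announcement)
        if (all_parts_to_read.insert(part).second)
            new_rows_to_read += part.rows;
    if (new_rows_to_read > 0 && progress_callback)
        progress_callback(Progress{.total_rows_to_read = new_rows_to_read});
}

int main()
{
    ProgressCallback print = [](const Progress & p) { std::cout << p.total_rows_to_read << '\n'; };
    reportOnceAfterFinalize({{1, 1000}, {2, 2000}}, print);      // prints 3000

    std::set<PartDescription> seen;
    reportIncrement({{1, 1000}}, seen, print);                   // prints 1000
    reportIncrement({{1, 1000}, {2, 2000}}, seen, print);        // part 1 is a duplicate, prints 2000
}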


@@ -6,10 +6,10 @@
 namespace DB
 {
+struct Progress;
+using ProgressCallback = std::function<void(const Progress & progress)>;
 /// The main class to spread mark ranges across replicas dynamically
 /// The reason why it uses pimpl - this header file is included in
 /// multiple other files like Context or RemoteQueryExecutor
 class ParallelReplicasReadingCoordinator
 {
 public:
@@ -27,6 +27,9 @@ public:
 /// "pending" state waiting for the unavailable replica to send the announcement.
 void markReplicaAsUnavailable(size_t replica_number);
+/// needed to report total rows to read
+void setProgressCallback(ProgressCallback callback);
 private:
 void initialize();
@@ -35,6 +38,7 @@ private:
 CoordinationMode mode{CoordinationMode::Default};
 std::atomic<bool> initialized{false};
 std::unique_ptr<ImplInterface> pimpl;
+ProgressCallback progress_callback; // stored only so it can be passed on to the coordinator implementation
 };
 using ParallelReplicasReadingCoordinatorPtr = std::shared_ptr<ParallelReplicasReadingCoordinator>;


@@ -32,6 +32,7 @@ void RangesInDataPartDescription::serialize(WriteBuffer & out) const
 {
 info.serialize(out);
 ranges.serialize(out);
+writeVarUInt(rows, out);
 }
 String RangesInDataPartDescription::describe() const
@@ -45,6 +46,7 @@ void RangesInDataPartDescription::deserialize(ReadBuffer & in)
 {
 info.deserialize(in);
 ranges.deserialize(in);
+readVarUInt(rows, in);
 }
 void RangesInDataPartsDescription::serialize(WriteBuffer & out) const
@@ -82,6 +84,7 @@ RangesInDataPartDescription RangesInDataPart::getDescription() const
 return RangesInDataPartDescription{
 .info = data_part->info,
 .ranges = ranges,
+.rows = getRowsCount(),
 };
 }
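
The change above appends a rows field to the wire format of RangesInDataPartDescription, which is presumably why DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION is bumped from 2 to 3 at the top of this commit. A minimal round-trip sketch of the extended field order (info, ranges, rows); Buffer and the read/write helpers here are hypothetical stand-ins for ClickHouse's WriteBuffer/ReadBuffer and writeVarUInt/readVarUInt:

#include <cassert>
#include <cstddef>
#include <deque>

struct Buffer { std::deque<size_t> values; };
void writeValue(size_t value, Buffer & out) { out.values.push_back(value); }
size_t readValue(Buffer & in) { size_t value = in.values.front(); in.values.pop_front(); return value; }

struct Description
{
    size_t info = 0;     // stand-in for MergeTreePartInfo
    size_t ranges = 0;   // stand-in for MarkRanges
    size_t rows = 0;     // the new trailing field

    void serialize(Buffer & out) const { writeValue(info, out); writeValue(ranges, out); writeValue(rows, out); }
    void deserialize(Buffer & in) { info = readValue(in); ranges = readValue(in); rows = readValue(in); }
};

int main()
{
    Buffer wire;
    Description sent{.info = 1, .ranges = 2, .rows = 3000};
    sent.serialize(wire);

    Description received;
    received.deserialize(wire);
    assert(received.rows == 3000);   // the announced row count survives the round trip
}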


@@ -21,6 +21,7 @@ struct RangesInDataPartDescription
 {
 MergeTreePartInfo info;
 MarkRanges ranges;
+size_t rows = 0;
 void serialize(WriteBuffer & out) const;
 String describe() const;


@@ -0,0 +1,8 @@
+3000 1000 3999 2499.5
+1
+1998 2944475297004403859
+1999 254596732598015005
+2000 6863370867519437063
+2001 17844331710293705251
+2002 1587587338113897332
+1


@@ -0,0 +1,38 @@
+DROP TABLE IF EXISTS t1 SYNC;
+DROP TABLE IF EXISTS t2 SYNC;
+DROP TABLE IF EXISTS t3 SYNC;
+CREATE TABLE t1(k UInt32, v String) ENGINE ReplicatedMergeTree('/parallel_replicas/{database}/test_tbl', 'r1') ORDER BY k;
+CREATE TABLE t2(k UInt32, v String) ENGINE ReplicatedMergeTree('/parallel_replicas/{database}/test_tbl', 'r2') ORDER BY k;
+CREATE TABLE t3(k UInt32, v String) ENGINE ReplicatedMergeTree('/parallel_replicas/{database}/test_tbl', 'r3') ORDER BY k;
+insert into t1 select number, toString(number) from numbers(1000, 1000);
+insert into t2 select number, toString(number) from numbers(2000, 1000);
+insert into t3 select number, toString(number) from numbers(3000, 1000);
+system sync replica t1;
+system sync replica t2;
+system sync replica t3;
+SET allow_experimental_parallel_reading_from_replicas=1, max_parallel_replicas=3, use_hedged_requests=0, parallel_replicas_for_non_replicated_merge_tree=1, cluster_for_parallel_replicas='test_cluster_one_shard_three_replicas_localhost';
+-- default coordinator
+SELECT count(), min(k), max(k), avg(k) FROM t1 SETTINGS log_comment='02898_default_190aed82-2423-413b-ad4c-24dcca50f65b';
+-- check logs
+SYSTEM FLUSH LOGS;
+SELECT count() > 0 FROM system.text_log
+WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment='02898_default_190aed82-2423-413b-ad4c-24dcca50f65b')
+AND message LIKE '%Total rows to read: 3000%' SETTINGS allow_experimental_parallel_reading_from_replicas=0;
+-- reading in order coordinator
+SELECT k, sipHash64(v) FROM t1 order by k limit 5 offset 998 SETTINGS optimize_read_in_order=1, log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b';
+SYSTEM FLUSH LOGS;
+SELECT count() > 0 FROM system.text_log
+WHERE query_id in (select query_id from system.query_log where current_database = currentDatabase() AND log_comment='02898_inorder_190aed82-2423-413b-ad4c-24dcca50f65b')
+AND message LIKE '%Updated total rows to read: added % rows, total 3000 rows%' SETTINGS allow_experimental_parallel_reading_from_replicas=0;
+DROP TABLE t1 SYNC;
+DROP TABLE t2 SYNC;
+DROP TABLE t3 SYNC;