Merge pull request #61784 from ClickHouse/stream_system_replicas

Stream rows when reading from system.replicas
Alexander Gololobov 2024-04-01 09:47:55 +02:00 committed by GitHub
commit c297e72a4d
3 changed files with 96 additions and 20 deletions
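Summary of the change: previously, StorageSystemReplicas::read() gathered the status of every matching replicated table and emitted the whole result as a single chunk through SourceFromSingleChunk. With this commit, max_block_size is threaded through ReadFromSystemReplicas into a new SystemReplicasSource, which polls the per-table status futures and emits rows in chunks as they become ready, so results stream to the client before all statuses have been collected.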

src/Storages/System/StorageSystemReplicas.cpp

@@ -261,6 +261,7 @@ public:
         Block sample_block,
         std::map<String, std::map<String, StoragePtr>> replicated_tables_,
         bool with_zk_fields_,
+        size_t max_block_size_,
         std::shared_ptr<StorageSystemReplicasImpl> impl_)
         : SourceStepWithFilter(
             DataStream{.header = std::move(sample_block)},
@@ -270,6 +271,7 @@ public:
             context_)
         , replicated_tables(std::move(replicated_tables_))
         , with_zk_fields(with_zk_fields_)
+        , max_block_size(max_block_size_)
         , impl(std::move(impl_))
     {
     }
@@ -279,6 +281,7 @@ public:
 private:
     std::map<String, std::map<String, StoragePtr>> replicated_tables;
     const bool with_zk_fields;
+    const size_t max_block_size;
     std::shared_ptr<StorageSystemReplicasImpl> impl;
     const ActionsDAG::Node * predicate = nullptr;
 };
@@ -297,7 +300,7 @@ void StorageSystemReplicas::read(
     SelectQueryInfo & query_info,
     ContextPtr context,
     QueryProcessingStage::Enum /*processed_stage*/,
-    const size_t /*max_block_size*/,
+    const size_t max_block_size,
     const size_t /*num_streams*/)
 {
     storage_snapshot->check(column_names);
@@ -348,11 +351,51 @@ void StorageSystemReplicas::read(
     auto header = storage_snapshot->metadata->getSampleBlock();
     auto reading = std::make_unique<ReadFromSystemReplicas>(
         column_names, query_info, storage_snapshot,
-        std::move(context), std::move(header), std::move(replicated_tables), with_zk_fields, impl); // /*std::move(this_ptr),*/ std::move(columns_mask), max_block_size);
+        std::move(context), std::move(header), std::move(replicated_tables), with_zk_fields, max_block_size, impl);
 
     query_plan.addStep(std::move(reading));
 }
 
+class SystemReplicasSource : public ISource
+{
+public:
+    SystemReplicasSource(
+        Block header_,
+        size_t max_block_size_,
+        ColumnPtr col_database_,
+        ColumnPtr col_table_,
+        ColumnPtr col_engine_,
+        std::vector<std::shared_future<ReplicatedTableStatus>> futures_,
+        ContextPtr context_)
+        : ISource(header_)
+        , max_block_size(max_block_size_)
+        , col_database(std::move(col_database_))
+        , col_table(std::move(col_table_))
+        , col_engine(std::move(col_engine_))
+        , futures(std::move(futures_))
+        , context(std::move(context_))
+    {
+    }
+
+    String getName() const override { return "SystemReplicas"; }
+
+protected:
+    Chunk generate() override;
+
+private:
+    const size_t max_block_size;
+    /// Columns with table metadata.
+    ColumnPtr col_database;
+    ColumnPtr col_table;
+    ColumnPtr col_engine;
+    /// Futures for the status of each table.
+    std::vector<std::shared_future<ReplicatedTableStatus>> futures;
+    ContextPtr context;
+    /// Index (row number) of the next table to process.
+    size_t i = 0;
+};
+
 void ReadFromSystemReplicas::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
 {
     auto header = getOutputStream().header;
@@ -398,8 +441,6 @@ void ReadFromSystemReplicas::initializePipeline(QueryPipelineBuilder & pipeline,
         col_engine = filtered_block.getByName("engine").column;
     }
 
-    MutableColumns res_columns = storage_snapshot->metadata->getSampleBlock().cloneEmptyColumns();
-
     size_t tables_size = col_database->size();
 
     /// Use separate queues for requests with and without ZooKeeper fields.
@@ -426,11 +467,46 @@ void ReadFromSystemReplicas::initializePipeline(QueryPipelineBuilder & pipeline,
     /// If there are more requests, they will be scheduled by the query that needs them.
     get_status_requests.scheduleRequests(max_request_id, query_status);
 
-    for (size_t i = 0; i < tables_size; ++i)
+    pipeline.init(Pipe(std::make_shared<SystemReplicasSource>(header, max_block_size, col_database, col_table, col_engine, std::move(futures), context)));
+}
+
+Chunk SystemReplicasSource::generate()
+{
+    if (i == futures.size())
+        return {};
+
+    QueryStatusPtr query_status = context ? context->getProcessListElement() : nullptr;
+
+    MutableColumns res_columns = getPort().getHeader().cloneEmptyColumns();
+
+    bool rows_added = false;
+
+    for (; i < futures.size(); ++i)
     {
         if (query_status)
             query_status->checkTimeLimit();
 
+        if (rows_added)
+        {
+            /// Return current chunk if the next future is not ready yet
+            if (futures[i].wait_for(std::chrono::seconds(0)) != std::future_status::ready)
+                break;
+
+            if (max_block_size != 0)
+            {
+                size_t total_size = 0;
+                for (const auto & column : res_columns)
+                    total_size += column->byteSize();
+                /// If the block size exceeds the maximum, return the current block
+                if (total_size >= max_block_size)
+                    break;
+            }
+        }
+
+        res_columns[0]->insert((*col_database)[i]);
+        res_columns[1]->insert((*col_table)[i]);
+        res_columns[2]->insert((*col_engine)[i]);
+
         const auto & status = futures[i].get();
 
         size_t col_num = 3;
         res_columns[col_num++]->insert(status.is_leader);
@@ -476,23 +552,12 @@ void ReadFromSystemReplicas::initializePipeline(QueryPipelineBuilder & pipeline,
         }
 
         res_columns[col_num++]->insert(std::move(replica_is_active_values));
+        rows_added = true;
     }
 
-    Columns fin_columns;
-    fin_columns.reserve(res_columns.size());
-
-    for (auto & col : res_columns)
-        fin_columns.emplace_back(std::move(col));
-
-    fin_columns[0] = std::move(col_database);
-    fin_columns[1] = std::move(col_table);
-    fin_columns[2] = std::move(col_engine);
-
-    UInt64 num_rows = fin_columns.at(0)->size();
-    Chunk chunk(std::move(fin_columns), num_rows);
-
-    pipeline.init(Pipe(std::make_shared<SourceFromSingleChunk>(header, std::move(chunk))));
+    UInt64 num_rows = res_columns.at(0)->size();
+    return Chunk(std::move(res_columns), num_rows);
 }
 
 }
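The batching logic in SystemReplicasSource::generate() is a reusable pattern: block on the first future, then keep draining futures that are already ready, and cut the chunk as soon as the next one would block or the accumulated size passes the limit (the committed code compares the summed byteSize() of the result columns against max_block_size). Below is a minimal standalone sketch of the same loop with no ClickHouse dependencies; Status, nextBatch, and max_batch_rows are illustrative names, and the cap is on row count rather than bytes for brevity.

    #include <chrono>
    #include <cstddef>
    #include <future>
    #include <iostream>
    #include <string>
    #include <vector>

    // Stand-in for ReplicatedTableStatus: one result row.
    struct Status
    {
        std::string name;
    };

    // Drain futures starting at `next` into one batch. The first row may block;
    // after that, stop as soon as the next future is not ready (emit what we
    // already have) or the batch is full.
    std::vector<Status> nextBatch(std::vector<std::shared_future<Status>> & futures,
                                  size_t & next, size_t max_batch_rows)
    {
        std::vector<Status> batch;
        for (; next < futures.size(); ++next)
        {
            if (!batch.empty())
            {
                // Mirrors futures[i].wait_for(std::chrono::seconds(0)) in the PR.
                if (futures[next].wait_for(std::chrono::seconds(0)) != std::future_status::ready)
                    break;
                // The PR caps on accumulated column byteSize(); rows are simpler here.
                if (max_batch_rows != 0 && batch.size() >= max_batch_rows)
                    break;
            }
            batch.push_back(futures[next].get());
        }
        return batch;
    }

    int main()
    {
        std::vector<std::shared_future<Status>> futures;
        for (int t = 0; t < 5; ++t)
            futures.push_back(std::async(std::launch::async,
                                         [t] { return Status{"table_" + std::to_string(t)}; }).share());

        size_t next = 0;
        while (next < futures.size())
            std::cout << "chunk with " << nextBatch(futures, next, 2).size() << " row(s)\n";
    }

As in the PR, std::shared_future lets the consumer poll readiness with wait_for and later call get() without transferring ownership, so the same future can be touched on every generate() call until its row is emitted.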

tests/queries/0_stateless/02908_many_requests_to_system_replicas.reference

@@ -1,4 +1,8 @@
 Creating 300 tables
+900 1 1
+900 1 1
+900 1 1
+900 1 1
 Making 200 requests to system.replicas
 Query system.replicas while waiting for other concurrent requests to finish
 0

tests/queries/0_stateless/02908_many_requests_to_system_replicas.sh

@@ -45,6 +45,13 @@ done
 wait;
 
+# Check results with different max_block_size
+$CLICKHOUSE_CLIENT -q 'SELECT count(), sum(total_replicas) >= 2700, sum(active_replicas) >= 2700 FROM system.replicas WHERE database=currentDatabase()'
+$CLICKHOUSE_CLIENT -q 'SELECT count(), sum(total_replicas) >= 2700, sum(active_replicas) >= 2700 FROM system.replicas WHERE database=currentDatabase() SETTINGS max_block_size=1'
+$CLICKHOUSE_CLIENT -q 'SELECT count(), sum(total_replicas) >= 2700, sum(active_replicas) >= 2700 FROM system.replicas WHERE database=currentDatabase() SETTINGS max_block_size=77'
+$CLICKHOUSE_CLIENT -q 'SELECT count(), sum(total_replicas) >= 2700, sum(active_replicas) >= 2700 FROM system.replicas WHERE database=currentDatabase() SETTINGS max_block_size=11111'
+
 echo "Making $CONCURRENCY requests to system.replicas"
 for i in $(seq 1 $CONCURRENCY)
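The four new checks run the same aggregate with max_block_size unset, 1, 77, and 11111; the reference file expects the identical `900 1 1` result each time, confirming that cutting the system.replicas stream into chunks of different sizes neither drops nor duplicates rows.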