Merge pull request #23805 from ClickHouse/fix-hedged-requests-ext-tables

Fix sending external table for hedged connections.
This commit is contained in:
alexey-milovidov 2021-05-01 08:13:13 +03:00 committed by GitHub
commit 7ca42d2ce1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 24 additions and 14 deletions

View File

@ -680,8 +680,12 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
PipelineExecutorPtr executor; PipelineExecutorPtr executor;
auto on_cancel = [& executor]() { executor->cancel(); }; auto on_cancel = [& executor]() { executor->cancel(); };
if (!elem->pipe)
elem->pipe = elem->creating_pipe_callback();
QueryPipeline pipeline; QueryPipeline pipeline;
pipeline.init(std::move(*elem->pipe)); pipeline.init(std::move(*elem->pipe));
elem->pipe.reset();
pipeline.resize(1); pipeline.resize(1);
auto sink = std::make_shared<ExternalTableDataSink>(pipeline.getHeader(), *this, *elem, std::move(on_cancel)); auto sink = std::make_shared<ExternalTableDataSink>(pipeline.getHeader(), *this, *elem, std::move(on_cancel));
pipeline.setSinks([&](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr pipeline.setSinks([&](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr

View File

@ -41,6 +41,7 @@ struct ExternalTableData
/// Pipe of data form table; /// Pipe of data form table;
std::unique_ptr<Pipe> pipe; std::unique_ptr<Pipe> pipe;
std::string table_name; std::string table_name;
std::function<std::unique_ptr<Pipe>()> creating_pipe_callback;
/// Flag if need to stop reading. /// Flag if need to stop reading.
std::atomic_bool is_cancelled = false; std::atomic_bool is_cancelled = false;
}; };

View File

@ -458,8 +458,6 @@ void RemoteQueryExecutor::sendScalars()
void RemoteQueryExecutor::sendExternalTables() void RemoteQueryExecutor::sendExternalTables()
{ {
SelectQueryInfo query_info;
size_t count = connections->size(); size_t count = connections->size();
{ {
@ -474,24 +472,29 @@ void RemoteQueryExecutor::sendExternalTables()
for (const auto & table : external_tables) for (const auto & table : external_tables)
{ {
StoragePtr cur = table.second; StoragePtr cur = table.second;
auto metadata_snapshot = cur->getInMemoryMetadataPtr();
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(
context, QueryProcessingStage::Complete, query_info);
Pipe pipe = cur->read(
metadata_snapshot->getColumns().getNamesOfPhysical(),
metadata_snapshot, query_info, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
auto data = std::make_unique<ExternalTableData>(); auto data = std::make_unique<ExternalTableData>();
data->table_name = table.first; data->table_name = table.first;
data->creating_pipe_callback = [cur, context = this->context]()
{
SelectQueryInfo query_info;
auto metadata_snapshot = cur->getInMemoryMetadataPtr();
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(
context, QueryProcessingStage::Complete, query_info);
if (pipe.empty()) Pipe pipe = cur->read(
data->pipe = std::make_unique<Pipe>( metadata_snapshot->getColumns().getNamesOfPhysical(),
metadata_snapshot, query_info, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
if (pipe.empty())
return std::make_unique<Pipe>(
std::make_shared<SourceFromSingleChunk>(metadata_snapshot->getSampleBlock(), Chunk())); std::make_shared<SourceFromSingleChunk>(metadata_snapshot->getSampleBlock(), Chunk()));
else
data->pipe = std::make_unique<Pipe>(std::move(pipe));
return std::make_unique<Pipe>(std::move(pipe));
};
data->pipe = data->creating_pipe_callback();
res.emplace_back(std::move(data)); res.emplace_back(std::move(data));
} }
external_tables_data.push_back(std::move(res)); external_tables_data.push_back(std::move(res));

View File

@ -0,0 +1 @@
select number from remote('127.0.0.{3|2}', numbers(2)) where number global in (select number from numbers(1)) settings async_socket_for_remote=1, use_hedged_requests = 1, sleep_in_send_data_ms=10, receive_data_timeout_ms=1;