Fix sending external table for hedged connections.

This commit is contained in:
Nikolai Kochetov 2021-04-30 17:19:48 +03:00
parent fa5cea7a0a
commit 352ec049ad
5 changed files with 25 additions and 13 deletions

View File

@ -680,8 +680,12 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
PipelineExecutorPtr executor;
auto on_cancel = [& executor]() { executor->cancel(); };
if (!elem->pipe)
elem->pipe = elem->creating_pipe_callback();
QueryPipeline pipeline;
pipeline.init(std::move(*elem->pipe));
elem->pipe.reset();
pipeline.resize(1);
auto sink = std::make_shared<ExternalTableDataSink>(pipeline.getHeader(), *this, *elem, std::move(on_cancel));
pipeline.setSinks([&](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr

View File

@ -41,6 +41,7 @@ struct ExternalTableData
/// Pipe of data form table;
std::unique_ptr<Pipe> pipe;
std::string table_name;
std::function<std::unique_ptr<Pipe>()> creating_pipe_callback;
/// Flag if need to stop reading.
std::atomic_bool is_cancelled = false;
};

View File

@ -458,8 +458,6 @@ void RemoteQueryExecutor::sendScalars()
void RemoteQueryExecutor::sendExternalTables()
{
SelectQueryInfo query_info;
size_t count = connections->size();
{
@ -474,24 +472,31 @@ void RemoteQueryExecutor::sendExternalTables()
for (const auto & table : external_tables)
{
StoragePtr cur = table.second;
auto metadata_snapshot = cur->getInMemoryMetadataPtr();
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(
context, QueryProcessingStage::Complete, query_info);
Pipe pipe = cur->read(
metadata_snapshot->getColumns().getNamesOfPhysical(),
metadata_snapshot, query_info, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
auto data = std::make_unique<ExternalTableData>();
data->table_name = table.first;
data->creating_pipe_callback = [cur, context = this->context]()
{
SelectQueryInfo query_info;
auto metadata_snapshot = cur->getInMemoryMetadataPtr();
QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(
context, QueryProcessingStage::Complete, query_info);
if (pipe.empty())
data->pipe = std::make_unique<Pipe>(
Pipe pipe = cur->read(
metadata_snapshot->getColumns().getNamesOfPhysical(),
metadata_snapshot, query_info, context,
read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
if (pipe.empty())
return std::make_unique<Pipe>(
std::make_shared<SourceFromSingleChunk>(metadata_snapshot->getSampleBlock(), Chunk()));
else
data->pipe = std::make_unique<Pipe>(std::move(pipe));
return std::make_unique<Pipe>(std::move(pipe));
};
data->pipe = data->creating_pipe_callback();
res.emplace_back(std::move(data));
}
external_tables_data.push_back(std::move(res));

View File

@ -0,0 +1 @@
select number from remote('127.0.0.{3|2}', numbers(2)) where number global in (select number from numbers(1)) settings async_socket_for_remote=1, use_hedged_requests = 1, sleep_in_send_data_ms=10, receive_data_timeout_ms=1;