diff --git a/src/Client/Connection.cpp b/src/Client/Connection.cpp index 70d8109545b..d26f7454bcc 100644 --- a/src/Client/Connection.cpp +++ b/src/Client/Connection.cpp @@ -680,8 +680,12 @@ void Connection::sendExternalTablesData(ExternalTablesData & data) PipelineExecutorPtr executor; auto on_cancel = [& executor]() { executor->cancel(); }; + if (!elem->pipe) + elem->pipe = elem->creating_pipe_callback(); + QueryPipeline pipeline; pipeline.init(std::move(*elem->pipe)); + elem->pipe.reset(); pipeline.resize(1); auto sink = std::make_shared(pipeline.getHeader(), *this, *elem, std::move(on_cancel)); pipeline.setSinks([&](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr diff --git a/src/Client/Connection.h b/src/Client/Connection.h index 6c7edfb2761..80dbd9ed44e 100644 --- a/src/Client/Connection.h +++ b/src/Client/Connection.h @@ -41,6 +41,7 @@ struct ExternalTableData /// Pipe of data form table; std::unique_ptr pipe; std::string table_name; + std::function()> creating_pipe_callback; /// Flag if need to stop reading. std::atomic_bool is_cancelled = false; }; diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index 0961dd41458..d724603db63 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -458,8 +458,6 @@ void RemoteQueryExecutor::sendScalars() void RemoteQueryExecutor::sendExternalTables() { - SelectQueryInfo query_info; - size_t count = connections->size(); { @@ -474,24 +472,31 @@ void RemoteQueryExecutor::sendExternalTables() for (const auto & table : external_tables) { StoragePtr cur = table.second; - auto metadata_snapshot = cur->getInMemoryMetadataPtr(); - QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage( - context, QueryProcessingStage::Complete, query_info); - Pipe pipe = cur->read( - metadata_snapshot->getColumns().getNamesOfPhysical(), - metadata_snapshot, query_info, context, - read_from_table_stage, DEFAULT_BLOCK_SIZE, 1); + auto data = std::make_unique(); data->table_name = table.first; + data->creating_pipe_callback = [cur, context = this->context]() + { + SelectQueryInfo query_info; + auto metadata_snapshot = cur->getInMemoryMetadataPtr(); + QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage( + context, QueryProcessingStage::Complete, query_info); - if (pipe.empty()) - data->pipe = std::make_unique( + Pipe pipe = cur->read( + metadata_snapshot->getColumns().getNamesOfPhysical(), + metadata_snapshot, query_info, context, + read_from_table_stage, DEFAULT_BLOCK_SIZE, 1); + + if (pipe.empty()) + return std::make_unique( std::make_shared(metadata_snapshot->getSampleBlock(), Chunk())); - else - data->pipe = std::make_unique(std::move(pipe)); + return std::make_unique(std::move(pipe)); + }; + + data->pipe = data->creating_pipe_callback(); res.emplace_back(std::move(data)); } external_tables_data.push_back(std::move(res)); diff --git a/tests/queries/0_stateless/01851_hedged_connections_external_tables.reference b/tests/queries/0_stateless/01851_hedged_connections_external_tables.reference new file mode 100644 index 00000000000..573541ac970 --- /dev/null +++ b/tests/queries/0_stateless/01851_hedged_connections_external_tables.reference @@ -0,0 +1 @@ +0 diff --git a/tests/queries/0_stateless/01851_hedged_connections_external_tables.sql b/tests/queries/0_stateless/01851_hedged_connections_external_tables.sql new file mode 100644 index 00000000000..c4625720e59 --- /dev/null +++ b/tests/queries/0_stateless/01851_hedged_connections_external_tables.sql @@ -0,0 +1 @@ +select number from remote('127.0.0.{3|2}', numbers(2)) where number global in (select number from numbers(1)) settings async_socket_for_remote=1, use_hedged_requests = 1, sleep_in_send_data_ms=10, receive_data_timeout_ms=1;