From cfbc15c0b4b3789b2d12ff7f100ba866dfafcf0f Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Thu, 15 Jan 2015 18:05:03 +0300 Subject: [PATCH] dbms: Server: queries with several replicas: development [#METR-14410] --- dbms/include/DB/Client/ReplicasConnections.h | 4 ++ dbms/include/DB/Core/ErrorCodes.h | 1 + .../DB/DataStreams/RemoteBlockInputStream.h | 37 +++++++++++-------- dbms/src/Client/ReplicasConnections.cpp | 19 ++++++++++ 4 files changed, 46 insertions(+), 15 deletions(-) diff --git a/dbms/include/DB/Client/ReplicasConnections.h b/dbms/include/DB/Client/ReplicasConnections.h index ed4403e24ee..d5cfe3b0aa7 100644 --- a/dbms/include/DB/Client/ReplicasConnections.h +++ b/dbms/include/DB/Client/ReplicasConnections.h @@ -44,6 +44,10 @@ namespace DB std::string dumpAddresses() const; + size_t size() const; + + void sendExternalTablesData(std::vector & data); + private: using ConnectionHash = std::unordered_map; diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h index 2ed5a0fee0b..410a3bde63b 100644 --- a/dbms/include/DB/Core/ErrorCodes.h +++ b/dbms/include/DB/Core/ErrorCodes.h @@ -274,6 +274,7 @@ namespace ErrorCodes INCOMPATIBLE_TYPE_OF_JOIN, NO_AVAILABLE_REPLICA, UNEXPECTED_REPLICA, + MISMATCH_REPLICAS_DATA_SOURCES, POCO_EXCEPTION = 1000, STD_EXCEPTION, diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 6c838c91e1f..cea246d923c 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -122,25 +122,32 @@ protected: /// Отправить на удаленные сервера все временные таблицы void sendExternalTables() { - ExternalTablesData res; - for (const auto & table : external_tables) + size_t count = use_many_replicas ? replicas_connections->size() : 1; + + std::vector instances; + instances.reserve(count); + + for (size_t i = 0; i < count; ++i) { - StoragePtr cur = table.second; - QueryProcessingStage::Enum stage = QueryProcessingStage::Complete; - DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), context, settings, - stage, DEFAULT_BLOCK_SIZE, 1); - if (input.size() == 0) - res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first)); - else - res.push_back(std::make_pair(input[0], table.first)); + ExternalTablesData res; + for (const auto & table : external_tables) + { + StoragePtr cur = table.second; + QueryProcessingStage::Enum stage = QueryProcessingStage::Complete; + DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), context, settings, + stage, DEFAULT_BLOCK_SIZE, 1); + if (input.size() == 0) + res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first)); + else + res.push_back(std::make_pair(input[0], table.first)); + } + instances.push_back(std::move(res)); } + if (use_many_replicas) - { - /// XXX Отправить res по всем соединениям. - //replicas_connections->sendExternalTablesData(res); - } + replicas_connections->sendExternalTablesData(instances); else - connection->sendExternalTablesData(res); + connection->sendExternalTablesData(instances[0]); } Block readImpl() override diff --git a/dbms/src/Client/ReplicasConnections.cpp b/dbms/src/Client/ReplicasConnections.cpp index 788e802599d..425646a713a 100644 --- a/dbms/src/Client/ReplicasConnections.cpp +++ b/dbms/src/Client/ReplicasConnections.cpp @@ -236,4 +236,23 @@ namespace DB return os.str(); } + + size_t ReplicasConnections::size() const + { + return connection_hash.size(); + } + + void ReplicasConnections::sendExternalTablesData(std::vector & data) + { + if (data.size() != connection_hash.size()) + throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES); + + auto it = data.begin(); + for (auto & e : connection_hash) + { + Connection * connection = e.second.connection; + connection->sendExternalTablesData(*it); + ++it; + } + } }