From a3efebda9d60893638625bb915c8aa6a770bc9ed Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Mon, 7 Apr 2014 04:09:19 +0400 Subject: [PATCH] dbms: addition to prev. revision [#METR-10500]. --- .../DB/DataStreams/RemoteBlockInputStream.h | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index b749097c3b0..0fbbd76f696 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -15,12 +15,8 @@ namespace DB */ class RemoteBlockInputStream : public IProfilingBlockInputStream { -public: - /// Принимает готовое соединение. - RemoteBlockInputStream(ConnectionPool::Entry pool_entry_, const String & query_, const Settings * settings_, - const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) - : connection(pool_entry_), query(query_), - external_tables(external_tables_), stage(stage_) +private: + void init(const Settings * settings_) { if (settings_) { @@ -30,20 +26,29 @@ public: else send_settings = false; } +public: + /// Принимает готовое соединение. + RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, + const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) + : connection(&connection_), query(query_), external_tables(external_tables_), stage(stage_) + { + init(settings_); + } + + /// Принимает готовое соединение. Захватывает владение соединением из пула. + RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_, + const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) + : pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), external_tables(external_tables_), stage(stage_) + { + init(settings_); + } /// Принимает пул, из которого нужно будет достать соединение. RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_, const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) - : pool(pool_), query(query_), - external_tables(external_tables_), stage(stage_) + : pool(pool_), query(query_), external_tables(external_tables_), stage(stage_) { - if (settings_) - { - send_settings = true; - settings = *settings_; - } - else - send_settings = false; + init(settings_); } @@ -114,7 +119,10 @@ protected: { /// Если надо - достаём соединение из пула. if (pool) - connection = pool->get(send_settings ? &settings : nullptr); + { + pool_entry = pool->get(send_settings ? &settings : nullptr); + connection = &*pool_entry; + } connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true); sendExternalTables(); @@ -230,7 +238,8 @@ protected: private: IConnectionPool * pool = nullptr; - ConnectionPool::Entry connection; + ConnectionPool::Entry pool_entry; + Connection * connection = nullptr; const String query; bool send_settings;