dbms: addition to prev. revision [#METR-10500].

This commit is contained in:
Alexey Milovidov 2014-04-07 04:09:19 +04:00
parent 763f354b3d
commit a3efebda9d

View File

@ -15,12 +15,8 @@ namespace DB
*/
class RemoteBlockInputStream : public IProfilingBlockInputStream
{
public:
/// Принимает готовое соединение.
RemoteBlockInputStream(ConnectionPool::Entry pool_entry_, const String & query_, const Settings * settings_,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
: connection(pool_entry_), query(query_),
external_tables(external_tables_), stage(stage_)
private:
void init(const Settings * settings_)
{
if (settings_)
{
@ -30,20 +26,29 @@ public:
else
send_settings = false;
}
public:
/// Принимает готовое соединение.
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
: connection(&connection_), query(query_), external_tables(external_tables_), stage(stage_)
{
init(settings_);
}
/// Принимает готовое соединение. Захватывает владение соединением из пула.
RemoteBlockInputStream(ConnectionPool::Entry & pool_entry_, const String & query_, const Settings * settings_,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
: pool_entry(pool_entry_), connection(&*pool_entry_), query(query_), external_tables(external_tables_), stage(stage_)
{
init(settings_);
}
/// Принимает пул, из которого нужно будет достать соединение.
RemoteBlockInputStream(IConnectionPool * pool_, const String & query_, const Settings * settings_,
const Tables & external_tables_ = Tables(), QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete)
: pool(pool_), query(query_),
external_tables(external_tables_), stage(stage_)
: pool(pool_), query(query_), external_tables(external_tables_), stage(stage_)
{
if (settings_)
{
send_settings = true;
settings = *settings_;
}
else
send_settings = false;
init(settings_);
}
@ -114,7 +119,10 @@ protected:
{
/// Если надо - достаём соединение из пула.
if (pool)
connection = pool->get(send_settings ? &settings : nullptr);
{
pool_entry = pool->get(send_settings ? &settings : nullptr);
connection = &*pool_entry;
}
connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true);
sendExternalTables();
@ -230,7 +238,8 @@ protected:
private:
IConnectionPool * pool = nullptr;
ConnectionPool::Entry connection;
ConnectionPool::Entry pool_entry;
Connection * connection = nullptr;
const String query;
bool send_settings;