Close MySQL connections after usage in MySQLDictionarySource

This commit is contained in:
Clément Rodriguez 2019-05-23 15:09:07 +02:00
parent 6835a5f236
commit 7b40d37212
5 changed files with 19 additions and 11 deletions

View File

@ -6,6 +6,7 @@
#include "DictionaryStructure.h"
namespace DB
{
namespace ErrorCodes
@ -47,7 +48,6 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
# include <Formats/MySQLBlockInputStream.h>
# include "readInvalidateQuery.h"
namespace DB
{
static const UInt64 max_block_size = 8192;
@ -117,7 +117,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadAll()
last_modification = getLastModification();
LOG_TRACE(log, load_all_query);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_all_query, sample_block, max_block_size);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_all_query, sample_block, max_block_size, true);
}
BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
@ -126,7 +126,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
std::string load_update_query = getUpdateFieldAndDate();
LOG_TRACE(log, load_update_query);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_update_query, sample_block, max_block_size);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_update_query, sample_block, max_block_size, true);
}
BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
@ -134,7 +134,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & i
/// We do not log in here and do not update the modification time, as the request can be large, and often called.
const auto query = query_builder.composeLoadIdsQuery(ids);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size, true);
}
BlockInputStreamPtr MySQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
@ -142,7 +142,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadKeys(const Columns & key_columns,
/// We do not log in here and do not update the modification time, as the request can be large, and often called.
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size, true);
}
bool MySQLDictionarySource::isModified() const
@ -253,7 +253,7 @@ std::string MySQLDictionarySource::doInvalidateQuery(const std::string & request
Block invalidate_sample_block;
ColumnPtr column(ColumnString::create());
invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
MySQLBlockInputStream block_input_stream(pool.Get(), request, invalidate_sample_block, 1);
MySQLBlockInputStream block_input_stream(pool.Get(), request, invalidate_sample_block, 1, true);
return readInvalidateQuery(block_input_stream);
}

View File

@ -20,8 +20,8 @@ namespace ErrorCodes
MySQLBlockInputStream::MySQLBlockInputStream(
const mysqlxx::PoolWithFailover::Entry & entry, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size)
: entry{entry}, query{this->entry->query(query_str)}, result{query.use()}, max_block_size{max_block_size}
const mysqlxx::PoolWithFailover::Entry & entry, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size, const bool auto_close)
: entry{entry}, query{this->entry->query(query_str)}, result{query.use()}, max_block_size{max_block_size}, auto_close{auto_close}
{
if (sample_block.columns() != result.getNumFields())
throw Exception{"mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while "
@ -93,7 +93,11 @@ Block MySQLBlockInputStream::readImpl()
{
auto row = result.fetch();
if (!row)
{
if (auto_close)
entry.disconnect();
return {};
}
MutableColumns columns(description.sample_block.columns());
for (const auto i : ext::range(0, columns.size()))
@ -126,7 +130,8 @@ Block MySQLBlockInputStream::readImpl()
row = result.fetch();
}
if (auto_close)
entry.disconnect();
return description.sample_block.cloneWithColumns(std::move(columns));
}

View File

@ -18,7 +18,8 @@ public:
const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size);
const UInt64 max_block_size,
const bool auto_close = false);
String getName() const override { return "MySQL"; }
@ -31,6 +32,7 @@ private:
mysqlxx::Query query;
mysqlxx::UseQueryResult result;
const UInt64 max_block_size;
const bool auto_close;
ExternalResultDescription description;
};

View File

@ -110,6 +110,8 @@ public:
return "pool is null";
}
void disconnect() { if(data) { data->conn.disconnect();} };
friend class Pool;
private:

View File

@ -174,7 +174,6 @@ void Pool::Entry::forceConnected() const
throw Poco::RuntimeException("Tried to access NULL database connection.");
Poco::Util::Application & app = Poco::Util::Application::instance();
if (data->conn.ping())
return;