Merge pull request #5395 from clemrodriguez/master

Close MySQL connections opened by external MySQL dictionaries
This commit is contained in:
alexey-milovidov 2019-05-25 14:13:51 +03:00 committed by GitHub
commit b79582e15a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 32 additions and 11 deletions

View File

@ -6,6 +6,7 @@
#include "DictionaryStructure.h"
namespace DB
{
namespace ErrorCodes
@ -47,7 +48,6 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
# include <Formats/MySQLBlockInputStream.h>
# include "readInvalidateQuery.h"
namespace DB
{
static const UInt64 max_block_size = 8192;
@ -71,6 +71,7 @@ MySQLDictionarySource::MySQLDictionarySource(
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, load_all_query{query_builder.composeLoadAllQuery()}
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
, close_connection{config.getBool(config_prefix + ".close_connection", false)}
{
}
@ -91,6 +92,7 @@ MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other
, last_modification{other.last_modification}
, invalidate_query{other.invalidate_query}
, invalidate_query_response{other.invalidate_query_response}
, close_connection{other.close_connection}
{
}
@ -117,7 +119,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadAll()
last_modification = getLastModification();
LOG_TRACE(log, load_all_query);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_all_query, sample_block, max_block_size);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_all_query, sample_block, max_block_size, close_connection);
}
BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
@ -126,7 +128,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadUpdatedAll()
std::string load_update_query = getUpdateFieldAndDate();
LOG_TRACE(log, load_update_query);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_update_query, sample_block, max_block_size);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), load_update_query, sample_block, max_block_size, close_connection);
}
BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & ids)
@ -134,7 +136,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadIds(const std::vector<UInt64> & i
/// We do not log in here and do not update the modification time, as the request can be large, and often called.
const auto query = query_builder.composeLoadIdsQuery(ids);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size, close_connection);
}
BlockInputStreamPtr MySQLDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
@ -142,7 +144,7 @@ BlockInputStreamPtr MySQLDictionarySource::loadKeys(const Columns & key_columns,
/// We do not log in here and do not update the modification time, as the request can be large, and often called.
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size);
return std::make_shared<MySQLBlockInputStream>(pool.Get(), query, sample_block, max_block_size, close_connection);
}
bool MySQLDictionarySource::isModified() const
@ -253,7 +255,7 @@ std::string MySQLDictionarySource::doInvalidateQuery(const std::string & request
Block invalidate_sample_block;
ColumnPtr column(ColumnString::create());
invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
MySQLBlockInputStream block_input_stream(pool.Get(), request, invalidate_sample_block, 1);
MySQLBlockInputStream block_input_stream(pool.Get(), request, invalidate_sample_block, 1, close_connection);
return readInvalidateQuery(block_input_stream);
}

View File

@ -81,6 +81,7 @@ private:
LocalDateTime last_modification;
std::string invalidate_query;
mutable std::string invalidate_query_response;
const bool close_connection;
};
}

View File

@ -20,8 +20,8 @@ namespace ErrorCodes
MySQLBlockInputStream::MySQLBlockInputStream(
const mysqlxx::PoolWithFailover::Entry & entry, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size)
: entry{entry}, query{this->entry->query(query_str)}, result{query.use()}, max_block_size{max_block_size}
const mysqlxx::PoolWithFailover::Entry & entry, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size, const bool auto_close)
: entry{entry}, query{this->entry->query(query_str)}, result{query.use()}, max_block_size{max_block_size}, auto_close{auto_close}
{
if (sample_block.columns() != result.getNumFields())
throw Exception{"mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while "
@ -93,7 +93,11 @@ Block MySQLBlockInputStream::readImpl()
{
auto row = result.fetch();
if (!row)
{
if (auto_close)
entry.disconnect();
return {};
}
MutableColumns columns(description.sample_block.columns());
for (const auto i : ext::range(0, columns.size()))
@ -126,7 +130,8 @@ Block MySQLBlockInputStream::readImpl()
row = result.fetch();
}
if (auto_close)
entry.disconnect();
return description.sample_block.cloneWithColumns(std::move(columns));
}

View File

@ -18,7 +18,8 @@ public:
const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size);
const UInt64 max_block_size,
const bool auto_close = false);
String getName() const override { return "MySQL"; }
@ -31,6 +32,7 @@ private:
mysqlxx::Query query;
mysqlxx::UseQueryResult result;
const UInt64 max_block_size;
const bool auto_close;
ExternalResultDescription description;
};

View File

@ -110,6 +110,8 @@ public:
return "pool is null";
}
void disconnect();
friend class Pool;
private:

View File

@ -168,13 +168,22 @@ Pool::Entry Pool::tryGet()
}
void Pool::Entry::disconnect()
{
if (data)
{
decrementRefCount();
data->conn.disconnect();
}
}
void Pool::Entry::forceConnected() const
{
if (data == nullptr)
throw Poco::RuntimeException("Tried to access NULL database connection.");
Poco::Util::Application & app = Poco::Util::Application::instance();
if (data->conn.ping())
return;