Merge pull request #15299 from CurtizJ/fix-mysql-hung

Fix hang of queries with a lot of subqueries to same mysql table
This commit is contained in:
Nikita Mikhaylov 2020-09-28 12:51:35 +03:00 committed by GitHub
commit 73573472e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 107 additions and 14 deletions

View File

@ -23,13 +23,27 @@ namespace ErrorCodes
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
MySQLBlockInputStream::Connection::Connection(
const mysqlxx::PoolWithFailover::Entry & entry_,
const std::string & query_str)
: entry(entry_)
, query{entry->query(query_str)}
, result{query.use()}
{
}
MySQLBlockInputStream::MySQLBlockInputStream(
const mysqlxx::PoolWithFailover::Entry & entry_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_, const bool auto_close_)
: entry{entry_}, query{this->entry->query(query_str)}, result{query.use()}, max_block_size{max_block_size_}, auto_close{auto_close_}
const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size_,
const bool auto_close_)
: connection{std::make_unique<Connection>(entry, query_str)}
, max_block_size{max_block_size_}
, auto_close{auto_close_}
{
if (sample_block.columns() != result.getNumFields())
throw Exception{"mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while "
if (sample_block.columns() != connection->result.getNumFields())
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "
+ toString(sample_block.columns()) + " expected",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
@ -106,11 +120,11 @@ namespace
Block MySQLBlockInputStream::readImpl()
{
auto row = result.fetch();
auto row = connection->result.fetch();
if (!row)
{
if (auto_close)
entry.disconnect();
connection->entry.disconnect();
return {};
}
@ -145,11 +159,42 @@ Block MySQLBlockInputStream::readImpl()
if (num_rows == max_block_size)
break;
row = result.fetch();
row = connection->result.fetch();
}
return description.sample_block.cloneWithColumns(std::move(columns));
}
MySQLBlockInputStream::MySQLBlockInputStream(
const Block & sample_block_,
UInt64 max_block_size_,
bool auto_close_)
: max_block_size(max_block_size_)
, auto_close(auto_close_)
{
description.init(sample_block_);
}
MySQLLazyBlockInputStream::MySQLLazyBlockInputStream(
mysqlxx::Pool & pool_,
const std::string & query_str_,
const Block & sample_block_,
const UInt64 max_block_size_,
const bool auto_close_)
: MySQLBlockInputStream(sample_block_, max_block_size_, auto_close_)
, pool(pool_)
, query_str(query_str_)
{
}
void MySQLLazyBlockInputStream::readPrefix()
{
connection = std::make_unique<Connection>(pool.get(), query_str);
if (description.sample_block.columns() != connection->result.getNumFields())
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "
+ toString(description.sample_block.columns()) + " expected",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
}
}
#endif

View File

@ -10,12 +10,13 @@
namespace DB
{
/// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining
class MySQLBlockInputStream final : public IBlockInputStream
class MySQLBlockInputStream : public IBlockInputStream
{
public:
MySQLBlockInputStream(
const mysqlxx::PoolWithFailover::Entry & entry_,
const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size_,
@ -25,15 +26,43 @@ public:
Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private:
protected:
MySQLBlockInputStream(const Block & sample_block_, UInt64 max_block_size_, bool auto_close_);
Block readImpl() override;
mysqlxx::PoolWithFailover::Entry entry;
mysqlxx::Query query;
mysqlxx::UseQueryResult result;
struct Connection
{
Connection(const mysqlxx::PoolWithFailover::Entry & entry_, const std::string & query_str);
mysqlxx::PoolWithFailover::Entry entry;
mysqlxx::Query query;
mysqlxx::UseQueryResult result;
};
std::unique_ptr<Connection> connection;
const UInt64 max_block_size;
const bool auto_close;
ExternalResultDescription description;
};
/// Like MySQLBlockInputStream, but allocates connection only when reading is starting.
/// It allows to create a lot of stream objects without occupation of all connection pool.
class MySQLLazyBlockInputStream final : public MySQLBlockInputStream
{
public:
MySQLLazyBlockInputStream(
mysqlxx::Pool & pool_,
const std::string & query_str_,
const Block & sample_block_,
const UInt64 max_block_size_,
const bool auto_close_ = false);
private:
void readPrefix() override;
mysqlxx::Pool & pool;
std::string query_str;
};
}

View File

@ -96,7 +96,7 @@ Pipe StorageMySQL::read(
/// TODO: rewrite MySQLBlockInputStream
return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<MySQLBlockInputStream>(pool.get(), query, sample_block, max_block_size_)));
std::make_shared<MySQLLazyBlockInputStream>(pool, query, sample_block, max_block_size_, /* auto_close = */ true)));
}

View File

@ -33,6 +33,25 @@ def started_cluster():
cluster.shutdown()
def test_many_connections(started_cluster):
table_name = 'test_many_connections'
conn = get_mysql_conn()
create_mysql_table(conn, table_name)
node1.query('''
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse');
'''.format(table_name, table_name))
node1.query("INSERT INTO {} (id, name) SELECT number, concat('name_', toString(number)) from numbers(10) ".format(table_name))
query = "SELECT count() FROM ("
for i in range (24):
query += "SELECT id FROM {t} UNION ALL "
query += "SELECT id FROM {t})"
assert node1.query(query.format(t=table_name)) == '250\n'
conn.close()
def test_insert_select(started_cluster):
table_name = 'test_insert_select'
conn = get_mysql_conn()