fix hang of queries with a lot of subqueries to same mysql table

This commit is contained in:
Anton Popov 2020-09-25 14:12:48 +03:00
parent 8adb9c9224
commit 085f63a9bf
4 changed files with 107 additions and 14 deletions

View File

@ -23,13 +23,27 @@ namespace ErrorCodes
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH; extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
} }
MySQLBlockInputStream::Connection::Connection(
const mysqlxx::PoolWithFailover::Entry & entry_,
const std::string & query_str)
: entry(entry_)
, query{entry->query(query_str)}
, result{query.use()}
{
}
MySQLBlockInputStream::MySQLBlockInputStream( MySQLBlockInputStream::MySQLBlockInputStream(
const mysqlxx::PoolWithFailover::Entry & entry_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_, const bool auto_close_) const mysqlxx::PoolWithFailover::Entry & entry,
: entry{entry_}, query{this->entry->query(query_str)}, result{query.use()}, max_block_size{max_block_size_}, auto_close{auto_close_} const std::string & query_str,
const Block & sample_block,
const UInt64 max_block_size_,
const bool auto_close_)
: connection{std::make_unique<Connection>(entry, query_str)}
, max_block_size{max_block_size_}
, auto_close{auto_close_}
{ {
if (sample_block.columns() != result.getNumFields()) if (sample_block.columns() != connection->result.getNumFields())
throw Exception{"mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while " throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "
+ toString(sample_block.columns()) + " expected", + toString(sample_block.columns()) + " expected",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH}; ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
@ -106,11 +120,11 @@ namespace
Block MySQLBlockInputStream::readImpl() Block MySQLBlockInputStream::readImpl()
{ {
auto row = result.fetch(); auto row = connection->result.fetch();
if (!row) if (!row)
{ {
if (auto_close) if (auto_close)
entry.disconnect(); connection->entry.disconnect();
return {}; return {};
} }
@ -145,11 +159,42 @@ Block MySQLBlockInputStream::readImpl()
if (num_rows == max_block_size) if (num_rows == max_block_size)
break; break;
row = result.fetch(); row = connection->result.fetch();
} }
return description.sample_block.cloneWithColumns(std::move(columns)); return description.sample_block.cloneWithColumns(std::move(columns));
} }
MySQLBlockInputStream::MySQLBlockInputStream(
const Block & sample_block_,
UInt64 max_block_size_,
bool auto_close_)
: max_block_size(max_block_size_)
, auto_close(auto_close_)
{
description.init(sample_block_);
}
MySQLLazyBlockInputStream::MySQLLazyBlockInputStream(
mysqlxx::Pool & pool_,
const std::string & query_str_,
const Block & sample_block_,
const UInt64 max_block_size_,
const bool auto_close_)
: MySQLBlockInputStream(sample_block_, max_block_size_, auto_close_)
, pool(pool_)
, query_str(query_str_)
{
}
void MySQLLazyBlockInputStream::readPrefix()
{
connection = std::make_unique<Connection>(pool.get(), query_str);
if (description.sample_block.columns() != connection->result.getNumFields())
throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while "
+ toString(description.sample_block.columns()) + " expected",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH};
}
} }
#endif #endif

View File

@ -10,12 +10,13 @@
namespace DB namespace DB
{ {
/// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining /// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining
class MySQLBlockInputStream final : public IBlockInputStream class MySQLBlockInputStream : public IBlockInputStream
{ {
public: public:
MySQLBlockInputStream( MySQLBlockInputStream(
const mysqlxx::PoolWithFailover::Entry & entry_, const mysqlxx::PoolWithFailover::Entry & entry,
const std::string & query_str, const std::string & query_str,
const Block & sample_block, const Block & sample_block,
const UInt64 max_block_size_, const UInt64 max_block_size_,
@ -25,15 +26,43 @@ public:
Block getHeader() const override { return description.sample_block.cloneEmpty(); } Block getHeader() const override { return description.sample_block.cloneEmpty(); }
private: protected:
MySQLBlockInputStream(const Block & sample_block_, UInt64 max_block_size_, bool auto_close_);
Block readImpl() override; Block readImpl() override;
mysqlxx::PoolWithFailover::Entry entry; struct Connection
mysqlxx::Query query; {
mysqlxx::UseQueryResult result; Connection(const mysqlxx::PoolWithFailover::Entry & entry_, const std::string & query_str);
mysqlxx::PoolWithFailover::Entry entry;
mysqlxx::Query query;
mysqlxx::UseQueryResult result;
};
std::unique_ptr<Connection> connection;
const UInt64 max_block_size; const UInt64 max_block_size;
const bool auto_close; const bool auto_close;
ExternalResultDescription description; ExternalResultDescription description;
}; };
/// Like MySQLBlockInputStream, but allocates connection only when reading is starting.
/// It allows to create a lot of stream objects without occupation of all connection pool.
class MySQLLazyBlockInputStream final : public MySQLBlockInputStream
{
public:
MySQLLazyBlockInputStream(
mysqlxx::Pool & pool_,
const std::string & query_str_,
const Block & sample_block_,
const UInt64 max_block_size_,
const bool auto_close_ = false);
private:
void readPrefix() override;
mysqlxx::Pool & pool;
std::string query_str;
};
} }

View File

@ -96,7 +96,7 @@ Pipe StorageMySQL::read(
/// TODO: rewrite MySQLBlockInputStream /// TODO: rewrite MySQLBlockInputStream
return Pipe(std::make_shared<SourceFromInputStream>( return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<MySQLBlockInputStream>(pool.get(), query, sample_block, max_block_size_))); std::make_shared<MySQLLazyBlockInputStream>(pool, query, sample_block, max_block_size_, /* auto_close = */ true)));
} }

View File

@ -33,6 +33,25 @@ def started_cluster():
cluster.shutdown() cluster.shutdown()
def test_many_connections(started_cluster):
table_name = 'test_many_connections'
conn = get_mysql_conn()
create_mysql_table(conn, table_name)
node1.query('''
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse');
'''.format(table_name, table_name))
node1.query("INSERT INTO {} (id, name) SELECT number, concat('name_', toString(number)) from numbers(10) ".format(table_name))
query = "SELECT count() FROM ("
for i in range (24):
query += "SELECT id FROM {t} UNION ALL "
query += "SELECT id FROM {t})"
assert node1.query(query.format(t=table_name)) == '250\n'
conn.close()
def test_insert_select(started_cluster): def test_insert_select(started_cluster):
table_name = 'test_insert_select' table_name = 'test_insert_select'
conn = get_mysql_conn() conn = get_mysql_conn()