From 085f63a9bfa6a6294451612b39b5d51c15f8f0aa Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 25 Sep 2020 14:12:48 +0300 Subject: [PATCH] fix hang of queries with a lot of subqueries to same mysql table --- src/Formats/MySQLBlockInputStream.cpp | 59 +++++++++++++++++--- src/Formats/MySQLBlockInputStream.h | 41 ++++++++++++-- src/Storages/StorageMySQL.cpp | 2 +- tests/integration/test_storage_mysql/test.py | 19 +++++++ 4 files changed, 107 insertions(+), 14 deletions(-) diff --git a/src/Formats/MySQLBlockInputStream.cpp b/src/Formats/MySQLBlockInputStream.cpp index be1e254b22f..73def337240 100644 --- a/src/Formats/MySQLBlockInputStream.cpp +++ b/src/Formats/MySQLBlockInputStream.cpp @@ -23,13 +23,27 @@ namespace ErrorCodes extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH; } +MySQLBlockInputStream::Connection::Connection( + const mysqlxx::PoolWithFailover::Entry & entry_, + const std::string & query_str) + : entry(entry_) + , query{entry->query(query_str)} + , result{query.use()} +{ +} MySQLBlockInputStream::MySQLBlockInputStream( - const mysqlxx::PoolWithFailover::Entry & entry_, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_, const bool auto_close_) - : entry{entry_}, query{this->entry->query(query_str)}, result{query.use()}, max_block_size{max_block_size_}, auto_close{auto_close_} + const mysqlxx::PoolWithFailover::Entry & entry, + const std::string & query_str, + const Block & sample_block, + const UInt64 max_block_size_, + const bool auto_close_) + : connection{std::make_unique(entry, query_str)} + , max_block_size{max_block_size_} + , auto_close{auto_close_} { - if (sample_block.columns() != result.getNumFields()) - throw Exception{"mysqlxx::UseQueryResult contains " + toString(result.getNumFields()) + " columns while " + if (sample_block.columns() != connection->result.getNumFields()) + throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while " + toString(sample_block.columns()) + " expected", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH}; @@ -106,11 +120,11 @@ namespace Block MySQLBlockInputStream::readImpl() { - auto row = result.fetch(); + auto row = connection->result.fetch(); if (!row) { if (auto_close) - entry.disconnect(); + connection->entry.disconnect(); return {}; } @@ -145,11 +159,42 @@ Block MySQLBlockInputStream::readImpl() if (num_rows == max_block_size) break; - row = result.fetch(); + row = connection->result.fetch(); } return description.sample_block.cloneWithColumns(std::move(columns)); } +MySQLBlockInputStream::MySQLBlockInputStream( + const Block & sample_block_, + UInt64 max_block_size_, + bool auto_close_) + : max_block_size(max_block_size_) + , auto_close(auto_close_) +{ + description.init(sample_block_); +} + +MySQLLazyBlockInputStream::MySQLLazyBlockInputStream( + mysqlxx::Pool & pool_, + const std::string & query_str_, + const Block & sample_block_, + const UInt64 max_block_size_, + const bool auto_close_) + : MySQLBlockInputStream(sample_block_, max_block_size_, auto_close_) + , pool(pool_) + , query_str(query_str_) +{ +} + +void MySQLLazyBlockInputStream::readPrefix() +{ + connection = std::make_unique(pool.get(), query_str); + if (description.sample_block.columns() != connection->result.getNumFields()) + throw Exception{"mysqlxx::UseQueryResult contains " + toString(connection->result.getNumFields()) + " columns while " + + toString(description.sample_block.columns()) + " expected", + ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH}; +} + } #endif diff --git a/src/Formats/MySQLBlockInputStream.h b/src/Formats/MySQLBlockInputStream.h index 238994acbd8..2eaeb5b8d59 100644 --- a/src/Formats/MySQLBlockInputStream.h +++ b/src/Formats/MySQLBlockInputStream.h @@ -10,12 +10,13 @@ namespace DB { + /// Allows processing results of a MySQL query as a sequence of Blocks, simplifies chaining -class MySQLBlockInputStream final : public IBlockInputStream +class MySQLBlockInputStream : public IBlockInputStream { public: MySQLBlockInputStream( - const mysqlxx::PoolWithFailover::Entry & entry_, + const mysqlxx::PoolWithFailover::Entry & entry, const std::string & query_str, const Block & sample_block, const UInt64 max_block_size_, @@ -25,15 +26,43 @@ public: Block getHeader() const override { return description.sample_block.cloneEmpty(); } -private: +protected: + MySQLBlockInputStream(const Block & sample_block_, UInt64 max_block_size_, bool auto_close_); Block readImpl() override; - mysqlxx::PoolWithFailover::Entry entry; - mysqlxx::Query query; - mysqlxx::UseQueryResult result; + struct Connection + { + Connection(const mysqlxx::PoolWithFailover::Entry & entry_, const std::string & query_str); + + mysqlxx::PoolWithFailover::Entry entry; + mysqlxx::Query query; + mysqlxx::UseQueryResult result; + }; + + std::unique_ptr connection; + const UInt64 max_block_size; const bool auto_close; ExternalResultDescription description; }; +/// Like MySQLBlockInputStream, but allocates connection only when reading is starting. +/// It allows to create a lot of stream objects without occupation of all connection pool. +class MySQLLazyBlockInputStream final : public MySQLBlockInputStream +{ +public: + MySQLLazyBlockInputStream( + mysqlxx::Pool & pool_, + const std::string & query_str_, + const Block & sample_block_, + const UInt64 max_block_size_, + const bool auto_close_ = false); + +private: + void readPrefix() override; + + mysqlxx::Pool & pool; + std::string query_str; +}; + } diff --git a/src/Storages/StorageMySQL.cpp b/src/Storages/StorageMySQL.cpp index b2e622663c0..afbca0d9430 100644 --- a/src/Storages/StorageMySQL.cpp +++ b/src/Storages/StorageMySQL.cpp @@ -96,7 +96,7 @@ Pipe StorageMySQL::read( /// TODO: rewrite MySQLBlockInputStream return Pipe(std::make_shared( - std::make_shared(pool.get(), query, sample_block, max_block_size_))); + std::make_shared(pool, query, sample_block, max_block_size_, /* auto_close = */ true))); } diff --git a/tests/integration/test_storage_mysql/test.py b/tests/integration/test_storage_mysql/test.py index 0b73866eaee..83ef1e6c86a 100644 --- a/tests/integration/test_storage_mysql/test.py +++ b/tests/integration/test_storage_mysql/test.py @@ -33,6 +33,25 @@ def started_cluster(): cluster.shutdown() +def test_many_connections(started_cluster): + table_name = 'test_many_connections' + conn = get_mysql_conn() + create_mysql_table(conn, table_name) + + node1.query(''' +CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse'); +'''.format(table_name, table_name)) + + node1.query("INSERT INTO {} (id, name) SELECT number, concat('name_', toString(number)) from numbers(10) ".format(table_name)) + + query = "SELECT count() FROM (" + for i in range (24): + query += "SELECT id FROM {t} UNION ALL " + query += "SELECT id FROM {t})" + + assert node1.query(query.format(t=table_name)) == '250\n' + conn.close() + def test_insert_select(started_cluster): table_name = 'test_insert_select' conn = get_mysql_conn()