From 63403c709ca836857c2ccaa07e26b926c8afa261 Mon Sep 17 00:00:00 2001
From: TCeason
Date: Wed, 14 Apr 2021 13:41:15 +0800
Subject: [PATCH] modify settings name to external_xxx and rewrite Storage
 MySQL max_block_size

---
 src/Core/Settings.h                            |  4 ++--
 src/Formats/MySQLBlockInputStream.cpp          |  6 +++---
 src/Formats/MySQLBlockInputStream.h            |  4 ++--
 src/Storages/StorageMySQL.cpp                  |  5 ++++-
 .../configs/users_disable_bytes_settings.xml   |  4 ++--
 .../configs/users_disable_rows_settings.xml    |  4 ++--
 .../test_storage_mysql/configs/users.xml       | 18 +++++++++++++++++
 tests/integration/test_storage_mysql/test.py   | 20 +++++++++++++++++++
 8 files changed, 53 insertions(+), 12 deletions(-)
 create mode 100644 tests/integration/test_storage_mysql/configs/users.xml

diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 6cd3e75c03d..be3a7ce739b 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -445,8 +445,8 @@ class IColumn;
     M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
     M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \
     M(UInt64, distributed_ddl_entry_format_version, 1, "Version of DDL entry to write into ZooKeeper", 0) \
-    M(UInt64, max_read_mysql_rows, DEFAULT_BLOCK_SIZE, "Limit maximum number of rows when MaterializeMySQL should flush history data. If equal to 0, this setting is disabled", 0) \
-    M(UInt64, max_read_mysql_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Limit maximum number of bytes when MaterializeMySQL should flush history data. If equal to 0, this setting is disabled", 0) \
+    M(UInt64, external_storage_max_read_rows, 0, "Limit maximum number of rows when MaterializeMySQL should flush history data. If equal to 0, this setting is disabled", 0) \
+    M(UInt64, external_storage_max_read_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Limit maximum number of bytes when MaterializeMySQL should flush history data. If equal to 0, this setting is disabled", 0) \
     \
     /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
     \
diff --git a/src/Formats/MySQLBlockInputStream.cpp b/src/Formats/MySQLBlockInputStream.cpp
index 10ebf89e949..445b159dc4b 100644
--- a/src/Formats/MySQLBlockInputStream.cpp
+++ b/src/Formats/MySQLBlockInputStream.cpp
@@ -31,8 +31,8 @@ namespace ErrorCodes
 }
 
 StreamSettings::StreamSettings(const Settings & settings, bool auto_close_, bool fetch_by_name_, size_t max_retry_)
-    : max_read_mysql_rows(settings.max_read_mysql_rows)
-    , max_read_bytes_size(settings.max_read_mysql_bytes)
+    : max_read_mysql_row_nums(settings.external_storage_max_read_rows)
+    , max_read_mysql_bytes_size(settings.external_storage_max_read_bytes)
     , auto_close(auto_close_)
     , fetch_by_name(fetch_by_name_)
     , default_num_tries_on_connection_loss(max_retry_)
@@ -258,7 +258,7 @@ Block MySQLBlockInputStream::readImpl()
         }
 
         ++num_rows;
-        if (num_rows == settings->max_read_mysql_rows || (settings->max_read_bytes_size && read_bytes_size >= settings->max_read_bytes_size))
+        if (num_rows == settings->max_read_mysql_row_nums || (settings->max_read_mysql_bytes_size && read_bytes_size >= settings->max_read_mysql_bytes_size))
             break;
 
         row = connection->result.fetch();
diff --git a/src/Formats/MySQLBlockInputStream.h b/src/Formats/MySQLBlockInputStream.h
index d6a16379874..4190addc71d 100644
--- a/src/Formats/MySQLBlockInputStream.h
+++ b/src/Formats/MySQLBlockInputStream.h
@@ -13,8 +13,8 @@ namespace DB
 
 struct StreamSettings
 {
-    size_t max_read_mysql_rows;
-    size_t max_read_bytes_size;
+    size_t max_read_mysql_row_nums;
+    size_t max_read_mysql_bytes_size;
     bool auto_close;
     bool fetch_by_name;
     size_t default_num_tries_on_connection_loss;
diff --git a/src/Storages/StorageMySQL.cpp b/src/Storages/StorageMySQL.cpp
index 67fbcfc781e..984682ac0a4 100644
--- a/src/Storages/StorageMySQL.cpp
+++ b/src/Storages/StorageMySQL.cpp
@@ -71,7 +71,7 @@ Pipe StorageMySQL::read(
     SelectQueryInfo & query_info_,
     ContextPtr context_,
     QueryProcessingStage::Enum /*processed_stage*/,
-    size_t /*max_read_mysql_rows*/,
+    size_t max_block_size,
     unsigned)
 {
     metadata_snapshot->check(column_names_, getVirtuals(), getStorageID());
@@ -95,6 +95,9 @@ Pipe StorageMySQL::read(
         sample_block.insert({ column_data.type, column_data.name });
     }
 
+    if (!context_->getSettingsRef().external_storage_max_read_rows)
+        context_->setSetting("external_storage_max_read_rows", max_block_size);
+
     StreamSettings mysql_input_stream_settings(context_->getSettingsRef(), true, false);
     return Pipe(std::make_shared<SourceFromInputStream>(
         std::make_shared<MySQLWithFailoverBlockInputStream>(pool, query, sample_block, mysql_input_stream_settings)));
diff --git a/tests/integration/test_materialize_mysql_database/configs/users_disable_bytes_settings.xml b/tests/integration/test_materialize_mysql_database/configs/users_disable_bytes_settings.xml
index cb5a38a57c9..4516cb80c17 100644
--- a/tests/integration/test_materialize_mysql_database/configs/users_disable_bytes_settings.xml
+++ b/tests/integration/test_materialize_mysql_database/configs/users_disable_bytes_settings.xml
@@ -4,8 +4,8 @@
         <default>
            <allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
            <default_database_engine>Atomic</default_database_engine>
-           <max_read_mysql_rows>1</max_read_mysql_rows>
-           <max_read_mysql_bytes>0</max_read_mysql_bytes>
+           <external_storage_max_read_rows>1</external_storage_max_read_rows>
+           <external_storage_max_read_bytes>0</external_storage_max_read_bytes>
         </default>
     </profiles>
 
diff --git a/tests/integration/test_materialize_mysql_database/configs/users_disable_rows_settings.xml b/tests/integration/test_materialize_mysql_database/configs/users_disable_rows_settings.xml
index a245b8e978b..dea20eb9e12 100644
--- a/tests/integration/test_materialize_mysql_database/configs/users_disable_rows_settings.xml
+++ b/tests/integration/test_materialize_mysql_database/configs/users_disable_rows_settings.xml
@@ -4,8 +4,8 @@
         <default>
            <allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
            <default_database_engine>Atomic</default_database_engine>
-           <max_read_mysql_rows>0</max_read_mysql_rows>
-           <max_read_mysql_bytes>1</max_read_mysql_bytes>
+           <external_storage_max_read_rows>0</external_storage_max_read_rows>
+           <external_storage_max_read_bytes>1</external_storage_max_read_bytes>
         </default>
     </profiles>
 
diff --git a/tests/integration/test_storage_mysql/configs/users.xml b/tests/integration/test_storage_mysql/configs/users.xml
new file mode 100644
index 00000000000..27c4d46984e
--- /dev/null
+++ b/tests/integration/test_storage_mysql/configs/users.xml
@@ -0,0 +1,18 @@
+<yandex>
+    <profiles>
+        <default>
+            <max_block_size>2</max_block_size>
+        </default>
+    </profiles>
+
+    <users>
+        <default>
+            <password></password>
+            <networks incl="networks" replace="replace">
+                <ip>::/0</ip>
+            </networks>
+            <profile>default</profile>
+            <quota>default</quota>
+        </default>
+    </users>
+</yandex>
diff --git a/tests/integration/test_storage_mysql/test.py b/tests/integration/test_storage_mysql/test.py
index 4520b3f3837..9c3abd799af 100644
--- a/tests/integration/test_storage_mysql/test.py
+++ b/tests/integration/test_storage_mysql/test.py
@@ -9,6 +9,7 @@
 cluster = ClickHouseCluster(__file__)
 node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql=True)
 node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_mysql_cluster=True)
+node3 = cluster.add_instance('node3', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users.xml'], with_mysql=True)
 
 create_table_sql_template = """
     CREATE TABLE `clickhouse`.`{}` (
@@ -260,6 +261,25 @@ def test_mysql_distributed(started_cluster):
     assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n')
 
 
+def test_external_settings(started_cluster):
+    table_name = 'test_external_settings'
+    conn = get_mysql_conn()
+    create_mysql_table(conn, table_name)
+
+    node3.query('''
+CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse');
+'''.format(table_name, table_name))
+    node3.query(
+        "INSERT INTO {}(id, name, money) select number, concat('name_', toString(number)), 3 from numbers(100) ".format(
+            table_name))
+    assert node3.query("SELECT count() FROM {}".format(table_name)).rstrip() == '100'
+    assert node3.query("SELECT sum(money) FROM {}".format(table_name)).rstrip() == '300'
+    assert node3.query("select value from system.settings where name = 'max_block_size' FORMAT TSV") == "2\n"
+    assert node3.query("select value from system.settings where name = 'external_storage_max_read_rows' FORMAT TSV") == "0\n"
+    assert node3.query("SELECT COUNT(DISTINCT blockNumber()) FROM {} FORMAT TSV".format(table_name)) == '50\n'
+    conn.close()
+
+
 if __name__ == '__main__':
     with contextmanager(started_cluster)() as cluster:
         for name, instance in list(cluster.instances.items()):
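Note for reviewers (not part of the patch): the behaviour the diff introduces is "use external_storage_max_read_rows when it is non-zero, otherwise fall back to the generic max_block_size", which is why the integration test with max_block_size = 2 expects 100 inserted rows to come back as 50 blocks. Below is a minimal standalone C++ sketch of that precedence rule, using a hypothetical effectiveRowLimit() helper; the names are illustrative only and are not ClickHouse API.

#include <cassert>
#include <cstddef>

// Hypothetical helper: chooses the per-fetch row limit the way the patched
// StorageMySQL::read() does. A value of 0 means "setting disabled".
static size_t effectiveRowLimit(size_t external_storage_max_read_rows, size_t max_block_size)
{
    return external_storage_max_read_rows ? external_storage_max_read_rows : max_block_size;
}

int main()
{
    // Mirrors test_external_settings: the external setting stays at its
    // default of 0, so max_block_size = 2 drives the block size and
    // 100 rows are read back as 50 blocks.
    assert(effectiveRowLimit(0, 2) == 2);

    // A non-zero external limit takes precedence over max_block_size.
    assert(effectiveRowLimit(10, 2) == 10);
    return 0;
}

The new lines in StorageMySQL.cpp express the same rule by copying max_block_size into external_storage_max_read_rows when that setting is 0, so MySQLBlockInputStream only ever needs to consult a single value.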