modify settings name to external_xxx and rewrite Storage MySQL max_block_size

This commit is contained in:
TCeason 2021-04-14 13:41:15 +08:00
parent 87aa904440
commit 63403c709c
8 changed files with 53 additions and 12 deletions

View File

@ -445,8 +445,8 @@ class IColumn;
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \ M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \ M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \
M(UInt64, distributed_ddl_entry_format_version, 1, "Version of DDL entry to write into ZooKeeper", 0) \ M(UInt64, distributed_ddl_entry_format_version, 1, "Version of DDL entry to write into ZooKeeper", 0) \
M(UInt64, max_read_mysql_rows, DEFAULT_BLOCK_SIZE, "Limit maximum number of rows when MaterializeMySQL should flush history data. If equal to 0, this setting is disabled", 0) \ M(UInt64, external_storage_max_read_rows, 0, "Limit maximum number of rows when MaterializeMySQL should flush history data. If equal to 0, this setting is disabled", 0) \
M(UInt64, max_read_mysql_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Limit maximum number of bytes when MaterializeMySQL should flush history data. If equal to 0, this setting is disabled", 0) \ M(UInt64, external_storage_max_read_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Limit maximum number of bytes when MaterializeMySQL should flush history data. If equal to 0, this setting is disabled", 0) \
\ \
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \ /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\ \

View File

@ -31,8 +31,8 @@ namespace ErrorCodes
} }
StreamSettings::StreamSettings(const Settings & settings, bool auto_close_, bool fetch_by_name_, size_t max_retry_) StreamSettings::StreamSettings(const Settings & settings, bool auto_close_, bool fetch_by_name_, size_t max_retry_)
: max_read_mysql_rows(settings.max_read_mysql_rows) : max_read_mysql_row_nums(settings.external_storage_max_read_rows)
, max_read_bytes_size(settings.max_read_mysql_bytes) , max_read_mysql_bytes_size(settings.external_storage_max_read_bytes)
, auto_close(auto_close_) , auto_close(auto_close_)
, fetch_by_name(fetch_by_name_) , fetch_by_name(fetch_by_name_)
, default_num_tries_on_connection_loss(max_retry_) , default_num_tries_on_connection_loss(max_retry_)
@ -258,7 +258,7 @@ Block MySQLBlockInputStream::readImpl()
} }
++num_rows; ++num_rows;
if (num_rows == settings->max_read_mysql_rows || (settings->max_read_bytes_size && read_bytes_size >= settings->max_read_bytes_size)) if (num_rows == settings->max_read_mysql_row_nums || (settings->max_read_mysql_bytes_size && read_bytes_size >= settings->max_read_mysql_bytes_size))
break; break;
row = connection->result.fetch(); row = connection->result.fetch();

View File

@ -13,8 +13,8 @@ namespace DB
struct StreamSettings struct StreamSettings
{ {
size_t max_read_mysql_rows; size_t max_read_mysql_row_nums;
size_t max_read_bytes_size; size_t max_read_mysql_bytes_size;
bool auto_close; bool auto_close;
bool fetch_by_name; bool fetch_by_name;
size_t default_num_tries_on_connection_loss; size_t default_num_tries_on_connection_loss;

View File

@ -71,7 +71,7 @@ Pipe StorageMySQL::read(
SelectQueryInfo & query_info_, SelectQueryInfo & query_info_,
ContextPtr context_, ContextPtr context_,
QueryProcessingStage::Enum /*processed_stage*/, QueryProcessingStage::Enum /*processed_stage*/,
size_t /*max_read_mysql_rows*/, size_t max_block_size,
unsigned) unsigned)
{ {
metadata_snapshot->check(column_names_, getVirtuals(), getStorageID()); metadata_snapshot->check(column_names_, getVirtuals(), getStorageID());
@ -95,6 +95,9 @@ Pipe StorageMySQL::read(
sample_block.insert({ column_data.type, column_data.name }); sample_block.insert({ column_data.type, column_data.name });
} }
if (!context_->getSettingsRef().external_storage_max_read_rows)
context_->setSetting("external_storage_max_read_rows", max_block_size);
StreamSettings mysql_input_stream_settings(context_->getSettingsRef(), true, false); StreamSettings mysql_input_stream_settings(context_->getSettingsRef(), true, false);
return Pipe(std::make_shared<SourceFromInputStream>( return Pipe(std::make_shared<SourceFromInputStream>(
std::make_shared<MySQLWithFailoverBlockInputStream>(pool, query, sample_block, mysql_input_stream_settings))); std::make_shared<MySQLWithFailoverBlockInputStream>(pool, query, sample_block, mysql_input_stream_settings)));

View File

@ -4,8 +4,8 @@
<default> <default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql> <allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<default_database_engine>Atomic</default_database_engine> <default_database_engine>Atomic</default_database_engine>
<max_read_mysql_rows>1</max_read_mysql_rows> <external_storage_max_read_rows>1</external_storage_max_read_rows>
<max_read_mysql_bytes>0</max_read_mysql_bytes> <external_storage_max_read_bytes>0</external_storage_max_read_bytes>
</default> </default>
</profiles> </profiles>

View File

@ -4,8 +4,8 @@
<default> <default>
<allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql> <allow_experimental_database_materialize_mysql>1</allow_experimental_database_materialize_mysql>
<default_database_engine>Atomic</default_database_engine> <default_database_engine>Atomic</default_database_engine>
<max_read_mysql_rows>0</max_read_mysql_rows> <external_storage_max_read_rows>0</external_storage_max_read_rows>
<max_read_mysql_bytes>1</max_read_mysql_bytes> <external_storage_max_read_bytes>1</external_storage_max_read_bytes>
</default> </default>
</profiles> </profiles>

View File

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<max_block_size>2</max_block_size>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
</default>
</users>
</yandex>

View File

@ -9,6 +9,7 @@ cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql=True) node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_mysql_cluster=True) node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_mysql_cluster=True)
node3 = cluster.add_instance('node3', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users.xml'], with_mysql=True)
create_table_sql_template = """ create_table_sql_template = """
CREATE TABLE `clickhouse`.`{}` ( CREATE TABLE `clickhouse`.`{}` (
@ -260,6 +261,25 @@ def test_mysql_distributed(started_cluster):
assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n') assert(result == 'host2\nhost4\n' or result == 'host3\nhost4\n')
def test_external_settings(started_cluster):
table_name = 'test_external_settings'
conn = get_mysql_conn()
create_mysql_table(conn, table_name)
node3.query('''
CREATE TABLE {}(id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL('mysql1:3306', 'clickhouse', '{}', 'root', 'clickhouse');
'''.format(table_name, table_name))
node3.query(
"INSERT INTO {}(id, name, money) select number, concat('name_', toString(number)), 3 from numbers(100) ".format(
table_name))
assert node3.query("SELECT count() FROM {}".format(table_name)).rstrip() == '100'
assert node3.query("SELECT sum(money) FROM {}".format(table_name)).rstrip() == '300'
node3.query("select value from system.settings where name = 'max_block_size' FORMAT TSV") == "2\n"
node3.query("select value from system.settings where name = 'external_storage_max_read_rows' FORMAT TSV") == "0\n"
assert node3.query("SELECT COUNT(DISTINCT blockNumber()) FROM {} FORMAT TSV".format(table_name)) == '50\n'
conn.close()
if __name__ == '__main__': if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster: with contextmanager(started_cluster)() as cluster:
for name, instance in list(cluster.instances.items()): for name, instance in list(cluster.instances.items()):