From 2dc264928f311e2f4d10001044d070b6a6a05471 Mon Sep 17 00:00:00 2001 From: Yatsishin Ilya <2159081+qoega@users.noreply.github.com> Date: Mon, 22 Jul 2024 11:33:51 +0000 Subject: [PATCH] Added tests, rewritten logic which engines and table functions to allow, added replace for create table ... AS table_function() syntax. --- docs/en/operations/settings/settings.md | 12 ++ src/Core/Settings.h | 3 +- src/Core/SettingsChangesHistory.cpp | 3 +- src/Interpreters/InterpreterCreateQuery.cpp | 50 +++--- .../test_restore_external_engines/__init__.py | 0 .../configs/backups_disk.xml | 14 ++ .../configs/remote_servers.xml | 21 +++ .../test_restore_external_engines/test.py | 143 ++++++++++++++++++ 8 files changed, 217 insertions(+), 29 deletions(-) create mode 100644 tests/integration/test_restore_external_engines/__init__.py create mode 100644 tests/integration/test_restore_external_engines/configs/backups_disk.xml create mode 100644 tests/integration/test_restore_external_engines/configs/remote_servers.xml create mode 100644 tests/integration/test_restore_external_engines/test.py diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index c3f697c3bdc..65b8df7a9e2 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -5608,3 +5608,15 @@ Default value: `10000000`. Minimal size of block to compress in CROSS JOIN. Zero value means - disable this threshold. This block is compressed when any of the two thresholds (by rows or by bytes) are reached. Default value: `1GiB`. + +## restore_replace_external_engines_to_null + +For testing purposes. Replaces all external engines to Null to not initiate external connections. + +Default value: `False` + +## restore_replace_external_table_functions_to_null + +For testing purposes. Replaces all external engines to Null to not initiate external connections. + +Default value: `False` \ No newline at end of file diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 7bf97896357..e6d2cac359b 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -891,7 +891,8 @@ class IColumn; M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \ M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \ M(UInt64, extract_key_value_pairs_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory.", 0) ALIAS(extract_kvp_max_pairs_per_row) \ - M(Bool, restore_replace_external_engine_to_null, false, "Replace all the External table engines to Null on restore. Useful for testing purposes", 0) \ + M(Bool, restore_replace_external_engines_to_null, false, "Replace all the External table engines to Null on restore. Useful for testing purposes", 0) \ + M(Bool, restore_replace_external_table_functions_to_null, false, "Replace all table functions to Null on restore. Useful for testing purposes", 0) \ \ \ /* ###################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index a23d9d17da2..0abcfb0cfb9 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -79,7 +79,8 @@ static std::initializer_listno_empty_args = true; storage.set(storage.engine, engine_ast); } + } void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const { + if (create.as_table_function) + { + if (getContext()->getSettingsRef().restore_replace_external_table_functions_to_null) + { + const auto & factory = TableFunctionFactory::instance(); + + auto properties = factory.tryGetProperties(create.as_table_function->as()->name); + if (properties && properties->allow_readonly) + return; + if (!create.storage) + { + auto storage_ast = std::make_shared(); + create.set(create.storage, storage_ast); + } + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage should not be created yet, it's a bug."); + create.as_table_function = nullptr; + setNullTableEngine(*create.storage); + } return; + } if (create.is_dictionary || create.is_ordinary_view || create.is_live_view || create.is_window_view) return; @@ -1010,34 +1031,9 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_table_engine.value); /// For exrternal tables with restore_replace_external_engine_to_null setting we replace external engines to /// Null table engine. - else if (create.storage->engine->name == "AzureBlobStorage" || - create.storage->engine->name == "AzureQueue" || - create.storage->engine->name == "COSN" || - create.storage->engine->name == "DeltaLake" || - create.storage->engine->name == "Dictionary" || - create.storage->engine->name == "Executable" || - create.storage->engine->name == "ExecutablePool" || - create.storage->engine->name == "ExternalDistributed" || - create.storage->engine->name == "File" || - create.storage->engine->name == "Hudi" || - create.storage->engine->name == "Iceberg" || - create.storage->engine->name == "JDBC" || - create.storage->engine->name == "Kafka" || - create.storage->engine->name == "MaterializedPostgreSQL" || - create.storage->engine->name == "MongoDB" || - create.storage->engine->name == "MySQL" || - create.storage->engine->name == "NATS" || - create.storage->engine->name == "ODBC" || - create.storage->engine->name == "OSS" || - create.storage->engine->name == "PostgreSQL" || - create.storage->engine->name == "RabbitMQ" || - create.storage->engine->name == "Redis" || - create.storage->engine->name == "S3" || - create.storage->engine->name == "S3Queue" || - create.storage->engine->name == "TinyLog" || - create.storage->engine->name == "URL") + else if (getContext()->getSettingsRef().restore_replace_external_engines_to_null) { - if (getContext()->getSettingsRef().restore_replace_external_engine_to_null) + if (StorageFactory::instance().getStorageFeatures(create.storage->engine->name).source_access_type != AccessType::NONE) setNullTableEngine(*create.storage); } return; diff --git a/tests/integration/test_restore_external_engines/__init__.py b/tests/integration/test_restore_external_engines/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_restore_external_engines/configs/backups_disk.xml b/tests/integration/test_restore_external_engines/configs/backups_disk.xml new file mode 100644 index 00000000000..f7d666c6542 --- /dev/null +++ b/tests/integration/test_restore_external_engines/configs/backups_disk.xml @@ -0,0 +1,14 @@ + + + + + local + /backups/ + + + + + backups + /backups/ + + diff --git a/tests/integration/test_restore_external_engines/configs/remote_servers.xml b/tests/integration/test_restore_external_engines/configs/remote_servers.xml new file mode 100644 index 00000000000..76ad3618339 --- /dev/null +++ b/tests/integration/test_restore_external_engines/configs/remote_servers.xml @@ -0,0 +1,21 @@ + + + + + true + + replica1 + 9000 + + + replica2 + 9000 + + + replica3 + 9000 + + + + + diff --git a/tests/integration/test_restore_external_engines/test.py b/tests/integration/test_restore_external_engines/test.py new file mode 100644 index 00000000000..cde4b0deb00 --- /dev/null +++ b/tests/integration/test_restore_external_engines/test.py @@ -0,0 +1,143 @@ +import pytest + +import pymysql.cursors +import pytest +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +configs = ["configs/remote_servers.xml", "configs/backups_disk.xml"] + +node1 = cluster.add_instance("replica1", with_zookeeper=True, with_mysql8=True, main_configs=configs, external_dirs=["/backups/"]) +node2 = cluster.add_instance("replica2", with_zookeeper=True, with_mysql8=True, main_configs=configs, external_dirs=["/backups/"]) +node3 = cluster.add_instance("replica3", with_zookeeper=True, with_mysql8=True, main_configs=configs, external_dirs=["/backups/"]) +nodes = [node1, node2, node3] + +backup_id_counter = 0 + +def new_backup_name(): + global backup_id_counter + backup_id_counter += 1 + return f"Disk('backups', '{backup_id_counter}/')" + +def cleanup_nodes(nodes, dbname): + for node in nodes: + node.query(f"DROP DATABASE IF EXISTS {dbname} SYNC") + +def fill_nodes(nodes, dbname): + cleanup_nodes(nodes, dbname) + for node in nodes: + node.query(f"CREATE DATABASE {dbname} ENGINE = Replicated('/clickhouse/databases/{dbname}', 'default', '{node.name}')") + +def drop_mysql_table(conn, tableName): + with conn.cursor() as cursor: + cursor.execute(f"DROP TABLE IF EXISTS `clickhouse`.`{tableName}`") + +def get_mysql_conn(cluster): + conn = pymysql.connect( + user="root", password="clickhouse", host=cluster.mysql8_ip, port=cluster.mysql8_port + ) + return conn + +def fill_tables(cluster, dbname): + fill_nodes(nodes, dbname) + + conn = get_mysql_conn(cluster) + + with conn.cursor() as cursor: + cursor.execute( + "DROP DATABASE IF EXISTS clickhouse" + ) + cursor.execute( + "CREATE DATABASE clickhouse" + ) + cursor.execute( + "DROP TABLE IF EXISTS clickhouse.inference_table" + ) + cursor.execute( + "CREATE TABLE clickhouse.inference_table (id INT PRIMARY KEY, data BINARY(16) NOT NULL)" + ) + cursor.execute( + "INSERT INTO clickhouse.inference_table VALUES (100, X'9fad5e9eefdfb449')" + ) + conn.commit() + + parameters = "'mysql80:3306', 'clickhouse', 'inference_table', 'root', 'clickhouse'" + + node1.query( + f"CREATE TABLE {dbname}.mysql_schema_inference_engine ENGINE=MySQL({parameters})" + ) + node1.query(f"CREATE TABLE {dbname}.mysql_schema_inference_function AS mysql({parameters})") + + node1.query(f"CREATE TABLE {dbname}.merge_tree (id UInt64, b String) ORDER BY id") + node1.query(f"INSERT INTO {dbname}.merge_tree VALUES (100, 'abc')") + + expected = "id\tInt32\t\t\t\t\t\ndata\tFixedString(16)\t\t\t\t\t\n" + assert node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_engine") == expected + assert node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_function") == expected + assert node1.query(f"SELECT id FROM mysql({parameters})") == "100\n" + assert node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_engine") == "100\n" + assert node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_function") == "100\n" + assert node1.query(f"SELECT id FROM {dbname}.merge_tree") == "100\n" + + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + + except Exception as ex: + print(ex) + + finally: + cluster.shutdown() + +def test_restore_table(start_cluster): + fill_tables(cluster, "replicated") + backup_name = new_backup_name() + node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated;") + + node2.query(f"BACKUP DATABASE replicated TO {backup_name}") + + node2.query("DROP TABLE replicated.mysql_schema_inference_engine") + node2.query("DROP TABLE replicated.mysql_schema_inference_function") + + node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated;") + + assert node3.query("EXISTS replicated.mysql_schema_inference_engine") == "0\n" + assert node3.query("EXISTS replicated.mysql_schema_inference_function") == "0\n" + + node3.query(f"RESTORE DATABASE replicated FROM {backup_name} SETTINGS allow_different_database_def=true") + node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated;") + + assert node1.query("SELECT count(), sum(id) FROM replicated.mysql_schema_inference_engine") == "1\t100\n" + assert node1.query("SELECT count(), sum(id) FROM replicated.mysql_schema_inference_function") == "1\t100\n" + assert node1.query("SELECT count(), sum(id) FROM replicated.merge_tree") == "1\t100\n" + cleanup_nodes(nodes, "replicated") + + +def test_restore_table_null(start_cluster): + fill_tables(cluster, "replicated2") + + backup_name = new_backup_name() + node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated2;") + + node2.query(f"BACKUP DATABASE replicated2 TO {backup_name}") + + node2.query("DROP TABLE replicated2.mysql_schema_inference_engine") + node2.query("DROP TABLE replicated2.mysql_schema_inference_function") + + node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated2;") + + assert node3.query("EXISTS replicated2.mysql_schema_inference_engine") == "0\n" + assert node3.query("EXISTS replicated2.mysql_schema_inference_function") == "0\n" + + node3.query(f"RESTORE DATABASE replicated2 FROM {backup_name} SETTINGS allow_different_database_def=1, allow_different_table_def=1 SETTINGS restore_replace_external_engine_to_null=1, restore_replace_external_table_functions_to_null=1") + node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated2;") + + assert node1.query("SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_engine") == "0\t0\n" + assert node1.query("SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_function") == "0\t0\n" + assert node1.query("SELECT count(), sum(id) FROM replicated2.merge_tree") == "1\t100\n" + assert node1.query("SELECT engine FROM system.tables where database = 'replicated2' and name like '%mysql%'") == "Null\nNull\n" + assert node1.query("SELECT engine FROM system.tables where database = 'replicated2' and name like '%merge_tree%'") == "MergeTree\n" + cleanup_nodes(nodes, "replicated2")