diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index c3f697c3bdc..8739414464e 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -5608,3 +5608,15 @@ Default value: `10000000`. Minimal size of block to compress in CROSS JOIN. Zero value means - disable this threshold. This block is compressed when any of the two thresholds (by rows or by bytes) are reached. Default value: `1GiB`. + +## restore_replace_external_engines_to_null + +For testing purposes. Replaces all external engines to Null to not initiate external connections. + +Default value: `False` + +## restore_replace_external_table_functions_to_null + +For testing purposes. Replaces all external table functions to Null to not initiate external connections. + +Default value: `False` \ No newline at end of file diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 4fc2034b855..27b71558bd3 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -893,6 +893,8 @@ class IColumn; M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \ M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \ M(UInt64, extract_key_value_pairs_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory.", 0) ALIAS(extract_kvp_max_pairs_per_row) \ + M(Bool, restore_replace_external_engines_to_null, false, "Replace all the external table engines to Null on restore. Useful for testing purposes", 0) \ + M(Bool, restore_replace_external_table_functions_to_null, false, "Replace all table functions to Null on restore. Useful for testing purposes", 0) \ \ \ /* ###################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 9faf77e9087..8bea0b1eed3 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -80,7 +80,9 @@ static std::initializer_listno_empty_args = true; storage.set(storage.engine, engine_ast); } + + void setNullTableEngine(ASTStorage & storage) + { + auto engine_ast = std::make_shared(); + engine_ast->name = "Null"; + engine_ast->no_empty_args = true; + storage.set(storage.engine, engine_ast); + } + } void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const { if (create.as_table_function) + { + if (getContext()->getSettingsRef().restore_replace_external_table_functions_to_null) + { + const auto & factory = TableFunctionFactory::instance(); + + auto properties = factory.tryGetProperties(create.as_table_function->as()->name); + if (properties && properties->allow_readonly) + return; + if (!create.storage) + { + auto storage_ast = std::make_shared(); + create.set(create.storage, storage_ast); + } + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage should not be created yet, it's a bug."); + create.as_table_function = nullptr; + setNullTableEngine(*create.storage); + } return; + } if (create.is_dictionary || create.is_ordinary_view || create.is_live_view || create.is_window_view) return; @@ -1015,6 +1043,13 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const /// Some part of storage definition (such as PARTITION BY) is specified, but ENGINE is not: just set default one. setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_table_engine.value); } + /// For external tables with restore_replace_external_engine_to_null setting we replace external engines to + /// Null table engine. + else if (getContext()->getSettingsRef().restore_replace_external_engines_to_null) + { + if (StorageFactory::instance().getStorageFeatures(create.storage->engine->name).source_access_type != AccessType::NONE) + setNullTableEngine(*create.storage); + } return; } diff --git a/tests/integration/test_restore_external_engines/__init__.py b/tests/integration/test_restore_external_engines/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_restore_external_engines/configs/backups_disk.xml b/tests/integration/test_restore_external_engines/configs/backups_disk.xml new file mode 100644 index 00000000000..f7d666c6542 --- /dev/null +++ b/tests/integration/test_restore_external_engines/configs/backups_disk.xml @@ -0,0 +1,14 @@ + + + + + local + /backups/ + + + + + backups + /backups/ + + diff --git a/tests/integration/test_restore_external_engines/configs/remote_servers.xml b/tests/integration/test_restore_external_engines/configs/remote_servers.xml new file mode 100644 index 00000000000..76ad3618339 --- /dev/null +++ b/tests/integration/test_restore_external_engines/configs/remote_servers.xml @@ -0,0 +1,21 @@ + + + + + true + + replica1 + 9000 + + + replica2 + 9000 + + + replica3 + 9000 + + + + + diff --git a/tests/integration/test_restore_external_engines/test.py b/tests/integration/test_restore_external_engines/test.py new file mode 100644 index 00000000000..cf189f2a6ed --- /dev/null +++ b/tests/integration/test_restore_external_engines/test.py @@ -0,0 +1,218 @@ +import pytest + +import pymysql.cursors +import pytest +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +configs = ["configs/remote_servers.xml", "configs/backups_disk.xml"] + +node1 = cluster.add_instance( + "replica1", + with_zookeeper=True, + with_mysql8=True, + main_configs=configs, + external_dirs=["/backups/"], +) +node2 = cluster.add_instance( + "replica2", + with_zookeeper=True, + with_mysql8=True, + main_configs=configs, + external_dirs=["/backups/"], +) +node3 = cluster.add_instance( + "replica3", + with_zookeeper=True, + with_mysql8=True, + main_configs=configs, + external_dirs=["/backups/"], +) +nodes = [node1, node2, node3] + +backup_id_counter = 0 + + +def new_backup_name(): + global backup_id_counter + backup_id_counter += 1 + return f"Disk('backups', '{backup_id_counter}/')" + + +def cleanup_nodes(nodes, dbname): + for node in nodes: + node.query(f"DROP DATABASE IF EXISTS {dbname} SYNC") + + +def fill_nodes(nodes, dbname): + cleanup_nodes(nodes, dbname) + for node in nodes: + node.query( + f"CREATE DATABASE {dbname} ENGINE = Replicated('/clickhouse/databases/{dbname}', 'default', '{node.name}')" + ) + + +def drop_mysql_table(conn, tableName): + with conn.cursor() as cursor: + cursor.execute(f"DROP TABLE IF EXISTS `clickhouse`.`{tableName}`") + + +def get_mysql_conn(cluster): + conn = pymysql.connect( + user="root", + password="clickhouse", + host=cluster.mysql8_ip, + port=cluster.mysql8_port, + ) + return conn + + +def fill_tables(cluster, dbname): + fill_nodes(nodes, dbname) + + conn = get_mysql_conn(cluster) + + with conn.cursor() as cursor: + cursor.execute("DROP DATABASE IF EXISTS clickhouse") + cursor.execute("CREATE DATABASE clickhouse") + cursor.execute("DROP TABLE IF EXISTS clickhouse.inference_table") + cursor.execute( + "CREATE TABLE clickhouse.inference_table (id INT PRIMARY KEY, data BINARY(16) NOT NULL)" + ) + cursor.execute( + "INSERT INTO clickhouse.inference_table VALUES (100, X'9fad5e9eefdfb449')" + ) + conn.commit() + + parameters = "'mysql80:3306', 'clickhouse', 'inference_table', 'root', 'clickhouse'" + + node1.query( + f"CREATE TABLE {dbname}.mysql_schema_inference_engine ENGINE=MySQL({parameters})" + ) + node1.query( + f"CREATE TABLE {dbname}.mysql_schema_inference_function AS mysql({parameters})" + ) + + node1.query(f"CREATE TABLE {dbname}.merge_tree (id UInt64, b String) ORDER BY id") + node1.query(f"INSERT INTO {dbname}.merge_tree VALUES (100, 'abc')") + + expected = "id\tInt32\t\t\t\t\t\ndata\tFixedString(16)\t\t\t\t\t\n" + assert ( + node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_engine") + == expected + ) + assert ( + node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_function") + == expected + ) + assert node1.query(f"SELECT id FROM mysql({parameters})") == "100\n" + assert ( + node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_engine") == "100\n" + ) + assert ( + node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_function") + == "100\n" + ) + assert node1.query(f"SELECT id FROM {dbname}.merge_tree") == "100\n" + + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + + except Exception as ex: + print(ex) + + finally: + cluster.shutdown() + + +def test_restore_table(start_cluster): + fill_tables(cluster, "replicated") + backup_name = new_backup_name() + node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated") + + node2.query(f"BACKUP DATABASE replicated TO {backup_name}") + + node2.query("DROP TABLE replicated.mysql_schema_inference_engine") + node2.query("DROP TABLE replicated.mysql_schema_inference_function") + + node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated") + + assert node3.query("EXISTS replicated.mysql_schema_inference_engine") == "0\n" + assert node3.query("EXISTS replicated.mysql_schema_inference_function") == "0\n" + + node3.query( + f"RESTORE DATABASE replicated FROM {backup_name} SETTINGS allow_different_database_def=true" + ) + node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated") + + assert ( + node1.query( + "SELECT count(), sum(id) FROM replicated.mysql_schema_inference_engine" + ) + == "1\t100\n" + ) + assert ( + node1.query( + "SELECT count(), sum(id) FROM replicated.mysql_schema_inference_function" + ) + == "1\t100\n" + ) + assert ( + node1.query("SELECT count(), sum(id) FROM replicated.merge_tree") == "1\t100\n" + ) + cleanup_nodes(nodes, "replicated") + + +def test_restore_table_null(start_cluster): + fill_tables(cluster, "replicated2") + + backup_name = new_backup_name() + node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated2") + + node2.query(f"BACKUP DATABASE replicated2 TO {backup_name}") + + node2.query("DROP TABLE replicated2.mysql_schema_inference_engine") + node2.query("DROP TABLE replicated2.mysql_schema_inference_function") + + node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated2") + + assert node3.query("EXISTS replicated2.mysql_schema_inference_engine") == "0\n" + assert node3.query("EXISTS replicated2.mysql_schema_inference_function") == "0\n" + + node3.query( + f"RESTORE DATABASE replicated2 FROM {backup_name} SETTINGS allow_different_database_def=1, allow_different_table_def=1 SETTINGS restore_replace_external_engines_to_null=1, restore_replace_external_table_functions_to_null=1" + ) + node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated2") + + assert ( + node1.query( + "SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_engine" + ) + == "0\t0\n" + ) + assert ( + node1.query( + "SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_function" + ) + == "0\t0\n" + ) + assert ( + node1.query("SELECT count(), sum(id) FROM replicated2.merge_tree") == "1\t100\n" + ) + assert ( + node1.query( + "SELECT engine FROM system.tables where database = 'replicated2' and name like '%mysql%'" + ) + == "Null\nNull\n" + ) + assert ( + node1.query( + "SELECT engine FROM system.tables where database = 'replicated2' and name like '%merge_tree%'" + ) + == "MergeTree\n" + ) + cleanup_nodes(nodes, "replicated2")