Merge pull request #66536 from qoega/external-replace-to-null

Add settings to replace external engines to Null during create
This commit is contained in:
Alexey Milovidov 2024-07-30 09:03:35 +00:00 committed by GitHub
commit 95a5dc6f34
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 305 additions and 1 deletions

View File

@ -5608,3 +5608,15 @@ Default value: `10000000`.
Minimal size of block to compress in CROSS JOIN. Zero value means - disable this threshold. This block is compressed when any of the two thresholds (by rows or by bytes) are reached.
Default value: `1GiB`.
## restore_replace_external_engines_to_null
For testing purposes. Replaces all external engines to Null to not initiate external connections.
Default value: `False`
## restore_replace_external_table_functions_to_null
For testing purposes. Replaces all external table functions to Null to not initiate external connections.
Default value: `False`

View File

@ -893,6 +893,8 @@ class IColumn;
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(UInt64, extract_key_value_pairs_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory.", 0) ALIAS(extract_kvp_max_pairs_per_row) \
M(Bool, restore_replace_external_engines_to_null, false, "Replace all the external table engines to Null on restore. Useful for testing purposes", 0) \
M(Bool, restore_replace_external_table_functions_to_null, false, "Replace all table functions to Null on restore. Useful for testing purposes", 0) \
\
\
/* ###################################### */ \

View File

@ -80,7 +80,9 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"ignore_on_cluster_for_replicated_named_collections_queries", false, false, "Ignore ON CLUSTER clause for replicated named collections management queries."},
{"backup_restore_s3_retry_attempts", 1000,1000, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries. It takes place only for backup/restore."},
{"postgresql_connection_attempt_timeout", 2, 2, "Allow to control 'connect_timeout' parameter of PostgreSQL connection."},
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."}
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."},
{"restore_replace_external_table_functions_to_null", false, false, "New setting."},
{"restore_replace_external_engines_to_null", false, false, "New setting."}
}},
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
{"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},

View File

@ -959,12 +959,40 @@ namespace
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
void setNullTableEngine(ASTStorage & storage)
{
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = "Null";
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.as_table_function)
{
if (getContext()->getSettingsRef().restore_replace_external_table_functions_to_null)
{
const auto & factory = TableFunctionFactory::instance();
auto properties = factory.tryGetProperties(create.as_table_function->as<ASTFunction>()->name);
if (properties && properties->allow_readonly)
return;
if (!create.storage)
{
auto storage_ast = std::make_shared<ASTStorage>();
create.set(create.storage, storage_ast);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage should not be created yet, it's a bug.");
create.as_table_function = nullptr;
setNullTableEngine(*create.storage);
}
return;
}
if (create.is_dictionary || create.is_ordinary_view || create.is_live_view || create.is_window_view)
return;
@ -1015,6 +1043,13 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
/// Some part of storage definition (such as PARTITION BY) is specified, but ENGINE is not: just set default one.
setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_table_engine.value);
}
/// For external tables with restore_replace_external_engine_to_null setting we replace external engines to
/// Null table engine.
else if (getContext()->getSettingsRef().restore_replace_external_engines_to_null)
{
if (StorageFactory::instance().getStorageFeatures(create.storage->engine->name).source_access_type != AccessType::NONE)
setNullTableEngine(*create.storage);
}
return;
}

View File

@ -0,0 +1,14 @@
<clickhouse>
<storage_configuration>
<disks>
<backups>
<type>local</type>
<path>/backups/</path>
</backups>
</disks>
</storage_configuration>
<backups>
<allowed_disk>backups</allowed_disk>
<allowed_path>/backups/</allowed_path>
</backups>
</clickhouse>

View File

@ -0,0 +1,21 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>replica1</host>
<port>9000</port>
</replica>
<replica>
<host>replica2</host>
<port>9000</port>
</replica>
<replica>
<host>replica3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,218 @@
import pytest
import pymysql.cursors
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
configs = ["configs/remote_servers.xml", "configs/backups_disk.xml"]
node1 = cluster.add_instance(
"replica1",
with_zookeeper=True,
with_mysql8=True,
main_configs=configs,
external_dirs=["/backups/"],
)
node2 = cluster.add_instance(
"replica2",
with_zookeeper=True,
with_mysql8=True,
main_configs=configs,
external_dirs=["/backups/"],
)
node3 = cluster.add_instance(
"replica3",
with_zookeeper=True,
with_mysql8=True,
main_configs=configs,
external_dirs=["/backups/"],
)
nodes = [node1, node2, node3]
backup_id_counter = 0
def new_backup_name():
global backup_id_counter
backup_id_counter += 1
return f"Disk('backups', '{backup_id_counter}/')"
def cleanup_nodes(nodes, dbname):
for node in nodes:
node.query(f"DROP DATABASE IF EXISTS {dbname} SYNC")
def fill_nodes(nodes, dbname):
cleanup_nodes(nodes, dbname)
for node in nodes:
node.query(
f"CREATE DATABASE {dbname} ENGINE = Replicated('/clickhouse/databases/{dbname}', 'default', '{node.name}')"
)
def drop_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(f"DROP TABLE IF EXISTS `clickhouse`.`{tableName}`")
def get_mysql_conn(cluster):
conn = pymysql.connect(
user="root",
password="clickhouse",
host=cluster.mysql8_ip,
port=cluster.mysql8_port,
)
return conn
def fill_tables(cluster, dbname):
fill_nodes(nodes, dbname)
conn = get_mysql_conn(cluster)
with conn.cursor() as cursor:
cursor.execute("DROP DATABASE IF EXISTS clickhouse")
cursor.execute("CREATE DATABASE clickhouse")
cursor.execute("DROP TABLE IF EXISTS clickhouse.inference_table")
cursor.execute(
"CREATE TABLE clickhouse.inference_table (id INT PRIMARY KEY, data BINARY(16) NOT NULL)"
)
cursor.execute(
"INSERT INTO clickhouse.inference_table VALUES (100, X'9fad5e9eefdfb449')"
)
conn.commit()
parameters = "'mysql80:3306', 'clickhouse', 'inference_table', 'root', 'clickhouse'"
node1.query(
f"CREATE TABLE {dbname}.mysql_schema_inference_engine ENGINE=MySQL({parameters})"
)
node1.query(
f"CREATE TABLE {dbname}.mysql_schema_inference_function AS mysql({parameters})"
)
node1.query(f"CREATE TABLE {dbname}.merge_tree (id UInt64, b String) ORDER BY id")
node1.query(f"INSERT INTO {dbname}.merge_tree VALUES (100, 'abc')")
expected = "id\tInt32\t\t\t\t\t\ndata\tFixedString(16)\t\t\t\t\t\n"
assert (
node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_engine")
== expected
)
assert (
node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_function")
== expected
)
assert node1.query(f"SELECT id FROM mysql({parameters})") == "100\n"
assert (
node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_engine") == "100\n"
)
assert (
node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_function")
== "100\n"
)
assert node1.query(f"SELECT id FROM {dbname}.merge_tree") == "100\n"
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
except Exception as ex:
print(ex)
finally:
cluster.shutdown()
def test_restore_table(start_cluster):
fill_tables(cluster, "replicated")
backup_name = new_backup_name()
node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated")
node2.query(f"BACKUP DATABASE replicated TO {backup_name}")
node2.query("DROP TABLE replicated.mysql_schema_inference_engine")
node2.query("DROP TABLE replicated.mysql_schema_inference_function")
node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated")
assert node3.query("EXISTS replicated.mysql_schema_inference_engine") == "0\n"
assert node3.query("EXISTS replicated.mysql_schema_inference_function") == "0\n"
node3.query(
f"RESTORE DATABASE replicated FROM {backup_name} SETTINGS allow_different_database_def=true"
)
node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated")
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated.mysql_schema_inference_engine"
)
== "1\t100\n"
)
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated.mysql_schema_inference_function"
)
== "1\t100\n"
)
assert (
node1.query("SELECT count(), sum(id) FROM replicated.merge_tree") == "1\t100\n"
)
cleanup_nodes(nodes, "replicated")
def test_restore_table_null(start_cluster):
fill_tables(cluster, "replicated2")
backup_name = new_backup_name()
node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated2")
node2.query(f"BACKUP DATABASE replicated2 TO {backup_name}")
node2.query("DROP TABLE replicated2.mysql_schema_inference_engine")
node2.query("DROP TABLE replicated2.mysql_schema_inference_function")
node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated2")
assert node3.query("EXISTS replicated2.mysql_schema_inference_engine") == "0\n"
assert node3.query("EXISTS replicated2.mysql_schema_inference_function") == "0\n"
node3.query(
f"RESTORE DATABASE replicated2 FROM {backup_name} SETTINGS allow_different_database_def=1, allow_different_table_def=1 SETTINGS restore_replace_external_engines_to_null=1, restore_replace_external_table_functions_to_null=1"
)
node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated2")
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_engine"
)
== "0\t0\n"
)
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_function"
)
== "0\t0\n"
)
assert (
node1.query("SELECT count(), sum(id) FROM replicated2.merge_tree") == "1\t100\n"
)
assert (
node1.query(
"SELECT engine FROM system.tables where database = 'replicated2' and name like '%mysql%'"
)
== "Null\nNull\n"
)
assert (
node1.query(
"SELECT engine FROM system.tables where database = 'replicated2' and name like '%merge_tree%'"
)
== "MergeTree\n"
)
cleanup_nodes(nodes, "replicated2")