Added tests, rewrote the logic that decides which engines and table functions are allowed, and added replacement for the CREATE TABLE ... AS table_function() syntax.

This commit is contained in:
Yatsishin Ilya 2024-07-22 11:33:51 +00:00
parent 478616de3d
commit 2dc264928f
8 changed files with 217 additions and 29 deletions

View File

@ -5608,3 +5608,15 @@ Default value: `10000000`.
Minimal size of block to compress in CROSS JOIN. Zero value means - disable this threshold. This block is compressed when any of the two thresholds (by rows or by bytes) are reached.
Default value: `1GiB`.
## restore_replace_external_engines_to_null
For testing purposes. Replaces all external engines with Null so that no external connections are initiated.
Default value: `False`
## restore_replace_external_table_functions_to_null
For testing purposes. Replaces all external table functions with Null so that no external connections are initiated.
Default value: `False`

View File

@ -891,7 +891,8 @@ class IColumn;
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(UInt64, extract_key_value_pairs_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory.", 0) ALIAS(extract_kvp_max_pairs_per_row) \
M(Bool, restore_replace_external_engine_to_null, false, "Replace all the External table engines to Null on restore. Useful for testing purposes", 0) \
M(Bool, restore_replace_external_engines_to_null, false, "Replace all the External table engines to Null on restore. Useful for testing purposes", 0) \
M(Bool, restore_replace_external_table_functions_to_null, false, "Replace all table functions to Null on restore. Useful for testing purposes", 0) \
\
\
/* ###################################### */ \

View File

@ -79,7 +79,8 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"ignore_on_cluster_for_replicated_named_collections_queries", false, false, "Ignore ON CLUSTER clause for replicated named collections management queries."},
{"postgresql_connection_attempt_timeout", 2, 2, "Allow to control 'connect_timeout' parameter of PostgreSQL connection."},
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."},
{"restore_replace_external_engine_to_null", false, false, "New setting."}
{"restore_replace_external_table_functions_to_null", false, false, "New setting."},
/// The key must match the setting name declared in Settings.h exactly.
{"restore_replace_external_engines_to_null", false, false, "New setting."}
}},
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
{"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},

View File

@ -968,12 +968,33 @@ namespace
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
/// CREATE TABLE ... AS table_function(): no engine is chosen here, every path of this branch returns early.
/// When restore_replace_external_table_functions_to_null is enabled, the table function is dropped
/// and the table is given the Null engine instead.
if (create.as_table_function)
{
if (getContext()->getSettingsRef().restore_replace_external_table_functions_to_null)
{
const auto & factory = TableFunctionFactory::instance();
auto properties = factory.tryGetProperties(create.as_table_function->as<ASTFunction>()->name);
/// Table functions whose properties report allow_readonly are kept as is.
/// NOTE(review): presumably such functions do not open external connections - confirm.
if (properties && properties->allow_readonly)
return;
/// The storage AST is expected to be absent at this point; create an empty one to hold the Null engine.
if (!create.storage)
{
auto storage_ast = std::make_shared<ASTStorage>();
create.set(create.storage, storage_ast);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage should not be created yet, it's a bug.");
/// Detach the table function before assigning the Null engine to the freshly created storage.
create.as_table_function = nullptr;
setNullTableEngine(*create.storage);
}
return;
}
if (create.is_dictionary || create.is_ordinary_view || create.is_live_view || create.is_window_view)
return;
@ -1010,34 +1031,9 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_table_engine.value);
/// For external tables with the restore_replace_external_engines_to_null setting we replace external engines with
/// the Null table engine.
else if (create.storage->engine->name == "AzureBlobStorage" ||
create.storage->engine->name == "AzureQueue" ||
create.storage->engine->name == "COSN" ||
create.storage->engine->name == "DeltaLake" ||
create.storage->engine->name == "Dictionary" ||
create.storage->engine->name == "Executable" ||
create.storage->engine->name == "ExecutablePool" ||
create.storage->engine->name == "ExternalDistributed" ||
create.storage->engine->name == "File" ||
create.storage->engine->name == "Hudi" ||
create.storage->engine->name == "Iceberg" ||
create.storage->engine->name == "JDBC" ||
create.storage->engine->name == "Kafka" ||
create.storage->engine->name == "MaterializedPostgreSQL" ||
create.storage->engine->name == "MongoDB" ||
create.storage->engine->name == "MySQL" ||
create.storage->engine->name == "NATS" ||
create.storage->engine->name == "ODBC" ||
create.storage->engine->name == "OSS" ||
create.storage->engine->name == "PostgreSQL" ||
create.storage->engine->name == "RabbitMQ" ||
create.storage->engine->name == "Redis" ||
create.storage->engine->name == "S3" ||
create.storage->engine->name == "S3Queue" ||
create.storage->engine->name == "TinyLog" ||
create.storage->engine->name == "URL")
else if (getContext()->getSettingsRef().restore_replace_external_engines_to_null)
{
if (getContext()->getSettingsRef().restore_replace_external_engine_to_null)
if (StorageFactory::instance().getStorageFeatures(create.storage->engine->name).source_access_type != AccessType::NONE)
setNullTableEngine(*create.storage);
}
return;

View File

@ -0,0 +1,14 @@
<clickhouse>
<!-- Declares a local disk named "backups" rooted at /backups/. -->
<storage_configuration>
<disks>
<backups>
<type>local</type>
<path>/backups/</path>
</backups>
</disks>
</storage_configuration>
<!-- Allows backups to use the "backups" disk and the /backups/ path. -->
<backups>
<allowed_disk>backups</allowed_disk>
<allowed_path>/backups/</allowed_path>
</backups>
</clickhouse>

View File

@ -0,0 +1,21 @@
<clickhouse>
<!-- Cluster "test_cluster": a single shard with three replicas (replica1..replica3), each on port 9000. -->
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>replica1</host>
<port>9000</port>
</replica>
<replica>
<host>replica2</host>
<port>9000</port>
</replica>
<replica>
<host>replica3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,143 @@
import pytest
import pymysql.cursors
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
configs = ["configs/remote_servers.xml", "configs/backups_disk.xml"]
node1 = cluster.add_instance("replica1", with_zookeeper=True, with_mysql8=True, main_configs=configs, external_dirs=["/backups/"])
node2 = cluster.add_instance("replica2", with_zookeeper=True, with_mysql8=True, main_configs=configs, external_dirs=["/backups/"])
node3 = cluster.add_instance("replica3", with_zookeeper=True, with_mysql8=True, main_configs=configs, external_dirs=["/backups/"])
nodes = [node1, node2, node3]
backup_id_counter = 0
def new_backup_name():
    """Return a fresh backup destination on the `backups` disk.

    Each call bumps the module-level counter, so every backup gets its own
    numbered directory.
    """
    global backup_id_counter
    backup_id_counter = backup_id_counter + 1
    return f"Disk('backups', '{backup_id_counter}/')"
def cleanup_nodes(nodes, dbname):
    """Drop database `dbname` (if it exists) on each of the given nodes, using SYNC."""
    drop_statement = f"DROP DATABASE IF EXISTS {dbname} SYNC"
    for instance in nodes:
        instance.query(drop_statement)
def fill_nodes(nodes, dbname):
    """Recreate the Replicated database `dbname` from scratch on every node.

    Any existing database of that name is dropped first; each node then
    registers itself as a replica named after the node.
    """
    cleanup_nodes(nodes, dbname)
    for node in nodes:
        create_statement = f"CREATE DATABASE {dbname} ENGINE = Replicated('/clickhouse/databases/{dbname}', 'default', '{node.name}')"
        node.query(create_statement)
def drop_mysql_table(conn, tableName):
    """Drop `tableName` from the MySQL `clickhouse` database if it is present."""
    with conn.cursor() as cur:
        statement = f"DROP TABLE IF EXISTS `clickhouse`.`{tableName}`"
        cur.execute(statement)
def get_mysql_conn(cluster):
    """Open and return a pymysql connection to the cluster's MySQL 8 instance as root."""
    return pymysql.connect(
        user="root",
        password="clickhouse",
        host=cluster.mysql8_ip,
        port=cluster.mysql8_port,
    )
def fill_tables(cluster, dbname):
    """Prepare the MySQL source table and the ClickHouse tables used by the restore tests.

    Steps:
      1. Recreate the Replicated database `dbname` on all nodes.
      2. Recreate the MySQL database `clickhouse` with a one-row `inference_table`.
      3. On node1 create a MySQL-engine table, a table defined via the `mysql()`
         table function, and a plain `merge_tree` table holding one row.
      4. Check the inferred schema and that every table returns id 100.
    """
    fill_nodes(nodes, dbname)

    mysql_statements = [
        "DROP DATABASE IF EXISTS clickhouse",
        "CREATE DATABASE clickhouse",
        "DROP TABLE IF EXISTS clickhouse.inference_table",
        "CREATE TABLE clickhouse.inference_table (id INT PRIMARY KEY, data BINARY(16) NOT NULL)",
        "INSERT INTO clickhouse.inference_table VALUES (100, X'9fad5e9eefdfb449')",
    ]

    conn = get_mysql_conn(cluster)
    try:
        with conn.cursor() as cursor:
            for statement in mysql_statements:
                cursor.execute(statement)
            conn.commit()
    finally:
        # Always release the MySQL connection, even if the setup above fails.
        conn.close()

    parameters = "'mysql80:3306', 'clickhouse', 'inference_table', 'root', 'clickhouse'"

    node1.query(
        f"CREATE TABLE {dbname}.mysql_schema_inference_engine ENGINE=MySQL({parameters})"
    )
    node1.query(f"CREATE TABLE {dbname}.mysql_schema_inference_function AS mysql({parameters})")
    node1.query(f"CREATE TABLE {dbname}.merge_tree (id UInt64, b String) ORDER BY id")
    node1.query(f"INSERT INTO {dbname}.merge_tree VALUES (100, 'abc')")

    # Both MySQL-backed tables must have inferred the same schema from the source table.
    expected = "id\tInt32\t\t\t\t\t\ndata\tFixedString(16)\t\t\t\t\t\n"
    assert node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_engine") == expected
    assert node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_function") == expected

    # Every access path must return the single inserted id.
    assert node1.query(f"SELECT id FROM mysql({parameters})") == "100\n"
    assert node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_engine") == "100\n"
    assert node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_function") == "100\n"
    assert node1.query(f"SELECT id FROM {dbname}.merge_tree") == "100\n"
@pytest.fixture(scope="module")
def start_cluster():
    """Module-scoped fixture: start the cluster, yield it to the tests, and always shut it down."""
    try:
        cluster.start()
        yield cluster
    except Exception as ex:
        print(ex)
        # Re-raise so a failed startup is reported as the real error instead of
        # being swallowed and surfacing as a fixture that never yielded.
        raise
    finally:
        cluster.shutdown()
def test_restore_table(start_cluster):
    """Back up the `replicated` database, drop its MySQL-backed tables, restore them
    without any replacement settings, and check that all tables serve their data again.
    """
    fill_tables(cluster, "replicated")
    backup_name = new_backup_name()

    node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated;")
    node2.query(f"BACKUP DATABASE replicated TO {backup_name}")

    drop_statements = (
        "DROP TABLE replicated.mysql_schema_inference_engine",
        "DROP TABLE replicated.mysql_schema_inference_function",
    )
    for drop_statement in drop_statements:
        node2.query(drop_statement)

    # The drops issued on node2 must be visible on node3 before restoring there.
    node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated;")
    exists_queries = (
        "EXISTS replicated.mysql_schema_inference_engine",
        "EXISTS replicated.mysql_schema_inference_function",
    )
    for exists_query in exists_queries:
        assert node3.query(exists_query) == "0\n"

    node3.query(f"RESTORE DATABASE replicated FROM {backup_name} SETTINGS allow_different_database_def=true")
    node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated;")

    select_queries = (
        "SELECT count(), sum(id) FROM replicated.mysql_schema_inference_engine",
        "SELECT count(), sum(id) FROM replicated.mysql_schema_inference_function",
        "SELECT count(), sum(id) FROM replicated.merge_tree",
    )
    for select_query in select_queries:
        assert node1.query(select_query) == "1\t100\n"

    cleanup_nodes(nodes, "replicated")
def test_restore_table_null(start_cluster):
    """Restore the `replicated2` database with both replacement settings enabled and
    check that the MySQL-backed tables come back as empty Null tables while the
    MergeTree table keeps its data and engine.
    """
    fill_tables(cluster, "replicated2")
    backup_name = new_backup_name()

    node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated2;")
    node2.query(f"BACKUP DATABASE replicated2 TO {backup_name}")
    node2.query("DROP TABLE replicated2.mysql_schema_inference_engine")
    node2.query("DROP TABLE replicated2.mysql_schema_inference_function")

    # The drops issued on node2 must be visible on node3 before restoring there.
    node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated2;")
    assert node3.query("EXISTS replicated2.mysql_schema_inference_engine") == "0\n"
    assert node3.query("EXISTS replicated2.mysql_schema_inference_function") == "0\n"

    # The setting is named restore_replace_external_engines_to_null (plural "engines").
    node3.query(f"RESTORE DATABASE replicated2 FROM {backup_name} SETTINGS allow_different_database_def=1, allow_different_table_def=1 SETTINGS restore_replace_external_engines_to_null=1, restore_replace_external_table_functions_to_null=1")
    node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated2;")

    # Null tables return no rows; the MergeTree table still has its single row.
    assert node1.query("SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_engine") == "0\t0\n"
    assert node1.query("SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_function") == "0\t0\n"
    assert node1.query("SELECT count(), sum(id) FROM replicated2.merge_tree") == "1\t100\n"

    assert node1.query("SELECT engine FROM system.tables where database = 'replicated2' and name like '%mysql%'") == "Null\nNull\n"
    assert node1.query("SELECT engine FROM system.tables where database = 'replicated2' and name like '%merge_tree%'") == "MergeTree\n"

    cleanup_nodes(nodes, "replicated2")