Merge pull request #70878 from ClickHouse/backport/24.8/70511

Backport #70511 to 24.8: Fix `StorageTableFunction::supportsReplication` creating source storage unnecessarily
This commit was authored by Kseniia Sumarokova on 2024-10-23 12:29:56 +02:00 and committed via GitHub.
commit 2bf8723b0c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 134 additions and 1 deletion

View File

@@ -62,6 +62,7 @@ public:
/// Avoid loading nested table by returning nullptr/false for all table functions.
StoragePolicyPtr getStoragePolicy() const override { return nullptr; }
bool storesDataOnDisk() const override { return false; }
bool supportsReplication() const override { return false; }
void startup() override { }
void shutdown(bool is_drop) override

View File

@@ -0,0 +1,16 @@
<clickhouse>
    <!-- Test cluster topology for the replicated-database tests:
         one shard with two replicas (node1, node2), both reachable
         on the native protocol port 9000. -->
    <remote_servers>
        <cluster>
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster>
    </remote_servers>
</clickhouse>

View File

@@ -33,6 +33,10 @@ import pyarrow as pa
import pyarrow.parquet as pq
from deltalake.writer import write_deltalake
import helpers.client
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.s3_tools import (
prepare_s3_bucket,
upload_directory,
@@ -68,10 +72,25 @@ def started_cluster():
cluster = ClickHouseCluster(__file__, with_spark=True)
cluster.add_instance(
"node1",
main_configs=["configs/config.d/named_collections.xml"],
main_configs=[
"configs/config.d/named_collections.xml",
"configs/config.d/remote_servers.xml",
],
user_configs=["configs/users.d/users.xml"],
with_minio=True,
stay_alive=True,
with_zookeeper=True,
)
cluster.add_instance(
"node2",
main_configs=[
"configs/config.d/named_collections.xml",
"configs/config.d/remote_servers.xml",
],
user_configs=["configs/users.d/users.xml"],
with_minio=True,
stay_alive=True,
with_zookeeper=True,
)
logging.info("Starting cluster...")
@@ -824,3 +843,100 @@ def test_complex_types(started_cluster):
f"SELECT metadata FROM deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{table_name}' , 'minio', 'minio123')"
)
)
def test_replicated_database_and_unavailable_s3(started_cluster):
    """Regression test for the backported fix (#70511): creating a deltaLake()
    table inside a Replicated database must not require S3 connectivity on the
    other replica — `supportsReplication` must no longer instantiate the
    source storage.

    node2's traffic to MinIO is blocked while the DDL is executed on node1;
    the table must still appear on both replicas, and node2 must be able to
    restart/recover (restoring its ZooKeeper digest) without reaching S3.
    """
    node1 = started_cluster.instances["node1"]
    node2 = started_cluster.instances["node2"]
    # Randomized names keep reruns on a shared cluster independent.
    DB_NAME = randomize_table_name("db")
    TABLE_NAME = randomize_table_name("test_replicated_database_and_unavailable_s3")

    minio_client = started_cluster.minio_client
    bucket = started_cluster.minio_restricted_bucket
    if not minio_client.bucket_exists(bucket):
        minio_client.make_bucket(bucket)

    # One Replicated database, shard1, with a replica on each node.
    node1.query(
        f"CREATE DATABASE {DB_NAME} ENGINE=Replicated('/clickhouse/databases/{DB_NAME}', 'shard1', 'node1')"
    )
    node2.query(
        f"CREATE DATABASE {DB_NAME} ENGINE=Replicated('/clickhouse/databases/{DB_NAME}', 'shard1', 'node2')"
    )

    # NOTE(review): return value is unused; the call is presumably kept for
    # its side effect of materializing test data on node1 — confirm intent.
    parquet_data_path = create_initial_data_file(
        started_cluster,
        node1,
        "SELECT number, toString(number) FROM numbers(100)",
        TABLE_NAME,
    )

    endpoint_url = f"http://{started_cluster.minio_ip}:{started_cluster.minio_port}"
    aws_access_key_id = "minio"
    aws_secret_access_key = "minio123"

    # Write a small Delta table straight to MinIO so the deltaLake() table
    # function has real metadata/data to point at.
    schema = pa.schema(
        [
            ("id", pa.int32()),
            ("name", pa.string()),
        ]
    )
    data = [
        pa.array([1, 2, 3], type=pa.int32()),
        pa.array(["John Doe", "Jane Smith", "Jake Johnson"], type=pa.string()),
    ]
    storage_options = {
        "AWS_ENDPOINT_URL": endpoint_url,
        "AWS_ACCESS_KEY_ID": aws_access_key_id,
        "AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
        "AWS_ALLOW_HTTP": "true",
        "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    }
    path = f"s3://root/{TABLE_NAME}"
    table = pa.Table.from_arrays(data, schema=schema)
    write_deltalake(path, table, storage_options=storage_options)

    with PartitionManager() as pm:
        # Make S3 "unavailable" from node2: reject its MinIO traffic with
        # TCP resets while the DDL replicates.
        pm_rule_reject = {
            "probability": 1,
            "destination": node2.ip_address,
            "source_port": started_cluster.minio_port,
            "action": "REJECT --reject-with tcp-reset",
        }
        # NOTE(review): defined but never installed — only the REJECT rule is
        # added below. Presumably an alternative blocking mode; confirm.
        pm_rule_drop_all = {
            "destination": node2.ip_address,
            "source_port": started_cluster.minio_port,
            "action": "DROP",
        }
        pm._add_rule(pm_rule_reject)

        node1.query(
            f"""
            DROP TABLE IF EXISTS {DB_NAME}.{TABLE_NAME};
            CREATE TABLE {DB_NAME}.{TABLE_NAME}
            AS deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/root/{TABLE_NAME}' , 'minio', 'minio123')
            """
        )
        # The table must exist on BOTH replicas even though node2 could not
        # reach S3 — replaying the DDL must not instantiate the storage.
        assert TABLE_NAME in node1.query(
            f"select name from system.tables where database = '{DB_NAME}'"
        )
        assert TABLE_NAME in node2.query(
            f"select name from system.tables where database = '{DB_NAME}'"
        )

        # Corrupt node2's replica digest in ZooKeeper, then restart: recovery
        # must succeed without S3 access and replace the bogus digest.
        replica_path = f"/clickhouse/databases/{DB_NAME}/replicas/shard1|node2"
        zk = started_cluster.get_kazoo_client("zoo1")
        zk.set(replica_path + "/digest", "123456".encode())
        assert "123456" in node2.query(
            f"SELECT * FROM system.zookeeper WHERE path = '{replica_path}'"
        )
        node2.restart_clickhouse()
        assert "123456" not in node2.query(
            f"SELECT * FROM system.zookeeper WHERE path = '{replica_path}'"
        )