Allow non-replicated ALTER TABLE FETCH/ATTACH in Replicated databases

`ALTER TABLE ... FETCH` and `ALTER TABLE ... ATTACH` queries were
disabled in the Replicated database engine, because it could cause
accidental duplication of data.

This enables these queries but without replicating them.

In the case of `FETCH`, the part will only be fetched on the server
where the query is issued.

Similarly, in the case of `ATTACH`, the attached part only needs to
be available on the server where the query is issued.

If the table itself is using one of the Replicated MergeTree engines,
the attached data is then replicated by the table engine itself,
without intervention of the database engine.

This change is meant to help with live backup/restore when using the
Replicated database engine, using FREEZE for backup and ATTACH for
restore.
This commit is contained in:
Kevin Michel 2021-09-20 19:08:59 +02:00
parent 81de25d37f
commit 008c3c812b
No known key found for this signature in database
GPG Key ID: 9F95C41F2EB138FC
5 changed files with 68 additions and 2 deletions

View File

@ -35,6 +35,8 @@ The [system.clusters](../../operations/system-tables/clusters.md) system table c
When creating a new replica of the database, this replica creates tables by itself. If the replica has been unavailable for a long time and has lagged behind the replication log — it checks its local metadata with the current metadata in ZooKeeper, moves the extra tables with data to a separate non-replicated database (so as not to accidentally delete anything superfluous), creates the missing tables, updates the table names if they have been renamed. The data is replicated at the `ReplicatedMergeTree` level, i.e. if the table is not replicated, the data will not be replicated (the database is responsible only for metadata).
[`ALTER TABLE FETCH`](../../sql-reference/statements/alter/partition.md) and [`ALTER TABLE ATTACH`](../../sql-reference/statements/alter/partition.md) queries are allowed but not replicated. The database engine will only add the partition/part to the current replica. However, if the table itself uses a Replicated table engine, then the data will be replicated after using `ATTACH`.
## Usage Example {#usage-example}
Creating a cluster with three hosts:

View File

@ -55,7 +55,9 @@ BlockIO InterpreterAlterQuery::execute()
DatabasePtr database = DatabaseCatalog::instance().getDatabase(table_id.database_name);
if (typeid_cast<DatabaseReplicated *>(database.get())
&& !getContext()->getClientInfo().is_replicated_database_internal)
&& !getContext()->getClientInfo().is_replicated_database_internal
&& !alter.isAttachAlter()
&& !alter.isFetchAlter())
{
auto guard = DatabaseCatalog::instance().getDDLGuard(table_id.database_name, table_id.table_name);
guard->releaseTableLock();

View File

@ -450,6 +450,17 @@ bool ASTAlterQuery::isFreezeAlter() const
|| isOneCommandTypeOnly(ASTAlterCommand::UNFREEZE_PARTITION) || isOneCommandTypeOnly(ASTAlterCommand::UNFREEZE_ALL);
}
bool ASTAlterQuery::isAttachAlter() const
{
return isOneCommandTypeOnly(ASTAlterCommand::ATTACH_PARTITION);
}
bool ASTAlterQuery::isFetchAlter() const
{
return isOneCommandTypeOnly(ASTAlterCommand::FETCH_PARTITION);
}
/** Get the text that identifies this element. */
String ASTAlterQuery::getID(char delim) const
{

View File

@ -218,6 +218,10 @@ public:
bool isFreezeAlter() const;
bool isAttachAlter() const;
bool isFetchAlter() const;
String getID(char) const override;
ASTPtr clone() const override;

View File

@ -1,3 +1,5 @@
import os
import shutil
import time
import re
import pytest
@ -16,7 +18,7 @@ snapshot_recovering_node = cluster.add_instance('snapshot_recovering_node', main
all_nodes = [main_node, dummy_node, competing_node, snapshotting_node, snapshot_recovering_node]
uuid_regex = re.compile("[0-9a-f]{8}\-[0-9a-f]{4}\-[0-9a-f]{4}\-[0-9a-f]{4}\-[0-9a-f]{12}")
uuid_regex = re.compile("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
def assert_create_query(nodes, table_name, expected):
replace_uuid = lambda x: re.sub(uuid_regex, "uuid", x)
query = "show create table {}".format(table_name)
@ -100,6 +102,51 @@ def test_simple_alter_table(started_cluster, engine):
assert_create_query([main_node, dummy_node, competing_node], name, expected)
def get_table_uuid(database, name):
return main_node.query(f"SELECT uuid FROM system.tables WHERE database = '{database}' and name = '{name}'").strip()
@pytest.fixture(scope="module", name="attachable_part")
def fixture_attachable_part(started_cluster):
main_node.query(f"CREATE DATABASE testdb_attach_atomic ENGINE = Atomic")
main_node.query(f"CREATE TABLE testdb_attach_atomic.test (CounterID UInt32) ENGINE = MergeTree ORDER BY (CounterID)")
main_node.query(f"INSERT INTO testdb_attach_atomic.test VALUES (123)")
main_node.query(f"ALTER TABLE testdb_attach_atomic.test FREEZE WITH NAME 'test_attach'")
table_uuid = get_table_uuid("testdb_attach_atomic", "test")
return os.path.join(main_node.path, f"database/shadow/test_attach/store/{table_uuid[:3]}/{table_uuid}/all_1_1_0")
@pytest.mark.parametrize("engine", ["MergeTree", "ReplicatedMergeTree"])
def test_alter_attach(started_cluster, attachable_part, engine):
name = "alter_attach_test_{}".format(engine)
main_node.query(f"CREATE TABLE testdb.{name} (CounterID UInt32) ENGINE = {engine} ORDER BY (CounterID)")
table_uuid = get_table_uuid("testdb", name)
# Provide and attach a part to the main node
shutil.copytree(
attachable_part, os.path.join(main_node.path, f"database/store/{table_uuid[:3]}/{table_uuid}/detached/all_1_1_0")
)
main_node.query(f"ALTER TABLE testdb.{name} ATTACH PART 'all_1_1_0'")
# On the main node, data is attached
assert main_node.query(f"SELECT CounterID FROM testdb.{name}") == "123\n"
# On the other node, data is replicated only if using a Replicated table engine
if engine == "ReplicatedMergeTree":
assert dummy_node.query(f"SELECT CounterID FROM testdb.{name}") == "123\n"
else:
assert dummy_node.query(f"SELECT CounterID FROM testdb.{name}") == ""
def test_alter_fetch(started_cluster):
main_node.query("CREATE TABLE testdb.fetch_source (CounterID UInt32) ENGINE = ReplicatedMergeTree ORDER BY (CounterID)")
main_node.query("CREATE TABLE testdb.fetch_target (CounterID UInt32) ENGINE = ReplicatedMergeTree ORDER BY (CounterID)")
main_node.query("INSERT INTO testdb.fetch_source VALUES (123)")
table_uuid = get_table_uuid("testdb", "fetch_source")
main_node.query(f"ALTER TABLE testdb.fetch_target FETCH PART 'all_0_0_0' FROM '/clickhouse/tables/{table_uuid}/{{shard}}' ")
detached_parts_query = "SELECT name FROM system.detached_parts WHERE database='testdb' AND table='fetch_target'"
assert main_node.query(detached_parts_query) == "all_0_0_0\n"
assert dummy_node.query(detached_parts_query) == ""
def test_alters_from_different_replicas(started_cluster):
# test_alters_from_different_replicas
competing_node.query("CREATE DATABASE IF NOT EXISTS testdb ENGINE = Replicated('/clickhouse/databases/test1', 'shard1', 'replica3');")