mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
Merge pull request #65636 from kirillgarbar/attaching_parts
Fix "attaching_" prefix for deduplicated parts
This commit is contained in:
commit
dbb20fc864
@ -619,8 +619,17 @@ bool ReplicatedMergeTreeSinkImpl<false>::writeExistingPart(MergeTreeData::Mutabl
|
||||
String block_id = deduplicate ? fmt::format("{}_{}", part->info.partition_id, part->checksums.getTotalChecksumHex()) : "";
|
||||
bool deduplicated = commitPart(zookeeper, part, block_id, replicas_num).second;
|
||||
|
||||
int error = 0;
|
||||
/// Set a special error code if the block is duplicate
|
||||
int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
|
||||
/// And remove attaching_ prefix
|
||||
if (deduplicate && deduplicated)
|
||||
{
|
||||
error = ErrorCodes::INSERT_WAS_DEDUPLICATED;
|
||||
if (!endsWith(part->getDataPartStorage().getRelativePath(), "detached/attaching_" + part->name + "/"))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected relative path for a deduplicated part: {}", part->getDataPartStorage().getRelativePath());
|
||||
fs::path new_relative_path = fs::path("detached") / part->getNewName(part->info);
|
||||
part->renameTo(new_relative_path, false);
|
||||
}
|
||||
PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, watch.elapsed(), profile_events_scope.getSnapshot()), ExecutionStatus(error));
|
||||
return deduplicated;
|
||||
}
|
||||
@ -880,8 +889,9 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
|
||||
}
|
||||
}
|
||||
|
||||
/// Save the current temporary path in case we need to revert the change to retry (ZK connection loss)
|
||||
/// Save the current temporary path and name in case we need to revert the change to retry (ZK connection loss) or in case part is deduplicated.
|
||||
const String temporary_part_relative_path = part->getDataPartStorage().getPartDirectory();
|
||||
const String initial_part_name = part->name;
|
||||
|
||||
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
|
||||
/// We remove the lock just after renaming the part. In case of exception, block number will be marked as abandoned.
|
||||
@ -1050,16 +1060,6 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
|
||||
zkutil::KeeperMultiException::check(multi_code, ops, responses);
|
||||
}
|
||||
|
||||
transaction.rollback();
|
||||
|
||||
if (!Coordination::isUserError(multi_code))
|
||||
throw Exception(
|
||||
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
|
||||
"Unexpected ZooKeeper error while adding block {} with ID '{}': {}",
|
||||
block_number,
|
||||
toString(block_id),
|
||||
multi_code);
|
||||
|
||||
auto failed_op_idx = zkutil::getFailedOpIndex(multi_code, responses);
|
||||
String failed_op_path = ops[failed_op_idx]->getPath();
|
||||
|
||||
@ -1069,6 +1069,11 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
|
||||
LOG_INFO(log, "Block with ID {} already exists (it was just appeared) for part {}. Ignore it.",
|
||||
toString(block_id), part->name);
|
||||
|
||||
transaction.rollbackPartsToTemporaryState();
|
||||
part->is_temp = true;
|
||||
part->setName(initial_part_name);
|
||||
part->renameTo(temporary_part_relative_path, false);
|
||||
|
||||
if constexpr (async_insert)
|
||||
{
|
||||
retry_context.conflict_block_ids = std::vector<String>({failed_op_path});
|
||||
@ -1080,6 +1085,16 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
|
||||
return CommitRetryContext::DUPLICATED_PART;
|
||||
}
|
||||
|
||||
transaction.rollback();
|
||||
|
||||
if (!Coordination::isUserError(multi_code))
|
||||
throw Exception(
|
||||
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
|
||||
"Unexpected ZooKeeper error while adding block {} with ID '{}': {}",
|
||||
block_number,
|
||||
toString(block_id),
|
||||
multi_code);
|
||||
|
||||
if (multi_code == Coordination::Error::ZNONODE && failed_op_idx == block_unlock_op_idx)
|
||||
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED,
|
||||
"Insert query (for block {}) was canceled by concurrent ALTER PARTITION or TRUNCATE",
|
||||
|
@ -0,0 +1,88 @@
|
||||
import pytest
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
ch1 = cluster.add_instance(
|
||||
"ch1",
|
||||
with_zookeeper=True,
|
||||
macros={"replica": "node1"},
|
||||
stay_alive=True,
|
||||
)
|
||||
|
||||
database_name = "dedup_attach"
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def q(query):
|
||||
return ch1.query(database=database_name, sql=query)
|
||||
|
||||
|
||||
def test_deduplicated_attached_part_renamed_after_attach(started_cluster):
|
||||
ch1.query(f"CREATE DATABASE {database_name}")
|
||||
|
||||
q(
|
||||
"CREATE TABLE dedup (id UInt32) ENGINE=ReplicatedMergeTree('/clickhouse/tables/dedup_attach/dedup/s1', 'r1') ORDER BY id;"
|
||||
)
|
||||
q("INSERT INTO dedup VALUES (1),(2),(3);")
|
||||
|
||||
table_data_path = q(
|
||||
"SELECT data_paths FROM system.tables WHERE database=currentDatabase() AND table='dedup'"
|
||||
).strip("'[]\n")
|
||||
|
||||
ch1.exec_in_container(
|
||||
[
|
||||
"bash",
|
||||
"-c",
|
||||
f"cp -r {table_data_path}/all_0_0_0 {table_data_path}/detached/all_0_0_0",
|
||||
]
|
||||
)
|
||||
# Part is attached as all_1_1_0
|
||||
q("ALTER TABLE dedup ATTACH PART 'all_0_0_0'")
|
||||
|
||||
assert 2 == int(
|
||||
q(
|
||||
f"SELECT count() FROM system.parts WHERE database='{database_name}' AND table = 'dedup'"
|
||||
).strip()
|
||||
)
|
||||
|
||||
ch1.exec_in_container(
|
||||
[
|
||||
"bash",
|
||||
"-c",
|
||||
f"cp -r {table_data_path}/all_1_1_0 {table_data_path}/detached/all_1_1_0",
|
||||
]
|
||||
)
|
||||
# Part is deduplicated and not attached
|
||||
q("ALTER TABLE dedup ATTACH PART 'all_1_1_0'")
|
||||
|
||||
assert 2 == int(
|
||||
q(
|
||||
f"SELECT count() FROM system.parts WHERE database='{database_name}' AND table = 'dedup'"
|
||||
).strip()
|
||||
)
|
||||
assert 1 == int(
|
||||
q(
|
||||
f"SELECT count() FROM system.detached_parts WHERE database='{database_name}' AND table = 'dedup'"
|
||||
).strip()
|
||||
)
|
||||
# Check that it is not 'attaching_all_1_1_0'
|
||||
assert (
|
||||
"all_1_1_0"
|
||||
== q(
|
||||
f"SELECT name FROM system.detached_parts WHERE database='{database_name}' AND table = 'dedup'"
|
||||
).strip()
|
||||
)
|
||||
|
||||
q("DROP TABLE dedup")
|
||||
q("SYSTEM DROP REPLICA 'r1' FROM ZKPATH '/clickhouse/tables/dedup_attach/dedup/s1'")
|
||||
ch1.query(f"DROP DATABASE {database_name}")
|
Loading…
Reference in New Issue
Block a user