mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #15537 from ClickHouse/mutation_hangs_after_replace_partition
Fix mutation may hang after REPLACE/DROP PARTITION
This commit is contained in:
commit
041015545c
@ -715,6 +715,16 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
|
||||
for (const String & produced_part_name : queue_entry->getVirtualPartNames())
|
||||
{
|
||||
auto part_info = MergeTreePartInfo::fromPartName(produced_part_name, format_version);
|
||||
|
||||
/// Oddly enough, getVirtualPartNames() may return _virtual_ part name.
|
||||
/// Such parts do not exist and will never appear, so we should not add virtual parts to parts_to_do list.
|
||||
/// Fortunately, it's easy to distinguish virtual parts from normal parts by part level.
|
||||
/// See StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(...)
|
||||
auto max_level = MergeTreePartInfo::MAX_LEVEL; /// DROP/DETACH PARTITION
|
||||
auto another_max_level = std::numeric_limits<decltype(part_info.level)>::max(); /// REPLACE/MOVE PARTITION
|
||||
if (part_info.level == max_level || part_info.level == another_max_level)
|
||||
continue;
|
||||
|
||||
auto it = entry->block_numbers.find(part_info.partition_id);
|
||||
if (it != entry->block_numbers.end() && it->second > part_info.getDataVersion())
|
||||
mutation.parts_to_do.add(produced_part_name);
|
||||
|
@ -4053,6 +4053,7 @@ Pipe StorageReplicatedMergeTree::alterPartition(
|
||||
|
||||
|
||||
/// If new version returns ordinary name, else returns part name containing the first and last month of the month
|
||||
/// NOTE: use it in pair with getFakePartCoveringAllPartsInPartition(...)
|
||||
static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info)
|
||||
{
|
||||
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
|
||||
@ -4068,7 +4069,7 @@ static String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version,
|
||||
return part_info.getPartName();
|
||||
}
|
||||
|
||||
bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info)
|
||||
bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition)
|
||||
{
|
||||
/// Even if there is no data in the partition, you still need to mark the range for deletion.
|
||||
/// - Because before executing DETACH, tasks for downloading parts to this partition can be executed.
|
||||
@ -4091,14 +4092,21 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St
|
||||
mutation_version = queue.getCurrentMutationVersion(partition_id, right);
|
||||
}
|
||||
|
||||
/// Empty partition.
|
||||
if (right == 0)
|
||||
return false;
|
||||
/// REPLACE PARTITION uses different max level and does not decrement max_block of DROP_RANGE for unknown (probably historical) reason.
|
||||
auto max_level = std::numeric_limits<decltype(part_info.level)>::max();
|
||||
if (!for_replace_partition)
|
||||
{
|
||||
max_level = MergeTreePartInfo::MAX_LEVEL;
|
||||
|
||||
--right;
|
||||
/// Empty partition.
|
||||
if (right == 0)
|
||||
return false;
|
||||
|
||||
--right;
|
||||
}
|
||||
|
||||
/// Artificial high level is chosen, to make this part "covering" all parts inside.
|
||||
part_info = MergeTreePartInfo(partition_id, left, right, MergeTreePartInfo::MAX_LEVEL, mutation_version);
|
||||
part_info = MergeTreePartInfo(partition_id, left, right, max_level, mutation_version);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -5305,11 +5313,11 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
||||
/// Firstly, generate last block number and compute drop_range
|
||||
/// NOTE: Even if we make ATTACH PARTITION instead of REPLACE PARTITION drop_range will not be empty, it will contain a block.
|
||||
/// So, such case has special meaning, if drop_range contains only one block it means that nothing to drop.
|
||||
/// TODO why not to add normal DROP_RANGE entry to replication queue if `replace` is true?
|
||||
MergeTreePartInfo drop_range;
|
||||
drop_range.partition_id = partition_id;
|
||||
drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber();
|
||||
drop_range.min_block = replace ? 0 : drop_range.max_block;
|
||||
drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
|
||||
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true);
|
||||
if (!replace)
|
||||
drop_range.min_block = drop_range.max_block;
|
||||
|
||||
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
|
||||
|
||||
@ -5388,6 +5396,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
||||
}
|
||||
|
||||
/// We are almost ready to commit changes, remove fetches and merges from drop range
|
||||
/// FIXME it's unsafe to remove queue entries before we actually commit REPLACE_RANGE to replication log
|
||||
queue.removePartProducingOpsInRange(zookeeper, drop_range, entry);
|
||||
|
||||
/// Remove deduplication block_ids of replacing parts
|
||||
@ -5502,11 +5511,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
||||
/// A range for log entry to remove parts from the source table (myself).
|
||||
|
||||
MergeTreePartInfo drop_range;
|
||||
drop_range.partition_id = partition_id;
|
||||
drop_range.max_block = allocateBlockNumber(partition_id, zookeeper)->getNumber();
|
||||
drop_range.min_block = 0;
|
||||
drop_range.level = std::numeric_limits<decltype(drop_range.level)>::max();
|
||||
|
||||
getFakePartCoveringAllPartsInPartition(partition_id, drop_range, true);
|
||||
String drop_range_fake_part_name = getPartNamePossiblyFake(format_version, drop_range);
|
||||
|
||||
if (drop_range.getBlocksCount() > 1)
|
||||
@ -5561,6 +5566,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
||||
drop_range_dest.max_block = drop_range.max_block;
|
||||
drop_range_dest.min_block = drop_range.max_block;
|
||||
drop_range_dest.level = drop_range.level;
|
||||
drop_range_dest.mutation = drop_range.mutation;
|
||||
|
||||
entry.type = ReplicatedMergeTreeLogEntryData::REPLACE_RANGE;
|
||||
entry.source_replica = dest_table_storage->replica_name;
|
||||
|
@ -526,7 +526,7 @@ private:
|
||||
|
||||
/// Produce an imaginary part info covering all parts in the specified partition (at the call moment).
|
||||
/// Returns false if the partition doesn't exist yet.
|
||||
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info);
|
||||
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info, bool for_replace_partition = false);
|
||||
|
||||
/// Check for a node in ZK. If it is, remember this information, and then immediately answer true.
|
||||
std::unordered_set<std::string> existing_nodes_cache;
|
||||
|
@ -0,0 +1,14 @@
|
||||
1 1
|
||||
2 2
|
||||
3 3
|
||||
4 4
|
||||
mt 0 0_1_1_0 2
|
||||
rmt 0 0_0_0_0 2
|
||||
1 1
|
||||
2 2
|
||||
mt 0 0_1_1_0 2
|
||||
rmt 0 0_3_3_0 2
|
||||
0000000000 UPDATE s = concat(\'s\', toString(n)) WHERE 1 [] 0 1
|
||||
0000000001 DROP COLUMN s [] 0 1
|
||||
3
|
||||
4
|
@ -0,0 +1,39 @@
|
||||
drop table if exists mt;
|
||||
drop table if exists rmt sync;
|
||||
|
||||
create table mt (n UInt64, s String) engine = MergeTree partition by intDiv(n, 10) order by n;
|
||||
insert into mt values (3, '3'), (4, '4');
|
||||
|
||||
create table rmt (n UInt64, s String) engine = ReplicatedMergeTree('/clickhouse/test_01149/rmt', 'r1') partition by intDiv(n, 10) order by n;
|
||||
insert into rmt values (1,'1'), (2, '2');
|
||||
|
||||
select * from rmt;
|
||||
select * from mt;
|
||||
select table, partition_id, name, rows from system.parts where database=currentDatabase() and table in ('mt', 'rmt') and active=1 order by table, name;
|
||||
|
||||
alter table rmt update s = 's'||toString(n) where 1;
|
||||
|
||||
select * from rmt;
|
||||
alter table rmt replace partition '0' from mt;
|
||||
|
||||
select table, partition_id, name, rows from system.parts where database=currentDatabase() and table in ('mt', 'rmt') and active=1 order by table, name;
|
||||
|
||||
alter table rmt drop column s;
|
||||
|
||||
select mutation_id, command, parts_to_do_names, parts_to_do, is_done from system.mutations where database=currentDatabase() and table='rmt';
|
||||
select * from rmt;
|
||||
|
||||
drop table rmt sync;
|
||||
|
||||
set replication_alter_partitions_sync=0;
|
||||
create table rmt (n UInt64, s String) engine = ReplicatedMergeTree('/clickhouse/test_01149/rmt', 'r1') partition by intDiv(n, 10) order by n;
|
||||
insert into rmt values (1,'1'), (2, '2');
|
||||
|
||||
alter table rmt update s = 's'||toString(n) where 1;
|
||||
alter table rmt drop partition '0';
|
||||
|
||||
set replication_alter_partitions_sync=1;
|
||||
alter table rmt drop column s;
|
||||
|
||||
drop table mt;
|
||||
drop table rmt sync;
|
Loading…
Reference in New Issue
Block a user