Distinguish KILL MUTATION for different tables.

Before this patch KILL MUTATION marks mutation as canceled just by name
(and part numbers) so if you have multiple tables with the same part
name, then killing mutation for one table, will mark it as killed for
another too.

Fix this by comparing StorageID too (it is better to use StorageID over
database/table to avoid ambiguity by using UUIDs for comparing).

Here is a failure of the 01414_freeze_does_not_prevent_alters on CI [1].

  [1]: https://clickhouse-test-reports.s3.yandex.net/24069/9fb69dcf98c71a939d200cad3c8491bf43a44622/functional_stateless_tests_(ubsan).html#fail1
This commit is contained in:
Azat Khuzhin 2021-06-06 15:24:49 +03:00
parent 33b0429dd1
commit 1062d0ec91
6 changed files with 57 additions and 19 deletions

View File

@ -8,9 +8,8 @@
namespace DB
{
MergeListElement::MergeListElement(const std::string & database_, const std::string & table_, const FutureMergedMutatedPart & future_part)
: database{database_}
, table{table_}
MergeListElement::MergeListElement(const StorageID & table_id_, const FutureMergedMutatedPart & future_part)
: table_id{table_id_}
, partition_id{future_part.part_info.partition_id}
, result_part_name{future_part.name}
, result_part_path{future_part.path}
@ -60,8 +59,8 @@ MergeListElement::MergeListElement(const std::string & database_, const std::str
MergeInfo MergeListElement::getInfo() const
{
MergeInfo res;
res.database = database;
res.table = table;
res.database = table_id.getDatabaseName();
res.table = table_id.getTableName();
res.result_part_name = result_part_name;
res.result_part_path = result_part_path;
res.partition_id = partition_id;

View File

@ -8,6 +8,7 @@
#include <Storages/MergeTree/MergeType.h>
#include <Storages/MergeTree/MergeAlgorithm.h>
#include <Storages/MergeTree/BackgroundProcessList.h>
#include <Interpreters/StorageID.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <list>
@ -54,8 +55,7 @@ struct FutureMergedMutatedPart;
struct MergeListElement : boost::noncopyable
{
const std::string database;
const std::string table;
const StorageID table_id;
std::string partition_id;
const std::string result_part_name;
@ -94,7 +94,7 @@ struct MergeListElement : boost::noncopyable
/// Detected after merge already started
std::atomic<MergeAlgorithm> merge_algorithm;
MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part);
MergeListElement(const StorageID & table_id_, const FutureMergedMutatedPart & future_part);
MergeInfo getInfo() const;
@ -122,12 +122,13 @@ public:
--merges_with_ttl_counter;
}
void cancelPartMutations(const String & partition_id, Int64 mutation_version)
void cancelPartMutations(const StorageID & table_id, const String & partition_id, Int64 mutation_version)
{
std::lock_guard lock{mutex};
for (auto & merge_element : entries)
{
if ((partition_id.empty() || merge_element.partition_id == partition_id)
&& merge_element.table_id == table_id
&& merge_element.source_data_version < mutation_version
&& merge_element.result_data_version >= mutation_version)
merge_element.is_cancelled = true;

View File

@ -625,7 +625,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
if (!to_kill)
return CancellationCode::NotFound;
getContext()->getMergeList().cancelPartMutations({}, to_kill->block_number);
getContext()->getMergeList().cancelPartMutations(getStorageID(), {}, to_kill->block_number);
to_kill->removeFile();
LOG_TRACE(log, "Cancelled part mutations and removed mutation file {}", mutation_id);
{
@ -817,9 +817,8 @@ bool StorageMergeTree::mergeSelectedParts(
auto & future_part = merge_mutate_entry.future_part;
Stopwatch stopwatch;
MutableDataPartPtr new_part;
auto table_id = getStorageID();
auto merge_list_entry = getContext()->getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
auto merge_list_entry = getContext()->getMergeList().insert(getStorageID(), future_part);
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
@ -964,9 +963,8 @@ std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::se
bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & merge_mutate_entry, TableLockHolder & table_lock_holder)
{
auto & future_part = merge_mutate_entry.future_part;
auto table_id = getStorageID();
auto merge_list_entry = getContext()->getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
auto merge_list_entry = getContext()->getMergeList().insert(getStorageID(), future_part);
Stopwatch stopwatch;
MutableDataPartPtr new_part;

View File

@ -1726,7 +1726,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
auto table_id = getStorageID();
/// Add merge to list
MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(table_id.database_name, table_id.table_name, future_merged_part);
MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(getStorageID(), future_merged_part);
Transaction transaction(*this);
MutableDataPartPtr part;
@ -1871,9 +1871,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
future_mutated_part.updatePath(*this, reserved_space);
future_mutated_part.type = source_part->getType();
auto table_id = getStorageID();
MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(
table_id.database_name, table_id.table_name, future_mutated_part);
MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(getStorageID(), future_mutated_part);
Stopwatch stopwatch;
@ -5934,7 +5932,7 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
{
const String & partition_id = pair.first;
Int64 block_number = pair.second;
getContext()->getMergeList().cancelPartMutations(partition_id, block_number);
getContext()->getMergeList().cancelPartMutations(getStorageID(), partition_id, block_number);
}
return CancellationCode::CancelSent;
}

View File

@ -0,0 +1,2 @@
1 bar_100
2 bar_100

View File

@ -0,0 +1,40 @@
#!/usr/bin/env bash
#
# Check that KILL MUTATION can be executed in parallel for different tables.
# For this two identical tables will be created:
# - on one table ALTER + KILL MUTATION will be executed
# - on another table only ALTER, that should be succeed
#
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "
drop table if exists data_01900_1;
drop table if exists data_01900_2;
create table data_01900_1 (k UInt64, s String) engine=MergeTree() order by k;
create table data_01900_2 (k UInt64, s String) engine=MergeTree() order by k;
insert into data_01900_1 values (1, 'hello'), (2, 'world');
insert into data_01900_2 values (1, 'hello'), (2, 'world');
"
# default finished_mutations_to_keep is 100
# so 100 mutations will be scheduled and killed later.
for i in {1..100}; do
echo "alter table data_01900_1 update s = 'foo_$i' where 1;"
done | $CLICKHOUSE_CLIENT -nm
# but these mutations should not be killed.
(
for i in {1..100}; do
echo "alter table data_01900_2 update s = 'bar_$i' where 1;"
done | $CLICKHOUSE_CLIENT -nm --mutations_sync=1
) &
$CLICKHOUSE_CLIENT --format Null -nm -q "kill mutation where table = 'data_01900_1' and database = '$CLICKHOUSE_DATABASE';"
wait
$CLICKHOUSE_CLIENT -nm -q "select * from data_01900_2"