better code, add test

This commit is contained in:
Alexander Tokmakov 2021-05-10 21:03:37 +03:00
parent a4a2a61eed
commit eef899ae63
8 changed files with 138 additions and 72 deletions

View File

@ -2275,7 +2275,7 @@ MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSet(c
if (part->info.partition_id != drop_range.partition_id) if (part->info.partition_id != drop_range.partition_id)
throw Exception("Unexpected partition_id of part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); throw Exception("Unexpected partition_id of part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
if (part->info.min_block < drop_range.min_block) if (part->info.min_block < drop_range.min_block) /// Always false, because drop_range.min_block == 0
{ {
if (drop_range.min_block <= part->info.max_block) if (drop_range.min_block <= part->info.max_block)
{ {

View File

@ -250,7 +250,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
prev_part = nullptr; prev_part = nullptr;
} }
/// Check predicate only for first part in each partition. /// Check predicate only for the first part in each range.
if (!prev_part) if (!prev_part)
{ {
/* Parts can be merged with themselves for TTL needs for example. /* Parts can be merged with themselves for TTL needs for example.
@ -267,8 +267,8 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
if (!can_merge_callback(*prev_part, part, nullptr)) if (!can_merge_callback(*prev_part, part, nullptr))
{ {
/// Starting new interval in the same partition /// Starting new interval in the same partition
if (!parts_ranges.back().empty()) assert(!parts_ranges.back().empty());
parts_ranges.emplace_back(); parts_ranges.emplace_back();
/// Now we have no previous part, but it affects only logging /// Now we have no previous part, but it affects only logging
prev_part = nullptr; prev_part = nullptr;
@ -292,7 +292,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
if (prev_part && part->info.partition_id == (*prev_part)->info.partition_id if (prev_part && part->info.partition_id == (*prev_part)->info.partition_id
&& part->info.min_block <= (*prev_part)->info.max_block) && part->info.min_block <= (*prev_part)->info.max_block)
{ {
LOG_ERROR(log, "Part {} intersects previous part {}", part->name, (*prev_part)->name); throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}", part->name, (*prev_part)->name);
} }
prev_part = &part; prev_part = &part;

View File

@ -926,12 +926,12 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
{ {
auto type = (*it)->type; auto type = (*it)->type;
if (((type == LogEntry::GET_PART || bool is_simple_producing_op = type == LogEntry::GET_PART ||
type == LogEntry::ATTACH_PART || type == LogEntry::ATTACH_PART ||
type == LogEntry::MERGE_PARTS || type == LogEntry::MERGE_PARTS ||
type == LogEntry::MUTATE_PART) type == LogEntry::MUTATE_PART;
&& part_info.contains(MergeTreePartInfo::fromPartName((*it)->new_part_name, format_version))) bool simple_op_covered = is_simple_producing_op && part_info.contains(MergeTreePartInfo::fromPartName((*it)->new_part_name, format_version));
|| checkReplaceRangeCanBeRemoved(part_info, *it, current)) if (simple_op_covered || checkReplaceRangeCanBeRemoved(part_info, *it, current))
{ {
if ((*it)->currently_executing) if ((*it)->currently_executing)
to_wait.push_back(*it); to_wait.push_back(*it);
@ -964,50 +964,6 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
} }
size_t ReplicatedMergeTreeQueue::getConflictsCountForRange(
const MergeTreePartInfo & range, const LogEntry & entry,
String * out_description, std::lock_guard<std::mutex> & /* queue_lock */) const
{
std::vector<std::pair<String, LogEntryPtr>> conflicts;
for (const auto & future_part_elem : future_parts)
{
/// Do not check itself log entry
if (future_part_elem.second->znode_name == entry.znode_name)
continue;
if (!range.isDisjoint(MergeTreePartInfo::fromPartName(future_part_elem.first, format_version)))
{
conflicts.emplace_back(future_part_elem.first, future_part_elem.second);
continue;
}
}
if (out_description)
{
WriteBufferFromOwnString ss;
ss << "Can't execute command for range " << range.getPartName() << " (entry " << entry.znode_name << "). ";
ss << "There are " << conflicts.size() << " currently executing entries blocking it: ";
for (const auto & conflict : conflicts)
ss << conflict.second->typeToString() << " part " << conflict.first << ", ";
*out_description = ss.str();
}
return conflicts.size();
}
void ReplicatedMergeTreeQueue::checkThereAreNoConflictsInRange(const MergeTreePartInfo & range, const LogEntry & entry)
{
String conflicts_description;
std::lock_guard lock(state_mutex);
if (0 != getConflictsCountForRange(range, entry, &conflicts_description, lock))
throw Exception(conflicts_description, ErrorCodes::UNFINISHED);
}
bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & log_entry_name, const String & new_part_name, bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & log_entry_name, const String & new_part_name,
String & out_reason, std::lock_guard<std::mutex> & /* queue_lock */) const String & out_reason, std::lock_guard<std::mutex> & /* queue_lock */) const
{ {
@ -1625,8 +1581,11 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
void ReplicatedMergeTreeQueue::disableMergesInBlockRange(const String & part_name) void ReplicatedMergeTreeQueue::disableMergesInBlockRange(const String & part_name)
{ {
std::lock_guard lock(state_mutex); {
virtual_parts.add(part_name); std::lock_guard lock(state_mutex);
virtual_parts.add(part_name);
}
std::this_thread::sleep_for(std::chrono::milliseconds(500)); //FIXME
} }
@ -1912,9 +1871,7 @@ bool ReplicatedMergeTreeMergePredicate::canMergeTwoParts(
if (left->info.partition_id != right->info.partition_id) if (left->info.partition_id != right->info.partition_id)
{ {
if (out_reason) throw Exception(ErrorCodes::LOGICAL_ERROR, "Parts {} and {} belong to different partitions", left->name, right->name);
*out_reason = "Parts " + left->name + " and " + right->name + " belong to different partitions";
return false;
} }
for (const MergeTreeData::DataPartPtr & part : {left, right}) for (const MergeTreeData::DataPartPtr & part : {left, right})

View File

@ -237,11 +237,6 @@ private:
std::optional<time_t> min_unprocessed_insert_time_changed, std::optional<time_t> min_unprocessed_insert_time_changed,
std::optional<time_t> max_processed_insert_time_changed) const; std::optional<time_t> max_processed_insert_time_changed) const;
/// Returns list of currently executing parts blocking execution a command modifying specified range
size_t getConflictsCountForRange(
const MergeTreePartInfo & range, const LogEntry & entry, String * out_description,
std::lock_guard<std::mutex> & state_lock) const;
/// Marks the element of the queue as running. /// Marks the element of the queue as running.
class CurrentlyExecuting class CurrentlyExecuting
{ {
@ -322,10 +317,6 @@ public:
*/ */
void removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const MergeTreePartInfo & part_info, const ReplicatedMergeTreeLogEntryData & current); void removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const MergeTreePartInfo & part_info, const ReplicatedMergeTreeLogEntryData & current);
/** Throws and exception if there are currently executing entries in the range .
*/
void checkThereAreNoConflictsInRange(const MergeTreePartInfo & range, const LogEntry & entry);
/** In the case where there are not enough parts to perform the merge in part_name /** In the case where there are not enough parts to perform the merge in part_name
* - move actions with merged parts to the end of the queue * - move actions with merged parts to the end of the queue
* (in order to download a already merged part from another replica). * (in order to download a already merged part from another replica).
@ -477,7 +468,7 @@ public:
/// Can we assign a merge this part and some other part? /// Can we assign a merge this part and some other part?
/// For example a merge of a part and itself is needed for TTL. /// For example a merge of a part and itself is needed for TTL.
/// This predicate is checked for the first part of each partitition. /// This predicate is checked for the first part of each range.
bool canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String * out_reason) const; bool canMergeSinglePart(const MergeTreeData::DataPartPtr & part, String * out_reason) const;
/// Return nonempty optional of desired mutation version and alter version. /// Return nonempty optional of desired mutation version and alter version.

View File

@ -685,7 +685,7 @@ std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::se
auto can_merge = [this, &lock] (const DataPartPtr & left, const DataPartPtr & right, String *) -> bool auto can_merge = [this, &lock] (const DataPartPtr & left, const DataPartPtr & right, String *) -> bool
{ {
/// This predicate is checked for the first part of each partition. /// This predicate is checked for the first part of each range.
/// (left = nullptr, right = "first part of partition") /// (left = nullptr, right = "first part of partition")
if (!left) if (!left)
return !currently_merging_mutating_parts.count(right); return !currently_merging_mutating_parts.count(right);

View File

@ -6321,7 +6321,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
MergeTreePartInfo drop_range_dest; MergeTreePartInfo drop_range_dest;
drop_range_dest.partition_id = drop_range.partition_id; drop_range_dest.partition_id = drop_range.partition_id;
drop_range_dest.max_block = drop_range.max_block; drop_range_dest.max_block = drop_range.max_block;
drop_range_dest.min_block = drop_range.max_block; drop_range_dest.min_block = drop_range.max_block; //FIXME typo?
drop_range_dest.level = drop_range.level; drop_range_dest.level = drop_range.level;
drop_range_dest.mutation = drop_range.mutation; drop_range_dest.mutation = drop_range.mutation;

View File

@ -0,0 +1 @@
Replication did not hang

View File

@ -0,0 +1,117 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
declare -A engines
engines[0]="MergeTree"
engines[1]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/src', toString(randConstant()))"
engines[2]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/src_' || toString(randConstant()), 'single_replica')"
for ((i=0; i<16; i++)) do
$CLICKHOUSE_CLIENT -q "CREATE TABLE dst_$i (p UInt64, k UInt64, v UInt64)
ENGINE=ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/dst', '$i')
PARTITION BY p % 10 ORDER BY k" 2>&1| grep -Pv "Retrying createReplica|created by another server at the same moment, will retry" &
engine=${engines[$((i % ${#engines[@]}))]}
$CLICKHOUSE_CLIENT -q "CREATE TABLE src_$i (p UInt64, k UInt64, v UInt64) ENGINE=$engine
PARTITION BY p % 10 ORDER BY k" 2>&1| grep -Pv "Retrying createReplica|created by another server at the same moment, will retry" &
done
wait
function create_drop_thread()
{
while true; do
REPLICA=$(($RANDOM % 16))
$CLICKHOUSE_CLIENT -q "DROP TABLE src_$REPLICA;"
arr=("$@")
engine=${arr[$RANDOM % ${#arr[@]}]}
$CLICKHOUSE_CLIENT -q "CREATE TABLE src_$REPLICA (p UInt64, k UInt64, v UInt64) ENGINE=$engine PARTITION BY p % 10 ORDER BY k"
sleep 0.$RANDOM;
done
}
function insert_thread()
{
while true; do
REPLICA=$(($RANDOM % 16))
LIMIT=$(($RANDOM % 100))
$CLICKHOUSE_CLIENT -q "INSERT INTO $1_$REPLICA SELECT * FROM generateRandom('p UInt64, k UInt64, v UInt64') LIMIT $LIMIT" 2>/dev/null
done
}
function move_partition_src_dst_thread()
{
while true; do
FROM_REPLICA=$(($RANDOM % 16))
TO_REPLICA=$(($RANDOM % 16))
PARTITION=$(($RANDOM % 10))
$CLICKHOUSE_CLIENT -q "ALTER TABLE src_$FROM_REPLICA MOVE PARTITION $PARTITION TO TABLE dst_$TO_REPLICA" 2>/dev/null
sleep 0.$RANDOM;
done
}
function replace_partition_src_src_thread()
{
while true; do
FROM_REPLICA=$(($RANDOM % 16))
TO_REPLICA=$(($RANDOM % 16))
PARTITION=$(($RANDOM % 10))
$CLICKHOUSE_CLIENT -q "ALTER TABLE src_$TO_REPLICA REPLACE PARTITION $PARTITION FROM src_$FROM_REPLICA" 2>/dev/null
sleep 0.$RANDOM;
done
}
function drop_partition_thread()
{
while true; do
REPLICA=$(($RANDOM % 16))
PARTITION=$(($RANDOM % 10))
$CLICKHOUSE_CLIENT -q "ALTER TABLE dst_$TO_REPLICA DROP PARTITION $PARTITION" 2>/dev/null
sleep 0.$RANDOM;
done
}
function optimize_thread()
{
while true; do
REPLICA=$(($RANDOM % 16))
TABLE="src_"
if (( RANDOM % 2 )); then
TABLE="dst_"
fi
$CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE ${TABLE}_$REPLICA" 2>/dev/null
sleep 0.$RANDOM;
done
}
export -f create_drop_thread;
export -f insert_thread;
export -f move_partition_src_dst_thread;
export -f replace_partition_src_src_thread;
export -f drop_partition_thread;
export -f optimize_thread;
TIMEOUT=300
timeout $TIMEOUT bash -c "create_drop_thread ${engines[@]}" &
timeout $TIMEOUT bash -c 'insert_thread src' &
timeout $TIMEOUT bash -c 'insert_thread src' &
timeout $TIMEOUT bash -c 'insert_thread dst' &
timeout $TIMEOUT bash -c move_partition_src_dst_thread &
timeout $TIMEOUT bash -c replace_partition_src_src_thread &
timeout $TIMEOUT bash -c drop_partition_thread &
timeout $TIMEOUT bash -c optimize_thread &
wait
for ((i=0; i<16; i++)) do
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA dst_$i"
$CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA src_$i" 2>/dev/null # table may not exist
done
echo "Replication did not hang"
for ((i=0; i<16; i++)) do
$CLICKHOUSE_CLIENT -q "DROP TABLE dst_$i" &
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS src_$i" &
done
wait