select and write part mutations to queue

This commit is contained in:
Alexey Zatelepin 2018-04-20 19:18:16 +03:00
parent c7fae4cd70
commit 0b784a197e
9 changed files with 246 additions and 75 deletions

View File

@ -23,7 +23,7 @@ void ActiveDataPartSet::addImpl(const String & name)
{
auto part_info = MergeTreePartInfo::fromPartName(name, format_version);
if (!getContainingPartImpl(part_info).empty())
if (getContainingPartImpl(part_info) != part_info_to_name.end())
return;
/// Parts contained in `part` are located contiguously in `part_info_to_name`, overlapping with the place where the part itself would be inserted.
@ -51,14 +51,28 @@ void ActiveDataPartSet::addImpl(const String & name)
}
String ActiveDataPartSet::getContainingPart(const String & part_name) const
std::optional<MergeTreePartInfo> ActiveDataPartSet::getContainingPart(const MergeTreePartInfo & part_info) const
{
std::lock_guard<std::mutex> lock(mutex);
return getContainingPartImpl(MergeTreePartInfo::fromPartName(part_name, format_version));
auto it = getContainingPartImpl(part_info);
if (it != part_info_to_name.end())
return it->first;
return {};
}
String ActiveDataPartSet::getContainingPartImpl(const MergeTreePartInfo & part_info) const
/// Looks up the part of the set that contains the part called `name`.
/// Returns the name of that part, or an empty string if no part of the set contains it.
/// Takes the set's mutex for the duration of the lookup.
String ActiveDataPartSet::getContainingPart(const String & name) const
{
    std::lock_guard<std::mutex> lock(mutex);

    const auto part_info = MergeTreePartInfo::fromPartName(name, format_version);
    const auto found = getContainingPartImpl(part_info);

    return found == part_info_to_name.end() ? String() : found->second;
}
std::map<MergeTreePartInfo, String>::const_iterator
ActiveDataPartSet::getContainingPartImpl(const MergeTreePartInfo & part_info) const
{
/// A part can only be covered/overlapped by the previous or next one in `part_info_to_name`.
auto it = part_info_to_name.lower_bound(part_info);
@ -66,17 +80,17 @@ String ActiveDataPartSet::getContainingPartImpl(const MergeTreePartInfo & part_i
if (it != part_info_to_name.end())
{
if (it->first.contains(part_info))
return it->second;
return it;
}
if (it != part_info_to_name.begin())
{
--it;
if (it->first.contains(part_info))
return it->second;
return it;
}
return String();
return part_info_to_name.end();
}

View File

@ -45,6 +45,8 @@ public:
void add(const String & name);
/// Returns the info of the part in the set that contains `part_info`,
/// or an empty optional if no part of the set contains it.
std::optional<MergeTreePartInfo> getContainingPart(const MergeTreePartInfo & part_info) const;
/// If not found, returns an empty string.
String getContainingPart(const String & name) const;
@ -62,7 +64,7 @@ private:
/// Do not block mutex.
void addImpl(const String & name);
String getContainingPartImpl(const MergeTreePartInfo & part_info) const;
std::map<MergeTreePartInfo, String>::const_iterator getContainingPartImpl(const MergeTreePartInfo & part_info) const;
};
}

View File

@ -38,14 +38,25 @@ struct MergeTreePartInfo
bool operator==(const MergeTreePartInfo & rhs) const
{
return !(*this < rhs || rhs < *this);
return !(*this != rhs);
}
/// Contains another part (obtained after merging another part with some other)
/// Two part infos differ when one of them is ordered strictly before the other by operator<.
bool operator!=(const MergeTreePartInfo & rhs) const
{
    if (*this < rhs)
        return true;

    return rhs < *this;
}
/// True if contains rhs (this part is obtained after merging rhs with some other part or mutating rhs)
bool contains(const MergeTreePartInfo & rhs) const
{
return partition_id == rhs.partition_id /// Parts for different partitions are not merged
&& min_block <= rhs.min_block
if (partition_id != rhs.partition_id) /// Parts for different partitions are not merged
return false;
/// Mutated parts have the same block numbers but greater version.
if (min_block == rhs.min_block && max_block == rhs.max_block && level == rhs.level)
return version >= rhs.version;
return min_block <= rhs.min_block
&& max_block >= rhs.max_block
&& level >= rhs.level;
}

View File

@ -46,6 +46,13 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
<< new_part_name;
break;
case MUTATE_PART:
out << "mutate\n"
<< parts_to_merge.at(0) << "\n"
<< "to\n"
<< new_part_name;
break;
default:
throw Exception("Unknown log entry type: " + DB::toString<int>(type), ErrorCodes::LOGICAL_ERROR);
}
@ -124,6 +131,15 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
String source_part_name;
in >> "\n" >> source_part_name >> "\ninto\n" >> new_part_name;
}
else if (type_str == "mutate")
{
type = MUTATE_PART;
String source_part;
in >> source_part >> "\n"
>> "to\n"
>> new_part_name;
parts_to_merge.push_back(source_part);
}
in >> "\n";

View File

@ -35,6 +35,7 @@ struct ReplicatedMergeTreeLogEntryData
DROP_RANGE, /// Delete the parts in the specified partition in the specified number range.
ATTACH_PART, /// Move a part from the `detached` directory. Obsolete. TODO: Remove after half year.
CLEAR_COLUMN, /// Drop specific column from specified partition.
MUTATE_PART, /// Apply one or several mutations to the part.
};
String typeToString() const
@ -46,6 +47,7 @@ struct ReplicatedMergeTreeLogEntryData
case ReplicatedMergeTreeLogEntryData::DROP_RANGE: return "DROP_RANGE";
case ReplicatedMergeTreeLogEntryData::ATTACH_PART: return "ATTACH_PART";
case ReplicatedMergeTreeLogEntryData::CLEAR_COLUMN: return "CLEAR_COLUMN";
case ReplicatedMergeTreeLogEntryData::MUTATE_PART: return "MUTATE_PART";
default:
throw Exception("Unknown log entry type: " + DB::toString<int>(type), ErrorCodes::LOGICAL_ERROR);
}

View File

@ -286,6 +286,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
/// in the queue.
/// With this we ensure that if you read the queue state Q1 and then the state of mutations M1,
/// then Q1 "happened-before" M1.
/// TODO: notify merge_selecting_event if something loaded.
updateMutations(zookeeper, nullptr);
if (index_str.empty())
@ -509,7 +510,7 @@ ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsFo
Queue::iterator merge_entry;
for (Queue::iterator it = queue.begin(); it != queue.end(); ++it)
{
if ((*it)->type == LogEntry::MERGE_PARTS)
if ((*it)->type == LogEntry::MERGE_PARTS || (*it)->type == LogEntry::MUTATE_PART)
{
if (std::find((*it)->parts_to_merge.begin(), (*it)->parts_to_merge.end(), part_name)
!= (*it)->parts_to_merge.end())
@ -523,7 +524,7 @@ ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsFo
if (!parts_for_merge.empty())
{
/// Move to the end of queue actions that are receiving `parts_for_merge`.
/// Move to the end of queue actions that result in one of the parts in `parts_for_merge`.
for (Queue::iterator it = queue.begin(); it != queue.end();)
{
auto it0 = it;
@ -532,7 +533,7 @@ ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsFo
if (it0 == merge_entry)
break;
if (((*it0)->type == LogEntry::MERGE_PARTS || (*it0)->type == LogEntry::GET_PART)
if (((*it0)->type == LogEntry::MERGE_PARTS || (*it0)->type == LogEntry::GET_PART || (*it0)->type == LogEntry::MUTATE_PART)
&& parts_for_merge.count((*it0)->new_part_name))
{
queue.splice(queue.end(), queue, it0, it);
@ -544,7 +545,7 @@ ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsFo
}
void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr zookeeper, const String & part_name)
void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const String & part_name)
{
Queue to_wait;
size_t removed_entries = 0;
@ -555,7 +556,7 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z
std::unique_lock<std::mutex> lock(mutex);
for (Queue::iterator it = queue.begin(); it != queue.end();)
{
if (((*it)->type == LogEntry::GET_PART || (*it)->type == LogEntry::MERGE_PARTS) &&
if (((*it)->type == LogEntry::GET_PART || (*it)->type == LogEntry::MERGE_PARTS || (*it)->type == LogEntry::MUTATE_PART) &&
MergeTreePartInfo::contains(part_name, (*it)->new_part_name, format_version))
{
if ((*it)->currently_executing)
@ -593,7 +594,10 @@ ReplicatedMergeTreeQueue::Queue ReplicatedMergeTreeQueue::getConflictsForClearCo
{
if (elem->currently_executing && elem->znode_name != entry.znode_name)
{
if (elem->type == LogEntry::MERGE_PARTS || elem->type == LogEntry::GET_PART || elem->type == LogEntry::ATTACH_PART)
if (elem->type == LogEntry::MERGE_PARTS
|| elem->type == LogEntry::GET_PART
|| elem->type == LogEntry::MUTATE_PART
|| elem->type == LogEntry::ATTACH_PART)
{
if (MergeTreePartInfo::contains(entry.new_part_name, elem->new_part_name, format_version))
conflicts.emplace_back(elem);
@ -693,7 +697,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
MergeTreeData & data,
std::lock_guard<std::mutex> & lock) const
{
if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
if (entry.type == LogEntry::MERGE_PARTS
|| entry.type == LogEntry::GET_PART
|| entry.type == LogEntry::ATTACH_PART
|| entry.type == LogEntry::MUTATE_PART)
{
if (!isNotCoveredByFuturePartsImpl(entry.new_part_name, out_postpone_reason, lock))
{
@ -703,9 +710,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
}
}
if (entry.type == LogEntry::MERGE_PARTS)
if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::MUTATE_PART)
{
/** If any of the required parts are now transferred or in merge process, wait for the end of this operation.
/** If any of the required parts are now fetched or in merge process, wait for the end of this operation.
* Otherwise, even if all the necessary parts for the merge are not present, you should try to make a merge.
* If any parts are missing, instead of merge, there will be an attempt to download a part.
* Such a situation is possible if the receive of a part has failed, and it was moved to the end of the queue.
@ -766,6 +773,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
}
/// Finds the greatest key of mutations_by_block_number that does not exceed the data version of the part
/// (its `version` if that is non-zero, otherwise its `min_block`).
/// Returns -1 if there is no such key. The lock argument is unused in the body.
Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(const MergeTreePartInfo & part_info, std::lock_guard<std::mutex> &) const
{
    Int64 data_version = part_info.min_block;
    if (part_info.version)
        data_version = part_info.version;

    /// upper_bound gives the first mutation strictly above data_version; the one we need is right before it.
    auto mutation_it = mutations_by_block_number.upper_bound(data_version);
    if (mutation_it != mutations_by_block_number.begin())
    {
        --mutation_it;
        return mutation_it->first;
    }

    return -1; /// 0 can be a valid mutation block number.
}
ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(ReplicatedMergeTreeQueue::LogEntryPtr & entry, ReplicatedMergeTreeQueue & queue)
: entry(entry), queue(queue)
{
@ -874,8 +892,11 @@ bool ReplicatedMergeTreeQueue::processEntry(
}
bool ReplicatedMergeTreeQueue::canMergeParts(const String & left, const String & right, String * out_reason) const
bool ReplicatedMergeTreeQueue::canMergeParts(const MergeTreeDataPart & left, const MergeTreeDataPart & right, String * out_reason) const
{
if (left.name == right.name)
return false;
auto set_reason = [&out_reason] (const String & part_name)
{
if (out_reason)
@ -883,19 +904,18 @@ bool ReplicatedMergeTreeQueue::canMergeParts(const String & left, const String &
return false;
};
auto left_info = MergeTreePartInfo::fromPartName(left, format_version);
auto right_info = MergeTreePartInfo::fromPartName(right, format_version);
std::lock_guard lock(mutex);
if (prev_virtual_parts.getContainingPart(left) != left || virtual_parts.getContainingPart(left) != left)
return set_reason(left);
if (prev_virtual_parts.getContainingPart(left.info) != left.info
|| virtual_parts.getContainingPart(left.info) != left.info)
return set_reason(left.name);
if (right != left && (prev_virtual_parts.getContainingPart(right) != right || virtual_parts.getContainingPart(right) != right))
return set_reason(right);
if (prev_virtual_parts.getContainingPart(right.info) != right.info
|| virtual_parts.getContainingPart(right.info) != right.info)
return set_reason(right.name);
Int64 left_max_block = left_info.max_block;
Int64 right_min_block = right_info.min_block;
Int64 left_max_block = left.info.max_block;
Int64 right_min_block = right.info.min_block;
if (left_max_block > right_min_block)
std::swap(left_max_block, right_min_block);
@ -907,39 +927,29 @@ bool ReplicatedMergeTreeQueue::canMergeParts(const String & left, const String &
{
if (out_reason)
*out_reason = "Block number " + toString(*left_eph_it) + " is still being inserted between parts "
+ left + " and " + right;
+ left.name + " and " + right.name;
return false;
}
MergeTreePartInfo gap_part_info(
left_info.partition_id, left_max_block + 1, right_min_block - 1, 999999);
left.info.partition_id, left_max_block + 1, right_min_block - 1, 999999);
Strings covered = virtual_parts.getPartsCoveredBy(gap_part_info);
if (!covered.empty())
{
if (out_reason)
*out_reason = "There are " + toString(covered.size()) + " parts that are still not ready between " + left + " and " + right;
*out_reason = "There are " + toString(covered.size()) + " parts that are still not ready between " + left.name + " and " + right.name;
return false;
}
}
auto get_current_mutation_version = [&](const MergeTreePartInfo & part_info) -> Int64
{
Int64 data_version = part_info.version ? part_info.version : part_info.min_block;
auto it = mutations_by_block_number.upper_bound(data_version);
if (it == mutations_by_block_number.begin())
return 0;
--it;
return it->first;
};
Int64 left_mutation = get_current_mutation_version(left_info);
Int64 right_mutation = get_current_mutation_version(right_info);
Int64 left_mutation = getCurrentMutationVersion(left.info, lock);
Int64 right_mutation = getCurrentMutationVersion(right.info, lock);
if (left_mutation != right_mutation)
{
if (out_reason)
*out_reason = "Current mutation versions of parts " + left + " and " + right + " differ: "
*out_reason = "Current mutation versions of parts " + left.name + " and " + right.name + " differ: "
+ toString(left_mutation) + " and " + toString(right_mutation) + " respectively";
return false;
@ -948,13 +958,30 @@ bool ReplicatedMergeTreeQueue::canMergeParts(const String & left, const String &
return true;
}
bool ReplicatedMergeTreeQueue::canMutatePart(const MergeTreePartInfo & part_info, Int64 & desired_mutation_version) const
{
std::lock_guard lock(mutex);
if (mutations.empty())
return false;
if (virtual_parts.getContainingPart(part_info) != part_info)
return false;
Int64 current_version = getCurrentMutationVersion(part_info, lock);
if (current_version >= mutations.back().block_number)
return false;
desired_mutation_version = mutations.back().block_number;
return true;
}
/// Passes part_name to virtual_parts.add().
/// canMergeParts rejects a part whose containing part in virtual_parts is not the part itself,
/// so parts covered by part_name are no longer accepted for merging once it is in that set.
void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name)
{
virtual_parts.add(part_name);
}
ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const
{
std::lock_guard<std::mutex> lock(mutex);
@ -967,9 +994,11 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const
res.inserts_in_queue = 0;
res.merges_in_queue = 0;
res.mutations_in_queue = 0;
res.queue_oldest_time = 0;
res.inserts_oldest_time = 0;
res.merges_oldest_time = 0;
res.mutations_oldest_time = 0;
for (const LogEntryPtr & entry : queue)
{
@ -997,6 +1026,17 @@ ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const
res.oldest_part_to_merge_to = entry->new_part_name;
}
}
if (entry->type == LogEntry::MUTATE_PART)
{
++res.mutations_in_queue;
if (entry->create_time && (!res.mutations_oldest_time || entry->create_time < res.mutations_oldest_time))
{
res.mutations_oldest_time = entry->create_time;
res.oldest_part_to_mutate_to = entry->new_part_name;
}
}
}
return res;
@ -1014,17 +1054,17 @@ void ReplicatedMergeTreeQueue::getEntries(LogEntriesData & res) const
}
size_t ReplicatedMergeTreeQueue::countMerges() const
size_t ReplicatedMergeTreeQueue::countMergesAndPartMutations() const
{
size_t all_merges = 0;
size_t count = 0;
std::lock_guard<std::mutex> lock(mutex);
for (const auto & entry : queue)
if (entry->type == LogEntry::MERGE_PARTS)
++all_merges;
if (entry->type == LogEntry::MERGE_PARTS || entry->type == LogEntry::MUTATE_PART)
++count;
return all_merges;
return count;
}

View File

@ -100,17 +100,23 @@ private:
void remove(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry);
/** Can I now try this action. If not, you need to leave it in the queue and try another one.
* Called under queue_mutex.
* Called under the main mutex.
*/
bool shouldExecuteLogEntry(const LogEntry & entry, String & out_postpone_reason, MergeTreeDataMerger & merger, MergeTreeData & data,
std::lock_guard<std::mutex> &) const;
/// Return the version (block number) of the last mutation that we don't need to apply to the part
/// (either this mutation was already applied or the part was created after the mutation).
/// If there is no such mutation or it has already been executed and deleted, return -1.
/// Call under the main mutex.
Int64 getCurrentMutationVersion(const MergeTreePartInfo & part_info, std::lock_guard<std::mutex> &) const;
/** Check that part isn't in currently generating parts and isn't covered by them.
* Should be called under queue's mutex.
*/
bool isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason, std::lock_guard<std::mutex> &) const;
/// After removing the queue element, update the insertion times in the RAM. Running under queue_mutex.
/// After removing the queue element, update the insertion times in the RAM. Running under mutex.
/// Returns information about what times have changed - this information can be passed to updateTimesInZooKeeper.
void updateTimesOnRemoval(const LogEntryPtr & entry,
std::optional<time_t> & min_unprocessed_insert_time_changed,
@ -177,7 +183,7 @@ public:
/** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM).
* And also wait for the completion of their execution, if they are now being executed.
*/
void removeGetsAndMergesInRange(zkutil::ZooKeeperPtr zookeeper, const String & part_name);
void removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const String & part_name);
/** Disables future merges and fetches inside entry.new_part_name
* If there are currently executing merges or fetches then throws exception.
@ -204,8 +210,12 @@ public:
*/
bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func);
/// Will a part in the future be merged into a larger part (or merges of parts in this range are prohibited)?
bool canMergeParts(const String & left, const String & right, String * out_reason = nullptr) const;
/// Can we merge two parts according to the queue? True if the parts are of the same mutation version,
/// there is no merge or mutation already selected for these parts
/// and there are no virtual parts or unfinished inserts between them.
bool canMergeParts(const MergeTreeDataPart & left, const MergeTreeDataPart & right, String * out_reason = nullptr) const;
/// Can we assign a mutation of this part according to the queue?
/// True if there are mutations, the part that virtual_parts reports as containing part_info is part_info itself,
/// and the current mutation version of the part is lower than the block number of the last mutation.
/// On success that block number is written to desired_mutation_version; otherwise it is left untouched.
bool canMutatePart(const MergeTreePartInfo & part_info, Int64 & desired_mutation_version) const;
/// Prohibit merges in the specified range.
void disableMergesInRange(const String & part_name);
@ -215,8 +225,8 @@ public:
*/
bool addFuturePartIfNotCoveredByThem(const String & part_name, const LogEntry & entry, String & reject_reason);
/// Count the number of merges in the queue.
size_t countMerges() const;
/// Count the number of merges and mutations of single parts in the queue.
size_t countMergesAndPartMutations() const;
struct Status
{
@ -224,11 +234,14 @@ public:
UInt32 queue_size;
UInt32 inserts_in_queue;
UInt32 merges_in_queue;
UInt32 mutations_in_queue;
UInt32 queue_oldest_time;
UInt32 inserts_oldest_time;
UInt32 merges_oldest_time;
UInt32 mutations_oldest_time;
String oldest_part_to_get;
String oldest_part_to_merge_to;
String oldest_part_to_mutate_to;
UInt32 last_queue_update;
};

View File

@ -1484,7 +1484,7 @@ void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTr
{
LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << ".");
queue.removeGetsAndMergesInRange(getZooKeeper(), entry.new_part_name);
queue.removePartProducingOpsInRange(getZooKeeper(), entry.new_part_name);
LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts.");
size_t removed_parts = 0;
@ -1634,7 +1634,9 @@ void StorageReplicatedMergeTree::mutationsUpdatingThread()
{
try
{
queue.updateMutations(getZooKeeper(), mutations_updating_event);
if (queue.updateMutations(getZooKeeper(), mutations_updating_event))
merge_selecting_event.set();
mutations_updating_event->wait();
}
catch (...)
@ -1873,7 +1875,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
auto can_merge = [&] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
{
return queue.canMergeParts(left->name, right->name)
return queue.canMergeParts(*left, *right)
&& cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
};
@ -1897,27 +1899,54 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
/// If many merges are already queued, then only small enough merges will be queued.
/// Otherwise the merge queue could be filled with only large merges,
/// while at the same time many small parts could be created and would not be merged.
size_t merges_queued = queue.countMerges();
size_t merges_and_mutations_queued = queue.countMergesAndPartMutations();
if (merges_queued >= data.settings.max_replicated_merges_in_queue)
if (merges_and_mutations_queued >= data.settings.max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges (" << merges_queued
LOG_TRACE(log, "Number of queued merges and part mutations (" << merges_and_mutations_queued
<< ") is greater than max_replicated_merges_in_queue ("
<< data.settings.max_replicated_merges_in_queue << "), so won't select new parts to merge.");
<< data.settings.max_replicated_merges_in_queue << "), so won't select new parts to merge or mutate.");
}
else
{
MergeTreeDataMerger::FuturePart future_merged_part;
size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge(data.settings.max_replicated_merges_in_queue, merges_queued);
size_t max_future_part_size = merger.getMaxPartsSizeForMerge(data.settings.max_replicated_merges_in_queue, merges_and_mutations_queued);
now = std::chrono::steady_clock::now();
if (max_parts_size_for_merge > 0
&& merger.selectPartsToMerge(future_merged_part, false, max_parts_size_for_merge, can_merge))
MergeTreeDataMerger::FuturePart future_merged_part;
if (max_future_part_size > 0)
{
merge_selecting_logs_pulling_is_required = true;
success = createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate);
MergeTreeDataMerger::FuturePart future_merged_part;
if (merger.selectPartsToMerge(future_merged_part, false, max_future_part_size, can_merge))
{
merge_selecting_logs_pulling_is_required = true;
success = createLogEntryToMergeParts(future_merged_part.parts, future_merged_part.name, deduplicate);
}
else
{
/// Choose a part to mutate.
/// TODO finish early if there are no mutations.
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
for (const auto & part : data_parts)
{
/// Estimate that the part will not change its size much.
if (part->bytes_on_disk > max_future_part_size)
continue;
Int64 desired_mutation_version;
if (!queue.canMutatePart(part->info, desired_mutation_version))
continue;
if (createLogEntryToMutatePart(*part, desired_mutation_version))
{
merge_selecting_logs_pulling_is_required = true;
success = true;
break;
}
}
}
}
}
}
@ -1994,6 +2023,48 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
}
bool StorageReplicatedMergeTree::createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version)
{
auto zookeeper = getZooKeeper();
/// If there is no information about part in ZK, we will not mutate it.
if (!zookeeper->exists(replica_path + "/parts/" + part.name))
{
if (part.modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(nullptr))
{
LOG_WARNING(log, "Part " << part.name << " (that was selected for mutation)"
<< " with age " << (time(nullptr) - part.modification_time)
<< " seconds exists locally but not in ZooKeeper."
<< " Won't mutate that part and will check it.");
enqueuePartForCheck(part.name);
}
return false;
}
MergeTreePartInfo new_part_info = part.info;
new_part_info.version = mutation_version;
/// TODO: extract common code.
String new_part_name;
if (data.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
new_part_name = new_part_info.getPartNameV0(part.getMinDate(), part.getMaxDate());
else
new_part_name = new_part_info.getPartName();
ReplicatedMergeTreeLogEntryData entry;
entry.type = LogEntry::MUTATE_PART;
entry.source_replica = replica_name;
entry.parts_to_merge.push_back(part.name);
entry.new_part_name = new_part_name;
entry.create_time = time(nullptr);
zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
return true;
}
void StorageReplicatedMergeTree::removePartFromZooKeeper(const String & part_name, zkutil::Requests & ops)
{
String part_path = replica_path + "/parts/" + part_name;
@ -2517,7 +2588,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String * out_reason)
{
return queue.canMergeParts(left->name, right->name, out_reason)
return queue.canMergeParts(*left, *right, out_reason)
&& canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data, out_reason);
};

View File

@ -399,6 +399,8 @@ private:
bool deduplicate,
ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr);
/// Creates a MUTATE_PART log entry in ZooKeeper for `part`; the resulting part gets mutation_version as its version.
/// Returns false (and creates nothing) if there is no information about the part in ZooKeeper.
bool createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version);
/// Exchange parts.
/** Returns an empty string if no one has a part.