load active abandonable locks during pullLogsToQueue (TODO: load quorum parts)

This commit is contained in:
Alexey Zatelepin 2018-05-04 23:17:10 +03:00
parent 35ecb331ba
commit 9f4377b771
3 changed files with 140 additions and 21 deletions

View File

@ -94,7 +94,7 @@ void ReplicatedMergeTreeQueue::initialize(
void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed, std::lock_guard<std::mutex> &)
{
virtual_parts.add(entry->new_part_name);
next_virtual_parts.add(entry->new_part_name);
/// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted
if (entry->type != LogEntry::DROP_RANGE)
@ -122,6 +122,7 @@ void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPt
{
std::lock_guard<std::mutex> lock(mutex);
insertUnlocked(entry, min_unprocessed_insert_time_changed, lock);
/// TODO: do something with next_virtual_parts
}
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {});
@ -259,6 +260,59 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
}
std::unordered_map<String, std::set<Int64>> ReplicatedMergeTreeQueue::loadCurrentInserts(zkutil::ZooKeeperPtr & zookeeper) const
{
std::unordered_map<String, std::set<Int64>> result;
std::unordered_set<String> abandonable_lock_holders;
for (const String & entry : zookeeper->getChildren(zookeeper_path + "/temp"))
{
if (startsWith(entry, "abandonable_lock-"))
abandonable_lock_holders.insert(zookeeper_path + "/temp/" + entry);
}
if (abandonable_lock_holders.empty())
return result;
Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers");
std::vector<std::future<zkutil::ListResponse>> lock_futures;
for (const String & partition : partitions)
lock_futures.push_back(zookeeper->asyncGetChildren(zookeeper_path + "/block_numbers/" + partition));
struct BlockInfo
{
String partition;
Int64 number;
String zk_path;
std::future<zkutil::GetResponse> contents_future;
};
std::vector<BlockInfo> block_infos;
for (size_t i = 0; i < partitions.size(); ++i)
{
Strings partition_block_numbers = lock_futures[i].get().names;
for (const String & entry : partition_block_numbers)
{
/// TODO: cache block numbers that are abandoned.
/// We won't need to check them on the next iteration.
Int64 block_number = parse<Int64>(entry.substr(strlen("block-")));
String zk_path = zookeeper_path + "/block_numbers/" + partitions[i] + "/" + entry;
block_infos.push_back(
BlockInfo{partitions[i], block_number, zk_path, zookeeper->asyncTryGet(zk_path)});
}
}
for (BlockInfo & block : block_infos)
{
zkutil::GetResponse resp = block.contents_future.get();
if (!resp.error && abandonable_lock_holders.count(resp.data))
result[block.partition].insert(block.number);
}
return result;
}
bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event)
{
std::lock_guard lock(pull_logs_to_queue_mutex);
@ -395,6 +449,37 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
}
}
auto new_current_inserts = loadCurrentInserts(zookeeper);
Strings new_log_entries = zookeeper->getChildren(zookeeper_path + "/log");
if (!log_entries.empty())
{
new_log_entries.erase(
std::remove_if(new_log_entries.begin(), new_log_entries.end(),
[&](const String & entry) { return entry <= log_entries.back(); }),
new_log_entries.end());
}
std::vector<std::future<zkutil::GetResponse>> new_log_entry_futures;
for (const String & entry : new_log_entries)
new_log_entry_futures.push_back(zookeeper->asyncTryGet(zookeeper_path + "/log/" + entry));
std::vector<String> new_virtual_parts;
for (auto & future : new_log_entry_futures)
{
zkutil::GetResponse res = future.get();
new_virtual_parts.emplace_back(LogEntry::parse(res.data, res.stat)->new_part_name);
}
{
std::lock_guard lock(mutex);
virtual_parts = next_virtual_parts;
current_inserts = new_current_inserts;
for (const String & new_part : new_virtual_parts)
next_virtual_parts.add(new_part);
}
return !log_entries.empty();
}
@ -884,6 +969,17 @@ bool ReplicatedMergeTreeQueue::canMergeParts(const MergeTreeDataPart & left, con
if (left.name == right.name)
return false;
if (left.info.partition_id != right.info.partition_id)
{
/// If we end here, most likely it is a bug in the merge selector,
/// but we still can return sensible results in this case.
if (out_reason)
*out_reason = "Parts " + left.name + " and " + right.name + " belong to different partitions";
return false;
}
std::lock_guard lock(mutex);
auto set_reason = [&out_reason] (const String & part_name)
{
if (out_reason)
@ -891,14 +987,49 @@ bool ReplicatedMergeTreeQueue::canMergeParts(const MergeTreeDataPart & left, con
return false;
};
std::lock_guard lock(mutex);
if (virtual_parts.getContainingPart(left.info) != left.info)
return set_reason(left.name);
if (virtual_parts.getContainingPart(right.info) != right.info)
return set_reason(right.name);
Int64 left_max_block = left.info.max_block;
Int64 right_min_block = right.info.min_block;
if (left_max_block > right_min_block)
std::swap(left_max_block, right_min_block);
if (left_max_block + 1 < right_min_block)
{
auto current_inserts_in_partition = current_inserts.find(left.info.partition_id);
if (current_inserts_in_partition != current_inserts.end())
{
const std::set<Int64> & ephemeral_block_numbers = current_inserts_in_partition->second;
auto left_eph_it = ephemeral_block_numbers.upper_bound(left_max_block);
if (left_eph_it != ephemeral_block_numbers.end() && *left_eph_it < right_min_block)
{
if (out_reason)
*out_reason = "Block number " + toString(*left_eph_it) + " is still being inserted between parts "
+ left.name + " and " + right.name;
return false;
}
}
MergeTreePartInfo gap_part_info(
left.info.partition_id, left_max_block + 1, right_min_block - 1, 999999);
Strings covered = next_virtual_parts.getPartsCoveredBy(gap_part_info);
if (!covered.empty())
{
if (out_reason)
*out_reason = "There are " + toString(covered.size()) + " parts that are still not ready between " + left.name + " and " + right.name;
return false;
}
}
Int64 left_mutation = getCurrentMutationVersion(left.info, lock);
Int64 right_mutation = getCurrentMutationVersion(right.info, lock);
if (left_mutation != right_mutation)

View File

@ -79,6 +79,8 @@ private:
* This set is protected by its mutex.
*/
ActiveDataPartSet virtual_parts;
std::unordered_map<String, std::set<Int64>> current_inserts;
ActiveDataPartSet next_virtual_parts;
std::list<ReplicatedMergeTreeMutationEntry> mutations;
std::unordered_map<String, std::map<Int64, const ReplicatedMergeTreeMutationEntry *>> mutations_by_partition;
@ -128,6 +130,9 @@ private:
/// Returns list of currently executing entries blocking execution of specified CLEAR_COLUMN command
Queue getConflictsForClearColumnCommand(const LogEntry & entry, String * out_conflicts_description, std::lock_guard<std::mutex> &) const;
/// Get the map: partition ID -> block numbers of inserts that are currently committing.
std::unordered_map<String, std::set<Int64>> loadCurrentInserts(zkutil::ZooKeeperPtr & zookeeper) const;
/// Marks the element of the queue as running.
class CurrentlyExecuting
{
@ -151,6 +156,7 @@ public:
ReplicatedMergeTreeQueue(MergeTreeDataFormatVersion format_version_)
: format_version(format_version_)
, virtual_parts(format_version)
, next_virtual_parts(format_version)
{
}

View File

@ -1879,24 +1879,6 @@ namespace
}
}
/// You can merge the parts, if all the numbers between them are abandoned - do not correspond to any blocks.
/// TODO: don't forbid merging across mutations.
const String & partition_id = left->info.partition_id;
for (Int64 number = left->info.max_block + 1; number <= right->info.min_block - 1; ++number)
{
String path1 = zookeeper_path + "/block_numbers/" + partition_id + "/block-" + padIndex(number);
String path2 = zookeeper_path + "/nonincrement_block_numbers/" + partition_id + "/block-" + padIndex(number);
if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED &&
AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
{
if (out_reason)
*out_reason = "Block " + toString(number) + " in gap between merging parts " + left->name + " and "
+ right->name + " is not abandoned";
return false;
}
}
return true;
}