// ClickHouse/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
// (web viewer listing: 2171 lines, 86 KiB, C++)

#include <Storages/MergeTree/ReplicatedMergeTreeQueue.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
2019-10-10 16:30:30 +00:00
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
2020-09-18 10:57:33 +00:00
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/CurrentMetrics.h>
2016-01-10 04:44:12 +00:00
/// Declaration of a metric that is defined in another translation unit (`extern`).
/// NOTE(review): no use of `BackgroundPoolTask` is visible in this part of the file — confirm it is still needed.
namespace CurrentMetrics
{
extern const Metric BackgroundPoolTask;
}
2016-01-10 04:44:12 +00:00
namespace DB
{
/// Error codes thrown by this file. Only declared here; the values are defined in another translation unit.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int UNEXPECTED_NODE_IN_ZOOKEEPER;
    extern const int ABORTED;
}
2016-01-10 04:44:12 +00:00
2020-09-18 10:57:33 +00:00
/// Creates an empty queue bound to `storage_`.
/// The part format version, both ZooKeeper paths and the logger name are taken from the storage object.
ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_, ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker_)
    : storage(storage_)
    , merge_strategy_picker(merge_strategy_picker_)
    , format_version(storage.format_version)
    , current_parts(format_version)
    , virtual_parts(format_version)
    , drop_ranges(format_version)
{
    zookeeper_path = storage.zookeeper_path;
    replica_path = storage.replica_path;

    /// The logger name contains the full table name, so messages of different tables can be told apart.
    logger_name = storage.getStorageID().getFullTableName() + " (ReplicatedMergeTreeQueue)";
    log = &Poco::Logger::get(logger_name);
}
2021-06-01 13:25:23 +00:00
void ReplicatedMergeTreeQueue::clear()
{
2021-06-01 13:25:23 +00:00
auto locks = lockQueue();
assert(future_parts.empty());
current_parts.clear();
virtual_parts.clear();
queue.clear();
inserts_by_time.clear();
mutations_by_znode.clear();
mutations_by_partition.clear();
mutation_pointer.clear();
}
2021-06-01 13:25:23 +00:00
/// Registers every part from `parts` in both `current_parts` and `virtual_parts`, under `state_mutex`.
void ReplicatedMergeTreeQueue::initialize(const MergeTreeData::DataParts & parts)
{
    std::lock_guard lock(state_mutex);

    for (const auto & part : parts)
    {
        current_parts.add(part->name, nullptr);
        virtual_parts.add(part->name, nullptr);
    }
}
2019-09-10 11:21:59 +00:00
/// Returns true when `virtual_parts` has a part containing `data_part` whose name differs from `data_part->name`.
/// Returns false when no containing part is found, or when the containing part is `data_part` itself.
bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const
{
    std::lock_guard lock(state_mutex);
    auto virtual_part_name = virtual_parts.getContainingPart(data_part->info);
    return !virtual_part_name.empty() && virtual_part_name != data_part->name;
}
2021-06-30 15:24:51 +00:00
bool ReplicatedMergeTreeQueue::checkPartInQueueAndGetSourceParts(const String & part_name, Strings & source_parts) const
{
std::lock_guard lock(state_mutex);
2021-07-02 09:29:45 +00:00
bool found = false;
2021-06-30 19:11:00 +00:00
for (const auto & entry : queue)
2021-06-30 15:24:51 +00:00
{
2021-07-02 09:29:45 +00:00
if (entry->new_part_name == part_name && entry->source_parts.size() > source_parts.size())
2021-06-30 15:24:51 +00:00
{
2021-07-02 09:29:45 +00:00
source_parts.clear();
2021-06-30 19:11:00 +00:00
source_parts.insert(source_parts.end(), entry->source_parts.begin(), entry->source_parts.end());
2021-07-02 09:29:45 +00:00
found = true;
2021-06-30 15:24:51 +00:00
}
}
2021-07-02 09:29:45 +00:00
return found;
2021-06-30 15:24:51 +00:00
}
/// Loads into memory the entries of `<replica_path>/queue` that are not in the in-memory queue yet.
/// New entries are inserted in sorted order of their znode names. `mutation_pointer` is re-read as well.
/// Afterwards the changed minimal insert time (if any) is written to ZooKeeper
/// and the merge strategy picker is refreshed.
/// Returns true if at least one entry was inserted.
bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
{
    String queue_path = fs::path(replica_path) / "queue";
    LOG_DEBUG(log, "Loading queue from {}", queue_path);

    bool updated = false;
    std::optional<time_t> min_unprocessed_insert_time_changed;

    {
        std::lock_guard pull_logs_lock(pull_logs_to_queue_mutex);

        /// Reset batch size on initialization to recover from possible errors of too large batch size.
        current_multi_batch_size = 1;

        /// Names of the znodes that are already present in the in-memory queue.
        std::unordered_set<String> already_loaded_paths;
        {
            std::lock_guard lock(state_mutex);
            for (const LogEntryPtr & log_entry : queue)
                already_loaded_paths.insert(log_entry->znode_name);
        }

        Strings children = zookeeper->getChildren(queue_path);

        /// After remove_if, [begin, to_remove_it) are the entries to load and [to_remove_it, end) are already loaded.
        auto to_remove_it = std::remove_if(
            children.begin(), children.end(), [&](const String & path)
            {
                return already_loaded_paths.count(path);
            });

        LOG_DEBUG(log, "Having {} queue entries to load, {} entries already loaded.", (to_remove_it - children.begin()), (children.end() - to_remove_it));
        children.erase(to_remove_it, children.end());

        std::sort(children.begin(), children.end());

        /// Issue all reads first, then wait for the results.
        zkutil::AsyncResponses<Coordination::GetResponse> futures;
        futures.reserve(children.size());

        for (const String & child : children)
            futures.emplace_back(child, zookeeper->asyncGet(fs::path(queue_path) / child));

        for (auto & future : futures)
        {
            Coordination::GetResponse res = future.second.get();
            LogEntryPtr entry = LogEntry::parse(res.data, res.stat);
            entry->znode_name = future.first;

            std::lock_guard lock(state_mutex);

            insertUnlocked(entry, min_unprocessed_insert_time_changed, lock);

            updated = true;
        }

        zookeeper->tryGet(fs::path(replica_path) / "mutation_pointer", mutation_pointer);
    }

    updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {});

    merge_strategy_picker.refreshState();

    LOG_TRACE(log, "Loaded queue");
    return updated;
}
void ReplicatedMergeTreeQueue::insertUnlocked(
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
2020-02-01 20:33:36 +00:00
std::lock_guard<std::mutex> & state_lock)
2016-01-10 04:44:12 +00:00
{
2021-05-13 11:29:59 +00:00
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
{
2021-06-30 12:29:09 +00:00
virtual_parts.add(virtual_part_name, nullptr);
2021-06-23 19:24:43 +00:00
/// Don't add drop range parts to mutations
/// they don't produce any useful parts
if (entry->type != LogEntry::DROP_RANGE)
addPartToMutations(virtual_part_name);
}
2018-03-28 12:48:40 +00:00
/// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted
if (entry->type != LogEntry::DROP_RANGE)
2021-06-23 19:24:43 +00:00
{
2018-03-28 12:48:40 +00:00
queue.push_back(entry);
2021-06-23 19:24:43 +00:00
}
2018-03-28 12:48:40 +00:00
else
2021-06-23 19:24:43 +00:00
{
2021-07-06 10:58:53 +00:00
drop_ranges.addDropRange(entry);
2021-07-06 16:51:23 +00:00
/// DROP PART remove parts, so we remove it from virtual parts to
/// preserve invariant virtual_parts = current_parts + queue
if (entry->isDropPart(format_version))
virtual_parts.removePartAndCoveredParts(*entry->getDropRange(format_version));
2021-07-05 19:58:36 +00:00
2018-03-28 12:48:40 +00:00
queue.push_front(entry);
2021-06-23 19:24:43 +00:00
}
if (entry->type == LogEntry::GET_PART || entry->type == LogEntry::ATTACH_PART)
{
inserts_by_time.insert(entry);
if (entry->create_time && (!min_unprocessed_insert_time || entry->create_time < min_unprocessed_insert_time))
{
min_unprocessed_insert_time = entry->create_time;
min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
}
}
2020-01-28 17:15:22 +00:00
if (entry->type == LogEntry::ALTER_METADATA)
2020-02-14 20:13:57 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_TRACE(log, "Adding alter metadata version {} to the queue", entry->alter_version);
alter_sequence.addMetadataAlter(entry->alter_version, state_lock);
2020-02-14 20:13:57 +00:00
}
2016-01-10 04:44:12 +00:00
}
/// Inserts `entry` into the in-memory queue under `state_mutex`,
/// then (with the mutex released) writes the changed minimal insert time to ZooKeeper, if it has changed.
void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry)
{
    std::optional<time_t> min_unprocessed_insert_time_changed;

    {
        std::lock_guard lock(state_mutex);
        insertUnlocked(entry, min_unprocessed_insert_time_changed, lock);
    }

    updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {});
}
void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
const LogEntryPtr & entry,
bool is_successful,
std::optional<time_t> & min_unprocessed_insert_time_changed,
std::optional<time_t> & max_processed_insert_time_changed,
2020-02-01 20:33:36 +00:00
std::unique_lock<std::mutex> & state_lock)
{
/// Update insert times.
if (entry->type == LogEntry::GET_PART || entry->type == LogEntry::ATTACH_PART)
{
inserts_by_time.erase(entry);
if (inserts_by_time.empty())
{
min_unprocessed_insert_time = 0;
min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
}
else if ((*inserts_by_time.begin())->create_time > min_unprocessed_insert_time)
{
min_unprocessed_insert_time = (*inserts_by_time.begin())->create_time;
min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
}
if (entry->create_time > max_processed_insert_time)
{
max_processed_insert_time = entry->create_time;
max_processed_insert_time_changed = max_processed_insert_time;
}
}
if (is_successful)
{
2020-02-19 12:50:11 +00:00
if (!entry->actual_new_part_name.empty())
{
/// We don't add bigger fetched part to current_parts because we
/// have an invariant `virtual_parts` = `current_parts` + `queue`.
///
/// But we remove covered parts from mutations, because we actually
/// have replacing part.
2020-06-16 10:34:59 +00:00
///
/// NOTE actual_new_part_name is very confusing and error-prone. This approach must be fixed.
2020-06-16 10:50:47 +00:00
removeCoveredPartsFromMutations(entry->actual_new_part_name, /*remove_part = */ false, /*remove_covered_parts = */ true);
2020-02-19 12:50:11 +00:00
}
2021-05-13 11:29:59 +00:00
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
{
2021-06-30 12:29:09 +00:00
current_parts.add(virtual_part_name, nullptr);
/// These parts are already covered by newer part, we don't have to
/// mutate it.
2020-06-16 10:50:47 +00:00
removeCoveredPartsFromMutations(virtual_part_name, /*remove_part = */ false, /*remove_covered_parts = */ true);
}
2021-06-01 13:25:23 +00:00
if (auto drop_range_part_name = entry->getDropRange(format_version))
{
2021-07-06 16:51:23 +00:00
MergeTreePartInfo drop_range_info = MergeTreePartInfo::fromPartName(*drop_range_part_name, format_version);
/// DROP PART doesn't have virtual parts so remove from current
/// parts all covered parts.
if (entry->isDropPart(format_version))
current_parts.removePartAndCoveredParts(*drop_range_part_name);
else
current_parts.remove(*drop_range_part_name);
2021-06-01 13:25:23 +00:00
virtual_parts.remove(*drop_range_part_name);
}
2020-01-28 17:15:22 +00:00
2021-07-05 19:58:36 +00:00
if (entry->type == LogEntry::DROP_RANGE)
{
2021-07-06 10:58:53 +00:00
drop_ranges.removeDropRange(entry);
2021-07-05 19:58:36 +00:00
}
2020-01-28 17:15:22 +00:00
if (entry->type == LogEntry::ALTER_METADATA)
2020-02-14 20:13:57 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_TRACE(log, "Finishing metadata alter with version {}", entry->alter_version);
2020-02-17 18:07:22 +00:00
alter_sequence.finishMetadataAlter(entry->alter_version, state_lock);
2020-02-14 20:13:57 +00:00
}
}
else
{
2021-07-05 19:58:36 +00:00
if (entry->type == LogEntry::DROP_RANGE)
{
2021-07-06 10:58:53 +00:00
drop_ranges.removeDropRange(entry);
2021-07-05 19:58:36 +00:00
}
2021-05-13 11:29:59 +00:00
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
{
/// Because execution of the entry is unsuccessful,
/// `virtual_part_name` will never appear so we won't need to mutate
/// it.
2020-06-16 10:50:47 +00:00
removeCoveredPartsFromMutations(virtual_part_name, /*remove_part = */ true, /*remove_covered_parts = */ false);
}
}
}
2020-06-16 10:50:47 +00:00
void ReplicatedMergeTreeQueue::removeCoveredPartsFromMutations(const String & part_name, bool remove_part, bool remove_covered_parts)
{
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
auto in_partition = mutations_by_partition.find(part_info.partition_id);
if (in_partition == mutations_by_partition.end())
return;
bool some_mutations_are_probably_done = false;
2021-05-26 12:10:38 +00:00
for (auto & it : in_partition->second)
{
2021-05-26 12:10:38 +00:00
MutationStatus & status = *it.second;
2020-02-11 14:54:46 +00:00
2020-06-16 10:50:47 +00:00
if (remove_part && remove_covered_parts)
2020-06-16 10:34:59 +00:00
status.parts_to_do.removePartAndCoveredParts(part_name);
2020-06-16 10:50:47 +00:00
else if (remove_covered_parts)
2020-06-16 10:34:59 +00:00
status.parts_to_do.removePartsCoveredBy(part_name);
2020-06-16 10:50:47 +00:00
else if (remove_part)
status.parts_to_do.remove(part_name);
else
throw Exception("Called remove part from mutations, but nothing removed", ErrorCodes::LOGICAL_ERROR);
2020-06-16 10:34:59 +00:00
2020-03-09 02:55:28 +00:00
if (status.parts_to_do.size() == 0)
some_mutations_are_probably_done = true;
2020-02-05 16:30:02 +00:00
if (!status.latest_failed_part.empty() && part_info.contains(status.latest_failed_part_info))
{
status.latest_failed_part.clear();
status.latest_failed_part_info = MergeTreePartInfo();
status.latest_fail_time = 0;
status.latest_fail_reason.clear();
}
}
if (some_mutations_are_probably_done)
storage.mutations_finalizing_task->schedule();
}
2020-02-05 16:30:02 +00:00
/// Adds `part_name` to `parts_to_do` of every mutation of its partition
/// whose key in `mutations_by_partition` is greater than the data version of the part.
/// Fake drop-range parts are skipped.
void ReplicatedMergeTreeQueue::addPartToMutations(const String & part_name)
{
    auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);

    /// Do not add special virtual parts to parts_to_do
    if (part_info.isFakeDropRangePart())
        return;

    auto in_partition = mutations_by_partition.find(part_info.partition_id);
    if (in_partition == mutations_by_partition.end())
        return;

    /// upper_bound: the first mutation with a key strictly greater than the part's data version.
    auto from_it = in_partition->second.upper_bound(part_info.getDataVersion());
    for (auto it = from_it; it != in_partition->second.end(); ++it)
    {
        MutationStatus & status = *it->second;
        status.parts_to_do.add(part_name);
    }
}
/// Writes the insert times that have changed to `<replica_path>/min_unprocessed_insert_time`
/// and `<replica_path>/max_processed_insert_time` in one multi-request.
/// Does nothing if neither optional holds a value. A failure is only logged, never thrown from here.
void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
    zkutil::ZooKeeperPtr zookeeper,
    std::optional<time_t> min_unprocessed_insert_time_changed,
    std::optional<time_t> max_processed_insert_time_changed) const
{
    /// Here there can be a race condition (with different remove at the same time)
    ///  because we update times in ZooKeeper with unlocked mutex, while these times may change.
    /// Consider it unimportant (for a short time, ZK will have a slightly different time value).

    Coordination::Requests ops;

    /// Paths are built with fs::path, the same way as in the rest of this file.
    if (min_unprocessed_insert_time_changed)
        ops.emplace_back(zkutil::makeSetRequest(
            fs::path(replica_path) / "min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1));

    if (max_processed_insert_time_changed)
        ops.emplace_back(zkutil::makeSetRequest(
            fs::path(replica_path) / "max_processed_insert_time", toString(*max_processed_insert_time_changed), -1));

    if (!ops.empty())
    {
        Coordination::Responses responses;
        auto code = zookeeper->tryMulti(ops, responses);

        if (code != Coordination::Error::ZOK)
            LOG_ERROR(log, "Couldn't set value of nodes for insert times "
                           "({}/min_unprocessed_insert_time, max_processed_insert_time): {}. "
                           "This shouldn't happen often.", replica_path, Coordination::errorMessage(code));
    }
}
2018-06-06 19:15:10 +00:00
/// Removes a successfully processed `entry`: first from the in-memory queue, then from ZooKeeper.
/// If the entry is marked `removed_by_other_entry`, nothing is removed here and only subscribers are notified.
/// Throws LOGICAL_ERROR if the entry should be in the in-memory queue but is not found there.
/// A failure to remove the znode is only logged.
void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry)
{
    std::optional<time_t> min_unprocessed_insert_time_changed;
    std::optional<time_t> max_processed_insert_time_changed;

    bool found = false;
    bool need_remove_from_zk = true;
    size_t queue_size = 0;

    /// First remove from memory then from ZooKeeper
    {
        std::unique_lock lock(state_mutex);
        if (entry->removed_by_other_entry)
        {
            need_remove_from_zk = false;
            queue_size = queue.size();
        }
        else
        {
            /// Remove the job from the queue in the RAM.
            /// You can not just refer to a pre-saved iterator, because someone else might be able to delete the task.
            /// Why do we view the queue from the end?
            ///  - because the task for execution first is moved to the end of the queue, so that in case of failure it remains at the end.
            for (Queue::iterator it = queue.end(); it != queue.begin();)
            {
                --it;

                if (*it == entry)
                {
                    found = true;
                    updateStateOnQueueEntryRemoval(
                            entry, /* is_successful = */ true,
                            min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);

                    queue.erase(it);
                    queue_size = queue.size();
                    break;
                }
            }
        }
    }

    if (!found && need_remove_from_zk)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find {} in the memory queue. It is a bug. Entry: {}",
                                                   entry->znode_name, entry->toString());

    notifySubscribers(queue_size);

    if (!need_remove_from_zk)
        return;

    auto code = zookeeper->tryRemove(fs::path(replica_path) / "queue" / entry->znode_name);
    if (code != Coordination::Error::ZOK)
        LOG_ERROR(log, "Couldn't remove {}/queue/{}: {}. This shouldn't happen often.", replica_path, entry->znode_name, Coordination::errorMessage(code));

    updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
}
2021-06-01 13:25:23 +00:00
bool ReplicatedMergeTreeQueue::removeFailedQuorumPart(const MergeTreePartInfo & part_info)
{
2021-06-01 13:25:23 +00:00
assert(part_info.level == 0);
2019-08-20 18:00:48 +00:00
std::lock_guard lock(state_mutex);
return virtual_parts.remove(part_info);
}
/// Copies new entries from the shared log (`<zookeeper_path>/log`) into the queue of this replica:
/// first into `<replica_path>/queue` in ZooKeeper (together with moving `log_pointer`), then into memory.
/// Entries are copied in batches; the batch size doubles after every batch up to MAX_MULTI_OPS.
/// `watch_callback` is set as a watch on the children of the log node.
/// Returns the version of the `/log` node as read at the beginning of the call.
/// Throws ABORTED if log pulling is cancelled, UNEXPECTED_NODE_IN_ZOOKEEPER if the last node of a batch
/// does not start with "log-". Terminates the process if the in-memory update fails after ZooKeeper was updated.
int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback)
{
    std::lock_guard lock(pull_logs_to_queue_mutex);
    if (pull_log_blocker.isCancelled())
        throw Exception("Log pulling is cancelled", ErrorCodes::ABORTED);

    String index_str = zookeeper->get(fs::path(replica_path) / "log_pointer");
    UInt64 index;

    /// The version of "/log" is modified when new entries to merge/mutate/drop appear.
    Coordination::Stat stat;
    zookeeper->get(fs::path(zookeeper_path) / "log", &stat);

    Strings log_entries = zookeeper->getChildrenWatch(fs::path(zookeeper_path) / "log", nullptr, watch_callback);

    /// We update mutations after we have loaded the list of log entries, but before we insert them
    /// in the queue.
    /// With this we ensure that if you read the log state L1 and then the state of mutations M1,
    /// then L1 "happened-before" M1.
    updateMutations(zookeeper);

    if (index_str.empty())
    {
        /// If we do not already have a pointer to the log, put a pointer to the first entry in it.
        index = log_entries.empty() ? 0 : parse<UInt64>(std::min_element(log_entries.begin(), log_entries.end())->substr(strlen("log-")));

        zookeeper->set(fs::path(replica_path) / "log_pointer", toString(index));
    }
    else
    {
        index = parse<UInt64>(index_str);
    }

    String min_log_entry = "log-" + padIndex(index);

    /// Multiple log entries that must be copied to the queue.

    log_entries.erase(
        std::remove_if(log_entries.begin(), log_entries.end(), [&min_log_entry](const String & entry) { return entry < min_log_entry; }),
        log_entries.end());

    if (!log_entries.empty())
    {
        std::sort(log_entries.begin(), log_entries.end());

        for (size_t entry_idx = 0, num_entries = log_entries.size(); entry_idx < num_entries;)
        {
            auto begin = log_entries.begin() + entry_idx;
            auto end = entry_idx + current_multi_batch_size >= log_entries.size()
                ? log_entries.end()
                : (begin + current_multi_batch_size);
            auto last = end - 1;

            /// Increment entry_idx before batch size increase (we copied at most current_multi_batch_size entries)
            entry_idx += current_multi_batch_size;

            /// Increase the batch size exponentially, so it will saturate to MAX_MULTI_OPS.
            if (current_multi_batch_size < MAX_MULTI_OPS)
                current_multi_batch_size = std::min<size_t>(MAX_MULTI_OPS, current_multi_batch_size * 2);

            String last_entry = *last;
            if (!startsWith(last_entry, "log-"))
                throw Exception("Error in zookeeper data: unexpected node " + last_entry + " in " + zookeeper_path + "/log",
                    ErrorCodes::UNEXPECTED_NODE_IN_ZOOKEEPER);

            UInt64 last_entry_index = parse<UInt64>(last_entry.substr(strlen("log-")));

            LOG_DEBUG(log, "Pulling {} entries to queue: {} - {}", (end - begin), *begin, *last);

            /// Issue all reads of the batch first, then wait for the results.
            zkutil::AsyncResponses<Coordination::GetResponse> futures;
            futures.reserve(end - begin);

            for (auto it = begin; it != end; ++it)
                futures.emplace_back(*it, zookeeper->asyncGet(fs::path(zookeeper_path) / "log" / *it));

            /// Simultaneously add all new entries to the queue and move the pointer to the log.

            Coordination::Requests ops;
            std::vector<LogEntryPtr> copied_entries;
            copied_entries.reserve(end - begin);

            std::optional<time_t> min_unprocessed_insert_time_changed;

            for (auto & future : futures)
            {
                Coordination::GetResponse res = future.second.get();

                copied_entries.emplace_back(LogEntry::parse(res.data, res.stat));

                ops.emplace_back(zkutil::makeCreateRequest(
                    fs::path(replica_path) / "queue/queue-", res.data, zkutil::CreateMode::PersistentSequential));

                const auto & entry = *copied_entries.back();
                if (entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
                {
                    std::lock_guard state_lock(state_mutex);
                    if (entry.create_time && (!min_unprocessed_insert_time || entry.create_time < min_unprocessed_insert_time))
                    {
                        min_unprocessed_insert_time = entry.create_time;
                        min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
                    }
                }
            }

            ops.emplace_back(zkutil::makeSetRequest(
                fs::path(replica_path) / "log_pointer", toString(last_entry_index + 1), -1));

            if (min_unprocessed_insert_time_changed)
                ops.emplace_back(zkutil::makeSetRequest(
                    fs::path(replica_path) / "min_unprocessed_insert_time", toString(*min_unprocessed_insert_time_changed), -1));

            auto responses = zookeeper->multi(ops);

            /// Now we have successfully updated the queue in ZooKeeper. Update it in RAM.

            try
            {
                std::lock_guard state_lock(state_mutex);

                for (size_t copied_entry_idx = 0, num_copied_entries = copied_entries.size(); copied_entry_idx < num_copied_entries; ++copied_entry_idx)
                {
                    /// The first responses correspond to the create requests, in the same order as copied_entries.
                    String path_created = dynamic_cast<const Coordination::CreateResponse &>(*responses[copied_entry_idx]).path_created;
                    copied_entries[copied_entry_idx]->znode_name = path_created.substr(path_created.find_last_of('/') + 1);

                    /// The minimal insert time was already accounted for above, so the value reported here is discarded.
                    /// Default-constructed (empty) rather than initialized from `false`, which made an engaged optional with value 0.
                    std::optional<time_t> unused;
                    insertUnlocked(copied_entries[copied_entry_idx], unused, state_lock);
                }

                last_queue_update = time(nullptr);
            }
            catch (...)
            {
                tryLogCurrentException(log);
                /// If it fails, the data in RAM is incorrect. In order to avoid possible further corruption of data in ZK, we will kill ourselves.
                /// This is possible only if there is an unknown logical error.
                std::terminate();
            }

            if (!copied_entries.empty())
            {
                LOG_DEBUG(log, "Pulled {} entries to queue.", copied_entries.size());
                merge_strategy_picker.refreshState();
            }
        }

        storage.background_executor.triggerTask();
    }

    return stat.version;
}
2020-02-17 12:47:34 +00:00
namespace
{
2020-01-15 13:00:08 +00:00
Names getPartNamesToMutate(
const ReplicatedMergeTreeMutationEntry & mutation, const ActiveDataPartSet & parts)
{
Names result;
for (const auto & pair : mutation.block_numbers)
{
const String & partition_id = pair.first;
Int64 block_num = pair.second;
/// Note that we cannot simply count all parts to mutate using getPartsCoveredBy(appropriate part_info)
/// because they are not consecutive in `parts`.
MergeTreePartInfo covering_part_info(
partition_id, 0, block_num, MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER);
for (const String & covered_part_name : parts.getPartsCoveredBy(covering_part_info))
{
auto part_info = MergeTreePartInfo::fromPartName(covered_part_name, parts.getFormatVersion());
if (part_info.getDataVersion() < block_num)
result.push_back(covered_part_name);
}
}
return result;
}
2020-01-15 13:00:08 +00:00
}
/// Synchronizes the local mutation state with `<zookeeper_path>/mutations`:
///  - mutations that are no longer in ZooKeeper are removed from the local state
///    (an unfinished one is treated as killed, and the background executor is triggered);
///  - mutations that are new in ZooKeeper are loaded, and their `parts_to_do` is initialized
///    from `current_parts` and from the parts that the queue will produce.
/// `watch_callback` is set as a watch on the children of the mutations node.
void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback)
{
    std::lock_guard lock(update_mutations_mutex);

    Strings entries_in_zk = zookeeper->getChildrenWatch(fs::path(zookeeper_path) / "mutations", nullptr, watch_callback);
    StringSet entries_in_zk_set(entries_in_zk.begin(), entries_in_zk.end());

    /// Compare with the local state, delete obsolete entries and determine which new entries to load.
    Strings entries_to_load;
    bool some_active_mutations_were_killed = false;
    {
        std::lock_guard state_lock(state_mutex);

        for (auto it = mutations_by_znode.begin(); it != mutations_by_znode.end();)
        {
            const ReplicatedMergeTreeMutationEntry & entry = *it->second.entry;
            if (!entries_in_zk_set.count(entry.znode_name))
            {
                if (!it->second.is_done)
                {
                    LOG_DEBUG(log, "Removing killed mutation {} from local state.", entry.znode_name);
                    some_active_mutations_were_killed = true;
                    if (entry.isAlterMutation())
                    {
                        LOG_DEBUG(log, "Removed alter {} because mutation {} were killed.", entry.alter_version, entry.znode_name);
                        alter_sequence.finishDataAlter(entry.alter_version, state_lock);
                    }
                }
                else
                    LOG_DEBUG(log, "Removing obsolete mutation {} from local state.", entry.znode_name);

                /// Drop the per-partition references first: they point into mutations_by_znode.
                for (const auto & partition_and_block_num : entry.block_numbers)
                {
                    auto & in_partition = mutations_by_partition[partition_and_block_num.first];
                    in_partition.erase(partition_and_block_num.second);
                    if (in_partition.empty())
                        mutations_by_partition.erase(partition_and_block_num.first);
                }

                it = mutations_by_znode.erase(it);
            }
            else
                ++it;
        }

        for (const String & znode : entries_in_zk_set)
        {
            if (!mutations_by_znode.count(znode))
                entries_to_load.push_back(znode);
        }
    }

    if (some_active_mutations_were_killed)
        storage.background_executor.triggerTask();

    if (!entries_to_load.empty())
    {
        LOG_INFO(log, "Loading {} mutation entries: {} - {}", toString(entries_to_load.size()), entries_to_load.front(), entries_to_load.back());

        /// Issue all reads first, then wait for the results. The final sizes are known, so reserve up front.
        std::vector<std::future<Coordination::GetResponse>> futures;
        futures.reserve(entries_to_load.size());
        for (const String & entry : entries_to_load)
            futures.emplace_back(zookeeper->asyncGet(fs::path(zookeeper_path) / "mutations" / entry));

        std::vector<ReplicatedMergeTreeMutationEntryPtr> new_mutations;
        new_mutations.reserve(entries_to_load.size());
        for (size_t i = 0; i < entries_to_load.size(); ++i)
        {
            new_mutations.push_back(std::make_shared<ReplicatedMergeTreeMutationEntry>(
                ReplicatedMergeTreeMutationEntry::parse(futures[i].get().data, entries_to_load[i])));
        }

        bool some_mutations_are_probably_done = false;
        {
            std::lock_guard state_lock(state_mutex);

            for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations)
            {
                auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version))
                    .first->second;

                for (const auto & pair : entry->block_numbers)
                {
                    const String & partition_id = pair.first;
                    Int64 block_num = pair.second;
                    mutations_by_partition[partition_id].emplace(block_num, &mutation);
                    LOG_TRACE(log, "Adding mutation {} for partition {} for all block numbers less than {}", entry->znode_name, partition_id, block_num);
                }

                /// Initialize `mutation.parts_to_do`. First we need to mutate all parts in `current_parts`.
                Strings current_parts_to_mutate = getPartNamesToMutate(*entry, current_parts);
                for (const String & current_part_to_mutate : current_parts_to_mutate)
                {
                    assert(MergeTreePartInfo::fromPartName(current_part_to_mutate, format_version).level < MergeTreePartInfo::MAX_LEVEL);
                    mutation.parts_to_do.add(current_part_to_mutate);
                }

                /// And next we would need to mutate all parts with getDataVersion() greater than
                /// mutation block number that would appear as a result of executing the queue.
                for (const auto & queue_entry : queue)
                {
                    for (const String & produced_part_name : queue_entry->getVirtualPartNames(format_version))
                    {
                        auto part_info = MergeTreePartInfo::fromPartName(produced_part_name, format_version);

                        /// Oddly enough, getVirtualPartNames() may return _virtual_ part name.
                        /// Such parts do not exist and will never appear, so we should not add virtual parts to parts_to_do list.
                        /// Fortunately, it's easy to distinguish virtual parts from normal parts by part level.
                        /// See StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(...)
                        if (part_info.isFakeDropRangePart())
                            continue;

                        auto it = entry->block_numbers.find(part_info.partition_id);
                        if (it != entry->block_numbers.end() && it->second > part_info.getDataVersion())
                            mutation.parts_to_do.add(produced_part_name);
                    }
                }

                if (mutation.parts_to_do.size() == 0)
                {
                    some_mutations_are_probably_done = true;
                }

                /// otherwise it's already done
                if (entry->isAlterMutation() && entry->znode_name > mutation_pointer)
                {
                    LOG_TRACE(log, "Adding mutation {} with alter version {} to the queue", entry->znode_name, entry->alter_version);
                    alter_sequence.addMutationForAlter(entry->alter_version, state_lock);
                }
            }
        }

        storage.merge_selecting_task->schedule();

        if (some_mutations_are_probably_done)
            storage.mutations_finalizing_task->schedule();
    }
}
/// Removes mutation `mutation_id` from ZooKeeper (best effort: only a successful removal is logged)
/// and from the local state.
/// Returns the removed entry, or nullptr if the mutation is not present in the local state.
/// If the mutation was not finished yet, the background executor is triggered.
ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
    zkutil::ZooKeeperPtr zookeeper, const String & mutation_id)
{
    std::lock_guard lock(update_mutations_mutex);

    auto rc = zookeeper->tryRemove(fs::path(zookeeper_path) / "mutations" / mutation_id);
    if (rc == Coordination::Error::ZOK)
        LOG_DEBUG(log, "Removed mutation {} from ZooKeeper.", mutation_id);

    ReplicatedMergeTreeMutationEntryPtr entry;
    bool mutation_was_active = false;
    {
        std::lock_guard state_lock(state_mutex);

        auto it = mutations_by_znode.find(mutation_id);
        if (it == mutations_by_znode.end())
            return nullptr;

        mutation_was_active = !it->second.is_done;

        entry = it->second.entry;

        /// Drop the per-partition references first: they point into mutations_by_znode.
        for (const auto & partition_and_block_num : entry->block_numbers)
        {
            auto & in_partition = mutations_by_partition[partition_and_block_num.first];
            in_partition.erase(partition_and_block_num.second);
            if (in_partition.empty())
                mutations_by_partition.erase(partition_and_block_num.first);
        }

        if (entry->isAlterMutation())
        {
            LOG_DEBUG(log, "Removed alter {} because mutation {} were killed.", entry->alter_version, entry->znode_name);
            alter_sequence.finishDataAlter(entry->alter_version, state_lock);
        }

        mutations_by_znode.erase(it);
        LOG_DEBUG(log, "Removed mutation {} from local state.", entry->znode_name);
    }

    if (mutation_was_active)
        storage.background_executor.triggerTask();

    return entry;
}
2016-01-10 04:44:12 +00:00
/// Finds the first MERGE_PARTS or MUTATE_PART entry that has `part_name` among its source parts.
/// Every entry located before it that produces one of its source parts
/// (MERGE_PARTS, GET_PART, ATTACH_PART or MUTATE_PART) is moved to the end of the queue.
/// Returns the set of source parts of the found entry, or an empty set if there is no such entry.
ReplicatedMergeTreeQueue::StringSet ReplicatedMergeTreeQueue::moveSiblingPartsForMergeToEndOfQueue(const String & part_name)
{
    std::lock_guard lock(state_mutex);

    /// Let's find the action to merge this part with others. Let's remember others.
    StringSet parts_for_merge;
    Queue::iterator merge_entry = queue.end();
    for (Queue::iterator it = queue.begin(); it != queue.end(); ++it)
    {
        if ((*it)->type == LogEntry::MERGE_PARTS || (*it)->type == LogEntry::MUTATE_PART)
        {
            if (std::find((*it)->source_parts.begin(), (*it)->source_parts.end(), part_name)
                != (*it)->source_parts.end())
            {
                parts_for_merge = StringSet((*it)->source_parts.begin(), (*it)->source_parts.end());
                merge_entry = it;
                break;
            }
        }
    }

    if (!parts_for_merge.empty())
    {
        /// Move to the end of queue actions that result in one of the parts in `parts_for_merge`.
        for (Queue::iterator it = queue.begin(); it != queue.end();)
        {
            /// Advance before a possible splice, so that `it` keeps walking the original order.
            auto it0 = it;
            ++it;

            if (it0 == merge_entry)
                break;

            const auto t = (*it0)->type;

            if ((t == LogEntry::MERGE_PARTS ||
                 t == LogEntry::GET_PART ||
                 t == LogEntry::ATTACH_PART ||
                 t == LogEntry::MUTATE_PART)
                && parts_for_merge.count((*it0)->new_part_name))
            {
                queue.splice(queue.end(), queue, it0, it);
            }
        }
    }

    return parts_for_merge;
}
bool ReplicatedMergeTreeQueue::checkReplaceRangeCanBeRemoved(const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const
{
if (entry_ptr->type != LogEntry::REPLACE_RANGE)
return false;
assert(entry_ptr->replace_range_entry);
if (current.type != LogEntry::REPLACE_RANGE && current.type != LogEntry::DROP_RANGE)
return false;
if (entry_ptr->replace_range_entry == current.replace_range_entry) /// same partition, don't want to drop ourselves
return false;
2021-06-24 08:16:54 +00:00
if (!part_info.contains(MergeTreePartInfo::fromPartName(entry_ptr->replace_range_entry->drop_range_part_name, format_version)))
return false;
size_t number_of_covered_parts = 0;
for (const String & new_part_name : entry_ptr->replace_range_entry->new_part_names)
{
if (part_info.contains(MergeTreePartInfo::fromPartName(new_part_name, format_version)))
++number_of_covered_parts;
}
2016-01-10 04:44:12 +00:00
/// It must either cover all new parts from REPLACE_RANGE or no one. Otherwise it's a bug in replication,
/// which may lead to intersecting entries.
assert(number_of_covered_parts == 0 || number_of_covered_parts == entry_ptr->replace_range_entry->new_part_names.size());
return number_of_covered_parts == entry_ptr->replace_range_entry->new_part_names.size();
}
2016-01-10 04:44:12 +00:00
2019-08-16 15:57:19 +00:00
/// Removes from the queue (and from the replica's "queue" node in ZooKeeper) all entries producing parts
/// that are contained in `part_info`, as well as REPLACE_RANGE entries accepted by checkReplaceRangeCanBeRemoved().
/// Then waits until removed entries that were executing at that moment finish their execution.
/// `current` is the entry on whose behalf the removal is performed.
void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
    zkutil::ZooKeeperPtr zookeeper,
    const MergeTreePartInfo & part_info,
    const ReplicatedMergeTreeLogEntryData & current)
{
    /// TODO is it possible to simplify it?
    Queue to_wait;
    size_t removed_entries = 0;
    std::optional<time_t> min_unprocessed_insert_time_changed;
    std::optional<time_t> max_processed_insert_time_changed;

    /// Remove operations with parts, contained in the range to be deleted, from the queue.
    std::unique_lock lock(state_mutex);

    [[maybe_unused]] bool called_from_alter_query_directly = current.replace_range_entry && current.replace_range_entry->columns_version < 0;
    assert(currently_executing_drop_or_replace_range || called_from_alter_query_directly);

    Queue::iterator it = queue.begin();
    while (it != queue.end())
    {
        /// Own copy of the pointer: the list node is erased below.
        const LogEntryPtr candidate = *it;
        const auto type = candidate->type;

        const bool is_simple_producing_op = type == LogEntry::GET_PART
            || type == LogEntry::ATTACH_PART
            || type == LogEntry::MERGE_PARTS
            || type == LogEntry::MUTATE_PART;

        const bool simple_op_covered = is_simple_producing_op
            && part_info.contains(MergeTreePartInfo::fromPartName(candidate->new_part_name, format_version));

        if (!simple_op_covered && !checkReplaceRangeCanBeRemoved(part_info, candidate, current))
        {
            ++it;
            continue;
        }

        if (candidate->currently_executing)
            to_wait.push_back(candidate);

        /// Failure to remove the node is only logged; the entry is removed from the local queue anyway.
        auto code = zookeeper->tryRemove(fs::path(replica_path) / "queue" / candidate->znode_name);
        if (code != Coordination::Error::ZOK)
            LOG_INFO(log, "Couldn't remove {}: {}", (fs::path(replica_path) / "queue" / candidate->znode_name).string(), Coordination::errorMessage(code));

        updateStateOnQueueEntryRemoval(
            candidate, /* is_successful = */ false,
            min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);

        candidate->removed_by_other_entry = true;
        it = queue.erase(it);
        ++removed_entries;
    }

    updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);

    LOG_DEBUG(log, "Removed {} entries from queue. Waiting for {} entries that are currently executing.", removed_entries, to_wait.size());

    /// Let's wait for the operations with the parts contained in the range to be deleted.
    for (LogEntryPtr & entry : to_wait)
        entry->execution_complete.wait(lock, [&entry] { return !entry->currently_executing; });
}
2020-10-30 12:41:39 +00:00
bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & log_entry_name, const String & new_part_name,
String & out_reason, std::lock_guard<std::mutex> & /* queue_lock */) const
{
/// Let's check if the same part is now being created by another action.
if (future_parts.count(new_part_name))
{
2020-10-30 12:41:39 +00:00
const char * format_str = "Not executing log entry {} for part {} "
"because another log entry for the same part is being processed. This shouldn't happen often.";
LOG_INFO(log, format_str, log_entry_name, new_part_name);
out_reason = fmt::format(format_str, log_entry_name, new_part_name);
return false;
/** When the corresponding action is completed, then `isNotCoveredByFuturePart` next time, will succeed,
* and queue element will be processed.
* Immediately in the `executeLogEntry` function it will be found that we already have a part,
* and queue element will be immediately treated as processed.
*/
}
/// A more complex check is whether another part is currently created by other action that will cover this part.
/// NOTE The above is redundant, but left for a more convenient message in the log.
auto result_part = MergeTreePartInfo::fromPartName(new_part_name, format_version);
/// It can slow down when the size of `future_parts` is large. But it can not be large, since background pool is limited.
for (const auto & future_part_elem : future_parts)
{
auto future_part = MergeTreePartInfo::fromPartName(future_part_elem.first, format_version);
if (future_part.contains(result_part))
{
2020-10-30 12:41:39 +00:00
const char * format_str = "Not executing log entry {} for part {} "
"because it is covered by part {} that is currently executing.";
LOG_TRACE(log, format_str, log_entry_name, new_part_name, future_part_elem.first);
out_reason = fmt::format(format_str, log_entry_name, new_part_name, future_part_elem.first);
return false;
}
}
return true;
}
bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason)
{
std::lock_guard lock(state_mutex);
2021-07-06 10:58:53 +00:00
/// FIXME get rid of actual_part_name.
/// If new covering part jumps over DROP_RANGE we should execute drop range first
2021-07-06 08:36:39 +00:00
if (drop_ranges.isAffectedByDropRange(part_name, reject_reason))
return false;
2020-10-30 12:41:39 +00:00
if (isNotCoveredByFuturePartsImpl(entry.znode_name, part_name, reject_reason, lock))
{
CurrentlyExecuting::setActualPartName(entry, part_name, *this);
return true;
}
return false;
}
bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
const LogEntry & entry,
String & out_postpone_reason,
MergeTreeDataMergerMutator & merger_mutator,
2018-03-03 16:26:06 +00:00
MergeTreeData & data,
2020-02-01 20:33:36 +00:00
std::lock_guard<std::mutex> & state_lock) const
2016-01-10 04:44:12 +00:00
{
2020-08-08 00:47:03 +00:00
/// If our entry produce part which is already covered by
2020-02-17 16:33:05 +00:00
/// some other entry which is currently executing, then we can postpone this entry.
for (const String & new_part_name : entry.getVirtualPartNames(format_version))
{
if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock))
return false;
}
2021-07-05 19:58:36 +00:00
if (entry.type != LogEntry::DROP_RANGE && drop_ranges.isAffectedByDropRange(entry, out_postpone_reason))
return false;
2020-11-09 09:14:20 +00:00
/// Check that fetches pool is not overloaded
if ((entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
&& !storage.canExecuteFetch(entry, out_postpone_reason))
2020-10-26 11:02:47 +00:00
{
/// Don't print log message about this, because we can have a lot of fetches,
/// for example during replica recovery.
return false;
2020-10-26 11:02:47 +00:00
}
if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::MUTATE_PART)
{
/** If any of the required parts are now fetched or in merge process, wait for the end of this operation.
* Otherwise, even if all the necessary parts for the merge are not present, you should try to make a merge.
* If any parts are missing, instead of merge, there will be an attempt to download a part.
* Such a situation is possible if the receive of a part has failed, and it was moved to the end of the queue.
*/
size_t sum_parts_size_in_bytes = 0;
for (const auto & name : entry.source_parts)
{
if (future_parts.count(name))
{
2020-10-30 12:41:39 +00:00
const char * format_str = "Not executing log entry {} of type {} for part {} "
"because part {} is not ready yet (log entry for that part is being processed).";
LOG_TRACE(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, name);
/// Copy-paste of above because we need structured logging (instead of already formatted message).
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, name);
return false;
}
auto part = data.getPartIfExists(name, {MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
if (part)
{
2020-06-05 20:47:46 +00:00
if (auto part_in_memory = asInMemoryPart(part))
sum_parts_size_in_bytes += part_in_memory->block.bytes();
else
sum_parts_size_in_bytes += part->getBytesOnDisk();
}
}
2019-08-01 15:36:12 +00:00
if (merger_mutator.merges_blocker.isCancelled())
{
2020-10-30 12:41:39 +00:00
const char * format_str = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";
LOG_DEBUG(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
return false;
}
2020-11-03 15:58:17 +00:00
if (merge_strategy_picker.shouldMergeOnSingleReplica(entry))
2020-09-18 10:57:33 +00:00
{
2020-11-03 15:58:17 +00:00
auto replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry);
if (replica_to_execute_merge && !merge_strategy_picker.isMergeFinishedByReplica(replica_to_execute_merge.value(), entry))
{
String reason = "Not executing merge for the part " + entry.new_part_name
+ ", waiting for " + replica_to_execute_merge.value() + " to execute merge.";
out_postpone_reason = reason;
return false;
}
2020-09-18 10:57:33 +00:00
}
UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge()
: merger_mutator.getMaxSourcePartSizeForMutation();
/** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed),
* then ignore value returned by getMaxSourcePartsSizeForMerge() and execute merge of any size,
* because it may be ordered by OPTIMIZE or early with different settings.
* Setting max_bytes_to_merge_at_max_space_in_pool still working for regular merges,
* because the leader replica does not assign merges of greater size (except OPTIMIZE PARTITION and OPTIMIZE FINAL).
*/
2019-08-26 14:24:29 +00:00
const auto data_settings = data.getSettings();
2020-09-04 10:08:09 +00:00
bool ignore_max_size = false;
if (entry.type == LogEntry::MERGE_PARTS)
{
ignore_max_size = max_source_parts_size == data_settings->max_bytes_to_merge_at_max_space_in_pool;
if (isTTLMergeType(entry.merge_type))
{
if (merger_mutator.ttl_merges_blocker.isCancelled())
{
2020-10-30 12:41:39 +00:00
const char * format_str = "Not executing log entry {} for part {} because merges with TTL are cancelled now.";
LOG_DEBUG(log, format_str,
entry.znode_name, entry.new_part_name);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.new_part_name);
2020-09-04 10:08:09 +00:00
return false;
}
size_t total_merges_with_ttl = data.getTotalMergesWithTTLInMergeList();
if (total_merges_with_ttl >= data_settings->max_number_of_merges_with_ttl_in_pool)
{
2020-10-30 12:41:39 +00:00
const char * format_str = "Not executing log entry {} for part {}"
2020-09-04 10:08:09 +00:00
" because {} merges with TTL already executing, maximum {}.";
2020-11-30 08:44:01 +00:00
LOG_DEBUG(log, format_str,
entry.znode_name, entry.new_part_name, total_merges_with_ttl,
2020-09-04 10:08:09 +00:00
data_settings->max_number_of_merges_with_ttl_in_pool);
2020-11-30 08:44:01 +00:00
out_postpone_reason = fmt::format(format_str,
entry.znode_name, entry.new_part_name, total_merges_with_ttl,
2020-09-04 10:08:09 +00:00
data_settings->max_number_of_merges_with_ttl_in_pool);
return false;
}
}
}
if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
{
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
size_t thread_pool_size = data.getContext()->getSettingsRef().background_pool_size;
size_t free_threads = thread_pool_size - busy_threads_in_pool;
size_t required_threads = data_settings->number_of_free_entries_in_pool_to_execute_mutation;
out_postpone_reason = fmt::format("Not executing log entry {} of type {} for part {}"
" because source parts size ({}) is greater than the current maximum ({})."
" {} free of {} threads, required {} free threads.",
entry.znode_name, entry.typeToString(), entry.new_part_name,
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size),
free_threads, thread_pool_size, required_threads);
LOG_DEBUG(log, out_postpone_reason);
2020-05-30 21:35:52 +00:00
return false;
}
}
2020-02-17 16:33:05 +00:00
/// Alters must be executed one by one. First metadata change, and after that data alter (MUTATE_PART entries with).
/// corresponding alter_version.
2020-01-28 17:15:22 +00:00
if (entry.type == LogEntry::ALTER_METADATA)
2020-02-13 11:38:04 +00:00
{
2020-02-17 18:07:22 +00:00
if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock))
2020-02-05 11:18:11 +00:00
{
2020-02-17 18:07:22 +00:00
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
2020-10-30 12:41:39 +00:00
const char * format_str = "Cannot execute alter metadata {} with version {} because another alter {} must be executed before";
LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version, head_alter);
2020-10-30 15:47:58 +00:00
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version, head_alter);
2020-02-05 11:18:11 +00:00
return false;
}
2020-01-31 12:25:31 +00:00
}
2020-02-17 16:33:05 +00:00
/// If this MUTATE_PART is part of alter modify/drop query, than we have to execute them one by one
if (entry.isAlterMutation())
2020-01-31 12:25:31 +00:00
{
2020-02-17 18:07:22 +00:00
if (!alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock))
2020-02-05 11:18:11 +00:00
{
2020-02-17 18:07:22 +00:00
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
2020-02-13 14:48:38 +00:00
if (head_alter == entry.alter_version)
2020-10-30 12:41:39 +00:00
{
const char * format_str = "Cannot execute alter data {} with version {} because metadata still not altered";
LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version);
}
2020-02-13 14:48:38 +00:00
else
2020-10-30 12:41:39 +00:00
{
const char * format_str = "Cannot execute alter data {} with version {} because another alter {} must be executed before";
LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version, head_alter);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version, head_alter);
}
2020-02-13 14:48:38 +00:00
2020-02-05 11:18:11 +00:00
return false;
}
2020-01-13 16:39:20 +00:00
}
if (entry.type == LogEntry::DROP_RANGE || entry.type == LogEntry::REPLACE_RANGE)
{
/// DROP_RANGE and REPLACE_RANGE entries remove other entries, which produce parts in the range.
/// If such part producing operations are currently executing, then DROP/REPLACE RANGE wait them to finish.
/// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait each other.
/// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting.
if (currently_executing_drop_or_replace_range)
2020-10-30 12:41:39 +00:00
{
const char * format_str = "Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry are currently executing.";
LOG_TRACE(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
return false;
2020-10-30 12:41:39 +00:00
}
}
return true;
2016-01-10 04:44:12 +00:00
}
/// Returns the greatest mutation version in partition `partition_id` that is not greater than `data_version`,
/// or 0 if the partition has no mutations or all of them have greater versions.
Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersionImpl(
    const String & partition_id, Int64 data_version, std::lock_guard<std::mutex> & /* state_lock */) const
{
    const auto partition_it = mutations_by_partition.find(partition_id);
    if (partition_it == mutations_by_partition.end())
        return 0;

    const auto & versions = partition_it->second;

    /// First mutation with version strictly greater than data_version; the one before it (if any) is the answer.
    auto first_newer = versions.upper_bound(data_version);
    if (first_newer == versions.begin())
        return 0;

    --first_newer;
    return first_newer->first;
}
/// Same as getCurrentMutationVersionImpl(), but acquires `state_mutex` by itself.
Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(const String & partition_id, Int64 data_version) const
{
    std::lock_guard state_lock(state_mutex);
    return getCurrentMutationVersionImpl(partition_id, data_version, state_lock);
}
2019-08-03 11:02:40 +00:00
/// Marks `entry_` as currently executing: updates its attempt statistics, tags all its virtual parts
/// in `queue_.future_parts` and, for DROP_RANGE / REPLACE_RANGE, raises the corresponding queue flag.
/// Throws LOGICAL_ERROR if one of the parts is already tagged.
/// No locking is done here. NOTE(review): presumably the caller holds queue.state_mutex — confirm.
ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_)
    : entry(entry_), queue(queue_)
{
    const bool is_drop_or_replace_range = entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE
        || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE;

    if (is_drop_or_replace_range)
    {
        /// Only one such entry may be executing at a time.
        assert(!queue.currently_executing_drop_or_replace_range);
        queue.currently_executing_drop_or_replace_range = true;
    }

    entry->currently_executing = true;
    ++entry->num_tries;
    entry->last_attempt_time = time(nullptr);

    for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
    {
        const bool newly_tagged = queue.future_parts.emplace(new_part_name, entry).second;
        if (newly_tagged)
            continue;

        throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged future part {}. This is a bug. "
                                                   "It happened on attempt to execute {}: {}",
                        new_part_name, entry->znode_name, entry->toString());
    }
}
/// Remembers `actual_part_name` as the part that will actually be produced by `entry`
/// and tags it in `queue.future_parts` (unless it is the same as entry.new_part_name).
/// Throws LOGICAL_ERROR if the entry already has an actual part name or if the part is already tagged.
/// On failure `entry.actual_new_part_name` is left empty, so that ~CurrentlyExecuting
/// does not untag a future part that belongs to another entry.
void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(ReplicatedMergeTreeQueue::LogEntry & entry,
    const String & actual_part_name, ReplicatedMergeTreeQueue & queue)
{
    if (!entry.actual_new_part_name.empty())
        throw Exception("Entry actual part isn't empty yet. This is a bug.", ErrorCodes::LOGICAL_ERROR);

    /// Check if it is the same (and already added) part.
    if (actual_part_name == entry.new_part_name)
    {
        entry.actual_new_part_name = actual_part_name;
        return;
    }

    if (!queue.future_parts.emplace(actual_part_name, entry.shared_from_this()).second)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Attaching already existing future part {}. This is a bug. "
                                                   "It happened on attempt to execute {}: {}",
                        actual_part_name, entry.znode_name, entry.toString());

    /// Record the name only after the tag was really added.
    entry.actual_new_part_name = actual_part_name;
}
/// Marks the entry as not executing anymore: clears the flags set by the constructor, wakes up
/// everyone waiting on `execution_complete` and untags the parts of the entry from `future_parts`.
ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
{
    std::lock_guard lock(queue.state_mutex);

    const bool is_drop_or_replace_range = entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE
        || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE;

    if (is_drop_or_replace_range)
    {
        assert(queue.currently_executing_drop_or_replace_range);
        queue.currently_executing_drop_or_replace_range = false;
    }

    entry->currently_executing = false;
    entry->execution_complete.notify_all();

    for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
    {
        if (queue.future_parts.erase(new_part_name) == 0)
        {
            LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", new_part_name);
            assert(false);
        }
    }

    if (entry->actual_new_part_name.empty())
        return;

    /// The actual part has its own tag only if its name differs from new_part_name.
    const bool has_separate_tag = entry->actual_new_part_name != entry->new_part_name;
    if (has_separate_tag && queue.future_parts.erase(entry->actual_new_part_name) == 0)
    {
        LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", entry->actual_new_part_name);
        assert(false);
    }

    entry->actual_new_part_name.clear();
}
2020-10-23 08:54:00 +00:00
/// Picks the first queue entry that is not executing and is allowed to run by shouldExecuteLogEntry().
/// The chosen entry is moved to the end of the queue and returned together with a CurrentlyExecuting guard.
/// Entries that were skipped get their postpone statistics updated. Returns an empty pointer if nothing was chosen.
ReplicatedMergeTreeQueue::SelectedEntryPtr ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data)
{
    LogEntryPtr entry;

    std::lock_guard lock(state_mutex);

    for (auto it = queue.begin(); it != queue.end(); ++it)
    {
        LogEntry & candidate = **it;

        if (candidate.currently_executing)
            continue;

        if (!shouldExecuteLogEntry(candidate, candidate.postpone_reason, merger_mutator, data, lock))
        {
            ++candidate.num_postponed;
            candidate.last_postpone_time = time(nullptr);
            continue;
        }

        entry = *it;
        /// The entry got its chance to be executed, so move it to the end of the queue.
        queue.splice(queue.end(), queue, it);
        break;
    }

    if (!entry)
        return {};

    return std::make_shared<SelectedEntry>(entry, std::unique_ptr<CurrentlyExecuting>{ new CurrentlyExecuting(entry, *this) });
}
/// Executes `func` for `entry`. If `func` returns true, the entry is removed from the queue
/// (using a ZooKeeper session obtained from `get_zookeeper`).
/// Returns true if no exception was thrown. Otherwise the exception is saved into the entry
/// (and, for MUTATE_PART, into the status of the affected mutations) and false is returned.
bool ReplicatedMergeTreeQueue::processEntry(
    std::function<zkutil::ZooKeeperPtr()> get_zookeeper,
    LogEntryPtr & entry,
    const std::function<bool(LogEntryPtr &)> func)
{
    std::exception_ptr saved_exception;

    try
    {
        /// We don't have any backoff for failed entries
        /// we just count amount of tries for each of them.
        if (func(entry))
            removeProcessedEntry(get_zookeeper(), entry);
    }
    catch (...)
    {
        saved_exception = std::current_exception();
    }

    if (!saved_exception)
        return true;

    std::lock_guard lock(state_mutex);

    entry->exception = saved_exception;

    if (entry->type != ReplicatedMergeTreeLogEntryData::MUTATE_PART)
        return false;

    /// Record the exception in the system.mutations table.
    const Int64 result_data_version = MergeTreePartInfo::fromPartName(entry->new_part_name, format_version)
        .getDataVersion();
    const String & source_part_name = entry->source_parts.at(0);
    const auto source_part_info = MergeTreePartInfo::fromPartName(source_part_name, format_version);

    const auto in_partition = mutations_by_partition.find(source_part_info.partition_id);
    if (in_partition == mutations_by_partition.end())
        return false;

    /// Mutations with versions in (source part version, result version] are the ones that failed.
    const auto failed_begin = in_partition->second.upper_bound(source_part_info.getDataVersion());
    const auto failed_end = in_partition->second.upper_bound(result_data_version);

    for (auto mutation_it = failed_begin; mutation_it != failed_end; ++mutation_it)
    {
        MutationStatus & status = *mutation_it->second;
        status.latest_failed_part = source_part_name;
        status.latest_failed_part_info = source_part_info;
        status.latest_fail_time = time(nullptr);
        status.latest_fail_reason = getExceptionMessage(saved_exception, false);
    }

    return false;
}
2020-09-03 13:00:13 +00:00
/// Counts MERGE_PARTS entries, MUTATE_PART entries and (among the merges) merges with TTL in the queue.
ReplicatedMergeTreeQueue::OperationsInQueue ReplicatedMergeTreeQueue::countMergesAndPartMutations() const
{
    std::lock_guard lock(state_mutex);

    size_t count_merges = 0;
    size_t count_mutations = 0;
    size_t count_merges_with_ttl = 0;

    for (const auto & entry : queue)
    {
        const auto type = entry->type;

        if (type == ReplicatedMergeTreeLogEntry::MUTATE_PART)
        {
            ++count_mutations;
            continue;
        }

        if (type != ReplicatedMergeTreeLogEntry::MERGE_PARTS)
            continue;

        ++count_merges;
        if (isTTLMergeType(entry->merge_type))
            ++count_merges_with_ttl;
    }

    return OperationsInQueue{count_merges, count_mutations, count_merges_with_ttl};
}
/// Returns the number of mutations known to the queue (finished or not).
size_t ReplicatedMergeTreeQueue::countMutations() const
{
    std::lock_guard state_lock(state_mutex);
    return mutations_by_znode.size();
}
/// Returns the number of leading finished mutations: iteration over `mutations_by_znode`
/// stops at the first mutation that is not done, so finished mutations after it are not counted.
size_t ReplicatedMergeTreeQueue::countFinishedMutations() const
{
    std::lock_guard lock(state_mutex);

    size_t count = 0;
    for (auto it = mutations_by_znode.begin(); it != mutations_by_znode.end() && it->second.is_done; ++it)
        ++count;

    return count;
}
/// Creates a merge predicate bound to this queue and to the given ZooKeeper session.
ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zkutil::ZooKeeperPtr & zookeeper)
{
    return ReplicatedMergeTreeMergePredicate(*this, zookeeper);
}
2020-03-24 17:05:38 +00:00
/// Returns commands of the first mutation in the part's partition that has a version greater than
/// the data version of `part` and an alter_version other than -1. Returns empty commands if there is none.
MutationCommands ReplicatedMergeTreeQueue::getFirstAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const
{
    std::lock_guard lock(state_mutex);

    const auto in_partition = mutations_by_partition.find(part->info.partition_id);
    if (in_partition == mutations_by_partition.end())
        return MutationCommands{};

    const Int64 part_version = part->info.getDataVersion();
    const auto & mutations = in_partition->second;

    /// Mutations are ordered by version, so start from the first one that is newer than the part.
    for (auto it = mutations.upper_bound(part_version); it != mutations.end(); ++it)
    {
        const auto & mutation_entry = it->second->entry;
        if (mutation_entry->alter_version != -1)
            return mutation_entry->commands;
    }

    return MutationCommands{};
}
/// Collects commands of all mutations that have to be applied to `part` to bring it
/// to `desired_mutation_version`: mutations with versions in (part data version, desired version].
/// If the mutation with the desired version is not found, only the mutations before it are returned.
/// Returns empty commands if the part is already newer than the desired version
/// or if there are no mutations for its partition.
MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
    const MergeTreeData::DataPartPtr & part, Int64 desired_mutation_version) const
{
    /// NOTE: If the corresponding mutation is not found, the error is logged (and not thrown as an exception)
    /// to allow recovering from a mutation that cannot be executed. This way you can delete the mutation entry
    /// from /mutations in ZK and the replicas will simply skip the mutation.

    if (part->info.getDataVersion() > desired_mutation_version)
    {
        LOG_WARNING(log, "Data version of part {} is already greater than desired mutation version {}", part->name, desired_mutation_version);
        return MutationCommands{};
    }

    std::lock_guard lock(state_mutex);

    auto in_partition = mutations_by_partition.find(part->info.partition_id);
    if (in_partition == mutations_by_partition.end())
    {
        LOG_WARNING(log, "There are no mutations for partition ID {} (trying to mutate part {} to {})", part->info.partition_id, part->name, desired_mutation_version);
        return MutationCommands{};
    }

    auto begin = in_partition->second.upper_bound(part->info.getDataVersion());

    auto end = in_partition->second.lower_bound(desired_mutation_version);
    if (end == in_partition->second.end() || end->first != desired_mutation_version)
        LOG_WARNING(log, "Mutation with version {} not found in partition ID {} (trying to mutate part {})", desired_mutation_version, part->info.partition_id, part->name);
    else
        ++end; /// Include the desired mutation itself.

    MutationCommands commands;
    for (auto it = begin; it != end; ++it)
        commands.insert(commands.end(), it->second->entry->commands.begin(), it->second->entry->commands.end());

    return commands;
}
/// Tries to mark mutations as done.
/// First, under `state_mutex`, every unfinished mutation whose znode name is <= mutation_pointer is marked done,
/// and unfinished mutations without parts to do are collected as candidates.
/// Then candidates are checked with the merge predicate; if some are finished, the "mutation_pointer" node
/// of the replica is set to the last finished one and the local state is updated.
/// Returns true if at least one candidate turned out to be finished.
bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeeper)
{
    std::vector<ReplicatedMergeTreeMutationEntryPtr> candidates;
    {
        std::lock_guard lock(state_mutex);

        for (auto & kv : mutations_by_znode)
        {
            const String & znode = kv.first;
            MutationStatus & mutation = kv.second;

            if (mutation.is_done)
                continue;

            if (znode <= mutation_pointer)
            {
                LOG_TRACE(log, "Marking mutation {} done because it is <= mutation_pointer ({})", znode, mutation_pointer);
                mutation.is_done = true;
                /// NOTE(review): unlike the other call sites in this file, this one is not guarded by isAlterMutation() — confirm it is intended.
                alter_sequence.finishDataAlter(mutation.entry->alter_version, lock);
                if (mutation.parts_to_do.size() != 0)
                {
                    LOG_INFO(log, "Seems like we jumped over mutation {} when downloaded part with bigger mutation number. "
                                  "It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas.", znode);
                    mutation.parts_to_do.clear();
                }
            }
            else if (mutation.parts_to_do.size() == 0)
            {
                LOG_TRACE(log, "Will check if mutation {} is done", mutation.entry->znode_name);
                candidates.push_back(mutation.entry);
            }
        }
    }

    if (candidates.empty())
        return false;

    LOG_DEBUG(log, "Trying to finalize {} mutations", candidates.size());

    /// The check is done without holding state_mutex.
    auto merge_pred = getMergePredicate(zookeeper);

    std::vector<const ReplicatedMergeTreeMutationEntry *> finished;
    for (const ReplicatedMergeTreeMutationEntryPtr & candidate : candidates)
    {
        if (merge_pred.isMutationFinished(*candidate))
            finished.push_back(candidate.get());
    }

    if (!finished.empty())
    {
        zookeeper->set(fs::path(replica_path) / "mutation_pointer", finished.back()->znode_name);

        std::lock_guard lock(state_mutex);

        mutation_pointer = finished.back()->znode_name;

        for (const ReplicatedMergeTreeMutationEntry * entry : finished)
        {
            /// The mutation could have been removed while state_mutex was released.
            auto it = mutations_by_znode.find(entry->znode_name);
            if (it != mutations_by_znode.end())
            {
                LOG_TRACE(log, "Mutation {} is done", entry->znode_name);
                it->second.is_done = true;
                if (entry->isAlterMutation())
                {
                    LOG_TRACE(log, "Finishing data alter with version {} for entry {}", entry->alter_version, entry->znode_name);
                    alter_sequence.finishDataAlter(entry->alter_version, lock);
                }
            }
        }
    }

    /// Mutations may finish in non sequential order because we may fetch
    /// already mutated parts from other replicas. So, because we updated
    /// mutation pointer we have to recheck all previous mutations, they may be
    /// also finished.
    return !finished.empty();
}
2018-03-03 16:26:06 +00:00
/// Builds a summary of the queue: sizes, number of inserts (GET_PART / ATTACH_PART), merges and part mutations,
/// and for each of these kinds the smallest non-zero creation time together with the part of that entry.
ReplicatedMergeTreeQueue::Status ReplicatedMergeTreeQueue::getStatus() const
{
    std::lock_guard lock(state_mutex);

    Status res;

    res.future_parts = future_parts.size();
    res.queue_size = queue.size();
    res.last_queue_update = last_queue_update;

    res.inserts_in_queue = 0;
    res.merges_in_queue = 0;
    res.part_mutations_in_queue = 0;
    res.queue_oldest_time = 0;
    res.inserts_oldest_time = 0;
    res.merges_oldest_time = 0;
    res.part_mutations_oldest_time = 0;

    /// Zero means "unknown" both for the creation time of an entry and for the currently known oldest time.
    const auto is_new_oldest = [](const auto & create_time, const auto & known_oldest)
    {
        return create_time && (!known_oldest || create_time < known_oldest);
    };

    for (const LogEntryPtr & entry : queue)
    {
        const auto type = entry->type;

        if (is_new_oldest(entry->create_time, res.queue_oldest_time))
            res.queue_oldest_time = entry->create_time;

        if (type == LogEntry::GET_PART || type == LogEntry::ATTACH_PART)
        {
            ++res.inserts_in_queue;

            if (is_new_oldest(entry->create_time, res.inserts_oldest_time))
            {
                res.inserts_oldest_time = entry->create_time;
                res.oldest_part_to_get = entry->new_part_name;
            }
        }

        if (type == LogEntry::MERGE_PARTS)
        {
            ++res.merges_in_queue;

            if (is_new_oldest(entry->create_time, res.merges_oldest_time))
            {
                res.merges_oldest_time = entry->create_time;
                res.oldest_part_to_merge_to = entry->new_part_name;
            }
        }

        if (type == LogEntry::MUTATE_PART)
        {
            ++res.part_mutations_in_queue;

            if (is_new_oldest(entry->create_time, res.part_mutations_oldest_time))
            {
                res.part_mutations_oldest_time = entry->create_time;
                res.oldest_part_to_mutate_to = entry->new_part_name;
            }
        }
    }

    return res;
}
2018-03-03 16:26:06 +00:00
/// Replaces the contents of `res` with copies of the data of all queue entries, in queue order.
void ReplicatedMergeTreeQueue::getEntries(LogEntriesData & res) const
{
    res.clear();

    std::lock_guard state_lock(state_mutex);

    res.reserve(queue.size());
    for (auto it = queue.begin(); it != queue.end(); ++it)
        res.emplace_back(**it);
}
/// Reports both insert-time watermarks kept by the queue.
/// The two values are read under a single state_mutex acquisition, so they form a consistent pair.
void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const
{
    std::lock_guard<std::mutex> state_lock(state_mutex);

    out_min_unprocessed_insert_time = min_unprocessed_insert_time;
    out_max_processed_insert_time = max_processed_insert_time;
}
2020-07-31 11:37:16 +00:00
std::optional<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getIncompleteMutationsStatus(const String & znode_name, std::set<String> * mutation_ids) const
{
std::lock_guard lock(state_mutex);
auto current_mutation_it = mutations_by_znode.find(znode_name);
/// killed
if (current_mutation_it == mutations_by_znode.end())
return {};
const MutationStatus & status = current_mutation_it->second;
MergeTreeMutationStatus result
{
.is_done = status.is_done,
.latest_failed_part = status.latest_failed_part,
.latest_fail_time = status.latest_fail_time,
.latest_fail_reason = status.latest_fail_reason,
};
if (mutation_ids && !status.latest_fail_reason.empty())
{
const auto & latest_failed_part_info = status.latest_failed_part_info;
auto in_partition = mutations_by_partition.find(latest_failed_part_info.partition_id);
if (in_partition != mutations_by_partition.end())
{
const auto & version_to_status = in_partition->second;
auto begin_it = version_to_status.upper_bound(latest_failed_part_info.getDataVersion());
for (auto it = begin_it; it != version_to_status.end(); ++it)
{
/// All mutations with the same failure
if (!it->second->is_done && it->second->latest_fail_reason == status.latest_fail_reason)
2020-07-31 11:37:16 +00:00
mutation_ids->insert(it->second->entry->znode_name);
}
}
}
return result;
}
std::vector<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getMutationsStatus() const
{
std::lock_guard lock(state_mutex);
std::vector<MergeTreeMutationStatus> result;
for (const auto & pair : mutations_by_znode)
{
const MutationStatus & status = pair.second;
const ReplicatedMergeTreeMutationEntry & entry = *status.entry;
2020-02-05 16:30:02 +00:00
Names parts_to_mutate = status.parts_to_do.getParts();
for (const MutationCommand & command : entry.commands)
{
2020-11-09 16:05:40 +00:00
WriteBufferFromOwnString buf;
formatAST(*command.ast, buf, false, true);
result.push_back(MergeTreeMutationStatus
{
entry.znode_name,
2020-11-09 16:05:40 +00:00
buf.str(),
entry.create_time,
entry.block_numbers,
parts_to_mutate,
status.is_done,
status.latest_failed_part,
status.latest_fail_time,
status.latest_fail_reason,
});
}
}
return result;
}
/// Builds the QueueLocks holder over the three queue mutexes: the state mutex, the log-pulling
/// mutex and the mutations-update mutex, passed in that order. clear() uses it to hold all of
/// them for the duration of the reset.
ReplicatedMergeTreeQueue::QueueLocks ReplicatedMergeTreeQueue::lockQueue()
{
    return QueueLocks(
        state_mutex,
        pull_logs_to_queue_mutex,
        update_mutations_mutex);
}
/// Captures everything the predicate needs to decide later whether parts may be merged or mutated.
/// The order of the steps matters (see the explanation in canMergeTwoParts):
///  1. copy queue.virtual_parts into prev_virtual_parts;
///  2. start an asynchronous read of the quorum status;
///  3. collect committing_blocks: block numbers whose znode points at a live "abandonable_lock-" node;
///  4. pull the replication log into the queue and remember the resulting merges_version;
///  5. load the pinned part uuids;
///  6. wait for the quorum status and remember the part of the in-progress quorum insert, if any.
ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
    ReplicatedMergeTreeQueue & queue_, zkutil::ZooKeeperPtr & zookeeper)
    : queue(queue_)
    , prev_virtual_parts(queue.format_version)
{
    {
        std::lock_guard lock(queue.state_mutex);
        prev_virtual_parts = queue.virtual_parts;
    }

    /// Load current quorum status.
    auto quorum_status_future = zookeeper->asyncTryGet(fs::path(queue.zookeeper_path) / "quorum" / "status");

    /// Load current inserts
    std::unordered_set<String> lock_holder_paths;
    for (const String & entry : zookeeper->getChildren(fs::path(queue.zookeeper_path) / "temp"))
    {
        if (startsWith(entry, "abandonable_lock-"))
            lock_holder_paths.insert(fs::path(queue.zookeeper_path) / "temp" / entry);
    }

    if (!lock_holder_paths.empty())
    {
        Strings partitions = zookeeper->getChildren(fs::path(queue.zookeeper_path) / "block_numbers");

        /// Request the block number lists of all partitions at once and collect the answers afterwards.
        std::vector<std::future<Coordination::ListResponse>> lock_futures;
        lock_futures.reserve(partitions.size());
        for (const String & partition : partitions)
            lock_futures.push_back(zookeeper->asyncGetChildren(fs::path(queue.zookeeper_path) / "block_numbers" / partition));

        struct BlockInfoInZooKeeper
        {
            String partition;
            Int64 number;
            String zk_path;
            std::future<Coordination::GetResponse> contents_future;
        };

        std::vector<BlockInfoInZooKeeper> block_infos;
        for (size_t i = 0; i < partitions.size(); ++i)
        {
            Strings partition_block_numbers = lock_futures[i].get().names;
            for (const String & entry : partition_block_numbers)
            {
                /// TODO: cache block numbers that are abandoned.
                /// We won't need to check them on the next iteration.
                if (startsWith(entry, "block-"))
                {
                    Int64 block_number = parse<Int64>(entry.substr(strlen("block-")));
                    String zk_path = fs::path(queue.zookeeper_path) / "block_numbers" / partitions[i] / entry;
                    block_infos.emplace_back(
                        BlockInfoInZooKeeper{partitions[i], block_number, zk_path, zookeeper->asyncTryGet(zk_path)});
                }
            }
        }

        /// A block is still committing if its znode exists and its data names one of the live lock holders.
        for (auto & block : block_infos)
        {
            Coordination::GetResponse resp = block.contents_future.get();
            if (resp.error == Coordination::Error::ZOK && lock_holder_paths.count(resp.data))
                committing_blocks[block.partition].insert(block.number);
        }
    }

    merges_version = queue_.pullLogsToQueue(zookeeper);

    {
        /// We avoid returning here a version to be used in a lightweight transaction.
        ///
        /// When pinned parts set is changed a log entry is added to the queue in the same transaction.
        /// The log entry serves as a synchronization point, and it also increments `merges_version`.
        ///
        /// If pinned parts are fetched after logs are pulled then we can safely say that it contains all locks up to `merges_version`.
        String s = zookeeper->get(queue.zookeeper_path + "/pinned_part_uuids");
        pinned_part_uuids.fromString(s);
    }

    Coordination::GetResponse quorum_status_response = quorum_status_future.get();
    if (quorum_status_response.error == Coordination::Error::ZOK)
    {
        ReplicatedMergeTreeQuorumEntry quorum_status;
        quorum_status.fromString(quorum_status_response.data);
        inprogress_quorum_part = quorum_status.part_name;
    }
    else
        inprogress_quorum_part.clear();
}
bool ReplicatedMergeTreeMergePredicate::operator()(
2020-04-16 18:47:20 +00:00
const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
String * out_reason) const
{
if (left)
return canMergeTwoParts(left, right, out_reason);
else
return canMergeSinglePart(right, out_reason);
}
/// Decides whether `left` and `right` (which must belong to the same partition) may be merged.
/// Returns false and, if `out_reason` is not null, stores an explanation when the merge is not allowed.
/// Throws LOGICAL_ERROR if the parts belong to different partitions.
bool ReplicatedMergeTreeMergePredicate::canMergeTwoParts(
    const MergeTreeData::DataPartPtr & left,
    const MergeTreeData::DataPartPtr & right,
    String * out_reason) const
{
    /// A sketch of a proof of why this method actually works:
    ///
    /// The trickiest part is to ensure that no new parts will ever appear in the range of blocks between left and right.
    /// Inserted parts get their block numbers by acquiring an ephemeral lock (see EphemeralLockInZooKeeper.h).
    /// These block numbers are monotonically increasing in a partition.
    ///
    /// Because there is a window between the moment the inserted part gets its block number and
    /// the moment it is committed (appears in the replication log), we can't get the name of all parts up to the given
    /// block number just by looking at the replication log - some parts with smaller block numbers may be currently committing
    /// and will appear in the log later than the parts with bigger block numbers.
    ///
    /// We also can't take a consistent snapshot of parts that are already committed plus parts that are about to commit
    /// due to limitations of ZooKeeper transactions.
    ///
    /// So we do the following (see the constructor):
    /// * copy virtual_parts from queue to prev_virtual_parts
    ///   (a set of parts which corresponds to executing the replication log up to a certain point)
    /// * load committing_blocks (inserts and mutations that have already acquired a block number but haven't appeared in the log yet)
    /// * do pullLogsToQueue() again to load fresh queue.virtual_parts and mutations.
    ///
    /// Now we have an invariant: if some part is in prev_virtual_parts then:
    /// * all parts with smaller block numbers are either in committing_blocks or in queue.virtual_parts
    ///   (those that managed to commit before we loaded committing_blocks).
    /// * all mutations with smaller block numbers are either in committing_blocks or in queue.mutations_by_partition
    ///
    /// So to check that no new parts will ever appear in the range of blocks between left and right we first check that
    /// left and right are already present in prev_virtual_parts (we can't give a definite answer for parts that were committed later)
    /// and then check that there are no blocks between them in committing_blocks and no parts in queue.virtual_parts.
    ///
    /// Similarly, to check that there will be no mutation with a block number between two parts from prev_virtual_parts
    /// (only then we can merge them without mutating the left part), we first check committing_blocks
    /// and then check that these two parts have the same mutation version according to queue.mutations_by_partition.

    if (left->info.partition_id != right->info.partition_id)
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Parts {} and {} belong to different partitions", left->name, right->name);
    }

    /// Checks that use only the state captured in the constructor (no queue lock is needed yet).
    for (const MergeTreeData::DataPartPtr & part : {left, right})
    {
        if (pinned_part_uuids.part_uuids.contains(part->uuid))
        {
            if (out_reason)
                *out_reason = "Part " + part->name + " has uuid " + toString(part->uuid) + " which is currently pinned";
            return false;
        }

        if (part->name == inprogress_quorum_part)
        {
            if (out_reason)
                *out_reason = "Quorum insert for part " + part->name + " is currently in progress";
            return false;
        }

        if (prev_virtual_parts.getContainingPart(part->info).empty())
        {
            if (out_reason)
                *out_reason = "Entry for part " + part->name + " hasn't been read from the replication log yet";
            return false;
        }
    }

    /// Normalize the gap boundaries so that left_max_block <= right_min_block regardless of argument order.
    Int64 left_max_block = left->info.max_block;
    Int64 right_min_block = right->info.min_block;
    if (left_max_block > right_min_block)
        std::swap(left_max_block, right_min_block);

    /// If there is a gap between the parts, no block inside it may still be committing.
    if (left_max_block + 1 < right_min_block)
    {
        auto committing_blocks_in_partition = committing_blocks.find(left->info.partition_id);
        if (committing_blocks_in_partition != committing_blocks.end())
        {
            const std::set<Int64> & block_numbers = committing_blocks_in_partition->second;

            auto block_it = block_numbers.upper_bound(left_max_block);
            if (block_it != block_numbers.end() && *block_it < right_min_block)
            {
                if (out_reason)
                    *out_reason = "Block number " + toString(*block_it) + " is still being inserted between parts "
                        + left->name + " and " + right->name;
                return false;
            }
        }
    }

    std::lock_guard lock(queue.state_mutex);

    for (const MergeTreeData::DataPartPtr & part : {left, right})
    {
        /// We look for containing parts in queue.virtual_parts (and not in prev_virtual_parts) because queue.virtual_parts is newer
        /// and it is guaranteed that it will contain all merges assigned before this object is constructed.
        String containing_part = queue.virtual_parts.getContainingPart(part->info);
        if (containing_part != part->name)
        {
            if (out_reason)
                *out_reason = "Part " + part->name + " has already been assigned a merge into " + containing_part;
            return false;
        }
    }

    if (left_max_block + 1 < right_min_block)
    {
        /// Fake part which will appear as merge result
        MergeTreePartInfo gap_part_info(
            left->info.partition_id, left_max_block + 1, right_min_block - 1,
            MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER);

        /// We don't select parts if any smaller part covered by our merge must exist after
        /// processing replication log up to log_pointer.
        Strings covered = queue.virtual_parts.getPartsCoveredBy(gap_part_info);
        if (!covered.empty())
        {
            if (out_reason)
                *out_reason = "There are " + toString(covered.size()) + " parts (from " + covered.front()
                    + " to " + covered.back() + ") that are still not present or being processed by "
                    + " other background process on this replica between " + left->name + " and " + right->name;
            return false;
        }
    }

    /// Both lookups use left's partition id; the parts were verified above to share a partition.
    Int64 left_mutation_ver = queue.getCurrentMutationVersionImpl(
        left->info.partition_id, left->info.getDataVersion(), lock);

    Int64 right_mutation_ver = queue.getCurrentMutationVersionImpl(
        left->info.partition_id, right->info.getDataVersion(), lock);

    if (left_mutation_ver != right_mutation_ver)
    {
        if (out_reason)
            *out_reason = "Current mutation versions of parts " + left->name + " and " + right->name + " differ: "
                + toString(left_mutation_ver) + " and " + toString(right_mutation_ver) + " respectively";
        return false;
    }

    return MergeTreeData::partsContainSameProjections(left, right);
}
2020-04-16 18:47:20 +00:00
/// Decides whether a single part may be selected for a merge on its own.
/// The part is rejected if its uuid is pinned, if it is the part of the in-progress quorum insert,
/// if it is not covered by prev_virtual_parts, or if queue.virtual_parts already covers it with a
/// different part. On rejection an explanation is stored into `out_reason` when it is not null.
bool ReplicatedMergeTreeMergePredicate::canMergeSinglePart(
    const MergeTreeData::DataPartPtr & part,
    String * out_reason) const
{
    if (pinned_part_uuids.part_uuids.contains(part->uuid))
    {
        if (out_reason)
            *out_reason = "Part " + part->name + " has uuid " + toString(part->uuid) + " which is currently pinned";
        return false;
    }

    if (part->name == inprogress_quorum_part)
    {
        if (out_reason)
            *out_reason = "Quorum insert for part " + part->name + " is currently in progress";
        return false;
    }

    if (prev_virtual_parts.getContainingPart(part->info).empty())
    {
        if (out_reason)
            *out_reason = "Entry for part " + part->name + " hasn't been read from the replication log yet";
        return false;
    }

    std::lock_guard<std::mutex> lock(queue.state_mutex);

    /// We look for containing parts in queue.virtual_parts (and not in prev_virtual_parts) because queue.virtual_parts is newer
    /// and it is guaranteed that it will contain all merges assigned before this object is constructed.
    String containing_part = queue.virtual_parts.getContainingPart(part->info);
    if (containing_part != part->name)
    {
        if (out_reason)
            *out_reason = "Part " + part->name + " has already been assigned a merge into " + containing_part;
        return false;
    }

    return true;
}
2020-01-31 12:25:31 +00:00
/// Returns the mutation version the part should be mutated to, paired with the alter version of the
/// alter-mutation that stopped the search (or -1 if none did).
/// Returns an empty optional if the part is the in-progress quorum part, is already covered by
/// another part in queue.virtual_parts, has no mutations in its partition, or is already at
/// (or past) the chosen version.
std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const
{
    /// Assigning mutations is easier than assigning merges because mutations appear in the same order as
    /// the order of their version numbers (see StorageReplicatedMergeTree::mutate).
    /// This means that if we have loaded the mutation with version number X then all mutations with
    /// the version numbers less than X are also loaded and if there is no merge or mutation assigned to
    /// the part (checked by querying queue.virtual_parts), we can confidently assign a mutation to
    /// version X for this part.

    /// We cannot mutate part if it's being inserted with quorum and it's not
    /// already reached.
    if (part->name == inprogress_quorum_part)
        return {};

    std::lock_guard lock(queue.state_mutex);

    if (queue.virtual_parts.getContainingPart(part->info) != part->name)
        return {};

    auto in_partition = queue.mutations_by_partition.find(part->info.partition_id);
    if (in_partition == queue.mutations_by_partition.end())
        return {};

    Int64 current_version = queue.getCurrentMutationVersionImpl(part->info.partition_id, part->info.getDataVersion(), lock);
    Int64 max_version = in_partition->second.rbegin()->first;

    int alter_version = -1;
    /// Walk mutations in ascending version order; bind by reference to avoid copying each map element.
    for (const auto & [mutation_version, mutation_status] : in_partition->second)
    {
        max_version = mutation_version;
        if (mutation_status->entry->isAlterMutation())
        {
            /// We want to assign mutations for part which version is bigger
            /// than part current version. But it doesn't make sense to assign
            /// more fresh versions of alter-mutations if previous alter still
            /// not done because alters execute one by one in strict order.
            if (mutation_version > current_version || !mutation_status->is_done)
            {
                alter_version = mutation_status->entry->alter_version;
                break;
            }
        }
    }

    if (current_version >= max_version)
        return {};

    return std::make_pair(max_version, alter_version);
}
bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const ReplicatedMergeTreeMutationEntry & mutation) const
{
for (const auto & kv : mutation.block_numbers)
{
const String & partition_id = kv.first;
Int64 block_num = kv.second;
auto partition_it = committing_blocks.find(partition_id);
if (partition_it != committing_blocks.end())
{
size_t blocks_count = std::distance(
partition_it->second.begin(), partition_it->second.lower_bound(block_num));
if (blocks_count)
{
2020-05-23 22:24:01 +00:00
LOG_TRACE(queue.log, "Mutation {} is not done yet because in partition ID {} there are still {} uncommitted blocks.", mutation.znode_name, partition_id, blocks_count);
return false;
}
}
}
{
std::lock_guard lock(queue.state_mutex);
size_t suddenly_appeared_parts = getPartNamesToMutate(mutation, queue.virtual_parts).size();
if (suddenly_appeared_parts)
{
2020-05-23 22:24:01 +00:00
LOG_TRACE(queue.log, "Mutation {} is not done yet because {} parts to mutate suddenly appeared.", mutation.znode_name, suddenly_appeared_parts);
return false;
}
}
return true;
}
2021-07-05 19:58:36 +00:00
/// Asks the queue's drop_ranges, under the queue state mutex, whether it has the given drop range.
bool ReplicatedMergeTreeMergePredicate::hasDropRange(const MergeTreePartInfo & new_drop_range_info) const
{
    std::lock_guard<std::mutex> state_lock(queue.state_mutex);

    const bool range_is_known = queue.drop_ranges.hasDropRange(new_drop_range_info);
    return range_is_known;
}
/// Registers a callback that is invoked with the queue size.
/// The callback is appended to `subscribers` and immediately called with the current queue size
/// while both state_mutex and subscribers_mutex are held, so the initial notification cannot
/// interleave with a concurrent size change. The returned handler unregisters the callback
/// when it is destroyed.
ReplicatedMergeTreeQueue::SubscriberHandler
ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback)
{
    std::lock_guard lock(state_mutex);
    std::lock_guard lock_subscribers(subscribers_mutex);

    auto it = subscribers.emplace(subscribers.end(), std::move(callback));

    /// Atomically notify about current size
    (*it)(queue.size());

    return SubscriberHandler(it, *this);
}
/// Unregisters the callback this handler refers to by erasing it from the queue's subscriber list
/// under subscribers_mutex.
ReplicatedMergeTreeQueue::SubscriberHandler::~SubscriberHandler()
{
    std::lock_guard lock(queue.subscribers_mutex);
    queue.subscribers.erase(it);
}
void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size)
{
2019-01-02 06:44:36 +00:00
std::lock_guard lock_subscribers(subscribers_mutex);
for (auto & subscriber_callback : subscribers)
subscriber_callback(new_queue_size);
}
/// On destruction, tells all remaining subscribers that the queue size is now zero.
ReplicatedMergeTreeQueue::~ReplicatedMergeTreeQueue()
{
    constexpr size_t final_queue_size = 0;
    notifySubscribers(final_queue_size);
}
2016-01-10 04:44:12 +00:00
/// Formats `index` as a decimal string left-padded with '0' to at least 10 characters.
/// Representations that are already 10 characters or longer are returned unchanged.
String padIndex(Int64 index)
{
    constexpr size_t min_width = 10;

    String index_str = toString(index);

    /// Without this guard `min_width - size()` would wrap around for longer strings
    /// and the padding constructor would be asked for an enormous length.
    if (index_str.size() >= min_width)
        return index_str;

    return std::string(min_width - index_str.size(), '0') + index_str;
}
void ReplicatedMergeTreeQueue::removeCurrentPartsFromMutations()
{
std::lock_guard state_lock(state_mutex);
for (const auto & part_name : current_parts.getParts())
2020-06-16 10:50:47 +00:00
removeCoveredPartsFromMutations(part_name, /*remove_part = */ true, /*remove_covered_parts = */ true);
}
2020-06-16 10:50:47 +00:00
2016-01-10 04:44:12 +00:00
}