mark queue as dirty if status of ZK operation is unknown [#CLICKHOUSE-3405]

This commit is contained in:
Alexey Zatelepin 2017-11-28 17:07:17 +03:00
parent 78aaa42ddd
commit 01d42242b1
4 changed files with 82 additions and 64 deletions

View File

@ -2,7 +2,6 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Storages/MergeTree/MergeTreeDataMerger.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/StringUtils.h>
@ -16,13 +15,6 @@ namespace ErrorCodes
}
ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(const StorageReplicatedMergeTree & storage_)
: storage(storage_)
, virtual_parts(storage.data.format_version)
{
}
void ReplicatedMergeTreeQueue::initVirtualParts(const MergeTreeData::DataParts & parts)
{
std::lock_guard<std::mutex> lock(mutex);
@ -32,36 +24,60 @@ void ReplicatedMergeTreeQueue::initVirtualParts(const MergeTreeData::DataParts &
}
void ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
{
auto queue_path = replica_path + "/queue";
LOG_DEBUG(log, "Loading queue from " << queue_path);
std::lock_guard<std::mutex> lock(mutex);
Strings children = zookeeper->getChildren(queue_path);
LOG_DEBUG(log, "Having " << children.size() << " queue entries to load.");
std::sort(children.begin(), children.end());
std::vector<std::pair<String, zkutil::ZooKeeper::GetFuture>> futures;
futures.reserve(children.size());
for (const String & child : children)
futures.emplace_back(child, zookeeper->asyncGet(queue_path + "/" + child));
for (auto & future : futures)
bool updated = false;
bool min_unprocessed_insert_time_changed = false;
{
zkutil::ZooKeeper::ValueAndStat res = future.second.get();
LogEntryPtr entry = LogEntry::parse(res.value, res.stat);
std::lock_guard<std::mutex> lock(mutex);
entry->znode_name = future.first;
insertUnlocked(entry);
std::unordered_set<String> already_loaded_paths;
for (const LogEntryPtr & log_entry : queue)
already_loaded_paths.insert(log_entry->znode_name);
Strings children = zookeeper->getChildren(queue_path);
auto to_remove_it = std::remove_if(
children.begin(), children.end(), [&](const String & path)
{
return already_loaded_paths.count(path);
});
LOG_DEBUG(log,
"Having " << (to_remove_it - children.begin()) << " queue entries to load, "
<< (children.end() - to_remove_it) << " entries already loaded.");
children.erase(to_remove_it, children.end());
std::sort(children.begin(), children.end());
std::vector<std::pair<String, zkutil::ZooKeeper::GetFuture>> futures;
futures.reserve(children.size());
for (const String & child : children)
futures.emplace_back(child, zookeeper->asyncGet(queue_path + "/" + child));
for (auto & future : futures)
{
zkutil::ZooKeeper::ValueAndStat res = future.second.get();
LogEntryPtr entry = LogEntry::parse(res.value, res.stat);
entry->znode_name = future.first;
time_t prev_min_unprocessed_insert_time = min_unprocessed_insert_time;
insertUnlocked(entry);
updated = true;
if (min_unprocessed_insert_time != prev_min_unprocessed_insert_time)
min_unprocessed_insert_time_changed = true;
}
}
updateTimesInZooKeeper(zookeeper, true, false);
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, false);
LOG_TRACE(log, "Loaded queue");
return updated;
}
@ -241,8 +257,14 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B
{
std::lock_guard<std::mutex> lock(pull_logs_to_queue_mutex);
zkutil::Stat log_pointer_stat;
String index_str = zookeeper->get(replica_path + "/log_pointer", &log_pointer_stat);
bool dirty_entries_loaded = false;
if (is_dirty)
{
dirty_entries_loaded = load(zookeeper);
is_dirty = false;
}
String index_str = zookeeper->get(replica_path + "/log_pointer");
UInt64 index;
Strings log_entries = zookeeper->getChildren(zookeeper_path + "/log");
@ -252,7 +274,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B
/// If we do not already have a pointer to the log, put a pointer to the first entry in it.
index = log_entries.empty() ? 0 : parse<UInt64>(std::min_element(log_entries.begin(), log_entries.end())->substr(strlen("log-")));
zookeeper->set(replica_path + "/log_pointer", toString(index), -1, &log_pointer_stat);
zookeeper->set(replica_path + "/log_pointer", toString(index));
}
else
{
@ -329,35 +351,25 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B
}
ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
replica_path + "/log_pointer", toString(last_entry_index + 1), log_pointer_stat.version));
replica_path + "/log_pointer", toString(last_entry_index + 1), -1));
if (min_unprocessed_insert_time_changed)
ops.emplace_back(std::make_unique<zkutil::Op::SetData>(
replica_path + "/min_unprocessed_insert_time", toString(min_unprocessed_insert_time), -1));
for (size_t num_tries = 0; ; ++num_tries)
try
{
if (storage.shutdown_called)
return false;
try
zookeeper->multi(ops);
}
catch (const zkutil::KeeperException & ex)
{
if (ex.isTemporaryError())
{
zookeeper->multi(ops);
break;
LOG_WARNING(log, "Unknown status of queue update, marking queue dirty (will reload on next iteration).");
is_dirty = true;
}
catch (const zkutil::KeeperException & ex)
{
if (num_tries > 0 && ex.code == ZBADVERSION)
{
LOG_TRACE(log, "Version mismatch while updating the queue. Assuming we have already updated it.");
break;
}
if (!ex.isTemporaryError())
throw;
tryLogCurrentException(log, "Temporary error while updating the queue (will retry)");
}
throw;
}
/// Now we have successfully updated the queue in ZooKeeper. Update it in RAM.
@ -394,7 +406,7 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, B
next_update_event->schedule();
}
return !log_entries.empty();
return dirty_entries_loaded || !log_entries.empty();
}
@ -454,7 +466,7 @@ void ReplicatedMergeTreeQueue::removeGetsAndMergesInRange(zkutil::ZooKeeperPtr z
for (Queue::iterator it = queue.begin(); it != queue.end();)
{
if (((*it)->type == LogEntry::GET_PART || (*it)->type == LogEntry::MERGE_PARTS) &&
MergeTreePartInfo::contains(part_name, (*it)->new_part_name, storage.data.format_version))
MergeTreePartInfo::contains(part_name, (*it)->new_part_name, format_version))
{
if ((*it)->currently_executing)
to_wait.push_back(*it);
@ -493,14 +505,14 @@ ReplicatedMergeTreeQueue::Queue ReplicatedMergeTreeQueue::getConflictsForClearCo
{
if (elem->type == LogEntry::MERGE_PARTS || elem->type == LogEntry::GET_PART || elem->type == LogEntry::ATTACH_PART)
{
if (MergeTreePartInfo::contains(entry.new_part_name, elem->new_part_name, storage.data.format_version))
if (MergeTreePartInfo::contains(entry.new_part_name, elem->new_part_name, format_version))
conflicts.emplace_back(elem);
}
if (elem->type == LogEntry::CLEAR_COLUMN)
{
auto cur_part = MergeTreePartInfo::fromPartName(elem->new_part_name, storage.data.format_version);
auto part = MergeTreePartInfo::fromPartName(entry.new_part_name, storage.data.format_version);
auto cur_part = MergeTreePartInfo::fromPartName(elem->new_part_name, format_version);
auto part = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
if (part.partition_id == cur_part.partition_id)
conflicts.emplace_back(elem);
@ -556,12 +568,12 @@ bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_
/// A more complex check is whether another part is currently created by other action that will cover this part.
/// NOTE The above is redundant, but left for a more convenient message in the log.
auto result_part = MergeTreePartInfo::fromPartName(new_part_name, storage.data.format_version);
auto result_part = MergeTreePartInfo::fromPartName(new_part_name, format_version);
/// It can slow down when the size of `future_parts` is large. But it can not be large, since `BackgroundProcessingPool` is limited.
for (const auto & future_part_name : future_parts)
{
auto future_part = MergeTreePartInfo::fromPartName(future_part_name, storage.data.format_version);
auto future_part = MergeTreePartInfo::fromPartName(future_part_name, format_version);
if (future_part.contains(result_part))
{

View File

@ -13,7 +13,6 @@ namespace DB
class MergeTreeDataMerger;
class StorageReplicatedMergeTree;
class ReplicatedMergeTreeQueue
@ -41,7 +40,7 @@ private:
using InsertsByTime = std::set<LogEntryPtr, ByTime>;
const StorageReplicatedMergeTree & storage;
MergeTreeDataFormatVersion format_version;
String zookeeper_path;
String replica_path;
@ -52,6 +51,10 @@ private:
*/
Queue queue;
/// If true, the queue in RAM is possibly out of sync with ZK and we need to reload it.
/// Protected by pull_logs_to_queue_mutex.
bool is_dirty = false;
InsertsByTime inserts_by_time;
time_t min_unprocessed_insert_time = 0;
time_t max_processed_insert_time = 0;
@ -83,7 +86,7 @@ private:
void initVirtualParts(const MergeTreeData::DataParts & parts);
/// Load (initialize) a queue from ZooKeeper (/replicas/me/queue/).
void load(zkutil::ZooKeeperPtr zookeeper);
bool load(zkutil::ZooKeeperPtr zookeeper);
void insertUnlocked(LogEntryPtr & entry);
@ -127,7 +130,11 @@ private:
};
public:
ReplicatedMergeTreeQueue(const StorageReplicatedMergeTree & storage_);
ReplicatedMergeTreeQueue(MergeTreeDataFormatVersion format_version_)
: format_version(format_version_)
, virtual_parts(format_version)
{
}
void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_,
const MergeTreeData::DataParts & parts, zkutil::ZooKeeperPtr zookeeper);

View File

@ -188,7 +188,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
settings_, database_name_ + "." + table_name, true, attach,
[this] (const std::string & name) { enqueuePartForCheck(name); },
[this] () { clearOldPartsAndRemoveFromZK(); }),
reader(data), writer(data), merger(data, context.getBackgroundPool()), queue(*this),
reader(data), writer(data), merger(data, context.getBackgroundPool()), queue(data.format_version),
fetcher(data),
shutdown_event(false), part_check_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))

View File

@ -205,7 +205,6 @@ private:
friend class ReplicatedMergeTreeRestartingThread;
friend struct ReplicatedMergeTreeLogEntry;
friend class ScopedPartitionMergeLock;
friend class ReplicatedMergeTreeQueue;
using LogEntry = ReplicatedMergeTreeLogEntry;
using LogEntryPtr = LogEntry::Ptr;