CLICKHOUSE-3847 add "is_lost". load() in Queue can check duplicate records

This commit is contained in:
VadimPE 2018-08-09 18:06:39 +03:00
parent e3495a6484
commit 75900f494f
6 changed files with 75 additions and 13 deletions

View File

@ -93,6 +93,9 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
: 0];
String min_pointer_inactive_replica_str;
std::unordered_map<String, String> log_pointers_lost_replicas;
std::unordered_map<String, UInt32> log_pointers_version;
for (const String & replica : replicas)
{
zkutil::Stat log_pointer_stat;
@ -110,6 +113,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
min_pointer_active_replica = std::min(min_pointer_active_replica, log_pointer);
else
{
log_pointers_lost_replicas[replica] = log_pointer_str;
log_pointers_version[replica] = log_pointer_stat.version;
if (log_pointer_str >= min_saved_record_log_str)
{
@ -132,11 +137,21 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
/// We will not touch records that are no less than `min_pointer_active_replica`.
entries.erase(std::lower_bound(entries.begin(), entries.end(), min_pointer_replica_str), entries.end());
/// We must check if we are only active_node
if (entries.empty())
return;
/// We must mark lost replicas.
try
{
markLostReplicas(log_pointers_lost_replicas, log_pointers_version, entries[0], zookeeper);
}
catch (const zkutil::KeeperException & e)
{
if (e.code != ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
throw e;
else return;
}
zkutil::Requests ops;
for (size_t i = 0; i < entries.size(); ++i)
{
@ -155,6 +170,30 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
}
void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map<String, String> & log_pointers_lost_replicas,
const std::unordered_map<String, UInt32> & log_pointers_version,
const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper)
{
std::vector<zkutil::ZooKeeper::FutureMulti> futures;
for (auto pair : log_pointers_lost_replicas)
{
String replica = pair.first;
if (pair.second <= remove_border)
{
zkutil::Requests ops;
/// If log pointer changes version we can not mark replicas, so we check it.
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer", log_pointers_version.at(replica)));
ops.emplace_back(zkutil::makeCreateRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "", zkutil::CreateMode::Persistent));
futures.push_back(zookeeper->asyncMulti(ops));
}
}
for (auto & future : futures)
future.get();
}
struct ReplicatedMergeTreeCleanupThread::NodeWithStat
{
String node;

View File

@ -39,6 +39,11 @@ private:
/// Remove old records from ZooKeeper.
void clearOldLogs();
/// Mark lost replicas.
void markLostReplicas(const std::unordered_map<String, String> & log_pointers_lost_replicas,
const std::unordered_map <String, UInt32> & log_pointers_version,
const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper);
/// Remove old block hashes from ZooKeeper. This is done by the leader replica.
void clearOldBlocks();

View File

@ -47,13 +47,25 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
std::optional<time_t> min_unprocessed_insert_time_changed;
{
std::lock_guard lock(state_mutex);
std::lock_guard pull_logs_lock(pull_logs_to_queue_mutex);
String log_pointer_str = zookeeper->get(replica_path + "/log_pointer");
log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
Strings children = zookeeper->getChildren(queue_path);
LOG_DEBUG(log, "Having " << children.size() << " queue entries to load.");
std::unordered_set<String> already_loaded_paths;
auto to_remove_it = std::remove_if(
children.begin(), children.end(), [&](const String & path)
{
return already_loaded_paths.count(path);
});
LOG_DEBUG(log,
"Having " << (to_remove_it - children.begin()) << " queue entries to load, "
<< (children.end() - to_remove_it) << " entries already loaded.");
children.erase(to_remove_it, children.end());
std::sort(children.begin(), children.end());
zkutil::AsyncResponses<zkutil::GetResponse> futures;
@ -68,6 +80,8 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
LogEntryPtr entry = LogEntry::parse(res.data, res.stat);
entry->znode_name = future.first;
std::lock_guard lock(state_mutex);
insertUnlocked(entry, min_unprocessed_insert_time_changed, lock);
updated = true;
@ -93,7 +107,6 @@ void ReplicatedMergeTreeQueue::initialize(
log = &Logger::get(logger_name);
addVirtualParts(parts);
load(zookeeper);
}

View File

@ -147,9 +147,6 @@ private:
/// Put a set of (already existing) parts in virtual_parts.
void addVirtualParts(const MergeTreeData::DataParts & parts);
/// Load (initialize) a queue from ZooKeeper (/replicas/me/queue/).
bool load(zkutil::ZooKeeperPtr zookeeper);
void insertUnlocked(
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
std::lock_guard<std::mutex> & state_lock);
@ -233,6 +230,9 @@ public:
*/
bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name);
/// Load (initialize) a queue from ZooKeeper (/replicas/me/queue/).
bool load(zkutil::ZooKeeperPtr zookeeper);
bool removeFromVirtualParts(const MergeTreePartInfo & part_info);
/** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.

View File

@ -200,11 +200,15 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.cloneReplicaIfNeeded();
const auto & zookeeper = storage.getZooKeeper();
storage.queue.load(zookeeper);
/// pullLogsToQueue() after we mark replica 'is_active' and clone();
/// because cleanup_thread don't del our log_pointer.
storage.queue.pullLogsToQueue(storage.getZooKeeper());
storage.last_queue_update_finish_time.store(time(nullptr));
storage.cloneReplicaIfNeeded();
updateQuorumIfWeHavePart();

View File

@ -1977,11 +1977,10 @@ bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
if (!entries.empty())
if (entries.empty())
return false;
std::sort(entries.begin(), entries.end());
if ("log-" + padIndex(parse<UInt64>(raw_log_pointer)) < entries[0])
return false;
@ -2053,12 +2052,14 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded()
{
String source_replica_path = zookeeper_path + "/replicas/" + replica_name;
String source_log_pointer_raw = zookeeper->get(source_replica_path + "/log_pointer");
if ((source_replica_path != replica_path) && (!source_log_pointer_raw.empty()) && ("log-" + padIndex(parse<UInt64>(source_log_pointer_raw)) >= entries[0]))
if ((source_replica_path != replica_path) && (!zookeeper->exists(source_replica_path + "/is_lost")))
source_replica = replica_name;
}
}
} while (cloneReplica(source_replica, zookeeper));
} while (!cloneReplica(source_replica, zookeeper));
queueUpdatingTask();
}