CLICKHOUSE-3847 Update. Add new settings. Del is_lost

This commit is contained in:
VadimPE 2018-08-07 18:21:42 +03:00
parent e2be3fd5ce
commit 32b6965cf2
6 changed files with 214 additions and 114 deletions

View File

@ -63,9 +63,12 @@ struct MergeTreeSettings
* duplicating INSERTs during that period of time. */ \
M(SettingUInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60) /** one week */ \
\
/** How many records may be in log, if there is inactive replica */ \
M(SettingUInt64, max_replicated_logs_to_keep, 6000) \
\
/** Keep about this number of last records in ZooKeeper log, even if they are obsolete. \
* It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning. */ \
M(SettingUInt64, replicated_logs_to_keep, 100) \
M(SettingUInt64, min_replicated_logs_to_keep, 5000) \
\
/** After specified amount of time passed after replication log entry creation \
* and sum size of parts is greater than threshold, \

View File

@ -75,43 +75,65 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
int children_count = stat.numChildren;
/// We will wait for 1.1 times more records to accumulate than necessary.
if (static_cast<double>(children_count) < storage.data.settings.replicated_logs_to_keep * 1.1)
if (static_cast<double>(children_count) < storage.data.settings.min_replicated_logs_to_keep * 1.1)
return;
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
UInt64 min_pointer = std::numeric_limits<UInt64>::max();
std::unordered_map<String, UInt64> log_pointers_lost_replicas;
for (const String & replica : replicas)
{
String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer");
if (pointer.empty())
return;
UInt32 log_pointer = parse<UInt64>(pointer);
/// Check status of replica (active or not).
/// If replica is not active, we will save it's log_pointer.
if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active"))
min_pointer = std::min(min_pointer, log_pointer);
else
log_pointers_lost_replicas[replica] = log_pointer;
}
UInt64 min_pointer_active_replica = std::numeric_limits<UInt64>::max();
Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log");
std::sort(entries.begin(), entries.end());
/// We will not touch the last `replicated_logs_to_keep` records.
entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.replicated_logs_to_keep.value), entries.end());
/// We will not touch records that are no less than `min_pointer`.
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_pointer)), entries.end());
if (entries.empty())
return;
/// We will mark lost replicas.
markLostReplicas(log_pointers_lost_replicas, entries.back());
std::sort(entries.begin(), entries.end());
String min_saved_record_log_str = entries[std::max(0, entries.size() - storage.data.settings.max_replicated_logs_to_keep.value)];
String min_pointer_inactive_replica_str;
for (const String & replica : replicas)
{
zkutil::Stat log_pointer_stat;
String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer", &log_pointer_stat);
if (pointer.empty())
return;
UInt32 log_pointer = parse<UInt64>(pointer);
String log_pointer_str = "log-" + padIndex(log_pointer);
/// Check status of replica (active or not).
/// If replica was not active, we could check when it's log_pointer locates.
/// If replica active, but runs clocneReplica(), we should check it's log_pointer in logs.
if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active") && log_pointer_str >= entries[0])
min_pointer_active_replica = std::min(min_pointer_active_replica, log_pointer);
else
{
if (log_pointer_str >= min_saved_record_log_str)
{
if (min_pointer_inactive_replica_str != "" && min_pointer_inactive_replica_str >= log_pointer_str)
min_pointer_inactive_replica_str = log_pointer_str;
else if (min_pointer_inactive_replica_str == "")
min_pointer_inactive_replica_str = log_pointer_str;
}
}
}
String min_pointer_active_replica_str = "log-" + padIndex(min_pointer_active_replica);
String min_pointer_replica_str = min_pointer_inactive_replica_str == ""
? min_pointer_active_replica_str
: std::min(min_pointer_inactive_replica_str, min_pointer_active_replica_str);
/// We will not touch the last `min_replicated_logs_to_keep` records.
entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.min_replicated_logs_to_keep.value), entries.end());
/// We will not touch records that are no less than `min_pointer_active_replica`.
entries.erase(std::lower_bound(entries.begin(), entries.end(), min_pointer_replica_str), entries.end());
/// We must check if we are only active_node
if (entries.empty())
return;
zkutil::Requests ops;
for (size_t i = 0; i < entries.size(); ++i)

View File

@ -360,13 +360,6 @@ void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
{
std::lock_guard lock(pull_logs_to_queue_mutex);
if (zookeeper->exists(replica_path + "/is_lost"))
{
restartLostReplica(zookeeper);
zookeeper->remove(replica_path + "/is_lost", -1);
}
String index_str = zookeeper->get(replica_path + "/log_pointer");
UInt64 index;
@ -400,6 +393,7 @@ void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z
if (!log_entries.empty())
{
std::sort(log_entries.begin(), log_entries.end());
/// ZK contains a limit on the number or total size of operations in a multi-request.

View File

@ -197,6 +197,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
{
removeFailedQuorumParts();
activateReplica();
storage.cloneReplicaIfNeeded();
updateQuorumIfWeHavePart();
if (storage.data.settings.replicated_can_become_leader)

View File

@ -646,73 +646,7 @@ void StorageReplicatedMergeTree::createReplica()
}
else
{
LOG_INFO(log, "Will mimic " << source_replica);
String source_path = zookeeper_path + "/replicas/" + source_replica;
/** If the reference/master replica is not yet fully created, let's wait.
* NOTE: If something went wrong while creating it, we can hang around forever.
* You can create an ephemeral node at the time of creation to make sure that the replica is created, and not abandoned.
* The same can be done for the table. You can automatically delete a replica/table node,
* if you see that it was not created up to the end, and the one who created it died.
*/
while (!zookeeper->exists(source_path + "/columns"))
{
LOG_INFO(log, "Waiting for replica " << source_path << " to be fully created");
zkutil::EventPtr event = std::make_shared<Poco::Event>();
if (zookeeper->exists(source_path + "/columns", nullptr, event))
{
LOG_WARNING(log, "Oops, a watch has leaked");
break;
}
event->wait();
}
/// The order of the following three actions is important. Entries in the log can be duplicated, but they can not be lost.
/// Copy reference to the log from `reference/master` replica.
zookeeper->set(replica_path + "/log_pointer", zookeeper->get(source_path + "/log_pointer"));
/// Let's remember the queue of the reference/master replica.
Strings source_queue_names = zookeeper->getChildren(source_path + "/queue");
std::sort(source_queue_names.begin(), source_queue_names.end());
Strings source_queue;
for (const String & entry_name : source_queue_names)
{
String entry;
if (!zookeeper->tryGet(source_path + "/queue/" + entry_name, entry))
continue;
source_queue.push_back(entry);
}
/// Add to the queue jobs to receive all the active parts that the reference/master replica has.
Strings parts = zookeeper->getChildren(source_path + "/parts");
ActiveDataPartSet active_parts_set(data.format_version, parts);
Strings active_parts = active_parts_set.getParts();
for (const String & name : active_parts)
{
LogEntry log_entry;
log_entry.type = LogEntry::GET_PART;
log_entry.source_replica = "";
log_entry.new_part_name = name;
log_entry.create_time = tryGetPartCreateTime(zookeeper, source_path, name);
zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential);
}
LOG_DEBUG(log, "Queued " << active_parts.size() << " parts to be fetched");
/// Add content of the reference/master replica queue to the queue.
for (const String & entry : source_queue)
{
zookeeper->create(replica_path + "/queue/queue-", entry, zkutil::CreateMode::PersistentSequential);
}
/// It will then be loaded into the queue variable in `queue.initialize` method.
LOG_DEBUG(log, "Copied " << source_queue.size() << " queue entries");
cloneReplicaIfNeeded();
}
zookeeper->create(replica_path + "/columns", getColumns().toString(), zkutil::CreateMode::Persistent);
@ -2039,6 +1973,144 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
}
bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zkutil::ZooKeeperPtr & zookeeper)
{
LOG_INFO(log, "Will mimic " << source_replica);
String source_path = zookeeper_path + "/replicas/" + source_replica;
/** If the reference/master replica is not yet fully created, let's wait.
* NOTE: If something went wrong while creating it, we can hang around forever.
* You can create an ephemeral node at the time of creation to make sure that the replica is created, and not abandoned.
* The same can be done for the table. You can automatically delete a replica/table node,
* if you see that it was not created up to the end, and the one who created it died.
*/
while (!zookeeper->exists(source_path + "/columns"))
{
LOG_INFO(log, "Waiting for replica " << source_path << " to be fully created");
zkutil::EventPtr event = std::make_shared<Poco::Event>();
if (zookeeper->exists(source_path + "/columns", nullptr, event))
{
LOG_WARNING(log, "Oops, a watch has leaked");
break;
}
event->wait();
}
/// The order of the following three actions is important. Entries in the log can be duplicated, but they can not be lost.
/// Copy reference to the log from `reference/master` replica.
zkutil::Requests rec;
/// We must check is_active and set log_pointer atomically in order to cleanupThread can not clear log with our log_pointer.
rec.push_back(zkutil::makeCheckRequest(source_path + "/is_active", 0));
rec.push_back(zkutil::makeSetRequest(replica_path + "/log_pointer", zookeeper->get(source_path + "/log_pointer"), -1));
try
{
zookeeper->multi(rec);
}
catch (const zkutil::KeeperException & e)
{
if (e.code == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
return false;
else
throw e;
}
zookeeper->set(replica_path + "/log_pointer", zookeeper->get(source_path + "/log_pointer"));
/// Let's remember the queue of the reference/master replica.
Strings source_queue_names = zookeeper->getChildren(source_path + "/queue");
std::sort(source_queue_names.begin(), source_queue_names.end());
Strings source_queue;
for (const String & entry_name : source_queue_names)
{
String entry;
if (!zookeeper->tryGet(source_path + "/queue/" + entry_name, entry))
continue;
source_queue.push_back(entry);
}
/// Add to the queue jobs to receive all the active parts that the reference/master replica has.
Strings parts = zookeeper->getChildren(source_path + "/parts");
ActiveDataPartSet active_parts_set(data.format_version, parts);
Strings active_parts = active_parts_set.getParts();
for (const String & name : active_parts)
{
LogEntry log_entry;
log_entry.type = LogEntry::GET_PART;
log_entry.source_replica = "";
log_entry.new_part_name = name;
log_entry.create_time = tryGetPartCreateTime(zookeeper, source_path, name);
zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential);
}
LOG_DEBUG(log, "Queued " << active_parts.size() << " parts to be fetched");
/// Add content of the reference/master replica queue to the queue.
for (const String & entry : source_queue)
{
zookeeper->create(replica_path + "/queue/queue-", entry, zkutil::CreateMode::PersistentSequential);
}
/// It will then be loaded into the queue variable in `queue.initialize` method.
LOG_DEBUG(log, "Copied " << source_queue.size() << " queue entries");
return true;
}
void StorageReplicatedMergeTree::cloneReplicaIfNeeded()
{
auto zookeeper = getZooKeeper();
String raw_log_pointer = zookeeper->get(replica_path + "/log_pointer");
Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
if (entries.empty())
return;
std::sort(entries.begin(), entries.end());
if (!raw_log_pointer.empty() && "log-" + padIndex(parse<UInt64>(raw_log_pointer)) >= entries[0])
return;
clearQueue();
String source_replica;
do
{
/// It is really needed?
while (source_replica == "")
{
for (const String & replica_name : zookeeper->getChildren(zookeeper_path + "/replicas"))
{
String source_replica_path = zookeeper_path + "/replicas/" + replica_name;
if (source_replica_path != replica_path && zookeeper->exists(source_replica_path + "/is_active") && !(zookeeper->get(source_replica_path + "/log_pointer").empty()))
source_replica = replica_name;
}
}
} while (cloneReplica(source_replica, zookeeper));
zookeeper->remove(replica_path + "/is_lost", -1);
}
void StorageReplicatedMergeTree::clearQueue()
{
auto zookeeper = getZooKeeper();
zookeeper->createOrUpdate(replica_path + "/queue", "", zkutil::CreateMode::Persistent);
}
void StorageReplicatedMergeTree::queueUpdatingTask()
{
//most probably this check is not relevant

View File

@ -397,6 +397,14 @@ private:
void mutationsUpdatingTask();
/// Clone data from another replica.
bool cloneReplica(const String & source_replica, zkutil::ZooKeeperPtr & zookeeper);
/// Clone replica if it is lost.
void cloneReplicaIfNeeded();
void clearQueue();
/** Performs actions from the queue.
*/
bool queueTask();