dbms: clearing impossible entries from replication queue [#METR-18035].

This commit is contained in:
Alexey Milovidov 2015-09-15 01:45:19 +03:00
parent 6ffb49fbe4
commit 0743a87ebd

View File

@ -460,13 +460,9 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
for (const auto & part : parts)
{
if (expected_parts.count(part->name))
{
expected_parts.erase(part->name);
}
else
{
unexpected_parts.insert(part);
}
}
/// Какие локальные куски добавить в ZK.
@ -791,7 +787,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
/// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно.
MergeTreeData::DataPartPtr containing_part = data.getActiveContainingPart(entry.new_part_name);
/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper.
/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper. Проверим, что он там есть.
if (containing_part && zookeeper->exists(replica_path + "/parts/" + containing_part->name))
{
if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
@ -881,6 +877,9 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
zookeeper->multi(ops);
/** Удаление старых кусков из ZK и с диска делается отложенно - см. ReplicatedMergeTreeCleanupThread, clearOldParts.
*/
/** При ZCONNECTIONLOSS или ZOPERATIONTIMEOUT можем зря откатить локальные изменения кусков.
* Это не проблема, потому что в таком случае слияние останется в очереди, и мы попробуем снова.
*/
@ -922,9 +921,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
if (replica.empty())
{
ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
// TODO Проверить, может быть куска нет ни на одной живой или мёртвой реплике, и он исчез навсегда?
throw Exception("No active replica has part " + entry.new_part_name, ErrorCodes::NO_REPLICA_HAS_PART);
}
@ -983,12 +979,14 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, Backgro
*/
if (replica.empty())
{
LOG_INFO(log, "No replica has part " << entry.new_part_name << ". Will fetch merged part instead.");
LOG_INFO(log, "No active replica has part " << entry.new_part_name << ". Will fetch merged part instead.");
return false;
}
}
/// Если ни у кого нет куска, и в очереди нет слияний с его участием, проверим, есть ли у кого-то покрывающий его.
/** Если ни у какой активной реплики нет куска, и в очереди нет слияний с его участием,
* проверим, есть ли у любой (активной или неактивной) реплики такой кусок или покрывающий его.
*/
if (replica.empty())
enqueuePartForCheck(entry.new_part_name);
}
@ -1723,11 +1721,12 @@ void StorageReplicatedMergeTree::partCheckThread()
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks);
auto part = data.getActiveContainingPart(part_name);
String part_path = replica_path + "/parts/" + part_name;
/// Этого или покрывающего куска у нас нет.
if (!part)
{
String part_path = replica_path + "/parts/" + part_name;
/// Если кусок есть в ZooKeeper, удалим его оттуда и добавим в очередь задание скачать его.
if (zookeeper->exists(part_path))
{
@ -1743,79 +1742,93 @@ void StorageReplicatedMergeTree::partCheckThread()
ActiveDataPartSet::Part part_info;
ActiveDataPartSet::parsePartName(part_name, part_info);
/** Будем проверять только куски, не полученные в результате слияния.
* Для кусков, полученных в результате слияния, такая проверка была бы некорректной,
* потому что слитого куска может еще ни у кого не быть.
/** Логика такая:
* - если у какой-то живой или неактивной реплики есть такой кусок, или покрывающий его кусок
* - всё Ок, ничего делать не нужно, он скачается затем при обработке очереди, когда реплика оживёт;
* - или, если реплика никогда не оживёт, то администратор удалит или создаст новую реплику с тем же адресом и см. всё сначала;
* - если ни у кого нет такого или покрывающего его куска, то
* - если у кого-то есть составляющие куски, то ничего делать не будем
* - возможно, искомый кусок затем появится в результате мерджа
* - если ни у кого нет составляющих кусков, то признаем кусок навечно потерянным,
* и удалим запись из очереди репликации.
*/
if (part_info.left == part_info.right)
{
LOG_WARNING(log, "Checking if anyone has part covering " << part_name << ".");
bool found = false;
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
LOG_WARNING(log, "Checking if anyone has part covering " << part_name << ".");
bool found = false;
bool found_pieces = false;
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
{
Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
for (const String & part_on_replica : parts)
{
Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
for (const String & part_on_replica : parts)
if (part_on_replica == part_name || ActiveDataPartSet::contains(part_on_replica, part_name))
{
if (part_on_replica == part_name || ActiveDataPartSet::contains(part_on_replica, part_name))
found = true;
LOG_WARNING(log, "Found part " << part_on_replica << " on " << replica);
break;
}
if (ActiveDataPartSet::contains(part_name, part_on_replica))
{
found_pieces = true;
LOG_WARNING(log,
"Found part " << part_on_replica << " on " << replica << " that is covered by demanded part " << part_name
<< ". That part alone is not enough to repair demanded part.");
}
}
if (found)
break;
}
if (!found && !found_pieces)
{
LOG_ERROR(log, "No replica has part covering " << part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
/// Если ни у кого нет такого куска, удалим его из нашей очереди.
bool was_in_queue = false;
{
std::lock_guard<std::mutex> lock(queue_mutex);
for (LogEntries::iterator it = queue.begin(); it != queue.end(); )
{
if ((*it)->new_part_name == part_name)
{
found = true;
LOG_WARNING(log, "Found part " << part_on_replica << " on " << replica);
break;
zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
queue.erase(it++);
was_in_queue = true;
}
else
{
++it;
}
}
if (found)
break;
}
if (!found)
if (was_in_queue)
{
LOG_ERROR(log, "No replica has part covering " << part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
/** Такая ситуация возможна, если на всех репликах, где был кусок, он испортился.
* Например, у реплики, которая только что его записала, отключили питание, и данные не записались из кеша на диск.
*/
LOG_ERROR(log, "Part " << part_name << " is lost forever.");
ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss);
/// Если ни у кого нет такого куска, удалим его из нашей очереди.
bool was_in_queue = false;
{
std::lock_guard<std::mutex> lock(queue_mutex);
for (LogEntries::iterator it = queue.begin(); it != queue.end(); )
{
if ((*it)->new_part_name == part_name)
{
zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
queue.erase(it++);
was_in_queue = true;
}
else
{
++it;
}
}
}
if (was_in_queue)
{
/** Такая ситуация возможна, если на всех репликах, где был кусок, он испортился.
* Например, у реплики, которая только что его записала, отключили питание, и данные не записались из кеша на диск.
*/
LOG_ERROR(log, "Part " << part_name << " is lost forever.");
ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss);
/** Нужно добавить отсутствующий кусок в block_numbers, чтобы он не мешал слияниям.
* Вот только в сам block_numbers мы его добавить не можем - если так сделать,
* ZooKeeper зачем-то пропустит один номер для автоинкремента,
* и в номерах блоков все равно останется дырка.
* Специально из-за этого приходится отдельно иметь nonincrement_block_numbers.
*/
zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers", "");
zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers/" + part_name.substr(0, 6), "");
AbandonableLockInZooKeeper::createAbandonedIfNotExists(
zookeeper_path + "/nonincrement_block_numbers/" + part_name.substr(0, 6) + "/block-" + padIndex(part_info.left),
*zookeeper);
}
/** Нужно добавить отсутствующий кусок в block_numbers, чтобы он не мешал слияниям.
* Вот только в сам block_numbers мы его добавить не можем - если так сделать,
* ZooKeeper зачем-то пропустит один номер для автоинкремента,
* и в номерах блоков все равно останется дырка.
* Специально из-за этого приходится отдельно иметь nonincrement_block_numbers.
*/
zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers", "");
zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers/" + part_name.substr(0, 6), "");
AbandonableLockInZooKeeper::createAbandonedIfNotExists(
zookeeper_path + "/nonincrement_block_numbers/" + part_name.substr(0, 6) + "/block-" + padIndex(part_info.left),
*zookeeper);
}
}
}