This commit is contained in:
Michael Kolupaev 2014-07-23 15:02:22 +04:00
parent bfe126ab04
commit 8b18acffec
2 changed files with 58 additions and 64 deletions

View File

@ -96,11 +96,6 @@ public:
return UNLOCKED; return UNLOCKED;
} }
static void createAbandonedIfNotExists(const String & path, zkutil::ZooKeeper & zookeeper)
{
zookeeper.createIfNotExists(path, "");
}
private: private:
zkutil::ZooKeeper & zookeeper; zkutil::ZooKeeper & zookeeper;
String path_prefix; String path_prefix;

View File

@ -1340,72 +1340,71 @@ void StorageReplicatedMergeTree::partCheckThread()
/// Если куска нет в ZooKeeper, проверим есть ли он хоть у кого-то. /// Если куска нет в ZooKeeper, проверим есть ли он хоть у кого-то.
else else
{ {
LOG_WARNING(log, "Checking if anyone has part covering " << part_name << "."); ActiveDataPartSet::Part part_info;
ActiveDataPartSet::parsePartName(part_name, part_info);
bool found = false; /** Будем проверять только куски, не полученные в результате слияния.
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas"); * Для кусков, полученных в результате слияния такая проверка была бы некорректной,
for (const String & replica : replicas) * потому что слитого куска может еще ни у кого не быть.
*/
if (part_info.left == part_info.right)
{ {
Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts"); LOG_WARNING(log, "Checking if anyone has part covering " << part_name << ".");
for (const String & part_on_replica : parts)
bool found = false;
Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
for (const String & replica : replicas)
{ {
if (ActiveDataPartSet::contains(part_on_replica, part_name)) Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
for (const String & part_on_replica : parts)
{ {
found = true; if (part_on_replica == part_name || ActiveDataPartSet::contains(part_on_replica, part_name))
LOG_WARNING(log, "Found part " << part_on_replica << " on " << replica);
break;
}
}
if (found)
break;
}
if (!found)
{
/** Такая ситуация возможна при нормальной работе, без потери данных, например, так:
* ReplicatedMergeTreeBlockOutputStream записал кусок, попытался добавить его в ZK,
* получил operation timeout, удалил локальный кусок и бросил исключение,
* а на самом деле, кусок добавился в ZK.
*/
LOG_ERROR(log, "No replica has part covering " << part_name << ". This part is lost forever. "
<< "There might or might not be a data loss.");
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
/// Если ни у кого нет такого куска, удалим его из нашей очереди и добавим его в block_numbers.
String month_name = part_name.substr(0, 6);
zookeeper->createIfNotExists(zookeeper_path + "/block_numbers/" + month_name, "");
ActiveDataPartSet::Part part_info;
ActiveDataPartSet::parsePartName(part_name, part_info);
if (part_info.left != part_info.right)
LOG_ERROR(log, "Lost part " << part_name << " is a result of a merge. "
"This means some data is definitely lost (or there's a bug).");
for (size_t index = part_info.left; index <= part_info.right; ++index)
{
AbandonableLockInZooKeeper::createAbandonedIfNotExists(
zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(index), *zookeeper);
}
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
/** NOTE: Не удалятся записи в очереди, которые сейчас выполняются.
* Они пофейлятся и положат кусок снова в очередь на проверку.
* Расчитываем, что это редкая ситуация.
*/
for (LogEntries::iterator it = queue.begin(); it != queue.end(); )
{
if (it->new_part_name == part_name)
{ {
zookeeper->remove(replica_path + "/queue/" + it->znode_name); found = true;
queue.erase(it++); LOG_WARNING(log, "Found part " << part_on_replica << " on " << replica);
break;
} }
else }
if (found)
break;
}
if (!found)
{
/** Такая ситуация возможна при нормальной работе, без потери данных, например, так:
* ReplicatedMergeTreeBlockOutputStream записал кусок, попытался добавить его в ZK,
* получил operation timeout, удалил локальный кусок и бросил исключение,
* а на самом деле, кусок добавился в ZK.
*/
LOG_ERROR(log, "No replica has part covering " << part_name << ". This part is lost forever. "
<< "There might or might not be a data loss.");
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
/** Если ни у кого нет такого куска, удалим его из нашей очереди.
*
* Еще можно было бы добавить его в block_numbers, чтобы он не мешал слияниям,
* но если так сделать, ZooKeeper почему-то пропустит один номер для автоинкремента,
* и в номерах блоков все равно останется дырка. TODO: можно придумать workaround.
*/
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
/** NOTE: Не удалятся записи в очереди, которые сейчас выполняются.
* Они пофейлятся и положат кусок снова в очередь на проверку.
* Расчитываем, что это редкая ситуация.
*/
for (LogEntries::iterator it = queue.begin(); it != queue.end(); )
{ {
++it; if (it->new_part_name == part_name)
{
zookeeper->remove(replica_path + "/queue/" + it->znode_name);
queue.erase(it++);
}
else
{
++it;
}
} }
} }
} }