This commit is contained in:
Alexey Milovidov 2013-09-15 03:44:32 +00:00
parent 717a3fc591
commit 808cec6d53
2 changed files with 73 additions and 0 deletions

View File

@ -340,6 +340,9 @@ private:
/// Возвращает true если имя директории совпадает с форматом имени директории кусочков
bool isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches) const;
/// Если директория с куском содержит битые данные, то удаляем её и возвращаем true.
bool removeIfBroken(const String & path);
};
}

View File

@ -514,9 +514,27 @@ void StorageMergeTree::loadDataParts()
{
std::string file_name = it.name();
/// Удаляем временные директории старше суток.
if (0 == file_name.compare(0, strlen("tmp_"), "tmp_"))
{
Poco::File tmp_dir(full_path + file_name);
if (tmp_dir.isDirectory() && tmp_dir.getLastModified().epochTime() + 86400 < time(0))
{
LOG_WARNING(log, "Removing temporary directory " << full_path << file_name);
Poco::File(full_path + file_name).remove(true);
}
continue;
}
if (!isPartDirectory(file_name, matches))
continue;
/// Удалить битые куски, которые могут образовываться после грубого перезапуска сервера.
if (removeIfBroken(full_path + file_name))
continue;
DataPartPtr part = new DataPart(*this);
part->left_date = date_lut.toDayNum(OrderedIdentifier2Date(file_name.substr(matches[1].offset, matches[1].length)));
part->right_date = date_lut.toDayNum(OrderedIdentifier2Date(file_name.substr(matches[2].offset, matches[2].length)));
@ -1042,4 +1060,56 @@ bool StorageMergeTree::isPartDirectory(const String & dir_name, Poco::RegularExp
return (file_name_regexp.match(dir_name, 0, matches) && 6 == matches.size());
}
bool StorageMergeTree::removeIfBroken(const String & path)
{
/// Проверяем, что первичный ключ непуст.
Poco::File index_file(path + "/primary.idx");
if (!index_file.exists() || index_file.getSize() == 0)
{
LOG_ERROR(log, "Part " << path << " is broken: primary key is empty. Removing.");
Poco::File(path).remove(true);
return true;
}
/// Проверяем, что все засечки непусты и имеют одинаковый размер.
ssize_t marks_size = -1;
for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it)
{
Poco::File marks_file(path + "/" + escapeForFileName(it->first) + ".mrk");
if (!marks_file.exists()) /// Возможно, логическая ошибка. Не будем ничего удалять.
throw Exception("File " + marks_file.path() + " doesn't exist.", ErrorCodes::FILE_DOESNT_EXIST);
if (marks_size == -1)
{
marks_size = marks_file.getSize();
if (0 == marks_size)
{
LOG_ERROR(log, "Part " << path << " is broken: " << marks_file.path() << " is empty. Removing.");
Poco::File(path).remove(true);
return true;
}
}
else
{
if (static_cast<ssize_t>(marks_file.getSize()) != marks_size)
{
LOG_ERROR(log, "Part " << path << " is broken: marks have different sizes. Removing.");
Poco::File(path).remove(true);
return true;
}
}
}
return false;
}
}