This commit is contained in:
Michael Kolupaev 2014-06-27 21:21:31 +04:00
parent 739f93e781
commit 429b5f63b2
4 changed files with 45 additions and 17 deletions

View File

@ -251,6 +251,7 @@ namespace ErrorCodes
MEMORY_LIMIT_EXCEEDED,
TABLE_IS_READ_ONLY,
NOT_ENOUGH_SPACE,
UNEXPECTED_ZOOKEEPER_ERROR,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -428,6 +428,10 @@ public:
*/
void renameAndDetachPart(DataPartPtr part, const String & prefix);
/** Убрать кусок из рабочего набора. Его данные удалятся при вызове clearOldParts, когда их перестанут читать.
*/
void deletePart(DataPartPtr part);
/** Удалить неактуальные куски. Возвращает имена удаленных кусков.
*/
Strings clearOldParts();

View File

@ -45,22 +45,6 @@ public:
LOG_DEBUG(log, "Wrote block " << part_number << " with ID " << block_id);
String expected_checksums_str;
if (!block_id.empty() && storage.zookeeper->tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
{
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it");
/// Блок с таким ID уже когда-то вставляли. Проверим чексуммы и не будем его вставлять.
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
expected_checksums.checkEqual(part->checksums, true);
part->remove();
/// Бросаем block_number_lock.
continue;
}
storage.data.renameTempPartAndAdd(part);
StorageReplicatedMergeTree::LogEntry log_entry;
@ -96,7 +80,40 @@ public:
zkutil::CreateMode::PersistentSequential));
block_number_lock.getUnlockOps(ops);
storage.zookeeper->multi(ops);
auto code = storage.zookeeper->tryMulti(ops);
if (code != ZOK)
{
if (code == ZNODEEXISTS)
{
/// Если блок с таким ID уже есть в таблице, не будем его вставлять, и удалим только что записанные данные.
/// NOTE: В короткое время между renameTempPartAndAdd и deletePart в таблице на этой реплике доступны
/// продублированные данные.
String expected_checksums_str;
if (!block_id.empty() && storage.zookeeper->tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
{
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it");
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
auto found_checksums = part->checksums;
storage.data.deletePart(part);
/// Если данные отличались от тех, что были вставлены ранее с тем же ID, бросим исключение.
expected_checksums.checkEqual(part->checksums, true);
}
else
{
throw Exception("Unexpected ZNODEEXISTS while adding block " + toString(part_number) + " with ID " + block_id + ": "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
}
else
{
throw Exception("Unexpected error while adding block " + toString(part_number) + " with ID " + block_id + ": "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
}
}
}

View File

@ -678,6 +678,12 @@ void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix)
part->renameAddPrefix(prefix);
}
void MergeTreeData::deletePart(DataPartPtr part)
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
data_parts.erase(part);
}
MergeTreeData::DataParts MergeTreeData::getDataParts()
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);