dbms: lowered data size in ZK [#METR-18170].

This commit is contained in:
Alexey Milovidov 2015-09-24 08:47:17 +03:00
parent 6fc99c48e1
commit 3d817fe625
4 changed files with 37 additions and 29 deletions

View File

@ -131,10 +131,8 @@ public:
}
/// Контрольная сумма от множества контрольных сумм .bin файлов.
String summaryDataChecksum() const
void summaryDataChecksum(SipHash & hash) const
{
SipHash hash;
/// Пользуемся тем, что итерирование в детерминированном (лексикографическом) порядке.
for (const auto & it : files)
{
@ -148,10 +146,6 @@ public:
hash.update(reinterpret_cast<const char *>(&sum.uncompressed_size), sizeof(sum.uncompressed_size));
hash.update(reinterpret_cast<const char *>(&sum.uncompressed_hash), sizeof(sum.uncompressed_hash));
}
UInt64 lo, hi;
hash.get128(lo, hi);
return DB::toString(lo) + "_" + DB::toString(hi);
}
String toString() const

View File

@ -82,12 +82,24 @@ public:
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, part_number);
String part_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level);
/// Хэш от данных.
SipHash hash;
part->checksums.summaryDataChecksum(hash);
union
{
char bytes[16];
UInt64 lo, hi;
} hash_value;
hash.get128(hash_value.bytes);
String checksum(hash_value.bytes, 16);
/// Если в запросе не указан ID, возьмем в качестве ID хеш от данных. То есть, не вставляем одинаковые данные дважды.
/// NOTE: Если такая дедупликация не нужна, можно вместо этого оставлять block_id пустым.
/// Можно для этого сделать настройку или синтаксис в запросе (например, ID=null).
if (block_id.empty())
{
block_id = part->checksums.summaryDataChecksum();
block_id = toString(hash_value.lo) + "_" + toString(hash_value.hi);
if (block_id.empty())
throw Exception("Logical error: block_id is empty.", ErrorCodes::LOGICAL_ERROR);
@ -105,30 +117,25 @@ public:
/// Информация о блоке.
zkutil::Ops ops;
auto acl = zookeeper->getDefaultACL();
ops.push_back(
new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id,
"",
zookeeper->getDefaultACL(),
acl,
zkutil::CreateMode::Persistent));
ops.push_back(
new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/columns",
part->columns.toString(),
zookeeper->getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(
new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums",
part->checksums.toString(),
zookeeper->getDefaultACL(),
storage.zookeeper_path + "/blocks/" + block_id + "/checksum",
checksum,
acl,
zkutil::CreateMode::Persistent));
ops.push_back(
new zkutil::Op::Create(
storage.zookeeper_path + "/blocks/" + block_id + "/number",
toString(part_number),
zookeeper->getDefaultACL(),
acl,
zkutil::CreateMode::Persistent));
/// Информация о куске, в данных реплики.
@ -138,7 +145,7 @@ public:
ops.push_back(new zkutil::Op::Create(
storage.zookeeper_path + "/log/log-",
log_entry.toString(),
zookeeper->getDefaultACL(),
acl,
zkutil::CreateMode::PersistentSequential));
/// Удаление информации о том, что номер блока используется для записи.
@ -174,7 +181,7 @@ public:
new zkutil::Op::Create(
quorum_status_path,
quorum_entry.toString(),
zookeeper->getDefaultACL(),
acl,
zkutil::CreateMode::Persistent));
}
@ -192,16 +199,20 @@ public:
else if (code == ZNODEEXISTS)
{
/// Если блок с таким ID уже есть в таблице, откатим его вставку.
String expected_checksums_str;
String expected_checksum;
if (!block_id.empty() && zookeeper->tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
storage.zookeeper_path + "/blocks/" + block_id + "/checksum", expected_checksum))
{
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
/// Если данные отличались от тех, что были вставлены ранее с тем же ID, бросим исключение.
expected_checksums.checkEqual(part->checksums, true);
if (expected_checksum != checksum)
{
if (!insert_id.empty())
throw Exception("Attempt to insert block with same ID but different checksum", ErrorCodes::CHECKSUM_DOESNT_MATCH);
else
throw Exception("Logical error: got ZNODEEXISTS while inserting data, block ID is derived from checksum but checksum doesn't match", ErrorCodes::LOGICAL_ERROR);
}
transaction.rollback();
}

View File

@ -176,9 +176,12 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<std::pair<Int64, String>>());
for (size_t i = storage.data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i)
{
/// Устаревшие ноды. Этот код можно будет убрать через пол года.
zookeeper->tryRemove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/columns");
zookeeper->tryRemove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums");
ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1));
ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksum", -1));
ops.push_back(new zkutil::Op::Remove(storage.zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
if (ops.size() > 400 || i + 1 == timed_blocks.size())

View File

@ -2809,7 +2809,7 @@ void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach,
/// Такого никогда не должно происходить.
if (right == 0)
return;
throw Exception("Logical error: just allocated block number is zero", ErrorCodes::LOGICAL_ERROR);
--right;
String fake_part_name = getFakePartNameForDrop(month_name, 0, right);