This commit is contained in:
Michael Kolupaev 2014-04-14 17:08:26 +04:00
parent d0452204ad
commit 68049a0162
5 changed files with 126 additions and 39 deletions

View File

@ -125,15 +125,26 @@ public:
{
struct Checksum
{
size_t size;
uint128 hash;
size_t file_size;
uint128 file_hash;
bool is_compressed = false;
size_t uncompressed_size;
uint128 uncompressed_hash;
void checkEqual(const Checksum & rhs, bool have_uncompressed, const String & name) const;
void checkSize(const String & path) const;
};
typedef std::map<String, Checksum> FileChecksums;
FileChecksums files;
/// Проверяет, что множество столбцов и их контрольные суммы совпадают. Если нет - бросает исключение.
void check(const Checksums & rhs) const;
/// Если have_uncompressed, для сжатых файлов сравнивает чексуммы разжатых данных. Иначе сравнивает только чексуммы файлов.
void checkEqual(const Checksums & rhs, bool have_uncompressed) const;
/// Проверяет, что в директории есть все нужные файлы правильных размеров. Не проверяет чексуммы.
void checkSizes(const String & path) const;
/// Сериализует и десериализует в человекочитаемом виде.
void readText(ReadBuffer & in);

View File

@ -26,15 +26,19 @@ protected:
ColumnStream(const String & escaped_column_name_, const String & data_path, const std::string & marks_path, size_t max_compress_block_size = DEFAULT_MAX_COMPRESS_BLOCK_SIZE) :
escaped_column_name(escaped_column_name_),
plain_file(data_path, max_compress_block_size, O_TRUNC | O_CREAT | O_WRONLY),
compressed_buf(plain_file),
marks_file(marks_path, 4096, O_TRUNC | O_CREAT | O_WRONLY),
compressed(compressed_buf), marks(marks_file) {}
plain_hashing(plain_file), compressed_buf(plain_hashing), compressed(compressed_buf),
marks_file(marks_path, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks(marks_file) {}
String escaped_column_name;
/// compressed -> compressed_buf -> plain_hashing -> plain_file
WriteBufferFromFile plain_file;
HashingWriteBuffer plain_hashing;
CompressedWriteBuffer compressed_buf;
WriteBufferFromFile marks_file;
HashingWriteBuffer compressed;
/// marks -> marks_file
WriteBufferFromFile marks_file;
HashingWriteBuffer marks;
void finalize()
@ -54,10 +58,15 @@ protected:
{
if (name == "")
name = escaped_column_name;
checksums.files[name + ".bin"].size = compressed.count();
checksums.files[name + ".bin"].hash = compressed.getHash();
checksums.files[name + ".mrk"].size = marks.count();
checksums.files[name + ".mrk"].hash = marks.getHash();
checksums.files[name + ".bin"].is_compressed = true;
checksums.files[name + ".bin"].uncompressed_size = compressed.count();
checksums.files[name + ".bin"].uncompressed_hash = compressed.getHash();
checksums.files[name + ".bin"].file_size = plain_hashing.count();
checksums.files[name + ".bin"].file_hash = plain_hashing.getHash();
checksums.files[name + ".mrk"].file_size = marks.count();
checksums.files[name + ".mrk"].file_hash = marks.getHash();
}
};
@ -126,7 +135,7 @@ protected:
else
{
limit = storage.index_granularity;
writeIntBinary(stream.plain_file.count(), stream.marks);
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
@ -157,7 +166,7 @@ protected:
else
{
limit = storage.index_granularity;
writeIntBinary(stream.plain_file.count(), stream.marks);
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
@ -251,8 +260,8 @@ public:
MergeTreeData::DataPart::Checksums checksums;
index_stream->next();
checksums.files["primary.idx"].size = index_stream->count();
checksums.files["primary.idx"].hash = index_stream->getHash();
checksums.files["primary.idx"].file_size = index_stream->count();
checksums.files["primary.idx"].file_hash = index_stream->getHash();
for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
{

View File

@ -37,7 +37,7 @@ public:
/// Блок с таким ID уже когда-то вставляли. Проверим чексуммы и не будем его вставлять.
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
expected_checksums.check(part->checksums);
expected_checksums.checkEqual(part->checksums, true);
part->remove();

View File

@ -427,7 +427,7 @@ static DataTypePtr getDataTypeByName(const String & name, const NamesAndTypesLis
}
/// одинаковыми считаются имена, вида "name.*"
static bool namesWithDotEqual(const String & name_with_dot, const DB::NameAndTypePair & name_type)
static bool namesWithDotEqual(const String & name_with_dot, const NameAndTypePair & name_type)
{
return (name_with_dot == name_type.first.substr(0, name_with_dot.length()));
}
@ -492,7 +492,7 @@ void MergeTreeData::prepareAlterModify(const ASTAlterQuery::Parameters & params)
try
{
while(DB::Block b = in.read())
while(Block b = in.read())
out.write(b);
in.readSuffix();
@ -793,7 +793,36 @@ MergeTreeData::DataPartPtr MergeTreeData::getContainingPart(const String & part_
}
void MergeTreeData::DataPart::Checksums::check(const Checksums & rhs) const
void MergeTreeData::DataPart::Checksums::Checksum::checkEqual(const Checksum & rhs, bool have_uncompressed, const String & name) const
{
if (is_compressed && have_uncompressed)
{
if (!rhs.is_compressed)
throw Exception("No uncompressed checksum for file " + name, ErrorCodes::CHECKSUM_DOESNT_MATCH);
if (rhs.uncompressed_size != uncompressed_size)
throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
if (rhs.uncompressed_hash != uncompressed_hash)
throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
return;
}
if (rhs.file_size != file_size)
throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
if (rhs.file_hash != file_hash)
throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
}
void MergeTreeData::DataPart::Checksums::Checksum::checkSize(const String & path) const
{
Poco::File file(path);
if (!file.exists())
throw Exception(path + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
size_t size = file.getSize();
if (size != file_size)
throw Exception(path + " has unexpected size: " + DB::toString(size) + " instead of " + DB::toString(file_size),
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
void MergeTreeData::DataPart::Checksums::checkEqual(const Checksums & rhs, bool have_uncompressed) const
{
for (const auto & it : rhs.files)
{
@ -811,14 +840,16 @@ void MergeTreeData::DataPart::Checksums::check(const Checksums & rhs) const
if (jt == rhs.files.end())
throw Exception("No file " + name + " in data part", ErrorCodes::NO_FILE_IN_DATA_PART);
const Checksum & expected = it.second;
const Checksum & found = jt->second;
it.second.checkEqual(jt->second, have_uncompressed, name);
}
}
if (expected.size != found.size)
throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
if (expected.hash != found.hash)
throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
/// Checks that every file listed in the checksums exists under `path` with the
/// expected size (path is expected to include a trailing separator, since file
/// names are appended directly). Hashes are not verified.
void MergeTreeData::DataPart::Checksums::checkSizes(const String & path) const
{
    for (const auto & name_and_checksum : files)
        name_and_checksum.second.checkSize(path + name_and_checksum.first);
}
@ -827,7 +858,12 @@ void MergeTreeData::DataPart::Checksums::readText(ReadBuffer & in)
files.clear();
size_t count;
DB::assertString("checksums format version: 1\n", in);
DB::assertString("checksums format version: ", in);
int format_version;
DB::readText(format_version, in);
if (format_version < 1 || format_version > 2)
throw Exception("Bad checksums format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT);
DB::assertString("\n",in);
DB::readText(count, in);
DB::assertString(" files:\n", in);
@ -838,12 +874,27 @@ void MergeTreeData::DataPart::Checksums::readText(ReadBuffer & in)
DB::readString(name, in);
DB::assertString("\n\tsize: ", in);
DB::readText(sum.size, in);
DB::readText(sum.file_size, in);
DB::assertString("\n\thash: ", in);
DB::readText(sum.hash.first, in);
DB::readText(sum.file_hash.first, in);
DB::assertString(" ", in);
DB::readText(sum.hash.second, in);
DB::readText(sum.file_hash.second, in);
DB::assertString("\n", in);
if (format_version == 2)
{
DB::assertString("\tcompressed: ", in);
DB::readText(sum.is_compressed, in);
if (sum.is_compressed)
{
DB::assertString("\n\tuncompressed size: ", in);
DB::readText(sum.uncompressed_size, in);
DB::assertString("\n\tuncompressed hash: ", in);
DB::readText(sum.uncompressed_hash.first, in);
DB::assertString(" ", in);
DB::readText(sum.uncompressed_hash.second, in);
}
DB::assertString("\n", in);
}
files.insert(std::make_pair(name, sum));
}
@ -851,20 +902,34 @@ void MergeTreeData::DataPart::Checksums::readText(ReadBuffer & in)
void MergeTreeData::DataPart::Checksums::writeText(WriteBuffer & out) const
{
DB::writeString("checksums format version: 1\n", out);
DB::writeString("checksums format version: 2\n", out);
DB::writeText(files.size(), out);
DB::writeString(" files:\n", out);
for (const auto & it : files)
{
DB::writeString(it.first, out);
const String & name = it.first;
const Checksum & sum = it.second;
DB::writeString(name, out);
DB::writeString("\n\tsize: ", out);
DB::writeText(it.second.size, out);
DB::writeText(sum.file_size, out);
DB::writeString("\n\thash: ", out);
DB::writeText(it.second.hash.first, out);
DB::writeText(sum.file_hash.first, out);
DB::writeString(" ", out);
DB::writeText(it.second.hash.second, out);
DB::writeText(sum.file_hash.second, out);
DB::writeString("\n\tcompressed: ", out);
DB::writeText(sum.is_compressed, out);
DB::writeString("\n", out);
if (sum.is_compressed)
{
DB::writeString("\tuncompressed size: ", out);
DB::writeText(sum.uncompressed_size, out);
DB::writeString("\n\tuncompressed hash: ", out);
DB::writeText(sum.uncompressed_hash.first, out);
DB::writeString(" ", out);
DB::writeText(sum.uncompressed_hash.second, out);
DB::writeString("\n", out);
}
}
}

View File

@ -298,10 +298,12 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
String another_replica = findReplicaHavingPart(part->name, false);
if (!another_replica.empty())
{
String checksums_str =
zookeeper.get(zookeeper_path + "/replicas/" + another_replica + "/parts/" + part->name + "/checksums");
auto checksums = MergeTreeData::DataPart::Checksums::parse(checksums_str);
checksums.check(part->checksums);
String checksums_str;
if (zookeeper.tryGet(zookeeper_path + "/replicas/" + another_replica + "/parts/" + part->name + "/checksums", checksums_str))
{
auto checksums = MergeTreeData::DataPart::Checksums::parse(checksums_str);
checksums.checkEqual(part->checksums, true);
}
}
ops.push_back(new zkutil::Op::Create(