part_checker: fixes. [#METR-11980]

Michael Kolupaev 2014-07-22 14:35:24 +04:00
parent cb7aa9dd66
commit 1be80d3410
4 changed files with 100 additions and 33 deletions

CompressedReadBuffer.h

@@ -9,10 +9,18 @@ namespace DB
 class CompressedReadBuffer : public CompressedReadBufferBase, public BufferWithOwnMemory<ReadBuffer>
 {
 private:
+    size_t size_compressed = 0;
+
+    size_t currentBlockCompressedSize() const
+    {
+        return size_compressed;
+    }
+
     bool nextImpl()
     {
         size_t size_decompressed;
-        if (!readCompressedData(size_decompressed))
+        size_compressed = readCompressedData(size_decompressed);
+        if (!size_compressed)
             return false;
         memory.resize(size_decompressed);
@@ -68,6 +76,12 @@ public:
         return bytes_read;
     }
+
+    /// The compressed size of the current block.
+    size_t getSizeCompressed() const
+    {
+        return size_compressed;
+    }
 };
 }
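The new accessor exists so the part checker can recover the starting offset of the block it has just read: after a block is decompressed, compressed_hashing_buf.count() points at the end of that block in the compressed file, so subtracting getSizeCompressed() yields the block's start. A minimal standalone sketch of that arithmetic (the FakeBlockReader type and all values are hypothetical, not ClickHouse code):

    #include <cassert>
    #include <cstddef>

    struct FakeBlockReader
    {
        size_t count = 0;           /// bytes of compressed data consumed so far
        size_t size_compressed = 0; /// compressed size of the most recently read block

        void readBlock(size_t compressed_size)
        {
            count += compressed_size;
            size_compressed = compressed_size;
        }
    };

    int main()
    {
        FakeBlockReader buf;
        buf.readBlock(100); /// first block: compressed offsets [0, 100)
        buf.readBlock(60);  /// second block: compressed offsets [100, 160)

        /// A mark pointing at the start of the second block must store offset 100,
        /// recovered as count() - getSizeCompressed() = 160 - 60.
        assert(buf.count - buf.size_compressed == 100);
    }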

MergeTreePartChecker.h

@@ -15,8 +15,10 @@ public:
      * - Checks that the marks are correct.
      * Throws an exception if the part is corrupted, or if the check could not be carried out (TODO: these two cases could be distinguished).
      * If strict, requires that files exist for every column in columns.txt, and that marks do not point to the end of a compressed block.
+     * If verbose, writes progress and errors to stderr, and does not stop at the first error.
      */
-    static void checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory);
+    static void checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory,
+        bool verbose = false);
 };
 }
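Because verbose is defaulted, existing callers compile unchanged while tools can opt in to progress output. A hypothetical call site (the part path is invented for illustration; it mirrors the tool's main() at the end of this commit):

    DB::MergeTreePartChecker::checkDataPart(
        "/opt/clickhouse/data/test/hits/20140722_20140722_0_0_0", /// part directory; checkDataPart appends the trailing '/' itself
        8192,                    /// index_granularity
        false,                   /// strict
        DB::DataTypeFactory(),
        true);                   /// verbose: progress and errors go to stderr, checking continues past the first failure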

MergeTreePartChecker.cpp

@@ -28,10 +28,7 @@ struct Stream
     Stream(const String & path_, const String & name_, DataTypePtr type_) : type(type_), path(path_), name(name_),
         file_buf(path + name + ".bin"), compressed_hashing_buf(file_buf), uncompressing_buf(compressed_hashing_buf),
-        uncompressed_hashing_buf(uncompressing_buf), mrk_file_buf(path + name + ".mrk"), mrk_hashing_buf(mrk_file_buf)
-    {
-        std::cerr << "hi: " << compressed_hashing_buf.count() << ' ' << uncompressed_hashing_buf.offset() << std::endl;
-    }
+        uncompressed_hashing_buf(uncompressing_buf), mrk_file_buf(path + name + ".mrk"), mrk_hashing_buf(mrk_file_buf) {}
 
     bool marksEOF()
     {
@@ -108,20 +105,23 @@ struct Stream
     MarkInCompressedFile data_mark;
 
-    if (!strict)
+    if (uncompressed_hashing_buf.position() == uncompressed_hashing_buf.buffer().end())
     {
-        /// If a mark is supposed to fall exactly on a block boundary, a mark pointing to the end of the previous block
-        ///  is just as acceptable as one pointing to the beginning of the next.
-        data_mark.offset_in_compressed_file = compressed_hashing_buf.count();
-        data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();
+        if (!strict)
+        {
+            /// If a mark is supposed to fall exactly on a block boundary, a mark pointing to the end of the previous block
+            ///  is just as acceptable as one pointing to the beginning of the next.
+            data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed();
+            data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();
 
-        if (mrk_mark == data_mark)
-            return;
+            if (mrk_mark == data_mark)
+                return;
+        }
 
-        uncompressed_hashing_buf.next();
     }
+    uncompressed_hashing_buf.nextIfAtEnd();
 
-    data_mark.offset_in_compressed_file = compressed_hashing_buf.count();
+    data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed();
     data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();
 
     if (mrk_mark != data_mark)
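The point of this change: a mark that falls exactly on a block boundary has two equivalent encodings, and non-strict mode now accepts both. A worked example with illustrative numbers, assuming block 1 occupies compressed bytes [0, 100) and decompresses to 8192 bytes, with block 2 starting at compressed offset 100 (the Mark struct is redeclared locally; it mirrors the real MarkInCompressedFile):

    #include <cstddef>

    struct Mark { size_t offset_in_compressed_file; size_t offset_in_decompressed_block; };

    int main()
    {
        Mark end_of_previous = { 0, 8192 }; /// inside block 1, at its decompressed end
        Mark start_of_next   = { 100, 0 };  /// at the very beginning of block 2
        /// Non-strict checking treats both as the same boundary position;
        /// strict mode insists on the second form.
    }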
@@ -221,7 +221,8 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr
     }
 }
 
-void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory)
+void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory,
+    bool verbose)
 {
     if (!path.empty() && *path.rbegin() != '/')
         path += "/";
@@ -235,7 +236,7 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
         assertEOF(buf);
     }
 
-    if (Poco::File(path + "checksums.txt").exists())
+    if (strict || Poco::File(path + "checksums.txt").exists())
     {
         ReadBufferFromFile buf(path + "checksums.txt");
         checksums_txt.readText(buf);
@@ -243,33 +244,82 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
     }
 
     MergeTreeData::DataPart::Checksums checksums_data;
 
     size_t primary_idx_size;
 
     {
         ReadBufferFromFile file_buf(path + "primary.idx");
         HashingReadBuffer hashing_buf(file_buf);
         primary_idx_size = hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
         checksums_data.files["primary.idx"] = MergeTreeData::DataPart::Checksums::Checksum(primary_idx_size, hashing_buf.getHash());
     }
 
     bool first = true;
     size_t rows = 0;
+    ExceptionPtr first_exception;
+
     for (const NameAndTypePair & column : columns)
     {
-        if (!strict && !Poco::File(path + escapeForFileName(column.name) + ".bin").exists())
-            continue;
-
-        size_t cur_rows = checkColumn(path, column.name, column.type, index_granularity, strict, checksums_data);
-        if (first)
+        if (verbose)
         {
-            rows = cur_rows;
-            first = false;
+            std::cerr << column.name << ":";
+            std::cerr.flush();
         }
-        else if (rows != cur_rows)
+
+        bool ok = false;
+        try
         {
-            throw Exception("Different number of rows in columns " + columns.begin()->name + " and " + column.name,
-                ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
+            if (!strict && !Poco::File(path + escapeForFileName(column.name) + ".bin").exists())
+            {
+                if (verbose)
+                    std::cerr << " no files" << std::endl;
+                continue;
+            }
+
+            size_t cur_rows = checkColumn(path, column.name, column.type, index_granularity, strict, checksums_data);
+            if (first)
+            {
+                rows = cur_rows;
+                first = false;
+            }
+            else if (rows != cur_rows)
+            {
+                throw Exception("Different number of rows in columns " + columns.begin()->name + " and " + column.name,
+                    ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
+            }
+
+            ok = true;
         }
-
-        std::cerr << "column " << column.name << " ok" << std::endl;
+        catch (...)
+        {
+            if (!verbose)
+                throw;
+
+            ExceptionPtr e = cloneCurrentException();
+            if (!first_exception)
+                first_exception = e;
+
+            std::cerr << " exception" << std::endl;
+            std::cerr << "Code: " << e->code() << ", e.displayText() = " << e->displayText() << ", e.what() = " << e->what() << std::endl;
+            if (auto dbe = dynamic_cast<Exception *>(&*e))
+                std::cerr << "Stack trace:\n\n" << dbe->getStackTrace().toString() << std::endl;
+            std::cerr << std::endl;
+        }
+
+        if (verbose && ok)
+            std::cerr << " ok" << std::endl;
     }
 
     if (first)
         throw Exception("No columns", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
 
-    checksums_txt.checkEqual(checksums_data, true);
+    if (primary_idx_size % ((rows - 1) / index_granularity + 1))
+        throw Exception("primary.idx size (" + toString(primary_idx_size) + ") not divisible by number of marks ("
+            + toString(rows) + "/" + toString(index_granularity) + " rounded up)", ErrorCodes::CORRUPTED_DATA);
+
+    if (strict || !checksums_txt.files.empty())
+        checksums_txt.checkEqual(checksums_data, true);
+
+    if (first_exception)
+        first_exception->rethrow();
 }
 }
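The new primary.idx sanity check, restated with concrete (illustrative) numbers: the number of index marks is ceil(rows / index_granularity), computed in integer arithmetic as (rows - 1) / index_granularity + 1, and the file size must split evenly into that many fixed-size key entries:

    #include <cassert>
    #include <cstddef>

    int main()
    {
        size_t rows = 20000, index_granularity = 8192;
        size_t marks = (rows - 1) / index_granularity + 1; /// ceil(20000 / 8192) == 3
        assert(marks == 3);

        size_t primary_idx_size = 24; /// e.g. 3 marks * 8 bytes per serialized key (hypothetical sizes)
        assert(primary_idx_size % marks == 0); /// otherwise checkDataPart reports the part as corrupted
    }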

part_checker.cpp (tool entry point)

@@ -14,7 +14,8 @@ int main(int argc, char ** argv)
     try
     {
-        DB::MergeTreePartChecker::checkDataPart(argv[1], argc == 3 ? DB::parse<size_t>(argv[2]) : 8192ul, argv[2][0] == '1', DB::DataTypeFactory());
+        DB::MergeTreePartChecker::checkDataPart(argv[1], argc == 4 ? DB::parse<size_t>(argv[3]) : 8192ul, argv[2][0] == '1',
+            DB::DataTypeFactory(), true);
     }
     catch (...)
     {
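With the reshuffled arguments, the tool appears to be invoked as part_checker <path_to_part> <strict: 0|1> [index_granularity], with the granularity defaulting to 8192 and verbose output always enabled. Note that argv[2] is still read unconditionally, so the strict flag is effectively mandatory even though the granularity is optional.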