diff --git a/dbms/include/DB/IO/CompressedReadBuffer.h b/dbms/include/DB/IO/CompressedReadBuffer.h
index 0eaa1f891cb..506b6c8a40c 100644
--- a/dbms/include/DB/IO/CompressedReadBuffer.h
+++ b/dbms/include/DB/IO/CompressedReadBuffer.h
@@ -9,10 +9,18 @@ namespace DB
 class CompressedReadBuffer : public CompressedReadBufferBase, public BufferWithOwnMemory<ReadBuffer>
 {
 private:
+	size_t size_compressed = 0;
+
+	size_t currentBlockCompressedSize() const
+	{
+		return size_compressed;
+	}
+
 	bool nextImpl()
 	{
 		size_t size_decompressed;
-		if (!readCompressedData(size_decompressed))
+		size_compressed = readCompressedData(size_decompressed);
+		if (!size_compressed)
 			return false;
 
 		memory.resize(size_decompressed);
@@ -68,6 +76,12 @@ public:
 
 		return bytes_read;
 	}
+
+	/// Compressed size of the current block.
+	size_t getSizeCompressed() const
+	{
+		return size_compressed;
+	}
 };
 
 }
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h b/dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h
index 3f8b51624a8..2a0a314e4ea 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreePartChecker.h
@@ -15,8 +15,10 @@ public:
	 *  - Checks that the marks are correct.
	 * Throws an exception if the part is corrupted or could not be checked (TODO: these two cases could be told apart).
	 * If strict, requires that files exist for all columns from columns.txt, and that marks do not point to the end of a compressed block.
+	 * If verbose, writes progress and errors to stderr, and does not stop at the first error.
	 */
-	static void checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory);
+	static void checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory,
+		bool verbose = false);
 };
 
 }
diff --git a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp
index 3ae909845d8..37bd5601a5b 100644
--- a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp
@@ -28,10 +28,7 @@ struct Stream
 	Stream(const String & path_, const String & name_, DataTypePtr type_)
 		: type(type_), path(path_), name(name_), file_buf(path + name + ".bin"),
 		compressed_hashing_buf(file_buf), uncompressing_buf(compressed_hashing_buf),
-		uncompressed_hashing_buf(uncompressing_buf), mrk_file_buf(path + name + ".mrk"), mrk_hashing_buf(mrk_file_buf)
-	{
-		std::cerr << "hi: " << compressed_hashing_buf.count() << ' ' << uncompressed_hashing_buf.offset() << std::endl;
-	}
+		uncompressed_hashing_buf(uncompressing_buf), mrk_file_buf(path + name + ".mrk"), mrk_hashing_buf(mrk_file_buf) {}
 
 	bool marksEOF()
 	{
@@ -108,20 +105,23 @@ struct Stream
 
 		MarkInCompressedFile data_mark;
 
-		if (!strict)
+		if (uncompressed_hashing_buf.position() == uncompressed_hashing_buf.buffer().end())
 		{
-			/// If the mark is supposed to fall exactly on a block boundary, a mark pointing to the end of the previous block
-			/// is just as acceptable as one pointing to the beginning of the next.
-			data_mark.offset_in_compressed_file = compressed_hashing_buf.count();
-			data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();
+			if (!strict)
+			{
+				/// If the mark is supposed to fall exactly on a block boundary, a mark pointing to the end of the previous block
+				/// is just as acceptable as one pointing to the beginning of the next.
+				data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed();
+				data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();
 
-			if (mrk_mark == data_mark)
-				return;
+				if (mrk_mark == data_mark)
+					return;
+			}
+
+			uncompressed_hashing_buf.next();
 		}
 
-		uncompressed_hashing_buf.nextIfAtEnd();
-
-		data_mark.offset_in_compressed_file = compressed_hashing_buf.count();
+		data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed();
 		data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();
 
 		if (mrk_mark != data_mark)
@@ -221,7 +221,8 @@ static size_t checkColumn(const String & path, const String & name, DataTypePtr
 	}
 }
 
-void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory)
+void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity, bool strict, const DataTypeFactory & data_type_factory,
+	bool verbose)
 {
 	if (!path.empty() && *path.rbegin() != '/')
 		path += "/";
@@ -235,7 +236,7 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
 		assertEOF(buf);
 	}
 
-	if (Poco::File(path + "checksums.txt").exists())
+	if (strict || Poco::File(path + "checksums.txt").exists())
 	{
 		ReadBufferFromFile buf(path + "checksums.txt");
 		checksums_txt.readText(buf);
@@ -243,33 +244,82 @@ void MergeTreePartChecker::checkDataPart(String path, size_t index_granularity,
 	}
 
 	MergeTreeData::DataPart::Checksums checksums_data;
+	size_t primary_idx_size;
+
+	{
+		ReadBufferFromFile file_buf(path + "primary.idx");
+		HashingReadBuffer hashing_buf(file_buf);
+		primary_idx_size = hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
+		checksums_data.files["primary.idx"] = MergeTreeData::DataPart::Checksums::Checksum(primary_idx_size, hashing_buf.getHash());
+	}
 
 	bool first = true;
 	size_t rows = 0;
+	ExceptionPtr first_exception;
+
 	for (const NameAndTypePair & column : columns)
 	{
-		if (!strict && !Poco::File(path + escapeForFileName(column.name) + ".bin").exists())
-			continue;
-
-		size_t cur_rows = checkColumn(path, column.name, column.type, index_granularity, strict, checksums_data);
-		if (first)
+		if (verbose)
 		{
-			rows = cur_rows;
-			first = false;
-		}
-		else if (rows != cur_rows)
-		{
-			throw Exception("Different number of rows in columns " + columns.begin()->name + " and " + column.name,
-				ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
+			std::cerr << column.name << ":";
+			std::cerr.flush();
 		}
 
-		std::cerr << "column " << column.name << " ok" << std::endl;
+		bool ok = false;
+		try
+		{
+			if (!strict && !Poco::File(path + escapeForFileName(column.name) + ".bin").exists())
+			{
+				if (verbose)
+					std::cerr << " no files" << std::endl;
+				continue;
+			}
+
+			size_t cur_rows = checkColumn(path, column.name, column.type, index_granularity, strict, checksums_data);
+			if (first)
+			{
+				rows = cur_rows;
+				first = false;
+			}
+			else if (rows != cur_rows)
+			{
+				throw Exception("Different number of rows in columns " + columns.begin()->name + " and " + column.name,
+					ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
+			}
+
+			ok = true;
+		}
+		catch (...)
+		{
+			if (!verbose)
+				throw;
+			ExceptionPtr e = cloneCurrentException();
+			if (!first_exception)
+				first_exception = e;
+
+			std::cerr << " exception" << std::endl;
+			std::cerr << "Code: " << e->code() << ", e.displayText() = " << e->displayText() << ", e.what() = " << e->what() << std::endl;
+			if (auto dbe = dynamic_cast<Exception *>(&*e))
+				std::cerr << "Stack trace:\n\n" << dbe->getStackTrace().toString() << std::endl;
+			std::cerr << std::endl;
+		}
+
+		if (verbose && ok)
+			std::cerr << " ok" << std::endl;
 	}
 
 	if (first)
 		throw Exception("No columns", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
 
-	checksums_txt.checkEqual(checksums_data, true);
+	if (primary_idx_size % ((rows - 1) / index_granularity + 1))
+		throw Exception("primary.idx size (" + toString(primary_idx_size) + ") not divisible by number of marks ("
+			+ toString(rows) + "/" + toString(index_granularity) + " rounded up)", ErrorCodes::CORRUPTED_DATA);
+
+	if (strict || !checksums_txt.files.empty())
+		checksums_txt.checkEqual(checksums_data, true);
+
+	if (first_exception)
+		first_exception->rethrow();
 }
 
 }
diff --git a/dbms/src/Storages/tests/part_checker.cpp b/dbms/src/Storages/tests/part_checker.cpp
index de5b103f639..9293ac253e9 100644
--- a/dbms/src/Storages/tests/part_checker.cpp
+++ b/dbms/src/Storages/tests/part_checker.cpp
@@ -14,7 +14,8 @@ int main(int argc, char ** argv)
 
 	try
 	{
-		DB::MergeTreePartChecker::checkDataPart(argv[1], argc == 3 ? DB::parse<size_t>(argv[2]) : 8192ul, argv[2][0] == '1', DB::DataTypeFactory());
+		DB::MergeTreePartChecker::checkDataPart(argv[1], argc == 4 ? DB::parse<size_t>(argv[3]) : 8192ul, argv[2][0] == '1',
+			DB::DataTypeFactory(), true);
 	}
 	catch (...)
 	{
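For context, a minimal self-contained sketch of the two pieces of arithmetic the patch relies on. It is not part of the patch: the names Mark, expectedMark, and expectedMarksCount are hypothetical; only MarkInCompressedFile, compressed_hashing_buf.count(), getSizeCompressed(), and the (rows - 1) / index_granularity + 1 mark count come from the diff above.

#include <cassert>
#include <cstddef>

/// Mirrors MarkInCompressedFile from the patch: a row-group boundary is addressed by
/// (offset of a compressed block in the .bin file, offset within the decompressed block).
struct Mark
{
	size_t offset_in_compressed_file;
	size_t offset_in_decompressed_block;
};

/// compressed_bytes_read models compressed_hashing_buf.count(): it already includes the
/// block currently held decompressed in memory. The block's start in the file, which is
/// what a mark must store, is therefore count() minus the block's compressed size, the
/// value the new CompressedReadBuffer::getSizeCompressed() accessor exposes.
Mark expectedMark(size_t compressed_bytes_read, size_t current_block_compressed_size,
	size_t offset_in_decompressed_block)
{
	return {compressed_bytes_read - current_block_compressed_size, offset_in_decompressed_block};
}

/// One mark (and one primary.idx row) is written per index_granularity rows, so a part
/// with `rows` rows has ceil(rows / index_granularity) marks; the checker requires the
/// byte size of primary.idx to be divisible by this count.
size_t expectedMarksCount(size_t rows, size_t index_granularity)
{
	return (rows - 1) / index_granularity + 1;
}

int main()
{
	/// Two compressed blocks of 100 and 80 bytes read, 5 bytes into the second one after
	/// decompression: the mark must point to file offset 100, not 180.
	Mark m = expectedMark(180, 80, 5);
	assert(m.offset_in_compressed_file == 100 && m.offset_in_decompressed_block == 5);

	/// 20000 rows at the default granularity of 8192 give ceil(20000 / 8192) = 3 marks.
	assert(expectedMarksCount(20000, 8192) == 3);
}

With the part_checker change above, the tool is invoked as part_checker <path> <strict 0|1> [index_granularity] (granularity defaults to 8192) and always runs the checker in verbose mode.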