From 2a5abb17bedabbaf806bc0ecf7f227294214a34b Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Fri, 19 Aug 2016 15:01:10 +0300 Subject: [PATCH] dbms: cleanup [#METR-19266] --- .../MergeTree/MergeTreePartChecker.cpp | 98 ++++++++++++++----- 1 file changed, 76 insertions(+), 22 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp index 6eda6fa2f33..32397b24843 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp @@ -31,8 +31,9 @@ namespace { constexpr auto DATA_FILE_EXTENSION = ".bin"; -constexpr auto NULL_MAP_EXTENSION = ".null"; +constexpr auto NULL_MAP_FILE_EXTENSION = ".null"; constexpr auto MARKS_FILE_EXTENSION = ".mrk"; +constexpr auto NULL_MARKS_FILE_EXTENSION = ".null_mrk"; struct Stream { @@ -150,8 +151,44 @@ public: HashingReadBuffer mrk_hashing_buf; }; -/// Returns the number of rows. Updates the "checksums" variable with the checksum of -/// each column's bin file. +/// Updates the checksum value for the null map information of the +/// specified column. Returns the number of read rows. +size_t checkNullMap(const String & path, + const String & name, + const MergeTreePartChecker::Settings & settings, + MergeTreeData::DataPart::Checksums & checksums, + std::atomic * is_cancelled) +{ + size_t rows = 0; + + DataTypePtr type = std::make_shared(); + Stream data_stream(path, escapeForFileName(name), type, + NULL_MAP_FILE_EXTENSION, NULL_MARKS_FILE_EXTENSION); + + while (true) + { + if (is_cancelled && *is_cancelled) + return 0; + + if (data_stream.marksEOF()) + break; + + data_stream.assertMark(); + + size_t cur_rows = data_stream.read(settings.index_granularity); + + rows += cur_rows; + if (cur_rows < settings.index_granularity) + break; + } + + data_stream.assertEnd(checksums); + + return rows; +} + +/// Updates the checksum value for the specified column. +/// Returns the number of read rows. size_t checkColumn( const String & path, const String & name, @@ -164,13 +201,7 @@ size_t checkColumn( try { - if (type->isNullable()) - { - const auto & nullable_type = static_cast(*type); - auto nested_type = nullable_type.getNestedType(); - return checkColumn(path, name, nested_type, settings, checksums, is_cancelled); - } - else if (auto array = typeid_cast(type.get())) + if (auto array = typeid_cast(type.get())) { String sizes_name = DataTypeNested::extractNestedTableName(name); Stream sizes_stream(path, escapeForFileName(sizes_name) + ".size0", std::make_shared(), @@ -218,7 +249,6 @@ size_t checkColumn( Stream data_stream(path, escapeForFileName(name), type, DATA_FILE_EXTENSION, MARKS_FILE_EXTENSION); - size_t rows = 0; while (true) { if (is_cancelled && *is_cancelled) @@ -327,6 +357,19 @@ void MergeTreePartChecker::checkDataPart( size_t rows = UNKNOWN; std::exception_ptr first_exception; + /// Verify that the number of rows is consistent between all the columns. + auto check_row_count = [&rows, &any_column_name](size_t cur_rows, const std::string & col_name) + { + if (rows == UNKNOWN) + { + rows = cur_rows; + any_column_name = col_name; + } + else if (rows != cur_rows) + throw Exception{"Different number of rows in columns " + any_column_name + " and " + col_name, + ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH}; + }; + for (const NameAndTypePair & column : columns) { if (settings.verbose) @@ -345,21 +388,32 @@ void MergeTreePartChecker::checkDataPart( continue; } - size_t cur_rows = checkColumn(path, column.name, column.type, settings, checksums_data, is_cancelled); + const DataTypePtr * observed_type; + + /// If the current column is nullable, first we process its null map and the + /// corresponding marks. + if (column.type->isNullable()) + { + const auto & nullable_type = static_cast(column.type); + observed_type = &nullable_type.getNestedType(); + + size_t cur_rows = checkNullMap(path, column.name, settings, checksums_data, is_cancelled); + + if (is_cancelled && *is_cancelled) + return; + + check_row_count(cur_rows, column.name); + } + else + observed_type = &column.type; + + /// Update the checksum from the data of the column. + size_t cur_rows = checkColumn(path, column.name, *observed_type, settings, checksums_data, is_cancelled); if (is_cancelled && *is_cancelled) return; - if (rows == UNKNOWN) - { - rows = cur_rows; - any_column_name = column.name; - } - else if (rows != cur_rows) - { - throw Exception("Different number of rows in columns " + any_column_name + " and " + column.name, - ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); - } + check_row_count(cur_rows, column.name); ok = true; }