From 2afa1590e09ce7cc0c9f94843bd71c82f04f804a Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Sat, 17 Apr 2021 04:06:59 +0300 Subject: [PATCH] ColumnSparse: fix MergeTree in old syntax --- src/Columns/ColumnSparse.cpp | 9 +++++++++ src/Columns/ColumnSparse.h | 5 ----- .../Serializations/SerializationSparse.cpp | 3 ++- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 18 +++++++++++++++--- .../MergeTree/MergedBlockOutputStream.cpp | 4 +++- 5 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/Columns/ColumnSparse.cpp b/src/Columns/ColumnSparse.cpp index f952cd0565b..890bae741f7 100644 --- a/src/Columns/ColumnSparse.cpp +++ b/src/Columns/ColumnSparse.cpp @@ -7,6 +7,8 @@ #include #include +#include + namespace DB { @@ -41,6 +43,13 @@ ColumnSparse::ColumnSparse(MutableColumnPtr && values_, MutableColumnPtr && offs if (_size < offsets->size()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of sparse column ({}) cannot be lower than number of non-default values ({})", _size, offsets->size()); + +#ifndef NDEBUG + const auto & offsets_data = getOffsetsData(); + auto it = std::adjacent_find(offsets_data.begin(), offsets_data.end(), std::greater_equal()); + if (it != offsets_data.end()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Offsets of ColumnSparse must be strictly sorted"); +#endif } MutableColumnPtr ColumnSparse::cloneResized(size_t new_size) const diff --git a/src/Columns/ColumnSparse.h b/src/Columns/ColumnSparse.h index 3bd0741df4c..591011665bb 100644 --- a/src/Columns/ColumnSparse.h +++ b/src/Columns/ColumnSparse.h @@ -157,11 +157,6 @@ public: IColumn & getOffsetsColumn() { return *offsets; } private: - [[noreturn]] void throwMustBeDense() const - { - throw Exception("Not implemented for ColumnSparse", ErrorCodes::LOGICAL_ERROR); - } - class Iterator { public: diff --git a/src/DataTypes/Serializations/SerializationSparse.cpp b/src/DataTypes/Serializations/SerializationSparse.cpp index a160d23c4a1..dd39d51b409 100644 --- a/src/DataTypes/Serializations/SerializationSparse.cpp +++ b/src/DataTypes/Serializations/SerializationSparse.cpp @@ -55,10 +55,12 @@ size_t deserializeOffsets(IColumn::Offsets & offsets, /// TODO: offsets.reserve(limit / 10); + bool first = true; size_t total_rows = state.num_trailing_defaults; if (state.has_value_after_defaults) { offsets.push_back(start + state.num_trailing_defaults); + first = false; state.has_value_after_defaults = false; state.num_trailing_defaults = 0; @@ -66,7 +68,6 @@ size_t deserializeOffsets(IColumn::Offsets & offsets, } size_t group_size; - bool first = true; while (!istr.eof()) { readIntBinary(group_size, istr); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 13f53616090..8b5a49c6c3b 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -811,6 +811,14 @@ void IMergeTreeDataPart::loadChecksums(bool require) void IMergeTreeDataPart::loadRowsCount() { String path = getFullRelativePath() + "count.txt"; + + auto read_rows_count = [&]() + { + auto buf = openForReading(volume->getDisk(), path); + readIntText(rows_count, *buf); + assertEOF(*buf); + }; + if (index_granularity.empty()) { rows_count = 0; @@ -820,9 +828,7 @@ void IMergeTreeDataPart::loadRowsCount() if (!volume->getDisk()->exists(path)) throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART); - auto buf = openForReading(volume->getDisk(), path); - readIntText(rows_count, *buf); - assertEOF(*buf); + read_rows_count(); #ifndef NDEBUG /// columns have to be loaded @@ -875,6 +881,12 @@ void IMergeTreeDataPart::loadRowsCount() } else { + if (volume->getDisk()->exists(path)) + { + read_rows_count(); + return; + } + for (const NameAndTypePair & column : columns) { ColumnPtr column_col = column.type->createColumn(*getSerializationForColumn(column)); diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 4cca8a3cd6c..111b0c948c3 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -115,7 +115,7 @@ void MergedBlockOutputStream::finalizePartOnDisk( out->sync(); } - if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part)) + if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) { new_part->partition.store(storage, volume->getDisk(), part_path, checksums); if (new_part->minmax_idx.initialized) @@ -123,7 +123,9 @@ void MergedBlockOutputStream::finalizePartOnDisk( else if (rows_count) throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name + ". It is a bug.", ErrorCodes::LOGICAL_ERROR); + } + { auto count_out = volume->getDisk()->writeFile(part_path + "count.txt", 4096); HashingWriteBuffer count_out_hashing(*count_out); writeIntText(rows_count, count_out_hashing);