From ec9bcd80543f616b100656bdb61b1bdf8f9085c1 Mon Sep 17 00:00:00 2001
From: Vitaliy Lyudvichenko
Date: Thu, 3 Nov 2016 15:00:44 +0300
Subject: [PATCH 1/6] Vertical merging algorithm for ordinary MergeTree.

---
 dbms/CMakeLists.txt                           |   2 +
 dbms/include/DB/Core/SortDescription.h        |   3 +
 .../CollapsingSortedBlockInputStream.h        |  12 +-
 .../DB/DataStreams/ColumnGathererStream.h     |  77 +++++++++
 .../MergingSortedBlockInputStream.h           |   9 +-
 .../DB/Storages/MergeTree/MergeTreeData.h     |   8 +-
 .../Storages/MergeTree/MergeTreeDataMerger.h  |  12 ++
 .../DB/Storages/MergeTree/MergeTreeDataPart.h |   2 +
 .../MergeTree/MergedBlockOutputStream.h       |  21 ++-
 .../CollapsingSortedBlockInputStream.cpp      |  26 ++-
 dbms/src/DataStreams/ColumnGathererStream.cpp |  92 ++++++++++
 .../MergingSortedBlockInputStream.cpp         |  27 ++-
 .../MergeTree/MergeTreeDataMerger.cpp         | 161 +++++++++++++++---
 .../Storages/MergeTree/MergeTreeDataPart.cpp  |  11 ++
 merges_test.sh                                |  37 ++++
 15 files changed, 457 insertions(+), 43 deletions(-)
 create mode 100644 dbms/include/DB/DataStreams/ColumnGathererStream.h
 create mode 100644 dbms/src/DataStreams/ColumnGathererStream.cpp
 create mode 100755 merges_test.sh

diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt
index 6890068af72..394bf659be5 100644
--- a/dbms/CMakeLists.txt
+++ b/dbms/CMakeLists.txt
@@ -298,6 +298,7 @@ add_library (dbms
 	include/DB/DataStreams/SquashingTransform.h
 	include/DB/DataStreams/SquashingBlockInputStream.h
 	include/DB/DataStreams/SquashingBlockOutputStream.h
+	include/DB/DataStreams/ColumnGathererStream.h
 	include/DB/DataTypes/IDataType.h
 	include/DB/DataTypes/IDataTypeDummy.h
 	include/DB/DataTypes/DataTypeSet.h
@@ -808,6 +809,7 @@ add_library (dbms
 	src/DataStreams/SquashingTransform.cpp
 	src/DataStreams/SquashingBlockInputStream.cpp
 	src/DataStreams/SquashingBlockOutputStream.cpp
+	src/DataStreams/ColumnGathererStream.cpp

 	src/DataTypes/DataTypeString.cpp
 	src/DataTypes/DataTypeFixedString.cpp
diff --git a/dbms/include/DB/Core/SortDescription.h b/dbms/include/DB/Core/SortDescription.h
index d4fbe234e1e..43793975179 100644
--- a/dbms/include/DB/Core/SortDescription.h
+++ b/dbms/include/DB/Core/SortDescription.h
@@ -61,6 +61,9 @@ struct SortCursorImpl
 	/** Есть ли хотя бы один столбец с Collator. */
 	bool has_collation = false;

+	/* Index of part from which the block was acquired */
+	// UInt8 source_part_label;
+
 	SortCursorImpl() {}

 	SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0)
diff --git a/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h
index 3dba4cb00c3..c2c270d3215 100644
--- a/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h
+++ b/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h
@@ -24,8 +24,8 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
 {
 public:
 	CollapsingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
-		const String & sign_column_, size_t max_block_size_)
-		: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
+		const String & sign_column_, size_t max_block_size_, MergedRowSources * row_sources_ = nullptr)
+		: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, row_sources_),
 		sign_column(sign_column_)
 	{
 	}
@@ -62,7 +62,7 @@ private:
 	/// Прочитали до конца.
 	bool finished = false;

-	RowRef current_key;			/// Текущий первичный ключ.
+	RowRef current_key;		/// Текущий первичный ключ.
 	RowRef next_key;			/// Первичный ключ следующей строки.
RowRef first_negative; /// Первая отрицательная строка для текущего первичного ключа. @@ -77,6 +77,12 @@ private: size_t blocks_written = 0; + /// Fields specific for VERTICAL merge algorithm + size_t current_pos = 0; /// Global row number of current key + size_t first_negative_pos = 0; /// Global row number of first_negative + size_t last_positive_pos = 0; /// Global row number of last_positive + size_t last_negative_pos = 0; /// Global row number of last_negative + /** Делаем поддержку двух разных курсоров - с Collation и без. * Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций. */ diff --git a/dbms/include/DB/DataStreams/ColumnGathererStream.h b/dbms/include/DB/DataStreams/ColumnGathererStream.h new file mode 100644 index 00000000000..dd00f3fe772 --- /dev/null +++ b/dbms/include/DB/DataStreams/ColumnGathererStream.h @@ -0,0 +1,77 @@ +#pragma once + +#include +#include +#include + + +namespace DB +{ + + +struct __attribute__((__packed__)) RowSourcePart +{ + unsigned int skip: 1; + unsigned int source_id: 7; + + RowSourcePart(unsigned source_id_, bool skip_ = false) + { + source_id = source_id_; + skip = skip_; + } + + static constexpr size_t MAX_PARTS = 127; +}; + +using MergedRowSources = PODArray; + + +/** Gather single stream from multiple streams according to streams mask. + * Stream mask maps row number to index of source stream. + */ +class ColumnGathererStream : public IProfilingBlockInputStream +{ +public: + ColumnGathererStream(const BlockInputStreams & source_streams, MergedRowSources & pos_to_source_idx_, + size_t block_size_ = DEFAULT_BLOCK_SIZE); + + String getName() const override { return "MergingSorted"; } + + String getID() const override; + + Block readImpl() override; + +private: + + MergedRowSources & pos_to_source_idx; + + /// Cache required fileds + struct Source + { + IColumn * column; + size_t pos; + size_t size; + Block block; + + Source(Block && block_) : block(std::move(block_)) + { + update(); + } + + void update() + { + column = &*block.getByPosition(0).column; + size = block.rowsInFirstColumn(); + pos = 0; + } + }; + + std::vector sources; + + size_t pos_global = 0; + size_t block_size; + + Logger * log = &Logger::get("ColumnGathererStream"); +}; + +} diff --git a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h index 49ad3185720..7ea2be168aa 100644 --- a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h @@ -9,6 +9,7 @@ #include #include +#include namespace DB @@ -57,9 +58,10 @@ class MergingSortedBlockInputStream : public IProfilingBlockInputStream { public: /// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке. 
- MergingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_, size_t limit_ = 0) + MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_, + size_t max_block_size_, size_t limit_ = 0, MergedRowSources * row_sources_ = nullptr) : description(description_), max_block_size(max_block_size_), limit(limit_), - source_blocks(inputs_.size()), cursors(inputs_.size()) + source_blocks(inputs_.size()), cursors(inputs_.size()), row_sources(row_sources_) { children.insert(children.end(), inputs_.begin(), inputs_.end()); } @@ -158,6 +160,9 @@ protected: using QueueWithCollation = std::priority_queue; QueueWithCollation queue_with_collation; + /// Used in VERTICAL merge algorithm + MergedRowSources * row_sources = nullptr; + /// Эти методы используются в Collapsing/Summing/Aggregating... SortedBlockInputStream-ах. diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index e1911c09875..ffc09973378 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -55,11 +55,11 @@ namespace ErrorCodes * Структура файлов: * / min-date _ max-date _ min-id _ max-id _ level / - директория с куском. * Внутри директории с куском: - * checksums.txt - список файлов с их размерами и контрольными суммами. - * columns.txt - список столбцов с их типами. + * checksums.txt - содержит список файлов с их размерами и контрольными суммами. + * columns.txt - содержит список столбцов с их типами. * primary.idx - индексный файл. - * Column.bin - данные столбца - * Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк. + * [Column].bin - данные столбца + * [Column].mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк. * * Имеется несколько режимов работы, определяющих, что делать при мердже: * - Ordinary - ничего дополнительно не делать; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h index 0970328d171..66e88d0ef1f 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h @@ -120,6 +120,18 @@ public: bool isCancelled() const { return cancelled > 0; } +protected: + + enum class MergeAlg + { + BASIC, + VERTICAL + }; + + MergeAlg chooseMergingAlg( + const MergeTreeData::DataPartsVector & parts, const Names & all_column_names, const SortDescription & sort_desc, + size_t rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const; + private: MergeTreeData & data; const BackgroundProcessingPool & pool; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataPart.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataPart.h index 32fd1ea4963..2e6780d8bda 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataPart.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataPart.h @@ -46,6 +46,8 @@ struct MergeTreeDataPartChecksums void addFile(const String & file_name, size_t file_size, uint128 file_hash); + void add(MergeTreeDataPartChecksums && rhs_checksums); + /// Проверяет, что множество столбцов и их контрольные суммы совпадают. Если нет - бросает исключение. /// Если have_uncompressed, для сжатых файлов сравнивает чексуммы разжатых данных. Иначе сравнивает только чексуммы файлов. 
void checkEqual(const MergeTreeDataPartChecksums & rhs, bool have_uncompressed) const; diff --git a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h index 17b22048ad0..ab9a20d732a 100644 --- a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h @@ -33,6 +33,7 @@ public: { } + protected: using OffsetColumns = std::set; @@ -306,11 +307,16 @@ public: throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED); } - MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums() + MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums( + const NamesAndTypesList & total_column_list, + MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr) { /// Заканчиваем запись и достаем чексуммы. MergeTreeData::DataPart::Checksums checksums; + if (additional_column_checksums) + checksums = std::move(*additional_column_checksums); + if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted) { index_stream->next(); @@ -319,10 +325,10 @@ public: index_stream = nullptr; } - for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it) + for (auto & column_stream : column_streams) { - it->second->finalize(); - it->second->addToChecksums(checksums); + column_stream.second->finalize(); + column_stream.second->addToChecksums(checksums); } column_streams.clear(); @@ -338,7 +344,7 @@ public: { /// Записываем файл с описанием столбцов. WriteBufferFromFile out(part_path + "columns.txt", 4096); - columns_list.writeText(out); + total_column_list.writeText(out); } { @@ -350,6 +356,11 @@ public: return checksums; } + MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums() + { + return writeSuffixAndGetChecksums(columns_list, nullptr); + } + MergeTreeData::DataPart::Index & getIndex() { return index_columns; diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index b3d80677fe3..4cc8ca61d5a 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -55,6 +55,12 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum ++merged_rows; for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num); + + if (row_sources) + { + row_sources->data()[last_positive_pos].skip = false; + row_sources->data()[last_negative_pos].skip = false; + } } return; } @@ -64,6 +70,9 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum ++merged_rows; for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*first_negative.columns[i], first_negative.row_num); + + if (row_sources) + row_sources->data()[first_negative_pos].skip = false; } if (count_positive >= count_negative) @@ -71,6 +80,9 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum ++merged_rows; for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num); + + if (row_sources) + row_sources->data()[last_positive_pos].skip = false; } if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1)) @@ -123,7 +135,7 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & 
merged_columns, s size_t merged_rows = 0; /// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size - while (!queue.empty()) + for (; !queue.empty(); ++current_pos) { TSortCursor current = queue.top(); @@ -149,6 +161,10 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s queue.pop(); + /// Initially, skip all rows. On insert, unskip "corner" rows. + if (row_sources) + row_sources->push_back(RowSourcePart(current.impl->order, true)); + if (key_differs) { /// Запишем данные для предыдущего первичного ключа. @@ -166,13 +182,21 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s last_is_positive = true; setRowRef(last_positive, current); + last_positive_pos = current_pos; } else if (sign == -1) { if (!count_negative) + { setRowRef(first_negative, current); + first_negative_pos = current_pos; + } + if (!blocks_written && !merged_rows) + { setRowRef(last_negative, current); + last_negative_pos = current_pos; + } ++count_negative; last_is_positive = false; diff --git a/dbms/src/DataStreams/ColumnGathererStream.cpp b/dbms/src/DataStreams/ColumnGathererStream.cpp new file mode 100644 index 00000000000..2260734d967 --- /dev/null +++ b/dbms/src/DataStreams/ColumnGathererStream.cpp @@ -0,0 +1,92 @@ +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int INCOMPATIBLE_COLUMNS; +} + +ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams, MergedRowSources & pos_to_source_idx_, size_t block_size_) +: pos_to_source_idx(pos_to_source_idx_), block_size(block_size_) +{ + children.assign(source_streams.begin(), source_streams.end()); + + sources.reserve(children.size()); + for (size_t i = 0; i < children.size(); i++) + { + sources.emplace_back(children[i]->read()); + + Block & block = sources.back().block; + if (block.columns() > 1 || !block.getByPosition(0).column || + block.getByPosition(0).type->getName() != sources[0].block.getByPosition(0).type->getName()) + throw Exception("Column formats don't match", ErrorCodes::INCOMPATIBLE_COLUMNS); + } +} + + +String ColumnGathererStream::getID() const +{ + std::stringstream res; + + res << getName() << "("; + for (size_t i = 0; i < children.size(); i++) + res << (i == 0 ? "" : ", " ) << children[i]->getID(); + res << ")"; + + return res.str(); +} + + +Block ColumnGathererStream::readImpl() +{ + if (children.size() == 1) + return children[0]->read(); + + if (children.size() == 0 || pos_global >= pos_to_source_idx.size()) + return Block(); + + Block block_res = sources[0].block.cloneEmpty(); + IColumn * column_res = &*block_res.getByPosition(0).column; + + size_t pos_finish = std::min(pos_global + block_size, pos_to_source_idx.size()); + column_res->reserve(pos_finish - pos_global); + + for (size_t pos = pos_global; pos < pos_finish; ++pos) + { + auto source_id = pos_to_source_idx[pos].source_id; + bool skip = pos_to_source_idx[pos].skip; + Source & source = sources[source_id]; + + if (source.pos >= source.size) /// Fetch new block + { + try + { + source.block = children[source_id]->read(); + source.update(); + } + catch (...) + { + source.size = 0; + } + + if (0 == source.size) + { + throw Exception("Can't fetch required block from " + children[source_id]->getID() + " (i. e. 
source " + std::to_string(source_id) +")", + ErrorCodes::LOGICAL_ERROR); + } + } + + if (!skip) + column_res->insertFrom(*source.column, source.pos); + ++source.pos; + } + + pos_global = pos_finish; + + return block_res; +} + +} diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index 9a8a641c7cd..3cdb93bde48 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -195,13 +195,10 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs return; } - size_t source_num = 0; - size_t size = cursors.size(); - for (; source_num < size; ++source_num) - if (&cursors[source_num] == current.impl) - break; + /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) + size_t source_num = current.impl->order; - if (source_num == size) + if (source_num >= cursors.size()) throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR); for (size_t i = 0; i < num_columns; ++i) @@ -224,6 +221,9 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs finished = true; } + if (row_sources) + row_sources->resize_fill(row_sources->size() + merged_rows, RowSourcePart(source_num)); + // std::cerr << "fetching next block\n"; total_merged_rows += merged_rows; @@ -236,6 +236,21 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); + if (row_sources) + { + /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) + size_t source_num = 0; + size_t size = cursors.size(); + for (; source_num < size; ++source_num) + if (&cursors[source_num] == current.impl) + break; + + if (source_num != current.impl->order) + throw Exception("Developer's logical error"); + + row_sources->push_back(RowSourcePart(current.impl->order)); + } + if (!current->isLast()) { // std::cerr << "moving to next row\n"; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index c993e103e42..979494541f8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -16,11 +16,13 @@ #include #include #include +#include #include #include #include #include +#include namespace ProfileEvents @@ -275,6 +277,32 @@ MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition( } +static void extractOrdinaryAndIndexColumns(const NamesAndTypesList & all_columns, const SortDescription & sort_desc, + NamesAndTypesList & ordinary_column_names_and_types, Names & ordinary_column_names, + NamesAndTypesList & index_column_names_and_types, Names & index_column_names +) +{ + size_t column_number = 0; + for (auto & column : all_columns) + { + auto it = std::find_if(sort_desc.begin(), sort_desc.end(), + [&](const SortColumnDescription & desc) { return desc.matchColumn(column.name, column_number); }); + + if (sort_desc.end() == it) + { + ordinary_column_names_and_types.emplace_back(column); + ordinary_column_names.emplace_back(column.name); + } + else + { + index_column_names_and_types.emplace_back(column); + index_column_names.emplace_back(column.name); + } + + ++column_number; + } +} + /// parts должны быть отсортированы. 
MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart( MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry, @@ -306,31 +334,52 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart part->accumulateColumnSizes(merged_column_to_size); } - Names column_names = data.getColumnNamesList(); - NamesAndTypesList column_names_and_types = data.getColumnsList(); + Names all_column_names = data.getColumnNamesList(); + NamesAndTypesList all_column_names_and_types = data.getColumnsList(); + SortDescription sort_desc = data.getSortDescription(); + + NamesAndTypesList ordinary_column_names_and_types, index_column_names_and_types; + Names ordinary_column_names, index_column_names; + extractOrdinaryAndIndexColumns(all_column_names_and_types, sort_desc, + ordinary_column_names_and_types, ordinary_column_names, + index_column_names_and_types, index_column_names + ); MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data); ActiveDataPartSet::parsePartName(merged_name, *new_data_part); new_data_part->name = "tmp_" + merged_name; new_data_part->is_temp = true; + size_t sum_rows_upper_bound = 0; + for (const auto & part : parts) + sum_rows_upper_bound += part->size * data.index_granularity; + + const auto rows_total = merge_entry->total_size_marks * data.index_granularity; + + MergedRowSources merged_rows_sources; + MergeAlg merge_alg = chooseMergingAlg(parts, all_column_names, sort_desc, sum_rows_upper_bound, merged_rows_sources); + + MergedRowSources * merged_rows_sources_ptr = (merge_alg == MergeAlg::VERTICAL) ? + &merged_rows_sources : nullptr; + Names & main_column_names = (merge_alg == MergeAlg::VERTICAL) ? + all_column_names : index_column_names; + NamesAndTypesList & main_column_names_and_types = (merge_alg == MergeAlg::VERTICAL) ? + all_column_names_and_types : index_column_names_and_types; + + LOG_DEBUG(log, "Selected MergeAlg : " << ((merge_alg == MergeAlg::VERTICAL) ? "VERTICAL" : "BASIC")); + /** Читаем из всех кусков, сливаем и пишем в новый. * Попутно вычисляем выражение для сортировки. */ BlockInputStreams src_streams; - size_t sum_rows_approx = 0; - - const auto rows_total = merge_entry->total_size_marks * data.index_granularity; - for (size_t i = 0; i < parts.size(); ++i) { - MarkRanges ranges{{0, parts[i]->size}}; - String part_path = data.getFullPath() + parts[i]->name + '/'; + auto input = std::make_unique( - part_path, DEFAULT_MERGE_BLOCK_SIZE, column_names, data, - parts[i], ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false); + part_path, DEFAULT_MERGE_BLOCK_SIZE, main_column_names, data, parts[i], + MarkRanges(1, MarkRange(0, parts[i]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false); input->setProgressCallback([&merge_entry, rows_total] (const Progress & value) { @@ -347,8 +396,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart std::make_shared(BlockInputStreamPtr(std::move(input)), data.getPrimaryExpression()))); else src_streams.emplace_back(std::move(input)); - - sum_rows_approx += parts[i]->size * data.index_granularity; } /// Порядок потоков важен: при совпадении ключа элементы идут в порядке номера потока-источника. 
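Before the merging streams are wired up in the next hunk, it helps to spell out the contract that merged_rows_sources establishes between the two phases of a vertical merge. The sketch below is illustrative only and is not part of the patch: it uses plain std::vector and int columns where the real code uses PODArray<RowSourcePart> and IColumn. The horizontal pass over the primary-key columns appends one RowSourcePart per visited input row; the gather pass then replays that mask once per remaining column.

    #include <cstddef>
    #include <vector>

    // Simplified stand-in for the patch's packed RowSourcePart.
    struct RowSourcePart
    {
        unsigned int skip : 1;      // 1 = row was collapsed away and must not be emitted
        unsigned int source_id : 7; // index of the source part, hence MAX_PARTS = 127

        RowSourcePart(unsigned source_id_, bool skip_ = false)
        {
            source_id = source_id_;
            skip = skip_;
        }
    };

    using MergedRowSources = std::vector<RowSourcePart>;

    // Gather pass for one non-key column: replay the mask produced by the
    // horizontal pass. The per-part cursor advances on every entry, including
    // skipped ones, because a collapsed row still consumes an input row.
    std::vector<int> gatherColumn(const std::vector<std::vector<int>> & column_by_part,
                                  const MergedRowSources & row_sources)
    {
        std::vector<int> result;
        std::vector<size_t> pos(column_by_part.size(), 0);

        for (const RowSourcePart & src : row_sources)
        {
            size_t row = pos[src.source_id]++;
            if (!src.skip)
                result.push_back(column_by_part[src.source_id][row]);
        }
        return result;
    }

This is the same loop shape as ColumnGathererStream::readImpl above, minus block fetching and error handling.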
@@ -360,32 +407,32 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart { case MergeTreeData::MergingParams::Ordinary: merged_stream = std::make_unique( - src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); + src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, merged_rows_sources_ptr); break; case MergeTreeData::MergingParams::Collapsing: merged_stream = std::make_unique( - src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE); + src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, merged_rows_sources_ptr); break; case MergeTreeData::MergingParams::Summing: merged_stream = std::make_unique( - src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE); + src_streams, sort_desc, data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE); break; case MergeTreeData::MergingParams::Aggregating: merged_stream = std::make_unique( - src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE); + src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE); break; case MergeTreeData::MergingParams::Replacing: merged_stream = std::make_unique( - src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE); + src_streams, sort_desc, data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE); break; case MergeTreeData::MergingParams::Graphite: merged_stream = std::make_unique( - src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE, + src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, data.merging_params.graphite_params, time_of_merge); break; @@ -404,7 +451,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart static_cast(merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes()); MergedBlockOutputStream to{ - data, new_part_tmp_path, column_names_and_types, compression_method, merged_column_to_size, aio_threshold}; + data, new_part_tmp_path, main_column_names_and_types, compression_method, merged_column_to_size, aio_threshold}; merged_stream->readPrefix(); to.writePrefix(); @@ -422,15 +469,54 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes; if (disk_reservation) - disk_reservation->update(static_cast((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation)); + disk_reservation->update(static_cast((1 - std::min(1., 1. 
* rows_written / sum_rows_upper_bound)) * initial_reservation)); } if (isCancelled()) throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); + MergeTreeData::DataPart::Checksums checksums_ordinary_columns; + + /// Gather ordinary rows + if (merge_alg == MergeAlg::VERTICAL) + { + BlockInputStreams column_part_streams(parts.size()); + auto it_name_and_type = ordinary_column_names_and_types.cbegin(); + + for (size_t column_id = 0; column_id < ordinary_column_names.size(); ++column_id) + { + Names column_name_(1, ordinary_column_names[column_id]); + NamesAndTypesList column_name_and_type_(1, *it_name_and_type++); + + for (size_t part_id = 0; part_id < parts.size(); ++part_id) + { + String part_path = data.getFullPath() + parts[part_id]->name + '/'; + + /// TODO: test perfomance with more accurate settings + column_part_streams[part_id] = std::make_shared( + part_path, DEFAULT_MERGE_BLOCK_SIZE, column_name_, data, parts[part_id], + MarkRanges(1, MarkRange(0, parts[part_id]->size)), false, nullptr, "", true, aio_threshold, 8192, false + ); + } + + ColumnGathererStream column_gathered_stream(column_part_streams, merged_rows_sources, 8192); + MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, true, compression_method); + + column_to.writePrefix(); + while ((block = column_gathered_stream.read())) + { + column_to.write(block); + } + checksums_ordinary_columns.add(column_to.writeSuffixAndGetChecksums()); + } + } + merged_stream->readSuffix(); - new_data_part->columns = column_names_and_types; - new_data_part->checksums = to.writeSuffixAndGetChecksums(); + new_data_part->columns = all_column_names_and_types; + if (merge_alg != MergeAlg::VERTICAL) + new_data_part->checksums = to.writeSuffixAndGetChecksums(); + else + new_data_part->checksums = to.writeSuffixAndGetChecksums(all_column_names_and_types, &checksums_ordinary_columns); new_data_part->index.swap(to.getIndex()); /// Для удобства, даже CollapsingSortedBlockInputStream не может выдать ноль строк. @@ -446,6 +532,37 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart } +MergeTreeDataMerger::MergeAlg MergeTreeDataMerger::chooseMergingAlg( + const MergeTreeData::DataPartsVector & parts, const Names & all_column_names, const SortDescription & sort_desc, + size_t sum_rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const +{ + bool is_supported_storage = + data.merging_params.mode == MergeTreeData::MergingParams::Ordinary || + data.merging_params.mode == MergeTreeData::MergingParams::Collapsing; + + bool enough_ordinary_cols = all_column_names.size() > sort_desc.size(); + + bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS; + + auto merge_alg = (is_supported_storage && enough_ordinary_cols && no_parts_overflow) ? MergeAlg::VERTICAL : MergeAlg::BASIC; + + if (merge_alg == MergeAlg::VERTICAL) + { + try + { + rows_sources_to_alloc.reserve(sum_rows_upper_bound); + } + catch (...) 
+ { + /// Not enough memory for VERTICAL merge algorithm, make sense for very large tables + merge_alg = MergeAlg::BASIC; + } + } + + return merge_alg; +} + + MergeTreeData::DataPartPtr MergeTreeDataMerger::renameMergedTemporaryPart( MergeTreeData::DataPartsVector & parts, MergeTreeData::MutableDataPartPtr & new_data_part, diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp index 11c4b09a08f..dafba018282 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp @@ -222,6 +222,17 @@ void MergeTreeDataPartChecksums::addFile(const String & file_name, size_t file_s files[file_name] = Checksum(file_size, file_hash); } +void MergeTreeDataPartChecksums::add(MergeTreeDataPartChecksums && rhs_checksums) +{ + for (auto & checksum : rhs_checksums.files) + { + if (!files.emplace(std::move(checksum)).second) + throw Exception("Adding already existing file checksum", ErrorCodes::LOGICAL_ERROR); + } + + rhs_checksums.files.clear(); +} + /// Контрольная сумма от множества контрольных сумм .bin файлов. void MergeTreeDataPartChecksums::summaryDataChecksum(SipHash & hash) const { diff --git a/merges_test.sh b/merges_test.sh new file mode 100755 index 00000000000..5a25a6c1f88 --- /dev/null +++ b/merges_test.sh @@ -0,0 +1,37 @@ +#!/bin/bash +set -e + +pre= +chunk=8192 +rows=$((90 * chunk)) + +clickhouse-client --query "DROP TABLE IF EXISTS test.merges" +clickhouse-client --query "DROP TABLE IF EXISTS test.merges_ref" +#sudo rm -r /opt/clickhouse/data/test/merges + +clickhouse-client --query "CREATE TABLE test.merges (part Date, id UInt64, data UInt64) ENGINE = MergeTree(part, id, 8192)" +clickhouse-client --query "CREATE TABLE test.merges_ref (part Date, id UInt64, data UInt64) ENGINE = Memory" + +iter=0 +for i in $(seq 0 $chunk $((rows-1)) ); do + iter=$((iter+1)) + clickhouse-client --query "insert into test.merges select toDate(0) as part, intHash64(number) as id, number as data from system.numbers limit $i, $chunk" + clickhouse-client --query "insert into test.merges_ref select toDate(0) as part, intHash64(number) as id, number as data from system.numbers limit $i, $chunk" + + parts=$( clickhouse-client --query "SELECT count(*) FROM system.parts WHERE table = 'merges' AND active = 1" ) + echo "i=$iter p=$parts [$i, $chunk) / $rows" +done + +clickhouse-client --query "SELECT name, active, min_block_number, max_block_number FROM system.parts WHERE table = 'merges' AND active = 1 FORMAT PrettyCompact" +clickhouse-client --query "OPTIMIZE TABLE test.merges" || echo "OPTIMIZE FAIL!!!" 
+clickhouse-client --query "SELECT name, active, min_block_number, max_block_number FROM system.parts WHERE table = 'merges' AND active = 1 FORMAT PrettyCompact" + +#clickhouse-client --query "insert into test.merges_ref SELECT * FROM (select toDate(0) as part, intHash64(number) as id, number as data from system.numbers limit 0, $rows) ORDER BY id" +#clickhouse-client --query "SELECT id, data FROM test.merges_ref ORDER BY id" > tmp_ref +clickhouse-client --query "SELECT id, data FROM (select toDate(0) as part, intHash64(number) as id, number as data from system.numbers limit $rows) ORDER BY id" > tmp_ref +clickhouse-client -n --query "SET max_threads = 1; SELECT id, data FROM test.merges;" > tmp_src + +cmp tmp_ref tmp_src || echo "FAIL" + +#clickhouse-client --query "CREATE TEMPORARY TABLE test.merges_union (id ALIAS test.merges.id, id_ref ALIAS test.merges_ref.id, data ALIAS test.merges.data, data_ref ALIAS test.merges_ref.id)" +#clickhouse-client --query "SELECT sum(abs(id - id_ref)), sum(abs(data - data_ref)) FROM test.merges_union" \ No newline at end of file From 2cd5b8b6048e68e394e019ade25c6084d60cbbc0 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Wed, 9 Nov 2016 20:58:44 +0300 Subject: [PATCH 2/6] Updated MergeInfo and progress callbacks with respect to vertical merge alg. --- .../include/DB/Storages/MergeTree/MergeList.h | 6 + .../MergeTree/MergeTreeBlockInputStream.h | 2 +- .../Storages/MergeTree/MergeTreeDataMerger.h | 8 +- .../MergeTree/MergeTreeBlockInputStream.cpp | 3 +- .../MergeTree/MergeTreeDataMerger.cpp | 232 ++++++++++++++---- .../Storages/MergeTree/MergeTreeDataPart.cpp | 5 +- .../Storages/System/StorageSystemMerges.cpp | 8 +- merges_test.sh | 37 --- 8 files changed, 202 insertions(+), 99 deletions(-) delete mode 100755 merges_test.sh diff --git a/dbms/include/DB/Storages/MergeTree/MergeList.h b/dbms/include/DB/Storages/MergeTree/MergeList.h index 251be557160..0c3a1e5bcdd 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeList.h +++ b/dbms/include/DB/Storages/MergeTree/MergeList.h @@ -34,7 +34,13 @@ struct MergeInfo std::atomic bytes_read_uncompressed{}; std::atomic rows_read{}; std::atomic bytes_written_uncompressed{}; + + /// mutually exclusive, updated either rows_written either columns_written std::atomic rows_written{}; + std::atomic columns_written{}; + + /// Number of rows for which primary key cols are written (updated alwasys) + std::atomic rows_with_key_columns_written{}; MergeInfo(const std::string & database, const std::string & table, const std::string & result_part_name) diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h index e5bcbd70a82..f5a52a81e7d 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h @@ -24,7 +24,7 @@ public: const MarkRanges & mark_ranges_, bool use_uncompressed_cache_, ExpressionActionsPtr prewhere_actions_, String prewhere_column_, bool check_columns, size_t min_bytes_to_use_direct_io_, size_t max_read_buffer_size_, - bool save_marks_in_cache_); + bool save_marks_in_cache_, bool quiet = false); ~MergeTreeBlockInputStream() override; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h index 66e88d0ef1f..22d5c1b3e6e 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h @@ -9,6 +9,7 @@ 
namespace DB { class MergeListEntry; +class MergeProgressCallBack; struct ReshardingJob; @@ -120,7 +121,7 @@ public: bool isCancelled() const { return cancelled > 0; } -protected: +public: enum class MergeAlg { @@ -128,8 +129,9 @@ protected: VERTICAL }; - MergeAlg chooseMergingAlg( - const MergeTreeData::DataPartsVector & parts, const Names & all_column_names, const SortDescription & sort_desc, +private: + + MergeAlg chooseMergingAlg(const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, size_t rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const; private: diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockInputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeBlockInputStream.cpp index aa9345041d8..1f4f6c4e6af 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockInputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockInputStream.cpp @@ -21,7 +21,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(const String & path_, /// const MarkRanges & mark_ranges_, bool use_uncompressed_cache_, ExpressionActionsPtr prewhere_actions_, String prewhere_column_, bool check_columns, size_t min_bytes_to_use_direct_io_, size_t max_read_buffer_size_, - bool save_marks_in_cache_) + bool save_marks_in_cache_, bool quiet) : path(path_), block_size(block_size_), storage(storage_), owned_data_part(owned_data_part_), @@ -97,6 +97,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(const String & path_, /// total_rows += range.end - range.begin; total_rows *= storage.index_granularity; + if (!quiet) LOG_TRACE(log, "Reading " << all_mark_ranges.size() << " ranges from part " << owned_data_part->name << ", approx. " << total_rows << (all_mark_ranges.size() > 1 diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 979494541f8..2def7884a07 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include #include @@ -44,6 +44,10 @@ namespace ErrorCodes extern const int ABORTED; } + +using MergeAlg = MergeTreeDataMerger::MergeAlg; + + namespace { @@ -147,12 +151,13 @@ bool MergeTreeDataMerger::selectPartsToMerge( std::unique_ptr merge_selector; - SimpleMergeSelector::Settings merge_settings; if (aggressive) - merge_settings.base = 1; - - /// NOTE Could allow selection of different merge strategy. 
- merge_selector = std::make_unique(merge_settings); + merge_selector = std::make_unique(); + else + { + LevelMergeSelector::Settings merge_settings; + merge_selector = std::make_unique(merge_settings); + } IMergeSelector::PartsInPartition parts_to_merge = merge_selector->select( partitions, @@ -276,33 +281,144 @@ MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition( return parts_from_partition; } - -static void extractOrdinaryAndIndexColumns(const NamesAndTypesList & all_columns, const SortDescription & sort_desc, +static void extractOrdinaryAndKeyColumns(const NamesAndTypesList & all_columns, ExpressionActionsPtr primary_key_expressions, NamesAndTypesList & ordinary_column_names_and_types, Names & ordinary_column_names, - NamesAndTypesList & index_column_names_and_types, Names & index_column_names + NamesAndTypesList & key_column_names_and_types, Names & key_column_names ) { - size_t column_number = 0; + Names key_columns_dup = primary_key_expressions->getRequiredColumns(); + std::set key_columns(key_columns_dup.cbegin(), key_columns_dup.cend()); + for (auto & column : all_columns) { - auto it = std::find_if(sort_desc.begin(), sort_desc.end(), - [&](const SortColumnDescription & desc) { return desc.matchColumn(column.name, column_number); }); + auto it = std::find(key_columns.cbegin(), key_columns.cend(), column.name); - if (sort_desc.end() == it) + if (key_columns.end() == it) { ordinary_column_names_and_types.emplace_back(column); ordinary_column_names.emplace_back(column.name); } else { - index_column_names_and_types.emplace_back(column); - index_column_names.emplace_back(column.name); + key_column_names_and_types.emplace_back(column); + key_column_names.emplace_back(column.name); } - - ++column_number; } } +/* Allow to compute more accurate progress statistics */ +struct ColumnSizeEstimator : public std::unordered_map +{ + size_t sum_total = 0; + size_t sum_index_columns = 0; + size_t sum_ordinary_columns = 0; + + ColumnSizeEstimator(MergeTreeData::DataPartsVector & parts, const Names & key_columns, const Names & ordinary_columns) + { + if (parts.empty()) + return; + + for (const auto & name_and_type : parts.front()->columns) + (*this)[name_and_type.name] = 0; + + for (const auto & part : parts) + { + for (const auto & name_and_type : parts.front()->columns) + this->at(name_and_type.name) += part->getColumnSize(name_and_type.name); + } + + for (const auto & name : key_columns) + sum_index_columns += this->at(name); + + for (const auto & name : ordinary_columns) + sum_ordinary_columns += this->at(name); + + sum_total = sum_index_columns + sum_ordinary_columns; + } + + Float64 columnSize(const String & column, size_t num_rows, size_t num_total_rows) const + { + return static_cast(this->at(column)) / num_total_rows * num_rows; + } + + Float64 columnProgress(const String & column, size_t num_rows, size_t num_total_rows) const + { + return columnSize(column, num_rows, num_total_rows) / sum_total; + } + + Float64 keyColumnsSize(size_t num_rows, size_t num_total_rows) const + { + return static_cast(sum_index_columns) / num_total_rows * num_rows; + } + + Float64 keyColumnsProgress(size_t num_rows, size_t num_total_rows) const + { + return keyColumnsSize(num_rows, num_total_rows) / sum_total; + } +}; + + +class MergeProgressCallBack : public ProgressCallback +{ +public: + MergeProgressCallBack(MergeList::Entry & merge_entry_) : merge_entry(merge_entry_) {} + + MergeProgressCallBack(MergeList::Entry & merge_entry_, MergeTreeDataMerger::MergeAlg merge_alg_, size_t 
num_total_rows,
+		const ColumnSizeEstimator & column_sizes)
+		: merge_entry(merge_entry_), merge_alg(merge_alg_)
+	{
+		if (merge_alg == MergeAlg::BASIC)
+			average_elem_progress = 1.0 / num_total_rows;
+		else
+			average_elem_progress = column_sizes.keyColumnsProgress(1, num_total_rows);
+	}
+
+	MergeList::Entry & merge_entry;
+	const MergeAlg merge_alg{MergeAlg::VERTICAL};
+	Float64 average_elem_progress;
+	/// NOTE: not thread-safe
+	size_t rows_read_internal{0};
+
+	void operator() (const Progress & value)
+	{
+		merge_entry->bytes_read_uncompressed += value.bytes;
+		ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
+
+		if (merge_alg == MergeAlg::BASIC)
+		{
+			merge_entry->progress = average_elem_progress * (merge_entry->rows_read += value.rows);
+			ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
+		}
+		else
+		{
+			merge_entry->progress = average_elem_progress * (rows_read_internal += value.rows);
+		}
+	};
+};
+
+class MergeProgressCallbackOrdinary : public MergeProgressCallBack
+{
+public:
+
+	MergeProgressCallbackOrdinary(MergeList::Entry & merge_entry_, size_t num_total_rows_exact,
+		const ColumnSizeEstimator & column_sizes, const String & column_name)
+	: MergeProgressCallBack(merge_entry_), initial_progress(merge_entry->progress)
+	{
+		average_elem_progress = column_sizes.columnProgress(column_name, 1, num_total_rows_exact);
+	}
+
+	Float64 initial_progress;
+
+	void operator() (const Progress & value)
+	{
+		merge_entry->bytes_read_uncompressed += value.bytes;
+		ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
+
+		Float64 local_progress = average_elem_progress * (rows_read_internal += value.rows);
+		merge_entry->progress = initial_progress + local_progress;
+	};
+};
+
 /// parts должны быть отсортированы.
MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart( MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry, @@ -338,11 +454,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart NamesAndTypesList all_column_names_and_types = data.getColumnsList(); SortDescription sort_desc = data.getSortDescription(); - NamesAndTypesList ordinary_column_names_and_types, index_column_names_and_types; - Names ordinary_column_names, index_column_names; - extractOrdinaryAndIndexColumns(all_column_names_and_types, sort_desc, + NamesAndTypesList ordinary_column_names_and_types, key_column_names_and_types; + Names ordinary_column_names, key_column_names; + extractOrdinaryAndKeyColumns(all_column_names_and_types, data.getPrimaryExpression(), ordinary_column_names_and_types, ordinary_column_names, - index_column_names_and_types, index_column_names + key_column_names_and_types, key_column_names ); MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data); @@ -350,23 +466,19 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart new_data_part->name = "tmp_" + merged_name; new_data_part->is_temp = true; - size_t sum_rows_upper_bound = 0; - for (const auto & part : parts) - sum_rows_upper_bound += part->size * data.index_granularity; - - const auto rows_total = merge_entry->total_size_marks * data.index_granularity; + size_t sum_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity; MergedRowSources merged_rows_sources; - MergeAlg merge_alg = chooseMergingAlg(parts, all_column_names, sort_desc, sum_rows_upper_bound, merged_rows_sources); + MergeAlg merge_alg = chooseMergingAlg(data, parts, sum_rows_upper_bound, merged_rows_sources); MergedRowSources * merged_rows_sources_ptr = (merge_alg == MergeAlg::VERTICAL) ? &merged_rows_sources : nullptr; Names & main_column_names = (merge_alg == MergeAlg::VERTICAL) ? - all_column_names : index_column_names; + key_column_names : all_column_names; NamesAndTypesList & main_column_names_and_types = (merge_alg == MergeAlg::VERTICAL) ? - all_column_names_and_types : index_column_names_and_types; + key_column_names_and_types : all_column_names_and_types; - LOG_DEBUG(log, "Selected MergeAlg : " << ((merge_alg == MergeAlg::VERTICAL) ? "VERTICAL" : "BASIC")); + ColumnSizeEstimator column_sizes(parts, key_column_names, ordinary_column_names); /** Читаем из всех кусков, сливаем и пишем в новый. * Попутно вычисляем выражение для сортировки. 
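The hunks below swap the old row-count-based progress lambda for these callbacks. The arithmetic behind ColumnSizeEstimator reduces to the proportional model sketched here (simplified types, illustration only, not the patch's exact interfaces): every phase advances a single progress value by its share of the total column bytes, so the horizontal pass over the key columns and each subsequent per-column gather move one gauge smoothly instead of jumping at phase boundaries.

    #include <cstddef>

    // Per-column byte totals are accumulated over all merged parts
    // (cf. ColumnSizeEstimator); only the resulting sums matter here.
    struct ProgressModel
    {
        double sum_total;        // bytes of all columns over all parts
        double sum_key_columns;  // bytes of the primary-key columns only

        // Horizontal pass: reading `rows` of `total_rows` touches key columns only.
        double keyColumnsProgress(size_t rows, size_t total_rows) const
        {
            return (sum_key_columns * rows / total_rows) / sum_total;
        }

        // Vertical pass: gathering `rows` of a column holding `column_bytes`
        // bytes; this increment is added on top of the progress reached so far.
        double columnProgress(double column_bytes, size_t rows, size_t total_rows) const
        {
            return (column_bytes * rows / total_rows) / sum_total;
        }
    };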
@@ -381,15 +493,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
 			part_path, DEFAULT_MERGE_BLOCK_SIZE, main_column_names, data, parts[i],
 			MarkRanges(1, MarkRange(0, parts[i]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);

-		input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
-		{
-			const auto new_rows_read = merge_entry->rows_read += value.rows;
-			merge_entry->progress = static_cast<Float64>(new_rows_read) / rows_total;
-			merge_entry->bytes_read_uncompressed += value.bytes;
-
-			ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
-			ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
-		});
+		input->setProgressCallback(MergeProgressCallBack{merge_entry, merge_alg, sum_rows_upper_bound, column_sizes});

 		if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
 			src_streams.emplace_back(std::make_shared<MaterializingBlockInputStream>(
@@ -466,9 +570,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
 			to.write(block);

 		merge_entry->rows_written = merged_stream->getProfileInfo().rows;
+		merge_entry->rows_with_key_columns_written += merged_stream->getProfileInfo().rows;
 		merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;

-		if (disk_reservation)
+		/// This update is inaccurate for the VERTICAL algorithm, since it requires more accurate per-column updates.
+		/// Reservation updates are not performed yet; during the merge this may lead to higher free space requirements.
+		if (disk_reservation && merge_alg == MergeAlg::BASIC)
 			disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / sum_rows_upper_bound)) * initial_reservation));
 	}

@@ -480,26 +587,38 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
 	/// Gather ordinary rows
 	if (merge_alg == MergeAlg::VERTICAL)
 	{
+		size_t sum_rows_exact = merge_entry->rows_with_key_columns_written;
+		merge_entry->columns_written = key_column_names.size();
+		merge_entry->progress = column_sizes.keyColumnsProgress(sum_rows_exact, sum_rows_exact);
+
 		BlockInputStreams column_part_streams(parts.size());
 		auto it_name_and_type = ordinary_column_names_and_types.cbegin();

-		for (size_t column_id = 0; column_id < ordinary_column_names.size(); ++column_id)
+		for (size_t column_num = 0; column_num < ordinary_column_names.size(); ++column_num)
 		{
-			Names column_name_(1, ordinary_column_names[column_id]);
+			const String & column_name = ordinary_column_names[column_num];
+			Names column_name_(1, column_name);
 			NamesAndTypesList column_name_and_type_(1, *it_name_and_type++);

-			for (size_t part_id = 0; part_id < parts.size(); ++part_id)
+			LOG_TRACE(log, "Gathering column " << column_name << " " << column_name_and_type_.front().type->getName());
+
+			for (size_t part_num = 0; part_num < parts.size(); ++part_num)
 			{
-				String part_path = data.getFullPath() + parts[part_id]->name + '/';
+				String part_path = data.getFullPath() + parts[part_num]->name + '/';

 				/// TODO: test perfomance with more accurate settings
-				column_part_streams[part_id] = std::make_shared<MergeTreeBlockInputStream>(
-					part_path, DEFAULT_MERGE_BLOCK_SIZE, column_name_, data, parts[part_id],
-					MarkRanges(1, MarkRange(0, parts[part_id]->size)), false, nullptr, "", true, aio_threshold, 8192, false
-				);
+				auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
+					part_path, DEFAULT_MERGE_BLOCK_SIZE, column_name_, data, parts[part_num],
+					MarkRanges(1, MarkRange(0, parts[part_num]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE,
+					false,
true); + + column_part_stream->setProgressCallback( + MergeProgressCallbackOrdinary{merge_entry, sum_rows_exact, column_sizes, column_name}); + + column_part_streams[part_num] = std::move(column_part_stream); } - ColumnGathererStream column_gathered_stream(column_part_streams, merged_rows_sources, 8192); + ColumnGathererStream column_gathered_stream(column_part_streams, merged_rows_sources, DEFAULT_BLOCK_SIZE); MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, true, compression_method); column_to.writePrefix(); @@ -507,7 +626,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart { column_to.write(block); } + + /// NOTE: nested column contains duplicates checksums (and files) checksums_ordinary_columns.add(column_to.writeSuffixAndGetChecksums()); + + merge_entry->columns_written = key_column_names.size() + column_num; } } @@ -533,18 +656,21 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart MergeTreeDataMerger::MergeAlg MergeTreeDataMerger::chooseMergingAlg( - const MergeTreeData::DataPartsVector & parts, const Names & all_column_names, const SortDescription & sort_desc, + const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const { bool is_supported_storage = data.merging_params.mode == MergeTreeData::MergingParams::Ordinary || data.merging_params.mode == MergeTreeData::MergingParams::Collapsing; - bool enough_ordinary_cols = all_column_names.size() > sort_desc.size(); + bool enough_ordinary_cols = data.getColumnNamesList().size() > data.getSortDescription().size(); + + bool enough_total_rows = sum_rows_upper_bound >= DEFAULT_MERGE_BLOCK_SIZE; bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS; - auto merge_alg = (is_supported_storage && enough_ordinary_cols && no_parts_overflow) ? MergeAlg::VERTICAL : MergeAlg::BASIC; + auto merge_alg = (is_supported_storage && enough_total_rows && enough_ordinary_cols && no_parts_overflow) ? + MergeAlg::VERTICAL : MergeAlg::BASIC; if (merge_alg == MergeAlg::VERTICAL) { @@ -559,6 +685,8 @@ MergeTreeDataMerger::MergeAlg MergeTreeDataMerger::chooseMergingAlg( } } + LOG_DEBUG(log, "Selected MergeAlg: " << ((merge_alg == MergeAlg::VERTICAL) ? 
"VERTICAL" : "BASIC")); + return merge_alg; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp index dafba018282..9341f6e6801 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp @@ -225,10 +225,7 @@ void MergeTreeDataPartChecksums::addFile(const String & file_name, size_t file_s void MergeTreeDataPartChecksums::add(MergeTreeDataPartChecksums && rhs_checksums) { for (auto & checksum : rhs_checksums.files) - { - if (!files.emplace(std::move(checksum)).second) - throw Exception("Adding already existing file checksum", ErrorCodes::LOGICAL_ERROR); - } + files[std::move(checksum.first)] = std::move(checksum.second); rhs_checksums.files.clear(); } diff --git a/dbms/src/Storages/System/StorageSystemMerges.cpp b/dbms/src/Storages/System/StorageSystemMerges.cpp index d78111e05ff..e19a2ef7c0e 100644 --- a/dbms/src/Storages/System/StorageSystemMerges.cpp +++ b/dbms/src/Storages/System/StorageSystemMerges.cpp @@ -24,7 +24,9 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name) { "bytes_read_uncompressed", std::make_shared() }, { "rows_read", std::make_shared() }, { "bytes_written_uncompressed", std::make_shared() }, - { "rows_written", std::make_shared() } + { "rows_written", std::make_shared() }, + { "rows_with_key_columns_written", std::make_shared() }, + { "columns_written", std::make_shared() } } { } @@ -58,6 +60,8 @@ BlockInputStreams StorageSystemMerges::read( ColumnWithTypeAndName col_rows_read{std::make_shared(), std::make_shared(), "rows_read"}; ColumnWithTypeAndName col_bytes_written_uncompressed{std::make_shared(), std::make_shared(), "bytes_written_uncompressed"}; ColumnWithTypeAndName col_rows_written{std::make_shared(), std::make_shared(), "rows_written"}; + ColumnWithTypeAndName col_rows_with_key_columns_written{std::make_shared(), std::make_shared(), "rows_with_key_columns_written"}; + ColumnWithTypeAndName col_columns_written{std::make_shared(), std::make_shared(), "columns_written"}; for (const auto & merge : context.getMergeList().get()) { @@ -73,6 +77,8 @@ BlockInputStreams StorageSystemMerges::read( col_rows_read.column->insert(merge.rows_read.load(std::memory_order_relaxed)); col_bytes_written_uncompressed.column->insert(merge.bytes_written_uncompressed.load(std::memory_order_relaxed)); col_rows_written.column->insert(merge.rows_written.load(std::memory_order_relaxed)); + col_rows_with_key_columns_written.column->insert(merge.rows_with_key_columns_written.load(std::memory_order_relaxed)); + col_columns_written.column->insert(merge.columns_written.load(std::memory_order_relaxed)); } Block block{ diff --git a/merges_test.sh b/merges_test.sh deleted file mode 100755 index 5a25a6c1f88..00000000000 --- a/merges_test.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/bash -set -e - -pre= -chunk=8192 -rows=$((90 * chunk)) - -clickhouse-client --query "DROP TABLE IF EXISTS test.merges" -clickhouse-client --query "DROP TABLE IF EXISTS test.merges_ref" -#sudo rm -r /opt/clickhouse/data/test/merges - -clickhouse-client --query "CREATE TABLE test.merges (part Date, id UInt64, data UInt64) ENGINE = MergeTree(part, id, 8192)" -clickhouse-client --query "CREATE TABLE test.merges_ref (part Date, id UInt64, data UInt64) ENGINE = Memory" - -iter=0 -for i in $(seq 0 $chunk $((rows-1)) ); do - iter=$((iter+1)) - clickhouse-client --query "insert into test.merges select toDate(0) as part, intHash64(number) as id, number as data from system.numbers limit $i, 
$chunk" - clickhouse-client --query "insert into test.merges_ref select toDate(0) as part, intHash64(number) as id, number as data from system.numbers limit $i, $chunk" - - parts=$( clickhouse-client --query "SELECT count(*) FROM system.parts WHERE table = 'merges' AND active = 1" ) - echo "i=$iter p=$parts [$i, $chunk) / $rows" -done - -clickhouse-client --query "SELECT name, active, min_block_number, max_block_number FROM system.parts WHERE table = 'merges' AND active = 1 FORMAT PrettyCompact" -clickhouse-client --query "OPTIMIZE TABLE test.merges" || echo "OPTIMIZE FAIL!!!" -clickhouse-client --query "SELECT name, active, min_block_number, max_block_number FROM system.parts WHERE table = 'merges' AND active = 1 FORMAT PrettyCompact" - -#clickhouse-client --query "insert into test.merges_ref SELECT * FROM (select toDate(0) as part, intHash64(number) as id, number as data from system.numbers limit 0, $rows) ORDER BY id" -#clickhouse-client --query "SELECT id, data FROM test.merges_ref ORDER BY id" > tmp_ref -clickhouse-client --query "SELECT id, data FROM (select toDate(0) as part, intHash64(number) as id, number as data from system.numbers limit $rows) ORDER BY id" > tmp_ref -clickhouse-client -n --query "SET max_threads = 1; SELECT id, data FROM test.merges;" > tmp_src - -cmp tmp_ref tmp_src || echo "FAIL" - -#clickhouse-client --query "CREATE TEMPORARY TABLE test.merges_union (id ALIAS test.merges.id, id_ref ALIAS test.merges_ref.id, data ALIAS test.merges.data, data_ref ALIAS test.merges_ref.id)" -#clickhouse-client --query "SELECT sum(abs(id - id_ref)), sum(abs(data - data_ref)) FROM test.merges_union" \ No newline at end of file From 588add5a49fc3e73f031396bad2de90f6c339a30 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Tue, 22 Nov 2016 22:34:36 +0300 Subject: [PATCH 3/6] Added requested changes. Also fixed single thread read from empty storage Log. --- dbms/include/DB/Core/SortDescription.h | 18 +- .../CollapsingSortedBlockInputStream.h | 4 +- .../DB/DataStreams/ColumnGathererStream.h | 17 +- .../MergingSortedBlockInputStream.h | 12 +- .../include/DB/Storages/MergeTree/MergeList.h | 18 +- .../Storages/MergeTree/MergeTreeDataMerger.h | 10 +- .../CollapsingSortedBlockInputStream.cpp | 19 +- dbms/src/DataStreams/ColumnGathererStream.cpp | 36 ++-- .../MergingSortedBlockInputStream.cpp | 14 +- .../MergeTree/MergeTreeDataMerger.cpp | 176 ++++++++++-------- dbms/src/Storages/StorageLog.cpp | 2 +- .../Storages/System/StorageSystemMerges.cpp | 16 +- 12 files changed, 195 insertions(+), 147 deletions(-) diff --git a/dbms/include/DB/Core/SortDescription.h b/dbms/include/DB/Core/SortDescription.h index 43793975179..1573dc645fd 100644 --- a/dbms/include/DB/Core/SortDescription.h +++ b/dbms/include/DB/Core/SortDescription.h @@ -35,9 +35,9 @@ struct SortColumnDescription using SortDescription = std::vector; -/** Курсор, позволяющий сравнивать соответствующие строки в разных блоках. - * Курсор двигается по одному блоку. - * Для использования в priority queue. +/** Cursor allows to compare rows in different blocks (and parts). + * Cursor moves inside single block. + * It is used in priority queue. */ struct SortCursorImpl { @@ -48,22 +48,22 @@ struct SortCursorImpl size_t pos = 0; size_t rows = 0; - /** Порядок (что сравнивается), если сравниваемые столбцы равны. - * Даёт возможность предпочитать строки из нужного курсора. + /** Determines order if comparing columns are equal. + * Order is determined by number of cursor. + * + * Cursor number (always?) equals to number of merging part. 
+ * Therefore this filed can be used to determine part number of current row (see ColumnGathererStream). */ size_t order; using NeedCollationFlags = std::vector<UInt8>; - /** Нужно ли использовать Collator для сортировки столбца */ + /** Should we use Collator to sort a column? */ NeedCollationFlags need_collation; /** Есть ли хотя бы один столбец с Collator. */ bool has_collation = false; - /* Index of part from which the block was acquired */ - // UInt8 source_part_label; - SortCursorImpl() {} SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0) diff --git a/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h index c2c270d3215..da9c6bacfbf 100644 --- a/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h @@ -24,8 +24,8 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream { public: CollapsingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, - const String & sign_column_, size_t max_block_size_, MergedRowSources * row_sources_ = nullptr) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, row_sources_), + const String & sign_column_, size_t max_block_size_, MergedRowSources * out_row_sources_ = nullptr) - : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_), sign_column(sign_column_) { } diff --git a/dbms/include/DB/DataStreams/ColumnGathererStream.h b/dbms/include/DB/DataStreams/ColumnGathererStream.h index dd00f3fe772..5773d7080da 100644 --- a/dbms/include/DB/DataStreams/ColumnGathererStream.h +++ b/dbms/include/DB/DataStreams/ColumnGathererStream.h @@ -11,13 +11,13 @@ namespace DB struct __attribute__((__packed__)) RowSourcePart { - unsigned int skip: 1; + unsigned int flag: 1; unsigned int source_id: 7; - RowSourcePart(unsigned source_id_, bool skip_ = false) + RowSourcePart(unsigned source_id_, bool flag_ = false) { source_id = source_id_; - skip = skip_; + flag = flag_; } static constexpr size_t MAX_PARTS = 127; @@ -28,14 +28,15 @@ using MergedRowSources = PODArray<RowSourcePart>; /** Gather single stream from multiple streams according to streams mask. * Stream mask maps row number to index of source stream. + * Streams should contain exactly one column.
*/ class ColumnGathererStream : public IProfilingBlockInputStream { public: - ColumnGathererStream(const BlockInputStreams & source_streams, MergedRowSources & pos_to_source_idx_, + ColumnGathererStream(const BlockInputStreams & source_streams, const MergedRowSources& pos_to_source_idx_, size_t block_size_ = DEFAULT_BLOCK_SIZE); - String getName() const override { return "MergingSorted"; } + String getName() const override { return "ColumnGatherer"; } String getID() const override; @@ -43,12 +44,12 @@ public: private: - MergedRowSources & pos_to_source_idx; + const MergedRowSources & pos_to_source_idx; /// Cache required fields struct Source { - IColumn * column; + const IColumn * column; size_t pos; size_t size; Block block; @@ -60,7 +61,7 @@ private: void update() { - column = &*block.getByPosition(0).column; + column = block.getByPosition(0).column.get(); size = block.rowsInFirstColumn(); pos = 0; } diff --git a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h index 7ea2be168aa..223c71edf99 100644 --- a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h @@ -57,11 +57,12 @@ class MergingSortedBlockInputStream : public IProfilingBlockInputStream { public: - /// limit - если не 0, то можно выдать только первые limit строк в сортированном порядке. + /// limit - if it isn't 0, then we can produce only the first limit rows in sorted order. + /// out_row_sources - if it isn't nullptr, then at the end of execution it should contain the part number of each row that was read (and the skip flag) MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_, - size_t max_block_size_, size_t limit_ = 0, MergedRowSources * row_sources_ = nullptr) + size_t max_block_size_, size_t limit_ = 0, MergedRowSources * out_row_sources_ = nullptr) : description(description_), max_block_size(max_block_size_), limit(limit_), - source_blocks(inputs_.size()), cursors(inputs_.size()), row_sources(row_sources_) + source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources(out_row_sources_) { children.insert(children.end(), inputs_.begin(), inputs_.end()); } @@ -160,8 +161,9 @@ protected: using QueueWithCollation = std::priority_queue<SortCursorWithCollation>; QueueWithCollation queue_with_collation; - /// Used in VERTICAL merge algorithm - MergedRowSources * row_sources = nullptr; + /// Used in the Vertical merge algorithm to gather non-PK columns (on the next step) + /// If it is not nullptr, then it should be populated during execution MergedRowSources * out_row_sources = nullptr; /// Эти методы используются в Collapsing/Summing/Aggregating... SortedBlockInputStream-ах.
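The contract that these two headers establish is worth spelling out: while merging the PK columns, MergingSortedBlockInputStream appends one RowSourcePart per output row to out_row_sources, and ColumnGathererStream later replays that mask once for every non-PK column. A minimal stand-alone sketch of the mechanism (illustrative plain C++ only, not part of the patch; the struct mirrors the 7-bit source_id plus 1-bit skip flag introduced above, everything else is hypothetical):

    #include <cstdint>
    #include <iostream>
    #include <vector>

    /// Mirrors the packed RowSourcePart of this series: 7 bits of part number
    /// plus a 1-bit flag meaning "this row was collapsed away, skip it".
    struct RowSourcePart
    {
        uint8_t source_id : 7;
        uint8_t flag : 1;
    };

    int main()
    {
        /// Two source "parts", each holding one non-PK column.
        std::vector<int> part0{10, 20, 30};
        std::vector<int> part1{15, 25};

        /// Mask produced by the PK merge step: for every merged row, the part
        /// it came from and whether it must be skipped (e.g. a collapsed row).
        std::vector<RowSourcePart> mask{{0, 0}, {1, 0}, {0, 1}, {1, 0}, {0, 0}};

        /// Gather step: a single pass over the mask with one cursor per part;
        /// this is what ColumnGathererStream repeats for each non-PK column.
        std::vector<size_t> pos(2, 0);
        for (const auto & s : mask)
        {
            const std::vector<int> & column = (s.source_id == 0) ? part0 : part1;
            size_t row = pos[s.source_id]++;
            if (!s.flag)
                std::cout << column[row] << '\n';    /// prints 10 15 25 30
        }
        return 0;
    }

One byte per merged row is thus enough to drive the gathering of every remaining column, which is what keeps the per-column gather step cheap on wide tables.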
diff --git a/dbms/include/DB/Storages/MergeTree/MergeList.h b/dbms/include/DB/Storages/MergeTree/MergeList.h index 0c3a1e5bcdd..28b09100da5 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeList.h +++ b/dbms/include/DB/Storages/MergeTree/MergeList.h @@ -32,14 +32,19 @@ struct MergeInfo UInt64 total_size_bytes_compressed{}; UInt64 total_size_marks{}; std::atomic<UInt64> bytes_read_uncompressed{}; - std::atomic<UInt64> rows_read{}; std::atomic<UInt64> bytes_written_uncompressed{}; - /// mutually exclusive, updated either rows_written either columns_written + /// Updated only for the Horizontal algorithm + std::atomic<UInt64> rows_read{}; std::atomic<UInt64> rows_written{}; + + /// Updated only for the Vertical algorithm + /// mutually exclusive with rows_read and rows_written; either rows_written or columns_written is updated std::atomic<UInt64> columns_written{}; - /// Number of rows for which primary key cols are written (updated alwasys) + /// Updated in both cases + /// Number of rows for which primary key columns have been written + std::atomic<UInt64> rows_with_key_columns_read{}; std::atomic<UInt64> rows_with_key_columns_written{}; @@ -58,9 +63,12 @@ struct MergeInfo total_size_bytes_compressed(other.total_size_bytes_compressed), total_size_marks(other.total_size_marks), bytes_read_uncompressed(other.bytes_read_uncompressed.load(std::memory_order_relaxed)), - rows_read(other.rows_read.load(std::memory_order_relaxed)), bytes_written_uncompressed(other.bytes_written_uncompressed.load(std::memory_order_relaxed)), - rows_written(other.rows_written.load(std::memory_order_relaxed)) + rows_read(other.rows_read.load(std::memory_order_relaxed)), + rows_written(other.rows_written.load(std::memory_order_relaxed)), + columns_written(other.columns_written.load(std::memory_order_relaxed)), + rows_with_key_columns_read(other.rows_with_key_columns_read.load(std::memory_order_relaxed)), + rows_with_key_columns_written(other.rows_with_key_columns_written.load(std::memory_order_relaxed)) { } }; diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h index 22d5c1b3e6e..a955d307fbf 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h @@ -9,7 +9,7 @@ namespace DB { class MergeListEntry; -class MergeProgressCallBack; +class MergeProgressCallback; struct ReshardingJob; @@ -123,15 +123,15 @@ public: public: - enum class MergeAlg + enum class MergeAlgorithm { - BASIC, - VERTICAL + Horizontal, /// per-row merge of all columns + Vertical /// per-row merge of PK columns, per-column gather for non-PK columns }; private: - MergeAlg chooseMergingAlg(const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, + MergeAlgorithm chooseMergeAlgorithm(const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, size_t rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const; private: diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index 4cc8ca61d5a..04ffb48ce2f 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -56,10 +56,11 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num); - if (row_sources) + if (out_row_sources) { - row_sources->data()[last_positive_pos].skip = false; -
row_sources->data()[last_negative_pos].skip = false; + /// true flag value means "skip row" + out_row_sources->data()[last_positive_pos].flag = false; + out_row_sources->data()[last_negative_pos].flag = false; } } return; @@ -71,8 +72,8 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*first_negative.columns[i], first_negative.row_num); - if (row_sources) - row_sources->data()[first_negative_pos].skip = false; + if (out_row_sources) + out_row_sources->data()[first_negative_pos].flag = false; } if (count_positive >= count_negative) @@ -81,8 +82,8 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num); - if (row_sources) - row_sources->data()[last_positive_pos].skip = false; + if (out_row_sources) + out_row_sources->data()[last_positive_pos].flag = false; } if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1)) @@ -162,8 +163,8 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s queue.pop(); /// Initially, skip all rows. On insert, unskip "corner" rows. - if (row_sources) - row_sources->push_back(RowSourcePart(current.impl->order, true)); + if (out_row_sources) + out_row_sources->emplace_back(current.impl->order, true); if (key_differs) { diff --git a/dbms/src/DataStreams/ColumnGathererStream.cpp b/dbms/src/DataStreams/ColumnGathererStream.cpp index 2260734d967..199b81e774b 100644 --- a/dbms/src/DataStreams/ColumnGathererStream.cpp +++ b/dbms/src/DataStreams/ColumnGathererStream.cpp @@ -7,11 +7,17 @@ namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int INCOMPATIBLE_COLUMNS; + extern const int INCORRECT_NUMBER_OF_COLUMNS; + extern const int EMPTY_DATA_PASSED; + extern const int RECEIVED_EMPTY_DATA; } -ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams, MergedRowSources & pos_to_source_idx_, size_t block_size_) +ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams, const MergedRowSources & pos_to_source_idx_, size_t block_size_) : pos_to_source_idx(pos_to_source_idx_), block_size(block_size_) { + if (source_streams.empty()) + throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED); + children.assign(source_streams.begin(), source_streams.end()); sources.reserve(children.size()); @@ -20,9 +26,12 @@ ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_stre sources.emplace_back(children[i]->read()); Block & block = sources.back().block; - if (block.columns() > 1 || !block.getByPosition(0).column || - block.getByPosition(0).type->getName() != sources[0].block.getByPosition(0).type->getName()) - throw Exception("Column formats don't match", ErrorCodes::INCOMPATIBLE_COLUMNS); + + if (block.columns() != 1) + throw Exception("Stream should contain exactly one column", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS); + + if (block.getByPosition(0).column->getName() != sources[0].block.getByPosition(0).column->getName()) + throw Exception("Column types don't match", ErrorCodes::INCOMPATIBLE_COLUMNS); } } @@ -45,19 +54,19 @@ Block ColumnGathererStream::readImpl() if (children.size() == 1) return children[0]->read(); - if (children.size() == 0 || pos_global >= pos_to_source_idx.size()) + if (pos_global >= pos_to_source_idx.size()) 
return Block(); Block block_res = sources[0].block.cloneEmpty(); - IColumn * column_res = &*block_res.getByPosition(0).column; + IColumn & column_res = *block_res.getByPosition(0).column; size_t pos_finish = std::min(pos_global + block_size, pos_to_source_idx.size()); - column_res->reserve(pos_finish - pos_global); + column_res.reserve(pos_finish - pos_global); for (size_t pos = pos_global; pos < pos_finish; ++pos) { auto source_id = pos_to_source_idx[pos].source_id; - bool skip = pos_to_source_idx[pos].skip; + bool skip = pos_to_source_idx[pos].flag; Source & source = sources[source_id]; if (source.pos >= source.size) /// Fetch new block @@ -67,20 +76,21 @@ Block ColumnGathererStream::readImpl() source.block = children[source_id]->read(); source.update(); } - catch (...) + catch (Exception & e) { - source.size = 0; + e.addMessage("Cannot fetch required block. Stream " + children[source_id]->getID() + ", part " + toString(source_id)); + throw; } if (0 == source.size) { - throw Exception("Can't fetch required block from " + children[source_id]->getID() + " (i. e. source " + std::to_string(source_id) +")", - ErrorCodes::LOGICAL_ERROR); + throw Exception("Fetched block is empty. Stream " + children[source_id]->getID() + ", part " + toString(source_id), + ErrorCodes::RECEIVED_EMPTY_DATA); } } if (!skip) - column_res->insertFrom(*source.column, source.pos); + column_res.insertFrom(*source.column, source.pos); //TODO: vectorize ++source.pos; } diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index 3cdb93bde48..a7e395e28fa 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -221,8 +221,8 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs finished = true; } - if (row_sources) - row_sources->resize_fill(row_sources->size() + merged_rows, RowSourcePart(source_num)); + if (out_row_sources) + out_row_sources->resize_fill(out_row_sources->size() + merged_rows, RowSourcePart(source_num)); // std::cerr << "fetching next block\n"; @@ -236,19 +236,19 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); - if (row_sources) + if (out_row_sources) { /// Actually, current.impl->order stores source number (i.e. 
cursors[current.impl->order] == current.impl) size_t source_num = 0; - size_t size = cursors.size(); - for (; source_num < size; ++source_num) + for (; source_num < cursors.size(); ++source_num) if (&cursors[source_num] == current.impl) break; + /// TODO: This check can be removed after testing if (source_num != current.impl->order) - throw Exception("Developer's logical error"); + throw Exception("Developer's logical error", ErrorCodes::LOGICAL_ERROR); - row_sources->push_back(RowSourcePart(current.impl->order)); + out_row_sources->emplace_back(current.impl->order); } if (!current->isLast()) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 2def7884a07..8a9ce7c41a2 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include #include @@ -45,7 +45,7 @@ namespace ErrorCodes } -using MergeAlg = MergeTreeDataMerger::MergeAlg; +using MergeAlgorithm = MergeTreeDataMerger::MergeAlgorithm; namespace { @@ -151,13 +151,12 @@ bool MergeTreeDataMerger::selectPartsToMerge( std::unique_ptr<IMergeSelector> merge_selector; + SimpleMergeSelector::Settings merge_settings; if (aggressive) - merge_selector = std::make_unique(); - else - { - LevelMergeSelector::Settings merge_settings; - merge_selector = std::make_unique<LevelMergeSelector>(merge_settings); - } + merge_settings.base = 1; + + /// NOTE Could allow selection of different merge strategy. + merge_selector = std::make_unique<SimpleMergeSelector>(merge_settings); IMergeSelector::PartsInPartition parts_to_merge = merge_selector->select( partitions, @@ -307,8 +306,13 @@ static void extractOrdinaryAndKeyColumns(const NamesAndTypesList & all_columns, } /* Allow to compute more accurate progress statistics */ -struct ColumnSizeEstimator : public std::unordered_map<String, size_t> +class ColumnSizeEstimator { + std::unordered_map<String, size_t> map; +public: + + /// Stores the approximate size of columns in bytes + /// Exact values are not required since they are used only for relative estimates (progress).
size_t sum_total = 0; size_t sum_index_columns = 0; size_t sum_ordinary_columns = 0; @@ -319,38 +323,42 @@ return; for (const auto & name_and_type : parts.front()->columns) - (*this)[name_and_type.name] = 0; + map[name_and_type.name] = 0; for (const auto & part : parts) { for (const auto & name_and_type : parts.front()->columns) - this->at(name_and_type.name) += part->getColumnSize(name_and_type.name); + map.at(name_and_type.name) += part->getColumnSize(name_and_type.name); } for (const auto & name : key_columns) - sum_index_columns += this->at(name); + sum_index_columns += map.at(name); for (const auto & name : ordinary_columns) - sum_ordinary_columns += this->at(name); + sum_ordinary_columns += map.at(name); sum_total = sum_index_columns + sum_ordinary_columns; } + /// Approximate size of num_rows column elements if the column contains num_total_rows elements Float64 columnSize(const String & column, size_t num_rows, size_t num_total_rows) const { - return static_cast<Float64>(this->at(column)) / num_total_rows * num_rows; + return static_cast<Float64>(map.at(column)) / num_total_rows * num_rows; } + /// Relative size of num_rows column elements (compared with the overall size of all columns) if the column contains num_total_rows elements Float64 columnProgress(const String & column, size_t num_rows, size_t num_total_rows) const { return columnSize(column, num_rows, num_total_rows) / sum_total; } + /// Like columnSize, but takes into account only PK columns Float64 keyColumnsSize(size_t num_rows, size_t num_total_rows) const { return static_cast<Float64>(sum_index_columns) / num_total_rows * num_rows; } + /// Like columnProgress, but takes into account only PK columns Float64 keyColumnsProgress(size_t num_rows, size_t num_total_rows) const { return keyColumnsSize(num_rows, num_total_rows) / sum_total; @@ -358,25 +366,57 @@ }; -class MergeProgressCallBack : public ProgressCallback +class MergeProgressCallback : public ProgressCallback { public: - MergeProgressCallBack(MergeList::Entry & merge_entry_) : merge_entry(merge_entry_) {} + MergeProgressCallback(MergeList::Entry & merge_entry_) : merge_entry(merge_entry_) {} - MergeProgressCallBack(MergeList::Entry & merge_entry_, MergeTreeDataMerger::MergeAlg merge_alg_, size_t num_total_rows, + MergeProgressCallback(MergeList::Entry & merge_entry_, MergeTreeDataMerger::MergeAlgorithm merge_alg_, size_t num_total_rows, const ColumnSizeEstimator & column_sizes) : merge_entry(merge_entry_), merge_alg(merge_alg_) { - if (merge_alg == MergeAlg::BASIC) + if (merge_alg == MergeAlgorithm::Horizontal) average_elem_progress = 1.0 / num_total_rows; else average_elem_progress = column_sizes.keyColumnsProgress(1, num_total_rows); } MergeList::Entry & merge_entry; - const MergeAlg merge_alg{MergeAlg::VERTICAL}; + const MergeAlgorithm merge_alg{MergeAlgorithm::Vertical}; Float64 average_elem_progress; - /// NOTE: not thread-safety + + void operator() (const Progress & value) + { + ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes); + merge_entry->bytes_read_uncompressed += value.bytes; + merge_entry->rows_with_key_columns_read += value.rows; + + if (merge_alg == MergeAlgorithm::Horizontal) + { + ProfileEvents::increment(ProfileEvents::MergedRows, value.rows); + merge_entry->rows_read += value.rows; + merge_entry->progress = average_elem_progress * merge_entry->rows_read; + } + else + { + merge_entry->progress = average_elem_progress *
merge_entry->rows_with_key_columns_read; + } + }; +}; + +class MergeProgressCallbackVerticalStep : public MergeProgressCallback +{ +public: + + MergeProgressCallbackVerticalStep(MergeList::Entry & merge_entry_, size_t num_total_rows_exact, + const ColumnSizeEstimator & column_sizes, const String & column_name) + : MergeProgressCallback(merge_entry_), initial_progress(merge_entry->progress) + { + average_elem_progress = column_sizes.columnProgress(column_name, 1, num_total_rows_exact); + } + + Float64 initial_progress; + /// NOTE: not thread-safe (so that it remains copyable). This is OK in the current single-threaded use case size_t rows_read_internal{0}; void operator() (const Progress & value) @@ -384,42 +424,13 @@ public: merge_entry->bytes_read_uncompressed += value.bytes; ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes); - if (merge_alg == MergeAlg::BASIC) - { - merge_entry->progress = average_elem_progress * (merge_entry->rows_read += value.rows); - ProfileEvents::increment(ProfileEvents::MergedRows, value.rows); - } - else - { - merge_entry->progress = average_elem_progress * (rows_read_internal += value.rows); - } - }; -}; - -class MergeProgressCallbackOrdinary : public MergeProgressCallBack -{ -public: - - MergeProgressCallbackOrdinary(MergeList::Entry & merge_entry_, size_t num_total_rows_exact, - const ColumnSizeEstimator & column_sizes, const String & column_name) - : MergeProgressCallBack(merge_entry_), initial_progress(merge_entry->progress) - { - average_elem_progress = column_sizes.columnProgress(column_name, 1, num_total_rows_exact); - } - - Float64 initial_progress; - - void operator() (const Progress & value) - { - merge_entry->bytes_read_uncompressed += value.bytes; - ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes); - - Float64 local_progress = average_elem_progress * (rows_read_internal += value.rows); + rows_read_internal += value.rows; + Float64 local_progress = average_elem_progress * rows_read_internal; merge_entry->progress = initial_progress + local_progress; }; }; -/// parts должны быть отсортированы. +/// parts should be sorted. MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart( MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry, size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation) @@ -466,17 +477,17 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart new_data_part->name = "tmp_" + merged_name; new_data_part->is_temp = true; - size_t sum_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity; + size_t sum_input_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity; MergedRowSources merged_rows_sources; - MergeAlg merge_alg = chooseMergingAlg(data, parts, sum_rows_upper_bound, merged_rows_sources); + MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, merged_rows_sources); - MergedRowSources * merged_rows_sources_ptr = (merge_alg == MergeAlg::VERTICAL) ? - &merged_rows_sources : nullptr; - Names & main_column_names = (merge_alg == MergeAlg::VERTICAL) ? - key_column_names : all_column_names; - NamesAndTypesList & main_column_names_and_types = (merge_alg == MergeAlg::VERTICAL) ? - key_column_names_and_types : all_column_names_and_types; + MergedRowSources * merged_rows_sources_ptr = (merge_alg == MergeAlgorithm::Vertical) + ?
&merged_rows_sources : nullptr; + Names & main_column_names = (merge_alg == MergeAlgorithm::Vertical) + ? key_column_names : all_column_names; + NamesAndTypesList & main_column_names_and_types = (merge_alg == MergeAlgorithm::Vertical) + ? key_column_names_and_types : all_column_names_and_types; ColumnSizeEstimator column_sizes(parts, key_column_names, ordinary_column_names); @@ -493,7 +504,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart part_path, DEFAULT_MERGE_BLOCK_SIZE, main_column_names, data, parts[i], MarkRanges(1, MarkRange(0, parts[i]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false); - input->setProgressCallback(MergeProgressCallBack{merge_entry, merge_alg, sum_rows_upper_bound, column_sizes}); + input->setProgressCallback(MergeProgressCallback{merge_entry, merge_alg, sum_input_rows_upper_bound, column_sizes}); if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted) src_streams.emplace_back(std::make_shared( @@ -569,14 +580,18 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart rows_written += block.rows(); to.write(block); - merge_entry->rows_written = merged_stream->getProfileInfo().rows; - merge_entry->rows_with_key_columns_written += merged_stream->getProfileInfo().rows; + if (merge_alg == MergeAlgorithm::Horizontal) + merge_entry->rows_written = merged_stream->getProfileInfo().rows; + merge_entry->rows_with_key_columns_written = merged_stream->getProfileInfo().rows; merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes; - /// This update is unactual for VERTICAL alg sicne it requires more accurate per-column updates + /// This update is not suitable for the Vertical algorithm, since it requires more accurate per-column updates /// Reservation updates are not performed yet; during the merge this may lead to higher free space requirements - if (disk_reservation && merge_alg == MergeAlg::BASIC) - disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / sum_rows_upper_bound)) * initial_reservation)); + if (disk_reservation && merge_alg == MergeAlgorithm::Horizontal) + { + Float64 relative_rows_written = std::min(1., 1. * rows_written / sum_input_rows_upper_bound); + disk_reservation->update(static_cast<size_t>((1.
- relative_rows_written) * initial_reservation)); + } } if (isCancelled()) @@ -584,12 +599,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart MergeTreeData::DataPart::Checksums checksums_ordinary_columns; - /// Gather ordinary rows - if (merge_alg == MergeAlg::VERTICAL) + /// Gather ordinary columns + if (merge_alg == MergeAlgorithm::Vertical) { - size_t sum_rows_exact = merge_entry->rows_with_key_columns_written; + size_t sum_input_rows_exact = merge_entry->rows_with_key_columns_read; merge_entry->columns_written = key_column_names.size(); - merge_entry->progress = column_sizes.keyColumnsProgress(sum_rows_exact, sum_rows_exact); + merge_entry->progress = column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact); BlockInputStreams column_part_streams(parts.size()); auto it_name_and_type = ordinary_column_names_and_types.cbegin(); @@ -599,6 +614,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart const String & column_name = ordinary_column_names[column_num]; Names column_name_(1, column_name); NamesAndTypesList column_name_and_type_(1, *it_name_and_type++); + Float64 progress_before = merge_entry->progress; LOG_TRACE(log, "Gathering column " << column_name << " " << column_name_and_type_.front().type->getName()); @@ -613,7 +629,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart false, true); column_part_stream->setProgressCallback( - MergeProgressCallbackOrdinary{merge_entry, sum_rows_exact, column_sizes, column_name}); + MergeProgressCallbackVerticalStep{merge_entry, sum_input_rows_exact, column_sizes, column_name}); column_part_streams[part_num] = std::move(column_part_stream); } @@ -626,17 +642,21 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart { column_to.write(block); } - /// NOTE: nested column contains duplicates checksums (and files) checksums_ordinary_columns.add(column_to.writeSuffixAndGetChecksums()); merge_entry->columns_written = key_column_names.size() + column_num; + merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes; + merge_entry->progress = progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact); + + if (isCancelled()) + throw Exception("Cancelled merging parts", ErrorCodes::ABORTED); } } merged_stream->readSuffix(); new_data_part->columns = all_column_names_and_types; - if (merge_alg != MergeAlg::VERTICAL) + if (merge_alg != MergeAlgorithm::Vertical) new_data_part->checksums = to.writeSuffixAndGetChecksums(); else new_data_part->checksums = to.writeSuffixAndGetChecksums(all_column_names_and_types, &checksums_ordinary_columns); @@ -655,7 +675,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart } -MergeTreeDataMerger::MergeAlg MergeTreeDataMerger::chooseMergingAlg( +MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm( const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const { @@ -670,9 +690,9 @@ MergeTreeDataMerger::MergeAlg MergeTreeDataMerger::chooseMergingAlg( bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS; auto merge_alg = (is_supported_storage && enough_total_rows && enough_ordinary_cols && no_parts_overflow) ? 
- MergeAlg::VERTICAL : MergeAlg::BASIC; + MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal; - if (merge_alg == MergeAlg::VERTICAL) + if (merge_alg == MergeAlgorithm::Vertical) { try { @@ -681,11 +701,11 @@ MergeTreeDataMerger::MergeAlg MergeTreeDataMerger::chooseMergingAlg( catch (...) { /// Not enough memory for VERTICAL merge algorithm, makes sense for very large tables - merge_alg = MergeAlg::BASIC; + merge_alg = MergeAlgorithm::Horizontal; } } - LOG_DEBUG(log, "Selected MergeAlg: " << ((merge_alg == MergeAlg::VERTICAL) ? "VERTICAL" : "BASIC")); + LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Basic")); return merge_alg; } diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index 293fe0f4b22..6cfe8c59bef 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -691,7 +691,7 @@ BlockInputStreams StorageLog::read( max_block_size, column_names, *this, - 0, std::numeric_limits<size_t>::max(), + 0, marksCount() ? std::numeric_limits<size_t>::max() : 0, settings.max_read_buffer_size)); } else diff --git a/dbms/src/Storages/System/StorageSystemMerges.cpp b/dbms/src/Storages/System/StorageSystemMerges.cpp index e19a2ef7c0e..d2aa40d158a 100644 --- a/dbms/src/Storages/System/StorageSystemMerges.cpp +++ b/dbms/src/Storages/System/StorageSystemMerges.cpp @@ -25,8 +25,9 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name) { "rows_read", std::make_shared<DataTypeUInt64>() }, { "bytes_written_uncompressed", std::make_shared<DataTypeUInt64>() }, { "rows_written", std::make_shared<DataTypeUInt64>() }, - { "rows_with_key_columns_written", std::make_shared<DataTypeUInt64>() }, - { "columns_written", std::make_shared<DataTypeUInt64>() } + { "columns_written", std::make_shared<DataTypeUInt64>() }, + { "rows_with_key_columns_read", std::make_shared<DataTypeUInt64>() }, + { "rows_with_key_columns_written", std::make_shared<DataTypeUInt64>() } } { } @@ -60,8 +61,9 @@ BlockInputStreams StorageSystemMerges::read( ColumnWithTypeAndName col_rows_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_read"}; ColumnWithTypeAndName col_bytes_written_uncompressed{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "bytes_written_uncompressed"}; ColumnWithTypeAndName col_rows_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_written"}; - ColumnWithTypeAndName col_rows_with_key_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_written"}; ColumnWithTypeAndName col_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "columns_written"}; + ColumnWithTypeAndName col_rows_with_key_columns_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_read"}; + ColumnWithTypeAndName col_rows_with_key_columns_written{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_with_key_columns_written"}; for (const auto & merge : context.getMergeList().get()) { @@ -77,8 +79,9 @@ BlockInputStreams StorageSystemMerges::read( col_rows_read.column->insert(merge.rows_read.load(std::memory_order_relaxed)); col_bytes_written_uncompressed.column->insert(merge.bytes_written_uncompressed.load(std::memory_order_relaxed)); col_rows_written.column->insert(merge.rows_written.load(std::memory_order_relaxed)); - col_rows_with_key_columns_written.column->insert(merge.rows_with_key_columns_written.load(std::memory_order_relaxed)); col_columns_written.column->insert(merge.columns_written.load(std::memory_order_relaxed)); + col_rows_with_key_columns_read.column->insert(merge.rows_with_key_columns_read.load(std::memory_order_relaxed)); +
col_rows_with_key_columns_written.column->insert(merge.rows_with_key_columns_written.load(std::memory_order_relaxed)); } Block block{ @@ -93,7 +96,10 @@ BlockInputStreams StorageSystemMerges::read( col_bytes_read_uncompressed, col_rows_read, col_bytes_written_uncompressed, - col_rows_written + col_rows_written, + col_columns_written, + col_rows_with_key_columns_read, + col_rows_with_key_columns_written }; return BlockInputStreams{1, std::make_shared<OneBlockInputStream>(block)}; From ef593d1b0194a2c206c8a3e1e0efc44d17571384 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Wed, 23 Nov 2016 14:57:56 +0300 Subject: [PATCH 4/6] Added merge_tree.enable_vertical_merge_algorithm setting. --- dbms/include/DB/Core/SortDescription.h | 2 +- dbms/include/DB/DataStreams/ColumnGathererStream.h | 7 +++++-- dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h | 4 ++++ dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp | 7 +++++-- dbms/src/Storages/System/StorageSystemMerges.cpp | 2 +- 5 files changed, 16 insertions(+), 6 deletions(-) diff --git a/dbms/include/DB/Core/SortDescription.h b/dbms/include/DB/Core/SortDescription.h index 1573dc645fd..daeaf8db2de 100644 --- a/dbms/include/DB/Core/SortDescription.h +++ b/dbms/include/DB/Core/SortDescription.h @@ -52,7 +52,7 @@ struct SortCursorImpl * The order is determined by the cursor number. * * The cursor number (always?) equals the number of the part being merged. - * Therefore this filed can be used to determine part number of current row (see ColumnGathererStream). + * Therefore this field can be used to determine part number of current row (see ColumnGathererStream). */ size_t order; diff --git a/dbms/include/DB/DataStreams/ColumnGathererStream.h b/dbms/include/DB/DataStreams/ColumnGathererStream.h index 5773d7080da..95be17fe032 100644 --- a/dbms/include/DB/DataStreams/ColumnGathererStream.h +++ b/dbms/include/DB/DataStreams/ColumnGathererStream.h @@ -11,8 +11,11 @@ namespace DB struct __attribute__((__packed__)) RowSourcePart { - unsigned int flag: 1; - unsigned int source_id: 7; + /// The member order is important: it allows using RowSourcePart * as UInt8 * when flag = false + UInt8 source_id: 7; + UInt8 flag: 1; + + RowSourcePart() = default; RowSourcePart(unsigned source_id_, bool flag_ = false) { diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h b/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h index 9b03fd791ef..946f3077956 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h @@ -90,6 +90,9 @@ struct MergeTreeSettings /// Minimal absolute delay to close, stop serving requests and not return Ok during status check. size_t min_absolute_delay_to_close = 0; + /// Enable usage of the Vertical merge algorithm.
+ size_t enable_vertical_merge_algorithm = 0; + void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config) { @@ -124,6 +127,7 @@ struct MergeTreeSettings SET_SIZE_T(min_relative_delay_to_yield_leadership); SET_SIZE_T(min_relative_delay_to_close); SET_SIZE_T(min_absolute_delay_to_close); + SET_SIZE_T(enable_vertical_merge_algorithm); #undef SET_SIZE_T #undef SET_DOUBLE diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 8a9ce7c41a2..1efe4e8ba1c 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -482,6 +482,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart MergedRowSources merged_rows_sources; MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, merged_rows_sources); + LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal")); + MergedRowSources * merged_rows_sources_ptr = (merge_alg == MergeAlgorithm::Vertical) ? &merged_rows_sources : nullptr; Names & main_column_names = (merge_alg == MergeAlgorithm::Vertical) @@ -679,6 +681,9 @@ MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm( const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const { + if (data.context.getMergeTreeSettings().enable_vertical_merge_algorithm == 0) + return MergeAlgorithm::Horizontal; + bool is_supported_storage = data.merging_params.mode == MergeTreeData::MergingParams::Ordinary || data.merging_params.mode == MergeTreeData::MergingParams::Collapsing; @@ -705,8 +710,6 @@ MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm( } } - LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Basic")); - return merge_alg; } diff --git a/dbms/src/Storages/System/StorageSystemMerges.cpp b/dbms/src/Storages/System/StorageSystemMerges.cpp index d2aa40d158a..4bdce11acc0 100644 --- a/dbms/src/Storages/System/StorageSystemMerges.cpp +++ b/dbms/src/Storages/System/StorageSystemMerges.cpp @@ -70,7 +70,7 @@ BlockInputStreams StorageSystemMerges::read( col_database.column->insert(merge.database); col_table.column->insert(merge.table); col_elapsed.column->insert(merge.watch.elapsedSeconds()); - col_progress.column->insert(merge.progress); + col_progress.column->insert(std::min(1., merge.progress)); /// little cheat col_num_parts.column->insert(merge.num_parts); col_result_part_name.column->insert(merge.result_part_name); col_total_size_bytes_compressed.column->insert(merge.total_size_bytes_compressed); From fe891eb19894effa4242cf322ad446c3f37f54b1 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Wed, 23 Nov 2016 16:09:29 +0300 Subject: [PATCH 5/6] Removed extra check. --- dbms/src/DataStreams/MergingSortedBlockInputStream.cpp | 9 --------- 1 file changed, 9 deletions(-) diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index a7e395e28fa..4f14b5465c1 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -239,15 +239,6 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs if (out_row_sources) { /// Actually, current.impl->order stores source number (i.e. 
cursors[current.impl->order] == current.impl) - size_t source_num = 0; - for (; source_num < cursors.size(); ++source_num) - if (&cursors[source_num] == current.impl) - break; - - /// TODO: This check can be removed after testing - if (source_num != current.impl->order) - throw Exception("Developer's logical error", ErrorCodes::LOGICAL_ERROR); - out_row_sources->emplace_back(current.impl->order); } From 544ef67c289f2b97c2fbb79029c1f4caed2fb375 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Thu, 24 Nov 2016 15:26:47 +0300 Subject: [PATCH 6/6] Fixed IN usage inside PK expressions. Refactored the type conversion function convertFieldToType(). --- dbms/include/DB/Interpreters/Set.h | 2 +- dbms/src/Core/FieldVisitors.cpp | 2 + dbms/src/Interpreters/Set.cpp | 2 +- dbms/src/Interpreters/convertFieldToType.cpp | 44 +++------ dbms/src/Storages/MergeTree/PKCondition.cpp | 97 +++++++------------ .../0_stateless/00386_enum_in_pk.reference | 2 + .../queries/0_stateless/00386_enum_in_pk.sql | 3 + 7 files changed, 60 insertions(+), 92 deletions(-) diff --git a/dbms/include/DB/Interpreters/Set.h b/dbms/include/DB/Interpreters/Set.h index 0a1f50ba5fd..9b813452e63 100644 --- a/dbms/include/DB/Interpreters/Set.h +++ b/dbms/include/DB/Interpreters/Set.h @@ -283,7 +283,7 @@ public: * node - это список значений: 1, 2, 3 или список tuple-ов: (1, 2), (3, 4), (5, 6). * create_ordered_set - создавать ли вектор упорядоченных элементов. Нужен для работы индекса */ - void createFromAST(DataTypes & types, ASTPtr node, const Context & context, bool create_ordered_set); + void createFromAST(const DataTypes & types, ASTPtr node, const Context & context, bool create_ordered_set); // Возвращает false, если превышено какое-нибудь ограничение, и больше не нужно вставлять. bool insertFromBlock(const Block & block, bool create_ordered_set = false); diff --git a/dbms/src/Core/FieldVisitors.cpp b/dbms/src/Core/FieldVisitors.cpp index c298be632fd..e3a5eccb437 100644 --- a/dbms/src/Core/FieldVisitors.cpp +++ b/dbms/src/Core/FieldVisitors.cpp @@ -117,6 +117,8 @@ DayNum_t stringToDate(const String & s) DayNum_t date{}; readDateText(date, in); + if (!in.eof()) + throw Exception("String is too long for Date: " + s); return date; } diff --git a/dbms/src/Interpreters/Set.cpp b/dbms/src/Interpreters/Set.cpp index 16e02a9f222..116b9299e68 100644 --- a/dbms/src/Interpreters/Set.cpp +++ b/dbms/src/Interpreters/Set.cpp @@ -279,7 +279,7 @@ static Field extractValueFromNode(ASTPtr & node, const IDataType & type, const C } -void Set::createFromAST(DataTypes & types, ASTPtr node, const Context & context, bool create_ordered_set) +void Set::createFromAST(const DataTypes & types, ASTPtr node, const Context & context, bool create_ordered_set) { data_types = types; diff --git a/dbms/src/Interpreters/convertFieldToType.cpp b/dbms/src/Interpreters/convertFieldToType.cpp index 8ab607f5a3b..3b66ec71e5b 100644 --- a/dbms/src/Interpreters/convertFieldToType.cpp +++ b/dbms/src/Interpreters/convertFieldToType.cpp @@ -9,6 +9,8 @@ #include #include +#include + #include @@ -74,19 +76,12 @@ Field convertFieldToType(const Field & src, const IDataType & type) const bool is_date = typeid_cast<const DataTypeDate *>(&type); bool is_datetime = false; - bool is_enum8 = false; - bool is_enum16 = false; + bool is_enum = false; if (!is_date) if (!(is_datetime = typeid_cast<const DataTypeDateTime *>(&type))) - if (!(is_enum8 = typeid_cast<const DataTypeEnum8 *>(&type))) - if (!(is_enum16 = typeid_cast<const DataTypeEnum16 *>(&type))) - throw Exception{ - "Logical error: unknown numeric type " + type.getName(), - ErrorCodes::LOGICAL_ERROR - }; - - const
auto is_enum = is_enum8 || is_enum16; + if (!(is_enum = dynamic_cast<const IDataTypeEnum *>(&type))) + throw Exception{"Logical error: unknown numeric type " + type.getName(), ErrorCodes::LOGICAL_ERROR}; /// Numeric values for Enums should not be used directly in IN section if (src.getType() == Field::Types::UInt64 && !is_enum) @@ -94,32 +89,21 @@ Field convertFieldToType(const Field & src, const IDataType & type) if (src.getType() == Field::Types::String) { - /// Возможность сравнивать даты и даты-с-временем со строкой. - const String & str = src.get<const String &>(); - ReadBufferFromString in(str); - if (is_date) { - DayNum_t date{}; - readDateText(date, in); - if (!in.eof()) - throw Exception("String is too long for Date: " + str); - - return Field(UInt64(date)); + /// Convert 'YYYY-MM-DD' Strings to Date + return UInt64(stringToDate(src.get<const String &>())); } else if (is_datetime) { - time_t date_time{}; - readDateTimeText(date_time, in); - if (!in.eof()) - throw Exception("String is too long for DateTime: " + str); - - return Field(UInt64(date_time)); + /// Convert 'YYYY-MM-DD hh:mm:ss' Strings to DateTime + return stringToDateTime(src.get<const String &>()); + } + else if (is_enum) + { + /// Convert String to Enum's value + return dynamic_cast<const IDataTypeEnum &>(type).castToValue(src); } - else if (is_enum8) - return Field(UInt64(static_cast<const DataTypeEnum8 &>(type).getValue(str))); - else if (is_enum16) - return Field(UInt64(static_cast<const DataTypeEnum16 &>(type).getValue(str))); } throw Exception("Type mismatch in IN or VALUES section: " + type.getName() + " expected, " diff --git a/dbms/src/Storages/MergeTree/PKCondition.cpp b/dbms/src/Storages/MergeTree/PKCondition.cpp index afb35854b0b..bd2df6bbf69 100644 --- a/dbms/src/Storages/MergeTree/PKCondition.cpp +++ b/dbms/src/Storages/MergeTree/PKCondition.cpp @@ -11,6 +11,7 @@ #include #include #include +#include namespace DB { @@ -131,7 +132,7 @@ const PKCondition::AtomMap PKCondition::atom_map{ }, { "in", - [] (RPNElement & out, const Field & value, ASTPtr & node) + [] (RPNElement & out, const Field &, ASTPtr & node) { out.function = RPNElement::FUNCTION_IN_SET; out.in_function = node; @@ -140,7 +141,7 @@ const PKCondition::AtomMap PKCondition::atom_map{ }, { "notIn", - [] (RPNElement & out, const Field & value, ASTPtr & node) + [] (RPNElement & out, const Field &, ASTPtr & node) { out.function = RPNElement::FUNCTION_NOT_IN_SET; out.in_function = node; @@ -239,23 +240,26 @@ bool PKCondition::addCondition(const String & column, const Range & range) return true; } -/** Получить значение константного выражения. - * Вернуть false, если выражение не константно. +/** Computes the value of a constant expression and its data type. + * Returns false if the expression isn't constant.
*/ -static bool getConstant(const ASTPtr & expr, Block & block_with_constants, Field & value) +static bool getConstant(const ASTPtr & expr, Block & block_with_constants, Field & out_value, DataTypePtr & out_type) { String column_name = expr->getColumnName(); - if (const ASTLiteral * lit = typeid_cast<const ASTLiteral *>(&*expr)) + if (const ASTLiteral * lit = typeid_cast<const ASTLiteral *>(expr.get())) { - /// литерал - value = lit->value; + /// Simple literal + out_value = lit->value; + out_type = block_with_constants.getByName(column_name).type; return true; } else if (block_with_constants.has(column_name) && block_with_constants.getByName(column_name).column->isConst()) { - /// выражение, вычислившееся в константу - value = (*block_with_constants.getByName(column_name).column)[0]; + /// An expression that depends only on constants + const auto & expr_info = block_with_constants.getByName(column_name); + out_value = (*expr_info.column)[0]; + out_type = expr_info.type; return true; } else @@ -362,46 +366,23 @@ bool PKCondition::isPrimaryKeyPossiblyWrappedByMonotonicFunctionsImpl( } -/// NOTE: Keep in the mind that such behavior could be incompatible inside ordinary expression. -/// TODO: Use common methods for types conversions. -static bool tryCastValueToType(const DataTypePtr & desired_type, const DataTypePtr & src_type, Field & src_value) +static void castValueToType(const DataTypePtr & desired_type, Field & src_value, const DataTypePtr & src_type, const ASTPtr & node) { if (desired_type->getName() == src_type->getName()) - return true; + return; - /// Try to correct type of constant for correct comparison try { - /// Convert String to Enum's value - if (auto data_type_enum = dynamic_cast<const IDataTypeEnum *>(desired_type.get())) - { - src_value = data_type_enum->castToValue(src_value); - } - /// Convert 'YYYY-MM-DD' Strings to Date - else if (typeid_cast<const DataTypeDate *>(desired_type.get()) && typeid_cast<const DataTypeString *>(src_type.get())) - { - src_value = UInt64(stringToDate(src_value.safeGet<const String &>())); - } - /// Convert 'YYYY-MM-DD hh:mm:ss' Strings to DateTime - else if (typeid_cast<const DataTypeDateTime *>(desired_type.get()) && typeid_cast<const DataTypeString *>(src_type.get())) - { - src_value = stringToDateTime(src_value.safeGet<const String &>()); - } - else if (desired_type->behavesAsNumber() && src_type->behavesAsNumber()) - { - /// Ok, numeric types are almost mutually convertible - } - else - { - return false; - } + /// NOTE: We don't need accurate info about src_type at this moment + src_value = convertFieldToType(src_value, *desired_type); } catch (...) { - return false; + throw Exception("Primary key expression contains comparison between inconvertible types: " + + desired_type->getName() + " and " + src_type->getName() + + " inside " + DB::toString(node->range), + ErrorCodes::BAD_TYPE_OF_FIELD); } - - return true; } @@ -411,8 +392,9 @@ bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & bl * либо он же, завёрнутый в цепочку возможно-монотонных функций, * либо константное выражение - число.
*/ - Field value; - if (const ASTFunction * func = typeid_cast<const ASTFunction *>(&*node)) + Field const_value; + DataTypePtr const_type; + if (const ASTFunction * func = typeid_cast<const ASTFunction *>(node.get())) { const ASTs & args = typeid_cast<const ASTExpressionList &>(*func->arguments).children; @@ -423,13 +405,14 @@ bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & bl size_t key_arg_pos; /// Position of argument with primary key column (non-const argument) size_t key_column_num; /// Number of a primary key column (inside sort_descr array) RPNElement::MonotonicFunctionsChain chain; + bool is_set_const = false; - if (getConstant(args[1], block_with_constants, value) + if (getConstant(args[1], block_with_constants, const_value, const_type) && isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[0], context, key_column_num, key_expr_type, chain)) { key_arg_pos = 0; } - else if (getConstant(args[0], block_with_constants, value) + else if (getConstant(args[0], block_with_constants, const_value, const_type) && isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[1], context, key_column_num, key_expr_type, chain)) { key_arg_pos = 1; @@ -438,6 +421,7 @@ bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & bl && isPrimaryKeyPossiblyWrappedByMonotonicFunctions(args[0], context, key_column_num, key_expr_type, chain)) { key_arg_pos = 0; + is_set_const = true; } else return false; @@ -469,26 +453,19 @@ bool PKCondition::atomFromAST(ASTPtr & node, const Context & context, Block & bl if (atom_it == std::end(atom_map)) return false; - const DataTypePtr & const_type = block_with_constants.getByName(args[1 - key_arg_pos]->getColumnName()).type; + if (!is_set_const) /// Set args are already cast inside Set::createFromAST + castValueToType(key_expr_type, const_value, const_type, node); - if (!tryCastValueToType(key_expr_type, const_type, value)) - { - throw Exception("Primary key expression contains comparison between inconvertible types: " + - key_expr_type->getName() + " and " + const_type->getName() + - " inside " + DB::toString(func->range), - ErrorCodes::BAD_TYPE_OF_FIELD); - } - - return atom_it->second(out, value, node); + return atom_it->second(out, const_value, node); } - else if (getConstant(node, block_with_constants, value)) /// Для случаев, когда написано, например, WHERE 0 AND something + else if (getConstant(node, block_with_constants, const_value, const_type)) /// For cases when e.g. WHERE 0 AND something is written { - if (value.getType() == Field::Types::UInt64 - || value.getType() == Field::Types::Int64 - || value.getType() == Field::Types::Float64) + if (const_value.getType() == Field::Types::UInt64 + || const_value.getType() == Field::Types::Int64 + || const_value.getType() == Field::Types::Float64) { /// Ноль во всех типах представлен в памяти так же, как в UInt64. - out.function = value.get<UInt64>() + out.function = const_value.get<UInt64>() ?
RPNElement::ALWAYS_TRUE : RPNElement::ALWAYS_FALSE; diff --git a/dbms/tests/queries/0_stateless/00386_enum_in_pk.reference b/dbms/tests/queries/0_stateless/00386_enum_in_pk.reference index e45632d7fd9..858aba970af 100644 --- a/dbms/tests/queries/0_stateless/00386_enum_in_pk.reference +++ b/dbms/tests/queries/0_stateless/00386_enum_in_pk.reference @@ -20,5 +20,7 @@ 3447905173014179293 3051197876967004596 3051197876967004596 +3051197876967004596 +3051197876967004596 463667963421364848 463667963421364848 diff --git a/dbms/tests/queries/0_stateless/00386_enum_in_pk.sql b/dbms/tests/queries/0_stateless/00386_enum_in_pk.sql index 0cefdae9815..2aa89c4636f 100644 --- a/dbms/tests/queries/0_stateless/00386_enum_in_pk.sql +++ b/dbms/tests/queries/0_stateless/00386_enum_in_pk.sql @@ -35,5 +35,8 @@ SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE 1 = 1; SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE (x = '0' OR x = '1'); SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE (d = '0' OR d = '1'); +SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE x IN ('0', '1'); +SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE d IN ('0', '1'); + SELECT cityHash64(groupArray(x)) FROM test.enum_pk WHERE (x != '0' AND x != '1'); SELECT cityHash64(groupArray(d)) FROM test.enum_pk WHERE (d != '0' AND d != '1');
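A behavioral note on the FieldVisitors.cpp change in this last patch: stringToDate() now throws when anything follows the date itself, instead of silently ignoring trailing characters. The effect can be sketched in standard C++ alone (the helper below is hypothetical and illustrative only; the real code uses DB::ReadBufferFromString and readDateText):

    #include <iostream>
    #include <sstream>
    #include <stdexcept>
    #include <string>

    /// Hypothetical stand-in for the stricter DB::stringToDate: parse
    /// 'YYYY-MM-DD' and reject leftover input, mirroring the new
    /// `if (!in.eof()) throw ...` check added in this patch.
    static void parseDateStrict(const std::string & s)
    {
        std::istringstream in(s);
        int y = 0, m = 0, d = 0;
        char dash1 = 0, dash2 = 0;
        if (!(in >> y >> dash1 >> m >> dash2 >> d) || dash1 != '-' || dash2 != '-')
            throw std::runtime_error("Cannot parse Date: " + s);
        if (in.peek() != std::char_traits<char>::eof())
            throw std::runtime_error("String is too long for Date: " + s);
    }

    int main()
    {
        parseDateStrict("2016-11-24");           /// accepted
        try
        {
            parseDateStrict("2016-11-24 00:00"); /// now rejected
        }
        catch (const std::exception & e)
        {
            std::cout << e.what() << '\n';
        }
        return 0;
    }

Together with the convertFieldToType() refactoring above, this strictness is what lets string literals in PK conditions, such as the new Enum cases exercised by 00386_enum_in_pk.sql, be resolved against the key column type safely.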