From 8c2e228856499324af660786b5f0a8a5716dbc45 Mon Sep 17 00:00:00 2001
From: alexey-milovidov
Date: Thu, 24 Nov 2016 23:08:54 +0400
Subject: [PATCH] Revert "New "vertical" algorithm for MergeTrees' parts merges"

---
 dbms/CMakeLists.txt                           |   2 -
 dbms/include/DB/Core/SortDescription.h        |  15 +-
 .../CollapsingSortedBlockInputStream.h        |  12 +-
 .../DB/DataStreams/ColumnGathererStream.h     |  81 -----
 .../MergingSortedBlockInputStream.h           |  13 +-
 .../include/DB/Storages/MergeTree/MergeList.h |  20 +-
 .../MergeTree/MergeTreeBlockInputStream.h     |   2 +-
 .../DB/Storages/MergeTree/MergeTreeData.h     |   8 +-
 .../Storages/MergeTree/MergeTreeDataMerger.h  |  14 -
 .../DB/Storages/MergeTree/MergeTreeDataPart.h |   2 -
 .../DB/Storages/MergeTree/MergeTreeSettings.h |   4 -
 .../MergeTree/MergedBlockOutputStream.h       |  21 +-
 .../CollapsingSortedBlockInputStream.cpp      |  27 +-
 dbms/src/DataStreams/ColumnGathererStream.cpp | 102 ------
 .../MergingSortedBlockInputStream.cpp         |  18 +-
 .../MergeTree/MergeTreeBlockInputStream.cpp   |   3 +-
 .../MergeTree/MergeTreeDataMerger.cpp         | 336 ++----------------
 .../Storages/MergeTree/MergeTreeDataPart.cpp  |   8 -
 dbms/src/Storages/StorageLog.cpp              |   2 +-
 .../Storages/System/StorageSystemMerges.cpp   |  18 +-
 20 files changed, 71 insertions(+), 637 deletions(-)
 delete mode 100644 dbms/include/DB/DataStreams/ColumnGathererStream.h
 delete mode 100644 dbms/src/DataStreams/ColumnGathererStream.cpp

diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt
index 394bf659be5..6890068af72 100644
--- a/dbms/CMakeLists.txt
+++ b/dbms/CMakeLists.txt
@@ -298,7 +298,6 @@ add_library (dbms
     include/DB/DataStreams/SquashingTransform.h
    include/DB/DataStreams/SquashingBlockInputStream.h
    include/DB/DataStreams/SquashingBlockOutputStream.h
-   include/DB/DataStreams/ColumnGathererStream.h
    include/DB/DataTypes/IDataType.h
    include/DB/DataTypes/IDataTypeDummy.h
    include/DB/DataTypes/DataTypeSet.h
@@ -809,7 +808,6 @@ add_library (dbms
    src/DataStreams/SquashingTransform.cpp
    src/DataStreams/SquashingBlockInputStream.cpp
    src/DataStreams/SquashingBlockOutputStream.cpp
-   src/DataStreams/ColumnGathererStream.cpp
    src/DataTypes/DataTypeString.cpp
    src/DataTypes/DataTypeFixedString.cpp

diff --git a/dbms/include/DB/Core/SortDescription.h b/dbms/include/DB/Core/SortDescription.h
index daeaf8db2de..d4fbe234e1e 100644
--- a/dbms/include/DB/Core/SortDescription.h
+++ b/dbms/include/DB/Core/SortDescription.h
@@ -35,9 +35,9 @@ struct SortColumnDescription

 using SortDescription = std::vector<SortColumnDescription>;

-/** Cursor allows to compare rows in different blocks (and parts).
- *  Cursor moves inside single block.
- *  It is used in priority queue.
+/** A cursor that allows comparing the corresponding rows in different blocks.
+ *  The cursor moves through a single block.
+ *  For use in a priority queue.
  */
 struct SortCursorImpl
 {
@@ -48,17 +48,14 @@ struct SortCursorImpl
    size_t pos = 0;
    size_t rows = 0;

-   /** Determines order if comparing columns are equal.
-    *  Order is determined by number of cursor.
-    *
-    *  Cursor number (always?) equals to number of merging part.
-    *  Therefore this field can be used to determine part number of current row (see ColumnGathererStream).
+   /** The order (what is compared) when the compared columns are equal.
+    *  Makes it possible to prefer rows from the desired cursor.
     */
    size_t order;

    using NeedCollationFlags = std::vector;

-   /** Should we use Collator to sort a column? */
+   /** Whether a Collator should be used to sort the column */
    NeedCollationFlags need_collation;

    /** Whether there is at least one column with a Collator. */
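The hunk above restores the pre-vertical-merge description of SortCursorImpl: one cursor per sorted input, all cursors held in a priority queue, and the order field acting as the tie-breaker when keys compare equal. As a minimal self-contained sketch of that pattern — illustrative names only, with plain int rows standing in for ClickHouse blocks and columns:

    #include <cstddef>
    #include <iostream>
    #include <queue>
    #include <vector>

    // One cursor per sorted source; `order` breaks ties so that rows from the
    // lower-numbered source win when keys are equal (a stable k-way merge).
    struct Cursor
    {
        const std::vector<int> * rows;
        size_t pos;
        size_t order;

        int current() const { return (*rows)[pos]; }
        bool exhausted() const { return pos >= rows->size(); }
    };

    // std::priority_queue is a max-heap, so invert the comparison for ascending output.
    struct Greater
    {
        bool operator()(const Cursor & a, const Cursor & b) const
        {
            if (a.current() != b.current())
                return a.current() > b.current();
            return a.order > b.order;
        }
    };

    int main()
    {
        std::vector<std::vector<int>> sources{{1, 4, 4}, {2, 4, 5}};
        std::priority_queue<Cursor, std::vector<Cursor>, Greater> queue;

        for (size_t i = 0; i < sources.size(); ++i)
            queue.push(Cursor{&sources[i], 0, i});

        while (!queue.empty())
        {
            Cursor current = queue.top();    // smallest key; lowest source order on ties
            queue.pop();
            std::cout << current.current() << " from source " << current.order << '\n';

            ++current.pos;                   // move the cursor within its own source
            if (!current.exhausted())
                queue.push(current);
        }
    }

For equal keys the lower order always wins, which is exactly the property the removed comment says ColumnGathererStream relied on to recover the part number of each output row.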
diff --git a/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h
index da9c6bacfbf..3dba4cb00c3 100644
--- a/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h
+++ b/dbms/include/DB/DataStreams/CollapsingSortedBlockInputStream.h
@@ -24,8 +24,8 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
 {
 public:
    CollapsingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
-       const String & sign_column_, size_t max_block_size_, MergedRowSources * out_row_sources_ = nullptr)
-       : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_),
+       const String & sign_column_, size_t max_block_size_)
+       : MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
        sign_column(sign_column_)
    {
    }
@@ -62,7 +62,7 @@ private:
    /// Read to the end.
    bool finished = false;

-   RowRef current_key;         /// The current primary key.
+   RowRef current_key;         /// The current primary key.
    RowRef next_key;            /// The primary key of the next row.

    RowRef first_negative;      /// The first negative row for the current primary key.
@@ -77,12 +77,6 @@ private:

    size_t blocks_written = 0;

-   /// Fields specific for VERTICAL merge algorithm
-   size_t current_pos = 0;         /// Global row number of current key
-   size_t first_negative_pos = 0;  /// Global row number of first_negative
-   size_t last_positive_pos = 0;   /// Global row number of last_positive
-   size_t last_negative_pos = 0;   /// Global row number of last_negative
-
    /** We support two different kinds of cursors - with Collation and without.
     *  Templates are used instead of polymorphic SortCursors and virtual function calls.
     */
diff --git a/dbms/include/DB/DataStreams/ColumnGathererStream.h b/dbms/include/DB/DataStreams/ColumnGathererStream.h
deleted file mode 100644
index 95be17fe032..00000000000
--- a/dbms/include/DB/DataStreams/ColumnGathererStream.h
+++ /dev/null
@@ -1,81 +0,0 @@
-#pragma once
-
-#include
-#include
-#include
-
-
-namespace DB
-{
-
-
-struct __attribute__((__packed__)) RowSourcePart
-{
-   /// Sequence of members is important to use RowSourcePart * as UInt8 * if flag = false
-   UInt8 source_id: 7;
-   UInt8 flag: 1;
-
-   RowSourcePart() = default;
-
-   RowSourcePart(unsigned source_id_, bool flag_ = false)
-   {
-       source_id = source_id_;
-       flag = flag_;
-   }
-
-   static constexpr size_t MAX_PARTS = 127;
-};
-
-using MergedRowSources = PODArray<RowSourcePart>;
-
-
-/** Gather a single stream from multiple streams according to the streams mask.
- *  The stream mask maps row number to the index of the source stream.
- *  Streams should contain exactly one column.
- */
-class ColumnGathererStream : public IProfilingBlockInputStream
-{
-public:
-   ColumnGathererStream(const BlockInputStreams & source_streams, const MergedRowSources & pos_to_source_idx_,
-       size_t block_size_ = DEFAULT_BLOCK_SIZE);
-
-   String getName() const override { return "ColumnGatherer"; }
-
-   String getID() const override;
-
-   Block readImpl() override;
-
-private:
-
-   const MergedRowSources & pos_to_source_idx;
-
-   /// Cache required fields
-   struct Source
-   {
-       const IColumn * column;
-       size_t pos;
-       size_t size;
-       Block block;
-
-       Source(Block && block_) : block(std::move(block_))
-       {
-           update();
-       }
-
-       void update()
-       {
-           column = block.getByPosition(0).column.get();
-           size = block.rowsInFirstColumn();
-           pos = 0;
-       }
-   };
-
-   std::vector<Source> sources;
-
-   size_t pos_global = 0;
-   size_t block_size;
-
-   Logger * log = &Logger::get("ColumnGathererStream");
-};
-
-}
diff --git a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h
index 223c71edf99..49ad3185720 100644
--- a/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h
+++ b/dbms/include/DB/DataStreams/MergingSortedBlockInputStream.h
@@ -9,7 +9,6 @@
 #include
 #include

-#include

 namespace DB
@@ -57,12 +56,10 @@ inline void intrusive_ptr_release(detail::SharedBlock * ptr)
 class MergingSortedBlockInputStream : public IProfilingBlockInputStream
 {
 public:
-   /// limit - if isn't 0, then we can produce only first limit rows in sorted order.
-   /// out_row_sources - if isn't nullptr, then at the end of execution it should contain the part number of each read row (and the needed flag)
-   MergingSortedBlockInputStream(BlockInputStreams & inputs_, const SortDescription & description_,
-       size_t max_block_size_, size_t limit_ = 0, MergedRowSources * out_row_sources_ = nullptr)
+   /// limit - if not 0, only the first limit rows can be produced in sorted order.
+   MergingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_, size_t limit_ = 0)
        : description(description_), max_block_size(max_block_size_), limit(limit_),
-       source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources(out_row_sources_)
+       source_blocks(inputs_.size()), cursors(inputs_.size())
    {
        children.insert(children.end(), inputs_.begin(), inputs_.end());
    }
@@ -161,10 +158,6 @@ protected:
    using QueueWithCollation = std::priority_queue;
    QueueWithCollation queue_with_collation;

-   /// Used in the Vertical merge algorithm to gather non-PK columns (on the next step)
-   /// If it is not nullptr then it should be populated during execution
-   MergedRowSources * out_row_sources = nullptr;
-
    /// These methods are used in the Collapsing/Summing/Aggregating... SortedBlockInputStreams.
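The two files above carry the core bookkeeping of the removed vertical algorithm: the primary-key merge records one packed RowSourcePart per visited input row, and the gatherer then replays that mask once per non-PK column. A simplified model of the replay loop, assuming plain int columns and hypothetical names (the real code works on IColumn and PODArray):

    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Mirrors the packed RowSourcePart: 7 bits select one of up to 127 source
    // parts, and 1 bit marks rows that the merge decided to drop (flag = skip).
    struct RowSource
    {
        uint8_t source_id : 7;
        uint8_t skip : 1;
    };

    int main()
    {
        // Two single-column "parts" and a mask a PK merge could have produced.
        std::vector<std::vector<int>> parts{{10, 20, 30}, {15, 25}};
        std::vector<size_t> positions(parts.size(), 0);
        std::vector<RowSource> mask{{0, 0}, {1, 0}, {0, 1}, {1, 0}, {0, 0}};

        std::vector<int> gathered;
        for (const RowSource & src : mask)
        {
            size_t & pos = positions[src.source_id];
            if (!src.skip)
                gathered.push_back(parts[src.source_id][pos]);
            ++pos;    // skipped rows still advance the source position
        }

        for (int value : gathered)
            std::cout << value << '\n';    // prints 10, 15, 25, 30 (20 was dropped)
    }

Because the mask costs one byte per input row, it can be held in memory for the whole merge — which is why the deleted chooseMergeAlgorithm() below only had to guard against a failed allocation.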
diff --git a/dbms/include/DB/Storages/MergeTree/MergeList.h b/dbms/include/DB/Storages/MergeTree/MergeList.h
index 28b09100da5..251be557160 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeList.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeList.h
@@ -32,21 +32,10 @@ struct MergeInfo
    UInt64 total_size_bytes_compressed{};
    UInt64 total_size_marks{};
    std::atomic bytes_read_uncompressed{};
-   std::atomic bytes_written_uncompressed{};
-
-   /// Updated only for the Horizontal algorithm
    std::atomic rows_read{};
+   std::atomic bytes_written_uncompressed{};
    std::atomic rows_written{};

-   /// Updated only for the Vertical algorithm
-   /// mutually exclusive with rows_read and rows_written; updated via either rows_written or columns_written
-   std::atomic columns_written{};
-
-   /// Updated in both cases
-   /// Number of rows for which primary key columns have been written
-   std::atomic rows_with_key_columns_read{};
-   std::atomic rows_with_key_columns_written{};
-
    MergeInfo(const std::string & database, const std::string & table, const std::string & result_part_name)
        : database{database}, table{table}, result_part_name{result_part_name}
    {
@@ -63,12 +52,9 @@ struct MergeInfo
        total_size_bytes_compressed(other.total_size_bytes_compressed),
        total_size_marks(other.total_size_marks),
        bytes_read_uncompressed(other.bytes_read_uncompressed.load(std::memory_order_relaxed)),
-       bytes_written_uncompressed(other.bytes_written_uncompressed.load(std::memory_order_relaxed)),
        rows_read(other.rows_read.load(std::memory_order_relaxed)),
-       rows_written(other.rows_written.load(std::memory_order_relaxed)),
-       columns_written(other.columns_written.load(std::memory_order_relaxed)),
-       rows_with_key_columns_read(other.rows_with_key_columns_read.load(std::memory_order_relaxed)),
-       rows_with_key_columns_written(other.rows_with_key_columns_written.load(std::memory_order_relaxed))
+       bytes_written_uncompressed(other.bytes_written_uncompressed.load(std::memory_order_relaxed)),
+       rows_written(other.rows_written.load(std::memory_order_relaxed))
    {
    }
 };
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h
index f5a52a81e7d..e5bcbd70a82 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeBlockInputStream.h
@@ -24,7 +24,7 @@ public:
        const MarkRanges & mark_ranges_, bool use_uncompressed_cache_,
        ExpressionActionsPtr prewhere_actions_, String prewhere_column_, bool check_columns,
        size_t min_bytes_to_use_direct_io_, size_t max_read_buffer_size_,
-       bool save_marks_in_cache_, bool quiet = false);
+       bool save_marks_in_cache_);

    ~MergeTreeBlockInputStream() override;
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h
index ffc09973378..e1911c09875 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h
@@ -55,11 +55,11 @@ namespace ErrorCodes
 * File structure:
 * / min-date _ max-date _ min-id _ max-id _ level / - the directory with a part.
 * Inside the part directory:
- * checksums.txt - contains the list of files with their sizes and checksums.
- * columns.txt - contains the list of columns with their types.
+ * checksums.txt - the list of files with their sizes and checksums.
+ * columns.txt - the list of columns with their types.
 * primary.idx - the index file.
- * [Column].bin - column data
- * [Column].mrk - marks pointing where to start reading to skip n * k rows.
+ * Column.bin - column data
+ * Column.mrk - marks pointing where to start reading to skip n * k rows.
 *
 * There are several modes of operation that determine what to do during a merge:
 * - Ordinary - do nothing extra;
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h
index a955d307fbf..0970328d171 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h
@@ -9,7 +9,6 @@ namespace DB
 {

 class MergeListEntry;
-class MergeProgressCallback;

 struct ReshardingJob;
@@ -121,19 +120,6 @@ public:

    bool isCancelled() const { return cancelled > 0; }

-public:
-
-   enum class MergeAlgorithm
-   {
-       Horizontal, /// per-row merge of all columns
-       Vertical    /// per-row merge of PK columns, per-column gather for non-PK columns
-   };
-
-private:
-
-   MergeAlgorithm chooseMergeAlgorithm(const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
-       size_t rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const;
-
 private:
    MergeTreeData & data;
    const BackgroundProcessingPool & pool;
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataPart.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataPart.h
index 2e6780d8bda..32fd1ea4963 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataPart.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataPart.h
@@ -46,8 +46,6 @@ struct MergeTreeDataPartChecksums

    void addFile(const String & file_name, size_t file_size, uint128 file_hash);

-   void add(MergeTreeDataPartChecksums && rhs_checksums);
-
    /// Checks that the set of columns and their checksums match. If not, throws an exception.
    /// If have_uncompressed, compares the checksums of the decompressed data for compressed files. Otherwise compares only the file checksums.
    void checkEqual(const MergeTreeDataPartChecksums & rhs, bool have_uncompressed) const;
diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h b/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h
index 946f3077956..9b03fd791ef 100644
--- a/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h
+++ b/dbms/include/DB/Storages/MergeTree/MergeTreeSettings.h
@@ -90,9 +90,6 @@ struct MergeTreeSettings
    /// Minimal absolute delay to close, stop serving requests and not return Ok during status check.
    size_t min_absolute_delay_to_close = 0;

-   /// Enable usage of the Vertical merge algorithm.
-   size_t enable_vertical_merge_algorithm = 0;
-

    void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
    {
@@ -127,7 +124,6 @@ struct MergeTreeSettings
        SET_SIZE_T(min_relative_delay_to_yield_leadership);
        SET_SIZE_T(min_relative_delay_to_close);
        SET_SIZE_T(min_absolute_delay_to_close);
-       SET_SIZE_T(enable_vertical_merge_algorithm);

    #undef SET_SIZE_T
    #undef SET_DOUBLE
diff --git a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h
index ab9a20d732a..17b22048ad0 100644
--- a/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h
+++ b/dbms/include/DB/Storages/MergeTree/MergedBlockOutputStream.h
@@ -33,7 +33,6 @@ public:
    {
    }

-
 protected:
    using OffsetColumns = std::set;
@@ -307,16 +306,11 @@ public:
        throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
    }

-   MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums(
-       const NamesAndTypesList & total_column_list,
-       MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr)
+   MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums()
    {
        /// Finish writing and get the checksums.
        MergeTreeData::DataPart::Checksums checksums;

-       if (additional_column_checksums)
-           checksums = std::move(*additional_column_checksums);
-
        if (storage.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
        {
            index_stream->next();
@@ -325,10 +319,10 @@ public:
            index_stream = nullptr;
        }

-       for (auto & column_stream : column_streams)
+       for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
        {
-           column_stream.second->finalize();
-           column_stream.second->addToChecksums(checksums);
+           it->second->finalize();
+           it->second->addToChecksums(checksums);
        }

        column_streams.clear();
@@ -344,7 +338,7 @@ public:
        {
            /// Write the file with the column descriptions.
            WriteBufferFromFile out(part_path + "columns.txt", 4096);
-           total_column_list.writeText(out);
+           columns_list.writeText(out);
        }

        {
@@ -356,11 +350,6 @@ public:
        return checksums;
    }

-   MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums()
-   {
-       return writeSuffixAndGetChecksums(columns_list, nullptr);
-   }
-
    MergeTreeData::DataPart::Index & getIndex()
    {
        return index_columns;
diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp
index 04ffb48ce2f..b3d80677fe3 100644
--- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp
+++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp
@@ -55,13 +55,6 @@ void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_colum
            ++merged_rows;
            for (size_t i = 0; i < num_columns; ++i)
                merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num);
-
-           if (out_row_sources)
-           {
-               /// true flag value means "skip row"
-               out_row_sources->data()[last_positive_pos].flag = false;
-               out_row_sources->data()[last_negative_pos].flag = false;
-           }
        }

        return;
    }
@@ -71,9 +64,6 @@
        ++merged_rows;
        for (size_t i = 0; i < num_columns; ++i)
            merged_columns[i]->insertFrom(*first_negative.columns[i], first_negative.row_num);
-
-       if (out_row_sources)
-           out_row_sources->data()[first_negative_pos].flag = false;
    }

    if (count_positive >= count_negative)
@@ -81,9 +71,6 @@
        ++merged_rows;
        for (size_t i = 0; i < num_columns; ++i)
            merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num);
-
-       if (out_row_sources)
-           out_row_sources->data()[last_positive_pos].flag = false;
    }

    if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
@@ -136,7 +123,7 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
    size_t merged_rows = 0;

    /// Take rows in the required order and put them into merged_block while there are no more than max_block_size rows
-   for (; !queue.empty(); ++current_pos)
+   while (!queue.empty())
    {
        TSortCursor current = queue.top();
@@ -162,10 +149,6 @@
        queue.pop();

-       /// Initially, skip all rows. On insert, unskip "corner" rows.
-       if (out_row_sources)
-           out_row_sources->emplace_back(current.impl->order, true);
-
        if (key_differs)
        {
            /// Write the data for the previous primary key.
@@ -183,21 +166,13 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s last_is_positive = true; setRowRef(last_positive, current); - last_positive_pos = current_pos; } else if (sign == -1) { if (!count_negative) - { setRowRef(first_negative, current); - first_negative_pos = current_pos; - } - if (!blocks_written && !merged_rows) - { setRowRef(last_negative, current); - last_negative_pos = current_pos; - } ++count_negative; last_is_positive = false; diff --git a/dbms/src/DataStreams/ColumnGathererStream.cpp b/dbms/src/DataStreams/ColumnGathererStream.cpp deleted file mode 100644 index 199b81e774b..00000000000 --- a/dbms/src/DataStreams/ColumnGathererStream.cpp +++ /dev/null @@ -1,102 +0,0 @@ -#include - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; - extern const int INCOMPATIBLE_COLUMNS; - extern const int INCORRECT_NUMBER_OF_COLUMNS; - extern const int EMPTY_DATA_PASSED; - extern const int RECEIVED_EMPTY_DATA; -} - -ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams, const MergedRowSources & pos_to_source_idx_, size_t block_size_) -: pos_to_source_idx(pos_to_source_idx_), block_size(block_size_) -{ - if (source_streams.empty()) - throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED); - - children.assign(source_streams.begin(), source_streams.end()); - - sources.reserve(children.size()); - for (size_t i = 0; i < children.size(); i++) - { - sources.emplace_back(children[i]->read()); - - Block & block = sources.back().block; - - if (block.columns() != 1) - throw Exception("Stream should contain exactly one column", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS); - - if (block.getByPosition(0).column->getName() != sources[0].block.getByPosition(0).column->getName()) - throw Exception("Column types don't match", ErrorCodes::INCOMPATIBLE_COLUMNS); - } -} - - -String ColumnGathererStream::getID() const -{ - std::stringstream res; - - res << getName() << "("; - for (size_t i = 0; i < children.size(); i++) - res << (i == 0 ? "" : ", " ) << children[i]->getID(); - res << ")"; - - return res.str(); -} - - -Block ColumnGathererStream::readImpl() -{ - if (children.size() == 1) - return children[0]->read(); - - if (pos_global >= pos_to_source_idx.size()) - return Block(); - - Block block_res = sources[0].block.cloneEmpty(); - IColumn & column_res = *block_res.getByPosition(0).column; - - size_t pos_finish = std::min(pos_global + block_size, pos_to_source_idx.size()); - column_res.reserve(pos_finish - pos_global); - - for (size_t pos = pos_global; pos < pos_finish; ++pos) - { - auto source_id = pos_to_source_idx[pos].source_id; - bool skip = pos_to_source_idx[pos].flag; - Source & source = sources[source_id]; - - if (source.pos >= source.size) /// Fetch new block - { - try - { - source.block = children[source_id]->read(); - source.update(); - } - catch (Exception & e) - { - e.addMessage("Cannot fetch required block. Stream " + children[source_id]->getID() + ", part " + toString(source_id)); - throw; - } - - if (0 == source.size) - { - throw Exception("Fetched block is empty. 
Stream " + children[source_id]->getID() + ", part " + toString(source_id), - ErrorCodes::RECEIVED_EMPTY_DATA); - } - } - - if (!skip) - column_res.insertFrom(*source.column, source.pos); //TODO: vectorize - ++source.pos; - } - - pos_global = pos_finish; - - return block_res; -} - -} diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp index 4f14b5465c1..9a8a641c7cd 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -195,10 +195,13 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs return; } - /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) - size_t source_num = current.impl->order; + size_t source_num = 0; + size_t size = cursors.size(); + for (; source_num < size; ++source_num) + if (&cursors[source_num] == current.impl) + break; - if (source_num >= cursors.size()) + if (source_num == size) throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR); for (size_t i = 0; i < num_columns; ++i) @@ -221,9 +224,6 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs finished = true; } - if (out_row_sources) - out_row_sources->resize_fill(out_row_sources->size() + merged_rows, RowSourcePart(source_num)); - // std::cerr << "fetching next block\n"; total_merged_rows += merged_rows; @@ -236,12 +236,6 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs for (size_t i = 0; i < num_columns; ++i) merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); - if (out_row_sources) - { - /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl) - out_row_sources->emplace_back(current.impl->order); - } - if (!current->isLast()) { // std::cerr << "moving to next row\n"; diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockInputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeBlockInputStream.cpp index 1f4f6c4e6af..aa9345041d8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockInputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockInputStream.cpp @@ -21,7 +21,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(const String & path_, /// const MarkRanges & mark_ranges_, bool use_uncompressed_cache_, ExpressionActionsPtr prewhere_actions_, String prewhere_column_, bool check_columns, size_t min_bytes_to_use_direct_io_, size_t max_read_buffer_size_, - bool save_marks_in_cache_, bool quiet) + bool save_marks_in_cache_) : path(path_), block_size(block_size_), storage(storage_), owned_data_part(owned_data_part_), @@ -97,7 +97,6 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(const String & path_, /// total_rows += range.end - range.begin; total_rows *= storage.index_granularity; - if (!quiet) LOG_TRACE(log, "Reading " << all_mark_ranges.size() << " ranges from part " << owned_data_part->name << ", approx. 
" << total_rows << (all_mark_ranges.size() > 1 diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 1efe4e8ba1c..c993e103e42 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -16,13 +16,11 @@ #include #include #include -#include #include #include #include #include -#include namespace ProfileEvents @@ -44,10 +42,6 @@ namespace ErrorCodes extern const int ABORTED; } - -using MergeAlgorithm = MergeTreeDataMerger::MergeAlgorithm; - - namespace { @@ -280,157 +274,8 @@ MergeTreeData::DataPartsVector MergeTreeDataMerger::selectAllPartsFromPartition( return parts_from_partition; } -static void extractOrdinaryAndKeyColumns(const NamesAndTypesList & all_columns, ExpressionActionsPtr primary_key_expressions, - NamesAndTypesList & ordinary_column_names_and_types, Names & ordinary_column_names, - NamesAndTypesList & key_column_names_and_types, Names & key_column_names -) -{ - Names key_columns_dup = primary_key_expressions->getRequiredColumns(); - std::set key_columns(key_columns_dup.cbegin(), key_columns_dup.cend()); - for (auto & column : all_columns) - { - auto it = std::find(key_columns.cbegin(), key_columns.cend(), column.name); - - if (key_columns.end() == it) - { - ordinary_column_names_and_types.emplace_back(column); - ordinary_column_names.emplace_back(column.name); - } - else - { - key_column_names_and_types.emplace_back(column); - key_column_names.emplace_back(column.name); - } - } -} - -/* Allow to compute more accurate progress statistics */ -class ColumnSizeEstimator -{ - std::unordered_map map; -public: - - /// Stores approximate size of columns in bytes - /// Exact values are not required since it used for relative values estimation (progress). 
- size_t sum_total = 0; - size_t sum_index_columns = 0; - size_t sum_ordinary_columns = 0; - - ColumnSizeEstimator(MergeTreeData::DataPartsVector & parts, const Names & key_columns, const Names & ordinary_columns) - { - if (parts.empty()) - return; - - for (const auto & name_and_type : parts.front()->columns) - map[name_and_type.name] = 0; - - for (const auto & part : parts) - { - for (const auto & name_and_type : parts.front()->columns) - map.at(name_and_type.name) += part->getColumnSize(name_and_type.name); - } - - for (const auto & name : key_columns) - sum_index_columns += map.at(name); - - for (const auto & name : ordinary_columns) - sum_ordinary_columns += map.at(name); - - sum_total = sum_index_columns + sum_ordinary_columns; - } - - /// Approximate size of num_rows column elements if column contains num_total_rows elements - Float64 columnSize(const String & column, size_t num_rows, size_t num_total_rows) const - { - return static_cast(map.at(column)) / num_total_rows * num_rows; - } - - /// Relative size of num_rows column elements (in comparison with overall size of all columns) if column contains num_total_rows elements - Float64 columnProgress(const String & column, size_t num_rows, size_t num_total_rows) const - { - return columnSize(column, num_rows, num_total_rows) / sum_total; - } - - /// Like columnSize, but takes into account only PK columns - Float64 keyColumnsSize(size_t num_rows, size_t num_total_rows) const - { - return static_cast(sum_index_columns) / num_total_rows * num_rows; - } - - /// Like columnProgress, but takes into account only PK columns - Float64 keyColumnsProgress(size_t num_rows, size_t num_total_rows) const - { - return keyColumnsSize(num_rows, num_total_rows) / sum_total; - } -}; - - -class MergeProgressCallback : public ProgressCallback -{ -public: - MergeProgressCallback(MergeList::Entry & merge_entry_) : merge_entry(merge_entry_) {} - - MergeProgressCallback(MergeList::Entry & merge_entry_, MergeTreeDataMerger::MergeAlgorithm merge_alg_, size_t num_total_rows, - const ColumnSizeEstimator & column_sizes) - : merge_entry(merge_entry_), merge_alg(merge_alg_) - { - if (merge_alg == MergeAlgorithm::Horizontal) - average_elem_progress = 1.0 / num_total_rows; - else - average_elem_progress = column_sizes.keyColumnsProgress(1, num_total_rows); - } - - MergeList::Entry & merge_entry; - const MergeAlgorithm merge_alg{MergeAlgorithm::Vertical}; - Float64 average_elem_progress; - - void operator() (const Progress & value) - { - ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes); - merge_entry->bytes_read_uncompressed += value.bytes; - merge_entry->rows_with_key_columns_read += value.rows; - - if (merge_alg == MergeAlgorithm::Horizontal) - { - ProfileEvents::increment(ProfileEvents::MergedRows, value.rows); - merge_entry->rows_read += value.rows; - merge_entry->progress = average_elem_progress * merge_entry->rows_read; - } - else - { - merge_entry->progress = average_elem_progress * merge_entry->rows_with_key_columns_read; - } - }; -}; - -class MergeProgressCallbackVerticalStep : public MergeProgressCallback -{ -public: - - MergeProgressCallbackVerticalStep(MergeList::Entry & merge_entry_, size_t num_total_rows_exact, - const ColumnSizeEstimator & column_sizes, const String & column_name) - : MergeProgressCallback(merge_entry_), initial_progress(merge_entry->progress) - { - average_elem_progress = column_sizes.columnProgress(column_name, 1, num_total_rows_exact); - } - - Float64 initial_progress; - /// NOTE: not thread safe (to be 
copyable). It is OK in current single thread use case
-   size_t rows_read_internal{0};
-
-   void operator() (const Progress & value)
-   {
-       merge_entry->bytes_read_uncompressed += value.bytes;
-       ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
-
-       rows_read_internal += value.rows;
-       Float64 local_progress = average_elem_progress * rows_read_internal;
-       merge_entry->progress = initial_progress + local_progress;
-   };
-};
-
-
-/// parts should be sorted.
+/// parts must be sorted.
 MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart(
    MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeList::Entry & merge_entry,
    size_t aio_threshold, time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation)
@@ -461,58 +306,49 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
        part->accumulateColumnSizes(merged_column_to_size);
    }

-   Names all_column_names = data.getColumnNamesList();
-   NamesAndTypesList all_column_names_and_types = data.getColumnsList();
-   SortDescription sort_desc = data.getSortDescription();
-
-   NamesAndTypesList ordinary_column_names_and_types, key_column_names_and_types;
-   Names ordinary_column_names, key_column_names;
-   extractOrdinaryAndKeyColumns(all_column_names_and_types, data.getPrimaryExpression(),
-       ordinary_column_names_and_types, ordinary_column_names,
-       key_column_names_and_types, key_column_names
-   );
+   Names column_names = data.getColumnNamesList();
+   NamesAndTypesList column_names_and_types = data.getColumnsList();

    MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data);
    ActiveDataPartSet::parsePartName(merged_name, *new_data_part);
    new_data_part->name = "tmp_" + merged_name;
    new_data_part->is_temp = true;

-   size_t sum_input_rows_upper_bound = merge_entry->total_size_marks * data.index_granularity;
-
-   MergedRowSources merged_rows_sources;
-   MergeAlgorithm merge_alg = chooseMergeAlgorithm(data, parts, sum_input_rows_upper_bound, merged_rows_sources);
-
-   LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal"));
-
-   MergedRowSources * merged_rows_sources_ptr = (merge_alg == MergeAlgorithm::Vertical)
-       ? &merged_rows_sources : nullptr;
-   Names & main_column_names = (merge_alg == MergeAlgorithm::Vertical)
-       ? key_column_names : all_column_names;
-   NamesAndTypesList & main_column_names_and_types = (merge_alg == MergeAlgorithm::Vertical)
-       ? key_column_names_and_types : all_column_names_and_types;
-
-   ColumnSizeEstimator column_sizes(parts, key_column_names, ordinary_column_names);
-
    /** Read from all the parts, merge, and write into a new one.
     *  Along the way, compute the expression for sorting.
     */
    BlockInputStreams src_streams;

+   size_t sum_rows_approx = 0;
+
+   const auto rows_total = merge_entry->total_size_marks * data.index_granularity;
+
    for (size_t i = 0; i < parts.size(); ++i)
    {
+       MarkRanges ranges{{0, parts[i]->size}};
+
        String part_path = data.getFullPath() + parts[i]->name + '/';

        auto input = std::make_unique(
-           part_path, DEFAULT_MERGE_BLOCK_SIZE, main_column_names, data, parts[i],
-           MarkRanges(1, MarkRange(0, parts[i]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
+           part_path, DEFAULT_MERGE_BLOCK_SIZE, column_names, data,
+           parts[i], ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);

-       input->setProgressCallback(MergeProgressCallback{merge_entry, merge_alg, sum_input_rows_upper_bound, column_sizes});
+       input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
+       {
+           const auto new_rows_read = merge_entry->rows_read += value.rows;
+           merge_entry->progress = static_cast(new_rows_read) / rows_total;
+           merge_entry->bytes_read_uncompressed += value.bytes;
+
+           ProfileEvents::increment(ProfileEvents::MergedRows, value.rows);
+           ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.bytes);
+       });

        if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
            src_streams.emplace_back(std::make_shared(
                std::make_shared(BlockInputStreamPtr(std::move(input)), data.getPrimaryExpression())));
        else
            src_streams.emplace_back(std::move(input));
+
+       sum_rows_approx += parts[i]->size * data.index_granularity;
    }

    /// The order of the streams is important: when keys are equal, elements go in the order of the source stream number.

    switch (data.merging_params.mode)
    {
        case MergeTreeData::MergingParams::Ordinary:
            merged_stream = std::make_unique(
-               src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE, 0, merged_rows_sources_ptr);
+               src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
            break;

        case MergeTreeData::MergingParams::Collapsing:
            merged_stream = std::make_unique(
-               src_streams, sort_desc, data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE, merged_rows_sources_ptr);
+               src_streams, data.getSortDescription(), data.merging_params.sign_column, DEFAULT_MERGE_BLOCK_SIZE);
            break;

        case MergeTreeData::MergingParams::Summing:
            merged_stream = std::make_unique(
-               src_streams, sort_desc, data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
+               src_streams, data.getSortDescription(), data.merging_params.columns_to_sum, DEFAULT_MERGE_BLOCK_SIZE);
            break;

        case MergeTreeData::MergingParams::Aggregating:
            merged_stream = std::make_unique(
-               src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE);
+               src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
            break;

        case MergeTreeData::MergingParams::Replacing:
            merged_stream = std::make_unique(
-               src_streams, sort_desc, data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
+               src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
            break;

        case MergeTreeData::MergingParams::Graphite:
            merged_stream = std::make_unique(
-               src_streams, sort_desc, DEFAULT_MERGE_BLOCK_SIZE,
+               src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE,
                data.merging_params.graphite_params, time_of_merge);
            break;
@@ -568,7 +404,7 @@
        static_cast(merge_entry->total_size_bytes_compressed) / data.getTotalActiveSizeInBytes());

    MergedBlockOutputStream to{
-       data, new_part_tmp_path, main_column_names_and_types, compression_method, merged_column_to_size, aio_threshold};
+       data, new_part_tmp_path, column_names_and_types, compression_method, merged_column_to_size, aio_threshold};

    merged_stream->readPrefix();
    to.writePrefix();

@@ -582,86 +418,19 @@
        rows_written += block.rows();
        to.write(block);

-       if (merge_alg == MergeAlgorithm::Horizontal)
-           merge_entry->rows_written = merged_stream->getProfileInfo().rows;
-       merge_entry->rows_with_key_columns_written = merged_stream->getProfileInfo().rows;
+       merge_entry->rows_written = merged_stream->getProfileInfo().rows;
        merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;

-       /// This update is inaccurate for the VERTICAL algorithm since it requires more accurate per-column updates
-       /// Reservation updates are not performed yet; during the merge this may lead to higher free space requirements
-       if (disk_reservation && merge_alg == MergeAlgorithm::Horizontal)
-       {
-           Float64 relative_rows_written = std::min(1., 1. * rows_written / sum_input_rows_upper_bound);
-           disk_reservation->update(static_cast((1. - relative_rows_written) * initial_reservation));
-       }
+       if (disk_reservation)
+           disk_reservation->update(static_cast((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation));
    }

    if (isCancelled())
        throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);

-   MergeTreeData::DataPart::Checksums checksums_ordinary_columns;
-
-   /// Gather ordinary columns
-   if (merge_alg == MergeAlgorithm::Vertical)
-   {
-       size_t sum_input_rows_exact = merge_entry->rows_with_key_columns_read;
-       merge_entry->columns_written = key_column_names.size();
-       merge_entry->progress = column_sizes.keyColumnsProgress(sum_input_rows_exact, sum_input_rows_exact);
-
-       BlockInputStreams column_part_streams(parts.size());
-       auto it_name_and_type = ordinary_column_names_and_types.cbegin();
-
-       for (size_t column_num = 0; column_num < ordinary_column_names.size(); ++column_num)
-       {
-           const String & column_name = ordinary_column_names[column_num];
-           Names column_name_(1, column_name);
-           NamesAndTypesList column_name_and_type_(1, *it_name_and_type++);
-           Float64 progress_before = merge_entry->progress;
-
-           LOG_TRACE(log, "Gathering column " << column_name << " " << column_name_and_type_.front().type->getName());
-
-           for (size_t part_num = 0; part_num < parts.size(); ++part_num)
-           {
-               String part_path = data.getFullPath() + parts[part_num]->name + '/';
-
-               /// TODO: test performance with more accurate settings
-               auto column_part_stream = std::make_shared(
-                   part_path, DEFAULT_MERGE_BLOCK_SIZE, column_name_, data, parts[part_num],
-                   MarkRanges(1, MarkRange(0, parts[part_num]->size)), false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE,
-                   false, true);
-
-               column_part_stream->setProgressCallback(
-                   MergeProgressCallbackVerticalStep{merge_entry, sum_input_rows_exact, column_sizes, column_name});
-
-               column_part_streams[part_num] = std::move(column_part_stream);
-           }
-
-           ColumnGathererStream column_gathered_stream(column_part_streams, merged_rows_sources, DEFAULT_BLOCK_SIZE);
-           MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, true, compression_method);
-
-           column_to.writePrefix();
-           while ((block = column_gathered_stream.read()))
-           {
-               column_to.write(block);
-           }
-           /// NOTE: a nested column contains duplicate checksums (and files)
-           checksums_ordinary_columns.add(column_to.writeSuffixAndGetChecksums());
-
-           merge_entry->columns_written = key_column_names.size() + column_num;
-           merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
-           merge_entry->progress = progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact);
-
-           if (isCancelled())
-               throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
-       }
-   }
-
    merged_stream->readSuffix();
-   new_data_part->columns = all_column_names_and_types;
-   if (merge_alg != MergeAlgorithm::Vertical)
-       new_data_part->checksums = to.writeSuffixAndGetChecksums();
-   else
-       new_data_part->checksums = to.writeSuffixAndGetChecksums(all_column_names_and_types, &checksums_ordinary_columns);
+   new_data_part->columns = column_names_and_types;
+   new_data_part->checksums = to.writeSuffixAndGetChecksums();
    new_data_part->index.swap(to.getIndex());

    /// For convenience, even CollapsingSortedBlockInputStream cannot return zero rows.
@@ -677,43 +446,6 @@
 }

-
-MergeTreeDataMerger::MergeAlgorithm MergeTreeDataMerger::chooseMergeAlgorithm(
-   const MergeTreeData & data, const MergeTreeData::DataPartsVector & parts,
-   size_t sum_rows_upper_bound, MergedRowSources & rows_sources_to_alloc) const
-{
-   if (data.context.getMergeTreeSettings().enable_vertical_merge_algorithm == 0)
-       return MergeAlgorithm::Horizontal;
-
-   bool is_supported_storage =
-       data.merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
-       data.merging_params.mode == MergeTreeData::MergingParams::Collapsing;
-
-   bool enough_ordinary_cols = data.getColumnNamesList().size() > data.getSortDescription().size();
-
-   bool enough_total_rows = sum_rows_upper_bound >= DEFAULT_MERGE_BLOCK_SIZE;
-
-   bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS;
-
-   auto merge_alg = (is_supported_storage && enough_total_rows && enough_ordinary_cols && no_parts_overflow) ?
-                       MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;
-
-   if (merge_alg == MergeAlgorithm::Vertical)
-   {
-       try
-       {
-           rows_sources_to_alloc.reserve(sum_rows_upper_bound);
-       }
-       catch (...)
-       {
-           /// Not enough memory for the VERTICAL merge algorithm; this can happen for very large tables
-           merge_alg = MergeAlgorithm::Horizontal;
-       }
-   }
-
-   return merge_alg;
-}
-
-
 MergeTreeData::DataPartPtr MergeTreeDataMerger::renameMergedTemporaryPart(
    MergeTreeData::DataPartsVector & parts,
    MergeTreeData::MutableDataPartPtr & new_data_part,
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp
index 9341f6e6801..11c4b09a08f 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataPart.cpp
@@ -222,14 +222,6 @@ void MergeTreeDataPartChecksums::addFile(const String & file_name, size_t file_s
    files[file_name] = Checksum(file_size, file_hash);
 }

-void MergeTreeDataPartChecksums::add(MergeTreeDataPartChecksums && rhs_checksums)
-{
-   for (auto & checksum : rhs_checksums.files)
-       files[std::move(checksum.first)] = std::move(checksum.second);
-
-   rhs_checksums.files.clear();
-}
-
 /// The checksum of the set of checksums of the .bin files.
void MergeTreeDataPartChecksums::summaryDataChecksum(SipHash & hash) const { diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index 6cfe8c59bef..293fe0f4b22 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -691,7 +691,7 @@ BlockInputStreams StorageLog::read( max_block_size, column_names, *this, - 0, marksCount() ? std::numeric_limits::max() : 0, + 0, std::numeric_limits::max(), settings.max_read_buffer_size)); } else diff --git a/dbms/src/Storages/System/StorageSystemMerges.cpp b/dbms/src/Storages/System/StorageSystemMerges.cpp index 4bdce11acc0..d78111e05ff 100644 --- a/dbms/src/Storages/System/StorageSystemMerges.cpp +++ b/dbms/src/Storages/System/StorageSystemMerges.cpp @@ -24,10 +24,7 @@ StorageSystemMerges::StorageSystemMerges(const std::string & name) { "bytes_read_uncompressed", std::make_shared() }, { "rows_read", std::make_shared() }, { "bytes_written_uncompressed", std::make_shared() }, - { "rows_written", std::make_shared() }, - { "columns_written", std::make_shared() }, - { "rows_with_key_columns_read", std::make_shared() }, - { "rows_with_key_columns_written", std::make_shared() } + { "rows_written", std::make_shared() } } { } @@ -61,16 +58,13 @@ BlockInputStreams StorageSystemMerges::read( ColumnWithTypeAndName col_rows_read{std::make_shared(), std::make_shared(), "rows_read"}; ColumnWithTypeAndName col_bytes_written_uncompressed{std::make_shared(), std::make_shared(), "bytes_written_uncompressed"}; ColumnWithTypeAndName col_rows_written{std::make_shared(), std::make_shared(), "rows_written"}; - ColumnWithTypeAndName col_columns_written{std::make_shared(), std::make_shared(), "columns_written"}; - ColumnWithTypeAndName col_rows_with_key_columns_read{std::make_shared(), std::make_shared(), "rows_with_key_columns_read"}; - ColumnWithTypeAndName col_rows_with_key_columns_written{std::make_shared(), std::make_shared(), "rows_with_key_columns_written"}; for (const auto & merge : context.getMergeList().get()) { col_database.column->insert(merge.database); col_table.column->insert(merge.table); col_elapsed.column->insert(merge.watch.elapsedSeconds()); - col_progress.column->insert(std::min(1., merge.progress)); /// little cheat + col_progress.column->insert(merge.progress); col_num_parts.column->insert(merge.num_parts); col_result_part_name.column->insert(merge.result_part_name); col_total_size_bytes_compressed.column->insert(merge.total_size_bytes_compressed); @@ -79,9 +73,6 @@ BlockInputStreams StorageSystemMerges::read( col_rows_read.column->insert(merge.rows_read.load(std::memory_order_relaxed)); col_bytes_written_uncompressed.column->insert(merge.bytes_written_uncompressed.load(std::memory_order_relaxed)); col_rows_written.column->insert(merge.rows_written.load(std::memory_order_relaxed)); - col_columns_written.column->insert(merge.columns_written.load(std::memory_order_relaxed)); - col_rows_with_key_columns_read.column->insert(merge.rows_with_key_columns_read.load(std::memory_order_relaxed)); - col_rows_with_key_columns_written.column->insert(merge.rows_with_key_columns_written.load(std::memory_order_relaxed)); } Block block{ @@ -96,10 +87,7 @@ BlockInputStreams StorageSystemMerges::read( col_bytes_read_uncompressed, col_rows_read, col_bytes_written_uncompressed, - col_rows_written, - col_columns_written, - col_rows_with_key_columns_read, - col_rows_with_key_columns_written + col_rows_written }; return BlockInputStreams{1, std::make_shared(block)};
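With the StorageSystemMerges columns gone, no trace of the vertical algorithm remains in the tree. For reference, the heuristic that the deleted chooseMergeAlgorithm() applied can be restated compactly; the function below is a condensed sketch rather than the removed code, and the 8192-row threshold is an assumed stand-in for DEFAULT_MERGE_BLOCK_SIZE:

    #include <cstddef>

    enum class MergeAlgorithm { Horizontal, Vertical };

    // Condensed restatement of the reverted heuristic. All inputs are plain
    // values here; the deleted code read them from MergeTreeData and settings.
    MergeAlgorithm chooseMergeAlgorithm(
        bool vertical_enabled,        // the enable_vertical_merge_algorithm setting
        bool ordinary_or_collapsing,  // only these merging modes were supported
        size_t total_columns,
        size_t sort_key_columns,
        size_t rows_upper_bound,
        size_t parts_count)
    {
        const size_t max_parts = 127;   // RowSourcePart::MAX_PARTS, a 7-bit source_id
        const size_t min_rows = 8192;   // assumed value of DEFAULT_MERGE_BLOCK_SIZE

        bool vertical_applicable = vertical_enabled
            && ordinary_or_collapsing
            && total_columns > sort_key_columns   // there must be non-PK columns to gather
            && rows_upper_bound >= min_rows
            && parts_count <= max_parts;

        return vertical_applicable ? MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;
    }

The removed implementation additionally fell back to Horizontal when reserving the one-byte-per-row sources buffer threw, so a merge could not fail outright for lack of memory.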