From 8d5e0784d357f756c5f515e682a9ca317b16095d Mon Sep 17 00:00:00 2001
From: Pavel Kruglov
Date: Thu, 12 Nov 2020 23:29:36 +0300
Subject: [PATCH 01/32] Add setting optimize_on_insert

---
 src/Core/Settings.h                           |  1 +
 src/Core/SortCursor.h                         | 42 ++++++---
 .../MergingSortedBlockInputStream.cpp         |  2 +-
 src/Interpreters/MergeJoin.cpp                | 16 ++--
 .../Algorithms/AggregatingSortedAlgorithm.cpp |  6 +-
 .../Algorithms/CollapsingSortedAlgorithm.cpp  | 11 +--
 .../Algorithms/CollapsingSortedAlgorithm.h    |  6 +-
 .../GraphiteRollupSortedAlgorithm.cpp         |  8 +-
 .../Merges/Algorithms/IMergingAlgorithm.h     |  2 +
 .../IMergingAlgorithmWithDelayedChunk.cpp     |  4 +-
 .../IMergingAlgorithmWithSharedChunks.cpp     |  4 +-
 .../Algorithms/MergingSortedAlgorithm.cpp     |  2 +-
 .../Algorithms/MergingSortedAlgorithm.h       |  6 +-
 .../Algorithms/ReplacingSortedAlgorithm.cpp   |  2 +-
 src/Processors/Merges/Algorithms/RowRef.h     |  4 +-
 .../Algorithms/SummingSortedAlgorithm.cpp     |  4 +-
 .../VersionedCollapsingAlgorithm.cpp          |  2 +-
 .../Merges/CollapsingSortedTransform.h        |  4 +-
 .../Transforms/SortingTransform.cpp           |  2 +-
 .../MergeTree/MergeTreeBlockOutputStream.cpp  | 87 +++++++++++++++++++
 .../MergeTree/MergeTreeBlockOutputStream.h    |  5 +-
 src/Storages/StorageMergeTree.cpp             |  2 +-
 22 files changed, 166 insertions(+), 56 deletions(-)

diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 580756361b1..ca5ec847efd 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -396,6 +396,7 @@ class IColumn;
     M(Bool, allow_non_metadata_alters, true, "Allow to execute alters which affects not only tables metadata, but also data on disk", 0) \
     M(Bool, enable_global_with_statement, false, "Propagate WITH statements to UNION queries and all subqueries", 0) \
     M(Bool, aggregate_functions_null_for_empty, false, "Rewrite all aggregate functions in a query, adding -OrNull suffix to them", 0) \
+    M(Bool, optimize_on_insert, true, "Do the same transformation for inserted block of data as if merge was done on this block.", 0) \
     \
     /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
     \
diff --git a/src/Core/SortCursor.h b/src/Core/SortCursor.h
index 7a222f70199..f8adb4a5f39 100644
--- a/src/Core/SortCursor.h
+++ b/src/Core/SortCursor.h
@@ -30,7 +30,6 @@ struct SortCursorImpl
     ColumnRawPtrs all_columns;
     SortDescription desc;
     size_t sort_columns_size = 0;
-    size_t pos = 0;
     size_t rows = 0;

     /** Determines order if comparing columns are equal.
@@ -49,15 +48,20 @@
     /** Is there at least one column with Collator.
      */
    bool has_collation = false;

+    /** We could use SortCursorImpl in the case when columns aren't sorted
+      * but we have their sorted permutation
+      */
+    IColumn::Permutation * permutation = nullptr;
+
    SortCursorImpl() {}

-    SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0)
+    SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0, IColumn::Permutation * perm = nullptr)
        : desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
    {
-        reset(block);
+        reset(block, perm);
    }

-    SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0)
+    SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0, IColumn::Permutation * perm = nullptr)
        : desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
    {
        for (auto & column_desc : desc)
@@ -66,19 +70,19 @@ struct SortCursorImpl
                throw Exception("SortDescription should contain column position if SortCursor was used without header.", ErrorCodes::LOGICAL_ERROR);
        }
-        reset(columns, {});
+        reset(columns, {}, perm);
    }

    bool empty() const { return rows == 0; }

    /// Set the cursor to the beginning of the new block.
-    void reset(const Block & block)
+    void reset(const Block & block, IColumn::Permutation * perm = nullptr)
    {
-        reset(block.getColumns(), block);
+        reset(block.getColumns(), block, perm);
    }

    /// Set the cursor to the beginning of the new block.
-    void reset(const Columns & columns, const Block & block)
+    void reset(const Columns & columns, const Block & block, IColumn::Permutation * perm = nullptr)
    {
        all_columns.clear();
        sort_columns.clear();
@@ -96,18 +100,36 @@ struct SortCursorImpl
                : column_desc.column_number;
            sort_columns.push_back(columns[column_number].get());

-            need_collation[j] = desc[j].collator != nullptr && sort_columns.back()->isCollationSupported(); /// TODO Nullable(String)
+            need_collation[j] = desc[j].collator != nullptr && sort_columns.back()->isCollationSupported();
            has_collation |= need_collation[j];
        }

        pos = 0;
        rows = all_columns[0]->size();
+        permutation = perm;
+    }
+
+    size_t getPos() const
+    {
+        if (permutation)
+            return (*permutation)[pos];
+        return pos;
+    }
+
+    /// We need a possibility to change pos (see MergeJoin).
+    size_t & getPosRef()
+    {
+        return pos;
    }

    bool isFirst() const { return pos == 0; }
    bool isLast() const { return pos + 1 >= rows; }
    bool isValid() const { return pos < rows; }
    void next() { ++pos; }
+
+/// Prevent using pos instead of getPos()
+private:
+    size_t pos;
 };

 using SortCursorImpls = std::vector<SortCursorImpl>;
@@ -127,7 +149,7 @@ struct SortCursorHelper

    bool ALWAYS_INLINE greater(const SortCursorHelper & rhs) const
    {
-        return derived().greaterAt(rhs.derived(), impl->pos, rhs.impl->pos);
+        return derived().greaterAt(rhs.derived(), impl->getPos(), rhs.impl->getPos());
    }

    /// Inverted so that the priority queue elements are removed in ascending order.
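Worth pausing on: after this change a cursor no longer requires physically sorted columns. When a permutation is supplied, pos walks sorted positions while getPos() maps each one back to a physical row. A minimal standalone sketch of that indirection, using hypothetical types rather than the ClickHouse API:

    #include <cstddef>
    #include <vector>

    /// Indirect row access: `data` stays in insertion order, while `perm`
    /// (if present) holds physical row indices in sorted order, so the row
    /// at logical position `pos` is data[(*perm)[pos]].
    struct PermutedCursor
    {
        const std::vector<int> * data = nullptr;
        const std::vector<size_t> * perm = nullptr; /// null means data is already sorted

        size_t pos = 0;
        size_t getPos() const { return perm ? (*perm)[pos] : pos; }
        int value() const { return (*data)[getPos()]; }
        bool isValid() const { return pos < data->size(); }
        void next() { ++pos; }
    };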
diff --git a/src/DataStreams/MergingSortedBlockInputStream.cpp b/src/DataStreams/MergingSortedBlockInputStream.cpp
index 9c9213ef3cc..2cb87b07dba 100644
--- a/src/DataStreams/MergingSortedBlockInputStream.cpp
+++ b/src/DataStreams/MergingSortedBlockInputStream.cpp
@@ -224,7 +224,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSortingHeap & queue)
 //            std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
 //            std::cerr << "Inserting row\n";
             for (size_t i = 0; i < num_columns; ++i)
-                merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
+                merged_columns[i]->insertFrom(*current->all_columns[i], current->getPos());

             if (out_row_sources_buf)
             {
diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp
index 665ec4d60f3..99fa99b914b 100644
--- a/src/Interpreters/MergeJoin.cpp
+++ b/src/Interpreters/MergeJoin.cpp
@@ -182,10 +182,10 @@ public:
        : impl(SortCursorImpl(block, desc_))
    {}

-    size_t position() const { return impl.pos; }
+    size_t position() const { return impl.getPos(); }
    size_t end() const { return impl.rows; }
-    bool atEnd() const { return impl.pos >= impl.rows; }
-    void nextN(size_t num) { impl.pos += num; }
+    bool atEnd() const { return impl.getPos() >= impl.rows; }
+    void nextN(size_t num) { impl.getPosRef() += num; }

    void setCompareNullability(const MergeJoinCursor & rhs)
    {
@@ -254,10 +254,10 @@ private:
            else if (cmp > 0)
                rhs.impl.next();
            else if (!cmp)
-                return Range{impl.pos, rhs.impl.pos, getEqualLength(), rhs.getEqualLength()};
+                return Range{impl.getPos(), rhs.impl.getPos(), getEqualLength(), rhs.getEqualLength()};
        }

-        return Range{impl.pos, rhs.impl.pos, 0, 0};
+        return Range{impl.getPos(), rhs.impl.getPos(), 0, 0};
    }

    template <bool left_nulls, bool right_nulls>
@@ -268,7 +268,7 @@ private:
            const auto * left_column = impl.sort_columns[i];
            const auto * right_column = rhs.impl.sort_columns[i];

-            int res = nullableCompareAt<left_nulls, right_nulls>(*left_column, *right_column, impl.pos, rhs.impl.pos);
+            int res = nullableCompareAt<left_nulls, right_nulls>(*left_column, *right_column, impl.getPos(), rhs.impl.getPos());
            if (res)
                return res;
        }
@@ -278,11 +278,11 @@ private:
    /// Expects !atEnd()
    size_t getEqualLength()
    {
-        size_t pos = impl.pos + 1;
+        size_t pos = impl.getPos() + 1;
        for (; pos < impl.rows; ++pos)
            if (!samePrev(pos))
                break;
-        return pos - impl.pos;
+        return pos - impl.getPos();
    }

    /// Expects lhs_pos > 0
diff --git a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp
index b834ed82729..cb254b54dd3 100644
--- a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp
@@ -239,12 +239,12 @@ void AggregatingSortedAlgorithm::AggregatingMergedData::addRow(SortCursor & cursor)
        throw Exception("Can't add a row to the group because it was not started.", ErrorCodes::LOGICAL_ERROR);

    for (auto & desc : def.columns_to_aggregate)
-        desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->pos);
+        desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->getPos());

    for (auto & desc : def.columns_to_simple_aggregate)
    {
        auto & col = cursor->all_columns[desc.column_number];
-        desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, arena.get());
+        desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->getPos(), arena.get());
    }
 }

@@ -334,7 +334,7 @@ IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge()
                return Status(merged_data.pull());
            }
-            merged_data.startGroup(current->all_columns, current->pos);
+            merged_data.startGroup(current->all_columns, current->getPos());
        }

        merged_data.addRow(current);
diff --git a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp
index e7a7200ac34..24e1d1d40a8 100644
--- a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp
@@ -13,11 +13,6 @@
 namespace DB
 {

-namespace ErrorCodes
-{
-    extern const int INCORRECT_DATA;
-}
-
 CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
     const Block & header,
     size_t num_inputs,
@@ -25,9 +20,9 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
    const String & sign_column,
    bool only_positive_sign_,
    size_t max_block_size,
+    Poco::Logger * log_,
    WriteBuffer * out_row_sources_buf_,
-    bool use_average_block_sizes,
-    Poco::Logger * log_)
+    bool use_average_block_sizes)
    : IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
    , merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
    , sign_column_number(header.getPositionByName(sign_column))
@@ -123,7 +118,7 @@ IMergingAlgorithm::Status CollapsingSortedAlgorithm::merge()
            return Status(current.impl->order);
        }

-        Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
+        Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->getPos()];

        RowRef current_row;
        setRowRef(current_row, current);
diff --git a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h
index d95fac2f02b..028715f715b 100644
--- a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h
+++ b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h
@@ -33,9 +33,9 @@ public:
        const String & sign_column,
        bool only_positive_sign_, /// For select final. Skip rows with sum(sign) < 0.
        size_t max_block_size,
-        WriteBuffer * out_row_sources_buf_,
-        bool use_average_block_sizes,
-        Poco::Logger * log_);
+        Poco::Logger * log_,
+        WriteBuffer * out_row_sources_buf_ = nullptr,
+        bool use_average_block_sizes = false);

    Status merge() override;
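The parameter reordering above is dictated by the language: C++ allows default arguments only on a trailing run of parameters, so to give out_row_sources_buf_ and use_average_block_sizes defaults, the logger has to move in front of them. A compressed illustration with stand-in types:

    #include <cstddef>

    struct Logger;      /// stand-ins for Poco::Logger and DB::WriteBuffer
    struct WriteBuffer;

    struct Algorithm
    {
        /// Compiles: every parameter after the first defaulted one also has a default.
        Algorithm(std::size_t max_block_size, Logger * log,
                  WriteBuffer * out_buf = nullptr, bool use_avg_sizes = false);

        /// Would not compile: a defaulted parameter precedes a non-defaulted one.
        /// Algorithm(std::size_t max_block_size, WriteBuffer * out_buf = nullptr,
        ///           bool use_avg_sizes = false, Logger * log);
    };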
diff --git a/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp
index 0e704e5a05b..59aef3b3bd6 100644
--- a/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp
@@ -164,12 +164,12 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
            return Status(current.impl->order);
        }

-        StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->pos);
+        StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->getPos());
        bool new_path = is_first || next_path != current_group_path;

        is_first = false;

-        time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->pos);
+        time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->getPos());
        /// Is new key before rounding.
        bool is_new_key = new_path || next_row_time != current_time;

@@ -227,7 +227,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
            /// and for rows with same maximum version - only last row.
            if (is_new_key || current->all_columns[columns_definition.version_column_num]->compareAt(
-                current->pos, current_subgroup_newest_row.row_num,
+                current->getPos(), current_subgroup_newest_row.row_num,
                *(*current_subgroup_newest_row.all_columns)[columns_definition.version_column_num],
                /* nan_direction_hint = */ 1) >= 0)
            {
@@ -263,7 +263,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()

 void GraphiteRollupSortedAlgorithm::startNextGroup(SortCursor & cursor, Graphite::RollupRule next_rule)
 {
-    merged_data.startNextGroup(cursor->all_columns, cursor->pos, next_rule, columns_definition);
+    merged_data.startNextGroup(cursor->all_columns, cursor->getPos(), next_rule, columns_definition);
 }

 void GraphiteRollupSortedAlgorithm::finishCurrentGroup()
diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithm.h b/src/Processors/Merges/Algorithms/IMergingAlgorithm.h
index f86be2a7d1b..5c8d18875e7 100644
--- a/src/Processors/Merges/Algorithms/IMergingAlgorithm.h
+++ b/src/Processors/Merges/Algorithms/IMergingAlgorithm.h
@@ -29,6 +29,8 @@ public:
        /// between different algorithm objects in parallel FINAL.
        bool skip_last_row = false;

+        IColumn::Permutation * permutation = nullptr;
+
        void swap(Input & other)
        {
            chunk.swap(other.chunk);
diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp
index 0b13d689636..e4c60d7609c 100644
--- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp
+++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp
@@ -22,7 +22,7 @@ void IMergingAlgorithmWithDelayedChunk::initializeQueue(Inputs inputs)
        if (!current_inputs[source_num].chunk)
            continue;

-        cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num);
+        cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num, current_inputs[source_num].permutation);
    }

    queue = SortingHeap<SortCursor>(cursors);
@@ -37,7 +37,7 @@ void IMergingAlgorithmWithDelayedChunk::updateCursor(Input & input, size_t source_num)
    last_chunk_sort_columns = std::move(cursors[source_num].sort_columns);

    current_input.swap(input);
-    cursors[source_num].reset(current_input.chunk.getColumns(), {});
+    cursors[source_num].reset(current_input.chunk.getColumns(), {}, current_input.permutation);

    queue.push(cursors[source_num]);
 }
diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp
index 39abe5c0ec7..97abffdc167 100644
--- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp
+++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp
@@ -39,7 +39,7 @@ void IMergingAlgorithmWithSharedChunks::initialize(Inputs inputs)
        source.skip_last_row = inputs[source_num].skip_last_row;
        source.chunk = chunk_allocator.alloc(inputs[source_num].chunk);
-        cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num);
+        cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num, inputs[source_num].permutation);

        source.chunk->all_columns = cursors[source_num].all_columns;
        source.chunk->sort_columns = cursors[source_num].sort_columns;
@@ -55,7 +55,7 @@ void IMergingAlgorithmWithSharedChunks::consume(Input & input, size_t source_num)
    auto & source = sources[source_num];
    source.skip_last_row = input.skip_last_row;
    source.chunk = chunk_allocator.alloc(input.chunk);
-    cursors[source_num].reset(source.chunk->getColumns(), {});
+    cursors[source_num].reset(source.chunk->getColumns(), {}, input.permutation);

    source.chunk->all_columns = cursors[source_num].all_columns;
    source.chunk->sort_columns = cursors[source_num].sort_columns;
diff --git a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp
index ee13ef70203..138f19de4c2 100644
--- a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp
@@ -139,7 +139,7 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue)
        //std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
        //std::cerr << "Inserting row\n";
-        merged_data.insertRow(current->all_columns, current->pos, current->rows);
+        merged_data.insertRow(current->all_columns, current->getPos(), current->rows);

        if (out_row_sources_buf)
        {
diff --git a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h
index 531b2636747..63dced26dd4 100644
--- a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h
+++ b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h
@@ -18,9 +18,9 @@ public:
        size_t num_inputs,
        SortDescription description_,
        size_t max_block_size,
-        UInt64 limit_,
-        WriteBuffer * out_row_sources_buf_,
-        bool use_average_block_sizes);
+        UInt64 limit_ = 0,
+        WriteBuffer * out_row_sources_buf_ = nullptr,
+        bool use_average_block_sizes = false);

    void addInput();

diff --git a/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp
index 3ee0df0efd8..faa15c004c7 100644
--- a/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp
@@ -73,7 +73,7 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
        if (version_column_number == -1 || selected_row.empty()
            || current->all_columns[version_column_number]->compareAt(
-                current->pos, selected_row.row_num,
+                current->getPos(), selected_row.row_num,
                *(*selected_row.all_columns)[version_column_number],
                /* nan_direction_hint = */ 1) >= 0)
        {
diff --git a/src/Processors/Merges/Algorithms/RowRef.h b/src/Processors/Merges/Algorithms/RowRef.h
index 1b4da9781f8..16e7a5b5cc2 100644
--- a/src/Processors/Merges/Algorithms/RowRef.h
+++ b/src/Processors/Merges/Algorithms/RowRef.h
@@ -136,7 +136,7 @@ struct RowRef
    {
        sort_columns = cursor.impl->sort_columns.data();
        num_columns = cursor.impl->sort_columns.size();
-        row_num = cursor.impl->pos;
+        row_num = cursor.impl->getPos();
    }

    static bool checkEquals(size_t size, const IColumn ** lhs, size_t lhs_row, const IColumn ** rhs, size_t rhs_row)
@@ -192,7 +192,7 @@ struct RowRefWithOwnedChunk
    void set(SortCursor & cursor, SharedChunkPtr chunk)
    {
        owned_chunk = std::move(chunk);
-        row_num = cursor.impl->pos;
+        row_num = cursor.impl->getPos();
        all_columns = &owned_chunk->all_columns;
        sort_columns = &owned_chunk->sort_columns;
    }
diff --git a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp
index 17e5e4364ff..c4b35369cd6 100644
--- a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp
@@ -688,10 +688,10 @@ IMergingAlgorithm::Status SummingSortedAlgorithm::merge()
                return Status(merged_data.pull());
            }

-            merged_data.startGroup(current->all_columns, current->pos);
+            merged_data.startGroup(current->all_columns, current->getPos());
        }
        else
-            merged_data.addRow(current->all_columns, current->pos);
+            merged_data.addRow(current->all_columns, current->getPos());

        if (!current->isLast())
        {
diff --git a/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp b/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp
index 6273bd28371..707d755f980 100644
--- a/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp
@@ -73,7 +73,7 @@ IMergingAlgorithm::Status VersionedCollapsingAlgorithm::merge()

        RowRef current_row;

-        Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
+        Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->getPos()];

        setRowRef(current_row, current);

diff --git a/src/Processors/Merges/CollapsingSortedTransform.h b/src/Processors/Merges/CollapsingSortedTransform.h
index 4e65504a101..9e6bd306eee 100644
--- a/src/Processors/Merges/CollapsingSortedTransform.h
+++ b/src/Processors/Merges/CollapsingSortedTransform.h
@@ -27,9 +27,9 @@ public:
            sign_column,
            only_positive_sign,
            max_block_size,
+            &Poco::Logger::get("CollapsingSortedTransform"),
            out_row_sources_buf_,
-            use_average_block_sizes,
-            &Poco::Logger::get("CollapsingSortedTransform"))
+            use_average_block_sizes)
    {
    }

diff --git a/src/Processors/Transforms/SortingTransform.cpp b/src/Processors/Transforms/SortingTransform.cpp
index 03c8e87ce7a..607dcd20a3a 100644
--- a/src/Processors/Transforms/SortingTransform.cpp
+++ b/src/Processors/Transforms/SortingTransform.cpp
@@ -100,7 +100,7 @@ Chunk MergeSorter::mergeImpl(TSortingHeap & queue)

        /// Append a row from queue.
        for (size_t i = 0; i < num_columns; ++i)
-            merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
+            merged_columns[i]->insertFrom(*current->all_columns[i], current->getPos());

        ++total_merged_rows;
        ++merged_rows;
diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
index 40714e5af31..2de68c802b9 100644
--- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
+++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
@@ -2,26 +2,113 @@
 #include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
 #include <Storages/MergeTree/StorageMergeTree.h>
 #include <Interpreters/PartLog.h>
+#include <Interpreters/sortBlock.h>
+#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
+#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
+#include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h>
+#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
+#include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h>
+#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
+#include <Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h>

 namespace DB
 {

+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
 Block MergeTreeBlockOutputStream::getHeader() const
 {
     return metadata_snapshot->getSampleBlock();
 }

+Block MergeTreeBlockOutputStream::mergeBlock(const Block & block)
+{
+    /// Get the information needed for merging algorithms
+    size_t block_size = block.rows();
+    Names sort_columns = metadata_snapshot->getSortingKeyColumns();
+    SortDescription sort_description;
+    size_t sort_columns_size = sort_columns.size();
+    sort_description.reserve(sort_columns_size);
+
+    for (size_t i = 0; i < sort_columns_size; ++i)
+        sort_description.emplace_back(getHeader().getPositionByName(sort_columns[i]), 1, 1);
+
+    auto get_merging_algorithm = [&]() -> std::shared_ptr<IMergingAlgorithm>
+    {
+        switch (storage.merging_params.mode)
+        {
+            /// There is nothing to merge in single block in ordinary MergeTree
+            case MergeTreeData::MergingParams::Ordinary:
+                return nullptr;
+            case MergeTreeData::MergingParams::Replacing:
+                return std::make_shared<ReplacingSortedAlgorithm>(
+                    getHeader(), 1, sort_description, storage.merging_params.version_column, block_size);
+            case MergeTreeData::MergingParams::Collapsing:
+                return std::make_shared<CollapsingSortedAlgorithm>(
+                    getHeader(), 1, sort_description, storage.merging_params.sign_column,
+                    false, block_size, &Poco::Logger::get("MergeTreeBlockOutputStream"));
+            case MergeTreeData::MergingParams::Summing:
+                return std::make_shared<SummingSortedAlgorithm>(
+                    getHeader(), 1, sort_description, storage.merging_params.columns_to_sum,
+                    metadata_snapshot->getPartitionKey().column_names, block_size);
+            case MergeTreeData::MergingParams::Aggregating:
+                return std::make_shared<AggregatingSortedAlgorithm>(getHeader(), 1, sort_description, block_size);
+            case MergeTreeData::MergingParams::VersionedCollapsing:
+                return std::make_shared<VersionedCollapsingAlgorithm>(
+                    getHeader(), 1, sort_description, storage.merging_params.sign_column, block_size);
+            case MergeTreeData::MergingParams::Graphite:
+                return std::make_shared<GraphiteRollupSortedAlgorithm>(
+                    getHeader(), 1, sort_description, block_size, storage.merging_params.graphite_params, time(nullptr));
+        }
+
+        __builtin_unreachable();
+    };
+
+    auto merging_algorithm = get_merging_algorithm();
+    if (!merging_algorithm)
+        return block;
+
+    /// Merging algorithms work with inputs containing sorted chunks, so we need to get a sorted permutation
+    /// of the block, convert the block to a chunk and construct an input from it
+    IColumn::Permutation permutation;
+    stableGetPermutation(block, sort_description, permutation);
+
+    Chunk chunk(block.getColumns(), block_size);
+
+    IMergingAlgorithm::Input input;
+    input.set(std::move(chunk));
+    input.permutation = &permutation;
+
+    IMergingAlgorithm::Inputs inputs;
+    inputs.push_back(std::move(input));
+    merging_algorithm->initialize(std::move(inputs));
+
+    IMergingAlgorithm::Status status = merging_algorithm->merge();
+    while (!status.is_finished)
+        status = merging_algorithm->merge();
+
+    return block.cloneWithColumns(status.chunk.getColumns());
+}
+
 void MergeTreeBlockOutputStream::write(const Block & block)
 {
     storage.delayInsertOrThrowIfNeeded();

+    auto settings = context.getSettings();
+
     auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
     for (auto & current_block : part_blocks)
     {
         Stopwatch watch;

+        if (settings.optimize_on_insert)
+            current_block.block = mergeBlock(current_block.block);
+
         MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot);

         storage.renameTempPartAndAdd(part, &storage.increment);
diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.h b/src/Storages/MergeTree/MergeTreeBlockOutputStream.h
index 8aae7f3e625..1024bd72fa2 100644
--- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.h
+++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.h
@@ -14,20 +14,23 @@ class StorageMergeTree;
 class MergeTreeBlockOutputStream : public IBlockOutputStream
 {
 public:
-    MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_)
+    MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_, const Context & context_)
         : storage(storage_)
         , metadata_snapshot(metadata_snapshot_)
         , max_parts_per_block(max_parts_per_block_)
+        , context(context_)
     {
     }

     Block getHeader() const override;
     void write(const Block & block) override;
+    Block mergeBlock(const Block & block);

 private:
     StorageMergeTree & storage;
     StorageMetadataPtr metadata_snapshot;
     size_t max_parts_per_block;
+    const Context & context;
 };

 }
diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index 925f510f63a..cb3f425aba4 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -232,7 +232,7 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context)
     const auto & settings = context.getSettingsRef();
     return std::make_shared<MergeTreeBlockOutputStream>(
-        *this, metadata_snapshot, settings.max_partitions_per_insert_block);
+        *this, metadata_snapshot, settings.max_partitions_per_insert_block, context);
 }

 void StorageMergeTree::checkTableCanBeDropped() const
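Note that mergeBlock above sorts indirectly via stableGetPermutation; the stability is what keeps rows with equal keys in insertion order, so ReplacingMergeTree semantics (the most recently inserted duplicate wins) survive the in-block merge. A rough standalone analogue of such a helper, hypothetical and much simplified compared to the real function:

    #include <algorithm>
    #include <numeric>
    #include <vector>

    /// Indices that order `keys` ascending while keeping the relative order
    /// of equal keys; among duplicates, the later-inserted row stays last.
    std::vector<size_t> stablePermutation(const std::vector<int> & keys)
    {
        std::vector<size_t> perm(keys.size());
        std::iota(perm.begin(), perm.end(), 0);
        std::stable_sort(perm.begin(), perm.end(),
                         [&](size_t a, size_t b) { return keys[a] < keys[b]; });
        return perm;
    }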
From 6a57c0a8cf5217e90ef8c5daa6f006436856e033 Mon Sep 17 00:00:00 2001
From: Pavel Kruglov
Date: Fri, 13 Nov 2020 10:54:05 +0300
Subject: [PATCH 02/32] Move merge in MergeTreeDataWriter

---
 .../Algorithms/CollapsingSortedAlgorithm.cpp  |   5 +
 .../MergeTree/MergeTreeBlockOutputStream.cpp  |  89 +-----------
 .../MergeTree/MergeTreeBlockOutputStream.h    |   1 -
 .../MergeTree/MergeTreeDataWriter.cpp         | 129 ++++++++++++++----
 src/Storages/MergeTree/MergeTreeDataWriter.h  |   4 +-
 .../ReplicatedMergeTreeBlockOutputStream.cpp  |   6 +-
 .../ReplicatedMergeTreeBlockOutputStream.h    |   5 +-
 src/Storages/StorageReplicatedMergeTree.cpp   |   5 +-
 8 files changed, 120 insertions(+), 124 deletions(-)

diff --git a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp
index 24e1d1d40a8..f7bcf65486f 100644
--- a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp
+++ b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp
@@ -13,6 +13,11 @@
 namespace DB
 {

+namespace ErrorCodes
+{
+    extern const int INCORRECT_DATA;
+}
+
 CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
     const Block & header,
     size_t num_inputs,
diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
index 2de68c802b9..94d326ae9f1 100644
--- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
+++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp
@@ -2,114 +2,27 @@
 #include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
 #include <Storages/MergeTree/StorageMergeTree.h>
 #include <Interpreters/PartLog.h>
-#include <Interpreters/sortBlock.h>
-#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
-#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
-#include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h>
-#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
-#include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h>
-#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
-#include <Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h>

 namespace DB
 {

-namespace ErrorCodes
-{
-    extern const int LOGICAL_ERROR;
-}
-
 Block MergeTreeBlockOutputStream::getHeader() const
 {
     return metadata_snapshot->getSampleBlock();
 }

-Block MergeTreeBlockOutputStream::mergeBlock(const Block & block)
-{
-    /// Get the information needed for merging algorithms
-    size_t block_size = block.rows();
-    Names sort_columns = metadata_snapshot->getSortingKeyColumns();
-    SortDescription sort_description;
-    size_t sort_columns_size = sort_columns.size();
-    sort_description.reserve(sort_columns_size);
-
-    for (size_t i = 0; i < sort_columns_size; ++i)
-        sort_description.emplace_back(getHeader().getPositionByName(sort_columns[i]), 1, 1);
-
-    auto get_merging_algorithm = [&]() -> std::shared_ptr<IMergingAlgorithm>
-    {
-        switch (storage.merging_params.mode)
-        {
-            /// There is nothing to merge in single block in ordinary MergeTree
-            case MergeTreeData::MergingParams::Ordinary:
-                return nullptr;
-            case MergeTreeData::MergingParams::Replacing:
-                return std::make_shared<ReplacingSortedAlgorithm>(
-                    getHeader(), 1, sort_description, storage.merging_params.version_column, block_size);
-            case MergeTreeData::MergingParams::Collapsing:
-                return std::make_shared<CollapsingSortedAlgorithm>(
-                    getHeader(), 1, sort_description, storage.merging_params.sign_column,
-                    false, block_size, &Poco::Logger::get("MergeTreeBlockOutputStream"));
-            case MergeTreeData::MergingParams::Summing:
-                return std::make_shared<SummingSortedAlgorithm>(
-                    getHeader(), 1, sort_description, storage.merging_params.columns_to_sum,
-                    metadata_snapshot->getPartitionKey().column_names, block_size);
-            case MergeTreeData::MergingParams::Aggregating:
-                return std::make_shared<AggregatingSortedAlgorithm>(getHeader(), 1, sort_description, block_size);
-            case MergeTreeData::MergingParams::VersionedCollapsing:
-                return std::make_shared<VersionedCollapsingAlgorithm>(
-                    getHeader(), 1, sort_description, storage.merging_params.sign_column, block_size);
-            case MergeTreeData::MergingParams::Graphite:
-                return std::make_shared<GraphiteRollupSortedAlgorithm>(
-                    getHeader(), 1, sort_description, block_size, storage.merging_params.graphite_params, time(nullptr));
-        }
-
-        __builtin_unreachable();
-    };
-
-    auto merging_algorithm = get_merging_algorithm();
-    if (!merging_algorithm)
-        return block;
-
-    /// Merging algorithms work with inputs containing sorted chunks, so we need to get a sorted permutation
-    /// of the block, convert the block to a chunk and construct an input from it
-    IColumn::Permutation permutation;
-    stableGetPermutation(block, sort_description, permutation);
-
-    Chunk chunk(block.getColumns(), block_size);
-
-    IMergingAlgorithm::Input input;
-    input.set(std::move(chunk));
-    input.permutation = &permutation;
-
-    IMergingAlgorithm::Inputs inputs;
-    inputs.push_back(std::move(input));
-    merging_algorithm->initialize(std::move(inputs));
-
-    IMergingAlgorithm::Status status = merging_algorithm->merge();
-    while (!status.is_finished)
-        status = merging_algorithm->merge();
-
-    return block.cloneWithColumns(status.chunk.getColumns());
-}
-
 void MergeTreeBlockOutputStream::write(const Block & block)
 {
     storage.delayInsertOrThrowIfNeeded();

-    auto settings = context.getSettings();
-
     auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
     for (auto & current_block : part_blocks)
     {
         Stopwatch watch;

-        if (settings.optimize_on_insert)
-            current_block.block = mergeBlock(current_block.block);
-
-        MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot);
+        MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);
         storage.renameTempPartAndAdd(part, &storage.increment);

         PartLog::addNewPart(storage.global_context, part, watch.elapsed());
diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.h b/src/Storages/MergeTree/MergeTreeBlockOutputStream.h
index 1024bd72fa2..3b61d5fa0ef 100644
--- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.h
+++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.h
@@ -24,7 +24,6 @@ public:

     Block getHeader() const override;
     void write(const Block & block) override;
-    Block mergeBlock(const Block & block);

 private:
     StorageMergeTree & storage;
diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
index e42bb786f46..b111f432632 100644
--- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
@@ -16,6 +16,14 @@
 #include <...>

+#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
+#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
+#include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h>
+#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
+#include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h>
+#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
+#include <Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h>
+
 namespace ProfileEvents
 {
     extern const Event MergeTreeDataWriterBlocks;
@@ -194,7 +202,66 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot)
     return result;
 }

-MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot)
+Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation ** permutation)
+{
+    size_t block_size = block.rows();
+
+    auto get_merging_algorithm = [&]() -> std::shared_ptr<IMergingAlgorithm>
+    {
+        switch (data.merging_params.mode)
+        {
+            /// There is nothing to merge in single block in ordinary MergeTree
+            case MergeTreeData::MergingParams::Ordinary:
+                return nullptr;
+            case MergeTreeData::MergingParams::Replacing:
+                return std::make_shared<ReplacingSortedAlgorithm>(
+                    block, 1, sort_description, data.merging_params.version_column, block_size);
+            case MergeTreeData::MergingParams::Collapsing:
+                return std::make_shared<CollapsingSortedAlgorithm>(
+                    block, 1, sort_description, data.merging_params.sign_column,
+                    false, block_size, &Poco::Logger::get("MergeTreeBlockOutputStream"));
+            case MergeTreeData::MergingParams::Summing:
+                return std::make_shared<SummingSortedAlgorithm>(
+                    block, 1, sort_description, data.merging_params.columns_to_sum,
+                    partition_key_columns, block_size);
+            case MergeTreeData::MergingParams::Aggregating:
+                return std::make_shared<AggregatingSortedAlgorithm>(block, 1, sort_description, block_size);
+            case MergeTreeData::MergingParams::VersionedCollapsing:
+                return std::make_shared<VersionedCollapsingAlgorithm>(
+                    block, 1, sort_description, data.merging_params.sign_column, block_size);
+            case MergeTreeData::MergingParams::Graphite:
+                return std::make_shared<GraphiteRollupSortedAlgorithm>(
+                    block, 1, sort_description, block_size, data.merging_params.graphite_params, time(nullptr));
+        }
+
+        __builtin_unreachable();
+    };
+
+    auto merging_algorithm = get_merging_algorithm();
+    if (!merging_algorithm)
+        return block;
+
+    Chunk chunk(block.getColumns(), block_size);
+
+    IMergingAlgorithm::Input input;
+    input.set(std::move(chunk));
+    input.permutation = *permutation;
+
+    IMergingAlgorithm::Inputs inputs;
+    inputs.push_back(std::move(input));
+    merging_algorithm->initialize(std::move(inputs));
+
+    IMergingAlgorithm::Status status = merging_algorithm->merge();
+    while (!status.is_finished)
+        status = merging_algorithm->merge();
+
+    /// Merged Block is sorted and we don't need to use permutation anymore
+    *permutation = nullptr;
+
+    return block.cloneWithColumns(status.chunk.getColumns());
+}
+
+MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, const Context & context)
 {
     Block & block = block_with_partition.block;

@@ -228,6 +295,38 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, const Context & context)
     else
         part_name = new_part_info.getPartName();

+    /// If we need to calculate some columns to sort.
+    if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
+        data.getSortingKeyAndSkipIndicesExpression(metadata_snapshot)->execute(block);
+
+    Names sort_columns = metadata_snapshot->getSortingKeyColumns();
+    SortDescription sort_description;
+    size_t sort_columns_size = sort_columns.size();
+    sort_description.reserve(sort_columns_size);
+
+    for (size_t i = 0; i < sort_columns_size; ++i)
+        sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);
+
+    ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
+
+    /// Sort
+    IColumn::Permutation * perm_ptr = nullptr;
+    IColumn::Permutation perm;
+    if (!sort_description.empty())
+    {
+        if (!isAlreadySorted(block, sort_description))
+        {
+            stableGetPermutation(block, sort_description, perm);
+            perm_ptr = &perm;
+        }
+        else
+            ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
+    }
+
+    Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
+    if (context.getSettings().optimize_on_insert)
+        block = mergeBlock(block, sort_description, partition_key_columns, &perm_ptr);
+
     /// Size of part would not be greater than block.bytes() + epsilon
     size_t expected_size = block.bytes();

@@ -271,34 +370,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, const Context & context)
         sync_guard.emplace(disk, full_path);
     }

-    /// If we need to calculate some columns to sort.
-    if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
-        data.getSortingKeyAndSkipIndicesExpression(metadata_snapshot)->execute(block);
-
-    Names sort_columns = metadata_snapshot->getSortingKeyColumns();
-    SortDescription sort_description;
-    size_t sort_columns_size = sort_columns.size();
-    sort_description.reserve(sort_columns_size);
-
-    for (size_t i = 0; i < sort_columns_size; ++i)
-        sort_description.emplace_back(block.getPositionByName(sort_columns[i]), 1, 1);
-
-    ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
-
-    /// Sort
-    IColumn::Permutation * perm_ptr = nullptr;
-    IColumn::Permutation perm;
-    if (!sort_description.empty())
-    {
-        if (!isAlreadySorted(block, sort_description))
-        {
-            stableGetPermutation(block, sort_description, perm);
-            perm_ptr = &perm;
-        }
-        else
-            ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocksAlreadySorted);
-    }
-
     if (metadata_snapshot->hasRowsTTL())
         updateTTL(metadata_snapshot->getRowsTTL(), new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);

diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.h b/src/Storages/MergeTree/MergeTreeDataWriter.h
index b4ad936672c..9523ca76c6e 100644
--- a/src/Storages/MergeTree/MergeTreeDataWriter.h
+++ b/src/Storages/MergeTree/MergeTreeDataWriter.h
@@ -45,7 +45,9 @@ public:
     /** All rows must correspond to same partition.
       * Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData.
       */
-    MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot);
+    MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, const Context & context);
+
+    Block mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation ** permutation);

 private:
     MergeTreeData & data;
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
index c8530943873..9dd85bc0fa6 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp
@@ -40,7 +40,8 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
     size_t quorum_timeout_ms_,
     size_t max_parts_per_block_,
     bool quorum_parallel_,
-    bool deduplicate_)
+    bool deduplicate_,
+    const Context & context_)
     : storage(storage_)
     , metadata_snapshot(metadata_snapshot_)
     , quorum(quorum_)
@@ -49,6 +50,7 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
     , quorum_parallel(quorum_parallel_)
     , deduplicate(deduplicate_)
     , log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
+    , context(context_)
 {
     /// The quorum value `1` has the same meaning as if it is disabled.
     if (quorum == 1)
@@ -142,7 +144,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)

         /// Write part to the filesystem under temporary name. Calculate a checksum.

-        MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot);
+        MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);

         String block_id;

diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
index 97c094c1128..7540e89f00f 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h
@@ -29,7 +29,8 @@ public:
         size_t quorum_timeout_ms_,
         size_t max_parts_per_block_,
         bool quorum_parallel_,
-        bool deduplicate_);
+        bool deduplicate_,
+        const Context & context_);

     Block getHeader() const override;
     void writePrefix() override;
@@ -71,6 +72,8 @@ private:
     using Logger = Poco::Logger;
     Poco::Logger * log;
+
+    const Context & context;
 };

 }
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index e7264610e3e..b51aa2f751e 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -3754,7 +3754,8 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context)
         query_settings.insert_quorum_timeout.totalMilliseconds(),
         query_settings.max_partitions_per_insert_block,
         query_settings.insert_quorum_parallel,
-        deduplicate);
+        deduplicate,
+        context);
 }


@@ -4412,7 +4413,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
     PartsTemporaryRename renamed_parts(*this, "detached/");
     MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);

-    ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false);   /// TODO Allow to use quorum here.
+    ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false, query_context);   /// TODO Allow to use quorum here.
     for (size_t i = 0; i < loaded_parts.size(); ++i)
     {
         String old_name = loaded_parts[i]->name;
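One detail of the relocated mergeBlock signature: it takes IColumn::Permutation ** so it can null out the caller's perm_ptr after consuming it, which stops the rest of writeTempPart from applying a stale permutation to the already-merged, and therefore already-sorted, block. A minimal sketch of the idiom with hypothetical names:

    #include <cstddef>
    #include <vector>

    using Permutation = std::vector<std::size_t>;

    /// Consumes the caller's permutation (assumed non-null here): rows are
    /// read through it in sorted order, then the caller's pointer is nulled
    /// because the returned data no longer needs reordering.
    std::vector<int> applyAndConsume(const std::vector<int> & data, Permutation ** perm)
    {
        std::vector<int> sorted;
        sorted.reserve(data.size());
        for (std::size_t row : **perm)
            sorted.push_back(data[row]);
        *perm = nullptr; /// the permutation no longer matches the returned data
        return sorted;
    }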
From 264e9daf6f9b22a52f50a9696915e2c4f42c349b Mon Sep 17 00:00:00 2001
From: Pavel Kruglov
Date: Fri, 13 Nov 2020 11:16:30 +0300
Subject: [PATCH 03/32] Fix style

---
 src/Storages/MergeTree/MergeTreeDataWriter.cpp | 2 +-
 src/Storages/MergeTree/MergeTreeDataWriter.h   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
index b111f432632..65846c10ff0 100644
--- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
@@ -202,7 +202,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot)
     return result;
 }

-Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation ** permutation)
+Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation ** permutation)
diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.h b/src/Storages/MergeTree/MergeTreeDataWriter.h
index 9523ca76c6e..93c15d38471 100644
--- a/src/Storages/MergeTree/MergeTreeDataWriter.h
+++ b/src/Storages/MergeTree/MergeTreeDataWriter.h
@@ -47,7 +47,7 @@ public:
      */
     MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, const Context & context);

-    Block mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation ** permutation);
+    Block mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation ** permutation);

 private:
     MergeTreeData & data;

From 53b8ff42c2daac587b7bdef50d376dd216243690 Mon Sep 17 00:00:00 2001
From: Pavel Kruglov
Date: Tue, 17 Nov 2020 19:56:36 +0300
Subject: [PATCH 04/32] Add test and set setting to 0 in some other tests

---
 .../MergeTree/MergeTreeDataWriter.cpp         | 20 +++++++----
 .../00083_create_merge_tree_zookeeper.sql     |  2 ++
 .../00327_summing_composite_nested.sql        |  2 ++
 .../00443_optimize_final_vertical_merge.sh    |  6 ++--
 ...ed_storage_definition_syntax_zookeeper.sql |  2 ++
 .../00564_versioned_collapsing_merge_tree.sql |  2 ++
 ...77_replacing_merge_tree_vertical_merge.sql |  2 ++
 .../0_stateless/00616_final_single_part.sql   |  2 ++
 ...00660_optimize_final_without_partition.sql |  2 ++
 ...replicated_without_partition_zookeeper.sql |  2 ++
 .../00754_alter_modify_order_by.sql           |  1 +
 ...r_modify_order_by_replicated_zookeeper.sql |  2 ++
 ...030_incorrect_count_summing_merge_tree.sql |  2 ++
 ...01285_data_skip_index_over_aggregation.sql |  2 ++
 .../0_stateless/01323_add_scalars_in_time.sql |  2 ++
 ...mming_merge_tree_exclude_partition_key.sql |  2 ++
 .../01560_optimize_on_insert.reference        | 13 +++++++
 .../0_stateless/01560_optimize_on_insert.sql  | 35 +++++++++++++++++++
 18 files changed, 92 insertions(+), 9 deletions(-)
 create mode 100644 tests/queries/0_stateless/01560_optimize_on_insert.reference
 create mode 100644 tests/queries/0_stateless/01560_optimize_on_insert.sql

diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
index 65846c10ff0..f338b4b7048 100644
--- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
@@ -204,6 +204,9 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot)

 Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation ** permutation)
 {
+
+    LOG_DEBUG(log, "Apply merging algorithm on inserted data");
+
     size_t block_size = block.rows();

     auto get_merging_algorithm = [&]() -> std::shared_ptr<IMergingAlgorithm>
@@ -215,23 +218,23 @@ Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation ** permutation)
                 return nullptr;
             case MergeTreeData::MergingParams::Replacing:
                 return std::make_shared<ReplacingSortedAlgorithm>(
-                    block, 1, sort_description, data.merging_params.version_column, block_size);
+                    block, 1, sort_description, data.merging_params.version_column, block_size + 1);
             case MergeTreeData::MergingParams::Collapsing:
                 return std::make_shared<CollapsingSortedAlgorithm>(
                     block, 1, sort_description, data.merging_params.sign_column,
-                    false, block_size, &Poco::Logger::get("MergeTreeBlockOutputStream"));
+                    false, block_size + 1, &Poco::Logger::get("MergeTreeBlockOutputStream"));
             case MergeTreeData::MergingParams::Summing:
                 return std::make_shared<SummingSortedAlgorithm>(
                     block, 1, sort_description, data.merging_params.columns_to_sum,
-                    partition_key_columns, block_size);
+                    partition_key_columns, block_size + 1);
             case MergeTreeData::MergingParams::Aggregating:
-                return std::make_shared<AggregatingSortedAlgorithm>(block, 1, sort_description, block_size);
+                return std::make_shared<AggregatingSortedAlgorithm>(block, 1, sort_description, block_size + 1);
             case MergeTreeData::MergingParams::VersionedCollapsing:
                 return std::make_shared<VersionedCollapsingAlgorithm>(
-                    block, 1, sort_description, data.merging_params.sign_column, block_size);
+                    block, 1, sort_description, data.merging_params.sign_column, block_size + 1);
             case MergeTreeData::MergingParams::Graphite:
                 return std::make_shared<GraphiteRollupSortedAlgorithm>(
-                    block, 1, sort_description, block_size, data.merging_params.graphite_params, time(nullptr));
+                    block, 1, sort_description, block_size + 1, data.merging_params.graphite_params, time(nullptr));
         }

         __builtin_unreachable();
@@ -243,6 +246,8 @@ Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation ** permutation)

     Chunk chunk(block.getColumns(), block_size);

+    LOG_DEBUG(log, "chunk size before merge {}, block rows {}", chunk.getNumRows(), block_size);
+
     IMergingAlgorithm::Input input;
     input.set(std::move(chunk));
     input.permutation = *permutation;
@@ -255,6 +260,9 @@ Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation ** permutation)
     while (!status.is_finished)
         status = merging_algorithm->merge();

+
+    LOG_DEBUG(log, "chunk size after merge {}", status.chunk.getNumRows());
+
     /// Merged Block is sorted and we don't need to use permutation anymore
     *permutation = nullptr;

     return block.cloneWithColumns(status.chunk.getColumns());
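The change from block_size to block_size + 1 as the algorithms' max_block_size appears to guard against a premature flush: if the cap equals the input row count, the merged data can emit an intermediate chunk before merge() reports is_finished, whereas rows + 1 keeps everything in a single output chunk. A toy model of that assumed flush rule:

    #include <cassert>
    #include <cstddef>

    /// Simplified accumulator: it flushes a chunk as soon as the buffered
    /// row count reaches `cap`, mimicking the assumed max_block_size rule.
    struct Accumulator
    {
        std::size_t cap;
        std::size_t rows = 0;
        std::size_t flushes = 0;

        explicit Accumulator(std::size_t cap_) : cap(cap_) {}

        void insert()
        {
            if (++rows >= cap)
            {
                ++flushes; /// an intermediate chunk leaves the merge early
                rows = 0;
            }
        }
    };

    int main()
    {
        Accumulator tight(4); /// cap == number of input rows
        Accumulator loose(5); /// cap == number of input rows + 1
        for (int i = 0; i < 4; ++i)
        {
            tight.insert();
            loose.insert();
        }
        assert(tight.flushes == 1 && loose.flushes == 0);
    }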
diff --git a/tests/queries/0_stateless/00083_create_merge_tree_zookeeper.sql b/tests/queries/0_stateless/00083_create_merge_tree_zookeeper.sql
index 120d599bd35..998a4517163 100644
--- a/tests/queries/0_stateless/00083_create_merge_tree_zookeeper.sql
+++ b/tests/queries/0_stateless/00083_create_merge_tree_zookeeper.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
 DROP TABLE IF EXISTS merge_tree;
 DROP TABLE IF EXISTS collapsing_merge_tree;
 DROP TABLE IF EXISTS versioned_collapsing_merge_tree;
diff --git a/tests/queries/0_stateless/00327_summing_composite_nested.sql b/tests/queries/0_stateless/00327_summing_composite_nested.sql
index b61bc71b892..9be21e87abf 100644
--- a/tests/queries/0_stateless/00327_summing_composite_nested.sql
+++ b/tests/queries/0_stateless/00327_summing_composite_nested.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
 DROP TABLE IF EXISTS summing_composite_key;
 CREATE TABLE summing_composite_key (d Date, k UInt64, FirstMap Nested(k1 UInt32, k2ID Int8, s Float64), SecondMap Nested(k1ID UInt64, k2Key String, k3Type Int32, s Int64)) ENGINE = SummingMergeTree(d, k, 1);

diff --git a/tests/queries/0_stateless/00443_optimize_final_vertical_merge.sh b/tests/queries/0_stateless/00443_optimize_final_vertical_merge.sh
index 1ea7762b813..1bd73cc1f3e 100755
--- a/tests/queries/0_stateless/00443_optimize_final_vertical_merge.sh
+++ b/tests/queries/0_stateless/00443_optimize_final_vertical_merge.sh
@@ -43,13 +43,13 @@ $CLICKHOUSE_CLIENT -q "INSERT INTO $name (date, Sign, ki) SELECT
 toDate(0) AS date,
 toInt8(1) AS Sign,
 toUInt64(0) AS ki
-FROM system.numbers LIMIT 9000"
+FROM system.numbers LIMIT 9000" --server_logs_file=/dev/null

 $CLICKHOUSE_CLIENT -q "INSERT INTO $name (date, Sign, ki) SELECT
 toDate(0) AS date,
 toInt8(1) AS Sign,
 number AS ki
-FROM system.numbers LIMIT 9000, 9000"
+FROM system.numbers LIMIT 9000, 9000" --server_logs_file=/dev/null

 $CLICKHOUSE_CLIENT -q "INSERT INTO $name SELECT
 toDate(0) AS date,
@@ -68,7 +68,7 @@ number AS di09,
 number AS di10,
 [number, number+1] AS \`n.i\`,
 [hex(number), hex(number+1)] AS \`n.s\`
-FROM system.numbers LIMIT $res_rows"
+FROM system.numbers LIMIT $res_rows" --server_logs_file=/dev/null

 while [[ $(get_num_parts) -ne 1 ]] ; do $CLICKHOUSE_CLIENT -q "OPTIMIZE TABLE $name PARTITION 197001" --server_logs_file=/dev/null; done

diff --git a/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.sql b/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.sql
index 4a3dd2981cd..cec6088bd59 100644
--- a/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.sql
+++ b/tests/queries/0_stateless/00509_extended_storage_definition_syntax_zookeeper.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
 SELECT '*** Replicated with sampling ***';

 DROP TABLE IF EXISTS test.replicated_with_sampling;
diff --git a/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql b/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql
index b7824e7efdc..634b9781c7a 100644
--- a/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql
+++ b/tests/queries/0_stateless/00564_versioned_collapsing_merge_tree.sql
@@ -1,3 +1,5 @@
+set optimize_on_insert = 0;
+
 drop table if exists mult_tab;
 create table mult_tab (date Date, value String, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date), 8192, sign, version);
 insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
diff --git a/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.sql b/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.sql
index 1bfbdaf75c5..8c51a6f34da 100644
--- a/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.sql
+++ b/tests/queries/0_stateless/00577_replacing_merge_tree_vertical_merge.sql
@@ -1,3 +1,5 @@
+set optimize_on_insert = 0;
+
 drop table if exists tab_00577;
 create table tab_00577 (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
 insert into tab_00577 values ('2018-01-01', 2, 2), ('2018-01-01', 1, 1);
diff --git a/tests/queries/0_stateless/00616_final_single_part.sql b/tests/queries/0_stateless/00616_final_single_part.sql
index df65123e29b..6618d0b1252 100644
--- a/tests/queries/0_stateless/00616_final_single_part.sql
+++ b/tests/queries/0_stateless/00616_final_single_part.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
 DROP TABLE IF EXISTS test_00616;
 DROP TABLE IF EXISTS replacing_00616;

diff --git a/tests/queries/0_stateless/00660_optimize_final_without_partition.sql b/tests/queries/0_stateless/00660_optimize_final_without_partition.sql
index 8c1f2ebd361..6545ad6e85b 100644
--- a/tests/queries/0_stateless/00660_optimize_final_without_partition.sql
+++ b/tests/queries/0_stateless/00660_optimize_final_without_partition.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
 DROP TABLE IF EXISTS partitioned_by_tuple;

 CREATE TABLE partitioned_by_tuple (d Date, x UInt8, w String, y UInt8) ENGINE SummingMergeTree (y) PARTITION BY (d, x) ORDER BY (d, x, w);
diff --git a/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.sql b/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.sql
index 77747bc0383..033202e04aa 100644
--- a/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.sql
+++ b/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
 DROP TABLE IF EXISTS partitioned_by_tuple_replica1_00661;
 DROP TABLE IF EXISTS partitioned_by_tuple_replica2_00661;
 CREATE TABLE partitioned_by_tuple_replica1_00661(d Date, x UInt8, w String, y UInt8) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/test/partitioned_by_tuple_00661', '1') PARTITION BY (d, x) ORDER BY (d, x, w);
diff --git a/tests/queries/0_stateless/00754_alter_modify_order_by.sql b/tests/queries/0_stateless/00754_alter_modify_order_by.sql
index f8c584ed052..a09d824c928 100644
--- a/tests/queries/0_stateless/00754_alter_modify_order_by.sql
+++ b/tests/queries/0_stateless/00754_alter_modify_order_by.sql
@@ -1,4 +1,5 @@
 SET send_logs_level = 'fatal';
+SET optimize_on_insert = 0;

 DROP TABLE IF EXISTS old_style;
 CREATE TABLE old_style(d Date, x UInt32) ENGINE MergeTree(d, x, 8192);
diff --git a/tests/queries/0_stateless/00754_alter_modify_order_by_replicated_zookeeper.sql b/tests/queries/0_stateless/00754_alter_modify_order_by_replicated_zookeeper.sql
index 0f861749537..b3b746c3b43 100644
--- a/tests/queries/0_stateless/00754_alter_modify_order_by_replicated_zookeeper.sql
+++ b/tests/queries/0_stateless/00754_alter_modify_order_by_replicated_zookeeper.sql
@@ -1,3 +1,5 @@
+SET optimze_on_insert = 0;
+
 SET send_logs_level = 'fatal';

 DROP TABLE IF EXISTS old_style;
diff --git a/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql b/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql
index 0b5845d3b04..90b1660e546 100644
--- a/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql
+++ b/tests/queries/0_stateless/01030_incorrect_count_summing_merge_tree.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
 select '-- SummingMergeTree with Nullable column without duplicates.';

 drop table if exists tst;
diff --git a/tests/queries/0_stateless/01285_data_skip_index_over_aggregation.sql b/tests/queries/0_stateless/01285_data_skip_index_over_aggregation.sql
index 110c5b65cab..812da74ce75 100644
--- a/tests/queries/0_stateless/01285_data_skip_index_over_aggregation.sql
+++ b/tests/queries/0_stateless/01285_data_skip_index_over_aggregation.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
 DROP TABLE IF EXISTS data_01285;

 SET max_threads=1;
diff --git a/tests/queries/0_stateless/01323_add_scalars_in_time.sql b/tests/queries/0_stateless/01323_add_scalars_in_time.sql
index 2d7cf270017..2ee5603f760 100644
--- a/tests/queries/0_stateless/01323_add_scalars_in_time.sql
+++ b/tests/queries/0_stateless/01323_add_scalars_in_time.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
 DROP TABLE IF EXISTS tags;

 CREATE TABLE tags (
diff --git a/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.sql b/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.sql
index 790fbca6b73..c5a874efe09 100644
--- a/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.sql
+++ b/tests/queries/0_stateless/01373_summing_merge_tree_exclude_partition_key.sql
@@ -1,3 +1,5 @@
+SET optimize_on_insert = 0;
+
 DROP TABLE IF EXISTS tt_01373;

 CREATE TABLE tt_01373
diff --git a/tests/queries/0_stateless/01560_optimize_on_insert.reference b/tests/queries/0_stateless/01560_optimize_on_insert.reference
new file mode 100644
index 00000000000..b1f46ac9068
--- /dev/null
+++ b/tests/queries/0_stateless/01560_optimize_on_insert.reference
@@ -0,0 +1,13 @@
+Replacing Merge Tree
+1 2020-01-02 00:00:00
+1 2020-01-01 00:00:00
+Collapsing Merge Tree
+1 1 2020-01-01 00:00:00
+Versioned Collapsing Merge Tree
+1 1 2 2020-01-01 00:00:00
+Summing Merge Tree
+1 6 2020-01-02 00:00:00
+1 6 2020-01-01 00:00:00
+Aggregating Merge Tree
+1 5 2020-01-02 00:00:00
+1 5 2020-01-01 00:00:00
diff --git a/tests/queries/0_stateless/01560_optimize_on_insert.sql b/tests/queries/0_stateless/01560_optimize_on_insert.sql
new file mode 100644
index 00000000000..6cfdda1205b
--- /dev/null
+++ b/tests/queries/0_stateless/01560_optimize_on_insert.sql
@@ -0,0 +1,35 @@
+SELECT 'Replacing Merge Tree';
+DROP TABLE IF EXISTS replacing_merge_tree;
+CREATE TABLE replacing_merge_tree (key UInt32, date Datetime) ENGINE=ReplacingMergeTree() PARTITION BY date ORDER BY key;
+INSERT INTO replacing_merge_tree VALUES (1, '2020-01-01'), (1, '2020-01-02'), (1, '2020-01-01'), (1, '2020-01-02');
+SELECT * FROM replacing_merge_tree;
+DROP TABLE replacing_merge_tree;
+
+SELECT 'Collapsing Merge Tree';
+DROP TABLE IF EXISTS collapsing_merge_tree;
+CREATE TABLE collapsing_merge_tree (key UInt32, sign Int8, date Datetime) ENGINE=CollapsingMergeTree(sign) PARTITION BY date ORDER BY key;
+INSERT INTO collapsing_merge_tree VALUES (1, 1, '2020-01-01'), (1, 1, '2020-01-02'), (1, -1, '2020-01-01'), (1, -1, '2020-01-02'), (1, 1, '2020-01-01');
+SELECT * FROM collapsing_merge_tree;
+DROP TABLE collapsing_merge_tree;
+
+SELECT 'Versioned Collapsing Merge Tree';
+DROP TABLE IF EXISTS versioned_collapsing_merge_tree;
+CREATE TABLE versioned_collapsing_merge_tree (key UInt32, sign Int8, version Int32, date Datetime) ENGINE=VersionedCollapsingMergeTree(sign, version) PARTITION BY date ORDER BY (key, version);
+INSERT INTO versioned_collapsing_merge_tree VALUES (1, 1, 1, '2020-01-01'), (1, -1, 1, '2020-01-01'), (1, 1, 2, '2020-01-01');
+SELECT * FROM versioned_collapsing_merge_tree;
+DROP TABLE versioned_collapsing_merge_tree;
+
+SELECT 'Summing Merge Tree';
+DROP TABLE IF EXISTS summing_merge_tree;
+CREATE TABLE summing_merge_tree (key UInt32, val UInt32, date Datetime) ENGINE=SummingMergeTree(val) PARTITION BY date ORDER BY key;
+INSERT INTO summing_merge_tree VALUES (1, 1, '2020-01-01'), (1, 1, '2020-01-02'), (1, 5, '2020-01-01'), (1, 5, '2020-01-02');
+SELECT * FROM summing_merge_tree;
+DROP TABLE summing_merge_tree;
+
+SELECT 'Aggregating Merge Tree';
+DROP TABLE IF EXISTS aggregating_merge_tree;
+CREATE TABLE aggregating_merge_tree (key UInt32, val SimpleAggregateFunction(max, UInt32), date Datetime) ENGINE=AggregatingMergeTree() PARTITION BY date ORDER BY key;
+INSERT INTO aggregating_merge_tree VALUES (1, 1, '2020-01-01'), (1, 1, '2020-01-02'), (1, 5, '2020-01-01'), (1, 5, '2020-01-02');
+SELECT * FROM aggregating_merge_tree;
+DROP TABLE aggregating_merge_tree;
+
'2020-01-01'), (1, 5, '2020-01-02'); +SELECT * FROM summing_merge_tree; +DROP TABLE summing_merge_tree; + +SELECT 'Aggregating Merge Tree'; +DROP TABLE IF EXISTS aggregating_merge_tree; +CREATE TABLE aggregating_merge_tree (key UInt32, val SimpleAggregateFunction(max, UInt32), date Datetime) ENGINE=AggregatingMergeTree() PARTITION BY date ORDER BY key; +INSERT INTO aggregating_merge_tree VALUES (1, 1, '2020-01-01'), (1, 1, '2020-01-02'), (1, 5, '2020-01-01'), (1, 5, '2020-01-02'); +SELECT * FROM aggregating_merge_tree; +DROP TABLE aggregating_merge_tree; + From 4ed765700818c8735560f358fe545fe20e37656f Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Tue, 17 Nov 2020 22:07:19 +0300 Subject: [PATCH 05/32] fix typo --- .../00754_alter_modify_order_by_replicated_zookeeper.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/00754_alter_modify_order_by_replicated_zookeeper.sql b/tests/queries/0_stateless/00754_alter_modify_order_by_replicated_zookeeper.sql index b3b746c3b43..182e77ee8ad 100644 --- a/tests/queries/0_stateless/00754_alter_modify_order_by_replicated_zookeeper.sql +++ b/tests/queries/0_stateless/00754_alter_modify_order_by_replicated_zookeeper.sql @@ -1,4 +1,4 @@ -SET optimze_on_insert = 0; +SET optimize_on_insert = 0; SET send_logs_level = 'fatal'; From cc1b16ea815059f2f72a1290085cc8ddbe3543f0 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Wed, 18 Nov 2020 13:40:58 +0300 Subject: [PATCH 06/32] Fix test --- .../01560_optimize_on_insert.reference | 6 +++--- .../0_stateless/01560_optimize_on_insert.sql | 18 +++++++++--------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/queries/0_stateless/01560_optimize_on_insert.reference b/tests/queries/0_stateless/01560_optimize_on_insert.reference index b1f46ac9068..7ace2043be0 100644 --- a/tests/queries/0_stateless/01560_optimize_on_insert.reference +++ b/tests/queries/0_stateless/01560_optimize_on_insert.reference @@ -1,13 +1,13 @@ Replacing Merge Tree -1 2020-01-02 00:00:00 1 2020-01-01 00:00:00 +2 2020-01-02 00:00:00 Collapsing Merge Tree 1 1 2020-01-01 00:00:00 Versioned Collapsing Merge Tree 1 1 2 2020-01-01 00:00:00 Summing Merge Tree -1 6 2020-01-02 00:00:00 1 6 2020-01-01 00:00:00 +2 6 2020-01-02 00:00:00 Aggregating Merge Tree -1 5 2020-01-02 00:00:00 1 5 2020-01-01 00:00:00 +2 5 2020-01-02 00:00:00 diff --git a/tests/queries/0_stateless/01560_optimize_on_insert.sql b/tests/queries/0_stateless/01560_optimize_on_insert.sql index 6cfdda1205b..9f6dac686bb 100644 --- a/tests/queries/0_stateless/01560_optimize_on_insert.sql +++ b/tests/queries/0_stateless/01560_optimize_on_insert.sql @@ -1,35 +1,35 @@ SELECT 'Replacing Merge Tree'; DROP TABLE IF EXISTS replacing_merge_tree; CREATE TABLE replacing_merge_tree (key UInt32, date Datetime) ENGINE=ReplacingMergeTree() PARTITION BY date ORDER BY key; -INSERT INTO replacing_merge_tree VALUES (1, '2020-01-01'), (1, '2020-01-02'), (1, '2020-01-01'), (1, '2020-01-02'); -SELECT * FROM replacing_merge_tree; +INSERT INTO replacing_merge_tree VALUES (1, '2020-01-01'), (2, '2020-01-02'), (1, '2020-01-01'), (2, '2020-01-02'); +SELECT * FROM replacing_merge_tree ORDER BY key; DROP TABLE replacing_merge_tree; SELECT 'Collapsing Merge Tree'; DROP TABLE IF EXISTS collapsing_merge_tree; CREATE TABLE collapsing_merge_tree (key UInt32, sign Int8, date Datetime) ENGINE=CollapsingMergeTree(sign) PARTITION BY date ORDER BY key; -INSERT INTO collapsing_merge_tree VALUES (1, 1, '2020-01-01'), (1, 1, '2020-01-02'), (1, -1, '2020-01-01'), (1, -1, 
'2020-01-02'), (1, 1, '2020-01-01'); -SELECT * FROM collapsing_merge_tree; +INSERT INTO collapsing_merge_tree VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, -1, '2020-01-01'), (2, -1, '2020-01-02'), (1, 1, '2020-01-01'); +SELECT * FROM collapsing_merge_tree ORDER BY key; DROP TABLE collapsing_merge_tree; SELECT 'Versioned Collapsing Merge Tree'; DROP TABLE IF EXISTS versioned_collapsing_merge_tree; CREATE TABLE versioned_collapsing_merge_tree (key UInt32, sign Int8, version Int32, date Datetime) ENGINE=VersionedCollapsingMergeTree(sign, version) PARTITION BY date ORDER BY (key, version); INSERT INTO versioned_collapsing_merge_tree VALUES (1, 1, 1, '2020-01-01'), (1, -1, 1, '2020-01-01'), (1, 1, 2, '2020-01-01'); -SELECT * FROM versioned_collapsing_merge_tree; +SELECT * FROM versioned_collapsing_merge_tree ORDER BY key; DROP TABLE versioned_collapsing_merge_tree; SELECT 'Summing Merge Tree'; DROP TABLE IF EXISTS summing_merge_tree; CREATE TABLE summing_merge_tree (key UInt32, val UInt32, date Datetime) ENGINE=SummingMergeTree(val) PARTITION BY date ORDER BY key; -INSERT INTO summing_merge_tree VALUES (1, 1, '2020-01-01'), (1, 1, '2020-01-02'), (1, 5, '2020-01-01'), (1, 5, '2020-01-02'); -SELECT * FROM summing_merge_tree; +INSERT INTO summing_merge_tree VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, 5, '2020-01-01'), (2, 5, '2020-01-02'); +SELECT * FROM summing_merge_tree ORDER BY key; DROP TABLE summing_merge_tree; SELECT 'Aggregating Merge Tree'; DROP TABLE IF EXISTS aggregating_merge_tree; CREATE TABLE aggregating_merge_tree (key UInt32, val SimpleAggregateFunction(max, UInt32), date Datetime) ENGINE=AggregatingMergeTree() PARTITION BY date ORDER BY key; -INSERT INTO aggregating_merge_tree VALUES (1, 1, '2020-01-01'), (1, 1, '2020-01-02'), (1, 5, '2020-01-01'), (1, 5, '2020-01-02'); -SELECT * FROM aggregating_merge_tree; +INSERT INTO aggregating_merge_tree VALUES (1, 1, '2020-01-01'), (2, 1, '2020-01-02'), (1, 5, '2020-01-01'), (2, 5, '2020-01-02'); +SELECT * FROM aggregating_merge_tree ORDER BY key; DROP TABLE aggregating_merge_tree; From 608722b6ab143747d5ee5b8cdbc38a4eecf864a6 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Fri, 20 Nov 2020 02:59:58 +0300 Subject: [PATCH 07/32] Fix integration tests --- .../test_graphite_merge_tree/configs/users.xml | 8 ++++++++ .../test_materialize_mysql_database/configs/users.xml | 1 + tests/integration/test_row_policy/configs/users.xml | 1 + 3 files changed, 10 insertions(+) create mode 100644 tests/integration/test_graphite_merge_tree/configs/users.xml diff --git a/tests/integration/test_graphite_merge_tree/configs/users.xml b/tests/integration/test_graphite_merge_tree/configs/users.xml new file mode 100644 index 00000000000..cdd437797ce --- /dev/null +++ b/tests/integration/test_graphite_merge_tree/configs/users.xml @@ -0,0 +1,8 @@ +<?xml version="1.0"?> +<yandex> +<profiles> +<default> +<optimize_on_insert>0</optimize_on_insert> +</default> +</profiles> +</yandex> diff --git a/tests/integration/test_materialize_mysql_database/configs/users.xml b/tests/integration/test_materialize_mysql_database/configs/users.xml index f6df1c30fc4..3cb10064421 100644 --- a/tests/integration/test_materialize_mysql_database/configs/users.xml +++ b/tests/integration/test_materialize_mysql_database/configs/users.xml @@ -4,6 +4,7 @@ 1 1 +<optimize_on_insert>0</optimize_on_insert> diff --git a/tests/integration/test_row_policy/configs/users.xml b/tests/integration/test_row_policy/configs/users.xml index ce29b7f7308..229fc995f27 100644 --- a/tests/integration/test_row_policy/configs/users.xml +++ b/tests/integration/test_row_policy/configs/users.xml @@ -3,6 +3,7 @@ 1 +<optimize_on_insert>0</optimize_on_insert> From
c3c32ff9f8c93a4a5b0f779b8e9161625a8f8ecb Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Fri, 20 Nov 2020 03:16:57 +0300 Subject: [PATCH 08/32] Minor change --- src/Core/SortCursor.h | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/Core/SortCursor.h b/src/Core/SortCursor.h index f8adb4a5f39..4b768ac8c70 100644 --- a/src/Core/SortCursor.h +++ b/src/Core/SortCursor.h @@ -117,10 +117,7 @@ struct SortCursorImpl } /// We need a possibility to change pos (see MergeJoin). - size_t & getPosRef() - { - return pos; - } + size_t & getPosRef() { return pos; } bool isFirst() const { return pos == 0; } bool isLast() const { return pos + 1 >= rows; } From 625962fb5950f222ebbf5dfb6a4cd430c94cb245 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Fri, 20 Nov 2020 03:19:36 +0300 Subject: [PATCH 09/32] Remove debug logging --- src/Storages/MergeTree/MergeTreeDataWriter.cpp | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index f338b4b7048..f60ad44c14d 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -204,9 +204,6 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation ** permutation) { - - LOG_DEBUG(log, "Apply merging algorithm on inserted data"); - size_t block_size = block.rows(); auto get_merging_algorithm = [&]() -> std::shared_ptr @@ -246,8 +243,6 @@ Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_ Chunk chunk(block.getColumns(), block_size); - LOG_DEBUG(log, "chunk size before merge {}, block rows {}", chunk.getNumRows(), block_size); - IMergingAlgorithm::Input input; input.set(std::move(chunk)); input.permutation = *permutation; @@ -259,10 +254,7 @@ Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_ IMergingAlgorithm::Status status = merging_algorithm->merge(); while (!status.is_finished) status = merging_algorithm->merge(); - - - LOG_DEBUG(log, "chunk size after merge {}", status.chunk.getNumRows()); - + /// Merged Block is sorted and we don't need to use permutation anymore *permutation = nullptr; From e5588b94aa960d0b7a900269d355d3ae5b17691f Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Fri, 20 Nov 2020 13:38:53 +0300 Subject: [PATCH 10/32] Fix style and tests --- src/Storages/MergeTree/MergeTreeDataWriter.cpp | 2 +- tests/integration/test_graphite_merge_tree/test.py | 3 ++- .../test_row_policy/configs/users.d/another_user.xml | 7 ++++++- tests/integration/test_row_policy/configs/users.xml | 1 - 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index f60ad44c14d..75281b68a82 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -254,7 +254,7 @@ Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_ IMergingAlgorithm::Status status = merging_algorithm->merge(); while (!status.is_finished) status = merging_algorithm->merge(); - + /// Merged Block is sorted and we don't need to use permutation anymore *permutation = nullptr; diff --git a/tests/integration/test_graphite_merge_tree/test.py b/tests/integration/test_graphite_merge_tree/test.py index 
502004d2dfe..ee8a18f1ca5 100644 --- a/tests/integration/test_graphite_merge_tree/test.py +++ b/tests/integration/test_graphite_merge_tree/test.py @@ -8,7 +8,8 @@ from helpers.test_tools import TSV cluster = ClickHouseCluster(__file__) instance = cluster.add_instance('instance', - main_configs=['configs/graphite_rollup.xml']) + main_configs=['configs/graphite_rollup.xml'], + user_configs=["configs/users.xml"]) q = instance.query diff --git a/tests/integration/test_row_policy/configs/users.d/another_user.xml b/tests/integration/test_row_policy/configs/users.d/another_user.xml index fb9608e5313..89e16e94c83 100644 --- a/tests/integration/test_row_policy/configs/users.d/another_user.xml +++ b/tests/integration/test_row_policy/configs/users.d/another_user.xml @@ -1,5 +1,10 @@ +<profiles> +<default> +<optimize_on_insert>0</optimize_on_insert> +</default> +</profiles> @@ -10,4 +15,4 @@ default -</yandex> \ No newline at end of file +</yandex> diff --git a/tests/integration/test_row_policy/configs/users.xml b/tests/integration/test_row_policy/configs/users.xml index 229fc995f27..ce29b7f7308 100644 --- a/tests/integration/test_row_policy/configs/users.xml +++ b/tests/integration/test_row_policy/configs/users.xml @@ -3,7 +3,6 @@ 1 -<optimize_on_insert>0</optimize_on_insert> From 4b58528b9ed886eea538d07249d51515a1c214a0 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Fri, 4 Dec 2020 19:25:30 +0300 Subject: [PATCH 11/32] Rename getPos to getRow, change mergeBlock, pass setting instead of context --- src/Core/SortCursor.h | 4 ++-- .../MergingSortedBlockInputStream.cpp | 2 +- src/Interpreters/MergeJoin.cpp | 14 +++++------ .../Algorithms/AggregatingSortedAlgorithm.cpp | 6 ++--- .../Algorithms/CollapsingSortedAlgorithm.cpp | 2 +- .../GraphiteRollupSortedAlgorithm.cpp | 8 +++---- .../Algorithms/MergingSortedAlgorithm.cpp | 2 +- .../Algorithms/ReplacingSortedAlgorithm.cpp | 2 +- src/Processors/Merges/Algorithms/RowRef.h | 4 ++-- .../Algorithms/SummingSortedAlgorithm.cpp | 4 ++-- .../VersionedCollapsingAlgorithm.cpp | 2 +- .../Transforms/SortingTransform.cpp | 2 +- .../MergeTree/MergeTreeBlockOutputStream.cpp | 2 +- .../MergeTree/MergeTreeBlockOutputStream.h | 6 ++--- .../MergeTree/MergeTreeDataWriter.cpp | 24 ++++++++++++------- src/Storages/MergeTree/MergeTreeDataWriter.h | 4 ++-- .../ReplicatedMergeTreeBlockOutputStream.cpp | 6 ++--- .../ReplicatedMergeTreeBlockOutputStream.h | 4 ++-- 18 files changed, 53 insertions(+), 45 deletions(-) diff --git a/src/Core/SortCursor.h b/src/Core/SortCursor.h index 4b768ac8c70..d8b58e88440 100644 --- a/src/Core/SortCursor.h +++ b/src/Core/SortCursor.h @@ -109,7 +109,7 @@ struct SortCursorImpl permutation = perm; } - size_t getPos() const + size_t getRow() const { if (permutation) return (*permutation)[pos]; @@ -146,7 +146,7 @@ struct SortCursorHelper bool ALWAYS_INLINE greater(const SortCursorHelper & rhs) const { - return derived().greaterAt(rhs.derived(), impl->getPos(), rhs.impl->getPos()); + return derived().greaterAt(rhs.derived(), impl->getRow(), rhs.impl->getRow()); } /// Inverted so that the priority queue elements are removed in ascending order.
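A note on the `getPos` → `getRow` rename above: the cursor's logical position in sort order is now decoupled from the physical row index inside the (possibly unsorted) block, which is what lets the merging algorithms consume an unsorted inserted block through its sorted permutation. The following self-contained sketch — illustrative names only, not the PR's actual classes — shows the same technique of reading a block in sorted order through a permutation without physically re-sorting it:

```
// Minimal illustration (not ClickHouse code): a cursor that walks a column
// in sorted order through an optional permutation, like SortCursorImpl::getRow().
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <numeric>
#include <vector>

struct CursorSketch
{
    const std::vector<int> * column = nullptr;          // physically unsorted data
    const std::vector<size_t> * permutation = nullptr;  // sorted order, may be absent
    size_t pos = 0;                                     // logical position in sort order

    size_t getRow() const { return permutation ? (*permutation)[pos] : pos; }
    bool isValid() const { return column && pos < column->size(); }
    void next() { ++pos; }
};

int main()
{
    std::vector<int> data{30, 10, 20};
    std::vector<size_t> perm(data.size());
    std::iota(perm.begin(), perm.end(), 0);
    std::sort(perm.begin(), perm.end(), [&](size_t a, size_t b) { return data[a] < data[b]; });

    for (CursorSketch cursor{&data, &perm}; cursor.isValid(); cursor.next())
        std::cout << data[cursor.getRow()] << '\n';     // prints 10 20 30; rows never move
}
```

This is also why a later commit in this series adds a guard to `MergeJoinCursor`: that cursor mutates `pos` directly through `getPosRef()`, so it cannot operate on top of a permutation.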
diff --git a/src/DataStreams/MergingSortedBlockInputStream.cpp b/src/DataStreams/MergingSortedBlockInputStream.cpp index b546f6dcbf1..b7396a23d6a 100644 --- a/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -222,7 +222,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSort // std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; // std::cerr << "Inserting row\n"; for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insertFrom(*current->all_columns[i], current->getPos()); + merged_columns[i]->insertFrom(*current->all_columns[i], current->getRow()); if (out_row_sources_buf) { diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index 99fa99b914b..a28a9880b76 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -182,9 +182,9 @@ public: : impl(SortCursorImpl(block, desc_)) {} - size_t position() const { return impl.getPos(); } + size_t position() const { return impl.getRow(); } size_t end() const { return impl.rows; } - bool atEnd() const { return impl.getPos() >= impl.rows; } + bool atEnd() const { return impl.getRow() >= impl.rows; } void nextN(size_t num) { impl.getPosRef() += num; } void setCompareNullability(const MergeJoinCursor & rhs) @@ -254,10 +254,10 @@ private: else if (cmp > 0) rhs.impl.next(); else if (!cmp) - return Range{impl.getPos(), rhs.impl.getPos(), getEqualLength(), rhs.getEqualLength()}; + return Range{impl.getRow(), rhs.impl.getRow(), getEqualLength(), rhs.getEqualLength()}; } - return Range{impl.getPos(), rhs.impl.getPos(), 0, 0}; + return Range{impl.getRow(), rhs.impl.getRow(), 0, 0}; } template @@ -268,7 +268,7 @@ private: const auto * left_column = impl.sort_columns[i]; const auto * right_column = rhs.impl.sort_columns[i]; - int res = nullableCompareAt(*left_column, *right_column, impl.getPos(), rhs.impl.getPos()); + int res = nullableCompareAt(*left_column, *right_column, impl.getRow(), rhs.impl.getRow()); if (res) return res; } @@ -278,11 +278,11 @@ private: /// Expects !atEnd() size_t getEqualLength() { - size_t pos = impl.getPos() + 1; + size_t pos = impl.getRow() + 1; for (; pos < impl.rows; ++pos) if (!samePrev(pos)) break; - return pos - impl.getPos(); + return pos - impl.getRow(); } /// Expects lhs_pos > 0 diff --git a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp index 0ffc44c548f..6b2ee1c8039 100644 --- a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp @@ -257,12 +257,12 @@ void AggregatingSortedAlgorithm::AggregatingMergedData::addRow(SortCursor & curs throw Exception("Can't add a row to the group because it was not started.", ErrorCodes::LOGICAL_ERROR); for (auto & desc : def.columns_to_aggregate) - desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->getPos()); + desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->getRow()); for (auto & desc : def.columns_to_simple_aggregate) { auto & col = cursor->all_columns[desc.column_number]; - desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->getPos(), arena.get()); + desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->getRow(), arena.get()); } } @@ -352,7 +352,7 @@ IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge() return 
Status(merged_data.pull()); } - merged_data.startGroup(current->all_columns, current->getPos()); + merged_data.startGroup(current->all_columns, current->getRow()); } merged_data.addRow(current); diff --git a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp index 2ba9f7fef8b..ccb66259e2e 100644 --- a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp @@ -123,7 +123,7 @@ IMergingAlgorithm::Status CollapsingSortedAlgorithm::merge() return Status(current.impl->order); } - Int8 sign = assert_cast(*current->all_columns[sign_column_number]).getData()[current->getPos()]; + Int8 sign = assert_cast(*current->all_columns[sign_column_number]).getData()[current->getRow()]; RowRef current_row; setRowRef(current_row, current); diff --git a/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp index 59aef3b3bd6..df10fb26d40 100644 --- a/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp @@ -164,12 +164,12 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge() return Status(current.impl->order); } - StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->getPos()); + StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->getRow()); bool new_path = is_first || next_path != current_group_path; is_first = false; - time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->getPos()); + time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->getRow()); /// Is new key before rounding. bool is_new_key = new_path || next_row_time != current_time; @@ -227,7 +227,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge() /// and for rows with same maximum version - only last row. 
if (is_new_key || current->all_columns[columns_definition.version_column_num]->compareAt( - current->getPos(), current_subgroup_newest_row.row_num, + current->getRow(), current_subgroup_newest_row.row_num, *(*current_subgroup_newest_row.all_columns)[columns_definition.version_column_num], /* nan_direction_hint = */ 1) >= 0) { @@ -263,7 +263,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge() void GraphiteRollupSortedAlgorithm::startNextGroup(SortCursor & cursor, Graphite::RollupRule next_rule) { - merged_data.startNextGroup(cursor->all_columns, cursor->getPos(), next_rule, columns_definition); + merged_data.startNextGroup(cursor->all_columns, cursor->getRow(), next_rule, columns_definition); } void GraphiteRollupSortedAlgorithm::finishCurrentGroup() diff --git a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp index 138f19de4c2..511bd9dd74f 100644 --- a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp @@ -139,7 +139,7 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue //std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; //std::cerr << "Inserting row\n"; - merged_data.insertRow(current->all_columns, current->getPos(), current->rows); + merged_data.insertRow(current->all_columns, current->getRow(), current->rows); if (out_row_sources_buf) { diff --git a/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp index faa15c004c7..132241844d7 100644 --- a/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp @@ -73,7 +73,7 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge() if (version_column_number == -1 || selected_row.empty() || current->all_columns[version_column_number]->compareAt( - current->getPos(), selected_row.row_num, + current->getRow(), selected_row.row_num, *(*selected_row.all_columns)[version_column_number], /* nan_direction_hint = */ 1) >= 0) { diff --git a/src/Processors/Merges/Algorithms/RowRef.h b/src/Processors/Merges/Algorithms/RowRef.h index 16e7a5b5cc2..e4610c88581 100644 --- a/src/Processors/Merges/Algorithms/RowRef.h +++ b/src/Processors/Merges/Algorithms/RowRef.h @@ -136,7 +136,7 @@ struct RowRef { sort_columns = cursor.impl->sort_columns.data(); num_columns = cursor.impl->sort_columns.size(); - row_num = cursor.impl->getPos(); + row_num = cursor.impl->getRow(); } static bool checkEquals(size_t size, const IColumn ** lhs, size_t lhs_row, const IColumn ** rhs, size_t rhs_row) @@ -192,7 +192,7 @@ struct RowRefWithOwnedChunk void set(SortCursor & cursor, SharedChunkPtr chunk) { owned_chunk = std::move(chunk); - row_num = cursor.impl->getPos(); + row_num = cursor.impl->getRow(); all_columns = &owned_chunk->all_columns; sort_columns = &owned_chunk->sort_columns; } diff --git a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp index c4b35369cd6..f558a6f0677 100644 --- a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp @@ -688,10 +688,10 @@ IMergingAlgorithm::Status SummingSortedAlgorithm::merge() return Status(merged_data.pull()); } - merged_data.startGroup(current->all_columns, current->getPos()); + 
merged_data.startGroup(current->all_columns, current->getRow()); } else - merged_data.addRow(current->all_columns, current->getPos()); + merged_data.addRow(current->all_columns, current->getRow()); if (!current->isLast()) { diff --git a/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp b/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp index 707d755f980..672242b253b 100644 --- a/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp @@ -73,7 +73,7 @@ IMergingAlgorithm::Status VersionedCollapsingAlgorithm::merge() RowRef current_row; - Int8 sign = assert_cast(*current->all_columns[sign_column_number]).getData()[current->getPos()]; + Int8 sign = assert_cast(*current->all_columns[sign_column_number]).getData()[current->getRow()]; setRowRef(current_row, current); diff --git a/src/Processors/Transforms/SortingTransform.cpp b/src/Processors/Transforms/SortingTransform.cpp index 607dcd20a3a..11f23530c9e 100644 --- a/src/Processors/Transforms/SortingTransform.cpp +++ b/src/Processors/Transforms/SortingTransform.cpp @@ -100,7 +100,7 @@ Chunk MergeSorter::mergeImpl(TSortingHeap & queue) /// Append a row from queue. for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insertFrom(*current->all_columns[i], current->getPos()); + merged_columns[i]->insertFrom(*current->all_columns[i], current->getRow()); ++total_merged_rows; ++merged_rows; diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp index 94d326ae9f1..1b5d45f0daf 100644 --- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp @@ -22,7 +22,7 @@ void MergeTreeBlockOutputStream::write(const Block & block) { Stopwatch watch; - MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, context); + MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, optimize_on_insert); storage.renameTempPartAndAdd(part, &storage.increment); PartLog::addNewPart(storage.global_context, part, watch.elapsed()); diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.h b/src/Storages/MergeTree/MergeTreeBlockOutputStream.h index 3b61d5fa0ef..5853d80e3c6 100644 --- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.h +++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.h @@ -14,11 +14,11 @@ class StorageMergeTree; class MergeTreeBlockOutputStream : public IBlockOutputStream { public: - MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_, const Context & context_) + MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_, bool optimize_on_insert_) : storage(storage_) , metadata_snapshot(metadata_snapshot_) , max_parts_per_block(max_parts_per_block_) - , context(context_) + , optimize_on_insert(optimize_on_insert_) { } @@ -29,7 +29,7 @@ private: StorageMergeTree & storage; StorageMetadataPtr metadata_snapshot; size_t max_parts_per_block; - const Context & context; + bool optimize_on_insert; }; } diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 83c3941c943..92cc8564a35 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -202,7 +202,7 @@ 
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(const Block & block return result; } -Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation ** permutation) +Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation *& permutation) { size_t block_size = block.rows(); @@ -245,23 +245,31 @@ Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_ IMergingAlgorithm::Input input; input.set(std::move(chunk)); - input.permutation = *permutation; + input.permutation = permutation; IMergingAlgorithm::Inputs inputs; inputs.push_back(std::move(input)); merging_algorithm->initialize(std::move(inputs)); IMergingAlgorithm::Status status = merging_algorithm->merge(); - while (!status.is_finished) - status = merging_algorithm->merge(); + + /// Check that after first merge merging_algorithm is waiting for data from input 0. + if (status.required_source != 0) + return block; + + status = merging_algorithm->merge(); + + /// Check that merge is finished. + if (!status.is_finished) + return block; /// Merged Block is sorted and we don't need to use permutation anymore - *permutation = nullptr; + permutation = nullptr; return block.cloneWithColumns(status.chunk.getColumns()); } -MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, const Context & context) +MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert) { Block & block = block_with_partition.block; @@ -324,8 +332,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa } Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names; - if (context.getSettings().optimize_on_insert) - block = mergeBlock(block, sort_description, partition_key_columns, &perm_ptr); + if (optimize_on_insert) + block = mergeBlock(block, sort_description, partition_key_columns, perm_ptr); /// Size of part would not be greater than block.bytes() + epsilon size_t expected_size = block.bytes(); diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.h b/src/Storages/MergeTree/MergeTreeDataWriter.h index 93c15d38471..685d1adf947 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.h +++ b/src/Storages/MergeTree/MergeTreeDataWriter.h @@ -45,9 +45,9 @@ public: /** All rows must correspond to same partition. * Returns part with unique name starting with 'tmp_', yet not added to MergeTreeData. 
*/ - MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, const Context & context); + MergeTreeData::MutableDataPartPtr writeTempPart(BlockWithPartition & block, const StorageMetadataPtr & metadata_snapshot, bool optimize_on_insert); - Block mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation ** permutation); + Block mergeBlock(const Block & block, SortDescription sort_description, Names & partition_key_columns, IColumn::Permutation *& permutation); private: MergeTreeData & data; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 28d96efa960..5c2c96a57d9 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -41,7 +41,7 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream( size_t max_parts_per_block_, bool quorum_parallel_, bool deduplicate_, - const Context & context_) + bool optimize_on_insert_) : storage(storage_) , metadata_snapshot(metadata_snapshot_) , quorum(quorum_) @@ -50,7 +50,7 @@ ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream( , quorum_parallel(quorum_parallel_) , deduplicate(deduplicate_) , log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)")) - , context(context_) + , optimize_on_insert(optimize_on_insert_) { /// The quorum value `1` has the same meaning as if it is disabled. if (quorum == 1) @@ -144,7 +144,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block) /// Write part to the filesystem under temporary name. Calculate a checksum. 
- MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, context); + MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, optimize_on_insert); String block_id; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index 7540e89f00f..3ac2c4bcfcb 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -30,7 +30,7 @@ public: size_t max_parts_per_block_, bool quorum_parallel_, bool deduplicate_, - const Context & context_); + bool optimize_on_insert); Block getHeader() const override; void writePrefix() override; @@ -73,7 +73,7 @@ private: using Logger = Poco::Logger; Poco::Logger * log; - const Context & context; + bool optimize_on_insert; }; } From 5ae6c6dab9b06acc1707db44077346e7f0208bdd Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Fri, 4 Dec 2020 20:40:28 +0300 Subject: [PATCH 12/32] Fix build error --- src/Storages/StorageMergeTree.cpp | 2 +- src/Storages/StorageReplicatedMergeTree.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index ae75be19faa..7f932ae5e82 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -233,7 +233,7 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Sto const auto & settings = context.getSettingsRef(); return std::make_shared( - *this, metadata_snapshot, settings.max_partitions_per_insert_block, context); + *this, metadata_snapshot, settings.max_partitions_per_insert_block, context.getSettingsRef().optimize_on_insert); } void StorageMergeTree::checkTableCanBeDropped() const diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 8ff56adaa31..a8dbe79cafa 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -3861,7 +3861,7 @@ BlockOutputStreamPtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, query_settings.max_partitions_per_insert_block, query_settings.insert_quorum_parallel, deduplicate, - context); + context.getSettingsRef().optimize_on_insert); } @@ -4438,7 +4438,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition( PartsTemporaryRename renamed_parts(*this, "detached/"); MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts); - ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false, query_context); /// TODO Allow to use quorum here. + ReplicatedMergeTreeBlockOutputStream output(*this, metadata_snapshot, 0, 0, 0, false, false, false); /// TODO Allow to use quorum here. for (size_t i = 0; i < loaded_parts.size(); ++i) { String old_name = loaded_parts[i]->name; From c3a4e5e7b78ddbbca38efe46bc53e00829fa3a49 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Mon, 7 Dec 2020 01:12:54 +0300 Subject: [PATCH 13/32] Fix max_distributed_connections (one more time) With prefer_localhost_replica=1 max_distributed_connections uses max_threads before this patch. 
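To make the failure mode concrete, here is a minimal self-contained sketch of the pattern fixed below (the types are stand-ins, not the real `QueryPlan` API): with the old guard `if (!query_plan.getMaxThreads())`, a `max_threads` value left behind by the inner local query wins and `max_distributed_connections` never takes effect; the fixed guard refreshes the value whenever the query is remote.

```
// Toy model of the guard change in InterpreterSelectQuery::executeFetchColumns.
// QueryPlanSketch is a hypothetical stand-in for the real QueryPlan class.
#include <iostream>
#include <optional>

struct QueryPlanSketch
{
    std::optional<unsigned> max_threads;
    void setMaxThreads(unsigned n) { max_threads = n; }
    std::optional<unsigned> getMaxThreads() const { return max_threads; }
};

void applyThreadLimit(QueryPlanSketch & plan, unsigned desired, bool is_remote)
{
    // Old: if (!plan.getMaxThreads()) — a stale value from the inner query wins.
    // New: a remote (distributed) query always refreshes the limit.
    if (!plan.getMaxThreads() || is_remote)
        plan.setMaxThreads(desired);
}

int main()
{
    QueryPlanSketch plan;
    plan.setMaxThreads(1);  // inner local query already ran with max_threads = 1
    applyThreadLimit(plan, 3, /*is_remote=*/true);  // max_distributed_connections = 3
    std::cout << *plan.getMaxThreads() << '\n';     // prints 3 with the fix, 1 without
}
```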
Fixes: #14936 Previous fix: #9673 --- src/Interpreters/InterpreterSelectQuery.cpp | 7 ++++++- .../01602_max_distributed_connections.reference | 0 .../01602_max_distributed_connections.sh | 15 +++++++++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) create mode 100644 tests/queries/0_stateless/01602_max_distributed_connections.reference create mode 100755 tests/queries/0_stateless/01602_max_distributed_connections.sh diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 43c5102fa32..742828827c4 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -1550,7 +1550,12 @@ void InterpreterSelectQuery::executeFetchColumns( throw Exception("Logical error in InterpreterSelectQuery: nowhere to read", ErrorCodes::LOGICAL_ERROR); /// Specify the number of threads only if it wasn't specified in storage. - if (!query_plan.getMaxThreads()) + /// + /// But in case of remote query and prefer_localhost_replica=1 (default) + /// The inner local query (that is done in the same process, without + /// network interaction), it will setMaxThreads earlier and distributed + /// query will not update it. + if (!query_plan.getMaxThreads() || is_remote) query_plan.setMaxThreads(max_threads_execute_query); /// Aliases in table declaration. diff --git a/tests/queries/0_stateless/01602_max_distributed_connections.reference b/tests/queries/0_stateless/01602_max_distributed_connections.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01602_max_distributed_connections.sh b/tests/queries/0_stateless/01602_max_distributed_connections.sh new file mode 100755 index 00000000000..8c19b6f5bb7 --- /dev/null +++ b/tests/queries/0_stateless/01602_max_distributed_connections.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../shell_config.sh + +common_opts=( + "--format=Null" + + "--max_threads=1" + "--max_distributed_connections=3" +) + +# NOTE: the test use higher timeout to avoid flakiness. 
+timeout 9s ${CLICKHOUSE_CLIENT} "$@" "${common_opts[@]}" -q "select sleep(3) from remote('127.{1,2,3,4,5}', system.one)" --prefer_localhost_replica=0 +timeout 9s ${CLICKHOUSE_CLIENT} "$@" "${common_opts[@]}" -q "select sleep(3) from remote('127.{1,2,3,4,5}', system.one)" --prefer_localhost_replica=1 From 291959c1b35c7fbf62cd45dafbb13741cbfb10e7 Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Tue, 8 Dec 2020 19:19:51 +0300 Subject: [PATCH 14/32] Update comment --- src/Core/SortCursor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Core/SortCursor.h b/src/Core/SortCursor.h index d8b58e88440..f383c3ded8e 100644 --- a/src/Core/SortCursor.h +++ b/src/Core/SortCursor.h @@ -124,7 +124,7 @@ struct SortCursorImpl bool isValid() const { return pos < rows; } void next() { ++pos; } -/// Prevent using pos instead of getPos() +/// Prevent using pos instead of getRow() private: size_t pos; }; From c8c543ca9416fb193a13545d3f23d6b3a60c17c3 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Wed, 9 Dec 2020 18:07:58 +0300 Subject: [PATCH 15/32] Add throwing exceptions --- src/Interpreters/MergeJoin.cpp | 6 +++++- src/Storages/MergeTree/MergeTreeDataWriter.cpp | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index a28a9880b76..4ca70a84617 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -180,7 +180,11 @@ class MergeJoinCursor public: MergeJoinCursor(const Block & block, const SortDescription & desc_) : impl(SortCursorImpl(block, desc_)) - {} + { + /// SortCursorImpl can work with permutation, but MergeJoinCursor can't. + if (impl.permutation) + throw Exception("Logical error: MergeJoinCursor doesn't support permutation", ErrorCodes::LOGICAL_ERROR); + } size_t position() const { return impl.getRow(); } size_t end() const { return impl.rows; } diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 92cc8564a35..c93d4bceba0 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -255,13 +255,13 @@ Block MergeTreeDataWriter::mergeBlock(const Block & block, SortDescription sort_ /// Check that after first merge merging_algorithm is waiting for data from input 0. if (status.required_source != 0) - return block; + throw Exception("Logical error: required source after the first merge is not 0.", ErrorCodes::LOGICAL_ERROR); status = merging_algorithm->merge(); /// Check that merge is finished. 
if (!status.is_finished) - return block; + throw Exception("Logical error: merge is not finished after the second merge.", ErrorCodes::LOGICAL_ERROR); /// Merged Block is sorted and we don't need to use permutation anymore permutation = nullptr; From 3ec48cec53f32374df82825b5534f2131e2018ec Mon Sep 17 00:00:00 2001 From: Denny Crane Date: Mon, 14 Dec 2020 22:19:06 -0400 Subject: [PATCH 16/32] Update date-time-functions.md toYYYYMM translation to Russian --- .../sql-reference/functions/date-time-functions.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/docs/ru/sql-reference/functions/date-time-functions.md b/docs/ru/sql-reference/functions/date-time-functions.md index b7a077b3bd6..31482cde77f 100644 --- a/docs/ru/sql-reference/functions/date-time-functions.md +++ b/docs/ru/sql-reference/functions/date-time-functions.md @@ -593,6 +593,18 @@ SELECT dateDiff('hour', toDateTime('2018-01-01 22:00:00'), toDateTime('2018-01-0 Например, `timeSlots(toDateTime('2012-01-01 12:20:00'), toUInt32(600)) = [toDateTime('2012-01-01 12:00:00'), toDateTime('2012-01-01 12:30:00')]`. Это нужно для поиска хитов, входящих в соответствующий визит. +## toYYYYMM + +Переводит дату или дату со временем в число типа UInt32, содержащее номер года и месяца (YYYY * 100 + MM). + +## toYYYYMMDD + +Переводит дату или дату со временем в число типа UInt32, содержащее номер года, месяца и дня (YYYY * 10000 + MM * 100 + DD). + +## toYYYYMMDDhhmmss + +Переводит дату или дату со временем в число типа UInt64 содержащее номер года, месяца, дня и время (YYYY * 10000000000 + MM * 100000000 + DD * 1000000 + hh * 10000 + mm * 100 + ss). + ## formatDateTime {#formatdatetime} Функция преобразует дату-и-время в строку по заданному шаблону. Важно: шаблон — константное выражение, поэтому использовать разные шаблоны в одной колонке не получится. From 96f22a5bcc8aef0159805337319d767c54208243 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Steve-=E9=87=91=E5=8B=87?= Date: Tue, 15 Dec 2020 18:55:52 +0800 Subject: [PATCH 17/32] Update limit.md Update translation of LIMIT , and providing more details for with ties modifier. Make document more human-readable for Chinese. --- docs/zh/sql-reference/statements/select/limit.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/docs/zh/sql-reference/statements/select/limit.md b/docs/zh/sql-reference/statements/select/limit.md index b079248deca..d6d827552b0 100644 --- a/docs/zh/sql-reference/statements/select/limit.md +++ b/docs/zh/sql-reference/statements/select/limit.md @@ -14,7 +14,7 @@ toc_title: LIMIT ## LIMIT … WITH TIES 修饰符 {#limit-with-ties} -如果为 `LIMIT n[,m]` 设置了 `WITH TIES` ,并且声明了 `ORDER BY expr_list`, you will get in result first `n` or `n,m` rows and all rows with same `ORDER BY` fields values equal to row at position `n` for `LIMIT n` and `m` for `LIMIT n,m`. +如果为 `LIMIT n[,m]` 设置了 `WITH TIES` ,并且声明了 `ORDER BY expr_list`, 除了得到无修饰符的结果(正常情况下的 `limit n`, 前n行数据), 还会返回与第`n`行具有相同排序字段的行(即如果第n+1行的字段与第n行 拥有相同的排序字段,同样返回该结果. 此修饰符可以与: [ORDER BY … WITH FILL modifier](../../../sql-reference/statements/select/order-by.md#orderby-with-fill) 组合使用. 
@@ -38,7 +38,7 @@ SELECT * FROM ( └───┘ ``` -单子执行了 `WITH TIES` 修饰符后 +添加 `WITH TIES` 修饰符后 ``` sql SELECT * FROM ( @@ -59,4 +59,8 @@ SELECT * FROM ( └───┘ ``` -cause row number 6 have same value “2” for field `n` as row number 5 +虽然指定了`LIMIT 5`, 但第6行的`n`字段值为2,与第5行相同,因此也作为满足条件的记录返回。 +简而言之,该修饰符可理解为是否增加“并列行”的数据。 + +``` sql, +``` sql From 9bb0326fc6bac0922b5a94f03fe4102745bb1e41 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 15 Dec 2020 16:33:14 +0300 Subject: [PATCH 18/32] Fix database drop timeout in clickhouse-test --- tests/clickhouse-test | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/tests/clickhouse-test b/tests/clickhouse-test index 3702268819b..399cb072c65 100755 --- a/tests/clickhouse-test +++ b/tests/clickhouse-test @@ -15,6 +15,7 @@ from subprocess import check_call from subprocess import Popen from subprocess import PIPE from subprocess import CalledProcessError +from subprocess import TimeoutExpired from datetime import datetime from time import time, sleep from errno import ESRCH @@ -114,6 +115,7 @@ def get_db_engine(args): def run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file): # print(client_options) + start_time = datetime.now() if args.database: database = args.database os.environ.setdefault("CLICKHOUSE_DATABASE", database) @@ -129,7 +131,11 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std database = 'test_{suffix}'.format(suffix=random_str()) clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True) - clickhouse_proc_create.communicate(("CREATE DATABASE " + database + get_db_engine(args))) + try: + clickhouse_proc_create.communicate(("CREATE DATABASE " + database + get_db_engine(args)), timeout=args.timeout) + except TimeoutExpired: + total_time = (datetime.now() - start_time).total_seconds() + return clickhouse_proc_create, "", "Timeout creating database {} before test".format(database), total_time os.environ["CLICKHOUSE_DATABASE"] = database @@ -152,14 +158,18 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std # print(command) proc = Popen(command, shell=True, env=os.environ) - start_time = datetime.now() while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None: sleep(0.01) if not args.database: clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True) - clickhouse_proc_create.communicate(("DROP DATABASE " + database)) + seconds_left = args.timeout - (datetime.now() - start_time).total_seconds() + try: + clickhouse_proc_create.communicate(("DROP DATABASE " + database), timeout=seconds_left) + except TimeoutExpired: + total_time = (datetime.now() - start_time).total_seconds() + return clickhouse_proc_create, "", "Timeout dropping database {} after test".format(database), total_time total_time = (datetime.now() - start_time).total_seconds() @@ -305,7 +315,7 @@ def run_tests_array(all_tests_with_params): if args.testname: clickhouse_proc = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True) - clickhouse_proc.communicate(("SELECT 'Running test {suite}/{case} from pid={pid}';".format(pid = os.getpid(), case = case, suite = suite))) + clickhouse_proc.communicate(("SELECT 'Running test {suite}/{case} from pid={pid}';".format(pid = os.getpid(), case = case, suite = suite)), timeout=10) if clickhouse_proc.returncode != 0: 
failures += 1 @@ -330,6 +340,8 @@ def run_tests_array(all_tests_with_params): print(MSG_FAIL, end='') print_test_time(total_time) print(" - Timeout!") + if stderr: + print(stderr) else: counter = 1 while proc.returncode != 0 and need_retry(stderr): From 873d9d1e1e22dd6a6f09c6919bfa1cf68cc4a55b Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 15 Dec 2020 16:34:53 +0300 Subject: [PATCH 19/32] Better --- tests/clickhouse-test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/clickhouse-test b/tests/clickhouse-test index 399cb072c65..c0d2d4d5475 100755 --- a/tests/clickhouse-test +++ b/tests/clickhouse-test @@ -164,7 +164,7 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std if not args.database: clickhouse_proc_create = Popen(shlex.split(args.client), stdin=PIPE, stdout=PIPE, stderr=PIPE, universal_newlines=True) - seconds_left = args.timeout - (datetime.now() - start_time).total_seconds() + seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 10) try: clickhouse_proc_create.communicate(("DROP DATABASE " + database), timeout=seconds_left) except TimeoutExpired: From 22aba554c0fa42a8664a944e266fd69d7189e55f Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Tue, 15 Dec 2020 18:07:20 +0300 Subject: [PATCH 20/32] Revert "Fix access rights required for the merge() table function." --- src/Storages/StorageMerge.cpp | 22 +------ src/Storages/StorageMerge.h | 10 +-- src/TableFunctions/TableFunctionMerge.cpp | 65 +++++++------------ src/TableFunctions/TableFunctionMerge.h | 2 - .../__init__.py | 0 .../test.py | 55 ---------------- 6 files changed, 27 insertions(+), 127 deletions(-) delete mode 100644 tests/integration/test_table_functions_access_rights/__init__.py delete mode 100644 tests/integration/test_table_functions_access_rights/test.py diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 566ad83c370..97f4ccd0bba 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -84,22 +84,6 @@ StorageMerge::StorageMerge( setInMemoryMetadata(storage_metadata); } -StorageMerge::StorageMerge( - const StorageID & table_id_, - const ColumnsDescription & columns_, - const String & source_database_, - const Tables & tables_, - const Context & context_) - : IStorage(table_id_) - , source_database(source_database_) - , tables(tables_) - , global_context(context_.getGlobalContext()) -{ - StorageInMemoryMetadata storage_metadata; - storage_metadata.setColumns(columns_); - setInMemoryMetadata(storage_metadata); -} - template StoragePtr StorageMerge::getFirstTable(F && predicate) const { @@ -455,12 +439,8 @@ DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const Context & cont e.addMessage("while getting table iterator of Merge table. 
Maybe caused by two Merge tables that will endlessly try to read each other's data"); throw; } - - if (tables) - return std::make_unique(*tables, source_database); - auto database = DatabaseCatalog::instance().getDatabase(source_database); - auto table_name_match = [this](const String & table_name_) { return table_name_regexp->match(table_name_); }; + auto table_name_match = [this](const String & table_name_) { return table_name_regexp.match(table_name_); }; return database->getTablesIterator(context, table_name_match); } diff --git a/src/Storages/StorageMerge.h b/src/Storages/StorageMerge.h index 3c322c09b36..681ea7015e7 100644 --- a/src/Storages/StorageMerge.h +++ b/src/Storages/StorageMerge.h @@ -48,8 +48,7 @@ public: private: String source_database; - std::optional table_name_regexp; - std::optional tables; + OptimizedRegularExpression table_name_regexp; const Context & global_context; using StorageWithLockAndName = std::tuple; @@ -76,13 +75,6 @@ protected: const String & table_name_regexp_, const Context & context_); - StorageMerge( - const StorageID & table_id_, - const ColumnsDescription & columns_, - const String & source_database_, - const Tables & source_tables_, - const Context & context_); - Pipe createSources( const StorageMetadataPtr & metadata_snapshot, SelectQueryInfo & query_info, diff --git a/src/TableFunctions/TableFunctionMerge.cpp b/src/TableFunctions/TableFunctionMerge.cpp index 5d1601c25f1..a878909e29d 100644 --- a/src/TableFunctions/TableFunctionMerge.cpp +++ b/src/TableFunctions/TableFunctionMerge.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -23,6 +22,29 @@ namespace ErrorCodes } +static NamesAndTypesList chooseColumns(const String & source_database, const String & table_name_regexp_, const Context & context) +{ + OptimizedRegularExpression table_name_regexp(table_name_regexp_); + auto table_name_match = [&](const String & table_name) { return table_name_regexp.match(table_name); }; + + StoragePtr any_table; + + { + auto database = DatabaseCatalog::instance().getDatabase(source_database); + auto iterator = database->getTablesIterator(context, table_name_match); + + if (iterator->isValid()) + if (const auto & table = iterator->table()) + any_table = table; + } + + if (!any_table) + throw Exception("Error while executing table function merge. 
In database " + source_database + " no one matches regular expression: " + + table_name_regexp_, ErrorCodes::UNKNOWN_TABLE); + + return any_table->getInMemoryMetadataPtr()->getColumns().getAllPhysical(); +} + void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, const Context & context) { ASTs & args_func = ast_function->children; @@ -46,46 +68,9 @@ void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, const Conte table_name_regexp = args[1]->as().value.safeGet(); } - -const Tables & TableFunctionMerge::getMatchingTables(const Context & context) const -{ - if (tables) - return *tables; - - auto database = DatabaseCatalog::instance().getDatabase(source_database); - - OptimizedRegularExpression re(table_name_regexp); - auto table_name_match = [&](const String & table_name_) { return re.match(table_name_); }; - - auto access = context.getAccess(); - bool granted_show_on_all_tables = access->isGranted(AccessType::SHOW_TABLES, source_database); - bool granted_select_on_all_tables = access->isGranted(AccessType::SELECT, source_database); - - tables.emplace(); - for (auto it = database->getTablesIterator(context, table_name_match); it->isValid(); it->next()) - { - if (!it->table()) - continue; - bool granted_show = granted_show_on_all_tables || access->isGranted(AccessType::SHOW_TABLES, source_database, it->name()); - if (!granted_show) - continue; - if (!granted_select_on_all_tables) - access->checkAccess(AccessType::SELECT, source_database, it->name()); - tables->emplace(it->name(), it->table()); - } - - if (tables->empty()) - throw Exception("Error while executing table function merge. In database " + source_database + " no one matches regular expression: " - + table_name_regexp, ErrorCodes::UNKNOWN_TABLE); - - return *tables; -} - - ColumnsDescription TableFunctionMerge::getActualTableStructure(const Context & context) const { - auto first_table = getMatchingTables(context).begin()->second; - return ColumnsDescription{first_table->getInMemoryMetadataPtr()->getColumns().getAllPhysical()}; + return ColumnsDescription{chooseColumns(source_database, table_name_regexp, context)}; } StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const @@ -94,7 +79,7 @@ StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, cons StorageID(getDatabaseName(), table_name), getActualTableStructure(context), source_database, - getMatchingTables(context), + table_name_regexp, context); res->startup(); diff --git a/src/TableFunctions/TableFunctionMerge.h b/src/TableFunctions/TableFunctionMerge.h index 38ea2c22995..a9c4dd6b038 100644 --- a/src/TableFunctions/TableFunctionMerge.h +++ b/src/TableFunctions/TableFunctionMerge.h @@ -19,13 +19,11 @@ private: StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override; const char * getStorageTypeName() const override { return "Merge"; } - const Tables & getMatchingTables(const Context & context) const; ColumnsDescription getActualTableStructure(const Context & context) const override; void parseArguments(const ASTPtr & ast_function, const Context & context) override; String source_database; String table_name_regexp; - mutable std::optional tables; }; diff --git a/tests/integration/test_table_functions_access_rights/__init__.py b/tests/integration/test_table_functions_access_rights/__init__.py deleted file mode 
100644 index e69de29bb2d..00000000000 diff --git a/tests/integration/test_table_functions_access_rights/test.py b/tests/integration/test_table_functions_access_rights/test.py deleted file mode 100644 index b986981dc22..00000000000 --- a/tests/integration/test_table_functions_access_rights/test.py +++ /dev/null @@ -1,55 +0,0 @@ -import pytest -from helpers.cluster import ClickHouseCluster -from helpers.test_tools import TSV - -cluster = ClickHouseCluster(__file__) -instance = cluster.add_instance('instance') - - -@pytest.fixture(scope="module", autouse=True) -def started_cluster(): - try: - cluster.start() - - instance.query("CREATE TABLE table1(x UInt32) ENGINE = MergeTree ORDER BY tuple()") - instance.query("CREATE TABLE table2(x UInt32) ENGINE = MergeTree ORDER BY tuple()") - instance.query("INSERT INTO table1 VALUES (1)") - instance.query("INSERT INTO table2 VALUES (2)") - - yield cluster - - finally: - cluster.shutdown() - - -@pytest.fixture(autouse=True) -def cleanup_after_test(): - try: - yield - finally: - instance.query("DROP USER IF EXISTS A") - - -def test_merge(): - select_query = "SELECT * FROM merge('default', 'table[0-9]+') ORDER BY x" - assert instance.query(select_query) == "1\n2\n" - - instance.query("CREATE USER A") - assert "it's necessary to have the grant CREATE TEMPORARY TABLE ON *.*" in instance.query_and_get_error(select_query, user = 'A') - - instance.query("GRANT CREATE TEMPORARY TABLE ON *.* TO A") - assert "no one matches regular expression" in instance.query_and_get_error(select_query, user = 'A') - - instance.query("GRANT SELECT ON default.table1 TO A") - assert instance.query(select_query, user = 'A') == "1\n" - - instance.query("GRANT SELECT ON default.* TO A") - assert instance.query(select_query, user = 'A') == "1\n2\n" - - instance.query("REVOKE SELECT ON default.table1 FROM A") - assert instance.query(select_query, user = 'A') == "2\n" - - instance.query("REVOKE ALL ON default.* FROM A") - instance.query("GRANT SELECT ON default.table1 TO A") - instance.query("GRANT INSERT ON default.table2 TO A") - assert "it's necessary to have the grant SELECT ON default.table2" in instance.query_and_get_error(select_query, user = 'A') From ccabb4680ddad1bfbbdd2550353d890136ade9da Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 15 Dec 2020 18:19:56 +0300 Subject: [PATCH 21/32] Fix dependencies --- docker/images.json | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docker/images.json b/docker/images.json index f5b10a14313..e0859d3e17c 100644 --- a/docker/images.json +++ b/docker/images.json @@ -58,8 +58,7 @@ "docker/test/stateless": { "name": "yandex/clickhouse-stateless-test", "dependent": [ - "docker/test/stateful", - "docker/test/stateful_with_coverage" + "docker/test/stateful" ] }, "docker/test/stateless_pytest": { @@ -68,7 +67,9 @@ }, "docker/test/stateless_with_coverage": { "name": "yandex/clickhouse-stateless-test-with-coverage", - "dependent": [] + "dependent": [ + "docker/test/stateful_with_coverage" + ] }, "docker/test/unit": { "name": "yandex/clickhouse-unit-test", From 554fd738e9a783f1b7f69cae7b2e2a7d3f796a05 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 15 Dec 2020 18:22:21 +0300 Subject: [PATCH 22/32] Remove strange line --- docker/test/stateful_with_coverage/Dockerfile | 2 -- 1 file changed, 2 deletions(-) diff --git a/docker/test/stateful_with_coverage/Dockerfile b/docker/test/stateful_with_coverage/Dockerfile index ac6645b9463..07acf4ed4e6 100644 --- a/docker/test/stateful_with_coverage/Dockerfile +++ 
b/docker/test/stateful_with_coverage/Dockerfile @@ -1,8 +1,6 @@ # docker build -t yandex/clickhouse-stateful-test-with-coverage . FROM yandex/clickhouse-stateless-test-with-coverage -RUN echo "deb [trusted=yes] http://apt.llvm.org/bionic/ llvm-toolchain-bionic-9 main" >> /etc/apt/sources.list - RUN apt-get update -y \ && env DEBIAN_FRONTEND=noninteractive \ apt-get install --yes --no-install-recommends \ From 01286a6a095d9abe6b77c0d61514d1a979be746a Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Tue, 15 Dec 2020 19:18:28 +0300 Subject: [PATCH 23/32] Update libunwind. --- contrib/libunwind | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/libunwind b/contrib/libunwind index 51b84d9b6d2..8fe25d7dc70 160000 --- a/contrib/libunwind +++ b/contrib/libunwind @@ -1 +1 @@ -Subproject commit 51b84d9b6d2548f1cbdcafe622d5a753853b6149 +Subproject commit 8fe25d7dc70f2a4ea38c3e5a33fa9d4199b67a5a From 2e4aa6b9e113afdac58d4b5b41069b16ec6fb33e Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 15 Dec 2020 19:20:09 +0300 Subject: [PATCH 24/32] Kill not only database proc --- tests/clickhouse-test | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/clickhouse-test b/tests/clickhouse-test index c0d2d4d5475..4eff8351e66 100755 --- a/tests/clickhouse-test +++ b/tests/clickhouse-test @@ -168,7 +168,13 @@ def run_single_test(args, ext, server_logs_level, client_options, case_file, std try: clickhouse_proc_create.communicate(("DROP DATABASE " + database), timeout=seconds_left) except TimeoutExpired: - total_time = (datetime.now() - start_time).total_seconds() + # kill the test process as well, because it can also hang + if proc.returncode is None: + try: + proc.kill() + except OSError as e: + if e.errno != ESRCH: + raise return clickhouse_proc_create, "", "Timeout dropping database {} after test".format(database), total_time total_time = (datetime.now() - start_time).total_seconds() From 86fc9ea4df3efaaef44b48d1504ef6a9599bec34 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 15 Dec 2020 23:18:53 +0300 Subject: [PATCH 25/32] Update base image to 20.04 --- docker/test/base/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/test/base/Dockerfile b/docker/test/base/Dockerfile index 61a40673a96..e8653c2122e 100644 --- a/docker/test/base/Dockerfile +++ b/docker/test/base/Dockerfile @@ -1,5 +1,5 @@ # docker build -t yandex/clickhouse-test-base . -FROM ubuntu:19.10 +FROM ubuntu:20.04 ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=11 From eed767bbab573f6f72d69f93ddc2c577539bfab2 Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Wed, 16 Dec 2020 00:02:44 +0300 Subject: [PATCH 26/32] Update tips.md --- docs/en/operations/tips.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/docs/en/operations/tips.md b/docs/en/operations/tips.md index 024eba1c899..e62dea0b04e 100644 --- a/docs/en/operations/tips.md +++ b/docs/en/operations/tips.md @@ -91,6 +91,23 @@ The Linux kernel prior to 3.2 had a multitude of problems with IPv6 implementati Use at least a 10 GB network, if possible. 1 Gb will also work, but it will be much worse for patching replicas with tens of terabytes of data, or for processing distributed queries with a large amount of intermediate data. +## Hypervisor configuration + +If you are using OpenStack, set +``` +cpu_mode=host-passthrough +``` +in nova.conf. + +If you are using libvirt, set +``` +<cpu mode='host-passthrough'/> +``` +in XML configuration.
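 + +For example, a minimal sketch of where this element sits in a libvirt domain definition (the surrounding domain XML is illustrative only, not a complete configuration): +``` +<domain type='kvm'> + ... + <cpu mode='host-passthrough'/> + ... +</domain> +```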
+ +This is important for ClickHouse to be able to get correct information with the `cpuid` instruction. +Otherwise you may get `Illegal instruction` crashes when the hypervisor runs on old CPU models. + ## ZooKeeper {#zookeeper} You are probably already using ZooKeeper for other purposes. You can use the same installation of ZooKeeper, if it isn’t already overloaded. From ca9ccf4850208d617085fa150b69b5ef35633925 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 16 Dec 2020 06:03:43 +0300 Subject: [PATCH 27/32] Comment update --- src/Storages/StorageReplicatedMergeTree.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index f1094793944..d396f32dcca 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -39,7 +39,8 @@ namespace DB * - the structure of the table (/metadata, /columns) * - action log with data (/log/log-...,/replicas/replica_name/queue/queue-...); * - a replica list (/replicas), and replica activity tag (/replicas/replica_name/is_active), replica addresses (/replicas/replica_name/host); - * - select the leader replica (/leader_election) - this is the replica that assigns the merge; + * - select the leader replica (/leader_election) - these are the replicas that assign merges, mutations and partition manipulations + * (after ClickHouse version 20.5 we allow multiple leaders to act concurrently); * - a set of parts of data on each replica (/replicas/replica_name/parts); * - list of the last N blocks of data with checksum, for deduplication (/blocks); * - the list of incremental block numbers (/block_numbers) that we are about to insert, From 88c5031dfa14f7805f0aa3a8e3570efc3cac4da7 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 16 Dec 2020 06:27:28 +0300 Subject: [PATCH 28/32] Add a comment #16595 --- src/Storages/MergeTree/SimpleMergeSelector.h | 77 ++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/src/Storages/MergeTree/SimpleMergeSelector.h b/src/Storages/MergeTree/SimpleMergeSelector.h index fe57c40320a..d174749300b 100644 --- a/src/Storages/MergeTree/SimpleMergeSelector.h +++ b/src/Storages/MergeTree/SimpleMergeSelector.h @@ -3,6 +3,83 @@ #include +/** +We have a set of data parts that is dynamically changing - new data parts are added and there is a background merging process. +The background merging process periodically selects a continuous range of data parts to merge. + +It tries to optimize the following metrics: +1. Write amplification: total amount of data written on disk (source data + merges) relative to the amount of source data. +It can also be considered as the total amount of work for merges. +2. The number of data parts in the set at a random moment of time (average + quantiles). + +We also take the following considerations into account: +1. Older data parts should be merged less frequently than newer data parts. +2. Larger data parts should be merged less frequently than smaller data parts. +3. If no new parts arrive, we should continue to merge existing data parts to eventually optimize the table. +4. Never allow too many parts, because it will slow down SELECT queries significantly. +5. Multiple background merges can run concurrently but not too many. + +It is not possible to optimize both metrics, because they contradict each other. +To lower the number of parts we can merge eagerly, but write amplification will increase.
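 + +A worked example with illustrative numbers (not taken from any real benchmark): if 1 GB of source data is inserted +and every byte later participates in 4 merges, the total amount written to disk is 1 GB + 4 * 1 GB = 5 GB, +so the write amplification is 5; merging less eagerly lowers this factor but leaves more parts for SELECT queries to read.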
+Then we need some balance between optimizing these two metrics. + +But some optimizations may improve both metrics. + +For example, we can look at the "merge tree" - the tree of data parts that were merged. +If the tree is perfectly balanced then its depth is proportional to log(data size), +the total amount of work is proportional to data_size * log(data_size) +and the write amplification is proportional to log(data_size). +If it's not balanced (e.g. every new data part is always merged with existing data parts), +its depth is proportional to the data size, and the total amount of work is proportional to data_size^2. + +We can also control the "base of the logarithm" - you can think of it as the number of data parts +that are merged at once (the tree "arity"). But as the data parts are of different sizes, we should generalize it: +calculate the ratio of the total size of merged parts to the size of the largest part participating in the merge. +For example, if we merge 4 parts of size 5, 3, 1, 1 - then the "base" will be 2 (10 / 5). + +The base of the logarithm (simply called `base` in `SimpleMergeSelector`) is the main knob to control the write amplification. +The larger it is, the lower the write amplification, but we will have more data parts on average. + +To fit all the considerations, we also adjust `base` depending on total parts count, +parts size and parts age, with linear interpolation (then `base` is not a constant but a function of multiple variables, +looking like a section of a hyperplane). + + +Then we apply the algorithm to select the optimal range of data parts to merge. +There is a proof that this algorithm is optimal if we look into the future only by a single step. + +The best range of data parts is selected. + +We also apply some tweaks: +- there is a fixed cost of merging small parts (that is added to the size of the data part before all estimations); +- there are some heuristics to "stick" ranges to large data parts. + +It's still unclear if this algorithm is good or optimal at all. It's unclear if this algorithm is using the optimal coefficients. + +To test and optimize SimpleMergeSelector, we apply the following methods: +- insert/merge simulator: a model simulating parts insertion and merging; + merge selecting algorithm is applied and the relevant metrics are calculated to allow to tune the algorithm; +- insert/merge simulator on real `system.part_log` from production - it gives realistic information about inserted data parts: + their sizes, at what time intervals they are inserted; + +There is a research thesis dedicated to optimization of merge algorithm: +https://presentations.clickhouse.tech/hse_2019/merge_algorithm.pptx + +This work attempted to vary the coefficients in SimpleMergeSelector and to solve the optimization task: +maybe some change in coefficients will give a clear win on all metrics. Surprisingly enough, it has found +that our selection of coefficients is near optimal. It has found slightly better coefficients, +but I decided not to use them, because the representativeness of the test data is in question. + +This work did not make any attempt to propose any other algorithm. +This work did not make any attempt to analyze the task with analytical methods. +That's why I still believe that there are many opportunities to optimize the merge selection algorithm. + +Please do not confuse this task with a similar task in other LSM-based systems (like RocksDB). +Their problem statement is subtly different.
Our set of data parts consists of data parts +that are completely independent in their stored data. Ranges of primary keys in data parts can intersect. +When doing SELECT we read from all data parts. INSERTed data parts come with unknown size... +*/ + namespace DB { From 62ce33153a45bc3355555cc6c3a75920425fa4b2 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 16 Dec 2020 06:34:25 +0300 Subject: [PATCH 29/32] Added valuable comment --- tests/integration/README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/integration/README.md b/tests/integration/README.md index 0886dc2cfac..cea1bd6f893 100644 --- a/tests/integration/README.md +++ b/tests/integration/README.md @@ -135,3 +135,13 @@ named `test.py` containing tests in it. All functions with names starting with ` To assert that two TSV files must be equal, wrap them in the `TSV` class and use the regular `assert` statement. Example: `assert TSV(result) == TSV(reference)`. In case the assertion fails, `pytest` will automagically detect the types of variables and only the small diff of two files is printed. + +### Troubleshooting + +If tests are failing for mysterious reasons, this may help: + +``` +sudo service docker stop +sudo bash -c 'rm -rf /var/lib/docker/*' +sudo service docker start +``` From 4bd5d3b662f642d38158482a285366f318711625 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 16 Dec 2020 06:36:43 +0300 Subject: [PATCH 30/32] Comment update --- src/Storages/MergeTree/SimpleMergeSelector.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/SimpleMergeSelector.h b/src/Storages/MergeTree/SimpleMergeSelector.h index d174749300b..ebd01939876 100644 --- a/src/Storages/MergeTree/SimpleMergeSelector.h +++ b/src/Storages/MergeTree/SimpleMergeSelector.h @@ -58,9 +58,9 @@ It's still unclear if this algorithm is good or optimal at all. It's unclear if To test and optimize SimpleMergeSelector, we apply the following methods: - insert/merge simulator: a model simulating parts insertion and merging; - merge selecting algorithm is applied and the relevant metrics are calculated to allow to tune the algorithm; + merge selecting algorithm is applied and the relevant metrics are calculated to allow to tune the algorithm; - insert/merge simulator on real `system.part_log` from production - it gives realistic information about inserted data parts: - their sizes, at what time intervals they are inserted; + their sizes, at what time intervals they are inserted; There is a research thesis dedicated to optimization of merge algorithm: https://presentations.clickhouse.tech/hse_2019/merge_algorithm.pptx From cf35e144ff65c8e7ea16579dcb6451f46224214a Mon Sep 17 00:00:00 2001 From: Chienlung Cheung Date: Wed, 16 Dec 2020 13:55:17 +0800 Subject: [PATCH 31/32] correct the unit from Kb to KB in the last section. --- docs/en/introduction/performance.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/introduction/performance.md b/docs/en/introduction/performance.md index d2780aedccb..6e5710347a1 100644 --- a/docs/en/introduction/performance.md +++ b/docs/en/introduction/performance.md @@ -25,6 +25,6 @@ Under the same conditions, ClickHouse can handle several hundred queries per sec ## Performance When Inserting Data {#performance-when-inserting-data} -We recommend inserting data in packets of at least 1000 rows, or no more than a single request per second.
When inserting to a MergeTree table from a tab-separated dump, the insertion speed can be from 50 to 200 MB/s. If the inserted rows are around 1 Kb in size, the speed will be from 50,000 to 200,000 rows per second. If the rows are small, the performance can be higher in rows per second (on Banner System data -`>` 500,000 rows per second; on Graphite data -`>` 1,000,000 rows per second). To improve performance, you can make multiple INSERT queries in parallel, which scales linearly. +We recommend inserting data in packets of at least 1000 rows, or no more than a single request per second. When inserting to a MergeTree table from a tab-separated dump, the insertion speed can be from 50 to 200 MB/s. If the inserted rows are around 1 KB in size, the speed will be from 50,000 to 200,000 rows per second. If the rows are small, the performance can be higher in rows per second (on Banner System data -`>` 500,000 rows per second; on Graphite data -`>` 1,000,000 rows per second). To improve performance, you can make multiple INSERT queries in parallel, which scales linearly. {## [Original article](https://clickhouse.tech/docs/en/introduction/performance/) ##} From 37363ae89dd70abb8e23b6bf52bdc248c9f8bbba Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 16 Dec 2020 09:43:05 +0300 Subject: [PATCH 32/32] Comment update --- src/Storages/MergeTree/SimpleMergeSelector.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/SimpleMergeSelector.h b/src/Storages/MergeTree/SimpleMergeSelector.h index ebd01939876..4f277ad74cd 100644 --- a/src/Storages/MergeTree/SimpleMergeSelector.h +++ b/src/Storages/MergeTree/SimpleMergeSelector.h @@ -58,9 +58,9 @@ It's still unclear if this algorithm is good or optimal at all. It's unclear if To test and optimize SimpleMergeSelector, we apply the following methods: - insert/merge simulator: a model simulating parts insertion and merging; - merge selecting algorithm is applied and the relevant metrics are calculated to allow to tune the algorithm; +merge selecting algorithm is applied and the relevant metrics are calculated to allow to tune the algorithm; - insert/merge simulator on real `system.part_log` from production - it gives realistic information about inserted data parts: - their sizes, at what time intervals they are inserted; +their sizes, at what time intervals they are inserted; There is a research thesis dedicated to optimization of merge algorithm: https://presentations.clickhouse.tech/hse_2019/merge_algorithm.pptx