diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 580756361b1..ca5ec847efd 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -396,6 +396,7 @@ class IColumn; M(Bool, allow_non_metadata_alters, true, "Allow to execute alters which affects not only tables metadata, but also data on disk", 0) \ M(Bool, enable_global_with_statement, false, "Propagate WITH statements to UNION queries and all subqueries", 0) \ M(Bool, aggregate_functions_null_for_empty, false, "Rewrite all aggregate functions in a query, adding -OrNull suffix to them", 0) \ + M(Bool, optimize_on_insert, true, "Do the same transformation for inserted block of data as if merge was done on this block.", 0) \ \ /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \ \ diff --git a/src/Core/SortCursor.h b/src/Core/SortCursor.h index 7a222f70199..f8adb4a5f39 100644 --- a/src/Core/SortCursor.h +++ b/src/Core/SortCursor.h @@ -30,7 +30,6 @@ struct SortCursorImpl ColumnRawPtrs all_columns; SortDescription desc; size_t sort_columns_size = 0; - size_t pos = 0; size_t rows = 0; /** Determines order if comparing columns are equal. @@ -49,15 +48,20 @@ struct SortCursorImpl /** Is there at least one column with Collator. */ bool has_collation = false; + /** We could use SortCursorImpl in case when columns aren't sorted + * but we have their sorted permutation + */ + IColumn::Permutation * permutation = nullptr; + SortCursorImpl() {} - SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0) + SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0, IColumn::Permutation * perm = nullptr) : desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size()) { - reset(block); + reset(block, perm); } - SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0) + SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0, IColumn::Permutation * perm = nullptr) : desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size()) { for (auto & column_desc : desc) @@ -66,19 +70,19 @@ struct SortCursorImpl throw Exception("SortDescription should contain column position if SortCursor was used without header.", ErrorCodes::LOGICAL_ERROR); } - reset(columns, {}); + reset(columns, {}, perm); } bool empty() const { return rows == 0; } /// Set the cursor to the beginning of the new block. - void reset(const Block & block) + void reset(const Block & block, IColumn::Permutation * perm = nullptr) { - reset(block.getColumns(), block); + reset(block.getColumns(), block, perm); } /// Set the cursor to the beginning of the new block. - void reset(const Columns & columns, const Block & block) + void reset(const Columns & columns, const Block & block, IColumn::Permutation * perm = nullptr) { all_columns.clear(); sort_columns.clear(); @@ -96,18 +100,36 @@ struct SortCursorImpl : column_desc.column_number; sort_columns.push_back(columns[column_number].get()); - need_collation[j] = desc[j].collator != nullptr && sort_columns.back()->isCollationSupported(); /// TODO Nullable(String) + need_collation[j] = desc[j].collator != nullptr && sort_columns.back()->isCollationSupported(); has_collation |= need_collation[j]; } pos = 0; rows = all_columns[0]->size(); + permutation = perm; + } + + size_t getPos() const + { + if (permutation) + return (*permutation)[pos]; + return pos; + } + + /// We need a possibility to change pos (see MergeJoin). + size_t & getPosRef() + { + return pos; } bool isFirst() const { return pos == 0; } bool isLast() const { return pos + 1 >= rows; } bool isValid() const { return pos < rows; } void next() { ++pos; } + +/// Prevent using pos instead of getPos() +private: + size_t pos; }; using SortCursorImpls = std::vector; @@ -127,7 +149,7 @@ struct SortCursorHelper bool ALWAYS_INLINE greater(const SortCursorHelper & rhs) const { - return derived().greaterAt(rhs.derived(), impl->pos, rhs.impl->pos); + return derived().greaterAt(rhs.derived(), impl->getPos(), rhs.impl->getPos()); } /// Inverted so that the priority queue elements are removed in ascending order. diff --git a/src/DataStreams/MergingSortedBlockInputStream.cpp b/src/DataStreams/MergingSortedBlockInputStream.cpp index 9c9213ef3cc..2cb87b07dba 100644 --- a/src/DataStreams/MergingSortedBlockInputStream.cpp +++ b/src/DataStreams/MergingSortedBlockInputStream.cpp @@ -224,7 +224,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSort // std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; // std::cerr << "Inserting row\n"; for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); + merged_columns[i]->insertFrom(*current->all_columns[i], current->getPos()); if (out_row_sources_buf) { diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index 665ec4d60f3..99fa99b914b 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -182,10 +182,10 @@ public: : impl(SortCursorImpl(block, desc_)) {} - size_t position() const { return impl.pos; } + size_t position() const { return impl.getPos(); } size_t end() const { return impl.rows; } - bool atEnd() const { return impl.pos >= impl.rows; } - void nextN(size_t num) { impl.pos += num; } + bool atEnd() const { return impl.getPos() >= impl.rows; } + void nextN(size_t num) { impl.getPosRef() += num; } void setCompareNullability(const MergeJoinCursor & rhs) { @@ -254,10 +254,10 @@ private: else if (cmp > 0) rhs.impl.next(); else if (!cmp) - return Range{impl.pos, rhs.impl.pos, getEqualLength(), rhs.getEqualLength()}; + return Range{impl.getPos(), rhs.impl.getPos(), getEqualLength(), rhs.getEqualLength()}; } - return Range{impl.pos, rhs.impl.pos, 0, 0}; + return Range{impl.getPos(), rhs.impl.getPos(), 0, 0}; } template @@ -268,7 +268,7 @@ private: const auto * left_column = impl.sort_columns[i]; const auto * right_column = rhs.impl.sort_columns[i]; - int res = nullableCompareAt(*left_column, *right_column, impl.pos, rhs.impl.pos); + int res = nullableCompareAt(*left_column, *right_column, impl.getPos(), rhs.impl.getPos()); if (res) return res; } @@ -278,11 +278,11 @@ private: /// Expects !atEnd() size_t getEqualLength() { - size_t pos = impl.pos + 1; + size_t pos = impl.getPos() + 1; for (; pos < impl.rows; ++pos) if (!samePrev(pos)) break; - return pos - impl.pos; + return pos - impl.getPos(); } /// Expects lhs_pos > 0 diff --git a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp index b834ed82729..cb254b54dd3 100644 --- a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.cpp @@ -239,12 +239,12 @@ void AggregatingSortedAlgorithm::AggregatingMergedData::addRow(SortCursor & curs throw Exception("Can't add a row to the group because it was not started.", ErrorCodes::LOGICAL_ERROR); for (auto & desc : def.columns_to_aggregate) - desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->pos); + desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->getPos()); for (auto & desc : def.columns_to_simple_aggregate) { auto & col = cursor->all_columns[desc.column_number]; - desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, arena.get()); + desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->getPos(), arena.get()); } } @@ -334,7 +334,7 @@ IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge() return Status(merged_data.pull()); } - merged_data.startGroup(current->all_columns, current->pos); + merged_data.startGroup(current->all_columns, current->getPos()); } merged_data.addRow(current); diff --git a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp index e7a7200ac34..24e1d1d40a8 100644 --- a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp @@ -13,11 +13,6 @@ namespace DB { -namespace ErrorCodes -{ - extern const int INCORRECT_DATA; -} - CollapsingSortedAlgorithm::CollapsingSortedAlgorithm( const Block & header, size_t num_inputs, @@ -25,9 +20,9 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm( const String & sign_column, bool only_positive_sign_, size_t max_block_size, + Poco::Logger * log_, WriteBuffer * out_row_sources_buf_, - bool use_average_block_sizes, - Poco::Logger * log_) + bool use_average_block_sizes) : IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs) , merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size) , sign_column_number(header.getPositionByName(sign_column)) @@ -123,7 +118,7 @@ IMergingAlgorithm::Status CollapsingSortedAlgorithm::merge() return Status(current.impl->order); } - Int8 sign = assert_cast(*current->all_columns[sign_column_number]).getData()[current->pos]; + Int8 sign = assert_cast(*current->all_columns[sign_column_number]).getData()[current->getPos()]; RowRef current_row; setRowRef(current_row, current); diff --git a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h index d95fac2f02b..028715f715b 100644 --- a/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h @@ -33,9 +33,9 @@ public: const String & sign_column, bool only_positive_sign_, /// For select final. Skip rows with sum(sign) < 0. size_t max_block_size, - WriteBuffer * out_row_sources_buf_, - bool use_average_block_sizes, - Poco::Logger * log_); + Poco::Logger * log_, + WriteBuffer * out_row_sources_buf_ = nullptr, + bool use_average_block_sizes = false); Status merge() override; diff --git a/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp index 0e704e5a05b..59aef3b3bd6 100644 --- a/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.cpp @@ -164,12 +164,12 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge() return Status(current.impl->order); } - StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->pos); + StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->getPos()); bool new_path = is_first || next_path != current_group_path; is_first = false; - time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->pos); + time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->getPos()); /// Is new key before rounding. bool is_new_key = new_path || next_row_time != current_time; @@ -227,7 +227,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge() /// and for rows with same maximum version - only last row. if (is_new_key || current->all_columns[columns_definition.version_column_num]->compareAt( - current->pos, current_subgroup_newest_row.row_num, + current->getPos(), current_subgroup_newest_row.row_num, *(*current_subgroup_newest_row.all_columns)[columns_definition.version_column_num], /* nan_direction_hint = */ 1) >= 0) { @@ -263,7 +263,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge() void GraphiteRollupSortedAlgorithm::startNextGroup(SortCursor & cursor, Graphite::RollupRule next_rule) { - merged_data.startNextGroup(cursor->all_columns, cursor->pos, next_rule, columns_definition); + merged_data.startNextGroup(cursor->all_columns, cursor->getPos(), next_rule, columns_definition); } void GraphiteRollupSortedAlgorithm::finishCurrentGroup() diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithm.h b/src/Processors/Merges/Algorithms/IMergingAlgorithm.h index f86be2a7d1b..5c8d18875e7 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithm.h +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithm.h @@ -29,6 +29,8 @@ public: /// between different algorithm objects in parallel FINAL. bool skip_last_row = false; + IColumn::Permutation * permutation = nullptr; + void swap(Input & other) { chunk.swap(other.chunk); diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp index 0b13d689636..e4c60d7609c 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.cpp @@ -22,7 +22,7 @@ void IMergingAlgorithmWithDelayedChunk::initializeQueue(Inputs inputs) if (!current_inputs[source_num].chunk) continue; - cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num); + cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num, current_inputs[source_num].permutation); } queue = SortingHeap(cursors); @@ -37,7 +37,7 @@ void IMergingAlgorithmWithDelayedChunk::updateCursor(Input & input, size_t sourc last_chunk_sort_columns = std::move(cursors[source_num].sort_columns); current_input.swap(input); - cursors[source_num].reset(current_input.chunk.getColumns(), {}); + cursors[source_num].reset(current_input.chunk.getColumns(), {}, current_input.permutation); queue.push(cursors[source_num]); } diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp index 39abe5c0ec7..97abffdc167 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.cpp @@ -39,7 +39,7 @@ void IMergingAlgorithmWithSharedChunks::initialize(Inputs inputs) source.skip_last_row = inputs[source_num].skip_last_row; source.chunk = chunk_allocator.alloc(inputs[source_num].chunk); - cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num); + cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num, inputs[source_num].permutation); source.chunk->all_columns = cursors[source_num].all_columns; source.chunk->sort_columns = cursors[source_num].sort_columns; @@ -55,7 +55,7 @@ void IMergingAlgorithmWithSharedChunks::consume(Input & input, size_t source_num auto & source = sources[source_num]; source.skip_last_row = input.skip_last_row; source.chunk = chunk_allocator.alloc(input.chunk); - cursors[source_num].reset(source.chunk->getColumns(), {}); + cursors[source_num].reset(source.chunk->getColumns(), {}, input.permutation); source.chunk->all_columns = cursors[source_num].all_columns; source.chunk->sort_columns = cursors[source_num].sort_columns; diff --git a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp index ee13ef70203..138f19de4c2 100644 --- a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.cpp @@ -139,7 +139,7 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue //std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n"; //std::cerr << "Inserting row\n"; - merged_data.insertRow(current->all_columns, current->pos, current->rows); + merged_data.insertRow(current->all_columns, current->getPos(), current->rows); if (out_row_sources_buf) { diff --git a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h index 531b2636747..63dced26dd4 100644 --- a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h @@ -18,9 +18,9 @@ public: size_t num_inputs, SortDescription description_, size_t max_block_size, - UInt64 limit_, - WriteBuffer * out_row_sources_buf_, - bool use_average_block_sizes); + UInt64 limit_ = 0, + WriteBuffer * out_row_sources_buf_ = nullptr, + bool use_average_block_sizes = false); void addInput(); diff --git a/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp index 3ee0df0efd8..faa15c004c7 100644 --- a/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/ReplacingSortedAlgorithm.cpp @@ -73,7 +73,7 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge() if (version_column_number == -1 || selected_row.empty() || current->all_columns[version_column_number]->compareAt( - current->pos, selected_row.row_num, + current->getPos(), selected_row.row_num, *(*selected_row.all_columns)[version_column_number], /* nan_direction_hint = */ 1) >= 0) { diff --git a/src/Processors/Merges/Algorithms/RowRef.h b/src/Processors/Merges/Algorithms/RowRef.h index 1b4da9781f8..16e7a5b5cc2 100644 --- a/src/Processors/Merges/Algorithms/RowRef.h +++ b/src/Processors/Merges/Algorithms/RowRef.h @@ -136,7 +136,7 @@ struct RowRef { sort_columns = cursor.impl->sort_columns.data(); num_columns = cursor.impl->sort_columns.size(); - row_num = cursor.impl->pos; + row_num = cursor.impl->getPos(); } static bool checkEquals(size_t size, const IColumn ** lhs, size_t lhs_row, const IColumn ** rhs, size_t rhs_row) @@ -192,7 +192,7 @@ struct RowRefWithOwnedChunk void set(SortCursor & cursor, SharedChunkPtr chunk) { owned_chunk = std::move(chunk); - row_num = cursor.impl->pos; + row_num = cursor.impl->getPos(); all_columns = &owned_chunk->all_columns; sort_columns = &owned_chunk->sort_columns; } diff --git a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp index 17e5e4364ff..c4b35369cd6 100644 --- a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.cpp @@ -688,10 +688,10 @@ IMergingAlgorithm::Status SummingSortedAlgorithm::merge() return Status(merged_data.pull()); } - merged_data.startGroup(current->all_columns, current->pos); + merged_data.startGroup(current->all_columns, current->getPos()); } else - merged_data.addRow(current->all_columns, current->pos); + merged_data.addRow(current->all_columns, current->getPos()); if (!current->isLast()) { diff --git a/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp b/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp index 6273bd28371..707d755f980 100644 --- a/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.cpp @@ -73,7 +73,7 @@ IMergingAlgorithm::Status VersionedCollapsingAlgorithm::merge() RowRef current_row; - Int8 sign = assert_cast(*current->all_columns[sign_column_number]).getData()[current->pos]; + Int8 sign = assert_cast(*current->all_columns[sign_column_number]).getData()[current->getPos()]; setRowRef(current_row, current); diff --git a/src/Processors/Merges/CollapsingSortedTransform.h b/src/Processors/Merges/CollapsingSortedTransform.h index 4e65504a101..9e6bd306eee 100644 --- a/src/Processors/Merges/CollapsingSortedTransform.h +++ b/src/Processors/Merges/CollapsingSortedTransform.h @@ -27,9 +27,9 @@ public: sign_column, only_positive_sign, max_block_size, + &Poco::Logger::get("CollapsingSortedTransform"), out_row_sources_buf_, - use_average_block_sizes, - &Poco::Logger::get("CollapsingSortedTransform")) + use_average_block_sizes) { } diff --git a/src/Processors/Transforms/SortingTransform.cpp b/src/Processors/Transforms/SortingTransform.cpp index 03c8e87ce7a..607dcd20a3a 100644 --- a/src/Processors/Transforms/SortingTransform.cpp +++ b/src/Processors/Transforms/SortingTransform.cpp @@ -100,7 +100,7 @@ Chunk MergeSorter::mergeImpl(TSortingHeap & queue) /// Append a row from queue. for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insertFrom(*current->all_columns[i], current->pos); + merged_columns[i]->insertFrom(*current->all_columns[i], current->getPos()); ++total_merged_rows; ++merged_rows; diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp index 40714e5af31..2de68c802b9 100644 --- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp @@ -2,26 +2,113 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + Block MergeTreeBlockOutputStream::getHeader() const { return metadata_snapshot->getSampleBlock(); } +Block MergeTreeBlockOutputStream::mergeBlock(const Block & block) +{ + /// Get the information needed for merging algorithms + size_t block_size = block.rows(); + Names sort_columns = metadata_snapshot->getSortingKeyColumns(); + SortDescription sort_description; + size_t sort_columns_size = sort_columns.size(); + sort_description.reserve(sort_columns_size); + + for (size_t i = 0; i < sort_columns_size; ++i) + sort_description.emplace_back(getHeader().getPositionByName(sort_columns[i]), 1, 1); + + auto get_merging_algorithm = [&]() -> std::shared_ptr + { + switch (storage.merging_params.mode) + { + /// There is nothing to merge in single block in ordinary MergeTree + case MergeTreeData::MergingParams::Ordinary: + return nullptr; + case MergeTreeData::MergingParams::Replacing: + return std::make_shared( + getHeader(), 1, sort_description, storage.merging_params.version_column, block_size); + case MergeTreeData::MergingParams::Collapsing: + return std::make_shared( + getHeader(), 1, sort_description, storage.merging_params.sign_column, + false, block_size, &Poco::Logger::get("MergeTreeBlockOutputStream")); + case MergeTreeData::MergingParams::Summing: + return std::make_shared( + getHeader(), 1, sort_description, storage.merging_params.columns_to_sum, + metadata_snapshot->getPartitionKey().column_names, block_size); + case MergeTreeData::MergingParams::Aggregating: + return std::make_shared(getHeader(), 1, sort_description, block_size); + case MergeTreeData::MergingParams::VersionedCollapsing: + return std::make_shared( + getHeader(), 1, sort_description, storage.merging_params.sign_column, block_size); + case MergeTreeData::MergingParams::Graphite: + return std::make_shared( + getHeader(), 1, sort_description, block_size, storage.merging_params.graphite_params, time(nullptr)); + } + + __builtin_unreachable(); + }; + + auto merging_algorithm = get_merging_algorithm(); + if (!merging_algorithm) + return block; + + /// Merging algorithms works with inputs containing sorted chunks, so we need to get a sorted permutation + /// of the block, convert the block to a chunk and construct an input from it + IColumn::Permutation permutation; + stableGetPermutation(block, sort_description, permutation); + + Chunk chunk(block.getColumns(), block_size); + + IMergingAlgorithm::Input input; + input.set(std::move(chunk)); + input.permutation = &permutation; + + IMergingAlgorithm::Inputs inputs; + inputs.push_back(std::move(input)); + merging_algorithm->initialize(std::move(inputs)); + + IMergingAlgorithm::Status status = merging_algorithm->merge(); + while (!status.is_finished) + status = merging_algorithm->merge(); + + return block.cloneWithColumns(status.chunk.getColumns()); +} + void MergeTreeBlockOutputStream::write(const Block & block) { storage.delayInsertOrThrowIfNeeded(); + auto settings = context.getSettings(); + auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot); for (auto & current_block : part_blocks) { Stopwatch watch; + if (settings.optimize_on_insert) + current_block.block = mergeBlock(current_block.block); + MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot); storage.renameTempPartAndAdd(part, &storage.increment); diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.h b/src/Storages/MergeTree/MergeTreeBlockOutputStream.h index 8aae7f3e625..1024bd72fa2 100644 --- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.h +++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.h @@ -14,20 +14,23 @@ class StorageMergeTree; class MergeTreeBlockOutputStream : public IBlockOutputStream { public: - MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_) + MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_, const Context & context_) : storage(storage_) , metadata_snapshot(metadata_snapshot_) , max_parts_per_block(max_parts_per_block_) + , context(context_) { } Block getHeader() const override; void write(const Block & block) override; + Block mergeBlock(const Block & block); private: StorageMergeTree & storage; StorageMetadataPtr metadata_snapshot; size_t max_parts_per_block; + const Context & context; }; } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 925f510f63a..cb3f425aba4 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -232,7 +232,7 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Sto const auto & settings = context.getSettingsRef(); return std::make_shared( - *this, metadata_snapshot, settings.max_partitions_per_insert_block); + *this, metadata_snapshot, settings.max_partitions_per_insert_block, context); } void StorageMergeTree::checkTableCanBeDropped() const