mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Add setting optimize_on_insert
This commit is contained in:
parent
637e3dc2c2
commit
8d5e0784d3
@ -396,6 +396,7 @@ class IColumn;
|
||||
M(Bool, allow_non_metadata_alters, true, "Allow to execute alters which affects not only tables metadata, but also data on disk", 0) \
|
||||
M(Bool, enable_global_with_statement, false, "Propagate WITH statements to UNION queries and all subqueries", 0) \
|
||||
M(Bool, aggregate_functions_null_for_empty, false, "Rewrite all aggregate functions in a query, adding -OrNull suffix to them", 0) \
|
||||
M(Bool, optimize_on_insert, true, "Do the same transformation for inserted block of data as if merge was done on this block.", 0) \
|
||||
\
|
||||
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
|
||||
\
|
||||
|
@ -30,7 +30,6 @@ struct SortCursorImpl
|
||||
ColumnRawPtrs all_columns;
|
||||
SortDescription desc;
|
||||
size_t sort_columns_size = 0;
|
||||
size_t pos = 0;
|
||||
size_t rows = 0;
|
||||
|
||||
/** Determines order if comparing columns are equal.
|
||||
@ -49,15 +48,20 @@ struct SortCursorImpl
|
||||
/** Is there at least one column with Collator. */
|
||||
bool has_collation = false;
|
||||
|
||||
/** We could use SortCursorImpl in case when columns aren't sorted
|
||||
* but we have their sorted permutation
|
||||
*/
|
||||
IColumn::Permutation * permutation = nullptr;
|
||||
|
||||
SortCursorImpl() {}
|
||||
|
||||
SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0)
|
||||
SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0, IColumn::Permutation * perm = nullptr)
|
||||
: desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
|
||||
{
|
||||
reset(block);
|
||||
reset(block, perm);
|
||||
}
|
||||
|
||||
SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0)
|
||||
SortCursorImpl(const Columns & columns, const SortDescription & desc_, size_t order_ = 0, IColumn::Permutation * perm = nullptr)
|
||||
: desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size())
|
||||
{
|
||||
for (auto & column_desc : desc)
|
||||
@ -66,19 +70,19 @@ struct SortCursorImpl
|
||||
throw Exception("SortDescription should contain column position if SortCursor was used without header.",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
reset(columns, {});
|
||||
reset(columns, {}, perm);
|
||||
}
|
||||
|
||||
bool empty() const { return rows == 0; }
|
||||
|
||||
/// Set the cursor to the beginning of the new block.
|
||||
void reset(const Block & block)
|
||||
void reset(const Block & block, IColumn::Permutation * perm = nullptr)
|
||||
{
|
||||
reset(block.getColumns(), block);
|
||||
reset(block.getColumns(), block, perm);
|
||||
}
|
||||
|
||||
/// Set the cursor to the beginning of the new block.
|
||||
void reset(const Columns & columns, const Block & block)
|
||||
void reset(const Columns & columns, const Block & block, IColumn::Permutation * perm = nullptr)
|
||||
{
|
||||
all_columns.clear();
|
||||
sort_columns.clear();
|
||||
@ -96,18 +100,36 @@ struct SortCursorImpl
|
||||
: column_desc.column_number;
|
||||
sort_columns.push_back(columns[column_number].get());
|
||||
|
||||
need_collation[j] = desc[j].collator != nullptr && sort_columns.back()->isCollationSupported(); /// TODO Nullable(String)
|
||||
need_collation[j] = desc[j].collator != nullptr && sort_columns.back()->isCollationSupported();
|
||||
has_collation |= need_collation[j];
|
||||
}
|
||||
|
||||
pos = 0;
|
||||
rows = all_columns[0]->size();
|
||||
permutation = perm;
|
||||
}
|
||||
|
||||
size_t getPos() const
|
||||
{
|
||||
if (permutation)
|
||||
return (*permutation)[pos];
|
||||
return pos;
|
||||
}
|
||||
|
||||
/// We need a possibility to change pos (see MergeJoin).
|
||||
size_t & getPosRef()
|
||||
{
|
||||
return pos;
|
||||
}
|
||||
|
||||
bool isFirst() const { return pos == 0; }
|
||||
bool isLast() const { return pos + 1 >= rows; }
|
||||
bool isValid() const { return pos < rows; }
|
||||
void next() { ++pos; }
|
||||
|
||||
/// Prevent using pos instead of getPos()
|
||||
private:
|
||||
size_t pos;
|
||||
};
|
||||
|
||||
using SortCursorImpls = std::vector<SortCursorImpl>;
|
||||
@ -127,7 +149,7 @@ struct SortCursorHelper
|
||||
|
||||
bool ALWAYS_INLINE greater(const SortCursorHelper & rhs) const
|
||||
{
|
||||
return derived().greaterAt(rhs.derived(), impl->pos, rhs.impl->pos);
|
||||
return derived().greaterAt(rhs.derived(), impl->getPos(), rhs.impl->getPos());
|
||||
}
|
||||
|
||||
/// Inverted so that the priority queue elements are removed in ascending order.
|
||||
|
@ -224,7 +224,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, TSort
|
||||
// std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
|
||||
// std::cerr << "Inserting row\n";
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
|
||||
merged_columns[i]->insertFrom(*current->all_columns[i], current->getPos());
|
||||
|
||||
if (out_row_sources_buf)
|
||||
{
|
||||
|
@ -182,10 +182,10 @@ public:
|
||||
: impl(SortCursorImpl(block, desc_))
|
||||
{}
|
||||
|
||||
size_t position() const { return impl.pos; }
|
||||
size_t position() const { return impl.getPos(); }
|
||||
size_t end() const { return impl.rows; }
|
||||
bool atEnd() const { return impl.pos >= impl.rows; }
|
||||
void nextN(size_t num) { impl.pos += num; }
|
||||
bool atEnd() const { return impl.getPos() >= impl.rows; }
|
||||
void nextN(size_t num) { impl.getPosRef() += num; }
|
||||
|
||||
void setCompareNullability(const MergeJoinCursor & rhs)
|
||||
{
|
||||
@ -254,10 +254,10 @@ private:
|
||||
else if (cmp > 0)
|
||||
rhs.impl.next();
|
||||
else if (!cmp)
|
||||
return Range{impl.pos, rhs.impl.pos, getEqualLength(), rhs.getEqualLength()};
|
||||
return Range{impl.getPos(), rhs.impl.getPos(), getEqualLength(), rhs.getEqualLength()};
|
||||
}
|
||||
|
||||
return Range{impl.pos, rhs.impl.pos, 0, 0};
|
||||
return Range{impl.getPos(), rhs.impl.getPos(), 0, 0};
|
||||
}
|
||||
|
||||
template <bool left_nulls, bool right_nulls>
|
||||
@ -268,7 +268,7 @@ private:
|
||||
const auto * left_column = impl.sort_columns[i];
|
||||
const auto * right_column = rhs.impl.sort_columns[i];
|
||||
|
||||
int res = nullableCompareAt<left_nulls, right_nulls>(*left_column, *right_column, impl.pos, rhs.impl.pos);
|
||||
int res = nullableCompareAt<left_nulls, right_nulls>(*left_column, *right_column, impl.getPos(), rhs.impl.getPos());
|
||||
if (res)
|
||||
return res;
|
||||
}
|
||||
@ -278,11 +278,11 @@ private:
|
||||
/// Expects !atEnd()
|
||||
size_t getEqualLength()
|
||||
{
|
||||
size_t pos = impl.pos + 1;
|
||||
size_t pos = impl.getPos() + 1;
|
||||
for (; pos < impl.rows; ++pos)
|
||||
if (!samePrev(pos))
|
||||
break;
|
||||
return pos - impl.pos;
|
||||
return pos - impl.getPos();
|
||||
}
|
||||
|
||||
/// Expects lhs_pos > 0
|
||||
|
@ -239,12 +239,12 @@ void AggregatingSortedAlgorithm::AggregatingMergedData::addRow(SortCursor & curs
|
||||
throw Exception("Can't add a row to the group because it was not started.", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
for (auto & desc : def.columns_to_aggregate)
|
||||
desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->pos);
|
||||
desc.column->insertMergeFrom(*cursor->all_columns[desc.column_number], cursor->getPos());
|
||||
|
||||
for (auto & desc : def.columns_to_simple_aggregate)
|
||||
{
|
||||
auto & col = cursor->all_columns[desc.column_number];
|
||||
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->pos, arena.get());
|
||||
desc.add_function(desc.function.get(), desc.state.data(), &col, cursor->getPos(), arena.get());
|
||||
}
|
||||
}
|
||||
|
||||
@ -334,7 +334,7 @@ IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge()
|
||||
return Status(merged_data.pull());
|
||||
}
|
||||
|
||||
merged_data.startGroup(current->all_columns, current->pos);
|
||||
merged_data.startGroup(current->all_columns, current->getPos());
|
||||
}
|
||||
|
||||
merged_data.addRow(current);
|
||||
|
@ -13,11 +13,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int INCORRECT_DATA;
|
||||
}
|
||||
|
||||
CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
|
||||
const Block & header,
|
||||
size_t num_inputs,
|
||||
@ -25,9 +20,9 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
|
||||
const String & sign_column,
|
||||
bool only_positive_sign_,
|
||||
size_t max_block_size,
|
||||
Poco::Logger * log_,
|
||||
WriteBuffer * out_row_sources_buf_,
|
||||
bool use_average_block_sizes,
|
||||
Poco::Logger * log_)
|
||||
bool use_average_block_sizes)
|
||||
: IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
|
||||
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
|
||||
, sign_column_number(header.getPositionByName(sign_column))
|
||||
@ -123,7 +118,7 @@ IMergingAlgorithm::Status CollapsingSortedAlgorithm::merge()
|
||||
return Status(current.impl->order);
|
||||
}
|
||||
|
||||
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
|
||||
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->getPos()];
|
||||
|
||||
RowRef current_row;
|
||||
setRowRef(current_row, current);
|
||||
|
@ -33,9 +33,9 @@ public:
|
||||
const String & sign_column,
|
||||
bool only_positive_sign_, /// For select final. Skip rows with sum(sign) < 0.
|
||||
size_t max_block_size,
|
||||
WriteBuffer * out_row_sources_buf_,
|
||||
bool use_average_block_sizes,
|
||||
Poco::Logger * log_);
|
||||
Poco::Logger * log_,
|
||||
WriteBuffer * out_row_sources_buf_ = nullptr,
|
||||
bool use_average_block_sizes = false);
|
||||
|
||||
Status merge() override;
|
||||
|
||||
|
@ -164,12 +164,12 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
|
||||
return Status(current.impl->order);
|
||||
}
|
||||
|
||||
StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->pos);
|
||||
StringRef next_path = current->all_columns[columns_definition.path_column_num]->getDataAt(current->getPos());
|
||||
bool new_path = is_first || next_path != current_group_path;
|
||||
|
||||
is_first = false;
|
||||
|
||||
time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->pos);
|
||||
time_t next_row_time = current->all_columns[columns_definition.time_column_num]->getUInt(current->getPos());
|
||||
/// Is new key before rounding.
|
||||
bool is_new_key = new_path || next_row_time != current_time;
|
||||
|
||||
@ -227,7 +227,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
|
||||
/// and for rows with same maximum version - only last row.
|
||||
if (is_new_key
|
||||
|| current->all_columns[columns_definition.version_column_num]->compareAt(
|
||||
current->pos, current_subgroup_newest_row.row_num,
|
||||
current->getPos(), current_subgroup_newest_row.row_num,
|
||||
*(*current_subgroup_newest_row.all_columns)[columns_definition.version_column_num],
|
||||
/* nan_direction_hint = */ 1) >= 0)
|
||||
{
|
||||
@ -263,7 +263,7 @@ IMergingAlgorithm::Status GraphiteRollupSortedAlgorithm::merge()
|
||||
|
||||
void GraphiteRollupSortedAlgorithm::startNextGroup(SortCursor & cursor, Graphite::RollupRule next_rule)
|
||||
{
|
||||
merged_data.startNextGroup(cursor->all_columns, cursor->pos, next_rule, columns_definition);
|
||||
merged_data.startNextGroup(cursor->all_columns, cursor->getPos(), next_rule, columns_definition);
|
||||
}
|
||||
|
||||
void GraphiteRollupSortedAlgorithm::finishCurrentGroup()
|
||||
|
@ -29,6 +29,8 @@ public:
|
||||
/// between different algorithm objects in parallel FINAL.
|
||||
bool skip_last_row = false;
|
||||
|
||||
IColumn::Permutation * permutation = nullptr;
|
||||
|
||||
void swap(Input & other)
|
||||
{
|
||||
chunk.swap(other.chunk);
|
||||
|
@ -22,7 +22,7 @@ void IMergingAlgorithmWithDelayedChunk::initializeQueue(Inputs inputs)
|
||||
if (!current_inputs[source_num].chunk)
|
||||
continue;
|
||||
|
||||
cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num);
|
||||
cursors[source_num] = SortCursorImpl(current_inputs[source_num].chunk.getColumns(), description, source_num, current_inputs[source_num].permutation);
|
||||
}
|
||||
|
||||
queue = SortingHeap<SortCursor>(cursors);
|
||||
@ -37,7 +37,7 @@ void IMergingAlgorithmWithDelayedChunk::updateCursor(Input & input, size_t sourc
|
||||
last_chunk_sort_columns = std::move(cursors[source_num].sort_columns);
|
||||
|
||||
current_input.swap(input);
|
||||
cursors[source_num].reset(current_input.chunk.getColumns(), {});
|
||||
cursors[source_num].reset(current_input.chunk.getColumns(), {}, current_input.permutation);
|
||||
|
||||
queue.push(cursors[source_num]);
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ void IMergingAlgorithmWithSharedChunks::initialize(Inputs inputs)
|
||||
|
||||
source.skip_last_row = inputs[source_num].skip_last_row;
|
||||
source.chunk = chunk_allocator.alloc(inputs[source_num].chunk);
|
||||
cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num);
|
||||
cursors[source_num] = SortCursorImpl(source.chunk->getColumns(), description, source_num, inputs[source_num].permutation);
|
||||
|
||||
source.chunk->all_columns = cursors[source_num].all_columns;
|
||||
source.chunk->sort_columns = cursors[source_num].sort_columns;
|
||||
@ -55,7 +55,7 @@ void IMergingAlgorithmWithSharedChunks::consume(Input & input, size_t source_num
|
||||
auto & source = sources[source_num];
|
||||
source.skip_last_row = input.skip_last_row;
|
||||
source.chunk = chunk_allocator.alloc(input.chunk);
|
||||
cursors[source_num].reset(source.chunk->getColumns(), {});
|
||||
cursors[source_num].reset(source.chunk->getColumns(), {}, input.permutation);
|
||||
|
||||
source.chunk->all_columns = cursors[source_num].all_columns;
|
||||
source.chunk->sort_columns = cursors[source_num].sort_columns;
|
||||
|
@ -139,7 +139,7 @@ IMergingAlgorithm::Status MergingSortedAlgorithm::mergeImpl(TSortingHeap & queue
|
||||
|
||||
//std::cerr << "total_merged_rows: " << total_merged_rows << ", merged_rows: " << merged_rows << "\n";
|
||||
//std::cerr << "Inserting row\n";
|
||||
merged_data.insertRow(current->all_columns, current->pos, current->rows);
|
||||
merged_data.insertRow(current->all_columns, current->getPos(), current->rows);
|
||||
|
||||
if (out_row_sources_buf)
|
||||
{
|
||||
|
@ -18,9 +18,9 @@ public:
|
||||
size_t num_inputs,
|
||||
SortDescription description_,
|
||||
size_t max_block_size,
|
||||
UInt64 limit_,
|
||||
WriteBuffer * out_row_sources_buf_,
|
||||
bool use_average_block_sizes);
|
||||
UInt64 limit_ = 0,
|
||||
WriteBuffer * out_row_sources_buf_ = nullptr,
|
||||
bool use_average_block_sizes = false);
|
||||
|
||||
void addInput();
|
||||
|
||||
|
@ -73,7 +73,7 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
|
||||
if (version_column_number == -1
|
||||
|| selected_row.empty()
|
||||
|| current->all_columns[version_column_number]->compareAt(
|
||||
current->pos, selected_row.row_num,
|
||||
current->getPos(), selected_row.row_num,
|
||||
*(*selected_row.all_columns)[version_column_number],
|
||||
/* nan_direction_hint = */ 1) >= 0)
|
||||
{
|
||||
|
@ -136,7 +136,7 @@ struct RowRef
|
||||
{
|
||||
sort_columns = cursor.impl->sort_columns.data();
|
||||
num_columns = cursor.impl->sort_columns.size();
|
||||
row_num = cursor.impl->pos;
|
||||
row_num = cursor.impl->getPos();
|
||||
}
|
||||
|
||||
static bool checkEquals(size_t size, const IColumn ** lhs, size_t lhs_row, const IColumn ** rhs, size_t rhs_row)
|
||||
@ -192,7 +192,7 @@ struct RowRefWithOwnedChunk
|
||||
void set(SortCursor & cursor, SharedChunkPtr chunk)
|
||||
{
|
||||
owned_chunk = std::move(chunk);
|
||||
row_num = cursor.impl->pos;
|
||||
row_num = cursor.impl->getPos();
|
||||
all_columns = &owned_chunk->all_columns;
|
||||
sort_columns = &owned_chunk->sort_columns;
|
||||
}
|
||||
|
@ -688,10 +688,10 @@ IMergingAlgorithm::Status SummingSortedAlgorithm::merge()
|
||||
return Status(merged_data.pull());
|
||||
}
|
||||
|
||||
merged_data.startGroup(current->all_columns, current->pos);
|
||||
merged_data.startGroup(current->all_columns, current->getPos());
|
||||
}
|
||||
else
|
||||
merged_data.addRow(current->all_columns, current->pos);
|
||||
merged_data.addRow(current->all_columns, current->getPos());
|
||||
|
||||
if (!current->isLast())
|
||||
{
|
||||
|
@ -73,7 +73,7 @@ IMergingAlgorithm::Status VersionedCollapsingAlgorithm::merge()
|
||||
|
||||
RowRef current_row;
|
||||
|
||||
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
|
||||
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->getPos()];
|
||||
|
||||
setRowRef(current_row, current);
|
||||
|
||||
|
@ -27,9 +27,9 @@ public:
|
||||
sign_column,
|
||||
only_positive_sign,
|
||||
max_block_size,
|
||||
&Poco::Logger::get("CollapsingSortedTransform"),
|
||||
out_row_sources_buf_,
|
||||
use_average_block_sizes,
|
||||
&Poco::Logger::get("CollapsingSortedTransform"))
|
||||
use_average_block_sizes)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -100,7 +100,7 @@ Chunk MergeSorter::mergeImpl(TSortingHeap & queue)
|
||||
|
||||
/// Append a row from queue.
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
merged_columns[i]->insertFrom(*current->all_columns[i], current->pos);
|
||||
merged_columns[i]->insertFrom(*current->all_columns[i], current->getPos());
|
||||
|
||||
++total_merged_rows;
|
||||
++merged_rows;
|
||||
|
@ -2,26 +2,113 @@
|
||||
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
|
||||
#include <Storages/StorageMergeTree.h>
|
||||
#include <Interpreters/PartLog.h>
|
||||
#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
|
||||
#include <Processors/Merges/Algorithms/MergingSortedAlgorithm.h>
|
||||
#include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h>
|
||||
#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
|
||||
#include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h>
|
||||
#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
|
||||
#include <Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h>
|
||||
#include <Interpreters/sortBlock.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
Block MergeTreeBlockOutputStream::getHeader() const
|
||||
{
|
||||
return metadata_snapshot->getSampleBlock();
|
||||
}
|
||||
|
||||
/// Applies to a single inserted block the transformation that a merge would perform for this
/// table's merging mode (write() calls it when the optimize_on_insert setting is enabled).
/// Returns `block` unchanged for ordinary MergeTree, since there is nothing to merge in one block;
/// otherwise returns a block with the same structure holding the columns produced by the algorithm.
/// Throws LOGICAL_ERROR if the algorithm hands back a chunk before reporting that it has finished.
Block MergeTreeBlockOutputStream::mergeBlock(const Block & block)
{
    /// Get the information needed for merging algorithms.
    /// The header is built once and reused instead of calling getHeader() for every key column.
    const Block header = getHeader();
    const size_t block_size = block.rows();

    /// Row limit passed to the algorithms. One more than the number of input rows, so the limit
    /// cannot be reached before the whole block has been processed.
    /// NOTE(review): assumes an algorithm may emit a partial chunk once the limit is reached - confirm
    /// against the algorithms' merged-data implementation. Only one resulting chunk is returned here.
    const size_t max_block_size = block_size + 1;

    const Names sort_columns = metadata_snapshot->getSortingKeyColumns();
    SortDescription sort_description;
    sort_description.reserve(sort_columns.size());

    /// Sorting key columns are referenced by their position in the header.
    for (const auto & sort_column : sort_columns)
        sort_description.emplace_back(header.getPositionByName(sort_column), 1, 1);

    auto get_merging_algorithm = [&]() -> std::shared_ptr<IMergingAlgorithm>
    {
        switch (storage.merging_params.mode)
        {
            /// There is nothing to merge in single block in ordinary MergeTree
            case MergeTreeData::MergingParams::Ordinary:
                return nullptr;
            case MergeTreeData::MergingParams::Replacing:
                return std::make_shared<ReplacingSortedAlgorithm>(
                    header, 1, sort_description, storage.merging_params.version_column, max_block_size);
            case MergeTreeData::MergingParams::Collapsing:
                return std::make_shared<CollapsingSortedAlgorithm>(
                    header, 1, sort_description, storage.merging_params.sign_column,
                    false, max_block_size, &Poco::Logger::get("MergeTreeBlockOutputStream"));
            case MergeTreeData::MergingParams::Summing:
                return std::make_shared<SummingSortedAlgorithm>(
                    header, 1, sort_description, storage.merging_params.columns_to_sum,
                    metadata_snapshot->getPartitionKey().column_names, max_block_size);
            case MergeTreeData::MergingParams::Aggregating:
                return std::make_shared<AggregatingSortedAlgorithm>(header, 1, sort_description, max_block_size);
            case MergeTreeData::MergingParams::VersionedCollapsing:
                return std::make_shared<VersionedCollapsingAlgorithm>(
                    header, 1, sort_description, storage.merging_params.sign_column, max_block_size);
            case MergeTreeData::MergingParams::Graphite:
                return std::make_shared<GraphiteRollupSortedAlgorithm>(
                    header, 1, sort_description, max_block_size, storage.merging_params.graphite_params, time(nullptr));
        }

        __builtin_unreachable();
    };

    auto merging_algorithm = get_merging_algorithm();
    if (!merging_algorithm)
        return block;

    /// Merging algorithms work with inputs containing sorted chunks, so we need to get a sorted permutation
    /// of the block, convert the block to a chunk and construct an input from it.
    /// The permutation is a local object referenced by pointer from the input; it stays alive
    /// until the merge below has completed.
    IColumn::Permutation permutation;
    stableGetPermutation(block, sort_description, permutation);

    Chunk chunk(block.getColumns(), block_size);

    IMergingAlgorithm::Input input;
    input.set(std::move(chunk));
    input.permutation = &permutation;

    IMergingAlgorithm::Inputs inputs;
    inputs.push_back(std::move(input));
    merging_algorithm->initialize(std::move(inputs));

    IMergingAlgorithm::Status status = merging_algorithm->merge();
    while (!status.is_finished)
    {
        /// Only the chunk of the final status is returned below, so a chunk produced earlier
        /// would be lost. Fail loudly instead of silently dropping rows.
        if (status.chunk)
            throw Exception("Merging algorithm returned a chunk before finishing the merge of an inserted block.",
                ErrorCodes::LOGICAL_ERROR);

        status = merging_algorithm->merge();
    }

    return block.cloneWithColumns(status.chunk.getColumns());
}
|
||||
|
||||
|
||||
void MergeTreeBlockOutputStream::write(const Block & block)
|
||||
{
|
||||
storage.delayInsertOrThrowIfNeeded();
|
||||
|
||||
auto settings = context.getSettings();
|
||||
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
|
||||
for (auto & current_block : part_blocks)
|
||||
{
|
||||
Stopwatch watch;
|
||||
|
||||
if (settings.optimize_on_insert)
|
||||
current_block.block = mergeBlock(current_block.block);
|
||||
|
||||
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot);
|
||||
storage.renameTempPartAndAdd(part, &storage.increment);
|
||||
|
||||
|
@ -14,20 +14,23 @@ class StorageMergeTree;
|
||||
class MergeTreeBlockOutputStream : public IBlockOutputStream
|
||||
{
|
||||
public:
|
||||
MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_)
|
||||
MergeTreeBlockOutputStream(StorageMergeTree & storage_, const StorageMetadataPtr metadata_snapshot_, size_t max_parts_per_block_, const Context & context_)
|
||||
: storage(storage_)
|
||||
, metadata_snapshot(metadata_snapshot_)
|
||||
, max_parts_per_block(max_parts_per_block_)
|
||||
, context(context_)
|
||||
{
|
||||
}
|
||||
|
||||
Block getHeader() const override;
|
||||
void write(const Block & block) override;
|
||||
Block mergeBlock(const Block & block);
|
||||
|
||||
private:
|
||||
StorageMergeTree & storage;
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
size_t max_parts_per_block;
|
||||
const Context & context;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -232,7 +232,7 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & /*query*/, const Sto
|
||||
|
||||
const auto & settings = context.getSettingsRef();
|
||||
return std::make_shared<MergeTreeBlockOutputStream>(
|
||||
*this, metadata_snapshot, settings.max_partitions_per_insert_block);
|
||||
*this, metadata_snapshot, settings.max_partitions_per_insert_block, context);
|
||||
}
|
||||
|
||||
void StorageMergeTree::checkTableCanBeDropped() const
|
||||
|
Loading…
Reference in New Issue
Block a user