More parallel execution for queries with FINAL (#36396)

This commit is contained in:
Nikita Taranov 2022-06-15 12:44:20 +02:00 committed by GitHub
parent 5c4a5f520e
commit c8afeafe0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 485 additions and 280 deletions

View File

@ -1,5 +1,4 @@
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Transforms/SelectorInfo.h>
namespace DB
{
@ -181,68 +180,4 @@ IProcessor::Status IMergingTransformBase::prepare()
return Status::Ready;
}
static void filterChunk(IMergingAlgorithm::Input & input, size_t selector_position)
{
if (!input.chunk.getChunkInfo())
throw Exception("IMergingTransformBase expected ChunkInfo for input chunk", ErrorCodes::LOGICAL_ERROR);
const auto * chunk_info = typeid_cast<const SelectorInfo *>(input.chunk.getChunkInfo().get());
if (!chunk_info)
throw Exception("IMergingTransformBase expected SelectorInfo for input chunk", ErrorCodes::LOGICAL_ERROR);
const auto & selector = chunk_info->selector;
IColumn::Filter filter;
filter.resize_fill(selector.size());
size_t num_rows = input.chunk.getNumRows();
auto columns = input.chunk.detachColumns();
size_t num_result_rows = 0;
for (size_t row = 0; row < num_rows; ++row)
{
if (selector[row] == selector_position)
{
++num_result_rows;
filter[row] = 1;
}
}
if (!filter.empty() && filter.back() == 0)
{
filter.back() = 1;
++num_result_rows;
input.skip_last_row = true;
}
for (auto & column : columns)
column = column->filter(filter, num_result_rows);
input.chunk.clear();
input.chunk.setColumns(std::move(columns), num_result_rows);
}
void IMergingTransformBase::filterChunks()
{
if (state.selector_position < 0)
return;
if (!state.init_chunks.empty())
{
for (size_t i = 0; i < input_states.size(); ++i)
{
auto & input = state.init_chunks[i];
if (!input.chunk)
continue;
filterChunk(input, state.selector_position);
}
}
if (state.has_input)
filterChunk(state.input_chunk, state.selector_position);
}
}

View File

@ -28,17 +28,10 @@ public:
Status prepare() override;
/// Set position which will be used in selector if input chunk has attached SelectorInfo (see SelectorInfo.h).
/// Columns will be filtered, keep only rows labeled with this position.
/// It is used in parallel final.
void setSelectorPosition(size_t position) { state.selector_position = position; }
protected:
virtual void onNewInput(); /// Is called when new input is added. Only if have_all_inputs = false.
virtual void onFinish() {} /// Is called when all data is processed.
void filterChunks(); /// Filter chunks if selector position was set. For parallel final.
/// Processor state.
struct State
{
@ -50,7 +43,6 @@ protected:
size_t next_input_to_read = 0;
IMergingAlgorithm::Inputs init_chunks;
ssize_t selector_position = -1;
};
State state;
@ -92,8 +84,6 @@ public:
void work() override
{
filterChunks();
if (!state.init_chunks.empty())
algorithm.initialize(std::move(state.init_chunks));

View File

@ -86,6 +86,9 @@ static void doDescribeProcessor(const IProcessor & processor, size_t count, IQue
doDescribeHeader(*last_header, num_equal_headers, settings);
}
if (!processor.getDescription().empty())
settings.out << String(settings.offset, settings.indent_char) << "Description: " << processor.getDescription() << '\n';
settings.offset += settings.indent;
}

View File

@ -0,0 +1,274 @@
#include <algorithm>
#include <memory>
#include <numeric>
#include <queue>
#include <unordered_map>
#include <vector>
#include <Core/Field.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Processors/QueryPlan/PartsSplitter.h>
#include <Processors/Transforms/FilterSortedStreamByRange.h>
#include <Storages/MergeTree/RangesInDataPart.h>
using namespace DB;
namespace
{
using Value = std::vector<Field>;
std::string toString(const Value & value)
{
return fmt::format("({})", fmt::join(value, ", "));
}
/// Adaptor to access PK values from index.
class IndexAccess
{
public:
explicit IndexAccess(const RangesInDataParts & parts_) : parts(parts_) { }
Value getValue(size_t part_idx, size_t mark) const
{
const auto & index = parts[part_idx].data_part->index;
Value value(index.size());
for (size_t i = 0; i < value.size(); ++i)
index[i]->get(mark, value[i]);
return value;
}
size_t getMarkRows(size_t part_idx, size_t mark) const { return parts[part_idx].data_part->index_granularity.getMarkRows(mark); }
size_t getTotalRowCount() const
{
size_t total = 0;
for (const auto & part : parts)
total += part.getRowsCount();
return total;
}
private:
const RangesInDataParts & parts;
};
/// Splits parts into layers, each layer will contain parts subranges with PK values from its own range.
/// Will try to produce exactly max_layer layers but may return less if data is distributed in not a very parallelizable way.
std::pair<std::vector<Value>, std::vector<RangesInDataParts>> split(RangesInDataParts parts, size_t max_layers)
{
// We will advance the iterator pointing to the mark with the smallest PK value until there will be not less than rows_per_layer rows in the current layer (roughly speaking).
// Then we choose the last observed value as the new border, so the current layer will consists of granules with values greater than the previous mark and less or equal
// than the new border.
struct PartsRangesIterator
{
struct RangeInDataPart : MarkRange
{
size_t part_idx;
};
enum class EventType
{
RangeBeginning,
RangeEnding,
};
bool operator<(const PartsRangesIterator & other) const { return std::tie(value, event) > std::tie(other.value, other.event); }
Value value;
RangeInDataPart range;
EventType event;
};
const auto index_access = std::make_unique<IndexAccess>(parts);
std::priority_queue<PartsRangesIterator> parts_ranges_queue;
for (size_t part_idx = 0; part_idx < parts.size(); ++part_idx)
{
for (const auto & range : parts[part_idx].ranges)
{
parts_ranges_queue.push(
{index_access->getValue(part_idx, range.begin), {range, part_idx}, PartsRangesIterator::EventType::RangeBeginning});
const auto & index_granularity = parts[part_idx].data_part->index_granularity;
if (index_granularity.hasFinalMark() && range.end + 1 == index_granularity.getMarksCount())
parts_ranges_queue.push(
{index_access->getValue(part_idx, range.end), {range, part_idx}, PartsRangesIterator::EventType::RangeEnding});
}
}
/// The beginning of currently started (but not yet finished) range of marks of a part in the current layer.
std::unordered_map<size_t, size_t> current_part_range_begin;
/// The current ending of a range of marks of a part in the current layer.
std::unordered_map<size_t, size_t> current_part_range_end;
/// Determine borders between layers.
std::vector<Value> borders;
std::vector<RangesInDataParts> result_layers;
const size_t rows_per_layer = std::max<size_t>(index_access->getTotalRowCount() / max_layers, 1);
while (!parts_ranges_queue.empty())
{
// New layer should include last granules of still open ranges from the previous layer,
// because they may already contain values greater than the last border.
size_t rows_in_current_layer = 0;
size_t marks_in_current_layer = 0;
// Intersection between the current and next layers is just the last observed marks of each still open part range. Ratio is empirical.
auto layers_intersection_is_too_big = [&]()
{
const auto intersected_parts = current_part_range_end.size();
return marks_in_current_layer < intersected_parts * 2;
};
result_layers.emplace_back();
while (rows_in_current_layer < rows_per_layer || layers_intersection_is_too_big() || result_layers.size() == max_layers)
{
// We're advancing iterators until a new value showed up.
Value last_value;
while (!parts_ranges_queue.empty() && (last_value.empty() || last_value == parts_ranges_queue.top().value))
{
auto current = parts_ranges_queue.top();
parts_ranges_queue.pop();
const auto part_idx = current.range.part_idx;
if (current.event == PartsRangesIterator::EventType::RangeEnding)
{
result_layers.back().emplace_back(
parts[part_idx].data_part,
parts[part_idx].part_index_in_query,
MarkRanges{{current_part_range_begin[part_idx], current.range.end}});
current_part_range_begin.erase(part_idx);
current_part_range_end.erase(part_idx);
continue;
}
last_value = std::move(current.value);
rows_in_current_layer += index_access->getMarkRows(part_idx, current.range.begin);
marks_in_current_layer++;
current_part_range_begin.try_emplace(part_idx, current.range.begin);
current_part_range_end[part_idx] = current.range.begin;
if (current.range.begin + 1 < current.range.end)
{
current.range.begin++;
current.value = index_access->getValue(part_idx, current.range.begin);
parts_ranges_queue.push(std::move(current));
}
}
if (parts_ranges_queue.empty())
break;
if (rows_in_current_layer >= rows_per_layer && !layers_intersection_is_too_big() && result_layers.size() < max_layers)
borders.push_back(last_value);
}
for (const auto & [part_idx, last_mark] : current_part_range_end)
{
result_layers.back().emplace_back(
parts[part_idx].data_part,
parts[part_idx].part_index_in_query,
MarkRanges{{current_part_range_begin[part_idx], last_mark + 1}});
current_part_range_begin[part_idx] = current_part_range_end[part_idx];
}
}
for (auto & layer : result_layers)
{
std::stable_sort(
layer.begin(),
layer.end(),
[](const auto & lhs, const auto & rhs) { return lhs.part_index_in_query < rhs.part_index_in_query; });
}
return std::make_pair(std::move(borders), std::move(result_layers));
}
/// Will return borders.size()+1 filters in total, i-th filter will accept rows with PK values within the range [borders[i-1], borders[i]).
std::vector<ASTPtr> buildFilters(const KeyDescription & primary_key, const std::vector<Value> & borders)
{
auto add_and_condition = [&](ASTPtr & result, const ASTPtr & foo) { result = !result ? foo : makeASTFunction("and", result, foo); };
/// Produces ASTPtr to predicate (pk_col0, pk_col1, ... , pk_colN) > (value[0], value[1], ... , value[N])
auto lexicographically_greater = [&](const Value & value)
{
// PK may contain functions of the table columns, so we need the actual PK AST with all expressions it contains.
ASTPtr pk_columns_as_tuple = makeASTFunction("tuple", primary_key.expression_list_ast->children);
ASTPtr value_ast = std::make_shared<ASTExpressionList>();
for (size_t i = 0; i < value.size(); ++i)
{
const auto & types = primary_key.data_types;
ASTPtr component_ast = std::make_shared<ASTLiteral>(value[i]);
// Values of some types (e.g. Date, DateTime) are stored in columns as numbers and we get them as just numbers from the index.
// So we need an explicit Cast for them.
if (isColumnedAsNumber(types.at(i)->getTypeId()) && !isNumber(types.at(i)->getTypeId()))
component_ast = makeASTFunction("cast", std::move(component_ast), std::make_shared<ASTLiteral>(types.at(i)->getName()));
value_ast->children.push_back(std::move(component_ast));
}
ASTPtr value_as_tuple = makeASTFunction("tuple", value_ast->children);
return makeASTFunction("greater", pk_columns_as_tuple, value_as_tuple);
};
std::vector<ASTPtr> filters(borders.size() + 1);
for (size_t layer = 0; layer <= borders.size(); ++layer)
{
if (layer > 0)
add_and_condition(filters[layer], lexicographically_greater(borders[layer - 1]));
if (layer < borders.size())
add_and_condition(filters[layer], makeASTFunction("not", lexicographically_greater(borders[layer])));
}
return filters;
}
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
Pipes buildPipesForReadingByPKRanges(
const KeyDescription & primary_key,
RangesInDataParts parts,
size_t max_layers,
ContextPtr context,
ReadingInOrderStepGetter && reading_step_getter)
{
if (max_layers <= 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "max_layer should be greater than 1.");
auto && [borders, result_layers] = split(std::move(parts), max_layers);
auto filters = buildFilters(primary_key, borders);
Pipes pipes(result_layers.size());
for (size_t i = 0; i < result_layers.size(); ++i)
{
pipes[i] = reading_step_getter(std::move(result_layers[i]));
auto & filter_function = filters[i];
if (!filter_function)
continue;
auto syntax_result = TreeRewriter(context).analyze(filter_function, primary_key.expression->getRequiredColumnsWithTypes());
auto actions = ExpressionAnalyzer(filter_function, syntax_result, context).getActionsDAG(false);
ExpressionActionsPtr expression_actions = std::make_shared<ExpressionActions>(std::move(actions));
auto description = fmt::format(
"filter values in [{}, {})", i ? ::toString(borders[i - 1]) : "-inf", i < borders.size() ? ::toString(borders[i]) : "+inf");
auto pk_expression = std::make_shared<ExpressionActions>(primary_key.expression->getActionsDAG().clone());
pipes[i].addSimpleTransform([pk_expression](const Block & header)
{ return std::make_shared<ExpressionTransform>(header, pk_expression); });
pipes[i].addSimpleTransform(
[&](const Block & header)
{
auto step = std::make_shared<FilterSortedStreamByRange>(header, expression_actions, filter_function->getColumnName(), true);
step->setDescription(description);
return step;
});
}
return pipes;
}
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <functional>
#include <Interpreters/Context_fwd.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/KeyDescription.h>
#include <Storages/MergeTree/RangesInDataPart.h>
namespace DB
{
using ReadingInOrderStepGetter = std::function<Pipe(RangesInDataParts)>;
/// Splits parts into layers, each layer will contain parts subranges with PK values from its own range.
/// A separate pipe will be constructed for each layer with a reading step (provided by the reading_step_getter) and a filter for this layer's range of PK values.
/// Will try to produce exactly max_layer pipes but may return less if data is distributed in not a very parallelizable way.
Pipes buildPipesForReadingByPKRanges(
const KeyDescription & primary_key,
RangesInDataParts parts,
size_t max_layers,
ContextPtr context,
ReadingInOrderStepGetter && reading_step_getter);
}

View File

@ -1,14 +1,16 @@
#include <algorithm>
#include <functional>
#include <memory>
#include <numeric>
#include <queue>
#include <stdexcept>
#include <IO/Operators.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSelectQuery.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/AddingSelectorTransform.h>
#include <Processors/Transforms/CopyTransform.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/GraphiteRollupSortedTransform.h>
@ -16,17 +18,22 @@
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/QueryPlan/PartsSplitter.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeInOrderSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeReverseSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeThreadSelectProcessor.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/VirtualColumnUtils.h>
#include <IO/Operators.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Common/logger_useful.h>
#include <base/sort.h>
#include <Poco/Logger.h>
#include <Common/JSONBuilder.h>
namespace ProfileEvents
@ -560,7 +567,6 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
static void addMergingFinal(
Pipe & pipe,
size_t num_output_streams,
const SortDescription & sort_description,
MergeTreeData::MergingParams merging_params,
Names partition_key_columns,
@ -607,56 +613,7 @@ static void addMergingFinal(
__builtin_unreachable();
};
if (num_output_streams <= 1 || sort_description.empty())
{
pipe.addTransform(get_merging_processor());
return;
}
ColumnNumbers key_columns;
key_columns.reserve(sort_description.size());
for (const auto & desc : sort_description)
key_columns.push_back(header.getPositionByName(desc.column_name));
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<AddingSelectorTransform>(stream_header, num_output_streams, key_columns);
});
pipe.transform([&](OutputPortRawPtrs ports)
{
Processors transforms;
std::vector<OutputPorts::iterator> output_ports;
transforms.reserve(ports.size() + num_output_streams);
output_ports.reserve(ports.size());
for (auto & port : ports)
{
auto copier = std::make_shared<CopyTransform>(header, num_output_streams);
connect(*port, copier->getInputPort());
output_ports.emplace_back(copier->getOutputs().begin());
transforms.emplace_back(std::move(copier));
}
for (size_t i = 0; i < num_output_streams; ++i)
{
auto merge = get_merging_processor();
merge->setSelectorPosition(i);
auto input = merge->getInputs().begin();
/// Connect i-th merge with i-th input port of every copier.
for (size_t j = 0; j < ports.size(); ++j)
{
connect(*output_ports[j], *input);
++output_ports[j];
++input;
}
transforms.emplace_back(std::move(merge));
}
return transforms;
});
pipe.addTransform(get_merging_processor());
}
@ -710,8 +667,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index)
{
Pipe pipe;
Pipes pipes;
{
RangesInDataParts new_parts;
@ -738,21 +694,39 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
if (new_parts.empty())
continue;
pipe = read(std::move(new_parts), column_names, ReadFromMergeTree::ReadType::InOrder,
num_streams, 0, info.use_uncompressed_cache);
if (num_streams > 1 && metadata_for_reading->hasPrimaryKey())
{
// Let's split parts into layers to ensure data parallelism of final.
auto reading_step_getter = [this, &column_names, &info](auto parts)
{
return read(
std::move(parts),
column_names,
ReadFromMergeTree::ReadType::InOrder,
1 /* num_streams */,
0 /* min_marks_for_concurrent_read */,
info.use_uncompressed_cache);
};
pipes = buildPipesForReadingByPKRanges(
metadata_for_reading->getPrimaryKey(), std::move(new_parts), num_streams, context, std::move(reading_step_getter));
}
else
{
pipes.emplace_back(read(
std::move(new_parts), column_names, ReadFromMergeTree::ReadType::InOrder, num_streams, 0, info.use_uncompressed_cache));
}
/// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection)
out_projection = createProjection(pipe.getHeader());
out_projection = createProjection(pipes.front().getHeader());
}
auto sorting_expr = std::make_shared<ExpressionActions>(
metadata_for_reading->getSortingKey().expression->getActionsDAG().clone());
pipe.addSimpleTransform([sorting_expr](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, sorting_expr);
});
for (auto & pipe : pipes)
pipe.addSimpleTransform([sorting_expr](const Block & header)
{ return std::make_shared<ExpressionTransform>(header, sorting_expr); });
/// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
/// with level > 0 then we won't postprocess this part
@ -760,7 +734,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0)
{
partition_pipes.emplace_back(std::move(pipe));
partition_pipes.emplace_back(Pipe::unitePipes(std::move(pipes)));
continue;
}
@ -777,21 +751,21 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(sort_columns[i], 1, 1);
addMergingFinal(
pipe,
std::min<size_t>(num_streams, settings.max_final_threads),
sort_description, data.merging_params, partition_key_columns, max_block_size);
for (auto & pipe : pipes)
addMergingFinal(
pipe,
sort_description,
data.merging_params,
partition_key_columns,
max_block_size);
partition_pipes.emplace_back(std::move(pipe));
partition_pipes.emplace_back(Pipe::unitePipes(std::move(pipes)));
}
if (!lonely_parts.empty())
{
RangesInDataParts new_parts;
size_t num_streams_for_lonely_parts = num_streams * lonely_parts.size();
const size_t min_marks_for_concurrent_read = MergeTreeDataSelectExecutor::minMarksForConcurrentRead(
settings.merge_tree_min_rows_for_concurrent_read,
settings.merge_tree_min_bytes_for_concurrent_read,

View File

@ -1,76 +0,0 @@
#include <Processors/Transforms/AddingSelectorTransform.h>
#include <Processors/Transforms/SelectorInfo.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
AddingSelectorTransform::AddingSelectorTransform(
const Block & header, size_t num_outputs_, ColumnNumbers key_columns_)
: ISimpleTransform(header, header, false)
, num_outputs(num_outputs_)
, key_columns(std::move(key_columns_))
, hash(0)
{
setInputNotNeededAfterRead(false);
if (num_outputs <= 1)
throw Exception("SplittingByHashTransform expects more than 1 outputs, got " + std::to_string(num_outputs),
ErrorCodes::LOGICAL_ERROR);
if (key_columns.empty())
throw Exception("SplittingByHashTransform cannot split by empty set of key columns",
ErrorCodes::LOGICAL_ERROR);
for (auto & column : key_columns)
if (column >= header.columns())
throw Exception("Invalid column number: " + std::to_string(column) +
". There is only " + std::to_string(header.columns()) + " columns in header",
ErrorCodes::LOGICAL_ERROR);
}
static void calculateWeakHash32(const Chunk & chunk, const ColumnNumbers & key_columns, WeakHash32 & hash)
{
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
hash.reset(num_rows);
for (const auto & column_number : key_columns)
columns[column_number]->updateWeakHash32(hash);
}
static IColumn::Selector fillSelector(const WeakHash32 & hash, size_t num_outputs)
{
/// Row from interval [(2^32 / num_outputs) * i, (2^32 / num_outputs) * (i + 1)) goes to bucket with number i.
const auto & hash_data = hash.getData();
size_t num_rows = hash_data.size();
IColumn::Selector selector(num_rows);
for (size_t row = 0; row < num_rows; ++row)
{
selector[row] = hash_data[row]; /// [0, 2^32)
selector[row] *= num_outputs; /// [0, num_outputs * 2^32), selector stores 64 bit values.
selector[row] >>= 32u; /// [0, num_outputs)
}
return selector;
}
void AddingSelectorTransform::transform(Chunk & input_chunk, Chunk & output_chunk)
{
auto chunk_info = std::make_shared<SelectorInfo>();
calculateWeakHash32(input_chunk, key_columns, hash);
chunk_info->selector = fillSelector(hash, num_outputs);
input_chunk.swap(output_chunk);
output_chunk.setChunkInfo(std::move(chunk_info));
}
}

View File

@ -1,26 +0,0 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/ISimpleTransform.h>
#include <Core/ColumnNumbers.h>
#include <Common/WeakHash.h>
namespace DB
{
/// Add IColumn::Selector to chunk (see SelectorInfo.h).
/// Selector is filled by formula (WeakHash(key_columns) * num_outputs / MAX_INT).
class AddingSelectorTransform : public ISimpleTransform
{
public:
AddingSelectorTransform(const Block & header, size_t num_outputs_, ColumnNumbers key_columns_);
String getName() const override { return "AddingSelector"; }
void transform(Chunk & input_chunk, Chunk & output_chunk) override;
private:
size_t num_outputs;
ColumnNumbers key_columns;
WeakHash32 hash;
};
}

View File

@ -0,0 +1,66 @@
#pragma once
#include <Interpreters/ExpressionActions.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
namespace DB
{
/// Could be used when the predicate given by expression_ is true only on one continuous range of values and input is monotonous by that value.
/// The following optimization applies: when a new chunk of data comes in we firstly execute the expression_ only on the first and the last row.
/// If it evaluates to true on both rows then the whole chunk is immediately passed to further steps.
/// Otherwise, we apply the expression_ to all rows.
class FilterSortedStreamByRange : public ISimpleTransform
{
public:
FilterSortedStreamByRange(
const Block & header_,
ExpressionActionsPtr expression_,
String filter_column_name_,
bool remove_filter_column_,
bool on_totals_ = false)
: ISimpleTransform(
header_,
FilterTransform::transformHeader(header_, expression_->getActionsDAG(), filter_column_name_, remove_filter_column_),
true)
, filter_transform(header_, expression_, filter_column_name_, remove_filter_column_, on_totals_)
{
}
String getName() const override { return "FilterSortedStreamByRange"; }
void transform(Chunk & chunk) override
{
int rows_before_filtration = chunk.getNumRows();
if (rows_before_filtration < 2)
{
filter_transform.transform(chunk);
return;
}
// Evaluate expression on just the first and the last row.
// If both of them satisfies conditions, than skip calculation for all the rows in between.
auto quick_check_columns = chunk.cloneEmptyColumns();
auto src_columns = chunk.detachColumns();
for (auto row : {0, rows_before_filtration - 1})
for (size_t col = 0; col < quick_check_columns.size(); ++col)
quick_check_columns[col]->insertFrom(*src_columns[col].get(), row);
chunk.setColumns(std::move(quick_check_columns), 2);
filter_transform.transform(chunk);
const bool all_rows_will_pass_filter = chunk.getNumRows() == 2;
chunk.setColumns(std::move(src_columns), rows_before_filtration);
// Not all rows satisfy conditions.
if (!all_rows_will_pass_filter)
filter_transform.transform(chunk);
}
private:
FilterTransform filter_transform;
};
}

View File

@ -32,7 +32,6 @@ public:
Status prepare() override;
protected:
void transform(Chunk & chunk) override;
private:

View File

@ -1,14 +0,0 @@
#pragma once
#include <Processors/Chunk.h>
#include <Common/PODArray.h>
namespace DB
{
/// ChunkInfo with IColumn::Selector. It is added by AddingSelectorTransform.
struct SelectorInfo : public ChunkInfo
{
IColumn::Selector selector;
};
}

View File

@ -28,7 +28,10 @@ void printPipeline(const Processors & processors, const Statuses & statuses, Wri
/// Nodes // TODO quoting and escaping
for (const auto & processor : processors)
{
out << " n" << get_proc_id(*processor) << "[label=\"" << processor->getName() << processor->getDescription();
auto description = processor->getDescription();
if (!description.empty())
description = ": " + description;
out << " n" << get_proc_id(*processor) << "[label=\"" << processor->getName() << description;
if (statuses_iter != statuses.end())
{

View File

@ -18,6 +18,7 @@
<value>collapsing_final_16p_str_keys_rnd</value>
<value>collapsing_final_1024p_ord</value>
<value>collapsing_final_1024p_rnd</value>
<value>collapsing_final_1p_ord</value>
</values>
</substitution>
</substitutions>
@ -30,6 +31,7 @@
<create_query>create table collapsing_final_16p_str_keys_rnd (key1 UInt32, key2 String, key3 String, key4 String, key5 String, key6 String, key7 String, key8 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by key1 % 16 </create_query>
<create_query>create table collapsing_final_1024p_ord (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by intDiv(key1, 8192 * 2) </create_query>
<create_query>create table collapsing_final_1024p_rnd (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by key1 % 1024 </create_query>
<create_query>create table collapsing_final_1p_ord (key1 UInt64, key2 UInt64, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2)</create_query>
<!-- 16 parts, 8192 * 1024 rows each -->
<fill_query>insert into collapsing_final_16p_ord select number, number, 1, number from numbers_mt(8388608) </fill_query>
@ -43,6 +45,9 @@
<fill_query>insert into collapsing_final_1024p_ord select number, 1, number from numbers_mt(16777216) </fill_query>
<fill_query>insert into collapsing_final_1024p_rnd select number, 1, number from numbers_mt(16777216) </fill_query>
<!-- 1 big part of 5e7 rows -->
<fill_query>insert into collapsing_final_1p_ord select number, number + 1, 1, number from numbers_mt(5e7)</fill_query>
<fill_query>optimize table {collapsing} final</fill_query>
<query>SELECT count() FROM {collapsing} final</query>

View File

@ -16,8 +16,15 @@ ExpressionTransform
ExpressionTransform × 2
(ReadFromMergeTree)
ExpressionTransform × 2
ReplacingSorted × 2 2 → 1
Copy × 2 1 → 2
AddingSelector × 2
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1
ReplacingSorted
ExpressionTransform
FilterSortedStreamByRange
Description: filter values in [(5), +inf)
ExpressionTransform
MergeTreeInOrder 0 → 1
ReplacingSorted 2 → 1
ExpressionTransform × 2
FilterSortedStreamByRange × 2
Description: filter values in [-inf, (5))
ExpressionTransform × 2
MergeTreeInOrder × 2 0 → 1

View File

@ -0,0 +1,9 @@
2
2
3
5
8
8
8
8
8

View File

@ -0,0 +1,31 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
test_random_values() {
layers=$1
$CLICKHOUSE_CLIENT -n -q "
create table tbl_8parts_${layers}granules_rnd (key1 UInt32, sign Int8) engine = CollapsingMergeTree(sign) order by (key1) partition by (key1 % 8);
insert into tbl_8parts_${layers}granules_rnd select number, 1 from numbers_mt($((layers * 8 * 8192)));
explain pipeline select * from tbl_8parts_${layers}granules_rnd final settings max_threads = 16;" 2>&1 |
grep -c "CollapsingSortedTransform"
}
for layers in 2 3 5 8; do
test_random_values $layers
done;
test_sequential_values() {
layers=$1
$CLICKHOUSE_CLIENT -n -q "
create table tbl_8parts_${layers}granules_seq (key1 UInt32, sign Int8) engine = CollapsingMergeTree(sign) order by (key1) partition by (key1 / $((layers * 8192)))::UInt64;
insert into tbl_8parts_${layers}granules_seq select number, 1 from numbers_mt($((layers * 8 * 8192)));
explain pipeline select * from tbl_8parts_${layers}granules_seq final settings max_threads = 8;" 2>&1 |
grep -c "CollapsingSortedTransform"
}
for layers in 2 3 5 8 16; do
test_sequential_values $layers
done;