Better semantic of sharing columns: development [#CLICKHOUSE-2].

This commit is contained in:
Alexey Milovidov 2017-12-15 03:01:59 +03:00
parent c2985857f8
commit 8926af2ced
21 changed files with 199 additions and 213 deletions

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit bcf9ebad48b2162d25f5fc432b176d74a09f498d
Subproject commit 81d4fdfcb887f89b0f7b1e9b503cbe63e6d8366b

View File

@ -15,7 +15,7 @@ namespace ErrorCodes
}
FilterDescription::FilterDescription(const IColumn & column)
ConstantFilterDescription::ConstantFilterDescription(const IColumn & column)
{
if (column.onlyNull())
{
@ -31,7 +31,11 @@ FilterDescription::FilterDescription(const IColumn & column)
always_false = true;
return;
}
}
FilterDescription::FilterDescription(const IColumn & column)
{
if (const ColumnUInt8 * concrete_column = typeid_cast<const ColumnUInt8 *>(&column))
{
data = &concrete_column->getData();

View File

@ -6,15 +6,27 @@
namespace DB
{
/// Obtain a filter from Column, that may have type: UInt8, Nullable(UInt8), Const(UInt8), Const(Nullable(UInt8)).
struct FilterDescription
/// Support methods for implementation of WHERE, PREWHERE and HAVING.
/// Describes whether a filter column is constant, in which case the filter
/// is always false (no row passes) or always true (every row passes).
/// Both flags remain false when neither case has been established.
struct ConstantFilterDescription
{
/// Set when the filter can never pass a row.
bool always_false = false;
/// Set when the filter passes every row.
bool always_true = false;
/// Leaves both flags false.
ConstantFilterDescription() {}
/// Analyzes `column` and sets the flags accordingly; the definition lives outside this struct.
explicit ConstantFilterDescription(const IColumn & column);
};
/// Obtain a filter from a non-constant column, which may have type UInt8 or Nullable(UInt8).
struct FilterDescription
{
const IColumn::Filter * data = nullptr; /// Pointer to filter when it is not always true or always false.
ColumnPtr data_holder; /// If new column was generated, it will be owned by holder.
FilterDescription(const IColumn & column);
explicit FilterDescription(const IColumn & column);
};
}

View File

@ -6,6 +6,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
Block AggregatingSortedBlockInputStream::readImpl()
{
@ -15,10 +20,14 @@ Block AggregatingSortedBlockInputStream::readImpl()
if (children.size() == 1)
return children[0]->read();
Block merged_block;
Block header;
MutableColumnRawPtrs merged_columns;
init(merged_block, merged_columns);
init(header, merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
if (merged_columns.empty())
return Block();
@ -30,7 +39,7 @@ Block AggregatingSortedBlockInputStream::readImpl()
/// Fill in the column numbers that need to be aggregated.
for (size_t i = 0; i < num_columns; ++i)
{
ColumnWithTypeAndName & column = merged_block.safeGetByPosition(i);
ColumnWithTypeAndName & column = header.safeGetByPosition(i);
/// We leave only states of aggregate functions.
if (!startsWith(column.type->getName(), "AggregateFunction"))
@ -59,24 +68,19 @@ Block AggregatingSortedBlockInputStream::readImpl()
for (size_t i = 0, size = columns_to_aggregate.size(); i < size; ++i)
columns_to_aggregate[i] = typeid_cast<ColumnAggregateFunction *>(merged_columns[column_numbers_to_aggregate[i]]);
if (has_collation)
merge(merged_columns, queue_with_collation);
else
merge(merged_columns, queue);
return merged_block;
merge(merged_columns, queue);
return header.cloneWithColumns(merged_columns);
}
template <typename TSortCursor>
void AggregatingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
size_t merged_rows = 0;
/// We take the rows in the correct order and put them in `merged_block`, while the rows are no more than `max_block_size`
while (!queue.empty())
{
TSortCursor current = queue.top();
SortCursor current = queue.top();
setPrimaryKeyRef(next_key, current);
@ -133,8 +137,7 @@ void AggregatingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_colu
}
template <typename TSortCursor>
void AggregatingSortedBlockInputStream::addRow(TSortCursor & cursor)
void AggregatingSortedBlockInputStream::addRow(SortCursor & cursor)
{
for (size_t i = 0, size = column_numbers_to_aggregate.size(); i < size; ++i)
{

View File

@ -70,12 +70,10 @@ private:
/** We support two different cursors - with Collation and without.
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
*/
template <typename TSortCursor>
void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
/** Extract all states of aggregate functions and merge them with the current group.
*/
template <typename TSortCursor>
void addRow(TSortCursor & cursor);
};

View File

@ -12,6 +12,7 @@ namespace DB
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
}
@ -39,14 +40,14 @@ void CollapsingSortedBlockInputStream::reportIncorrectData()
}
void CollapsingSortedBlockInputStream::insertRows(MutableColumnRawPtrs & merged_columns, size_t & merged_rows, bool last_in_stream)
void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_columns, size_t & merged_rows, bool last_in_stream)
{
if (count_positive == 0 && count_negative == 0)
return;
if (count_positive == count_negative && !last_is_positive)
{
/// If all the rows in the input streams collapsed, we still want to give at least one block in the result.
/// If all the rows in the input streams were collapsed, we still want to give at least one block in the result.
if (last_in_stream && merged_rows == 0 && !blocks_written)
{
LOG_INFO(log, "All rows collapsed");
@ -97,25 +98,29 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumnRawPtrs & merged_
if (out_row_sources_buf)
out_row_sources_buf->write(
reinterpret_cast<const char *>(current_row_sources.data()),
current_row_sources.size() * sizeof(RowSourcePart));
reinterpret_cast<const char *>(current_row_sources.data()),
current_row_sources.size() * sizeof(RowSourcePart));
}
Block CollapsingSortedBlockInputStream::readImpl()
{
if (finished)
return Block();
return {};
if (children.size() == 1)
return children[0]->read();
Block merged_block;
MutableColumnRawPtrs merged_columns;
Block header;
MutableColumns merged_columns;
init(header, merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
init(merged_block, merged_columns);
if (merged_columns.empty())
return Block();
return {};
/// Additional initialization.
if (first_negative.empty())
@ -124,27 +129,22 @@ Block CollapsingSortedBlockInputStream::readImpl()
last_negative.columns.resize(num_columns);
last_positive.columns.resize(num_columns);
sign_column_number = merged_block.getPositionByName(sign_column);
sign_column_number = header.getPositionByName(sign_column);
}
if (has_collation)
merge(merged_columns, queue_with_collation);
else
merge(merged_columns, queue);
return merged_block;
merge(merged_columns, queue);
return header.cloneWithColumns(merged_columns);
}
template <typename TSortCursor>
void CollapsingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
size_t merged_rows = 0;
/// Take rows in correct order and put them into `merged_block` until the rows no more than `max_block_size`
/// Take rows in the correct order and put them into `merged_columns` while there are no more than `max_block_size` rows
for (; !queue.empty(); ++current_pos)
{
TSortCursor current = queue.top();
SortCursor current = queue.top();
if (current_key.empty())
{

View File

@ -90,11 +90,10 @@ private:
/** We support two different cursors - with Collation and without.
* Templates are used instead of polymorphic SortCursors and calls to virtual functions.
*/
template <typename TSortCursor>
void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
/// Output to result rows for the current primary key.
void insertRows(MutableColumnRawPtrs & merged_columns, size_t & merged_rows, bool last_in_stream = false);
void insertRows(MutableColumns & merged_columns, size_t & merged_rows, bool last_in_stream = false);
void reportIncorrectData();
};

View File

@ -88,9 +88,10 @@ Block ColumnGathererStream::readImpl()
if (!source_to_fully_copy && row_sources_buf.eof())
return Block();
MutableColumnPtr output_column = column.column->cloneEmpty();
output_column->gather(*this);
return output_block.cloneWithColumns(MutableColumns{std::move(output_column)});
MutableColumns output_columns(1);
output_columns[0] = column.column->cloneEmpty();
output_columns[0]->gather(*this);
return output_block.cloneWithColumns(std::move(output_columns));
}

View File

@ -79,9 +79,9 @@ Block FilterBlockInputStream::readImpl()
ColumnPtr column = sample_block.safeGetByPosition(filter_column_in_sample_block).column;
if (column)
analyzeConstantFilter(*column, filter_always_false, filter_always_true);
constant_filter_description = ConstantFilterDescription(*column);
if (filter_always_false)
if (constant_filter_description.always_false)
return res;
}
@ -94,7 +94,7 @@ Block FilterBlockInputStream::readImpl()
expression->execute(res);
if (filter_always_true)
if (constant_filter_description.always_true)
return res;
/// Find the current position of the filter column in the block.
@ -104,49 +104,23 @@ Block FilterBlockInputStream::readImpl()
size_t columns = res.columns();
ColumnPtr column = res.safeGetByPosition(filter_column).column;
auto filter_and_holder = getFilterFromColumn(*column);
/** It happens that at the stage of analysis of expressions (in sample_block) the columns-constants have not been calculated yet,
* and now - are calculated. That is, not all cases are covered by the code above.
* This happens if the function returns a constant for a non-constant argument.
* For example, `ignore` function.
*/
constant_filter_description = ConstantFilterDescription(*column);
IColumn * observed_column = column.get();
bool is_nullable_column = observed_column->isColumnNullable();
if (is_nullable_column)
observed_column = &static_cast<const ColumnNullable &>(*column.get()).getNestedColumn();
ColumnUInt8 * column_vec = typeid_cast<ColumnUInt8 *>(observed_column);
if (!column_vec)
if (constant_filter_description.always_false)
{
/** It happens that at the stage of analysis of expressions (in sample_block) the columns-constants have not been calculated yet,
* and now - are calculated. That is, not all cases are covered by the code above.
* This happens if the function returns a constant for a non-constant argument.
* For example, `ignore` function.
*/
analyzeConstantFilter(*observed_column, filter_always_false, filter_always_true);
if (filter_always_false)
{
res.clear();
return res;
}
if (filter_always_true)
return res;
throw Exception("Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8 or Nullable variants of them.",
ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
res.clear();
return res;
}
IColumn::Filter & filter = column_vec->getData();
if (constant_filter_description.always_true)
return res;
if (is_nullable_column)
{
/// Exclude the entries of the filter column that actually are NULL values.
const NullMap & null_map = static_cast<ColumnNullable &>(*column).getNullMapData();
IColumn::Filter & filter = column_vec->getData();
for (size_t i = 0, size = null_map.size(); i < size; ++i)
if (null_map[i])
filter[i] = 0;
}
FilterDescription filter_and_holder(*column);
/** Let's find out how many rows will be in result.
* To do this, we filter out the first non-constant column
@ -168,12 +142,12 @@ Block FilterBlockInputStream::readImpl()
if (first_non_constant_column != static_cast<size_t>(filter_column))
{
ColumnWithTypeAndName & current_column = res.safeGetByPosition(first_non_constant_column);
current_column.column = current_column.column->filter(filter, -1);
current_column.column = current_column.column->filter(*filter_and_holder.data, -1);
filtered_rows = current_column.column->size();
}
else
{
filtered_rows = countBytesInFilter(filter);
filtered_rows = countBytesInFilter(*filter_and_holder.data);
}
/// If the current block is completely filtered out, let's move on to the next one.
@ -181,7 +155,7 @@ Block FilterBlockInputStream::readImpl()
continue;
/// If all the rows pass through the filter.
if (filtered_rows == filter.size())
if (filtered_rows == filter_and_holder.data->size())
{
/// Replace the column with the filter by a constant.
res.safeGetByPosition(filter_column).column = res.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, UInt64(1));
@ -211,7 +185,7 @@ Block FilterBlockInputStream::readImpl()
if (current_column.column->isColumnConst())
current_column.column = current_column.column->cut(0, filtered_rows);
else
current_column.column = current_column.column->filter(filter, -1);
current_column.column = current_column.column->filter(*filter_and_holder.data, -1);
}
return res;

View File

@ -1,6 +1,7 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Columns/FilterDescription.h>
namespace DB
@ -36,8 +37,8 @@ private:
String filter_column_name;
bool is_first = true;
bool filter_always_true = false;
bool filter_always_false = false;
ConstantFilterDescription constant_filter_description;
};
}

View File

@ -63,7 +63,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
}
else if (name == "RowBinary")
{
return wrap_row_stream(std::make_shared<BinaryRowInputStream>(buf));
return wrap_row_stream(std::make_shared<BinaryRowInputStream>(buf, sample));
}
else if (name == "TabSeparated" || name == "TSV") /// TSV is a synonym/alias for the original TabSeparated format
{
@ -79,7 +79,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu
}
else if (name == "Values")
{
return wrap_row_stream(std::make_shared<ValuesRowInputStream>(buf, context, settings.input_format_values_interpret_expressions));
return wrap_row_stream(std::make_shared<ValuesRowInputStream>(buf, sample, context, settings.input_format_values_interpret_expressions));
}
else if (name == "CSV")
{

View File

@ -8,6 +8,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int LOGICAL_ERROR;
}
@ -67,10 +68,14 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
if (finished)
return Block();
Block merged_block;
MutableColumnRawPtrs merged_columns;
Block header;
MutableColumns merged_columns;
init(header, merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
init(merged_block, merged_columns);
if (merged_columns.empty())
return Block();
@ -85,10 +90,10 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
place_for_aggregate_state.resize(max_size_of_aggregate_state);
/// Memoize column numbers in block.
path_column_num = merged_block.getPositionByName(params.path_column_name);
time_column_num = merged_block.getPositionByName(params.time_column_name);
value_column_num = merged_block.getPositionByName(params.value_column_name);
version_column_num = merged_block.getPositionByName(params.version_column_name);
path_column_num = header.getPositionByName(params.path_column_name);
time_column_num = header.getPositionByName(params.time_column_name);
value_column_num = header.getPositionByName(params.value_column_name);
version_column_num = header.getPositionByName(params.version_column_name);
for (size_t i = 0; i < num_columns; ++i)
if (i != time_column_num && i != value_column_num && i != version_column_num)
@ -98,23 +103,18 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
current_selected_row.columns.resize(num_columns);
}
if (has_collation)
merge(merged_columns, queue_with_collation);
else
merge(merged_columns, queue);
return merged_block;
merge(merged_columns, queue);
return header.cloneWithColumns(merged_columns);
}
template <typename TSortCursor>
void GraphiteRollupSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
const DateLUTImpl & date_lut = DateLUT::instance();
size_t started_rows = 0; /// Number of times startNextRow() has been called.
/// Take rows in needed order and put them into `merged_block` until we get `max_block_size` rows.
/// Take rows in needed order and put them into `merged_columns` until we get `max_block_size` rows.
///
/// Variables starting with current_* refer to the rows previously popped from the queue that will
/// contribute towards current output row.
@ -122,7 +122,7 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_c
while (!queue.empty())
{
TSortCursor next_cursor = queue.top();
SortCursor next_cursor = queue.top();
StringRef next_path = next_cursor->all_columns[path_column_num]->getDataAt(next_cursor->pos);
bool path_differs = is_first || next_path != current_path;
@ -218,8 +218,7 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_c
}
template <typename TSortCursor>
void GraphiteRollupSortedBlockInputStream::startNextRow(MutableColumnRawPtrs & merged_columns, TSortCursor & cursor, const Graphite::Pattern * next_pattern)
void GraphiteRollupSortedBlockInputStream::startNextRow(MutableColumns & merged_columns, SortCursor & cursor, const Graphite::Pattern * next_pattern)
{
/// Copy unmodified column values.
for (size_t i = 0, size = unmodified_column_numbers.size(); i < size; ++i)
@ -238,7 +237,7 @@ void GraphiteRollupSortedBlockInputStream::startNextRow(MutableColumnRawPtrs & m
}
void GraphiteRollupSortedBlockInputStream::finishCurrentRow(MutableColumnRawPtrs & merged_columns)
void GraphiteRollupSortedBlockInputStream::finishCurrentRow(MutableColumns & merged_columns)
{
/// Insert calculated values of the columns `time`, `value`, `version`.
merged_columns[time_column_num]->insert(UInt64(current_time_rounded));
@ -252,7 +251,7 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentRow(MutableColumnRawPtrs
}
else
merged_columns[value_column_num]->insertFrom(
*current_selected_row.columns[value_column_num], current_selected_row.row_num);
*current_selected_row.columns[value_column_num], current_selected_row.row_num);
}

View File

@ -195,15 +195,13 @@ private:
UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const;
template <typename TSortCursor>
void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
/// Insert the values into the resulting columns, which will not be changed in the future.
template <typename TSortCursor>
void startNextRow(MutableColumnRawPtrs & merged_columns, TSortCursor & cursor, const Graphite::Pattern * next_pattern);
void startNextRow(MutableColumns & merged_columns, SortCursor & cursor, const Graphite::Pattern * next_pattern);
/// Insert the calculated `time`, `value`, `version` values into the resulting columns by the last group of rows.
void finishCurrentRow(MutableColumnRawPtrs & merged_columns);
void finishCurrentRow(MutableColumns & merged_columns);
/// Update the state of the aggregate function with the new `value`.
void accumulateRow(RowRef & row);

View File

@ -91,13 +91,13 @@ void IProfilingBlockInputStream::readSuffix()
void IProfilingBlockInputStream::updateExtremes(Block & block)
{
size_t columns = block.columns();
size_t num_columns = block.columns();
if (!extremes)
{
MutableColumns extremes_columns;
MutableColumns extremes_columns(num_columns);
for (size_t i = 0; i < columns; ++i)
for (size_t i = 0; i < num_columns; ++i)
{
const ColumnPtr & src = block.safeGetByPosition(i).column;
@ -120,11 +120,11 @@ void IProfilingBlockInputStream::updateExtremes(Block & block)
}
}
extremes = block.cloneWithColumns(extremes_columns);
extremes = block.cloneWithColumns(std::move(extremes_columns));
}
else
{
for (size_t i = 0; i < columns; ++i)
for (size_t i = 0; i < num_columns; ++i)
{
ColumnPtr & old_extremes = extremes.safeGetByPosition(i).column;
@ -144,7 +144,7 @@ void IProfilingBlockInputStream::updateExtremes(Block & block)
if (cur_max_value > max_value)
max_value = cur_max_value;
MutableColumn new_extremes = old_extremes->cloneEmpty();
MutableColumnPtr new_extremes = old_extremes->cloneEmpty();
new_extremes->insert(min_value);
new_extremes->insert(max_value);

View File

@ -204,12 +204,10 @@ Block MergeSortingBlocksBlockInputStream::readImpl()
template <typename TSortCursor>
Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCursor> & queue)
{
Block merged = blocks[0].cloneEmpty();
size_t num_columns = blocks[0].columns();
MutableColumnRawPtrs merged_columns;
for (size_t i = 0; i < num_columns; ++i) /// TODO: reserve
merged_columns.push_back(merged.safeGetByPosition(i).column.get());
MutableColumnPtrs merged_columns = blocks[0].cloneEmptyColumns();
/// TODO: reserve (in each column)
/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
@ -231,18 +229,18 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue<TSortCur
if (limit && total_merged_rows == limit)
{
blocks.clear();
return merged;
return blocks[0].cloneWithColumns(merged_columns);
}
++merged_rows;
if (merged_rows == max_merged_block_size)
return merged;
return blocks[0].cloneWithColumns(merged_columns);
}
if (merged_rows == 0)
merged.clear();
return {};
return merged;
return blocks[0].cloneWithColumns(merged_columns);
}

View File

@ -46,7 +46,7 @@ String MergingSortedBlockInputStream::getID() const
return res.str();
}
void MergingSortedBlockInputStream::init(Block & merged_block, MutableColumnRawPtrs & merged_columns)
void MergingSortedBlockInputStream::init(Block & header, MutableColumnRawPtrs & merged_columns)
{
/// Read the first blocks, initialize the queue.
if (first)
@ -95,7 +95,7 @@ void MergingSortedBlockInputStream::init(Block & merged_block, MutableColumnRawP
if (*shared_block_ptr)
{
merged_block = shared_block_ptr->cloneEmpty();
header = shared_block_ptr->cloneEmpty();
break;
}
}
@ -114,7 +114,7 @@ void MergingSortedBlockInputStream::init(Block & merged_block, MutableColumnRawP
continue;
size_t src_columns = shared_block_ptr->columns();
size_t dst_columns = merged_block.columns();
size_t dst_columns = header.columns();
if (src_columns != dst_columns)
throw Exception("Merging blocks has different number of columns ("
@ -122,22 +122,17 @@ void MergingSortedBlockInputStream::init(Block & merged_block, MutableColumnRawP
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
for (size_t i = 0; i < src_columns; ++i)
{
if (shared_block_ptr->safeGetByPosition(i).name != merged_block.safeGetByPosition(i).name
|| shared_block_ptr->safeGetByPosition(i).type->getName() != merged_block.safeGetByPosition(i).type->getName()
|| shared_block_ptr->safeGetByPosition(i).column->getName() != merged_block.safeGetByPosition(i).column->getName())
{
if (!blocksHaveEqualStructure(*shared_block_ptr, header))
throw Exception("Merging blocks has different names or types of columns:\n"
+ shared_block_ptr->dumpStructure() + "\nand\n" + merged_block.dumpStructure(),
+ shared_block_ptr->dumpStructure() + "\nand\n" + header.dumpStructure(),
ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
}
}
}
merged_columns.resize(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
merged_columns.emplace_back(merged_block.safeGetByPosition(i).column.get());
merged_columns.back()->reserve(expected_block_size);
merged_columns[i] = header.safeGetByPosition(i).column->cloneEmpty();
merged_columns[i]->reserve(expected_block_size);
}
}
@ -154,24 +149,24 @@ void MergingSortedBlockInputStream::initQueue(std::priority_queue<TSortCursor> &
Block MergingSortedBlockInputStream::readImpl()
{
if (finished)
return Block();
return {};
if (children.size() == 1)
return children[0]->read();
Block merged_block;
Block header;
MutableColumnRawPtrs merged_columns;
init(merged_block, merged_columns);
init(header, merged_columns);
if (merged_columns.empty())
return Block();
return {};
if (has_collation)
merge(merged_block, merged_columns, queue_with_collation);
merge(merged_columns, queue_with_collation);
else
merge(merged_block, merged_columns, queue);
merge(merged_columns, queue);
return merged_block;
return header.cloneWithColumns(merged_columns);
}
@ -207,7 +202,7 @@ void MergingSortedBlockInputStream::fetchNextBlock<SortCursorWithCollation>(cons
template <typename TSortCursor>
void MergingSortedBlockInputStream::merge(Block & merged_block, MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
void MergingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
{
size_t merged_rows = 0;
@ -235,7 +230,7 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, MutableColumnRaw
return false;
};
/// Take rows in required order and put them into `merged_block`, while the rows are no more than `max_block_size`
/// Take rows in required order and put them into `merged_columns`, while the rows are no more than `max_block_size`
while (!queue.empty())
{
TSortCursor current = queue.top();
@ -243,8 +238,8 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, MutableColumnRaw
while (true)
{
/** And what if the block is smaller or equal than the rest for the current cursor?
* Or is there only one data source left in the queue? Then you can take the entire block of current cursor.
/** And what if the whole block of the current cursor is less than or equal to the rest?
* Or is there only one data source left in the queue? Then you can take the entire block of the current cursor.
*/
if (current.impl->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
{
@ -265,18 +260,18 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, MutableColumnRaw
throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < num_columns; ++i)
merged_block.getByPosition(i).column = source_blocks[source_num]->getByPosition(i).column;
merged_columns[i] = source_blocks[source_num]->getByPosition(i).column->mutate();
// std::cerr << "copied columns\n";
size_t merged_rows = merged_block.rows();
size_t merged_rows = merged_columns.at(0)->size();
if (limit && total_merged_rows + merged_rows > limit)
{
merged_rows = limit - total_merged_rows;
for (size_t i = 0; i < num_columns; ++i)
{
auto & column = merged_block.getByPosition(i).column;
auto & column = merged_columns[i];
column = column->cut(0, merged_rows);
}

View File

@ -113,7 +113,7 @@ protected:
void readSuffixImpl() override;
/// Initializes the queue and the next result block.
void init(Block & merged_block, MutableColumnRawPtrs & merged_columns);
void init(Block & header, MutableColumns & merged_columns);
/// Gets the next block from the source corresponding to the `current`.
template <typename TSortCursor>
@ -214,7 +214,7 @@ private:
void initQueue(std::priority_queue<TSortCursor> & queue);
template <typename TSortCursor>
void merge(Block & merged_block, MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
Logger * log = &Logger::get("MergingSortedBlockInputStream");

View File

@ -6,8 +6,13 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void ReplacingSortedBlockInputStream::insertRow(MutableColumnRawPtrs & merged_columns, size_t & merged_rows)
void ReplacingSortedBlockInputStream::insertRow(MutableColumns & merged_columns, size_t & merged_rows)
{
if (out_row_sources_buf)
{
@ -33,10 +38,14 @@ Block ReplacingSortedBlockInputStream::readImpl()
if (children.size() == 1)
return children[0]->read();
Block merged_block;
MutableColumnRawPtrs merged_columns;
Block header;
MutableColumns merged_columns;
init(header, merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
init(merged_block, merged_columns);
if (merged_columns.empty())
return Block();
@ -46,27 +55,22 @@ Block ReplacingSortedBlockInputStream::readImpl()
selected_row.columns.resize(num_columns);
if (!version_column.empty())
version_column_number = merged_block.getPositionByName(version_column);
version_column_number = header.getPositionByName(version_column);
}
if (has_collation)
merge(merged_columns, queue_with_collation);
else
merge(merged_columns, queue);
return merged_block;
merge(merged_columns, queue);
return header.cloneWithColumns(merged_columns);
}
template <typename TSortCursor>
void ReplacingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
size_t merged_rows = 0;
/// Take the rows in needed order and put them into `merged_block` until rows no more than `max_block_size`
/// Take the rows in the needed order and put them into `merged_columns` while there are no more than `max_block_size` rows
while (!queue.empty())
{
TSortCursor current = queue.top();
SortCursor current = queue.top();
if (current_key.empty())
{

View File

@ -63,11 +63,10 @@ private:
PODArray<RowSourcePart> current_row_sources; /// Sources of rows with the current primary key
template <typename TSortCursor>
void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
/// Output into result the rows for current primary key.
void insertRow(MutableColumnRawPtrs & merged_columns, size_t & merged_rows);
void insertRow(MutableColumns & merged_columns, size_t & merged_rows);
};
}

View File

@ -17,6 +17,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
String SummingSortedBlockInputStream::getID() const
{
@ -36,7 +41,7 @@ String SummingSortedBlockInputStream::getID() const
}
void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumnRawPtrs & merged_columns, bool force_insertion)
void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion)
{
for (auto & desc : columns_to_aggregate)
{
@ -111,12 +116,16 @@ Block SummingSortedBlockInputStream::readImpl()
if (children.size() == 1)
return children[0]->read();
Block merged_block;
MutableColumnRawPtrs merged_columns;
Block header;
MutableColumns merged_columns;
init(header, merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
init(merged_block, merged_columns);
if (merged_columns.empty())
return Block();
return {};
/// Additional initialization.
if (current_row.empty())
@ -134,7 +143,7 @@ Block SummingSortedBlockInputStream::readImpl()
*/
for (size_t i = 0; i < num_columns; ++i)
{
ColumnWithTypeAndName & column = merged_block.safeGetByPosition(i);
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
/// Discover nested Maps and find columns for summation
if (typeid_cast<const DataTypeArray *>(column.type.get()))
@ -196,7 +205,7 @@ Block SummingSortedBlockInputStream::readImpl()
/// no elements of map could be in primary key
auto column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
if (isInPrimaryKey(description, merged_block.safeGetByPosition(*column_num_it).name, *column_num_it))
if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it))
break;
if (column_num_it != map.second.end())
{
@ -212,7 +221,7 @@ Block SummingSortedBlockInputStream::readImpl()
column_num_it = map.second.begin();
for (; column_num_it != map.second.end(); ++column_num_it)
{
const ColumnWithTypeAndName & key_col = merged_block.safeGetByPosition(*column_num_it);
const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it);
const String & name = key_col.name;
const IDataType & nested_type = *static_cast<const DataTypeArray *>(key_col.type.get())->getNestedType();
@ -271,32 +280,27 @@ Block SummingSortedBlockInputStream::readImpl()
size_t tuple_size = desc.column_numbers.size();
Columns tuple_columns(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)
tuple_columns[i] = merged_block.safeGetByPosition(desc.column_numbers[i]).column;
tuple_columns[i] = header.safeGetByPosition(desc.column_numbers[i]).column;
desc.merged_column = ColumnTuple::create(tuple_columns);
}
else
desc.merged_column = merged_block.safeGetByPosition(desc.column_numbers[0]).column;
desc.merged_column = header.safeGetByPosition(desc.column_numbers[0]).column->cloneEmpty();
}
if (has_collation)
merge(merged_columns, queue_with_collation);
else
merge(merged_columns, queue);
return merged_block;
merge(merged_columns, queue);
return header.cloneWithColumns(merged_columns);
}
template <typename TSortCursor>
void SummingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue)
{
merged_rows = 0;
/// Take the rows in needed order and put them in `merged_block` until rows no more than `max_block_size`
/// Take the rows in the needed order and put them in `merged_columns` while there are no more than `max_block_size` rows
while (!queue.empty())
{
TSortCursor current = queue.top();
SortCursor current = queue.top();
setPrimaryKeyRef(next_key, current);
@ -364,8 +368,8 @@ void SummingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns,
finished = true;
}
template <typename TSortCursor>
bool SummingSortedBlockInputStream::mergeMap(const MapDescription & desc, Row & row, TSortCursor & cursor)
bool SummingSortedBlockInputStream::mergeMap(const MapDescription & desc, Row & row, SortCursor & cursor)
{
/// Strongly non-optimal.
@ -448,8 +452,7 @@ bool SummingSortedBlockInputStream::mergeMap(const MapDescription & desc, Row &
}
template <typename TSortCursor>
void SummingSortedBlockInputStream::addRow(TSortCursor & cursor)
void SummingSortedBlockInputStream::addRow(SortCursor & cursor)
{
for (auto & desc : columns_to_aggregate)
{

View File

@ -77,7 +77,7 @@ private:
AggregateFunctionPtr function;
IAggregateFunction::AddFunc add_function = nullptr;
std::vector<size_t> column_numbers;
ColumnPtr merged_column;
MutableColumnPtr merged_column;
std::vector<char> state;
bool created = false;
@ -138,16 +138,14 @@ private:
/** We support two different cursors - with Collation and without.
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.
*/
template <typename TSortCursor>
void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
/// Insert the summed row for the current group into the result and updates some of per-block flags if the row is not "zero".
/// If force_insertion=true, then the row will be inserted even if it is "zero"
void insertCurrentRowIfNeeded(MutableColumnRawPtrs & merged_columns, bool force_insertion);
void insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion);
/// Returns true if merge result is not empty
template <typename TSortCursor>
bool mergeMap(const MapDescription & map, Row & row, TSortCursor & cursor);
bool mergeMap(const MapDescription & map, Row & row, SortCursor & cursor);
// Add the row under the cursor to the `row`.
template <typename TSortCursor>