diff --git a/contrib/poco b/contrib/poco index bcf9ebad48b..81d4fdfcb88 160000 --- a/contrib/poco +++ b/contrib/poco @@ -1 +1 @@ -Subproject commit bcf9ebad48b2162d25f5fc432b176d74a09f498d +Subproject commit 81d4fdfcb887f89b0f7b1e9b503cbe63e6d8366b diff --git a/dbms/src/Columns/FilterDescription.cpp b/dbms/src/Columns/FilterDescription.cpp index 4d39a9195f3..11c110912dc 100644 --- a/dbms/src/Columns/FilterDescription.cpp +++ b/dbms/src/Columns/FilterDescription.cpp @@ -15,7 +15,7 @@ namespace ErrorCodes } -FilterDescription::FilterDescription(const IColumn & column) +ConstantFilterDescription::ConstantFilterDescription(const IColumn & column) { if (column.onlyNull()) { @@ -31,7 +31,11 @@ FilterDescription::FilterDescription(const IColumn & column) always_false = true; return; } +} + +FilterDescription::FilterDescription(const IColumn & column) +{ if (const ColumnUInt8 * concrete_column = typeid_cast(&column)) { data = &concrete_column->getData(); diff --git a/dbms/src/Columns/FilterDescription.h b/dbms/src/Columns/FilterDescription.h index 79d6e2418dc..0c9c4c217ff 100644 --- a/dbms/src/Columns/FilterDescription.h +++ b/dbms/src/Columns/FilterDescription.h @@ -6,15 +6,27 @@ namespace DB { -/// Obtain a filter from Column, that may have type: UInt8, Nullable(UInt8), Const(UInt8), Const(Nullable(UInt8)). -struct FilterDescription +/// Support methods for implementation of WHERE, PREWHERE and HAVING. + + +/// Analyze if the column for filter is constant thus filter is always false or always true. +struct ConstantFilterDescription { bool always_false = false; bool always_true = false; + + ConstantFilterDescription() {} + explicit ConstantFilterDescription(const IColumn & column); +}; + + +/// Obtain a filter from non constant Column, that may have type: UInt8, Nullable(UInt8). +struct FilterDescription +{ const IColumn::Filter * data = nullptr; /// Pointer to filter when it is not always true or always false. ColumnPtr data_holder; /// If new column was generated, it will be owned by holder. - FilterDescription(const IColumn & column); + explicit FilterDescription(const IColumn & column); }; } diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp index 20e87d03d3f..6017a5c9ddf 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.cpp @@ -6,6 +6,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + Block AggregatingSortedBlockInputStream::readImpl() { @@ -15,10 +20,14 @@ Block AggregatingSortedBlockInputStream::readImpl() if (children.size() == 1) return children[0]->read(); - Block merged_block; + Block header; MutableColumnRawPtrs merged_columns; - init(merged_block, merged_columns); + init(header, merged_columns); + + if (has_collation) + throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); + if (merged_columns.empty()) return Block(); @@ -30,7 +39,7 @@ Block AggregatingSortedBlockInputStream::readImpl() /// Fill in the column numbers that need to be aggregated. for (size_t i = 0; i < num_columns; ++i) { - ColumnWithTypeAndName & column = merged_block.safeGetByPosition(i); + ColumnWithTypeAndName & column = header.safeGetByPosition(i); /// We leave only states of aggregate functions. if (!startsWith(column.type->getName(), "AggregateFunction")) @@ -59,24 +68,19 @@ Block AggregatingSortedBlockInputStream::readImpl() for (size_t i = 0, size = columns_to_aggregate.size(); i < size; ++i) columns_to_aggregate[i] = typeid_cast(merged_columns[column_numbers_to_aggregate[i]]); - if (has_collation) - merge(merged_columns, queue_with_collation); - else - merge(merged_columns, queue); - - return merged_block; + merge(merged_columns, queue); + return header.cloneWithColumns(merged_columns); } -template -void AggregatingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, std::priority_queue & queue) +void AggregatingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) { size_t merged_rows = 0; /// We take the rows in the correct order and put them in `merged_block`, while the rows are no more than `max_block_size` while (!queue.empty()) { - TSortCursor current = queue.top(); + SortCursor current = queue.top(); setPrimaryKeyRef(next_key, current); @@ -133,8 +137,7 @@ void AggregatingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_colu } -template -void AggregatingSortedBlockInputStream::addRow(TSortCursor & cursor) +void AggregatingSortedBlockInputStream::addRow(SortCursor & cursor) { for (size_t i = 0, size = column_numbers_to_aggregate.size(); i < size; ++i) { diff --git a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h index 763dc2568c4..1ce5b691ec2 100644 --- a/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/AggregatingSortedBlockInputStream.h @@ -70,12 +70,10 @@ private: /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursor and calls to virtual functions. */ - template void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue & queue); /** Extract all states of aggregate functions and merge them with the current group. */ - template void addRow(TSortCursor & cursor); }; diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp index a4e1259dcba..9fd16bf380c 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp @@ -12,6 +12,7 @@ namespace DB namespace ErrorCodes { extern const int INCORRECT_DATA; + extern const int LOGICAL_ERROR; } @@ -39,14 +40,14 @@ void CollapsingSortedBlockInputStream::reportIncorrectData() } -void CollapsingSortedBlockInputStream::insertRows(MutableColumnRawPtrs & merged_columns, size_t & merged_rows, bool last_in_stream) +void CollapsingSortedBlockInputStream::insertRows(MutableColumns & merged_columns, size_t & merged_rows, bool last_in_stream) { if (count_positive == 0 && count_negative == 0) return; if (count_positive == count_negative && !last_is_positive) { - /// If all the rows in the input streams collapsed, we still want to give at least one block in the result. + /// If all the rows in the input streams was collapsed, we still want to give at least one block in the result. if (last_in_stream && merged_rows == 0 && !blocks_written) { LOG_INFO(log, "All rows collapsed"); @@ -97,25 +98,29 @@ void CollapsingSortedBlockInputStream::insertRows(MutableColumnRawPtrs & merged_ if (out_row_sources_buf) out_row_sources_buf->write( - reinterpret_cast(current_row_sources.data()), - current_row_sources.size() * sizeof(RowSourcePart)); + reinterpret_cast(current_row_sources.data()), + current_row_sources.size() * sizeof(RowSourcePart)); } Block CollapsingSortedBlockInputStream::readImpl() { if (finished) - return Block(); + return {}; if (children.size() == 1) return children[0]->read(); - Block merged_block; - MutableColumnRawPtrs merged_columns; + Block header; + MutableColumns merged_columns; + + init(header, merged_columns); + + if (has_collation) + throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); - init(merged_block, merged_columns); if (merged_columns.empty()) - return Block(); + return {}; /// Additional initialization. if (first_negative.empty()) @@ -124,27 +129,22 @@ Block CollapsingSortedBlockInputStream::readImpl() last_negative.columns.resize(num_columns); last_positive.columns.resize(num_columns); - sign_column_number = merged_block.getPositionByName(sign_column); + sign_column_number = header.getPositionByName(sign_column); } - if (has_collation) - merge(merged_columns, queue_with_collation); - else - merge(merged_columns, queue); - - return merged_block; + merge(merged_columns, queue); + return header.cloneWithColumns(merged_columns); } -template -void CollapsingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, std::priority_queue & queue) +void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) { size_t merged_rows = 0; - /// Take rows in correct order and put them into `merged_block` until the rows no more than `max_block_size` + /// Take rows in correct order and put them into `merged_columns` until the rows no more than `max_block_size` for (; !queue.empty(); ++current_pos) { - TSortCursor current = queue.top(); + SortCursor current = queue.top(); if (current_key.empty()) { diff --git a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h index 9399b301e4b..88a314c7ecd 100644 --- a/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/CollapsingSortedBlockInputStream.h @@ -90,11 +90,10 @@ private: /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursors and calls to virtual functions. */ - template - void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, std::priority_queue & queue); /// Output to result rows for the current primary key. - void insertRows(MutableColumnRawPtrs & merged_columns, size_t & merged_rows, bool last_in_stream = false); + void insertRows(MutableColumns & merged_columns, size_t & merged_rows, bool last_in_stream = false); void reportIncorrectData(); }; diff --git a/dbms/src/DataStreams/ColumnGathererStream.cpp b/dbms/src/DataStreams/ColumnGathererStream.cpp index bf2f6680a90..f6f94664365 100644 --- a/dbms/src/DataStreams/ColumnGathererStream.cpp +++ b/dbms/src/DataStreams/ColumnGathererStream.cpp @@ -88,9 +88,10 @@ Block ColumnGathererStream::readImpl() if (!source_to_fully_copy && row_sources_buf.eof()) return Block(); - MutableColumnPtr output_column = column.column->cloneEmpty(); - output_column->gather(*this); - return output_block.cloneWithColumns(MutableColumns{std::move(output_column)}); + MutableColumns output_columns(1); + output_columns[0] = column.column->cloneEmpty(); + output_columns[0]->gather(*this); + return output_block.cloneWithColumns(std::move(output_columns)); } diff --git a/dbms/src/DataStreams/FilterBlockInputStream.cpp b/dbms/src/DataStreams/FilterBlockInputStream.cpp index 340ca4de411..e3430119a1b 100644 --- a/dbms/src/DataStreams/FilterBlockInputStream.cpp +++ b/dbms/src/DataStreams/FilterBlockInputStream.cpp @@ -79,9 +79,9 @@ Block FilterBlockInputStream::readImpl() ColumnPtr column = sample_block.safeGetByPosition(filter_column_in_sample_block).column; if (column) - analyzeConstantFilter(*column, filter_always_false, filter_always_true); + constant_filter_description = ConstantFilterDescription(*column); - if (filter_always_false) + if (constant_filter_description.always_false) return res; } @@ -94,7 +94,7 @@ Block FilterBlockInputStream::readImpl() expression->execute(res); - if (filter_always_true) + if (constant_filter_description.always_true) return res; /// Find the current position of the filter column in the block. @@ -104,49 +104,23 @@ Block FilterBlockInputStream::readImpl() size_t columns = res.columns(); ColumnPtr column = res.safeGetByPosition(filter_column).column; - auto filter_and_holder = getFilterFromColumn(*column); + /** It happens that at the stage of analysis of expressions (in sample_block) the columns-constants have not been calculated yet, + * and now - are calculated. That is, not all cases are covered by the code above. + * This happens if the function returns a constant for a non-constant argument. + * For example, `ignore` function. + */ + constant_filter_description = ConstantFilterDescription(*column); - IColumn * observed_column = column.get(); - bool is_nullable_column = observed_column->isColumnNullable(); - if (is_nullable_column) - observed_column = &static_cast(*column.get()).getNestedColumn(); - - ColumnUInt8 * column_vec = typeid_cast(observed_column); - if (!column_vec) + if (constant_filter_description.always_false) { - /** It happens that at the stage of analysis of expressions (in sample_block) the columns-constants have not been calculated yet, - * and now - are calculated. That is, not all cases are covered by the code above. - * This happens if the function returns a constant for a non-constant argument. - * For example, `ignore` function. - */ - analyzeConstantFilter(*observed_column, filter_always_false, filter_always_true); - - if (filter_always_false) - { - res.clear(); - return res; - } - - if (filter_always_true) - return res; - - throw Exception("Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8 or Nullable variants of them.", - ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER); + res.clear(); + return res; } - IColumn::Filter & filter = column_vec->getData(); + if (constant_filter_description.always_true) + return res; - if (is_nullable_column) - { - /// Exclude the entries of the filter column that actually are NULL values. - - const NullMap & null_map = static_cast(*column).getNullMapData(); - - IColumn::Filter & filter = column_vec->getData(); - for (size_t i = 0, size = null_map.size(); i < size; ++i) - if (null_map[i]) - filter[i] = 0; - } + FilterDescription filter_and_holder(*column); /** Let's find out how many rows will be in result. * To do this, we filter out the first non-constant column @@ -168,12 +142,12 @@ Block FilterBlockInputStream::readImpl() if (first_non_constant_column != static_cast(filter_column)) { ColumnWithTypeAndName & current_column = res.safeGetByPosition(first_non_constant_column); - current_column.column = current_column.column->filter(filter, -1); + current_column.column = current_column.column->filter(*filter_and_holder.data, -1); filtered_rows = current_column.column->size(); } else { - filtered_rows = countBytesInFilter(filter); + filtered_rows = countBytesInFilter(*filter_and_holder.data); } /// If the current block is completely filtered out, let's move on to the next one. @@ -181,7 +155,7 @@ Block FilterBlockInputStream::readImpl() continue; /// If all the rows pass through the filter. - if (filtered_rows == filter.size()) + if (filtered_rows == filter_and_holder.data->size()) { /// Replace the column with the filter by a constant. res.safeGetByPosition(filter_column).column = res.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, UInt64(1)); @@ -211,7 +185,7 @@ Block FilterBlockInputStream::readImpl() if (current_column.column->isColumnConst()) current_column.column = current_column.column->cut(0, filtered_rows); else - current_column.column = current_column.column->filter(filter, -1); + current_column.column = current_column.column->filter(*filter_and_holder.data, -1); } return res; diff --git a/dbms/src/DataStreams/FilterBlockInputStream.h b/dbms/src/DataStreams/FilterBlockInputStream.h index 651fab7f514..56a38b9e39a 100644 --- a/dbms/src/DataStreams/FilterBlockInputStream.h +++ b/dbms/src/DataStreams/FilterBlockInputStream.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB @@ -36,8 +37,8 @@ private: String filter_column_name; bool is_first = true; - bool filter_always_true = false; - bool filter_always_false = false; + + ConstantFilterDescription constant_filter_description; }; } diff --git a/dbms/src/DataStreams/FormatFactory.cpp b/dbms/src/DataStreams/FormatFactory.cpp index f3831b32427..c9954dd8352 100644 --- a/dbms/src/DataStreams/FormatFactory.cpp +++ b/dbms/src/DataStreams/FormatFactory.cpp @@ -63,7 +63,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu } else if (name == "RowBinary") { - return wrap_row_stream(std::make_shared(buf)); + return wrap_row_stream(std::make_shared(buf, sample)); } else if (name == "TabSeparated" || name == "TSV") /// TSV is a synonym/alias for the original TabSeparated format { @@ -79,7 +79,7 @@ BlockInputStreamPtr FormatFactory::getInput(const String & name, ReadBuffer & bu } else if (name == "Values") { - return wrap_row_stream(std::make_shared(buf, context, settings.input_format_values_interpret_expressions)); + return wrap_row_stream(std::make_shared(buf, sample, context, settings.input_format_values_interpret_expressions)); } else if (name == "CSV") { diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp index e15ccb79219..37b96c9fb0e 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.cpp @@ -8,6 +8,7 @@ namespace DB namespace ErrorCodes { extern const int NO_SUCH_COLUMN_IN_TABLE; + extern const int LOGICAL_ERROR; } @@ -67,10 +68,14 @@ Block GraphiteRollupSortedBlockInputStream::readImpl() if (finished) return Block(); - Block merged_block; - MutableColumnRawPtrs merged_columns; + Block header; + MutableColumns merged_columns; + + init(header, merged_columns); + + if (has_collation) + throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); - init(merged_block, merged_columns); if (merged_columns.empty()) return Block(); @@ -85,10 +90,10 @@ Block GraphiteRollupSortedBlockInputStream::readImpl() place_for_aggregate_state.resize(max_size_of_aggregate_state); /// Memoize column numbers in block. - path_column_num = merged_block.getPositionByName(params.path_column_name); - time_column_num = merged_block.getPositionByName(params.time_column_name); - value_column_num = merged_block.getPositionByName(params.value_column_name); - version_column_num = merged_block.getPositionByName(params.version_column_name); + path_column_num = header.getPositionByName(params.path_column_name); + time_column_num = header.getPositionByName(params.time_column_name); + value_column_num = header.getPositionByName(params.value_column_name); + version_column_num = header.getPositionByName(params.version_column_name); for (size_t i = 0; i < num_columns; ++i) if (i != time_column_num && i != value_column_num && i != version_column_num) @@ -98,23 +103,18 @@ Block GraphiteRollupSortedBlockInputStream::readImpl() current_selected_row.columns.resize(num_columns); } - if (has_collation) - merge(merged_columns, queue_with_collation); - else - merge(merged_columns, queue); - - return merged_block; + merge(merged_columns, queue); + return header.cloneWithColumns(merged_columns); } -template -void GraphiteRollupSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, std::priority_queue & queue) +void GraphiteRollupSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) { const DateLUTImpl & date_lut = DateLUT::instance(); size_t started_rows = 0; /// Number of times startNextRow() has been called. - /// Take rows in needed order and put them into `merged_block` until we get `max_block_size` rows. + /// Take rows in needed order and put them into `merged_columns` until we get `max_block_size` rows. /// /// Variables starting with current_* refer to the rows previously popped from the queue that will /// contribute towards current output row. @@ -122,7 +122,7 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_c while (!queue.empty()) { - TSortCursor next_cursor = queue.top(); + SortCursor next_cursor = queue.top(); StringRef next_path = next_cursor->all_columns[path_column_num]->getDataAt(next_cursor->pos); bool path_differs = is_first || next_path != current_path; @@ -218,8 +218,7 @@ void GraphiteRollupSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_c } -template -void GraphiteRollupSortedBlockInputStream::startNextRow(MutableColumnRawPtrs & merged_columns, TSortCursor & cursor, const Graphite::Pattern * next_pattern) +void GraphiteRollupSortedBlockInputStream::startNextRow(MutableColumns & merged_columns, SortCursor & cursor, const Graphite::Pattern * next_pattern) { /// Copy unmodified column values. for (size_t i = 0, size = unmodified_column_numbers.size(); i < size; ++i) @@ -238,7 +237,7 @@ void GraphiteRollupSortedBlockInputStream::startNextRow(MutableColumnRawPtrs & m } -void GraphiteRollupSortedBlockInputStream::finishCurrentRow(MutableColumnRawPtrs & merged_columns) +void GraphiteRollupSortedBlockInputStream::finishCurrentRow(MutableColumns & merged_columns) { /// Insert calculated values of the columns `time`, `value`, `version`. merged_columns[time_column_num]->insert(UInt64(current_time_rounded)); @@ -252,7 +251,7 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentRow(MutableColumnRawPtrs } else merged_columns[value_column_num]->insertFrom( - *current_selected_row.columns[value_column_num], current_selected_row.row_num); + *current_selected_row.columns[value_column_num], current_selected_row.row_num); } diff --git a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h index 471dc595023..2aa69568f85 100644 --- a/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h +++ b/dbms/src/DataStreams/GraphiteRollupSortedBlockInputStream.h @@ -195,15 +195,13 @@ private: UInt32 selectPrecision(const Graphite::Retentions & retentions, time_t time) const; - template - void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, std::priority_queue & queue); /// Insert the values into the resulting columns, which will not be changed in the future. - template - void startNextRow(MutableColumnRawPtrs & merged_columns, TSortCursor & cursor, const Graphite::Pattern * next_pattern); + void startNextRow(MutableColumns & merged_columns, SortCursor & cursor, const Graphite::Pattern * next_pattern); /// Insert the calculated `time`, `value`, `version` values into the resulting columns by the last group of rows. - void finishCurrentRow(MutableColumnRawPtrs & merged_columns); + void finishCurrentRow(MutableColumns & merged_columns); /// Update the state of the aggregate function with the new `value`. void accumulateRow(RowRef & row); diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp index 0892847935d..8db83998240 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp @@ -91,13 +91,13 @@ void IProfilingBlockInputStream::readSuffix() void IProfilingBlockInputStream::updateExtremes(Block & block) { - size_t columns = block.columns(); + size_t num_columns = block.columns(); if (!extremes) { - MutableColumns extremes_columns; + MutableColumns extremes_columns(num_columns); - for (size_t i = 0; i < columns; ++i) + for (size_t i = 0; i < num_columns; ++i) { const ColumnPtr & src = block.safeGetByPosition(i).column; @@ -120,11 +120,11 @@ void IProfilingBlockInputStream::updateExtremes(Block & block) } } - extremes = block.cloneWithColumns(extremes_columns); + extremes = block.cloneWithColumns(std::move(extremes_columns)); } else { - for (size_t i = 0; i < columns; ++i) + for (size_t i = 0; i < num_columns; ++i) { ColumnPtr & old_extremes = extremes.safeGetByPosition(i).column; @@ -144,7 +144,7 @@ void IProfilingBlockInputStream::updateExtremes(Block & block) if (cur_max_value > max_value) max_value = cur_max_value; - MutableColumn new_extremes = old_extremes->cloneEmpty(); + MutableColumnPtr new_extremes = old_extremes->cloneEmpty(); new_extremes->insert(min_value); new_extremes->insert(max_value); diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 736998f8522..8de315a9725 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -204,12 +204,10 @@ Block MergeSortingBlocksBlockInputStream::readImpl() template Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queue & queue) { - Block merged = blocks[0].cloneEmpty(); size_t num_columns = blocks[0].columns(); - MutableColumnRawPtrs merged_columns; - for (size_t i = 0; i < num_columns; ++i) /// TODO: reserve - merged_columns.push_back(merged.safeGetByPosition(i).column.get()); + MutableColumnPtrs merged_columns = blocks[0].cloneEmptyColumns(); + /// TODO: reserve (in each column) /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; @@ -231,18 +229,18 @@ Block MergeSortingBlocksBlockInputStream::mergeImpl(std::priority_queuecloneEmpty(); + header = shared_block_ptr->cloneEmpty(); break; } } @@ -114,7 +114,7 @@ void MergingSortedBlockInputStream::init(Block & merged_block, MutableColumnRawP continue; size_t src_columns = shared_block_ptr->columns(); - size_t dst_columns = merged_block.columns(); + size_t dst_columns = header.columns(); if (src_columns != dst_columns) throw Exception("Merging blocks has different number of columns (" @@ -122,22 +122,17 @@ void MergingSortedBlockInputStream::init(Block & merged_block, MutableColumnRawP ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH); for (size_t i = 0; i < src_columns; ++i) - { - if (shared_block_ptr->safeGetByPosition(i).name != merged_block.safeGetByPosition(i).name - || shared_block_ptr->safeGetByPosition(i).type->getName() != merged_block.safeGetByPosition(i).type->getName() - || shared_block_ptr->safeGetByPosition(i).column->getName() != merged_block.safeGetByPosition(i).column->getName()) - { + if (!blocksHaveEqualStructure(*shared_block_ptr, header)) throw Exception("Merging blocks has different names or types of columns:\n" - + shared_block_ptr->dumpStructure() + "\nand\n" + merged_block.dumpStructure(), + + shared_block_ptr->dumpStructure() + "\nand\n" + header.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE); - } - } } + merged_columns.resize(num_columns); for (size_t i = 0; i < num_columns; ++i) { - merged_columns.emplace_back(merged_block.safeGetByPosition(i).column.get()); - merged_columns.back()->reserve(expected_block_size); + merged_columns[i] = header.safeGetByPosition(i).column->cloneEmpty(); + merged_columns[i]->reserve(expected_block_size); } } @@ -154,24 +149,24 @@ void MergingSortedBlockInputStream::initQueue(std::priority_queue & Block MergingSortedBlockInputStream::readImpl() { if (finished) - return Block(); + return {}; if (children.size() == 1) return children[0]->read(); - Block merged_block; + Block header; MutableColumnRawPtrs merged_columns; - init(merged_block, merged_columns); + init(header, merged_columns); if (merged_columns.empty()) - return Block(); + return {}; if (has_collation) - merge(merged_block, merged_columns, queue_with_collation); + merge(merged_columns, queue_with_collation); else - merge(merged_block, merged_columns, queue); + merge(merged_columns, queue); - return merged_block; + return header.cloneWithColumns(merged_columns); } @@ -207,7 +202,7 @@ void MergingSortedBlockInputStream::fetchNextBlock(cons template -void MergingSortedBlockInputStream::merge(Block & merged_block, MutableColumnRawPtrs & merged_columns, std::priority_queue & queue) +void MergingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, std::priority_queue & queue) { size_t merged_rows = 0; @@ -235,7 +230,7 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, MutableColumnRaw return false; }; - /// Take rows in required order and put them into `merged_block`, while the rows are no more than `max_block_size` + /// Take rows in required order and put them into `merged_columns`, while the rows are no more than `max_block_size` while (!queue.empty()) { TSortCursor current = queue.top(); @@ -243,8 +238,8 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, MutableColumnRaw while (true) { - /** And what if the block is smaller or equal than the rest for the current cursor? - * Or is there only one data source left in the queue? Then you can take the entire block of current cursor. + /** And what if the block is totally less or equal than the rest for the current cursor? + * Or is there only one data source left in the queue? Then you can take the entire block on current cursor. */ if (current.impl->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top()))) { @@ -265,18 +260,18 @@ void MergingSortedBlockInputStream::merge(Block & merged_block, MutableColumnRaw throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR); for (size_t i = 0; i < num_columns; ++i) - merged_block.getByPosition(i).column = source_blocks[source_num]->getByPosition(i).column; + merged_columns[i] = source_blocks[source_num]->getByPosition(i).column->mutate(); // std::cerr << "copied columns\n"; - size_t merged_rows = merged_block.rows(); + size_t merged_rows = merged_columns.at(0)->size(); if (limit && total_merged_rows + merged_rows > limit) { merged_rows = limit - total_merged_rows; for (size_t i = 0; i < num_columns; ++i) { - auto & column = merged_block.getByPosition(i).column; + auto & column = merged_columns[i]; column = column->cut(0, merged_rows); } diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.h b/dbms/src/DataStreams/MergingSortedBlockInputStream.h index 8ba5aea736b..2a5c5e4584d 100644 --- a/dbms/src/DataStreams/MergingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.h @@ -113,7 +113,7 @@ protected: void readSuffixImpl() override; /// Initializes the queue and the next result block. - void init(Block & merged_block, MutableColumnRawPtrs & merged_columns); + void init(Block & header, MutableColumns & merged_columns); /// Gets the next block from the source corresponding to the `current`. template @@ -214,7 +214,7 @@ private: void initQueue(std::priority_queue & queue); template - void merge(Block & merged_block, MutableColumnRawPtrs & merged_columns, std::priority_queue & queue); + void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue & queue); Logger * log = &Logger::get("MergingSortedBlockInputStream"); diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp index 168a1d3eaf3..89439698a01 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.cpp @@ -6,8 +6,13 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} -void ReplacingSortedBlockInputStream::insertRow(MutableColumnRawPtrs & merged_columns, size_t & merged_rows) + +void ReplacingSortedBlockInputStream::insertRow(MutableColumns & merged_columns, size_t & merged_rows) { if (out_row_sources_buf) { @@ -33,10 +38,14 @@ Block ReplacingSortedBlockInputStream::readImpl() if (children.size() == 1) return children[0]->read(); - Block merged_block; - MutableColumnRawPtrs merged_columns; + Block header; + MutableColumns merged_columns; + + init(header, merged_columns); + + if (has_collation) + throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); - init(merged_block, merged_columns); if (merged_columns.empty()) return Block(); @@ -46,27 +55,22 @@ Block ReplacingSortedBlockInputStream::readImpl() selected_row.columns.resize(num_columns); if (!version_column.empty()) - version_column_number = merged_block.getPositionByName(version_column); + version_column_number = header.getPositionByName(version_column); } - if (has_collation) - merge(merged_columns, queue_with_collation); - else - merge(merged_columns, queue); - - return merged_block; + merge(merged_columns, queue); + return header.cloneWithColumns(merged_columns); } -template -void ReplacingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, std::priority_queue & queue) +void ReplacingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) { size_t merged_rows = 0; - /// Take the rows in needed order and put them into `merged_block` until rows no more than `max_block_size` + /// Take the rows in needed order and put them into `merged_columns` until rows no more than `max_block_size` while (!queue.empty()) { - TSortCursor current = queue.top(); + SortCursor current = queue.top(); if (current_key.empty()) { diff --git a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h index 37fe719e238..0e7fdadc25a 100644 --- a/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/ReplacingSortedBlockInputStream.h @@ -63,11 +63,10 @@ private: PODArray current_row_sources; /// Sources of rows with the current primary key - template - void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, std::priority_queue & queue); /// Output into result the rows for current primary key. - void insertRow(MutableColumnRawPtrs & merged_columns, size_t & merged_rows); + void insertRow(MutableColumns & merged_columns, size_t & merged_rows); }; } diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp index fd1373737b8..d29988b1693 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.cpp @@ -17,6 +17,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + String SummingSortedBlockInputStream::getID() const { @@ -36,7 +41,7 @@ String SummingSortedBlockInputStream::getID() const } -void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumnRawPtrs & merged_columns, bool force_insertion) +void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion) { for (auto & desc : columns_to_aggregate) { @@ -111,12 +116,16 @@ Block SummingSortedBlockInputStream::readImpl() if (children.size() == 1) return children[0]->read(); - Block merged_block; - MutableColumnRawPtrs merged_columns; + Block header; + MutableColumns merged_columns; + + init(header, merged_columns); + + if (has_collation) + throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); - init(merged_block, merged_columns); if (merged_columns.empty()) - return Block(); + return {}; /// Additional initialization. if (current_row.empty()) @@ -134,7 +143,7 @@ Block SummingSortedBlockInputStream::readImpl() */ for (size_t i = 0; i < num_columns; ++i) { - ColumnWithTypeAndName & column = merged_block.safeGetByPosition(i); + const ColumnWithTypeAndName & column = header.safeGetByPosition(i); /// Discover nested Maps and find columns for summation if (typeid_cast(column.type.get())) @@ -196,7 +205,7 @@ Block SummingSortedBlockInputStream::readImpl() /// no elements of map could be in primary key auto column_num_it = map.second.begin(); for (; column_num_it != map.second.end(); ++column_num_it) - if (isInPrimaryKey(description, merged_block.safeGetByPosition(*column_num_it).name, *column_num_it)) + if (isInPrimaryKey(description, header.safeGetByPosition(*column_num_it).name, *column_num_it)) break; if (column_num_it != map.second.end()) { @@ -212,7 +221,7 @@ Block SummingSortedBlockInputStream::readImpl() column_num_it = map.second.begin(); for (; column_num_it != map.second.end(); ++column_num_it) { - const ColumnWithTypeAndName & key_col = merged_block.safeGetByPosition(*column_num_it); + const ColumnWithTypeAndName & key_col = header.safeGetByPosition(*column_num_it); const String & name = key_col.name; const IDataType & nested_type = *static_cast(key_col.type.get())->getNestedType(); @@ -271,32 +280,27 @@ Block SummingSortedBlockInputStream::readImpl() size_t tuple_size = desc.column_numbers.size(); Columns tuple_columns(tuple_size); for (size_t i = 0; i < tuple_size; ++i) - tuple_columns[i] = merged_block.safeGetByPosition(desc.column_numbers[i]).column; + tuple_columns[i] = header.safeGetByPosition(desc.column_numbers[i]).column; desc.merged_column = ColumnTuple::create(tuple_columns); } else - desc.merged_column = merged_block.safeGetByPosition(desc.column_numbers[0]).column; + desc.merged_column = header.safeGetByPosition(desc.column_numbers[0]).column->cloneEmpty(); } - if (has_collation) - merge(merged_columns, queue_with_collation); - else - merge(merged_columns, queue); - - return merged_block; + merge(merged_columns, queue); + return header.cloneWithColumns(merged_columns); } -template -void SummingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, std::priority_queue & queue) +void SummingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::priority_queue & queue) { merged_rows = 0; - /// Take the rows in needed order and put them in `merged_block` until rows no more than `max_block_size` + /// Take the rows in needed order and put them in `merged_columns` until rows no more than `max_block_size` while (!queue.empty()) { - TSortCursor current = queue.top(); + SortCursor current = queue.top(); setPrimaryKeyRef(next_key, current); @@ -364,8 +368,8 @@ void SummingSortedBlockInputStream::merge(MutableColumnRawPtrs & merged_columns, finished = true; } -template -bool SummingSortedBlockInputStream::mergeMap(const MapDescription & desc, Row & row, TSortCursor & cursor) + +bool SummingSortedBlockInputStream::mergeMap(const MapDescription & desc, Row & row, SortCursor & cursor) { /// Strongly non-optimal. @@ -448,8 +452,7 @@ bool SummingSortedBlockInputStream::mergeMap(const MapDescription & desc, Row & } -template -void SummingSortedBlockInputStream::addRow(TSortCursor & cursor) +void SummingSortedBlockInputStream::addRow(SortCursor & cursor) { for (auto & desc : columns_to_aggregate) { diff --git a/dbms/src/DataStreams/SummingSortedBlockInputStream.h b/dbms/src/DataStreams/SummingSortedBlockInputStream.h index 63ffc160d8f..9b3bc7a0255 100644 --- a/dbms/src/DataStreams/SummingSortedBlockInputStream.h +++ b/dbms/src/DataStreams/SummingSortedBlockInputStream.h @@ -77,7 +77,7 @@ private: AggregateFunctionPtr function; IAggregateFunction::AddFunc add_function = nullptr; std::vector column_numbers; - ColumnPtr merged_column; + MutableColumnPtr merged_column; std::vector state; bool created = false; @@ -138,16 +138,14 @@ private: /** We support two different cursors - with Collation and without. * Templates are used instead of polymorphic SortCursor and calls to virtual functions. */ - template - void merge(MutableColumnRawPtrs & merged_columns, std::priority_queue & queue); + void merge(MutableColumns & merged_columns, std::priority_queue & queue); /// Insert the summed row for the current group into the result and updates some of per-block flags if the row is not "zero". /// If force_insertion=true, then the row will be inserted even if it is "zero" - void insertCurrentRowIfNeeded(MutableColumnRawPtrs & merged_columns, bool force_insertion); + void insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion); /// Returns true if merge result is not empty - template - bool mergeMap(const MapDescription & map, Row & row, TSortCursor & cursor); + bool mergeMap(const MapDescription & map, Row & row, SortCursor & cursor); // Add the row under the cursor to the `row`. template