Improved code after introduction of method "getHeader" in every stream [#CLICKHOUSE-2]

This commit is contained in:
Alexey Milovidov 2018-04-07 04:46:50 +03:00 committed by proller
parent 40a8859803
commit 7512754ae5
16 changed files with 291 additions and 331 deletions

View File

@ -12,25 +12,10 @@ namespace ErrorCodes
} }
Block AggregatingSortedBlockInputStream::readImpl() AggregatingSortedBlockInputStream::AggregatingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_)
{ {
if (finished)
return Block();
Block header;
MutableColumns merged_columns;
init(header, merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
if (merged_columns.empty())
return Block();
/// Additional initialization.
if (next_key.empty())
{
/// Fill in the column numbers that need to be aggregated. /// Fill in the column numbers that need to be aggregated.
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
{ {
@ -57,7 +42,22 @@ Block AggregatingSortedBlockInputStream::readImpl()
column_numbers_to_aggregate.push_back(i); column_numbers_to_aggregate.push_back(i);
} }
} }
Block AggregatingSortedBlockInputStream::readImpl()
{
if (finished)
return Block();
MutableColumns merged_columns;
init(merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
if (merged_columns.empty())
return Block();
columns_to_aggregate.resize(column_numbers_to_aggregate.size()); columns_to_aggregate.resize(column_numbers_to_aggregate.size());
for (size_t i = 0, size = columns_to_aggregate.size(); i < size; ++i) for (size_t i = 0, size = columns_to_aggregate.size(); i < size; ++i)

View File

@ -21,10 +21,8 @@ namespace DB
class AggregatingSortedBlockInputStream : public MergingSortedBlockInputStream class AggregatingSortedBlockInputStream : public MergingSortedBlockInputStream
{ {
public: public:
AggregatingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_) AggregatingSortedBlockInputStream(
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_) const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_);
{
}
String getName() const override { return "AggregatingSorted"; } String getName() const override { return "AggregatingSorted"; }

View File

@ -108,10 +108,8 @@ Block CollapsingSortedBlockInputStream::readImpl()
if (finished) if (finished)
return {}; return {};
Block header;
MutableColumns merged_columns; MutableColumns merged_columns;
init(merged_columns);
init(header, merged_columns);
if (has_collation) if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
@ -119,11 +117,6 @@ Block CollapsingSortedBlockInputStream::readImpl()
if (merged_columns.empty()) if (merged_columns.empty())
return {}; return {};
/// Additional initialization.
if (first_negative.empty())
sign_column_number = header.getPositionByName(sign_column);
merge(merged_columns, queue); merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns)); return header.cloneWithColumns(std::move(merged_columns));
} }

View File

@ -25,10 +25,10 @@ class CollapsingSortedBlockInputStream : public MergingSortedBlockInputStream
public: public:
CollapsingSortedBlockInputStream( CollapsingSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_, BlockInputStreams inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr) const String & sign_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_) : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, sign_column(sign_column_)
{ {
sign_column_number = header.getPositionByName(sign_column);
} }
String getName() const override { return "CollapsingSorted"; } String getName() const override { return "CollapsingSorted"; }
@ -38,8 +38,7 @@ protected:
Block readImpl() override; Block readImpl() override;
private: private:
String sign_column; size_t sign_column_number;
size_t sign_column_number = 0;
Logger * log = &Logger::get("CollapsingSortedBlockInputStream"); Logger * log = &Logger::get("CollapsingSortedBlockInputStream");

View File

@ -12,6 +12,31 @@ namespace ErrorCodes
} }
GraphiteRollupSortedBlockInputStream::GraphiteRollupSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
const Graphite::Params & params, time_t time_of_merge)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
params(params), time_of_merge(time_of_merge)
{
size_t max_size_of_aggregate_state = 0;
for (const auto & pattern : params.patterns)
if (pattern.function->sizeOfData() > max_size_of_aggregate_state)
max_size_of_aggregate_state = pattern.function->sizeOfData();
place_for_aggregate_state.resize(max_size_of_aggregate_state);
/// Memoize column numbers in block.
path_column_num = header.getPositionByName(params.path_column_name);
time_column_num = header.getPositionByName(params.time_column_name);
value_column_num = header.getPositionByName(params.value_column_name);
version_column_num = header.getPositionByName(params.version_column_name);
for (size_t i = 0; i < num_columns; ++i)
if (i != time_column_num && i != value_column_num && i != version_column_num)
unmodified_column_numbers.push_back(i);
}
const Graphite::Pattern * GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const const Graphite::Pattern * GraphiteRollupSortedBlockInputStream::selectPatternForPath(StringRef path) const
{ {
for (const auto & pattern : params.patterns) for (const auto & pattern : params.patterns)
@ -68,10 +93,8 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
if (finished) if (finished)
return Block(); return Block();
Block header;
MutableColumns merged_columns; MutableColumns merged_columns;
init(merged_columns);
init(header, merged_columns);
if (has_collation) if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
@ -79,27 +102,6 @@ Block GraphiteRollupSortedBlockInputStream::readImpl()
if (merged_columns.empty()) if (merged_columns.empty())
return Block(); return Block();
/// Additional initialization.
if (is_first)
{
size_t max_size_of_aggregate_state = 0;
for (const auto & pattern : params.patterns)
if (pattern.function->sizeOfData() > max_size_of_aggregate_state)
max_size_of_aggregate_state = pattern.function->sizeOfData();
place_for_aggregate_state.resize(max_size_of_aggregate_state);
/// Memoize column numbers in block.
path_column_num = header.getPositionByName(params.path_column_name);
time_column_num = header.getPositionByName(params.time_column_name);
value_column_num = header.getPositionByName(params.value_column_name);
version_column_num = header.getPositionByName(params.version_column_name);
for (size_t i = 0; i < num_columns; ++i)
if (i != time_column_num && i != value_column_num && i != version_column_num)
unmodified_column_numbers.push_back(i);
}
merge(merged_columns, queue); merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns)); return header.cloneWithColumns(std::move(merged_columns));
} }

View File

@ -126,12 +126,8 @@ class GraphiteRollupSortedBlockInputStream : public MergingSortedBlockInputStrea
{ {
public: public:
GraphiteRollupSortedBlockInputStream( GraphiteRollupSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_, size_t max_block_size_, const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
const Graphite::Params & params, time_t time_of_merge) const Graphite::Params & params, time_t time_of_merge);
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
params(params), time_of_merge(time_of_merge)
{
}
String getName() const override { return "GraphiteRollupSorted"; } String getName() const override { return "GraphiteRollupSorted"; }

View File

@ -63,14 +63,21 @@ static void enrichBlockWithConstants(Block & block, const Block & header)
} }
Block MergeSortingBlockInputStream::readImpl() MergeSortingBlockInputStream::MergeSortingBlockInputStream(
const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, size_t limit_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
{ {
if (!header) children.push_back(input);
{
header = getHeader(); header = getHeader();
removeConstantsFromSortDescription(header, description); removeConstantsFromSortDescription(header, description);
} }
Block MergeSortingBlockInputStream::readImpl()
{
/** Algorithm: /** Algorithm:
* - read to memory blocks from source stream; * - read to memory blocks from source stream;
* - if too many of them and if external sorting is enabled, * - if too many of them and if external sorting is enabled,

View File

@ -73,12 +73,7 @@ public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order. /// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_, MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, size_t limit_, size_t max_merged_block_size_, size_t limit_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_) size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
{
children.push_back(input);
}
String getName() const override { return "MergeSorting"; } String getName() const override { return "MergeSorting"; }

View File

@ -15,15 +15,17 @@ namespace ErrorCodes
MergingSortedBlockInputStream::MergingSortedBlockInputStream( MergingSortedBlockInputStream::MergingSortedBlockInputStream(
BlockInputStreams & inputs_, const SortDescription & description_, const BlockInputStreams & inputs_, const SortDescription & description_,
size_t max_block_size_, size_t limit_, WriteBuffer * out_row_sources_buf_, bool quiet_) size_t max_block_size_, size_t limit_, WriteBuffer * out_row_sources_buf_, bool quiet_)
: description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_) : description(description_), max_block_size(max_block_size_), limit(limit_), quiet(quiet_)
, source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_) , source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources_buf(out_row_sources_buf_)
{ {
children.insert(children.end(), inputs_.begin(), inputs_.end()); children.insert(children.end(), inputs_.begin(), inputs_.end());
header = children.at(0)->getHeader();
num_columns = header.columns();
} }
void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged_columns) void MergingSortedBlockInputStream::init(MutableColumns & merged_columns)
{ {
/// Read the first blocks, initialize the queue. /// Read the first blocks, initialize the queue.
if (first) if (first)
@ -44,9 +46,6 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged
if (rows == 0) if (rows == 0)
continue; continue;
if (!num_columns)
num_columns = shared_block_ptr->columns();
if (expected_block_size < rows) if (expected_block_size < rows)
expected_block_size = std::min(rows, max_block_size); expected_block_size = std::min(rows, max_block_size);
@ -62,32 +61,9 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged
initQueue(queue); initQueue(queue);
} }
/// Initialize the result.
/// We clone the structure of the first non-empty source block.
{
auto it = source_blocks.cbegin();
for (; it != source_blocks.cend(); ++it)
{
const SharedBlockPtr & shared_block_ptr = *it;
if (*shared_block_ptr)
{
header = shared_block_ptr->cloneEmpty();
break;
}
}
/// If all the input blocks are empty.
if (it == source_blocks.cend())
return;
}
/// Let's check that all source blocks have the same structure. /// Let's check that all source blocks have the same structure.
for (auto it = source_blocks.cbegin(); it != source_blocks.cend(); ++it) for (const SharedBlockPtr & shared_block_ptr : source_blocks)
{ {
const SharedBlockPtr & shared_block_ptr = *it;
if (!*shared_block_ptr) if (!*shared_block_ptr)
continue; continue;
@ -120,10 +96,9 @@ Block MergingSortedBlockInputStream::readImpl()
if (children.size() == 1) if (children.size() == 1)
return children[0]->read(); return children[0]->read();
Block header;
MutableColumns merged_columns; MutableColumns merged_columns;
init(header, merged_columns); init(merged_columns);
if (merged_columns.empty()) if (merged_columns.empty())
return {}; return {};

View File

@ -65,7 +65,7 @@ public:
* quiet - don't log profiling info * quiet - don't log profiling info
*/ */
MergingSortedBlockInputStream( MergingSortedBlockInputStream(
BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_, const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_,
size_t limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false); size_t limit_ = 0, WriteBuffer * out_row_sources_buf_ = nullptr, bool quiet_ = false);
String getName() const override { return "MergingSorted"; } String getName() const override { return "MergingSorted"; }
@ -74,7 +74,7 @@ public:
bool isSortedOutput() const override { return true; } bool isSortedOutput() const override { return true; }
const SortDescription & getSortDescription() const override { return description; } const SortDescription & getSortDescription() const override { return description; }
Block getHeader() const override { return children.at(0)->getHeader(); } Block getHeader() const override { return header; }
protected: protected:
struct RowRef struct RowRef
@ -120,14 +120,16 @@ protected:
void readSuffixImpl() override; void readSuffixImpl() override;
/// Initializes the queue and the next result block. /// Initializes the queue and the columns of next result block.
void init(Block & header, MutableColumns & merged_columns); void init(MutableColumns & merged_columns);
/// Gets the next block from the source corresponding to the `current`. /// Gets the next block from the source corresponding to the `current`.
template <typename TSortCursor> template <typename TSortCursor>
void fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue); void fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue);
Block header;
const SortDescription description; const SortDescription description;
const size_t max_block_size; const size_t max_block_size;
size_t limit; size_t limit;

View File

@ -35,10 +35,8 @@ Block ReplacingSortedBlockInputStream::readImpl()
if (finished) if (finished)
return Block(); return Block();
Block header;
MutableColumns merged_columns; MutableColumns merged_columns;
init(merged_columns);
init(header, merged_columns);
if (has_collation) if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
@ -46,13 +44,6 @@ Block ReplacingSortedBlockInputStream::readImpl()
if (merged_columns.empty()) if (merged_columns.empty())
return Block(); return Block();
/// Additional initialization.
if (selected_row.empty())
{
if (!version_column.empty())
version_column_number = header.getPositionByName(version_column);
}
merge(merged_columns, queue); merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns)); return header.cloneWithColumns(std::move(merged_columns));
} }

View File

@ -15,11 +15,13 @@ namespace DB
class ReplacingSortedBlockInputStream : public MergingSortedBlockInputStream class ReplacingSortedBlockInputStream : public MergingSortedBlockInputStream
{ {
public: public:
ReplacingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_, ReplacingSortedBlockInputStream(
const String & version_column_, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr) const BlockInputStreams & inputs_, const SortDescription & description_,
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_), const String & version_column, size_t max_block_size_, WriteBuffer * out_row_sources_buf_ = nullptr)
version_column(version_column_) : MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
{ {
if (!version_column.empty())
version_column_number = header.getPositionByName(version_column);
} }
String getName() const override { return "ReplacingSorted"; } String getName() const override { return "ReplacingSorted"; }
@ -29,7 +31,6 @@ protected:
Block readImpl() override; Block readImpl() override;
private: private:
String version_column;
ssize_t version_column_number = -1; ssize_t version_column_number = -1;
Logger * log = &Logger::get("ReplacingSortedBlockInputStream"); Logger * log = &Logger::get("ReplacingSortedBlockInputStream");

View File

@ -24,60 +24,6 @@ namespace ErrorCodes
} }
void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion)
{
for (auto & desc : columns_to_aggregate)
{
// Do not insert if the aggregation state hasn't been created
if (desc.created)
{
try
{
desc.function->insertResultInto(desc.state.data(), *desc.merged_column);
/// Update zero status of current row
if (desc.column_numbers.size() == 1)
{
// Flag row as non-empty if at least one column number is non-zero
current_row_is_zero = current_row_is_zero && desc.merged_column->get64(desc.merged_column->size() - 1) == 0;
}
else
{
/// It is sumMap aggregate function.
/// Assume that the row isn't empty in this case (just because it is compatible with previous version)
current_row_is_zero = false;
}
}
catch (...)
{
desc.destroyState();
throw;
}
desc.destroyState();
}
else
desc.merged_column->insertDefault();
}
/// If it is "zero" row and it is not the last row of the result block, then
/// rollback the insertion (at this moment we need rollback only cols from columns_to_aggregate)
if (!force_insertion && current_row_is_zero)
{
for (auto & desc : columns_to_aggregate)
desc.merged_column->popBack(1);
return;
}
for (auto i : column_numbers_not_to_aggregate)
merged_columns[i]->insert(current_row[i]);
/// Update per-block and per-group flags
++merged_rows;
output_is_non_empty = true;
}
namespace namespace
{ {
bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number) bool isInPrimaryKey(const SortDescription & description, const std::string & name, const size_t number)
@ -91,25 +37,14 @@ namespace
} }
Block SummingSortedBlockInputStream::readImpl() SummingSortedBlockInputStream::SummingSortedBlockInputStream(
const BlockInputStreams & inputs_,
const SortDescription & description_,
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum,
size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_)
{ {
if (finished)
return Block();
Block header;
MutableColumns merged_columns;
init(header, merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
if (merged_columns.empty())
return {};
/// Additional initialization.
if (current_row.empty())
{
current_row.resize(num_columns); current_row.resize(num_columns);
/// name of nested structure -> the column numbers that refer to it. /// name of nested structure -> the column numbers that refer to it.
@ -193,7 +128,7 @@ Block SummingSortedBlockInputStream::readImpl()
continue; continue;
} }
DataTypes argument_types = {}; DataTypes argument_types;
AggregateDescription desc; AggregateDescription desc;
MapDescription map_desc; MapDescription map_desc;
@ -248,9 +183,78 @@ Block SummingSortedBlockInputStream::readImpl()
maps_to_sum.emplace_back(std::move(map_desc)); maps_to_sum.emplace_back(std::move(map_desc));
} }
} }
}
void SummingSortedBlockInputStream::insertCurrentRowIfNeeded(MutableColumns & merged_columns, bool force_insertion)
{
for (auto & desc : columns_to_aggregate)
{
// Do not insert if the aggregation state hasn't been created
if (desc.created)
{
try
{
desc.function->insertResultInto(desc.state.data(), *desc.merged_column);
/// Update zero status of current row
if (desc.column_numbers.size() == 1)
{
// Flag row as non-empty if at least one column number is non-zero
current_row_is_zero = current_row_is_zero && desc.merged_column->get64(desc.merged_column->size() - 1) == 0;
}
else
{
/// It is sumMap aggregate function.
/// Assume that the row isn't empty in this case (just because it is compatible with previous version)
current_row_is_zero = false;
}
}
catch (...)
{
desc.destroyState();
throw;
}
desc.destroyState();
}
else
desc.merged_column->insertDefault();
} }
// Update aggregation result columns for current block /// If it is "zero" row and it is not the last row of the result block, then
/// rollback the insertion (at this moment we need rollback only cols from columns_to_aggregate)
if (!force_insertion && current_row_is_zero)
{
for (auto & desc : columns_to_aggregate)
desc.merged_column->popBack(1);
return;
}
for (auto i : column_numbers_not_to_aggregate)
merged_columns[i]->insert(current_row[i]);
/// Update per-block and per-group flags
++merged_rows;
output_is_non_empty = true;
}
Block SummingSortedBlockInputStream::readImpl()
{
if (finished)
return Block();
MutableColumns merged_columns;
init(merged_columns);
if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
if (merged_columns.empty())
return {};
/// Update aggregation result columns for current block
for (auto & desc : columns_to_aggregate) for (auto & desc : columns_to_aggregate)
{ {
// Wrap aggregated columns in a tuple to match function signature // Wrap aggregated columns in a tuple to match function signature

View File

@ -24,14 +24,12 @@ namespace ErrorCodes
class SummingSortedBlockInputStream : public MergingSortedBlockInputStream class SummingSortedBlockInputStream : public MergingSortedBlockInputStream
{ {
public: public:
SummingSortedBlockInputStream(BlockInputStreams inputs_, SummingSortedBlockInputStream(
const BlockInputStreams & inputs_,
const SortDescription & description_, const SortDescription & description_,
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken. /// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum_, const Names & column_names_to_sum_,
size_t max_block_size_) size_t max_block_size_);
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_), column_names_to_sum(column_names_to_sum_)
{
}
String getName() const override { return "SummingSorted"; } String getName() const override { return "SummingSorted"; }
@ -46,7 +44,6 @@ private:
bool finished = false; bool finished = false;
/// Columns with which values should be summed. /// Columns with which values should be summed.
Names column_names_to_sum; /// If set, it is converted to column_numbers_to_aggregate when initialized.
ColumnNumbers column_numbers_not_to_aggregate; ColumnNumbers column_numbers_not_to_aggregate;
/** A table can have nested tables that are treated in a special way. /** A table can have nested tables that are treated in a special way.

View File

@ -2,6 +2,7 @@
#include <DataStreams/VersionedCollapsingSortedBlockInputStream.h> #include <DataStreams/VersionedCollapsingSortedBlockInputStream.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
namespace DB namespace DB
{ {
@ -11,6 +12,20 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
} }
VersionedCollapsingSortedBlockInputStream::VersionedCollapsingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_,
WriteBuffer * out_row_sources_buf_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2)
, current_keys(max_rows_in_queue + 1), can_collapse_all_rows(can_collapse_all_rows_)
{
sign_column_number = header.getPositionByName(sign_column_);
}
inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source) inline ALWAYS_INLINE static void writeRowSourcePart(WriteBuffer & buffer, RowSourcePart row_source)
{ {
if constexpr (sizeof(RowSourcePart) == 1) if constexpr (sizeof(RowSourcePart) == 1)
@ -52,12 +67,8 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl()
if (finished) if (finished)
return {}; return {};
Block header;
MutableColumns merged_columns; MutableColumns merged_columns;
init(merged_columns);
bool is_initialized = !first;
init(header, merged_columns);
if (has_collation) if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::NOT_IMPLEMENTED); throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::NOT_IMPLEMENTED);
@ -65,11 +76,6 @@ Block VersionedCollapsingSortedBlockInputStream::readImpl()
if (merged_columns.empty()) if (merged_columns.empty())
return {}; return {};
/// Additional initialization.
if (!is_initialized)
sign_column_number = header.getPositionByName(sign_column);
merge(merged_columns, queue); merge(merged_columns, queue);
return header.cloneWithColumns(std::move(merged_columns)); return header.cloneWithColumns(std::move(merged_columns));
} }

View File

@ -6,6 +6,7 @@
#include <deque> #include <deque>
namespace DB namespace DB
{ {
@ -16,6 +17,7 @@ namespace ErrorCodes
static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192; static const size_t MAX_ROWS_IN_MULTIVERSION_QUEUE = 8192;
/* Deque with fixed memory size. Allows pushing gaps. /* Deque with fixed memory size. Allows pushing gaps.
* frontGap() returns the number of gaps that were inserted before front. * frontGap() returns the number of gaps that were inserted before front.
* *
@ -173,15 +175,9 @@ public:
/// Don't need version column. It's in primary key. /// Don't need version column. It's in primary key.
/// max_rows_in_queue should be about max_block_size_ if we don't want to store a lot of extra blocks (RowRef holds SharedBlockPtr). /// max_rows_in_queue should be about max_block_size_ if we don't want to store a lot of extra blocks (RowRef holds SharedBlockPtr).
VersionedCollapsingSortedBlockInputStream( VersionedCollapsingSortedBlockInputStream(
BlockInputStreams inputs_, const SortDescription & description_, const BlockInputStreams & inputs_, const SortDescription & description_,
const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_, const String & sign_column_, size_t max_block_size_, bool can_collapse_all_rows_,
WriteBuffer * out_row_sources_buf_ = nullptr) WriteBuffer * out_row_sources_buf_ = nullptr);
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_, 0, out_row_sources_buf_)
, sign_column(sign_column_)
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 2)
, current_keys(max_rows_in_queue + 1), can_collapse_all_rows(can_collapse_all_rows_)
{
}
String getName() const override { return "VersionedCollapsingSorted"; } String getName() const override { return "VersionedCollapsingSorted"; }
@ -190,8 +186,6 @@ protected:
Block readImpl() override; Block readImpl() override;
private: private:
String sign_column;
size_t sign_column_number = 0; size_t sign_column_number = 0;
Logger * log = &Logger::get("VersionedCollapsingSortedBlockInputStream"); Logger * log = &Logger::get("VersionedCollapsingSortedBlockInputStream");