Added NullSource, ExpressionTransform, FilterTransform, LimitByTransform processors.

This commit is contained in:
Nikolai Kochetov 2019-02-22 20:45:56 +03:00
parent 25f6b97486
commit c8c2e1fd82
12 changed files with 425 additions and 7 deletions

View File

@ -105,6 +105,7 @@ add_headers_and_sources(dbms src/Processors)
add_headers_and_sources(dbms src/Processors/Executors)
add_headers_and_sources(dbms src/Processors/Formats)
add_headers_and_sources(dbms src/Processors/Formats/Impl)
add_headers_and_sources(dbms src/Processors/Transforms)
add_headers_only(dbms src/Server)
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})

View File

@ -4,6 +4,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int POSITION_OUT_OF_BOUND;
}
Chunk::Chunk(DB::Columns columns_, UInt64 num_rows_) : columns(std::move(columns_)), num_rows(num_rows_)
{
checkNumRowsIsConsistent();
@ -100,6 +105,18 @@ void Chunk::clear()
chunk_info.reset();
}
void Chunk::erase(size_t position)
{
if (columns.empty())
throw Exception("Chunk is empty", ErrorCodes::POSITION_OUT_OF_BOUND);
if (position >= columns.size())
throw Exception("Position " + toString(position) + " out of bound in Chunk::erase(), max position = "
+ toString(columns.size() - 1), ErrorCodes::POSITION_OUT_OF_BOUND);
columns.erase(columns.begin() + position);
}
void ChunkMissingValues::setBit(size_t column_idx, size_t row_idx)
{

View File

@ -38,10 +38,13 @@ public:
UInt64 getNumRows() const { return num_rows; }
UInt64 getNumColumns() const { return columns.size(); }
bool empty() const { return num_rows == 0; }
operator bool() const { return !empty() || !columns.empty(); }
bool hasNoRows() const { return num_rows == 0; }
bool hasNoColumns() const { return columns.empty(); }
bool empty() const { return hasNoRows() && hasNoColumns(); }
operator bool() const { return !empty(); }
void clear();
void erase(size_t position);
private:
Columns columns;

View File

@ -4,9 +4,11 @@
namespace DB
{
ISimpleTransform::ISimpleTransform(Block input_header, Block output_header)
: IProcessor({std::move(input_header)}, {std::move(output_header)}),
input(inputs.front()), output(outputs.front())
ISimpleTransform::ISimpleTransform(Block input_header, Block output_header, bool skip_empty_chunks)
: IProcessor({std::move(input_header)}, {std::move(output_header)})
, input(inputs.front())
, output(outputs.front())
, skip_empty_chunks(skip_empty_chunks)
{
}
@ -59,7 +61,13 @@ ISimpleTransform::Status ISimpleTransform::prepare()
void ISimpleTransform::work()
{
transform(current_chunk);
transformed = true;
if (!skip_empty_chunks || current_chunk)
transformed = true;
if (transformed && !current_chunk)
/// Support invariant that chunks must have the same number of columns as header.
current_chunk = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0);
}
}

View File

@ -18,11 +18,12 @@ protected:
Chunk current_chunk;
bool has_input = false;
bool transformed = false;
const bool skip_empty_chunks;
virtual void transform(Chunk & chunk) = 0;
public:
ISimpleTransform(Block input_header, Block output_header);
ISimpleTransform(Block input_header, Block output_header, bool skip_empty_chunks);
Status prepare() override;
void work() override;

View File

@ -0,0 +1,18 @@
#pragma once
#include <Processors/ISource.h>
namespace DB
{
class NullSource : public ISource
{
public:
explicit NullSource(Block header) : ISource(std::move(header)) {}
String getName() const override { return "NullSource"; }
protected:
Chunk generate() override { return Chunk(); }
};
}

View File

@ -0,0 +1,28 @@
#include <Processors/Transforms/ExpressionTransform.h>
#include <Interpreters/ExpressionAnalyzer.h>
namespace DB
{
static Block transformHeader(Block header, const ExpressionActionsPtr & expression)
{
expression->execute(header, true);
return header;
}
ExpressionTransform::ExpressionTransform(const Block & header, ExpressionActionsPtr expression)
: ISimpleTransform(header, transformHeader(header, expression), false)
, expression(std::move(expression))
{
}
void ExpressionTransform::transform(Chunk & chunk)
{
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
auto num_rows = chunk.getNumRows();
expression->execute(block);
chunk.setColumns(block.getColumns(), num_rows);
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Processors/ISimpleTransform.h>
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class ExpressionTransform : public ISimpleTransform
{
public:
ExpressionTransform(const Block & header, ExpressionActionsPtr expression);
String getName() const override { return "ExpressionTransform"; }
protected:
void transform(Chunk & chunk) override;
private:
ExpressionActionsPtr expression;
};
}

View File

@ -0,0 +1,187 @@
#include <Processors/Transforms/FilterTransform.h>
#include <Interpreters/ExpressionActions.h>
#include <Columns/ColumnsCommon.h>
namespace DB
{
static Block transformHeader(
Block header,
const ExpressionActionsPtr & expression,
const String & filter_column_name,
bool remove_filter_column)
{
expression->execute(header);
ConstantFilterDescription constant_filter_description;
auto filter_column = header.getPositionByName(filter_column_name);
auto & column_elem = header.safeGetByPosition(filter_column);
/// Isn't the filter already constant?
if (column_elem.column)
constant_filter_description = ConstantFilterDescription(*column_elem.column);
if (!constant_filter_description.always_false
&& !constant_filter_description.always_true)
{
/// Replace the filter column to a constant with value 1.
FilterDescription filter_description_check(*column_elem.column);
column_elem.column = column_elem.type->createColumnConst(header.rows(), 1u);
}
if (remove_filter_column)
header.erase(filter_column_name);
return header;
}
FilterTransform::FilterTransform(
const Block & header,
ExpressionActionsPtr expression,
String filter_column_name,
bool remove_filter_column)
: ISimpleTransform(header, transformHeader(header, expression, filter_column_name, remove_filter_column), true)
, expression(std::move(expression))
, filter_column_name(std::move(filter_column_name))
, remove_filter_column(remove_filter_column)
{
auto & transformed_header = getOutputPort().getHeader();
filter_column_position = transformed_header.getPositionByName(filter_column_name);
auto & filter_column = transformed_header.getByPosition(filter_column_position);
constant_filter_description = ConstantFilterDescription(*filter_column.column);
}
IProcessor::Status FilterTransform::prepare()
{
if (constant_filter_description.always_false)
{
input.close();
output.finish();
return Status::Finished;
}
return ISimpleTransform::prepare();
}
void FilterTransform::removeFilterIfNeed(Chunk & chunk)
{
if (chunk && remove_filter_column)
chunk.erase(filter_column_position);
}
void FilterTransform::transform(Chunk & chunk)
{
auto columns = chunk.detachColumns();
size_t num_rows_before_filtration = chunk.getNumRows();
{
Block block = getInputPort().getHeader().cloneWithColumns(columns);
columns.clear();
expression->execute(block);
columns = block.getColumns();
}
if (constant_filter_description.always_true)
{
chunk.setColumns(std::move(columns), num_rows_before_filtration);
removeFilterIfNeed(chunk);
return;
}
size_t num_columns = columns.size();
ColumnPtr & filter_column = columns[filter_column_position];
/** It happens that at the stage of analysis of expressions (in sample_block) the columns-constants have not been calculated yet,
* and now - are calculated. That is, not all cases are covered by the code above.
* This happens if the function returns a constant for a non-constant argument.
* For example, `ignore` function.
*/
constant_filter_description = ConstantFilterDescription(*filter_column);
if (constant_filter_description.always_false)
return; /// Will finish at next prepare call
if (constant_filter_description.always_true)
{
chunk.setColumns(std::move(columns), num_rows_before_filtration);
removeFilterIfNeed(chunk);
return;
}
FilterDescription filter_and_holder(*filter_column);
/** Let's find out how many rows will be in result.
* To do this, we filter out the first non-constant column
* or calculate number of set bytes in the filter.
*/
size_t first_non_constant_column = num_columns;
for (size_t i = 0; i < num_columns; ++i)
{
if (!columns[i]->isColumnConst())
{
first_non_constant_column = i;
break;
}
}
size_t num_filtered_rows = 0;
if (first_non_constant_column != num_columns)
{
columns[first_non_constant_column] = columns[first_non_constant_column]->filter(*filter_and_holder.data, -1);
num_filtered_rows = columns[first_non_constant_column]->size();
}
else
num_filtered_rows = countBytesInFilter(*filter_and_holder.data);
/// If the current block is completely filtered out, let's move on to the next one.
if (num_filtered_rows == 0)
/// SimpleTransform will skip it.
return;
auto & result_header = getOutputPort().getHeader();
/// If all the rows pass through the filter.
if (num_filtered_rows == num_rows_before_filtration)
{
/// Replace the column with the filter by a constant.
auto & type = result_header.getByPosition(filter_column_position).type;
columns[filter_column_position] = type->createColumnConst(num_filtered_rows, 1u);
/// No need to touch the rest of the columns.
chunk.setColumns(std::move(columns), num_rows_before_filtration);
removeFilterIfNeed(chunk);
return;
}
/// Filter the rest of the columns.
for (size_t i = 0; i < num_columns; ++i)
{
const auto & current_type = result_header.safeGetByPosition(i).type;
auto & current_column = columns[i];
if (i == filter_column_position)
{
/// The column with filter itself is replaced with a column with a constant `1`, since after filtering, nothing else will remain.
/// NOTE User could pass column with something different than 0 and 1 for filter.
/// Example:
/// SELECT materialize(100) AS x WHERE x
/// will work incorrectly.
current_column = current_type->createColumnConst(num_filtered_rows, 1u);
continue;
}
if (i == first_non_constant_column)
continue;
if (current_column->isColumnConst())
current_column = current_column->cut(0, num_filtered_rows);
else
current_column = current_column->filter(*filter_and_holder.data, num_filtered_rows);
}
chunk.setColumns(std::move(columns), num_filtered_rows);
removeFilterIfNeed(chunk);
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Columns/FilterDescription.h>
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/** Has one input and one output.
* Simply pull a block from input, transform it, and push it to output.
* If remove_filter_column is true, remove filter column from block.
*/
class FilterTransform : public ISimpleTransform
{
public:
FilterTransform(
const Block & header, ExpressionActionsPtr expression, String filter_column_name, bool remove_filter_column);
Status prepare() override;
protected:
void transform(Chunk & chunk) override;
private:
ExpressionActionsPtr expression;
String filter_column_name;
bool remove_filter_column;
ConstantFilterDescription constant_filter_description;
size_t filter_column_position = 0;
void removeFilterIfNeed(Chunk & chunk);
};
}

View File

@ -0,0 +1,67 @@
#include <Processors/Transforms/LimitByTransform.h>
#include <Common/SipHash.h>
namespace DB
{
LimitByTransform::LimitByTransform(const Block & header, size_t group_size_, const Names & columns)
: ISimpleTransform(header, header, true), group_size(group_size_)
{
key_positions.reserve(columns.size());
for (const auto & name : columns)
{
auto position = header.getPositionByName(name);
auto & column = header.getByPosition(position).column;
/// Ignore all constant columns.
if (!(column && column->isColumnConst()))
key_positions.emplace_back(position);
}
}
void LimitByTransform::transform(Chunk & chunk)
{
size_t num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
IColumn::Filter filter(num_rows);
size_t inserted_count = 0;
for (size_t row = 0; row < num_rows; ++row)
{
UInt128 key(0, 0);
SipHash hash;
for (auto position : key_positions)
columns[position]->updateHashWithValue(row, hash);
hash.get128(key.low, key.high);
if (keys_counts[key]++ < group_size)
{
inserted_count++;
filter[row] = 1;
}
else
filter[row] = 0;
}
/// Just go to the next block if there isn't any new records in the current one.
if (!inserted_count)
/// SimpleTransform will skip it.
return;
if (inserted_count < num_rows)
{
for (auto & column : columns)
if (column->isColumnConst())
column = column->cut(0, inserted_count);
else
column = column->filter(filter, inserted_count);
}
chunk.setColumns(std::move(columns), inserted_count);
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Common/HashTable/HashMap.h>
#include <Common/UInt128.h>
namespace DB
{
class LimitByTransform : public ISimpleTransform
{
public:
LimitByTransform(const Block & header, size_t group_size_, const Names & columns);
String getName() const override { return "LimitByTransform"; }
protected:
void transform(Chunk & chunk) override;
private:
using MapHashed = HashMap<UInt128, UInt64, UInt128TrivialHash>;
MapHashed keys_counts;
std::vector<size_t> key_positions;
const size_t group_size;
};
}