WITH FILL version

This commit is contained in:
dmitriiut 2019-04-22 02:04:23 +03:00
parent 3c2922c406
commit 50c322bc6d
7 changed files with 213 additions and 53 deletions

View File

@ -134,7 +134,7 @@ list (APPEND dbms_headers src/TableFunctions/ITableFunction.h src/TableFunctio
list (APPEND dbms_sources src/Dictionaries/DictionaryFactory.cpp src/Dictionaries/DictionarySourceFactory.cpp src/Dictionaries/DictionaryStructure.cpp)
list (APPEND dbms_headers src/Dictionaries/DictionaryFactory.h src/Dictionaries/DictionarySourceFactory.h src/Dictionaries/DictionaryStructure.h)
add_library(clickhouse_common_io ${LINK_MODE} ${clickhouse_common_io_headers} ${clickhouse_common_io_sources} src/Common/SharedBlockRowRef.h)
add_library(clickhouse_common_io ${LINK_MODE} ${clickhouse_common_io_headers} ${clickhouse_common_io_sources})
if (OS_FREEBSD)
target_compile_definitions (clickhouse_common_io PUBLIC CLOCK_MONOTONIC_COARSE=CLOCK_MONOTONIC_FAST)

View File

@ -1,9 +1,10 @@
#pragma once
#include <algorithm>
#include "../Core/Block.h"
#include "../../../contrib/boost/boost/smart_ptr/intrusive_ptr.hpp"
#include "../Columns/IColumn.h"
#include <Core/Block.h>
#include <Core/SortDescription.h>
#include <Columns/IColumn.h>
#include <boost/smart_ptr/intrusive_ptr.hpp>
namespace DB
@ -97,7 +98,7 @@ struct SharedBlockRowRef
}
static void setSharedBlockRowRef(SharedBlockRowRef &row_ref, SharedBlockPtr shared_block, ColumnRawPtrs *columns,
static void setSharedBlockRowRef(SharedBlockRowRef & row_ref, SharedBlockPtr & shared_block, ColumnRawPtrs * columns,
size_t row_num)
{
row_ref.row_num = row_num;
@ -108,4 +109,4 @@ struct SharedBlockRowRef
}
}

View File

@ -11,6 +11,17 @@ class Collator;
namespace DB
{
struct FillColumnDescription
{
/// All missed values in range [FROM, TO] will be filled
/// Range [FROM, TO] respects sorting direction
bool has_from = false;
bool has_to = false;
Field fill_from; /// Fill value >= FILL_FROM
Field fill_to; /// Fill value + STEP <= FILL_TO
Field fill_step; /// Default = 1
};
/// Description of the sorting rule by one column.
struct SortColumnDescription
{
@ -20,25 +31,23 @@ struct SortColumnDescription
int nulls_direction; /// 1 - NULLs and NaNs are greater, -1 - less.
/// To achieve NULLS LAST, set it equal to direction, to achieve NULLS FIRST, set it opposite.
std::shared_ptr<Collator> collator; /// Collator for locale-specific comparison of strings
bool with_fill; /// If true, all missed values in range [FROM, TO] will be filled
/// Range [FROM, TO] respects sorting direction
double fill_from; /// Fill value >= FROM
double fill_to; /// Fill value + STEP <= TO
double fill_step; /// Default = 1
bool with_fill;
FillColumnDescription fill_description;
SortColumnDescription(
size_t column_number_, int direction_, int nulls_direction_,
const std::shared_ptr<Collator> & collator_ = nullptr, bool with_fill_ = false,
double fill_from_ = 0, double fill_to_ = 0, double fill_step_ = 0)
FillColumnDescription fill_description_ = {})
: column_number(column_number_), direction(direction_), nulls_direction(nulls_direction_), collator(collator_)
, with_fill(with_fill_), fill_from(fill_from_), fill_to(fill_to_), fill_step(fill_step_) {}
, with_fill(with_fill_), fill_description(fill_description_) {}
SortColumnDescription(
const std::string & column_name_, int direction_, int nulls_direction_,
const std::shared_ptr<Collator> & collator_ = nullptr, bool with_fill_ = false,
double fill_from_ = 0, double fill_to_ = 0, double fill_step_ = 0)
FillColumnDescription fill_description_ = {})
: column_name(column_name_), column_number(0), direction(direction_), nulls_direction(nulls_direction_)
, collator(collator_), with_fill(with_fill_), fill_from(fill_from_), fill_to(fill_to_), fill_step(fill_step_) {}
, collator(collator_), with_fill(with_fill_), fill_description(fill_description_) {}
bool operator == (const SortColumnDescription & other) const
{

View File

@ -1,8 +1,76 @@
#include "FillingBlockInputStream.h"
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_WITH_FILL_EXPRESSION;
}
namespace detail
{
ColumnRawPtrs getColumnsExcept(SharedBlockPtr & block_ptr, ColumnRawPtrs & except_columns)
{
ColumnRawPtrs res;
res.reserve(block_ptr->columns() - except_columns.size());
for (size_t i = 0; i < block_ptr->columns(); ++i)
{
const IColumn * raw_col = block_ptr->safeGetByPosition(i).column.get();
if (std::find(except_columns.begin(), except_columns.end(), raw_col) != except_columns.end())
res.emplace_back(raw_col);
}
return res;
}
void copyRowFromColumns(ColumnRawPtrs & from, ColumnRawPtrs & to, size_t row_num)
{
for (size_t i = 0; i < from.size(); ++i)
const_cast<IColumn *>(to[i])->insertFrom(*from[i], row_num);
}
void fillRestOfRow(
size_t cols_copied, ColumnRawPtrs & res_fill_columns, ColumnRawPtrs & res_rest_columns,
ColumnRawPtrs & old_rest_columns, UInt64 & next_row_num)
{
/// step_val was inserted, fill all other columns with default values
if (cols_copied < res_fill_columns.size())
{
for (; cols_copied < res_fill_columns.size(); ++cols_copied)
const_cast<IColumn *>(res_fill_columns[cols_copied])->insertDefault();
for (size_t it = 0; it < res_rest_columns.size(); ++it)
const_cast<IColumn *>(res_rest_columns[it])->insertDefault();
return;
}
/// fill row wasn't created, copy rest values from row
detail::copyRowFromColumns(old_rest_columns, res_rest_columns, next_row_num);
++next_row_num;
}
Field sumTwoFields(Field & a, Field & b)
{
switch (a.getType())
{
case Field::Types::Null: return a;
case Field::Types::UInt64: return a.get<UInt64>() + b.get<UInt64>();
case Field::Types::Int64: return a.get<Int64>() + b.get<Int64>();
case Field::Types::Int128: return a.get<Int128>() + b.get<Int128>();
case Field::Types::Float64: return a.get<Float64>() + b.get<Float64>();
default:
throw Exception("WITH FILL can be used only with numeric types", ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
}
}
}
FillingBlockInputStream::FillingBlockInputStream(
const BlockInputStreamPtr & input, const SortDescription & description_)
: description(description_)
const BlockInputStreamPtr & input, const SortDescription & fill_description_)
: fill_description(fill_description_)
{
children.push_back(input);
}
@ -10,9 +78,92 @@ FillingBlockInputStream::FillingBlockInputStream(
Block FillingBlockInputStream::readImpl()
{
Block res;
Block cur_block;
UInt64 rows = 0;
cur_block = children.back()->read();
if (!cur_block)
return cur_block;
Block res_block = cur_block.cloneEmpty();
rows = cur_block.rows();
SharedBlockPtr old_block_ptr = new detail::SharedBlock(std::move(cur_block));
ColumnRawPtrs old_fill_columns = SharedBlockRowRef::getBlockColumns(*old_block_ptr, fill_description);
ColumnRawPtrs old_rest_columns = detail::getColumnsExcept(old_block_ptr, old_fill_columns);
SharedBlockPtr res_block_ptr = new detail::SharedBlock(std::move(res_block));
ColumnRawPtrs res_fill_columns = SharedBlockRowRef::getBlockColumns(*res_block_ptr, fill_description);
ColumnRawPtrs res_rest_columns = detail::getColumnsExcept(res_block_ptr, res_fill_columns);
/// number of next row in current block
UInt64 next_row_num = 0;
/// read first block
if (!pos)
{
++next_row_num;
/// create row number 0 in result block here
detail::copyRowFromColumns(old_fill_columns, res_fill_columns, 0);
detail::copyRowFromColumns(old_rest_columns, res_rest_columns, 0);
}
pos += rows;
/// current block is not first, need to compare with row in other block
if (!next_row_num)
{
size_t cnt_cols = 0;
size_t fill_columns_size = old_fill_columns.size();
for (; cnt_cols < fill_columns_size; ++cnt_cols)
{
Field step = fill_description[cnt_cols].fill_description.fill_step;
Field prev_val = (*last_row_ref.columns)[cnt_cols][last_row_ref.row_num];
Field step_val = detail::sumTwoFields(prev_val, step);
Field next_val = old_fill_columns[cnt_cols][next_row_num];
if (step_val >= next_val)
const_cast<IColumn *>(res_fill_columns[cnt_cols])->insertFrom(*old_fill_columns[cnt_cols], next_row_num);
else
{
const_cast<IColumn *>(res_fill_columns[cnt_cols])->insert(step_val);
break;
}
}
/// create row number 0 in result block here
detail::fillRestOfRow(cnt_cols, res_fill_columns, res_rest_columns, old_rest_columns, next_row_num);
}
/// number of last added row in result block
UInt64 last_row_num = 0;
while (next_row_num < rows)
{
size_t cnt_cols = 0;
size_t fill_columns_size = old_fill_columns.size();
for (; cnt_cols < fill_columns_size; ++cnt_cols)
{
Field step = fill_description[cnt_cols].fill_description.fill_step;
Field prev_val = res_fill_columns[cnt_cols][last_row_num];
Field step_val = detail::sumTwoFields(prev_val, step);
Field next_val = old_fill_columns[cnt_cols][next_row_num];
if (step_val >= next_val)
const_cast<IColumn *>(res_fill_columns[cnt_cols])->insertFrom(*old_fill_columns[cnt_cols], next_row_num);
else
{
const_cast<IColumn *>(res_fill_columns[cnt_cols])->insert(step_val);
break;
}
}
/// create new row in result block, increment last_row_num
detail::fillRestOfRow(cnt_cols, res_fill_columns, res_rest_columns, old_rest_columns, next_row_num);
++last_row_num;
}
/// finished current block, need to remember last row
SharedBlockRowRef::setSharedBlockRowRef(last_row_ref, res_block_ptr, & res_fill_columns, last_row_num);
return *res_block_ptr;
}
}

View File

@ -11,9 +11,9 @@ namespace DB
class FillingBlockInputStream : public IBlockInputStream
{
public:
FillingBlockInputStream(const BlockInputStreamPtr & input, const SortDescription & description_);
FillingBlockInputStream(const BlockInputStreamPtr & input, const SortDescription & fill_description_);
String getName() const override { return "With fill"; }
String getName() const override { return "WithFill"; }
Block getHeader() const override { return children.at(0)->getHeader(); }
@ -21,8 +21,9 @@ protected:
Block readImpl() override;
private:
const SortDescription description;
SharedBlockRowRef last_row_ref;
UInt64 pos = 0; /// total number of read rows
const SortDescription fill_description; /// contains only rows with WITH_FILL
SharedBlockRowRef last_row_ref; /// ref to last written row
};

View File

@ -31,9 +31,10 @@ Block LimitBlockInputStream::readImpl()
/// so we check current block
if (with_ties && ties_row_ref.shared_block)
{
res = children.back()->read();
rows = res.rows();
pos += rows;
res = children.back()->read();
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
ColumnRawPtrs columns = SharedBlockRowRef::getBlockColumns(*ptr, description);
@ -41,9 +42,9 @@ Block LimitBlockInputStream::readImpl()
for (len = 0; len < rows; ++len)
{
SharedBlockRowRef currentRow;
SharedBlockRowRef::setSharedBlockRowRef(currentRow, ptr, &columns, len);
if (currentRow != ties_row_ref)
SharedBlockRowRef current_row;
SharedBlockRowRef::setSharedBlockRowRef(current_row, ptr, &columns, len);
if (current_row != ties_row_ref)
{
ties_row_ref.reset();
break;

View File

@ -895,37 +895,38 @@ static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & c
return 0;
}
static Float64 getWithFillFloatValue(const ASTPtr & node, const Context & context)
static Field getWithFillFieldValue(const ASTPtr & node, const Context & context)
{
const auto & [field, type] = evaluateConstantExpression(node, context);
if (!isNumber(type))
throw Exception("Illegal type " + type->getName() + " of WITH FILL expression, must be numeric type", ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
Field converted = convertFieldToType(field, DataTypeFloat64());
if (converted.isNull())
throw Exception("The value " + applyVisitor(FieldVisitorToString(), field) + " of WITH FILL expression is not representable as Float64", ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
return converted.safeGet<UInt64>();
return field;
}
static std::tuple<Float64, Float64, Float64> getWithFillParameters(const ASTOrderByElement & node, const Context & context)
static FillColumnDescription getWithFillDescription(const ASTOrderByElement &node, const Context &context)
{
Float64 fill_from = 0;
Float64 fill_to = 0;
Float64 fill_step = 1;
FillColumnDescription descr;
if (node.fill_from)
fill_from = getWithFillFloatValue(node.fill_from, context);
{
descr.has_from = true;
descr.fill_from = getWithFillFieldValue(node.fill_from, context);
}
if (node.fill_to)
fill_to = getWithFillFloatValue(node.fill_to, context);
{
descr.has_to = true;
descr.fill_to = getWithFillFieldValue(node.fill_to, context);
}
if (node.fill_step)
fill_step = getWithFillFloatValue(node.fill_step, context);
descr.fill_step = getWithFillFieldValue(node.fill_step, context);
else
descr.fill_step = 1;
if (!fill_step)
if (descr.fill_step == 0)
throw Exception("STEP value can not be zero", ErrorCodes::FILL_STEP_ZERO_VALUE);
return {fill_from, fill_to, fill_step};
return descr;
}
void InterpreterSelectQuery::executeFetchColumns(
@ -1466,27 +1467,23 @@ SortDescription InterpreterSelectQuery::getSortDescription(const ASTSelectQuery
if (order_by_elem.with_fill)
{
auto[fill_from, fill_to, fill_step] = getWithFillParameters(order_by_elem, context);
FillColumnDescription fill_desc = getWithFillDescription(order_by_elem, context);
if (order_by_elem.direction == -1)
{
/// if DESC, then STEP < 0, FROM > TO
fill_step = std::min(fill_step, fill_step * -1);
auto from = fill_from;
fill_from = std::max(fill_from, fill_to);
fill_to = std::min(from, fill_to);
if (fill_desc.has_from && fill_desc.has_to && fill_desc.fill_from < fill_desc.fill_to)
std::swap(fill_desc.fill_from, fill_desc.fill_to);
}
else
{
/// if ASC, then STEP > 0, FROM < TO
fill_step = std::max(fill_step, fill_step * -1);
auto from = fill_from;
fill_from = std::min(fill_from, fill_to);
fill_to = std::max(from, fill_to);
if (fill_desc.has_from && fill_desc.has_to && fill_desc.fill_from > fill_desc.fill_to)
std::swap(fill_desc.fill_from, fill_desc.fill_to);
}
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator,
true, fill_from, fill_to, fill_step);
true, fill_desc);
}
else
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator);
@ -1717,7 +1714,7 @@ void InterpreterSelectQuery::executeWithFill(Pipeline & pipeline)
fill_descr.push_back(desc);
}
if (!fill_descr.size())
if (fill_descr.empty())
return;
pipeline.transform([&](auto & stream)