fix progress

This commit is contained in:
dmitriiut 2019-04-21 19:16:25 +03:00
parent 8a7853b0a8
commit e94575e1da
8 changed files with 136 additions and 41 deletions

View File

@ -428,6 +428,7 @@ namespace ErrorCodes
extern const int BAD_TTL_FILE = 451; extern const int BAD_TTL_FILE = 451;
extern const int WITH_TIES_WITHOUT_ORDER_BY = 452; extern const int WITH_TIES_WITHOUT_ORDER_BY = 452;
extern const int INVALID_WITH_FILL_EXPRESSION = 453; extern const int INVALID_WITH_FILL_EXPRESSION = 453;
extern const int FILL_STEP_ZERO_VALUE = 454;
extern const int KEEPER_EXCEPTION = 999; extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000; extern const int POCO_EXCEPTION = 1000;

View File

@ -77,6 +77,33 @@ struct SharedBlockRowRef
bool empty() const { return columns == nullptr; } bool empty() const { return columns == nullptr; }
size_t size() const { return empty() ? 0 : columns->size(); } size_t size() const { return empty() ? 0 : columns->size(); }
/// gets pointers to all columns of block, which were used for ORDER BY
static ColumnRawPtrs getBlockColumns(const Block & block, const SortDescription description)
{
size_t size = description.size();
ColumnRawPtrs res;
res.reserve(size);
for (size_t i = 0; i < size; ++i)
{
const IColumn * column = !description[i].column_name.empty()
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
res.emplace_back(column);
}
return res;
}
static void setSharedBlockRowRef(SharedBlockRowRef &row_ref, SharedBlockPtr shared_block, ColumnRawPtrs *columns,
size_t row_num)
{
row_ref.row_num = row_num;
row_ref.columns = columns;
row_ref.shared_block = shared_block;
}
}; };

View File

@ -22,8 +22,8 @@ struct SortColumnDescription
std::shared_ptr<Collator> collator; /// Collator for locale-specific comparison of strings std::shared_ptr<Collator> collator; /// Collator for locale-specific comparison of strings
bool with_fill; /// If true, all missed values in range [FROM, TO] will be filled bool with_fill; /// If true, all missed values in range [FROM, TO] will be filled
/// Range [FROM, TO] respects sorting direction /// Range [FROM, TO] respects sorting direction
double fill_from; /// Value >= FROM double fill_from; /// Fill value >= FROM
double fill_to; /// Value + STEP <= TO double fill_to; /// Fill value + STEP <= TO
double fill_step; /// Default = 1 double fill_step; /// Default = 1
SortColumnDescription( SortColumnDescription(

View File

@ -0,0 +1,18 @@
#include "FillingBlockInputStream.h"
FillingBlockInputStream::FillingBlockInputStream(
const BlockInputStreamPtr & input, const SortDescription & description_)
: description(description_)
{
children.push_back(input);
}
Block FillingBlockInputStream::readImpl()
{
Block res;
UInt64 rows = 0;
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Common/SharedBlockRowRef.h>
namespace DB
{
/** Implements the WITH FILL part of ORDER BY operation.
*/
class FillingBlockInputStream : public IBlockInputStream
{
public:
FillingBlockInputStream(const BlockInputStreamPtr & input, const SortDescription & description_);
String getName() const override { return "With fill"; }
Block getHeader() const override { return children.at(0)->getHeader(); }
protected:
Block readImpl() override;
private:
const SortDescription description;
SharedBlockRowRef last_row_ref;
};
}

View File

@ -6,39 +6,6 @@
namespace DB namespace DB
{ {
namespace detail
{
/// gets pointers to all columns of block, which were used for ORDER BY
ColumnRawPtrs getBlockColumns(const Block & block, const SortDescription description)
{
size_t size = description.size();
ColumnRawPtrs res;
res.reserve(size);
for (size_t i = 0; i < size; ++i)
{
const IColumn * column = !description[i].column_name.empty()
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
res.emplace_back(column);
}
return res;
}
void setSharedBlockRowRef(SharedBlockRowRef &row_ref, SharedBlockPtr shared_block, ColumnRawPtrs *columns,
size_t row_num)
{
row_ref.row_num = row_num;
row_ref.columns = columns;
row_ref.shared_block = shared_block;
}
}
LimitBlockInputStream::LimitBlockInputStream( LimitBlockInputStream::LimitBlockInputStream(
const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_, const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_,
@ -69,13 +36,13 @@ Block LimitBlockInputStream::readImpl()
pos += rows; pos += rows;
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res)); SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
ColumnRawPtrs columns = getBlockColumns(*ptr, description); ColumnRawPtrs columns = SharedBlockRowRef::getBlockColumns(*ptr, description);
UInt64 len; UInt64 len;
for (len = 0; len < rows; ++len) for (len = 0; len < rows; ++len)
{ {
SharedBlockRowRef currentRow; SharedBlockRowRef currentRow;
setSharedBlockRowRef(currentRow, ptr, &columns, len); SharedBlockRowRef::setSharedBlockRowRef(currentRow, ptr, &columns, len);
if (currentRow != ties_row_ref) if (currentRow != ties_row_ref)
{ {
ties_row_ref.reset(); ties_row_ref.reset();
@ -135,13 +102,13 @@ Block LimitBlockInputStream::readImpl()
/// check if other rows in current block equals to last one in limit /// check if other rows in current block equals to last one in limit
if (with_ties) if (with_ties)
{ {
ColumnRawPtrs columns = getBlockColumns(*ptr, description); ColumnRawPtrs columns = SharedBlockRowRef::getBlockColumns(*ptr, description);
setSharedBlockRowRef(ties_row_ref, ptr, &columns, start + length - 1); SharedBlockRowRef::setSharedBlockRowRef(ties_row_ref, ptr, &columns, start + length - 1);
for (size_t i = ties_row_ref.row_num + 1; i < rows; ++i) for (size_t i = ties_row_ref.row_num + 1; i < rows; ++i)
{ {
SharedBlockRowRef current_row; SharedBlockRowRef current_row;
setSharedBlockRowRef(current_row, ptr, &columns, i); SharedBlockRowRef::setSharedBlockRowRef(current_row, ptr, &columns, i);
if (current_row == ties_row_ref) if (current_row == ties_row_ref)
++length; ++length;
else else

View File

@ -22,6 +22,7 @@
#include <DataStreams/CubeBlockInputStream.h> #include <DataStreams/CubeBlockInputStream.h>
#include <DataStreams/ConvertColumnLowCardinalityToFullBlockInputStream.h> #include <DataStreams/ConvertColumnLowCardinalityToFullBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h> #include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/FillingBlockInputStream.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
@ -75,6 +76,7 @@ namespace ErrorCodes
extern const int ARGUMENT_OUT_OF_BOUND; extern const int ARGUMENT_OUT_OF_BOUND;
extern const int INVALID_LIMIT_EXPRESSION; extern const int INVALID_LIMIT_EXPRESSION;
extern const int INVALID_WITH_FILL_EXPRESSION; extern const int INVALID_WITH_FILL_EXPRESSION;
extern const int FILL_STEP_ZERO_VALUE;
} }
namespace namespace
@ -715,6 +717,8 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
executeLimitBy(pipeline); executeLimitBy(pipeline);
} }
executeWithFill(pipeline);
if (query.limitLength()) if (query.limitLength())
executePreLimit(pipeline); executePreLimit(pipeline);
} }
@ -840,6 +844,8 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
*/ */
executeExtremes(pipeline); executeExtremes(pipeline);
executeWithFill(pipeline);
executeLimit(pipeline); executeLimit(pipeline);
} }
} }
@ -915,6 +921,10 @@ static std::tuple<Float64, Float64, Float64> getWithFillParameters(const ASTOrde
fill_to = getWithFillFloatValue(node.fill_to, context); fill_to = getWithFillFloatValue(node.fill_to, context);
if (node.fill_step) if (node.fill_step)
fill_step = getWithFillFloatValue(node.fill_step, context); fill_step = getWithFillFloatValue(node.fill_step, context);
if (!fill_step)
throw Exception("STEP value can not be zero", ErrorCodes::FILL_STEP_ZERO_VALUE);
return {fill_from, fill_to, fill_step}; return {fill_from, fill_to, fill_step};
} }
@ -1457,6 +1467,24 @@ SortDescription InterpreterSelectQuery::getSortDescription(const ASTSelectQuery
if (order_by_elem.with_fill) if (order_by_elem.with_fill)
{ {
auto[fill_from, fill_to, fill_step] = getWithFillParameters(order_by_elem, context); auto[fill_from, fill_to, fill_step] = getWithFillParameters(order_by_elem, context);
if (order_by_elem.direction == -1)
{
/// if DESC, then STEP < 0, FROM > TO
fill_step = std::min(fill_step, fill_step * -1);
auto from = fill_from;
fill_from = std::max(fill_from, fill_to);
fill_to = std::min(from, fill_to);
}
else
{
/// if ASC, then STEP > 0, FROM < TO
fill_step = std::max(fill_step, fill_step * -1);
auto from = fill_from;
fill_from = std::min(fill_from, fill_to);
fill_to = std::max(from, fill_to);
}
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator, order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator,
true, fill_from, fill_to, fill_step); true, fill_from, fill_to, fill_step);
} }
@ -1589,7 +1617,7 @@ void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline)
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context); auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
pipeline.transform([&, limit = limit_length + limit_offset](auto & stream) pipeline.transform([&, limit = limit_length + limit_offset](auto & stream)
{ {
stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, false); stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, false, query.limit_with_ties, getSortDescription(query));
}); });
} }
} }
@ -1675,6 +1703,30 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
} }
void InterpreterSelectQuery::executeWithFill(Pipeline & pipeline)
{
auto & query = getSelectQuery();
if (query.orderBy())
{
SortDescription order_descr = getSortDescription(query);
SortDescription fill_descr;
for (auto & desc : order_descr)
{
if (desc.with_fill)
fill_descr.push_back(desc);
}
if (!fill_descr.size())
return;
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<FillingBlockInputStream>(stream, fill_descr);
});
}
}
void InterpreterSelectQuery::executeExtremes(Pipeline & pipeline) void InterpreterSelectQuery::executeExtremes(Pipeline & pipeline)
{ {
if (!context.getSettingsRef().extremes) if (!context.getSettingsRef().extremes)

View File

@ -179,6 +179,7 @@ private:
void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression); void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression); void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(Pipeline & pipeline); void executeOrder(Pipeline & pipeline);
void executeWithFill(Pipeline & pipeline);
void executeMergeSorted(Pipeline & pipeline); void executeMergeSorted(Pipeline & pipeline);
void executePreLimit(Pipeline & pipeline); void executePreLimit(Pipeline & pipeline);
void executeUnion(Pipeline & pipeline); void executeUnion(Pipeline & pipeline);