Merge pull request #6610 from CurtizJ/merging_with_ties_and_with_fill

WITH TIES modifier for LIMIT and WITH FILL modifier for ORDER BY. (continuation of #5069)
This commit is contained in:
Olga Khvostikova 2019-09-05 22:45:48 +03:00 committed by GitHub
commit f90642ce23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 1951 additions and 171 deletions

View File

@ -449,6 +449,8 @@ namespace ErrorCodes
extern const int READONLY_SETTING = 472;
extern const int DEADLOCK_AVOIDED = 473;
extern const int INVALID_TEMPLATE_FORMAT = 474;
extern const int INVALID_WITH_FILL_EXPRESSION = 475;
extern const int WITH_TIES_WITHOUT_ORDER_BY = 476;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -0,0 +1,89 @@
#pragma once
#include <algorithm>
#include <Core/Block.h>
#include <Columns/IColumn.h>
#include <boost/smart_ptr/intrusive_ptr.hpp>
namespace DB
{
/// Allows you refer to the row in the block and hold the block ownership,
/// and thus avoid creating a temporary row object.
/// Do not use std::shared_ptr, since there is no need for a place for `weak_count` and `deleter`;
/// does not use Poco::SharedPtr, since you need to allocate a block and `refcount` in one piece;
/// does not use Poco::AutoPtr, since it does not have a `move` constructor and there are extra checks for nullptr;
/// The reference counter is not atomic, since it is used from one thread.
namespace detail
{
struct SharedBlock : Block
{
int refcount = 0;
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SharedBlock(Block && block) : Block(std::move(block)) {}
};
}
inline void intrusive_ptr_add_ref(detail::SharedBlock * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(detail::SharedBlock * ptr)
{
if (0 == --ptr->refcount)
delete ptr;
}
using SharedBlockPtr = boost::intrusive_ptr<detail::SharedBlock>;
struct SharedBlockRowRef
{
ColumnRawPtrs * columns = nullptr;
size_t row_num;
SharedBlockPtr shared_block;
void swap(SharedBlockRowRef & other)
{
std::swap(columns, other.columns);
std::swap(row_num, other.row_num);
std::swap(shared_block, other.shared_block);
}
/// The number and types of columns must match.
bool operator==(const SharedBlockRowRef & other) const
{
size_t size = columns->size();
for (size_t i = 0; i < size; ++i)
if (0 != (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1))
return false;
return true;
}
bool operator!=(const SharedBlockRowRef & other) const
{
return !(*this == other);
}
void reset()
{
SharedBlockRowRef empty;
swap(empty);
}
bool empty() const { return columns == nullptr; }
size_t size() const { return empty() ? 0 : columns->size(); }
void set(SharedBlockPtr & shared_block_, ColumnRawPtrs * columns_, size_t row_num_)
{
shared_block = shared_block_;
columns = columns_;
row_num = row_num_;
}
};
}

View File

@ -4,13 +4,22 @@
#include <memory>
#include <cstddef>
#include <string>
#include <Core/Field.h>
class Collator;
namespace DB
{
struct FillColumnDescription
{
/// All missed values in range [FROM, TO) will be filled
/// Range [FROM, TO) respects sorting direction
Field fill_from; /// Fill value >= FILL_FROM
Field fill_to; /// Fill value + STEP < FILL_TO
Field fill_step; /// Default = 1 or -1 according to direction
};
/// Description of the sorting rule by one column.
struct SortColumnDescription
{
@ -20,12 +29,23 @@ struct SortColumnDescription
int nulls_direction; /// 1 - NULLs and NaNs are greater, -1 - less.
/// To achieve NULLS LAST, set it equal to direction, to achieve NULLS FIRST, set it opposite.
std::shared_ptr<Collator> collator; /// Collator for locale-specific comparison of strings
bool with_fill;
FillColumnDescription fill_description;
SortColumnDescription(size_t column_number_, int direction_, int nulls_direction_, const std::shared_ptr<Collator> & collator_ = nullptr)
: column_number(column_number_), direction(direction_), nulls_direction(nulls_direction_), collator(collator_) {}
SortColumnDescription(const std::string & column_name_, int direction_, int nulls_direction_, const std::shared_ptr<Collator> & collator_ = nullptr)
: column_name(column_name_), column_number(0), direction(direction_), nulls_direction(nulls_direction_), collator(collator_) {}
SortColumnDescription(
size_t column_number_, int direction_, int nulls_direction_,
const std::shared_ptr<Collator> & collator_ = nullptr, bool with_fill_ = false,
const FillColumnDescription & fill_description_ = {})
: column_number(column_number_), direction(direction_), nulls_direction(nulls_direction_), collator(collator_)
, with_fill(with_fill_), fill_description(fill_description_) {}
SortColumnDescription(
const std::string & column_name_, int direction_, int nulls_direction_,
const std::shared_ptr<Collator> & collator_ = nullptr, bool with_fill_ = false,
const FillColumnDescription & fill_description_ = {})
: column_name(column_name_), column_number(0), direction(direction_), nulls_direction(nulls_direction_)
, collator(collator_), with_fill(with_fill_), fill_description(fill_description_) {}
bool operator == (const SortColumnDescription & other) const
{

View File

@ -50,8 +50,8 @@ private:
std::vector<ColumnAggregateFunction *> columns_to_aggregate;
std::vector<SimpleAggregateDescription> columns_to_simple_aggregate;
RowRef current_key; /// The current primary key.
RowRef next_key; /// The primary key of the next row.
SharedBlockRowRef current_key; /// The current primary key.
SharedBlockRowRef next_key; /// The primary key of the next row.
/** We support two different cursors - with Collation and without.
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.

View File

@ -47,12 +47,12 @@ private:
/// Read is finished.
bool finished = false;
RowRef current_key; /// The current primary key.
RowRef next_key; /// The primary key of the next row.
SharedBlockRowRef current_key; /// The current primary key.
SharedBlockRowRef next_key; /// The primary key of the next row.
RowRef first_negative; /// The first negative row for the current primary key.
RowRef last_positive; /// The last positive row for the current primary key.
RowRef last_negative; /// Last negative row. It is only stored if there is not one row is written to output.
SharedBlockRowRef first_negative; /// The first negative row for the current primary key.
SharedBlockRowRef last_positive; /// The last positive row for the current primary key.
SharedBlockRowRef last_negative; /// Last negative row. It is only stored if there is not one row is written to output.
size_t count_positive = 0; /// The number of positive rows for the current primary key.
size_t count_negative = 0; /// The number of negative rows for the current primary key.

View File

@ -0,0 +1,186 @@
#include <DataStreams/FillingBlockInputStream.h>
#include <Interpreters/convertFieldToType.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_WITH_FILL_EXPRESSION;
}
FillingBlockInputStream::FillingBlockInputStream(
const BlockInputStreamPtr & input, const SortDescription & sort_description_)
: sort_description(sort_description_), filling_row(sort_description_), next_row(sort_description_)
{
children.push_back(input);
header = children.at(0)->getHeader();
std::vector<bool> is_fill_column(header.columns());
for (const auto & elem : sort_description)
is_fill_column[header.getPositionByName(elem.column_name)] = true;
auto try_convert_fields = [](FillColumnDescription & descr, const DataTypePtr & type)
{
auto max_type = Field::Types::Null;
WhichDataType which(type);
DataTypePtr to_type;
if (isInteger(type) || which.isDateOrDateTime())
{
max_type = Field::Types::Int64;
to_type = std::make_shared<DataTypeInt64>();
}
else if (which.isFloat())
{
max_type = Field::Types::Float64;
to_type = std::make_shared<DataTypeFloat64>();
}
if (descr.fill_from.getType() > max_type || descr.fill_to.getType() > max_type
|| descr.fill_step.getType() > max_type)
return false;
descr.fill_from = convertFieldToType(descr.fill_from, *to_type);
descr.fill_to = convertFieldToType(descr.fill_to, *to_type);
descr.fill_step = convertFieldToType(descr.fill_step, *to_type);
return true;
};
for (size_t i = 0; i < header.columns(); ++i)
{
if (is_fill_column[i])
{
size_t pos = fill_column_positions.size();
auto & descr = filling_row.getFillDescription(pos);
auto type = header.getByPosition(i).type;
if (!try_convert_fields(descr, type))
throw Exception("Incompatible types of WITH FILL expression values with column type "
+ type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
if (type->isValueRepresentedByUnsignedInteger() &&
((!descr.fill_from.isNull() && less(descr.fill_from, Field{0}, 1)) ||
(!descr.fill_to.isNull() && less(descr.fill_to, Field{0}, 1))))
{
throw Exception("WITH FILL bound values cannot be negative for unsigned type "
+ type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
}
fill_column_positions.push_back(i);
}
else
other_column_positions.push_back(i);
}
}
Block FillingBlockInputStream::readImpl()
{
Columns old_fill_columns;
Columns old_other_columns;
MutableColumns res_fill_columns;
MutableColumns res_other_columns;
auto init_columns_by_positions = [](const Block & block, Columns & columns,
MutableColumns & mutable_columns, const Positions & positions)
{
for (size_t pos : positions)
{
auto column = block.getByPosition(pos).column;
columns.push_back(column);
mutable_columns.push_back(column->cloneEmpty()->assumeMutable());
}
};
auto block = children.back()->read();
if (!block)
{
init_columns_by_positions(header, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(header, old_other_columns, res_other_columns, other_column_positions);
bool should_insert_first = next_row < filling_row;
bool generated = false;
for (size_t i = 0; i < filling_row.size(); ++i)
next_row[i] = filling_row.getFillDescription(i).fill_to;
if (should_insert_first && filling_row < next_row)
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
while (filling_row.next(next_row))
{
generated = true;
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
}
if (generated)
return createResultBlock(res_fill_columns, res_other_columns);
return block;
}
size_t rows = block.rows();
init_columns_by_positions(block, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(block, old_other_columns, res_other_columns, other_column_positions);
if (first)
{
for (size_t i = 0; i < filling_row.size(); ++i)
{
auto current_value = (*old_fill_columns[i])[0];
const auto & fill_from = filling_row.getFillDescription(i).fill_from;
if (!fill_from.isNull() && !equals(current_value, fill_from))
{
filling_row.initFromDefaults(i);
if (less(fill_from, current_value, filling_row.getDirection(i)))
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
break;
}
filling_row[i] = current_value;
}
first = false;
}
for (size_t row_ind = 0; row_ind < rows; ++row_ind)
{
bool should_insert_first = next_row < filling_row;
for (size_t i = 0; i < filling_row.size(); ++i)
{
auto current_value = (*old_fill_columns[i])[row_ind];
const auto & fill_to = filling_row.getFillDescription(i).fill_to;
if (fill_to.isNull() || less(current_value, fill_to, filling_row.getDirection(i)))
next_row[i] = current_value;
else
next_row[i] = fill_to;
}
/// A case, when at previous step row was initialized from defaults 'fill_from' values
/// and probably we need to insert it to block.
if (should_insert_first && filling_row < next_row)
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
/// Insert generated filling row to block, while it is less than current row in block.
while (filling_row.next(next_row))
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
copyRowFromColumns(res_fill_columns, old_fill_columns, row_ind);
copyRowFromColumns(res_other_columns, old_other_columns, row_ind);
}
return createResultBlock(res_fill_columns, res_other_columns);
}
Block FillingBlockInputStream::createResultBlock(MutableColumns & fill_columns, MutableColumns & other_columns) const
{
MutableColumns result_columns(header.columns());
for (size_t i = 0; i < fill_columns.size(); ++i)
result_columns[fill_column_positions[i]] = std::move(fill_columns[i]);
for (size_t i = 0; i < other_columns.size(); ++i)
result_columns[other_column_positions[i]] = std::move(other_columns[i]);
return header.cloneWithColumns(std::move(result_columns));
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/FillingRow.h>
namespace DB
{
/** Implements modifier WITH FILL of ORDER BY clause.
* It fills gaps in data stream by rows with missing values in columns with set WITH FILL and deafults in other columns.
* Optionally FROM, TO and STEP values can be specified.
*/
class FillingBlockInputStream : public IBlockInputStream
{
public:
FillingBlockInputStream(const BlockInputStreamPtr & input, const SortDescription & fill_description_);
String getName() const override { return "Filling"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
Block createResultBlock(MutableColumns & fill_columns, MutableColumns & other_columns) const;
const SortDescription sort_description; /// Contains only rows with WITH FILL.
FillingRow filling_row; /// Current row, which is used to fill gaps.
FillingRow next_row; /// Row to which we need to generate filling rows.
Block header;
using Positions = std::vector<size_t>;
Positions fill_column_positions;
Positions other_column_positions;
bool first = true;
};
}

View File

@ -321,7 +321,7 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & m
}
void GraphiteRollupSortedBlockInputStream::accumulateRow(RowRef & row)
void GraphiteRollupSortedBlockInputStream::accumulateRow(SharedBlockRowRef & row)
{
const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule);
if (aggregate_state_created)

View File

@ -204,7 +204,7 @@ private:
StringRef current_group_path;
/// Last row with maximum version for current primary key (time bucket).
RowRef current_subgroup_newest_row;
SharedBlockRowRef current_subgroup_newest_row;
/// Time of last read row
time_t current_time = 0;
@ -236,7 +236,7 @@ private:
void finishCurrentGroup(MutableColumns & merged_columns);
/// Update the state of the aggregate function with the new `value`.
void accumulateRow(RowRef & row);
void accumulateRow(SharedBlockRowRef & row);
};
}

View File

@ -6,8 +6,30 @@
namespace DB
{
LimitBlockInputStream::LimitBlockInputStream(const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_, bool use_limit_as_total_rows_approx)
: limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_)
/// gets pointers to all columns of block, which were used for ORDER BY
static ColumnRawPtrs extractSortColumns(const Block & block, const SortDescription & description)
{
size_t size = description.size();
ColumnRawPtrs res;
res.reserve(size);
for (size_t i = 0; i < size; ++i)
{
const IColumn * column = !description[i].column_name.empty()
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
res.emplace_back(column);
}
return res;
}
LimitBlockInputStream::LimitBlockInputStream(
const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_,
bool use_limit_as_total_rows_approx, bool with_ties_, const SortDescription & description_)
: limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_), with_ties(with_ties_)
, description(description_)
{
if (use_limit_as_total_rows_approx)
{
@ -17,13 +39,45 @@ LimitBlockInputStream::LimitBlockInputStream(const BlockInputStreamPtr & input,
children.push_back(input);
}
Block LimitBlockInputStream::readImpl()
{
Block res;
UInt64 rows = 0;
/// pos - how many rows were read, including the last read block
/// pos >= offset + limit and all rows in the end of previous block were equal
/// to row at 'limit' position. So we check current block.
if (!ties_row_ref.empty() && pos >= offset + limit)
{
res = children.back()->read();
rows = res.rows();
if (!res)
return res;
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
ptr->sort_columns = extractSortColumns(*ptr, description);
UInt64 len;
for (len = 0; len < rows; ++len)
{
SharedBlockRowRef current_row;
current_row.set(ptr, &ptr->sort_columns, len);
if (current_row != ties_row_ref)
{
ties_row_ref.reset();
break;
}
}
if (len < rows)
{
for (size_t i = 0; i < ptr->columns(); ++i)
ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(0, len);
}
return *ptr;
}
if (pos >= offset + limit)
{
@ -46,9 +100,18 @@ Block LimitBlockInputStream::readImpl()
pos += rows;
} while (pos <= offset);
/// return the whole block
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
if (with_ties)
ptr->sort_columns = extractSortColumns(*ptr, description);
/// give away the whole block
if (pos >= offset + rows && pos <= offset + limit)
return res;
{
/// Save rowref for last row, because probalbly next block begins with the same row.
if (with_ties && pos == offset + limit)
ties_row_ref.set(ptr, &ptr->sort_columns, rows - 1);
return *ptr;
}
/// give away a piece of the block
UInt64 start = std::max(
@ -60,13 +123,36 @@ Block LimitBlockInputStream::readImpl()
static_cast<Int64>(pos) - static_cast<Int64>(offset),
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows)));
for (size_t i = 0; i < res.columns(); ++i)
res.getByPosition(i).column = res.getByPosition(i).column->cut(start, length);
/// check if other rows in current block equals to last one in limit
if (with_ties)
{
ties_row_ref.set(ptr, &ptr->sort_columns, start + length - 1);
for (size_t i = ties_row_ref.row_num + 1; i < rows; ++i)
{
SharedBlockRowRef current_row;
current_row.set(ptr, &ptr->sort_columns, i);
if (current_row == ties_row_ref)
++length;
else
{
ties_row_ref.reset();
break;
}
}
}
if (length == rows)
return *ptr;
for (size_t i = 0; i < ptr->columns(); ++i)
ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(start, length);
// TODO: we should provide feedback to child-block, so it will know how many rows are actually consumed.
// It's crucial for streaming engines like Kafka.
return res;
return *ptr;
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Common/SharedBlockRowRef.h>
namespace DB
@ -17,8 +18,13 @@ public:
* If always_read_till_end = true - reads all the data to the end, but ignores them. This is necessary in rare cases:
* when otherwise, due to the cancellation of the request, we would not have received the data for GROUP BY WITH TOTALS from the remote server.
* If use_limit_as_total_rows_approx = true, then addTotalRowsApprox is called to use the limit in progress & stats
* with_ties = true, when query has WITH TIES modifier. If so, description should be provided
* description lets us know which row we should check for equality
*/
LimitBlockInputStream(const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_ = false, bool use_limit_as_total_rows_approx = false);
LimitBlockInputStream(
const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_,
bool always_read_till_end_ = false, bool use_limit_as_total_rows_approx = false,
bool with_ties_ = false, const SortDescription & description_ = {});
String getName() const override { return "Limit"; }
@ -32,6 +38,9 @@ private:
UInt64 offset;
UInt64 pos = 0;
bool always_read_till_end;
bool with_ties;
const SortDescription description;
SharedBlockRowRef ties_row_ref;
};
}

View File

@ -5,6 +5,7 @@
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <common/logger_useful.h>
#include <Common/SharedBlockRowRef.h>
#include <Core/Row.h>
#include <Core/SortDescription.h>
@ -24,39 +25,6 @@ namespace ErrorCodes
}
/// Allows you refer to the row in the block and hold the block ownership,
/// and thus avoid creating a temporary row object.
/// Do not use std::shared_ptr, since there is no need for a place for `weak_count` and `deleter`;
/// does not use Poco::SharedPtr, since you need to allocate a block and `refcount` in one piece;
/// does not use Poco::AutoPtr, since it does not have a `move` constructor and there are extra checks for nullptr;
/// The reference counter is not atomic, since it is used from one thread.
namespace detail
{
struct SharedBlock : Block
{
int refcount = 0;
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SharedBlock(Block && block) : Block(std::move(block)) {}
};
}
using SharedBlockPtr = boost::intrusive_ptr<detail::SharedBlock>;
inline void intrusive_ptr_add_ref(detail::SharedBlock * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(detail::SharedBlock * ptr)
{
if (0 == --ptr->refcount)
delete ptr;
}
/** Merges several sorted streams into one sorted stream.
*/
class MergingSortedBlockInputStream : public IBlockInputStream
@ -78,44 +46,6 @@ public:
Block getHeader() const override { return header; }
protected:
struct RowRef
{
ColumnRawPtrs * columns = nullptr;
size_t row_num = 0;
SharedBlockPtr shared_block;
void swap(RowRef & other)
{
std::swap(columns, other.columns);
std::swap(row_num, other.row_num);
std::swap(shared_block, other.shared_block);
}
/// The number and types of columns must match.
bool operator==(const RowRef & other) const
{
size_t size = columns->size();
for (size_t i = 0; i < size; ++i)
if (0 != (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1))
return false;
return true;
}
bool operator!=(const RowRef & other) const
{
return !(*this == other);
}
void reset()
{
RowRef empty;
swap(empty);
}
bool empty() const { return columns == nullptr; }
size_t size() const { return empty() ? 0 : columns->size(); }
};
/// Simple class, which allows to check stop condition during merge process
/// in simple case it just compare amount of merged rows with max_block_size
/// in `count_average` case it compares amount of merged rows with linear combination
@ -148,7 +78,6 @@ protected:
}
};
Block readImpl() override;
void readSuffixImpl() override;
@ -230,7 +159,7 @@ protected:
}
template <typename TSortCursor>
void setRowRef(RowRef & row_ref, TSortCursor & cursor)
void setRowRef(SharedBlockRowRef & row_ref, TSortCursor & cursor)
{
row_ref.row_num = cursor.impl->pos;
row_ref.shared_block = source_blocks[cursor.impl->order];
@ -238,7 +167,7 @@ protected:
}
template <typename TSortCursor>
void setPrimaryKeyRef(RowRef & row_ref, TSortCursor & cursor)
void setPrimaryKeyRef(SharedBlockRowRef & row_ref, TSortCursor & cursor)
{
row_ref.row_num = cursor.impl->pos;
row_ref.shared_block = source_blocks[cursor.impl->order];

View File

@ -41,11 +41,11 @@ private:
bool finished = false;
/// Primary key of current row.
RowRef current_key;
SharedBlockRowRef current_key;
/// Primary key of next row.
RowRef next_key;
SharedBlockRowRef next_key;
/// Last row with maximum version for current primary key.
RowRef selected_row;
SharedBlockRowRef selected_row;
/// The position (into current_row_sources) of the row with the highest version.
size_t max_pos = 0;

View File

@ -129,8 +129,8 @@ private:
std::vector<AggregateDescription> columns_to_aggregate;
std::vector<MapDescription> maps_to_sum;
RowRef current_key; /// The current primary key.
RowRef next_key; /// The primary key of the next row.
SharedBlockRowRef current_key; /// The current primary key.
SharedBlockRowRef next_key; /// The primary key of the next row.
Row current_row;
bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.

View File

@ -47,7 +47,7 @@ void VersionedCollapsingSortedBlockInputStream::insertGap(size_t gap_size)
}
}
void VersionedCollapsingSortedBlockInputStream::insertRow(size_t skip_rows, const RowRef & row, MutableColumns & merged_columns)
void VersionedCollapsingSortedBlockInputStream::insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns)
{
const auto & columns = row.shared_block->all_columns;
for (size_t i = 0; i < num_columns; ++i)
@ -111,7 +111,7 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co
SortCursor current = queue.top();
size_t current_block_granularity = current->rows;
RowRef next_key;
SharedBlockRowRef next_key;
Int8 sign = assert_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];

View File

@ -197,7 +197,7 @@ private:
Int8 sign_in_queue = 0;
const size_t max_rows_in_queue;
/// Rows with the same primary key and sign.
FixedSizeDequeWithGaps<RowRef> current_keys;
FixedSizeDequeWithGaps<SharedBlockRowRef> current_keys;
size_t blocks_written = 0;
@ -207,7 +207,7 @@ private:
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
/// Output to result row for the current primary key.
void insertRow(size_t skip_rows, const RowRef & row, MutableColumns & merged_columns);
void insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns);
void insertGap(size_t gap_size);
};

View File

@ -0,0 +1,127 @@
#include <Interpreters/FillingRow.h>
namespace DB
{
bool less(const Field & lhs, const Field & rhs, int direction)
{
if (direction == -1)
return applyVisitor(FieldVisitorAccurateLess(), rhs, lhs);
return applyVisitor(FieldVisitorAccurateLess(), lhs, rhs);
}
bool equals(const Field & lhs, const Field & rhs)
{
return applyVisitor(FieldVisitorAccurateEquals(), lhs, rhs);
}
FillingRow::FillingRow(const SortDescription & description_) : description(description_)
{
row.resize(description.size());
}
bool FillingRow::operator<(const FillingRow & other) const
{
for (size_t i = 0; i < size(); ++i)
{
if (row[i].isNull() || other[i].isNull() || equals(row[i], other[i]))
continue;
return less(row[i], other[i], getDirection(i));
}
return false;
}
bool FillingRow::operator==(const FillingRow & other) const
{
for (size_t i = 0; i < size(); ++i)
if (!equals(row[i], other[i]))
return false;
return true;
}
bool FillingRow::next(const FillingRow & to_row)
{
size_t pos = 0;
/// Find position we need to increment for generating next row.
for (; pos < row.size(); ++pos)
if (!row[pos].isNull() && !to_row[pos].isNull() && !equals(row[pos], to_row[pos]))
break;
if (pos == row.size() || less(to_row[pos], row[pos], getDirection(pos)))
return false;
/// If we have any 'fill_to' value at position greater than 'pos',
/// we need to generate rows up to 'fill_to' value.
for (size_t i = row.size() - 1; i > pos; --i)
{
if (getFillDescription(i).fill_to.isNull() || row[i].isNull())
continue;
auto next_value = row[i];
applyVisitor(FieldVisitorSum(getFillDescription(i).fill_step), next_value);
if (less(next_value, getFillDescription(i).fill_to, getDirection(i)))
{
row[i] = next_value;
initFromDefaults(i + 1);
return true;
}
}
auto next_value = row[pos];
applyVisitor(FieldVisitorSum(getFillDescription(pos).fill_step), next_value);
if (less(to_row[pos], next_value, getDirection(pos)))
return false;
row[pos] = next_value;
if (equals(row[pos], to_row[pos]))
{
bool is_less = false;
for (size_t i = pos + 1; i < size(); ++i)
{
const auto & fill_from = getFillDescription(i).fill_from;
if (!fill_from.isNull())
row[i] = fill_from;
else
row[i] = to_row[i];
is_less |= less(row[i], to_row[i], getDirection(i));
}
return is_less;
}
initFromDefaults(pos + 1);
return true;
}
void FillingRow::initFromDefaults(size_t from_pos)
{
for (size_t i = from_pos; i < row.size(); ++i)
row[i] = getFillDescription(i).fill_from;
}
void insertFromFillingRow(MutableColumns & filling_columns, MutableColumns & other_columns, const FillingRow & filling_row)
{
for (size_t i = 0; i < filling_columns.size(); ++i)
{
if (filling_row[i].isNull())
filling_columns[i]->insertDefault();
else
filling_columns[i]->insert(filling_row[i]);
}
for (size_t i = 0; i < other_columns.size(); ++i)
other_columns[i]->insertDefault();
}
void copyRowFromColumns(MutableColumns & dest, const Columns & source, size_t row_num)
{
for (size_t i = 0; i < source.size(); ++i)
dest[i]->insertFrom(*source[i], row_num);
}
}

View File

@ -0,0 +1,44 @@
#pragma once
#include <Core/SortDescription.h>
#include <Columns/IColumn.h>
#include <Common/FieldVisitors.h>
namespace DB
{
/// Compares fields in terms of sorting order, considering direction.
bool less(const Field & lhs, const Field & rhs, int direction);
bool equals(const Field & lhs, const Field & rhs);
/** Helps to implement modifier WITH FILL for ORDER BY clause.
* Stores row as array of fields and provides functions to generate next row for filling gaps and for comparing rows.
* Used in FillingBlockInputStream and in FillingTransform.
*/
class FillingRow
{
public:
FillingRow(const SortDescription & sort_description);
/// Generates next row according to fill 'from', 'to' and 'step' values.
bool next(const FillingRow & to_row);
void initFromDefaults(size_t from_pos = 0);
Field & operator[](size_t ind) { return row[ind]; }
const Field & operator[](size_t ind) const { return row[ind]; }
size_t size() const { return row.size(); }
bool operator<(const FillingRow & other) const;
bool operator==(const FillingRow & other) const;
int getDirection(size_t ind) const { return description[ind].direction; }
FillColumnDescription & getFillDescription(size_t ind) { return description[ind].fill_description; }
private:
std::vector<Field> row;
SortDescription description;
};
void insertFromFillingRow(MutableColumns & filling_columns, MutableColumns & other_columns, const FillingRow & filling_row);
void copyRowFromColumns(MutableColumns & dest, const Columns & source, size_t row_num);
}

View File

@ -24,6 +24,7 @@
#include <DataStreams/ConvertColumnLowCardinalityToFullBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/ReverseBlockInputStream.h>
#include <DataStreams/FillingBlockInputStream.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@ -57,6 +58,7 @@
#include <Core/Field.h>
#include <Core/Types.h>
#include <Columns/Collator.h>
#include <Common/FieldVisitors.h>
#include <Common/typeid_cast.h>
#include <Common/checkStackSize.h>
#include <Parsers/queryToString.h>
@ -81,6 +83,7 @@
#include <Processors/Transforms/CreatingSetsTransform.h>
#include <Processors/Transforms/RollupTransform.h>
#include <Processors/Transforms/CubeTransform.h>
#include <Processors/Transforms/FillingTransform.h>
#include <Processors/LimitTransform.h>
#include <Processors/Transforms/FinishSortingTransform.h>
#include <DataTypes/DataTypeAggregateFunction.h>
@ -103,6 +106,7 @@ namespace ErrorCodes
extern const int PARAMETER_OUT_OF_BOUND;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int INVALID_LIMIT_EXPRESSION;
extern const int INVALID_WITH_FILL_EXPRESSION;
}
namespace
@ -681,8 +685,62 @@ InterpreterSelectQuery::analyzeExpressions(
return res;
}
static Field getWithFillFieldValue(const ASTPtr & node, const Context & context)
{
const auto & [field, type] = evaluateConstantExpression(node, context);
static SortDescription getSortDescription(const ASTSelectQuery & query)
if (!isColumnedAsNumber(type))
throw Exception("Illegal type " + type->getName() + " of WITH FILL expression, must be numeric type", ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
return field;
}
static FillColumnDescription getWithFillDescription(const ASTOrderByElement & order_by_elem, const Context & context)
{
FillColumnDescription descr;
if (order_by_elem.fill_from)
descr.fill_from = getWithFillFieldValue(order_by_elem.fill_from, context);
if (order_by_elem.fill_to)
descr.fill_to = getWithFillFieldValue(order_by_elem.fill_to, context);
if (order_by_elem.fill_step)
descr.fill_step = getWithFillFieldValue(order_by_elem.fill_step, context);
else
descr.fill_step = order_by_elem.direction;
if (applyVisitor(FieldVisitorAccurateEquals(), descr.fill_step, Field{0}))
throw Exception("WITH FILL STEP value cannot be zero", ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
if (order_by_elem.direction == 1)
{
if (applyVisitor(FieldVisitorAccurateLess(), descr.fill_step, Field{0}))
throw Exception("WITH FILL STEP value cannot be negative for sorting in ascending direction",
ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
if (!descr.fill_from.isNull() && !descr.fill_to.isNull() &&
applyVisitor(FieldVisitorAccurateLess(), descr.fill_to, descr.fill_from))
{
throw Exception("WITH FILL TO value cannot be less than FROM value for sorting in ascending direction",
ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
}
}
else
{
if (applyVisitor(FieldVisitorAccurateLess(), Field{0}, descr.fill_step))
throw Exception("WITH FILL STEP value cannot be positive for sorting in descending direction",
ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
if (!descr.fill_from.isNull() && !descr.fill_to.isNull() &&
applyVisitor(FieldVisitorAccurateLess(), descr.fill_from, descr.fill_to))
{
throw Exception("WITH FILL FROM value cannot be less than TO value for sorting in descending direction",
ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
}
}
return descr;
}
static SortDescription getSortDescription(const ASTSelectQuery & query, const Context & context)
{
SortDescription order_descr;
order_descr.reserve(query.orderBy()->children.size());
@ -695,13 +753,19 @@ static SortDescription getSortDescription(const ASTSelectQuery & query)
if (order_by_elem.collation)
collator = std::make_shared<Collator>(order_by_elem.collation->as<ASTLiteral &>().value.get<String>());
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator);
if (order_by_elem.with_fill)
{
FillColumnDescription fill_desc = getWithFillDescription(order_by_elem, context);
order_descr.emplace_back(name, order_by_elem.direction,
order_by_elem.nulls_direction, collator, true, fill_desc);
}
else
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator);
}
return order_descr;
}
static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context)
{
const auto & [field, type] = evaluateConstantExpression(node, context);
@ -736,7 +800,7 @@ static std::pair<UInt64, UInt64> getLimitLengthAndOffset(const ASTSelectQuery &
static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & context)
{
/// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY.
if (!query.distinct && !query.limitBy())
if (!query.distinct && !query.limitBy() && !query.limit_with_ties)
{
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
return limit_length + limit_offset;
@ -751,7 +815,7 @@ static SortingInfoPtr optimizeReadInOrder(const MergeTreeData & merge_tree, cons
if (!merge_tree.hasSortingKey())
return {};
auto order_descr = getSortDescription(query);
auto order_descr = getSortDescription(query, context);
SortDescription prefix_order_descr;
int read_direction = order_descr.at(0).direction;
@ -1173,7 +1237,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
/** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
* limiting the number of rows in each up to `offset + limit`.
*/
if (query.limitLength() && pipeline.hasMoreThanOneStream() && !query.distinct && !expressions.has_limit_by && !settings.extremes)
if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() && !query.distinct && !expressions.has_limit_by && !settings.extremes)
{
executePreLimit(pipeline);
}
@ -1206,6 +1270,8 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
executeLimitBy(pipeline);
}
executeWithFill(pipeline);
/** We must do projection after DISTINCT because projection may remove some columns.
*/
executeProjection(pipeline, expressions.final_projection);
@ -1222,7 +1288,6 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
executeSubqueriesInSetsAndJoins(pipeline, expressions.subqueries_for_sets);
}
template <typename TPipeline>
void InterpreterSelectQuery::executeFetchColumns(
QueryProcessingStage::Enum processing_stage, TPipeline & pipeline,
@ -1420,11 +1485,12 @@ void InterpreterSelectQuery::executeFetchColumns(
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
/** Optimization - if not specified DISTINCT, WHERE, GROUP, HAVING, ORDER, LIMIT BY but LIMIT is specified, and limit + offset < max_block_size,
/** Optimization - if not specified DISTINCT, WHERE, GROUP, HAVING, ORDER, LIMIT BY, WITH TIES but LIMIT is specified, and limit + offset < max_block_size,
* then as the block size we will use limit + offset (not to read more from the table than requested),
* and also set the number of threads to 1.
*/
if (!query.distinct
&& !query.limit_with_ties
&& !query.prewhere()
&& !query.where()
&& !query.groupBy()
@ -2007,7 +2073,7 @@ void InterpreterSelectQuery::executeExpression(QueryPipeline & pipeline, const E
void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SortingInfoPtr sorting_info)
{
auto & query = getSelectQuery();
SortDescription order_descr = getSortDescription(query);
SortDescription order_descr = getSortDescription(query, context);
const Settings & settings = context.getSettingsRef();
UInt64 limit = getLimitForSorting(query, context);
@ -2079,7 +2145,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SortingInfoPtr so
void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, SortingInfoPtr sorting_info)
{
auto & query = getSelectQuery();
SortDescription order_descr = getSortDescription(query);
SortDescription order_descr = getSortDescription(query, context);
UInt64 limit = getLimitForSorting(query, context);
const Settings & settings = context.getSettingsRef();
@ -2160,7 +2226,7 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, SortingInfoP
void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline)
{
auto & query = getSelectQuery();
SortDescription order_descr = getSortDescription(query);
SortDescription order_descr = getSortDescription(query, context);
UInt64 limit = getLimitForSorting(query, context);
const Settings & settings = context.getSettingsRef();
@ -2187,7 +2253,7 @@ void InterpreterSelectQuery::executeMergeSorted(Pipeline & pipeline)
void InterpreterSelectQuery::executeMergeSorted(QueryPipeline & pipeline)
{
auto & query = getSelectQuery();
SortDescription order_descr = getSortDescription(query);
SortDescription order_descr = getSortDescription(query, context);
UInt64 limit = getLimitForSorting(query, context);
const Settings & settings = context.getSettingsRef();
@ -2234,7 +2300,7 @@ void InterpreterSelectQuery::executeDistinct(Pipeline & pipeline, bool before_or
UInt64 limit_for_distinct = 0;
/// If after this stage of DISTINCT ORDER BY is not executed, then you can get no more than limit_length + limit_offset of different rows.
if (!query.orderBy() || !before_order)
if ((!query.orderBy() || !before_order) && !query.limit_with_ties)
limit_for_distinct = limit_length + limit_offset;
pipeline.transform([&](auto & stream)
@ -2303,9 +2369,16 @@ void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline)
if (query.limitLength())
{
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
SortDescription sort_descr;
if (query.limit_with_ties)
{
if (!query.orderBy())
throw Exception("LIMIT WITH TIES without ORDER BY", ErrorCodes::LOGICAL_ERROR);
sort_descr = getSortDescription(query, context);
}
pipeline.transform([&, limit = limit_length + limit_offset](auto & stream)
{
stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, false);
stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, false, false, query.limit_with_ties, sort_descr);
});
}
}
@ -2417,17 +2490,73 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
if (!query.group_by_with_totals && hasWithTotalsInAnySubqueryInFromClause(query))
always_read_till_end = true;
SortDescription order_descr;
if (query.limit_with_ties)
{
if (!query.orderBy())
throw Exception("LIMIT WITH TIES without ORDER BY", ErrorCodes::LOGICAL_ERROR);
order_descr = getSortDescription(query, context);
}
UInt64 limit_length;
UInt64 limit_offset;
std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, context);
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, always_read_till_end);
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, always_read_till_end, false, query.limit_with_ties, order_descr);
});
}
}
void InterpreterSelectQuery::executeWithFill(Pipeline & pipeline)
{
auto & query = getSelectQuery();
if (query.orderBy())
{
SortDescription order_descr = getSortDescription(query, context);
SortDescription fill_descr;
for (auto & desc : order_descr)
{
if (desc.with_fill)
fill_descr.push_back(desc);
}
if (fill_descr.empty())
return;
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<FillingBlockInputStream>(stream, fill_descr);
});
}
}
void InterpreterSelectQuery::executeWithFill(QueryPipeline & pipeline)
{
auto & query = getSelectQuery();
if (query.orderBy())
{
SortDescription order_descr = getSortDescription(query, context);
SortDescription fill_descr;
for (auto & desc : order_descr)
{
if (desc.with_fill)
fill_descr.push_back(desc);
}
if (fill_descr.empty())
return;
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FillingTransform>(header, fill_descr);
});
}
}
void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline)
{
auto & query = getSelectQuery();
@ -2455,13 +2584,21 @@ void InterpreterSelectQuery::executeLimit(QueryPipeline & pipeline)
UInt64 limit_offset;
std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, context);
SortDescription order_descr;
if (query.limit_with_ties)
{
if (!query.orderBy())
throw Exception("LIMIT WITH TIES without ORDER BY", ErrorCodes::LOGICAL_ERROR);
order_descr = getSortDescription(query, context);
}
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
return std::make_shared<LimitTransform>(
header, limit_length, limit_offset, always_read_till_end);
header, limit_length, limit_offset, always_read_till_end, query.limit_with_ties, order_descr);
});
}
}

View File

@ -204,6 +204,7 @@ private:
void executeHaving(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeExpression(Pipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(Pipeline & pipeline, SortingInfoPtr sorting_info);
void executeWithFill(Pipeline & pipeline);
void executeMergeSorted(Pipeline & pipeline);
void executePreLimit(Pipeline & pipeline);
void executeUnion(Pipeline & pipeline, Block header); /// If header is not empty, convert streams structure to it.
@ -221,6 +222,7 @@ private:
void executeHaving(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeExpression(QueryPipeline & pipeline, const ExpressionActionsPtr & expression);
void executeOrder(QueryPipeline & pipeline, SortingInfoPtr sorting_info);
void executeWithFill(QueryPipeline & pipeline);
void executeMergeSorted(QueryPipeline & pipeline);
void executePreLimit(QueryPipeline & pipeline);
void executeLimitBy(QueryPipeline & pipeline);

View File

@ -25,6 +25,26 @@ void ASTOrderByElement::formatImpl(const FormatSettings & settings, FormatState
settings.ostr << (settings.hilite ? hilite_keyword : "") << " COLLATE " << (settings.hilite ? hilite_none : "");
collation->formatImpl(settings, state, frame);
}
if (with_fill)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WITH FILL " << (settings.hilite ? hilite_none : "");
if (fill_from)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM " << (settings.hilite ? hilite_none : "");
fill_from->formatImpl(settings, state, frame);
}
if (fill_to)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " TO " << (settings.hilite ? hilite_none : "");
fill_to->formatImpl(settings, state, frame);
}
if (fill_step)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " STEP " << (settings.hilite ? hilite_none : "");
fill_step->formatImpl(settings, state, frame);
}
}
}
}

View File

@ -18,12 +18,22 @@ public:
/** Collation for locale-specific string comparison. If empty, then sorting done by bytes. */
ASTPtr collation;
bool with_fill;
ASTPtr fill_from;
ASTPtr fill_to;
ASTPtr fill_step;
ASTOrderByElement(
const int direction_, const int nulls_direction_, const bool nulls_direction_was_explicitly_specified_, ASTPtr & collation_)
const int direction_, const int nulls_direction_, const bool nulls_direction_was_explicitly_specified_,
ASTPtr & collation_, const bool with_fill_, ASTPtr & fill_from_, ASTPtr & fill_to_, ASTPtr & fill_step_)
: direction(direction_)
, nulls_direction(nulls_direction_)
, nulls_direction_was_explicitly_specified(nulls_direction_was_explicitly_specified_)
, collation(collation_)
, with_fill(with_fill_)
, fill_from(fill_from_)
, fill_to(fill_to_)
, fill_step(fill_step_)
{
}

View File

@ -148,6 +148,8 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
s.ostr << ", ";
}
limitLength()->formatImpl(s, state, frame);
if (limit_with_ties)
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << " WITH TIES" << (s.hilite ? hilite_none : "");
}
if (settings())

View File

@ -42,6 +42,7 @@ public:
bool group_by_with_totals = false;
bool group_by_with_rollup = false;
bool group_by_with_cube = false;
bool limit_with_ties = false;
ASTPtr & refSelect() { return getExpression(Expression::SELECT); }
ASTPtr & refTables() { return getExpression(Expression::TABLES); }

View File

@ -1360,7 +1360,12 @@ bool ParserOrderByElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
ParserKeyword first("FIRST");
ParserKeyword last("LAST");
ParserKeyword collate("COLLATE");
ParserKeyword with_fill("WITH FILL");
ParserKeyword from("FROM");
ParserKeyword to("TO");
ParserKeyword step("STEP");
ParserStringLiteral collate_locale_parser;
ParserExpressionWithOptionalAlias exp_parser(false);
ASTPtr expr_elem;
if (!elem_p.parse(pos, expr_elem, expected))
@ -1395,7 +1400,27 @@ bool ParserOrderByElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
return false;
}
node = std::make_shared<ASTOrderByElement>(direction, nulls_direction, nulls_direction_was_explicitly_specified, locale_node);
/// WITH FILL [FROM x] [TO y] [STEP z]
bool has_with_fill = false;
ASTPtr fill_from;
ASTPtr fill_to;
ASTPtr fill_step;
if (with_fill.ignore(pos))
{
has_with_fill = true;
if (from.ignore(pos) && !exp_parser.parse(pos, fill_from, expected))
return false;
if (to.ignore(pos) && !exp_parser.parse(pos, fill_to, expected))
return false;
if (step.ignore(pos) && !exp_parser.parse(pos, fill_step, expected))
return false;
}
node = std::make_shared<ASTOrderByElement>(
direction, nulls_direction, nulls_direction_was_explicitly_specified, locale_node,
has_with_fill, fill_from, fill_to, fill_step);
node->children.push_back(expr_elem);
if (locale_node)
node->children.push_back(locale_node);

View File

@ -289,6 +289,7 @@ protected:
/** Element of ORDER BY expression - same as expression element, but in addition, ASC[ENDING] | DESC[ENDING] could be specified
* and optionally, NULLS LAST|FIRST
* and optionally, COLLATE 'locale'.
* and optionally, WITH FILL [FROM x] [TO y] [STEP z]
*/
class ParserOrderByElement : public IParserBase
{

View File

@ -17,6 +17,7 @@ namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
extern const int TOP_AND_LIMIT_TOGETHER;
extern const int WITH_TIES_WITHOUT_ORDER_BY;
}
@ -41,6 +42,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_rollup("ROLLUP");
ParserKeyword s_cube("CUBE");
ParserKeyword s_top("TOP");
ParserKeyword s_with_ties("WITH TIES");
ParserKeyword s_offset("OFFSET");
ParserNotEmptyExpressionList exp_list(false);
@ -76,7 +78,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
}
/// SELECT [DISTINCT] [TOP N] expr list
/// SELECT [DISTINCT] [TOP N [WITH TIES]] expr list
{
if (!s_select.ignore(pos, expected))
return false;
@ -100,6 +102,9 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!num.parse(pos, limit_length, expected))
return false;
}
if (s_with_ties.ignore(pos, expected))
select_query->limit_with_ties = true;
}
if (!exp_list_for_select_clause.parse(pos, select_expression_list, expected))
@ -197,12 +202,18 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
limit_offset = limit_length;
if (!exp_elem.parse(pos, limit_length, expected))
return false;
if (s_with_ties.ignore(pos, expected))
select_query->limit_with_ties = true;
}
else if (s_offset.ignore(pos, expected))
{
if (!exp_elem.parse(pos, limit_offset, expected))
return false;
}
else if (s_with_ties.ignore(pos, expected))
select_query->limit_with_ties = true;
if (s_by.ignore(pos, expected))
{
limit_by_length = limit_length;
@ -215,7 +226,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
}
/// LIMIT length | LIMIT offset, length
/// LIMIT length [WITH TIES] | LIMIT offset, length [WITH TIES]
if (s_limit.ignore(pos, expected))
{
if (!limit_by_length|| limit_length)
@ -237,8 +248,15 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!exp_elem.parse(pos, limit_offset, expected))
return false;
}
if (s_with_ties.ignore(pos, expected))
select_query->limit_with_ties = true;
}
/// WITH TIES was used without ORDER BY
if (!order_expression_list && select_query->limit_with_ties)
throw Exception("Can not use WITH TIES without ORDER BY", ErrorCodes::WITH_TIES_WITHOUT_ORDER_BY);
/// SETTINGS key1 = value1, key2 = value2, ...
if (s_settings.ignore(pos, expected))
{

View File

@ -6,19 +6,26 @@ namespace DB
LimitTransform::LimitTransform(
const Block & header_, size_t limit_, size_t offset_,
bool always_read_till_end_)
bool always_read_till_end_, bool with_ties_,
const SortDescription & description_)
: IProcessor({header_}, {header_})
, input(inputs.front()), output(outputs.front())
, limit(limit_), offset(offset_)
, always_read_till_end(always_read_till_end_)
, with_ties(with_ties_), description(description_)
{
for (const auto & desc : description)
{
if (!desc.column_name.empty())
sort_column_positions.push_back(header_.getPositionByName(desc.column_name));
else
sort_column_positions.push_back(desc.column_number);
}
}
LimitTransform::Status LimitTransform::prepare()
{
/// Check can output.
bool output_finished = false;
if (output.isFinished())
@ -46,7 +53,7 @@ LimitTransform::Status LimitTransform::prepare()
}
/// Check if we are done with pushing.
bool pushing_is_finished = rows_read >= offset + limit;
bool pushing_is_finished = (rows_read >= offset + limit) && ties_row_ref.empty();
if (pushing_is_finished)
{
if (!always_read_till_end)
@ -116,6 +123,13 @@ LimitTransform::Status LimitTransform::prepare()
if (output.hasData())
return Status::PortFull;
if (with_ties && rows_read == offset + limit)
{
SharedChunkPtr shared_chunk = new detail::SharedChunk(current_chunk.clone());
shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns());
ties_row_ref.set(shared_chunk, &shared_chunk->sort_columns, shared_chunk->getNumRows() - 1);
}
output.push(std::move(current_chunk));
has_block = false;
@ -132,8 +146,39 @@ LimitTransform::Status LimitTransform::prepare()
void LimitTransform::work()
{
size_t num_rows = current_chunk.getNumRows();
size_t num_columns = current_chunk.getNumColumns();
SharedChunkPtr shared_chunk = new detail::SharedChunk(std::move(current_chunk));
shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns());
size_t num_rows = shared_chunk->getNumRows();
size_t num_columns = shared_chunk->getNumColumns();
if (!ties_row_ref.empty() && rows_read >= offset + limit)
{
UInt64 len;
for (len = 0; len < num_rows; ++len)
{
SharedChunkRowRef current_row;
current_row.set(shared_chunk, &shared_chunk->sort_columns, len);
if (current_row != ties_row_ref)
{
ties_row_ref.reset();
break;
}
}
auto columns = shared_chunk->detachColumns();
if (len < num_rows)
{
for (size_t i = 0; i < num_columns; ++i)
columns[i] = columns[i]->cut(0, len);
}
current_chunk.setColumns(std::move(columns), len);
block_processed = true;
return;
}
/// return a piece of the block
size_t start = std::max(
@ -145,7 +190,33 @@ void LimitTransform::work()
static_cast<Int64>(rows_read) - static_cast<Int64>(offset),
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows)));
auto columns = current_chunk.detachColumns();
/// check if other rows in current block equals to last one in limit
if (with_ties)
{
ties_row_ref.set(shared_chunk, &shared_chunk->sort_columns, start + length - 1);
SharedChunkRowRef current_row;
for (size_t i = ties_row_ref.row_num + 1; i < num_rows; ++i)
{
current_row.set(shared_chunk, &shared_chunk->sort_columns, i);
if (current_row == ties_row_ref)
++length;
else
{
ties_row_ref.reset();
break;
}
}
}
if (length == num_rows)
{
current_chunk = std::move(*shared_chunk);
block_processed = true;
return;
}
auto columns = shared_chunk->detachColumns();
for (size_t i = 0; i < num_columns; ++i)
columns[i] = columns[i]->cut(start, length);
@ -155,5 +226,15 @@ void LimitTransform::work()
block_processed = true;
}
ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns)
{
ColumnRawPtrs res;
res.reserve(description.size());
for (size_t pos : sort_column_positions)
res.push_back(columns[pos].get());
return res;
}
}

View File

@ -1,7 +1,8 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/SharedChunk.h>
#include <Core/SortDescription.h>
namespace DB
{
@ -23,10 +24,18 @@ private:
UInt64 rows_before_limit_at_least = 0;
bool with_ties;
const SortDescription description;
SharedChunkRowRef ties_row_ref;
std::vector<size_t> sort_column_positions;
ColumnRawPtrs extractSortColumns(const Columns & columns);
public:
LimitTransform(
const Block & header_, size_t limit_, size_t offset_,
bool always_read_till_end_ = false);
bool always_read_till_end_ = false, bool with_ties_ = false,
const SortDescription & description_ = {});
String getName() const override { return "Limit"; }

View File

@ -0,0 +1,91 @@
#pragma once
#include <algorithm>
#include <Processors/Chunk.h>
#include <Columns/IColumn.h>
#include <boost/smart_ptr/intrusive_ptr.hpp>
namespace DB
{
/// Allows you refer to the row in the block and hold the block ownership,
/// and thus avoid creating a temporary row object.
/// Do not use std::shared_ptr, since there is no need for a place for `weak_count` and `deleter`;
/// does not use Poco::SharedPtr, since you need to allocate a block and `refcount` in one piece;
/// does not use Poco::AutoPtr, since it does not have a `move` constructor and there are extra checks for nullptr;
/// The reference counter is not atomic, since it is used from one thread.
namespace detail
{
struct SharedChunk : Chunk
{
int refcount = 0;
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SharedChunk(Chunk && chunk) : Chunk(std::move(chunk)) {}
};
}
inline void intrusive_ptr_add_ref(detail::SharedChunk * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(detail::SharedChunk * ptr)
{
if (0 == --ptr->refcount)
delete ptr;
}
using SharedChunkPtr = boost::intrusive_ptr<detail::SharedChunk>;
struct SharedChunkRowRef
{
ColumnRawPtrs * columns = nullptr;
size_t row_num;
SharedChunkPtr shared_block;
void swap(SharedChunkRowRef & other)
{
std::swap(columns, other.columns);
std::swap(row_num, other.row_num);
std::swap(shared_block, other.shared_block);
}
/// The number and types of columns must match.
bool operator==(const SharedChunkRowRef & other) const
{
size_t size = columns->size();
for (size_t i = 0; i < size; ++i)
if (0 != (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1))
return false;
return true;
}
bool operator!=(const SharedChunkRowRef & other) const
{
return !(*this == other);
}
void reset()
{
SharedChunkRowRef empty;
swap(empty);
}
bool empty() const { return columns == nullptr; }
size_t size() const { return empty() ? 0 : columns->size(); }
void set(SharedChunkPtr & shared_block_, ColumnRawPtrs * columns_, size_t row_num_)
{
shared_block = shared_block_;
columns = columns_;
row_num = row_num_;
}
};
}

View File

@ -0,0 +1,201 @@
#include <Processors/Transforms/FillingTransform.h>
#include <Interpreters/convertFieldToType.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_WITH_FILL_EXPRESSION;
}
FillingTransform::FillingTransform(
const Block & header_, const SortDescription & sort_description_)
: ISimpleTransform(header_, header_, true)
, sort_description(sort_description_)
, filling_row(sort_description_)
, next_row(sort_description_)
{
std::vector<bool> is_fill_column(header_.columns());
for (const auto & elem : sort_description)
is_fill_column[header_.getPositionByName(elem.column_name)] = true;
auto try_convert_fields = [](FillColumnDescription & descr, const DataTypePtr & type)
{
auto max_type = Field::Types::Null;
WhichDataType which(type);
DataTypePtr to_type;
if (isInteger(type) || which.isDateOrDateTime())
{
max_type = Field::Types::Int64;
to_type = std::make_shared<DataTypeInt64>();
}
else if (which.isFloat())
{
max_type = Field::Types::Float64;
to_type = std::make_shared<DataTypeFloat64>();
}
if (descr.fill_from.getType() > max_type || descr.fill_to.getType() > max_type
|| descr.fill_step.getType() > max_type)
return false;
descr.fill_from = convertFieldToType(descr.fill_from, *to_type);
descr.fill_to = convertFieldToType(descr.fill_to, *to_type);
descr.fill_step = convertFieldToType(descr.fill_step, *to_type);
return true;
};
for (size_t i = 0; i < header_.columns(); ++i)
{
if (is_fill_column[i])
{
size_t pos = fill_column_positions.size();
auto & descr = filling_row.getFillDescription(pos);
auto type = header_.getByPosition(i).type;
if (!try_convert_fields(descr, type))
throw Exception("Incompatible types of WITH FILL expression values with column type "
+ type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
if (type->isValueRepresentedByUnsignedInteger() &&
((!descr.fill_from.isNull() && less(descr.fill_from, Field{0}, 1)) ||
(!descr.fill_to.isNull() && less(descr.fill_to, Field{0}, 1))))
{
throw Exception("WITH FILL bound values cannot be negative for unsigned type "
+ type->getName(), ErrorCodes::INVALID_WITH_FILL_EXPRESSION);
}
fill_column_positions.push_back(i);
}
else
other_column_positions.push_back(i);
}
}
IProcessor::Status FillingTransform::prepare()
{
if (input.isFinished() && !output.isFinished() && !has_input && !generate_suffix)
{
should_insert_first = next_row < filling_row;
for (size_t i = 0; i < filling_row.size(); ++i)
next_row[i] = filling_row.getFillDescription(i).fill_to;
if (filling_row < next_row)
{
generate_suffix = true;
return Status::Ready;
}
}
return ISimpleTransform::prepare();
}
void FillingTransform::transform(Chunk & chunk)
{
Columns old_fill_columns;
Columns old_other_columns;
MutableColumns res_fill_columns;
MutableColumns res_other_columns;
auto init_columns_by_positions = [](const Columns & old_columns, Columns & new_columns,
MutableColumns & new_mutable_columns, const Positions & positions)
{
for (size_t pos : positions)
{
new_columns.push_back(old_columns[pos]);
new_mutable_columns.push_back(old_columns[pos]->cloneEmpty()->assumeMutable());
}
};
if (generate_suffix)
{
const auto & empty_columns = inputs.front().getHeader().getColumns();
init_columns_by_positions(empty_columns, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(empty_columns, old_other_columns, res_other_columns, other_column_positions);
if (should_insert_first && filling_row < next_row)
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
while (filling_row.next(next_row))
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
setResultColumns(chunk, res_fill_columns, res_other_columns);
return;
}
size_t num_rows = chunk.getNumRows();
auto old_columns = chunk.detachColumns();
init_columns_by_positions(old_columns, old_fill_columns, res_fill_columns, fill_column_positions);
init_columns_by_positions(old_columns, old_other_columns, res_other_columns, other_column_positions);
if (first)
{
for (size_t i = 0; i < filling_row.size(); ++i)
{
auto current_value = (*old_fill_columns[i])[0];
const auto & fill_from = filling_row.getFillDescription(i).fill_from;
if (!fill_from.isNull() && !equals(current_value, fill_from))
{
filling_row.initFromDefaults(i);
if (less(fill_from, current_value, filling_row.getDirection(i)))
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
break;
}
filling_row[i] = current_value;
}
first = false;
}
for (size_t row_ind = 0; row_ind < num_rows; ++row_ind)
{
should_insert_first = next_row < filling_row;
for (size_t i = 0; i < filling_row.size(); ++i)
{
auto current_value = (*old_fill_columns[i])[row_ind];
const auto & fill_to = filling_row.getFillDescription(i).fill_to;
if (fill_to.isNull() || less(current_value, fill_to, filling_row.getDirection(i)))
next_row[i] = current_value;
else
next_row[i] = fill_to;
}
/// A case, when at previous step row was initialized from defaults 'fill_from' values
/// and probably we need to insert it to block.
if (should_insert_first && filling_row < next_row)
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
/// Insert generated filling row to block, while it is less than current row in block.
while (filling_row.next(next_row))
insertFromFillingRow(res_fill_columns, res_other_columns, filling_row);
copyRowFromColumns(res_fill_columns, old_fill_columns, row_ind);
copyRowFromColumns(res_other_columns, old_other_columns, row_ind);
}
setResultColumns(chunk, res_fill_columns, res_other_columns);
}
void FillingTransform::setResultColumns(Chunk & chunk, MutableColumns & fill_columns, MutableColumns & other_columns) const
{
MutableColumns result_columns(fill_columns.size() + other_columns.size());
/// fill_columns always non-empty.
size_t num_rows = fill_columns[0]->size();
for (size_t i = 0; i < fill_columns.size(); ++i)
result_columns[fill_column_positions[i]] = std::move(fill_columns[i]);
for (size_t i = 0; i < other_columns.size(); ++i)
result_columns[other_column_positions[i]] = std::move(other_columns[i]);
chunk.setColumns(std::move(result_columns), num_rows);
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Core/SortDescription.h>
#include <Interpreters/FillingRow.h>
namespace DB
{
/** Implements modifier WITH FILL of ORDER BY clause.
* It fills gaps in data stream by rows with missing values in columns with set WITH FILL and deafult values in other columns.
* Optionally FROM, TO and STEP values can be specified.
*/
class FillingTransform : public ISimpleTransform
{
public:
FillingTransform(const Block & header_, const SortDescription & fill_description_);
String getName() const override { return "FillingTransform"; }
Status prepare() override;
protected:
void transform(Chunk & Chunk) override;
private:
void setResultColumns(Chunk & chunk, MutableColumns & fill_columns, MutableColumns & other_columns) const;
const SortDescription sort_description; /// Contains only rows with WITH FILL.
FillingRow filling_row; /// Current row, which is used to fill gaps.
FillingRow next_row; /// Row to which we need to generate filling rows.
using Positions = std::vector<size_t>;
Positions fill_column_positions;
Positions other_column_positions;
bool first = true;
bool generate_suffix = false;
/// Determines should we insert filling row before start generating next rows.
bool should_insert_first = false;
};
}

View File

@ -2,46 +2,13 @@
#include <Processors/IProcessor.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <Processors/SharedChunk.h>
#include <queue>
namespace DB
{
/// Allows you refer to the row in the block and hold the block ownership,
/// and thus avoid creating a temporary row object.
/// Do not use std::shared_ptr, since there is no need for a place for `weak_count` and `deleter`;
/// does not use Poco::SharedPtr, since you need to allocate a block and `refcount` in one piece;
/// does not use Poco::AutoPtr, since it does not have a `move` constructor and there are extra checks for nullptr;
/// The reference counter is not atomic, since it is used from one thread.
namespace detail
{
struct SharedChunk : Chunk
{
int refcount = 0;
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SharedChunk(Chunk && chunk) : Chunk(std::move(chunk)) {}
};
}
using SharedChunkPtr = boost::intrusive_ptr<detail::SharedChunk>;
inline void intrusive_ptr_add_ref(detail::SharedChunk * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(detail::SharedChunk * ptr)
{
if (0 == --ptr->refcount)
delete ptr;
}
class MergingSortedTransform : public IProcessor
{
public:

View File

@ -0,0 +1,510 @@
*** table without fill to compare ***
2019-05-07 18 prh
2019-05-07 26 2ke
2019-05-08 28 otf
2019-05-09 25 798
2019-05-10 1 myj
2019-05-10 16 vp7
2019-05-11 18 3s2
2019-05-15 27 enb
2019-05-19 20 yfh
2019-05-23 15 01v
2019-05-23 29 72y
2019-05-24 13 sd0
2019-05-25 17 0ei
2019-05-30 18 3kd
2019-06-04 5 6az
*** date WITH FILL, val ***
2019-05-07 18 prh
2019-05-07 26 2ke
2019-05-08 28 otf
2019-05-09 25 798
2019-05-10 1 myj
2019-05-10 16 vp7
2019-05-11 18 3s2
2019-05-12 0
2019-05-13 0
2019-05-14 0
2019-05-15 27 enb
2019-05-16 0
2019-05-17 0
2019-05-18 0
2019-05-19 20 yfh
2019-05-20 0
2019-05-21 0
2019-05-22 0
2019-05-23 15 01v
2019-05-23 29 72y
2019-05-24 13 sd0
2019-05-25 17 0ei
2019-05-26 0
2019-05-27 0
2019-05-28 0
2019-05-29 0
2019-05-30 18 3kd
2019-05-31 0
2019-06-01 0
2019-06-02 0
2019-06-03 0
2019-06-04 5 6az
*** date WITH FILL FROM 2019-05-01 TO 2019-05-31, val WITH FILL ***
2019-05-01 0
2019-05-02 0
2019-05-03 0
2019-05-04 0
2019-05-05 0
2019-05-06 0
2019-05-07 18 prh
2019-05-07 19
2019-05-07 20
2019-05-07 21
2019-05-07 22
2019-05-07 23
2019-05-07 24
2019-05-07 25
2019-05-07 26 2ke
2019-05-08 28 otf
2019-05-09 25 798
2019-05-10 1 myj
2019-05-10 2
2019-05-10 3
2019-05-10 4
2019-05-10 5
2019-05-10 6
2019-05-10 7
2019-05-10 8
2019-05-10 9
2019-05-10 10
2019-05-10 11
2019-05-10 12
2019-05-10 13
2019-05-10 14
2019-05-10 15
2019-05-10 16 vp7
2019-05-11 18 3s2
2019-05-12 0
2019-05-13 0
2019-05-14 0
2019-05-15 27 enb
2019-05-16 0
2019-05-17 0
2019-05-18 0
2019-05-19 20 yfh
2019-05-20 0
2019-05-21 0
2019-05-22 0
2019-05-23 15 01v
2019-05-23 16
2019-05-23 17
2019-05-23 18
2019-05-23 19
2019-05-23 20
2019-05-23 21
2019-05-23 22
2019-05-23 23
2019-05-23 24
2019-05-23 25
2019-05-23 26
2019-05-23 27
2019-05-23 28
2019-05-23 29 72y
2019-05-24 13 sd0
2019-05-25 17 0ei
2019-05-26 0
2019-05-27 0
2019-05-28 0
2019-05-29 0
2019-05-30 18 3kd
2019-06-04 5 6az
*** date DESC WITH FILL, val WITH FILL FROM 1 TO 6 ***
2019-06-04 1
2019-06-04 2
2019-06-04 3
2019-06-04 4
2019-06-04 5 6az
2019-06-03 1
2019-06-03 2
2019-06-03 3
2019-06-03 4
2019-06-03 5
2019-06-02 1
2019-06-02 2
2019-06-02 3
2019-06-02 4
2019-06-02 5
2019-06-01 1
2019-06-01 2
2019-06-01 3
2019-06-01 4
2019-06-01 5
2019-05-31 1
2019-05-31 2
2019-05-31 3
2019-05-31 4
2019-05-31 5
2019-05-30 1
2019-05-30 2
2019-05-30 3
2019-05-30 4
2019-05-30 5
2019-05-30 18 3kd
2019-05-29 1
2019-05-29 2
2019-05-29 3
2019-05-29 4
2019-05-29 5
2019-05-28 1
2019-05-28 2
2019-05-28 3
2019-05-28 4
2019-05-28 5
2019-05-27 1
2019-05-27 2
2019-05-27 3
2019-05-27 4
2019-05-27 5
2019-05-26 1
2019-05-26 2
2019-05-26 3
2019-05-26 4
2019-05-26 5
2019-05-25 1
2019-05-25 2
2019-05-25 3
2019-05-25 4
2019-05-25 5
2019-05-25 17 0ei
2019-05-24 1
2019-05-24 2
2019-05-24 3
2019-05-24 4
2019-05-24 5
2019-05-24 13 sd0
2019-05-23 1
2019-05-23 2
2019-05-23 3
2019-05-23 4
2019-05-23 5
2019-05-23 15 01v
2019-05-23 29 72y
2019-05-22 1
2019-05-22 2
2019-05-22 3
2019-05-22 4
2019-05-22 5
2019-05-21 1
2019-05-21 2
2019-05-21 3
2019-05-21 4
2019-05-21 5
2019-05-20 1
2019-05-20 2
2019-05-20 3
2019-05-20 4
2019-05-20 5
2019-05-19 1
2019-05-19 2
2019-05-19 3
2019-05-19 4
2019-05-19 5
2019-05-19 20 yfh
2019-05-18 1
2019-05-18 2
2019-05-18 3
2019-05-18 4
2019-05-18 5
2019-05-17 1
2019-05-17 2
2019-05-17 3
2019-05-17 4
2019-05-17 5
2019-05-16 1
2019-05-16 2
2019-05-16 3
2019-05-16 4
2019-05-16 5
2019-05-15 1
2019-05-15 2
2019-05-15 3
2019-05-15 4
2019-05-15 5
2019-05-15 27 enb
2019-05-14 1
2019-05-14 2
2019-05-14 3
2019-05-14 4
2019-05-14 5
2019-05-13 1
2019-05-13 2
2019-05-13 3
2019-05-13 4
2019-05-13 5
2019-05-12 1
2019-05-12 2
2019-05-12 3
2019-05-12 4
2019-05-12 5
2019-05-11 1
2019-05-11 2
2019-05-11 3
2019-05-11 4
2019-05-11 5
2019-05-11 18 3s2
2019-05-10 1 myj
2019-05-10 2
2019-05-10 3
2019-05-10 4
2019-05-10 5
2019-05-10 16 vp7
2019-05-09 1
2019-05-09 2
2019-05-09 3
2019-05-09 4
2019-05-09 5
2019-05-09 25 798
2019-05-08 1
2019-05-08 2
2019-05-08 3
2019-05-08 4
2019-05-08 5
2019-05-08 28 otf
2019-05-07 1
2019-05-07 2
2019-05-07 3
2019-05-07 4
2019-05-07 5
2019-05-07 18 prh
2019-05-07 26 2ke
*** date DESC WITH FILL TO 2019-05-01 STEP -2, val DESC WITH FILL FROM 10 TO -5 STEP -3 ***
2019-06-04 10
2019-06-04 7
2019-06-04 5 6az
2019-06-04 4
2019-06-04 1
2019-06-04 -2
2019-06-02 10
2019-06-02 7
2019-06-02 4
2019-06-02 1
2019-06-02 -2
2019-05-31 10
2019-05-31 7
2019-05-31 4
2019-05-31 1
2019-05-31 -2
2019-05-30 18 3kd
2019-05-29 10
2019-05-29 7
2019-05-29 4
2019-05-29 1
2019-05-29 -2
2019-05-27 10
2019-05-27 7
2019-05-27 4
2019-05-27 1
2019-05-27 -2
2019-05-25 17 0ei
2019-05-25 10
2019-05-25 7
2019-05-25 4
2019-05-25 1
2019-05-25 -2
2019-05-24 13 sd0
2019-05-23 29 72y
2019-05-23 15 01v
2019-05-23 10
2019-05-23 7
2019-05-23 4
2019-05-23 1
2019-05-23 -2
2019-05-21 10
2019-05-21 7
2019-05-21 4
2019-05-21 1
2019-05-21 -2
2019-05-19 20 yfh
2019-05-19 10
2019-05-19 7
2019-05-19 4
2019-05-19 1
2019-05-19 -2
2019-05-17 10
2019-05-17 7
2019-05-17 4
2019-05-17 1
2019-05-17 -2
2019-05-15 27 enb
2019-05-15 10
2019-05-15 7
2019-05-15 4
2019-05-15 1
2019-05-15 -2
2019-05-13 10
2019-05-13 7
2019-05-13 4
2019-05-13 1
2019-05-13 -2
2019-05-11 18 3s2
2019-05-11 10
2019-05-11 7
2019-05-11 4
2019-05-11 1
2019-05-11 -2
2019-05-10 16 vp7
2019-05-10 1 myj
2019-05-09 25 798
2019-05-09 10
2019-05-09 7
2019-05-09 4
2019-05-09 1
2019-05-09 -2
2019-05-08 28 otf
2019-05-07 26 2ke
2019-05-07 18 prh
2019-05-07 10
2019-05-07 7
2019-05-07 4
2019-05-07 1
2019-05-07 -2
2019-05-05 10
2019-05-05 7
2019-05-05 4
2019-05-05 1
2019-05-05 -2
2019-05-03 10
2019-05-03 7
2019-05-03 4
2019-05-03 1
2019-05-03 -2
2019-05-01 10
2019-05-01 7
2019-05-01 4
2019-05-01 1
2019-05-01 -2
*** date WITH FILL TO 2019-06-23 STEP 3, val WITH FILL FROM -10 STEP 2
2019-05-07 -10
2019-05-07 -8
2019-05-07 -6
2019-05-07 -4
2019-05-07 -2
2019-05-07 0
2019-05-07 2
2019-05-07 4
2019-05-07 6
2019-05-07 8
2019-05-07 10
2019-05-07 12
2019-05-07 14
2019-05-07 16
2019-05-07 18 prh
2019-05-07 20
2019-05-07 22
2019-05-07 24
2019-05-07 26 2ke
2019-05-08 28 otf
2019-05-09 25 798
2019-05-10 -10
2019-05-10 -8
2019-05-10 -6
2019-05-10 -4
2019-05-10 -2
2019-05-10 0
2019-05-10 1 myj
2019-05-10 2
2019-05-10 4
2019-05-10 6
2019-05-10 8
2019-05-10 10
2019-05-10 12
2019-05-10 14
2019-05-10 16 vp7
2019-05-11 18 3s2
2019-05-13 -10
2019-05-15 27 enb
2019-05-16 -10
2019-05-19 -10
2019-05-19 -8
2019-05-19 -6
2019-05-19 -4
2019-05-19 -2
2019-05-19 0
2019-05-19 2
2019-05-19 4
2019-05-19 6
2019-05-19 8
2019-05-19 10
2019-05-19 12
2019-05-19 14
2019-05-19 16
2019-05-19 18
2019-05-19 20 yfh
2019-05-22 -10
2019-05-23 15 01v
2019-05-23 29 72y
2019-05-24 13 sd0
2019-05-25 -10
2019-05-25 -8
2019-05-25 -6
2019-05-25 -4
2019-05-25 -2
2019-05-25 0
2019-05-25 2
2019-05-25 4
2019-05-25 6
2019-05-25 8
2019-05-25 10
2019-05-25 12
2019-05-25 14
2019-05-25 16
2019-05-25 17 0ei
2019-05-28 -10
2019-05-30 18 3kd
2019-05-31 -10
2019-06-03 -10
2019-06-04 5 6az
2019-06-06 -10
2019-06-09 -10
2019-06-12 -10
2019-06-15 -10
2019-06-18 -10
2019-06-21 -10
*** table without fill to compare ***
1 -2
1 3
3 2
5 -1
6 5
8 0
*** a WITH FILL, b WITH fill ***
1 -2
1 -1
1 0
1 1
1 2
1 3
2 0
3 2
4 0
5 -1
6 5
7 0
8 0
*** a WITH FILL, b WITH fill TO 6 STEP 2 ***
1 -2
1 0
1 2
1 3
1 4
2 0
3 2
3 4
4 0
5 -1
5 1
5 3
5 5
6 5
7 0
8 0
8 2
8 4

View File

@ -0,0 +1,45 @@
DROP TABLE IF EXISTS fill;
CREATE TABLE fill (date Date, val Int, str String) ENGINE = Memory;
INSERT INTO fill VALUES (toDate('2019-05-24'), 13, 'sd0')(toDate('2019-05-10'), 16, 'vp7')(toDate('2019-05-25'), 17, '0ei')(toDate('2019-05-30'), 18, '3kd')(toDate('2019-05-15'), 27, 'enb')(toDate('2019-06-04'), 5, '6az')(toDate('2019-05-23'), 15, '01v')(toDate('2019-05-08'), 28, 'otf')(toDate('2019-05-19'), 20, 'yfh')(toDate('2019-05-07'), 26, '2ke')(toDate('2019-05-07'), 18, 'prh')(toDate('2019-05-09'), 25, '798')(toDate('2019-05-10'), 1, 'myj')(toDate('2019-05-11'), 18, '3s2')(toDate('2019-05-23'), 29, '72y');
SELECT '*** table without fill to compare ***';
SELECT * FROM fill ORDER BY date, val;
-- Some useful cases
SELECT '*** date WITH FILL, val ***';
SELECT * FROM fill ORDER BY date WITH FILL, val;
SELECT '*** date WITH FILL FROM 2019-05-01 TO 2019-05-31, val WITH FILL ***';
SELECT * FROM fill ORDER BY date WITH FILL FROM toDate('2019-05-01') TO toDate('2019-05-31'), val WITH FILL;
SELECT '*** date DESC WITH FILL, val WITH FILL FROM 1 TO 6 ***';
SELECT * FROM fill ORDER BY date DESC WITH FILL, val WITH FILL FROM 1 TO 6;
-- Some weird cases
SELECT '*** date DESC WITH FILL TO 2019-05-01 STEP -2, val DESC WITH FILL FROM 10 TO -5 STEP -3 ***';
SELECT * FROM fill ORDER BY date DESC WITH FILL TO toDate('2019-05-01') STEP -2, val DESC WITH FILL FROM 10 TO -5 STEP -3;
SELECT '*** date WITH FILL TO 2019-06-23 STEP 3, val WITH FILL FROM -10 STEP 2';
SELECT * FROM fill ORDER BY date WITH FILL TO toDate('2019-06-23') STEP 3, val WITH FILL FROM -10 STEP 2;
DROP TABLE fill;
CREATE TABLE fill (a UInt32, b Int32) ENGINE = Memory;
INSERT INTO fill VALUES (1, -2), (1, 3), (3, 2), (5, -1), (6, 5), (8, 0);
SELECT '*** table without fill to compare ***';
SELECT * FROM fill ORDER BY a, b;
SELECT '*** a WITH FILL, b WITH fill ***';
SELECT * FROM fill ORDER BY a WITH FILL, b WITH fill;
SELECT '*** a WITH FILL, b WITH fill TO 6 STEP 2 ***';
SELECT * FROM fill ORDER BY a WITH FILL, b WITH fill TO 6 STEP 2;
SELECT * FROM fill ORDER BY a WITH FILL STEP -1; -- { serverError 475 }
SELECT * FROM fill ORDER BY a WITH FILL FROM 10 TO 1; -- { serverError 475 }
SELECT * FROM fill ORDER BY a DESC WITH FILL FROM 1 TO 10; -- { serverError 475 }
SELECT * FROM fill ORDER BY a WITH FILL FROM -10 to 10; -- { serverError 475 }
DROP TABLE fill;

View File

@ -0,0 +1,52 @@
1
1
*
1
1
2
2
2
2
*
1
1
2
2
2
2
*
1
*
1
2
2
2
2
*
1
1
*
2
2
2
2
*
1
1
2
2
2
2
*
1
1
*
2
2
2
2
*
2
2
2
*

View File

@ -0,0 +1,35 @@
DROP TABLE IF EXISTS ties;
CREATE TABLE ties (a Int) ENGINE = Memory;
-- SET experimental_use_processors=1;
INSERT INTO ties VALUES (1), (1), (2), (2), (2), (2) (3), (3);
SELECT a FROM ties order by a limit 1 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 3 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 5 with ties;
SELECT '*';
SET max_block_size = 2;
SELECT a FROM ties order by a limit 1, 1 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 1, 2 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 2 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 2, 3 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 4 with ties;
SELECT '*';
SET max_block_size = 3;
SELECT a FROM ties order by a limit 1 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 2, 3 with ties;
SELECT '*';
SELECT a FROM ties order by a limit 3, 2 with ties;
SELECT '*';
DROP TABLE ties;