push functional draft

This commit is contained in:
Guillaume Tassery 2020-05-07 20:40:50 +07:00
parent 8ce606571e
commit 2e719314a3
4 changed files with 25 additions and 118 deletions

View File

@ -656,16 +656,16 @@ static SortDescription getSortDescription(const ASTSelectQuery & query, const Co
return order_descr; return order_descr;
} }
static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context) static UInt64 getLimitUIntValue(const ASTPtr & node, const Context & context, const std::string & expr)
{ {
const auto & [field, type] = evaluateConstantExpression(node, context); const auto & [field, type] = evaluateConstantExpression(node, context);
if (!isNativeNumber(type)) if (!isNativeNumber(type))
throw Exception("Illegal type " + type->getName() + " of LIMIT expression, must be numeric type", ErrorCodes::INVALID_LIMIT_EXPRESSION); throw Exception("Illegal type " + type->getName() + " of " + expr + " expression, must be numeric type", ErrorCodes::INVALID_LIMIT_EXPRESSION);
Field converted = convertFieldToType(field, DataTypeUInt64()); Field converted = convertFieldToType(field, DataTypeUInt64());
if (converted.isNull()) if (converted.isNull())
throw Exception("The value " + applyVisitor(FieldVisitorToString(), field) + " of LIMIT expression is not representable as UInt64", ErrorCodes::INVALID_LIMIT_EXPRESSION); throw Exception("The value " + applyVisitor(FieldVisitorToString(), field) + " of " + expr + " expression is not representable as UInt64", ErrorCodes::INVALID_LIMIT_EXPRESSION);
return converted.safeGet<UInt64>(); return converted.safeGet<UInt64>();
} }
@ -677,9 +677,9 @@ static std::pair<UInt64, UInt64> getLimitLengthAndOffset(const ASTSelectQuery &
UInt64 offset = 0; UInt64 offset = 0;
if (query.limitLength()) if (query.limitLength())
length = getLimitUIntValue(query.limitLength(), context); length = getLimitUIntValue(query.limitLength(), context, "LIMIT");
if (query.limitOffset()) if (query.limitOffset())
offset = getLimitUIntValue(query.limitOffset(), context); offset = getLimitUIntValue(query.limitOffset(), context, "OFFSET");
return {length, offset}; return {length, offset};
} }
@ -2336,8 +2336,8 @@ void InterpreterSelectQuery::executeLimitBy(Pipeline & pipeline)
Names columns; Names columns;
for (const auto & elem : query.limitBy()->children) for (const auto & elem : query.limitBy()->children)
columns.emplace_back(elem->getColumnName()); columns.emplace_back(elem->getColumnName());
UInt64 length = getLimitUIntValue(query.limitByLength(), *context); UInt64 length = getLimitUIntValue(query.limitByLength(), *context, "LIMIT");
UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context) : 0); UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context, "OFFSET") : 0);
pipeline.transform([&](auto & stream) pipeline.transform([&](auto & stream)
{ {
@ -2355,8 +2355,8 @@ void InterpreterSelectQuery::executeLimitBy(QueryPipeline & pipeline)
for (const auto & elem : query.limitBy()->children) for (const auto & elem : query.limitBy()->children)
columns.emplace_back(elem->getColumnName()); columns.emplace_back(elem->getColumnName());
UInt64 length = getLimitUIntValue(query.limitByLength(), *context); UInt64 length = getLimitUIntValue(query.limitByLength(), *context, "LIMIT");
UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context) : 0); UInt64 offset = (query.limitByOffset() ? getLimitUIntValue(query.limitByOffset(), *context, "OFFSET") : 0);
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{ {
@ -2580,23 +2580,6 @@ void InterpreterSelectQuery::executeOffset(QueryPipeline & pipeline)
/// If there is LIMIT /// If there is LIMIT
if (!query.limitLength() && query.limitOffset()) if (!query.limitLength() && query.limitOffset())
{ {
/** Rare case:
* if there is no WITH TOTALS and there is a subquery in FROM, and there is WITH TOTALS on one of the levels,
* then when using LIMIT, you should read the data to the end, rather than cancel the query earlier,
* because if you cancel the query, we will not get `totals` data from the remote server.
*
* Another case:
* if there is WITH TOTALS and there is no ORDER BY, then read the data to the end,
* otherwise TOTALS is counted according to incomplete data.
*/
bool always_read_till_end = false;
if (query.group_by_with_totals && !query.orderBy())
always_read_till_end = true;
if (!query.group_by_with_totals && hasWithTotalsInAnySubqueryInFromClause(query))
always_read_till_end = true;
UInt64 limit_length; UInt64 limit_length;
UInt64 limit_offset; UInt64 limit_offset;
std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, *context); std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, *context);
@ -2615,7 +2598,7 @@ void InterpreterSelectQuery::executeOffset(QueryPipeline & pipeline)
return nullptr; return nullptr;
std::cout << "TRANSFORM" << std::endl; std::cout << "TRANSFORM" << std::endl;
return std::make_shared<OffsetTransform>( return std::make_shared<OffsetTransform>(
header, limit_length, limit_offset, 1, always_read_till_end, query.limit_with_ties, order_descr); header, limit_offset, 1, true, query.limit_with_ties, order_descr);
}); });
} }
} }

View File

@ -247,11 +247,6 @@ bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{ {
if (!exp_elem.parse(pos, limit_offset, expected)) if (!exp_elem.parse(pos, limit_offset, expected))
return false; return false;
if (s_with_ties.ignore(pos, expected))
{
limit_with_ties_occured = true;
select_query->limit_with_ties = true;
}
} }
/// Because TOP n in totally equals LIMIT n /// Because TOP n in totally equals LIMIT n

View File

@ -10,11 +10,11 @@ namespace ErrorCodes
} }
OffsetTransform::OffsetTransform( OffsetTransform::OffsetTransform(
const Block & header_, size_t limit_, size_t offset_, size_t num_streams, const Block & header_, size_t offset_, size_t num_streams,
bool always_read_till_end_, bool with_ties_, bool always_read_till_end_, bool with_ties_,
SortDescription description_) SortDescription description_)
: IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_)) : IProcessor(InputPorts(num_streams, header_), OutputPorts(num_streams, header_))
, limit(limit_), offset(offset_) , offset(offset_)
, always_read_till_end(always_read_till_end_) , always_read_till_end(always_read_till_end_)
, with_ties(with_ties_), description(std::move(description_)) , with_ties(with_ties_), description(std::move(description_))
{ {
@ -107,19 +107,6 @@ IProcessor::Status OffsetTransform::prepare(
if (num_finished_port_pairs == ports_data.size()) if (num_finished_port_pairs == ports_data.size())
return Status::Finished; return Status::Finished;
/// If we reached limit for some port, then close others. Otherwise some sources may infinitely read data.
/// Example: SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1
// if ((rows_read >= offset) && !previous_row_chunk && !always_read_till_end)
// {
// for (auto & input : inputs)
// input.close();
// for (auto & output : outputs)
// output.finish();
//return Status::Finished;
//}
if (has_full_port) if (has_full_port)
return Status::PortFull; return Status::PortFull;
@ -145,11 +132,6 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data)
if (output.isFinished()) if (output.isFinished())
{ {
output_finished = true; output_finished = true;
if (!always_read_till_end)
{
input.close();
return Status::Finished;
}
} }
if (!output_finished && !output.canPush()) if (!output_finished && !output.canPush())
@ -167,8 +149,9 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data)
} }
input.setNeeded(); input.setNeeded();
if (!input.hasData()) if (!input.hasData()) {
return Status::NeedData; return Status::NeedData;
}
data.current_chunk = input.pull(true); data.current_chunk = input.pull(true);
@ -177,10 +160,14 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data)
if (rows_before_limit_at_least) if (rows_before_limit_at_least)
rows_before_limit_at_least->add(rows); rows_before_limit_at_least->add(rows);
/// Skip block (for 'always_read_till_end' case). /// Process block.
if (output_finished)
rows_read += rows;
if (rows_read < offset)
{ {
data.current_chunk.clear(); data.current_chunk.clear();
if (input.isFinished()) if (input.isFinished())
{ {
output.finish(); output.finish();
@ -192,42 +179,9 @@ OffsetTransform::Status OffsetTransform::preparePair(PortsData & data)
return Status::NeedData; return Status::NeedData;
} }
/// Process block. if (!(rows_read >= offset + rows))
rows_read += rows;
//if (rows_read <= offset)
//{
// data.current_chunk.clear();
//
// if (input.isFinished())
// {
// output.finish();
// return Status::Finished;
// }
/// Now, we pulled from input, and it must be empty.
// input.setNeeded();
// return Status::NeedData;
//}
if (rows_read >= offset + rows && rows_read <= offset)
{
/// Return the whole chunk.
/// Save the last row of current chunk to check if next block begins with the same row (for WITH TIES).
if (with_ties && rows_read == offset + limit)
previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, data.current_chunk.getNumRows() - 1);
}
else
/// This function may be heavy to execute in prepare. But it happens no more then twice, and make code simpler.
splitChunk(data); splitChunk(data);
//bool may_need_more_data_for_ties = previous_row_chunk || rows_read - rows <= offset;
/// No more data is needed.
//if (!always_read_till_end && (rows_read >= offset) && !may_need_more_data_for_ties)
// input.close();
output.push(std::move(data.current_chunk)); output.push(std::move(data.current_chunk));
return Status::PortFull; return Status::PortFull;
@ -245,32 +199,7 @@ void OffsetTransform::splitChunk(PortsData & data)
static_cast<Int64>(0), static_cast<Int64>(0),
static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows)); static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows));
//size_t length = std::min( size_t length = static_cast<Int64>(rows_read) - static_cast<Int64>(offset);
// static_cast<Int64>(rows_read) - static_cast<Int64>(offset),
// static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows));
size_t length = static_cast<Int64>(num_rows);
std::cout << "===========================" << std::endl
<< start << " " << length << std::endl
<< static_cast<Int64>(rows_read) << " " << static_cast<Int64>(num_rows) << std::endl
<< "===========================" << std::endl;
/// check if other rows in current block equals to last one in limit
if (with_ties && length)
{
size_t current_row_num = start + length;
previous_row_chunk = makeChunkWithPreviousRow(data.current_chunk, current_row_num - 1);
for (; current_row_num < num_rows; ++current_row_num)
{
if (!sortColumnsEqualAt(current_chunk_sort_columns, current_row_num))
{
previous_row_chunk = {};
break;
}
}
length = current_row_num - start;
}
if (length == num_rows) if (length == num_rows)
return; return;
@ -283,6 +212,7 @@ void OffsetTransform::splitChunk(PortsData & data)
data.current_chunk.setColumns(std::move(columns), length); data.current_chunk.setColumns(std::move(columns), length);
} }
ColumnRawPtrs OffsetTransform::extractSortColumns(const Columns & columns) const ColumnRawPtrs OffsetTransform::extractSortColumns(const Columns & columns) const
{ {
ColumnRawPtrs res; ColumnRawPtrs res;

View File

@ -19,7 +19,6 @@ class OffsetTransform : public IProcessor
{ {
private: private:
size_t limit;
size_t offset; size_t offset;
bool always_read_till_end; bool always_read_till_end;
@ -52,11 +51,11 @@ private:
public: public:
OffsetTransform( OffsetTransform(
const Block & header_, size_t limit_, size_t offset_, size_t num_streams = 1, const Block & header_, size_t offset_, size_t num_streams = 1,
bool always_read_till_end_ = false, bool with_ties_ = false, bool always_read_till_end_ = false, bool with_ties_ = false,
SortDescription description_ = {}); SortDescription description_ = {});
String getName() const override { return "Limit"; } String getName() const override { return "Offset"; }
Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) override; Status prepare(const PortNumbers & /*updated_input_ports*/, const PortNumbers & /*updated_output_ports*/) override;
Status prepare() override; /// Compatibility for TreeExecutor. Status prepare() override; /// Compatibility for TreeExecutor.