Fix some tests.

This commit is contained in:
Nikolai Kochetov 2021-10-07 20:27:13 +03:00
parent d0c6f11fcb
commit 213d68d995
5 changed files with 50 additions and 17 deletions

View File

@ -47,7 +47,10 @@ void DistinctSortedTransform::transform(Chunk & chunk)
/// Just go to the next block if there isn't any new record in the current one.
if (!has_new_data)
{
chunk.clear();
return;
}
if (!set_size_limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "DISTINCT", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
{

View File

@ -23,7 +23,7 @@ TTLTransform::TTLTransform(
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time_,
bool force_)
: ISimpleTransform(header_, header_, false)
: IAccumulatingTransform(header_, header_)
, data_part(data_part_)
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLTransform)"))
{
@ -97,16 +97,16 @@ Block reorderColumns(Block block, const Block & header)
return res;
}
void TTLTransform::transform(Chunk & chunk)
void TTLTransform::consume(Chunk chunk)
{
if (all_data_dropped)
{
stopReading();
chunk.clear();
finishConsume();
return;
}
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
for (const auto & algorithm : algorithms)
algorithm->execute(block);
@ -114,8 +114,20 @@ void TTLTransform::transform(Chunk & chunk)
return;
size_t num_rows = block.rows();
setReadyChunk(Chunk(reorderColumns(std::move(block), getOutputPort().getHeader()).getColumns(), num_rows));
}
chunk = Chunk(reorderColumns(std::move(block), getOutputPort().getHeader()).getColumns(), num_rows);
Chunk TTLTransform::generate()
{
Block block;
for (const auto & algorithm : algorithms)
algorithm->execute(block);
if (!block)
return {};
size_t num_rows = block.rows();
return Chunk(reorderColumns(std::move(block), getOutputPort().getHeader()).getColumns(), num_rows);
}
void TTLTransform::finalize()
@ -133,7 +145,7 @@ void TTLTransform::finalize()
IProcessor::Status TTLTransform::prepare()
{
auto status = ISimpleTransform::prepare();
auto status = IAccumulatingTransform::prepare();
if (status == Status::Finished)
finalize();

View File

@ -1,5 +1,5 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Processors/IAccumulatingTransform.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/Block.h>
@ -12,7 +12,7 @@
namespace DB
{
class TTLTransform : public ISimpleTransform
class TTLTransform : public IAccumulatingTransform
{
public:
TTLTransform(
@ -29,7 +29,8 @@ public:
Status prepare() override;
protected:
void transform(Chunk & chunk) override;
void consume(Chunk chunk) override;
Chunk generate() override;
/// Finalizes ttl infos and updates data part
void finalize();

View File

@ -11,9 +11,9 @@ TTLCalcTransform::TTLCalcTransform(
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time_,
bool force_)
: ISimpleTransform(header_, header_, true)
: IAccumulatingTransform(header_, header_)
, data_part(data_part_)
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLCalcInputStream)"))
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLCalcTransform)"))
{
auto old_ttl_infos = data_part->ttl_infos;
@ -50,7 +50,7 @@ TTLCalcTransform::TTLCalcTransform(
recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
}
void TTLCalcTransform::transform(Chunk & chunk)
void TTLCalcTransform::consume(Chunk chunk)
{
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
for (const auto & algorithm : algorithms)
@ -63,7 +63,23 @@ void TTLCalcTransform::transform(Chunk & chunk)
for (const auto & col : getOutputPort().getHeader())
res.addColumn(block.getByName(col.name).column);
chunk = std::move(res);
setReadyChunk(std::move(res));
}
Chunk TTLCalcTransform::generate()
{
Block block;
for (const auto & algorithm : algorithms)
algorithm->execute(block);
if (!block)
return {};
Chunk res;
for (const auto & col : getOutputPort().getHeader())
res.addColumn(block.getByName(col.name).column);
return res;
}
void TTLCalcTransform::finalize()
@ -75,7 +91,7 @@ void TTLCalcTransform::finalize()
IProcessor::Status TTLCalcTransform::prepare()
{
auto status = ISimpleTransform::prepare();
auto status = IAccumulatingTransform::prepare();
if (status == Status::Finished)
finalize();

View File

@ -1,5 +1,5 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Processors/IAccumulatingTransform.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/Block.h>
@ -11,7 +11,7 @@
namespace DB
{
class TTLCalcTransform : public ISimpleTransform
class TTLCalcTransform : public IAccumulatingTransform
{
public:
TTLCalcTransform(
@ -27,7 +27,8 @@ public:
Status prepare() override;
protected:
void transform(Chunk & chunk) override;
void consume(Chunk chunk) override;
Chunk generate() override;
/// Finalizes ttl infos and updates data part
void finalize();