This commit is contained in:
jsc0218 2024-09-06 04:12:03 +00:00
parent 87c7a8b4fb
commit 67ad7b592c
2 changed files with 40 additions and 20 deletions

View File

@ -10,11 +10,12 @@ namespace ErrorCodes
}
VirtualRowTransform::VirtualRowTransform(const Block & header)
: IInflatingTransform(header, header)
: IProcessor({header}, {header})
, input(inputs.front()), output(outputs.front())
{
}
IInflatingTransform::Status VirtualRowTransform::prepare()
VirtualRowTransform::Status VirtualRowTransform::prepare()
{
/// Check can output.
@ -46,13 +47,8 @@ IInflatingTransform::Status VirtualRowTransform::prepare()
{
if (input.isFinished())
{
if (is_finished)
{
output.finish();
return Status::Finished;
}
is_finished = true;
return Status::Ready;
output.finish();
return Status::Finished;
}
input.setNeeded();
@ -69,6 +65,28 @@ IInflatingTransform::Status VirtualRowTransform::prepare()
return Status::Ready;
}
void VirtualRowTransform::work()
{
if (can_generate)
{
if (generated)
throw Exception(ErrorCodes::LOGICAL_ERROR, "VirtualRowTransform cannot consume chunk because it already was generated");
current_chunk = generate();
generated = true;
can_generate = false;
}
else
{
if (!has_input)
throw Exception(ErrorCodes::LOGICAL_ERROR, "VirtualRowTransform cannot consume chunk because it wasn't read");
consume(std::move(current_chunk));
has_input = false;
can_generate = true;
}
}
void VirtualRowTransform::consume(Chunk chunk)
{
if (!is_first)
@ -91,9 +109,4 @@ Chunk VirtualRowTransform::generate()
return result;
}
bool VirtualRowTransform::canGenerate()
{
return !temp_chunk.empty();
}
}

View File

@ -6,7 +6,7 @@
namespace DB
{
class VirtualRowTransform : public IInflatingTransform
class VirtualRowTransform : public IProcessor
{
public:
explicit VirtualRowTransform(const Block & header);
@ -14,13 +14,20 @@ public:
String getName() const override { return "VirtualRowTransform"; }
Status prepare() override;
protected:
void consume(Chunk chunk) override;
bool canGenerate() override;
Chunk generate() override;
void work() override;
private:
void consume(Chunk chunk);
Chunk generate();
InputPort & input;
OutputPort & output;
Chunk current_chunk;
bool has_input = false;
bool generated = false;
bool can_generate = false;
bool is_first = false;
Chunk temp_chunk;
};