virtualrow sketch

This commit is contained in:
jsc0218 2024-09-04 23:08:02 +00:00
parent 57996cc684
commit 87c7a8b4fb
4 changed files with 132 additions and 1 deletions

View File

@ -22,6 +22,7 @@
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Processors/Transforms/SelectByIndicesTransform.h>
#include <Processors/Transforms/VirtualRowTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeIndexVectorSimilarity.h>
@ -635,6 +636,8 @@ Pipe ReadFromMergeTree::readInOrder(
});
}
pipe.addSimpleTransform([](const Block & header){ return std::make_shared<VirtualRowTransform>(header); });
return pipe;
}

View File

@ -6,6 +6,7 @@
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/VirtualRowTransform.h>
#include <Processors/QueryPlan/BufferChunksTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/JSONBuilder.h>
@ -259,7 +260,7 @@ void SortingStep::enableVirtualRow(const QueryPipelineBuilder & pipeline) const
{
merge_tree_sources.push_back(merge_tree_source);
}
else if (!std::dynamic_pointer_cast<ExpressionTransform>(processor))
else if (!std::dynamic_pointer_cast<ExpressionTransform>(processor) && !std::dynamic_pointer_cast<VirtualRowTransform>(processor))
{
enable_virtual_row = false;
break;

View File

@ -0,0 +1,99 @@
#include <Processors/Transforms/VirtualRowTransform.h>
#include "Processors/Chunk.h"
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
VirtualRowTransform::VirtualRowTransform(const Block & header)
: IInflatingTransform(header, header)
{
}
IInflatingTransform::Status VirtualRowTransform::prepare()
{
/// Check can output.
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
/// Output if has data.
if (generated)
{
output.push(std::move(current_chunk));
generated = false;
return Status::PortFull;
}
if (can_generate)
return Status::Ready;
/// Check can input.
if (!has_input)
{
if (input.isFinished())
{
if (is_finished)
{
output.finish();
return Status::Finished;
}
is_finished = true;
return Status::Ready;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
/// Set input port NotNeeded after chunk was pulled.
current_chunk = input.pull(true);
has_input = true;
}
/// Now transform.
return Status::Ready;
}
void VirtualRowTransform::consume(Chunk chunk)
{
if (!is_first)
{
temp_chunk = std::move(chunk);
return;
}
is_first = false;
temp_chunk = std::move(chunk);
}
Chunk VirtualRowTransform::generate()
{
if (temp_chunk.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't generate chunk in VirtualRowTransform");
Chunk result;
result.swap(temp_chunk);
return result;
}
bool VirtualRowTransform::canGenerate()
{
return !temp_chunk.empty();
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Processors/IInflatingTransform.h>
#include <Processors/ISimpleTransform.h>
namespace DB
{
class VirtualRowTransform : public IInflatingTransform
{
public:
explicit VirtualRowTransform(const Block & header);
String getName() const override { return "VirtualRowTransform"; }
Status prepare() override;
protected:
void consume(Chunk chunk) override;
bool canGenerate() override;
Chunk generate() override;
private:
bool is_first = false;
Chunk temp_chunk;
};
}