Fix tests.

This commit is contained in:
Nikolai Kochetov 2021-08-12 13:46:58 +03:00
parent beed4c21a5
commit 9c066d476e
6 changed files with 12 additions and 4 deletions

View File

@ -261,7 +261,6 @@ BlockIO InterpreterInsertQuery::execute()
{
InterpreterWatchQuery interpreter_watch{ query.watch, getContext() };
res = interpreter_watch.execute();
res.pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(std::move(res.in))));
}
for (size_t i = 0; i < out_streams_size; i++)

View File

@ -165,6 +165,7 @@ SRCS(
Transforms/ReverseTransform.cpp
Transforms/RollupTransform.cpp
Transforms/SortingTransform.cpp
Transforms/SquashingChunksTransform.cpp
Transforms/TotalsHavingTransform.cpp
Transforms/WindowTransform.cpp
Transforms/getSourceFromFromASTInsertQuery.cpp

View File

@ -11,8 +11,15 @@ namespace DB
class LiveViewSink : public SinkToStorage
{
/// _version column is added manually in sink.
static Block updateHeader(Block block)
{
block.erase("_version");
return block;
}
public:
explicit LiveViewSink(StorageLiveView & storage_) : SinkToStorage(storage_.getHeader()), storage(storage_) {}
explicit LiveViewSink(StorageLiveView & storage_) : SinkToStorage(updateHeader(storage_.getHeader())), storage(storage_) {}
String getName() const override { return "LiveViewSink"; }

View File

@ -19,6 +19,7 @@ limitations under the License. */
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Transforms/SquashingChunksTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <DataStreams/copyData.h>
#include <common/logger_useful.h>
#include <Common/typeid_cast.h>
@ -116,6 +117,8 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(ContextPtr local_cont
return std::make_shared<MaterializingTransform>(cur_header);
});
new_mergeable_blocks->sample_block = io.pipeline.getHeader();
PullingPipelineExecutor executor(io.pipeline);
Block this_block;
@ -125,7 +128,6 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(ContextPtr local_cont
new_blocks->push_back(base_blocks);
new_mergeable_blocks->blocks = new_blocks;
new_mergeable_blocks->sample_block = io.pipeline.getHeader();
return new_mergeable_blocks;
}
@ -154,7 +156,6 @@ QueryPipeline StorageLiveView::completeQuery(Pipes pipes)
std::move(pipes), QueryProcessingStage::WithMergeableState);
};
block_context->addExternalTable(getBlocksTableName(), TemporaryTableHolder(getContext(), creator));
InterpreterSelectQuery select(getInnerBlocksQuery(), block_context, StoragePtr(), nullptr, SelectQueryOptions(QueryProcessingStage::Complete));
auto io = select.execute();
io.pipeline.addSimpleTransform([&](const Block & cur_header)