Processors support for StorageLiveView reading.

This commit is contained in:
Nikolai Kochetov 2020-02-17 19:35:23 +03:00
parent 34d7873524
commit fbfaac6391
4 changed files with 41 additions and 40 deletions

View File

@ -19,33 +19,31 @@ namespace DB
/** A stream of blocks from a shared vector of blocks
*/
class BlocksBlockInputStream : public IBlockInputStream
class BlocksSource : public SourceWithProgress
{
public:
/// Acquires shared ownership of the blocks vector
BlocksBlockInputStream(const std::shared_ptr<BlocksPtr> & blocks_ptr_, Block header_)
: blocks(*blocks_ptr_), it((*blocks_ptr_)->begin()), end((*blocks_ptr_)->end()), header(std::move(header_)) {}
BlocksSource(const std::shared_ptr<BlocksPtr> & blocks_ptr_, Block header)
: SourceWithProgress(std::move(header))
, blocks(*blocks_ptr_), it((*blocks_ptr_)->begin()), end((*blocks_ptr_)->end()) {}
String getName() const override { return "Blocks"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override
Chunk generate() override
{
if (it == end)
return Block();
return {};
Block res = *it;
++it;
return res;
return Chunk(res.getColumns(), res.rows());
}
private:
BlocksPtr blocks;
Blocks::iterator it;
const Blocks::iterator end;
Block header;
};
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Storages/IStorage.h>
#include <Processors/Pipe.h>
namespace DB
@ -13,21 +14,21 @@ class StorageBlocks : public IStorage
*/
public:
StorageBlocks(const StorageID & table_id_,
const ColumnsDescription & columns_, BlockInputStreams streams_,
const ColumnsDescription & columns_, Pipes pipes_,
QueryProcessingStage::Enum to_stage_)
: IStorage(table_id_), streams(streams_), to_stage(to_stage_)
: IStorage(table_id_), pipes(std::move(pipes_)), to_stage(to_stage_)
{
setColumns(columns_);
}
static StoragePtr createStorage(const StorageID & table_id,
const ColumnsDescription & columns, BlockInputStreams streams, QueryProcessingStage::Enum to_stage)
const ColumnsDescription & columns, Pipes pipes, QueryProcessingStage::Enum to_stage)
{
return std::make_shared<StorageBlocks>(table_id, columns, streams, to_stage);
return std::make_shared<StorageBlocks>(table_id, columns, std::move(pipes), to_stage);
}
std::string getName() const override { return "Blocks"; }
QueryProcessingStage::Enum getQueryProcessingStage(const Context & /*context*/) const override { return to_stage; }
BlockInputStreams read(
Pipes readWithProcessors(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
@ -35,12 +36,12 @@ public:
size_t /*max_block_size*/,
unsigned /*num_streams*/) override
{
return streams;
return pipes;
}
private:
Block res_block;
BlockInputStreams streams;
Pipes pipes;
QueryProcessingStage::Enum to_stage;
};

View File

@ -40,6 +40,7 @@ limitations under the License. */
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Access/AccessFlags.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
namespace DB
@ -123,26 +124,24 @@ MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(const Context & conte
return new_mergeable_blocks;
}
BlockInputStreams StorageLiveView::blocksToInputStreams(BlocksPtrs blocks, Block & sample_block)
Pipes StorageLiveView::blocksToPipes(BlocksPtrs blocks, Block & sample_block)
{
BlockInputStreams streams;
Pipes pipes;
for (auto & blocks_ : *blocks)
{
BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
streams.push_back(std::move(stream));
}
return streams;
pipes.emplace_back(std::make_shared<BlocksSource>(std::make_shared<BlocksPtr>(blocks_), sample_block));
return pipes;
}
/// Complete query using input streams from mergeable blocks
BlockInputStreamPtr StorageLiveView::completeQuery(BlockInputStreams from)
BlockInputStreamPtr StorageLiveView::completeQuery(Pipes pipes)
{
auto block_context = std::make_unique<Context>(global_context);
block_context->makeQueryContext();
auto blocks_storage_id = getBlocksStorageID();
auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id, getParentStorage()->getColumns(),
std::move(from), QueryProcessingStage::WithMergeableState);
std::move(pipes), QueryProcessingStage::WithMergeableState);
block_context->addExternalTable(blocks_storage_id.table_name, blocks_storage);
@ -179,7 +178,7 @@ void StorageLiveView::writeIntoLiveView(
}
bool is_block_processed = false;
BlockInputStreams from;
Pipes from;
MergeableBlocksPtr mergeable_blocks;
BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
@ -191,7 +190,7 @@ void StorageLiveView::writeIntoLiveView(
{
mergeable_blocks = live_view.collectMergeableBlocks(context);
live_view.setMergeableBlocks(mergeable_blocks);
from = live_view.blocksToInputStreams(mergeable_blocks->blocks, mergeable_blocks->sample_block);
from = live_view.blocksToPipes(mergeable_blocks->blocks, mergeable_blocks->sample_block);
is_block_processed = true;
}
}
@ -205,10 +204,11 @@ void StorageLiveView::writeIntoLiveView(
if (live_view.getInnerSubQuery())
mergeable_query = live_view.getInnerSubQuery();
BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
Pipes pipes;
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), Chunk(block.getColumns(), block.rows())));
auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id,
live_view.getParentStorage()->getColumns(), std::move(streams), QueryProcessingStage::FetchColumns);
live_view.getParentStorage()->getColumns(), std::move(pipes), QueryProcessingStage::FetchColumns);
InterpreterSelectQuery select_block(mergeable_query, context, blocks_storage,
QueryProcessingStage::WithMergeableState);
@ -227,11 +227,11 @@ void StorageLiveView::writeIntoLiveView(
mergeable_blocks = live_view.getMergeableBlocks();
mergeable_blocks->blocks->push_back(new_mergeable_blocks);
from = live_view.blocksToInputStreams(mergeable_blocks->blocks, mergeable_blocks->sample_block);
from = live_view.blocksToPipes(mergeable_blocks->blocks, mergeable_blocks->sample_block);
}
}
BlockInputStreamPtr data = live_view.completeQuery(from);
BlockInputStreamPtr data = live_view.completeQuery(std::move(from));
copyData(*data, *output);
}
@ -331,8 +331,8 @@ bool StorageLiveView::getNewBlocks()
BlocksMetadataPtr new_blocks_metadata = std::make_shared<BlocksMetadata>();
mergeable_blocks = collectMergeableBlocks(*live_view_context);
BlockInputStreams from = blocksToInputStreams(mergeable_blocks->blocks, mergeable_blocks->sample_block);
BlockInputStreamPtr data = completeQuery({from});
Pipes from = blocksToPipes(mergeable_blocks->blocks, mergeable_blocks->sample_block);
BlockInputStreamPtr data = completeQuery(std::move(from));
while (Block block = data->read())
{
@ -520,7 +520,7 @@ void StorageLiveView::refresh(const Context & context)
}
}
BlockInputStreams StorageLiveView::read(
Pipes StorageLiveView::readWithProcessors(
const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
@ -528,7 +528,7 @@ BlockInputStreams StorageLiveView::read(
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
std::shared_ptr<BlocksBlockInputStream> stream;
Pipes pipes;
{
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
@ -536,9 +536,9 @@ BlockInputStreams StorageLiveView::read(
if (getNewBlocks())
condition.notify_all();
}
stream = std::make_shared<BlocksBlockInputStream>(blocks_ptr, getHeader());
pipes.emplace_back(std::make_shared<BlocksSource>(blocks_ptr, getHeader()));
}
return { stream };
return pipes;
}
BlockInputStreams StorageLiveView::watch(

View File

@ -126,7 +126,7 @@ public:
void refresh(const Context & context);
BlockInputStreams read(
Pipes readWithProcessors(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
@ -134,6 +134,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
BlockInputStreams watch(
const Names & column_names,
const SelectQueryInfo & query_info,
@ -148,7 +150,7 @@ public:
/// Collect mergeable blocks and their sample. Must be called holding mutex
MergeableBlocksPtr collectMergeableBlocks(const Context & context);
/// Complete query using input streams from mergeable blocks
BlockInputStreamPtr completeQuery(BlockInputStreams from);
BlockInputStreamPtr completeQuery(Pipes pipes);
void setMergeableBlocks(MergeableBlocksPtr blocks) { mergeable_blocks = blocks; }
std::shared_ptr<bool> getActivePtr() { return active_ptr; }
@ -159,7 +161,7 @@ public:
Block getHeader() const;
/// convert blocks to input streams
static BlockInputStreams blocksToInputStreams(BlocksPtrs blocks, Block & sample_block);
static Pipes blocksToPipes(BlocksPtrs blocks, Block & sample_block);
static void writeIntoLiveView(
StorageLiveView & live_view,