First changes.

This commit is contained in:
Vitaliy Zakaznikov 2020-01-02 12:24:55 -05:00
parent d0f7a2f3da
commit d43eae2db8
2 changed files with 76 additions and 45 deletions

View File

@ -95,6 +95,64 @@ static void extractDependentTable(ASTPtr & query, String & select_database_name,
DB::ErrorCodes::LOGICAL_ERROR);
}
BlocksPtrs StorageLiveView::collectMergeableBlocks(const Context & context)
{
BlocksPtrs new_mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
BlocksPtr base_mergeable_blocks = std::make_shared<Blocks>();
InterpreterSelectQuery interpreter(mergeable_query, context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().in);
while (Block this_block = view_mergeable_stream->read())
base_mergeable_blocks->push_back(this_block);
new_mergeable_blocks->push_back(base_mergeable_blocks);
mergeable_blocks = new_mergeable_blocks;
}
BlockInputStreams blocksToInputStreams(BlocksPtrs blocks)
{
BlockInputStreams streams;
for (auto & blocks_ : *blocks)
{
if (blocks_->empty())
continue;
auto sample_block = blocks_->front().cloneEmpty();
BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
streams.push_back(std::move(stream));
}
return streams;
}
/// Complete query using input streams from mergeable blocks
BlockInputStreamPtr StorageLiveView::completeQuery(BlockInputStreams from)
{
auto block_context = std::make_unique<Context>(global_context);
block_context->makeQueryContext();
auto blocks_storage = StorageBlocks::createStorage(database_name, table_name, parent_storage->getColumns(),
std::move(from), QueryProcessingStage::WithMergeableState);
block_context->addExternalTable(table_name + "_blocks", blocks_storage);
InterpreterSelectQuery select(inner_blocks_query->clone(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete));
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
/// Squashing is needed here because the view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
data = std::make_shared<SquashingBlockInputStream>(
data, global_context.getSettingsRef().min_insert_block_size_rows,
global_context.getSettingsRef().min_insert_block_size_bytes);
return data;
}
void StorageLiveView::writeIntoLiveView(
StorageLiveView & live_view,
@ -132,41 +190,25 @@ void StorageLiveView::writeIntoLiveView(
mergeable_blocks = live_view.getMergeableBlocks();
if (!mergeable_blocks || mergeable_blocks->size() >= context.getGlobalContext().getSettingsRef().max_live_view_insert_blocks_before_refresh)
{
mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
BlocksPtr base_mergeable_blocks = std::make_shared<Blocks>();
InterpreterSelectQuery interpreter(mergeable_query, context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
interpreter.execute().in);
while (Block this_block = view_mergeable_stream->read())
base_mergeable_blocks->push_back(this_block);
mergeable_blocks->push_back(base_mergeable_blocks);
live_view.setMergeableBlocks(mergeable_blocks);
/// Create from streams
for (auto & blocks_ : *mergeable_blocks)
{
if (blocks_->empty())
continue;
auto sample_block = blocks_->front().cloneEmpty();
BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
from.push_back(std::move(stream));
}
mergeable_blocks = live_view.collectMergeableBlocks(context);
from = live_view.blocksToInputStreams(mergeable_blocks);
is_block_processed = true;
}
}
auto parent_storage = context.getTable(live_view.getSelectDatabaseName(), live_view.getSelectTableName());
if (!is_block_processed)
{
BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
auto blocks_storage = StorageBlocks::createStorage(live_view.database_name, live_view.table_name,
parent_storage->getColumns(), std::move(streams), QueryProcessingStage::FetchColumns);
live_view.getParentStorage()->getColumns(), std::move(streams), QueryProcessingStage::FetchColumns);
InterpreterSelectQuery select_block(mergeable_query, context, blocks_storage,
QueryProcessingStage::WithMergeableState);
auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
select_block.execute().in);
while (Block this_block = data_mergeable_stream->read())
new_mergeable_blocks->push_back(this_block);
@ -178,31 +220,11 @@ void StorageLiveView::writeIntoLiveView(
mergeable_blocks = live_view.getMergeableBlocks();
mergeable_blocks->push_back(new_mergeable_blocks);
/// Create from streams
for (auto & blocks_ : *mergeable_blocks)
{
if (blocks_->empty())
continue;
auto sample_block = blocks_->front().cloneEmpty();
BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
from.push_back(std::move(stream));
}
from = live_view.blocksToInputStreams(mergeable_blocks);
}
}
auto blocks_storage = StorageBlocks::createStorage(live_view.database_name, live_view.table_name, parent_storage->getColumns(), std::move(from), QueryProcessingStage::WithMergeableState);
block_context->addExternalTable(live_view.table_name + "_blocks", blocks_storage);
InterpreterSelectQuery select(live_view.getInnerBlocksQuery(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete));
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
/// Squashing is needed here because the view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
data = std::make_shared<SquashingBlockInputStream>(
data, context.getGlobalContext().getSettingsRef().min_insert_block_size_rows, context.getGlobalContext().getSettingsRef().min_insert_block_size_bytes);
BlockInputStreamPtr data = completeQuery(std::move(from));
copyData(*data, *output);
}
@ -247,6 +269,8 @@ StorageLiveView::StorageLiveView(
DatabaseAndTableName(select_database_name, select_table_name),
DatabaseAndTableName(database_name, table_name));
parent_storage = local_context.getTable(select_database_name, select_table_name);
is_temporary = query.temporary;
temporary_live_view_timeout = local_context.getSettingsRef().temporary_live_view_timeout.totalSeconds();

View File

@ -45,6 +45,7 @@ public:
String getDatabaseName() const override { return database_name; }
String getSelectDatabaseName() const { return select_database_name; }
String getSelectTableName() const { return select_table_name; }
StoragePtr getParentStorage() const { return parent_storage; }
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
@ -139,6 +140,8 @@ public:
std::shared_ptr<BlocksPtr> getBlocksPtr() { return blocks_ptr; }
BlocksPtrs getMergeableBlocks() { return mergeable_blocks; }
/// collect and set mergeable blocks. Must be called holding mutex
BlocksPtrs collectMergeableBlocks(const Context & context);
void setMergeableBlocks(BlocksPtrs blocks) { mergeable_blocks = blocks; }
std::shared_ptr<bool> getActivePtr() { return active_ptr; }
@ -147,6 +150,9 @@ public:
Block getHeader() const;
/// convert blocks to input streams
static BlockInputStreams blocksToInputStreams(BlocksPtrs blocks);
static void writeIntoLiveView(
StorageLiveView & live_view,
const Block & block,
@ -162,6 +168,7 @@ private:
ASTPtr inner_blocks_query; /// query over the mergeable blocks to produce final result
Context & global_context;
std::unique_ptr<Context> live_view_context;
StoragePtr parent_storage;
bool is_temporary = false;
/// Mutex to protect access to sample block