Updates to make all live view tests to pass.

This commit is contained in:
Vitaliy Zakaznikov 2020-01-03 02:48:15 +01:00
parent 2c4bf0581f
commit 25458a4865
2 changed files with 42 additions and 66 deletions

View File

@ -1,4 +1,4 @@
/* iopyright (c) 2018 BlackBerry Limited
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -95,50 +95,45 @@ static void extractDependentTable(ASTPtr & query, String & select_database_name,
DB::ErrorCodes::LOGICAL_ERROR);
}
BlocksPtrs StorageLiveView::collectMergeableBlocks(const Context & context)
MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(const Context & context)
{
ASTPtr mergeable_query = inner_query;
if (inner_subquery)
mergeable_query = inner_subquery;
BlocksPtrs new_mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
BlocksPtr base_mergeable_blocks = std::make_shared<Blocks>();
MergeableBlocksPtr new_mergeable_blocks = std::make_shared<MergeableBlocks>();
BlocksPtrs new_blocks = std::make_shared<std::vector<BlocksPtr>>();
BlocksPtr base_blocks = std::make_shared<Blocks>();
InterpreterSelectQuery interpreter(mergeable_query, context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
InterpreterSelectQuery interpreter(mergeable_query->clone(), context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().in);
while (Block this_block = view_mergeable_stream->read())
base_mergeable_blocks->push_back(this_block);
base_blocks->push_back(this_block);
new_mergeable_blocks->push_back(base_mergeable_blocks);
new_blocks->push_back(base_blocks);
mergeable_blocks = new_mergeable_blocks;
new_mergeable_blocks->blocks = new_blocks;
new_mergeable_blocks->sample_block = view_mergeable_stream->getHeader();
return mergeable_blocks;
return new_mergeable_blocks;
}
BlockInputStreams StorageLiveView::blocksToInputStreams(BlocksPtrs blocks)
BlockInputStreams StorageLiveView::blocksToInputStreams(BlocksPtrs blocks, Block & sample_block)
{
BlockInputStreams streams;
for (auto & blocks_ : *blocks)
{
if (blocks_->empty())
continue;
auto sample_block = blocks_->front().cloneEmpty();
BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
streams.push_back(std::move(stream));
}
return streams;
}
/// Complete query using input streams from mergeable blocks
BlockInputStreamPtr StorageLiveView::completeQuery(BlockInputStreams & from)
BlockInputStreamPtr StorageLiveView::completeQuery(BlockInputStreams from)
{
auto block_context = std::make_unique<Context>(global_context);
block_context->makeQueryContext();
@ -167,8 +162,6 @@ void StorageLiveView::writeIntoLiveView(
const Context & context)
{
BlockOutputStreamPtr output = std::make_shared<LiveViewBlockOutputStream>(live_view);
auto block_context = std::make_unique<Context>(context.getGlobalContext());
block_context->makeQueryContext();
/// Check if live view has any readers if not
/// just reset blocks to empty and do nothing else
@ -184,28 +177,30 @@ void StorageLiveView::writeIntoLiveView(
bool is_block_processed = false;
BlockInputStreams from;
BlocksPtrs mergeable_blocks;
MergeableBlocksPtr mergeable_blocks;
BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
ASTPtr mergeable_query = live_view.getInnerQuery();
if (live_view.getInnerSubQuery())
mergeable_query = live_view.getInnerSubQuery();
{
std::lock_guard lock(live_view.mutex);
mergeable_blocks = live_view.getMergeableBlocks();
if (!mergeable_blocks || mergeable_blocks->size() >= context.getGlobalContext().getSettingsRef().max_live_view_insert_blocks_before_refresh)
if (!mergeable_blocks || mergeable_blocks->blocks->size() >= context.getGlobalContext().getSettingsRef().max_live_view_insert_blocks_before_refresh)
{
mergeable_blocks = live_view.collectMergeableBlocks(context);
from = live_view.blocksToInputStreams(mergeable_blocks);
live_view.setMergeableBlocks(mergeable_blocks);
from = live_view.blocksToInputStreams(mergeable_blocks->blocks, mergeable_blocks->sample_block);
is_block_processed = true;
}
}
if (!is_block_processed)
{
BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
ASTPtr mergeable_query = live_view.getInnerQuery();
if (live_view.getInnerSubQuery())
mergeable_query = live_view.getInnerSubQuery();
BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
auto blocks_storage = StorageBlocks::createStorage(live_view.database_name, live_view.table_name,
live_view.getParentStorage()->getColumns(), std::move(streams), QueryProcessingStage::FetchColumns);
@ -226,8 +221,8 @@ void StorageLiveView::writeIntoLiveView(
std::lock_guard lock(live_view.mutex);
mergeable_blocks = live_view.getMergeableBlocks();
mergeable_blocks->push_back(new_mergeable_blocks);
from = live_view.blocksToInputStreams(mergeable_blocks);
mergeable_blocks->blocks->push_back(new_mergeable_blocks);
from = live_view.blocksToInputStreams(mergeable_blocks->blocks, mergeable_blocks->sample_block);
}
}
@ -329,36 +324,10 @@ bool StorageLiveView::getNewBlocks()
UInt128 key;
BlocksPtr new_blocks = std::make_shared<Blocks>();
BlocksMetadataPtr new_blocks_metadata = std::make_shared<BlocksMetadata>();
BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
ASTPtr mergeable_query = inner_query;
if (inner_subquery)
mergeable_query = inner_subquery;
InterpreterSelectQuery interpreter(mergeable_query->clone(), *live_view_context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
auto mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().in);
while (Block block = mergeable_stream->read())
new_mergeable_blocks->push_back(block);
auto block_context = std::make_unique<Context>(global_context);
block_context->makeQueryContext();
mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
mergeable_blocks->push_back(new_mergeable_blocks);
BlockInputStreamPtr from = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(new_mergeable_blocks), mergeable_stream->getHeader());
auto blocks_storage = StorageBlocks::createStorage(database_name, table_name, global_context.getTable(select_database_name, select_table_name)->getColumns(), {from}, QueryProcessingStage::WithMergeableState);
block_context->addExternalTable(table_name + "_blocks", blocks_storage);
InterpreterSelectQuery select(inner_blocks_query->clone(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete));
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
/// Squashing is needed here because the view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
/// and two-level aggregation is triggered).
data = std::make_shared<SquashingBlockInputStream>(
data, global_context.getSettingsRef().min_insert_block_size_rows, global_context.getSettingsRef().min_insert_block_size_bytes);
mergeable_blocks = collectMergeableBlocks(*live_view_context);
BlockInputStreams from = blocksToInputStreams(mergeable_blocks->blocks, mergeable_blocks->sample_block);
BlockInputStreamPtr data = completeQuery({from});
while (Block block = data->read())
{

View File

@ -27,9 +27,16 @@ struct BlocksMetadata
UInt64 version;
};
struct MergeableBlocks
{
BlocksPtrs blocks;
Block sample_block;
};
class IAST;
using ASTPtr = std::shared_ptr<IAST>;
using BlocksMetadataPtr = std::shared_ptr<BlocksMetadata>;
using MergeableBlocksPtr = std::shared_ptr<MergeableBlocks>;
class StorageLiveView : public ext::shared_ptr_helper<StorageLiveView>, public IStorage
{
@ -139,14 +146,14 @@ public:
unsigned num_streams) override;
std::shared_ptr<BlocksPtr> getBlocksPtr() { return blocks_ptr; }
BlocksPtrs getMergeableBlocks() { return mergeable_blocks; }
MergeableBlocksPtr getMergeableBlocks() { return mergeable_blocks; }
/// collect and set mergeable blocks. Must be called holding mutex
BlocksPtrs collectMergeableBlocks(const Context & context);
/// Collect mergeable blocks and their sample. Must be called holding mutex
MergeableBlocksPtr collectMergeableBlocks(const Context & context);
/// Complete query using input streams from mergeable blocks
BlockInputStreamPtr completeQuery(BlockInputStreams & from);
BlockInputStreamPtr completeQuery(BlockInputStreams from);
void setMergeableBlocks(BlocksPtrs blocks) { mergeable_blocks = blocks; }
void setMergeableBlocks(MergeableBlocksPtr blocks) { mergeable_blocks = blocks; }
std::shared_ptr<bool> getActivePtr() { return active_ptr; }
/// Read new data blocks that store query result
@ -155,7 +162,7 @@ public:
Block getHeader() const;
/// convert blocks to input streams
static BlockInputStreams blocksToInputStreams(BlocksPtrs blocks);
static BlockInputStreams blocksToInputStreams(BlocksPtrs blocks, Block & sample_block);
static void writeIntoLiveView(
StorageLiveView & live_view,
@ -191,7 +198,7 @@ private:
std::shared_ptr<BlocksPtr> blocks_ptr;
/// Current data blocks metadata
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
BlocksPtrs mergeable_blocks;
MergeableBlocksPtr mergeable_blocks;
/// Background thread for temporary tables
/// which drops this table if there are no users