mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-15 12:14:18 +00:00
Merge branch 'master' into table_name_in_istorage
This commit is contained in:
commit
54a75269cb
@ -1,4 +1,4 @@
|
|||||||
/* iopyright (c) 2018 BlackBerry Limited
|
/* Copyright (c) 2018 BlackBerry Limited
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
||||||
@ -96,6 +96,67 @@ static StorageID extractDependentTable(ASTPtr & query, Context & context, const
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MergeableBlocksPtr StorageLiveView::collectMergeableBlocks(const Context & context)
|
||||||
|
{
|
||||||
|
ASTPtr mergeable_query = inner_query;
|
||||||
|
|
||||||
|
if (inner_subquery)
|
||||||
|
mergeable_query = inner_subquery;
|
||||||
|
|
||||||
|
MergeableBlocksPtr new_mergeable_blocks = std::make_shared<MergeableBlocks>();
|
||||||
|
BlocksPtrs new_blocks = std::make_shared<std::vector<BlocksPtr>>();
|
||||||
|
BlocksPtr base_blocks = std::make_shared<Blocks>();
|
||||||
|
|
||||||
|
InterpreterSelectQuery interpreter(mergeable_query->clone(), context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
|
||||||
|
|
||||||
|
auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().in);
|
||||||
|
|
||||||
|
while (Block this_block = view_mergeable_stream->read())
|
||||||
|
base_blocks->push_back(this_block);
|
||||||
|
|
||||||
|
new_blocks->push_back(base_blocks);
|
||||||
|
|
||||||
|
new_mergeable_blocks->blocks = new_blocks;
|
||||||
|
new_mergeable_blocks->sample_block = view_mergeable_stream->getHeader();
|
||||||
|
|
||||||
|
return new_mergeable_blocks;
|
||||||
|
}
|
||||||
|
|
||||||
|
BlockInputStreams StorageLiveView::blocksToInputStreams(BlocksPtrs blocks, Block & sample_block)
|
||||||
|
{
|
||||||
|
BlockInputStreams streams;
|
||||||
|
for (auto & blocks_ : *blocks)
|
||||||
|
{
|
||||||
|
BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
|
||||||
|
streams.push_back(std::move(stream));
|
||||||
|
}
|
||||||
|
return streams;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Complete query using input streams from mergeable blocks
|
||||||
|
BlockInputStreamPtr StorageLiveView::completeQuery(BlockInputStreams from)
|
||||||
|
{
|
||||||
|
auto block_context = std::make_unique<Context>(global_context);
|
||||||
|
block_context->makeQueryContext();
|
||||||
|
|
||||||
|
auto blocks_storage_id = getBlocksStorageID();
|
||||||
|
auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id, parent_storage->getColumns(),
|
||||||
|
std::move(from), QueryProcessingStage::WithMergeableState);
|
||||||
|
|
||||||
|
block_context->addExternalTable(blocks_storage_id.table_name, blocks_storage);
|
||||||
|
|
||||||
|
InterpreterSelectQuery select(inner_blocks_query->clone(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete));
|
||||||
|
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
|
||||||
|
|
||||||
|
/// Squashing is needed here because the view query can generate a lot of blocks
|
||||||
|
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
|
||||||
|
/// and two-level aggregation is triggered).
|
||||||
|
data = std::make_shared<SquashingBlockInputStream>(
|
||||||
|
data, global_context.getSettingsRef().min_insert_block_size_rows,
|
||||||
|
global_context.getSettingsRef().min_insert_block_size_bytes);
|
||||||
|
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
void StorageLiveView::writeIntoLiveView(
|
void StorageLiveView::writeIntoLiveView(
|
||||||
StorageLiveView & live_view,
|
StorageLiveView & live_view,
|
||||||
@ -103,8 +164,6 @@ void StorageLiveView::writeIntoLiveView(
|
|||||||
const Context & context)
|
const Context & context)
|
||||||
{
|
{
|
||||||
BlockOutputStreamPtr output = std::make_shared<LiveViewBlockOutputStream>(live_view);
|
BlockOutputStreamPtr output = std::make_shared<LiveViewBlockOutputStream>(live_view);
|
||||||
auto block_context = std::make_unique<Context>(context.getGlobalContext());
|
|
||||||
block_context->makeQueryContext();
|
|
||||||
|
|
||||||
/// Check if live view has any readers if not
|
/// Check if live view has any readers if not
|
||||||
/// just reset blocks to empty and do nothing else
|
/// just reset blocks to empty and do nothing else
|
||||||
@ -120,55 +179,42 @@ void StorageLiveView::writeIntoLiveView(
|
|||||||
|
|
||||||
bool is_block_processed = false;
|
bool is_block_processed = false;
|
||||||
BlockInputStreams from;
|
BlockInputStreams from;
|
||||||
BlocksPtrs mergeable_blocks;
|
MergeableBlocksPtr mergeable_blocks;
|
||||||
BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
|
BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
|
||||||
ASTPtr mergeable_query = live_view.getInnerQuery();
|
|
||||||
|
|
||||||
if (live_view.getInnerSubQuery())
|
|
||||||
mergeable_query = live_view.getInnerSubQuery();
|
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard lock(live_view.mutex);
|
std::lock_guard lock(live_view.mutex);
|
||||||
|
|
||||||
mergeable_blocks = live_view.getMergeableBlocks();
|
mergeable_blocks = live_view.getMergeableBlocks();
|
||||||
if (!mergeable_blocks || mergeable_blocks->size() >= context.getGlobalContext().getSettingsRef().max_live_view_insert_blocks_before_refresh)
|
if (!mergeable_blocks || mergeable_blocks->blocks->size() >= context.getGlobalContext().getSettingsRef().max_live_view_insert_blocks_before_refresh)
|
||||||
{
|
{
|
||||||
mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
|
mergeable_blocks = live_view.collectMergeableBlocks(context);
|
||||||
BlocksPtr base_mergeable_blocks = std::make_shared<Blocks>();
|
|
||||||
InterpreterSelectQuery interpreter(mergeable_query, context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
|
|
||||||
auto view_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
|
|
||||||
interpreter.execute().in);
|
|
||||||
while (Block this_block = view_mergeable_stream->read())
|
|
||||||
base_mergeable_blocks->push_back(this_block);
|
|
||||||
mergeable_blocks->push_back(base_mergeable_blocks);
|
|
||||||
live_view.setMergeableBlocks(mergeable_blocks);
|
live_view.setMergeableBlocks(mergeable_blocks);
|
||||||
|
from = live_view.blocksToInputStreams(mergeable_blocks->blocks, mergeable_blocks->sample_block);
|
||||||
/// Create from streams
|
|
||||||
for (auto & blocks_ : *mergeable_blocks)
|
|
||||||
{
|
|
||||||
if (blocks_->empty())
|
|
||||||
continue;
|
|
||||||
auto sample_block = blocks_->front().cloneEmpty();
|
|
||||||
BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
|
|
||||||
from.push_back(std::move(stream));
|
|
||||||
}
|
|
||||||
|
|
||||||
is_block_processed = true;
|
is_block_processed = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto parent_storage = context.getTable(live_view.select_table_id);
|
|
||||||
auto blocks_storage_id = live_view.getBlocksStorageID();
|
auto blocks_storage_id = live_view.getBlocksStorageID();
|
||||||
|
|
||||||
if (!is_block_processed)
|
if (!is_block_processed)
|
||||||
{
|
{
|
||||||
|
ASTPtr mergeable_query = live_view.getInnerQuery();
|
||||||
|
|
||||||
|
if (live_view.getInnerSubQuery())
|
||||||
|
mergeable_query = live_view.getInnerSubQuery();
|
||||||
|
|
||||||
BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
|
BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
|
||||||
|
|
||||||
auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id,
|
auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id,
|
||||||
parent_storage->getColumns(), std::move(streams), QueryProcessingStage::FetchColumns);
|
live_view.getParentStorage()->getColumns(), std::move(streams), QueryProcessingStage::FetchColumns);
|
||||||
|
|
||||||
InterpreterSelectQuery select_block(mergeable_query, context, blocks_storage,
|
InterpreterSelectQuery select_block(mergeable_query, context, blocks_storage,
|
||||||
QueryProcessingStage::WithMergeableState);
|
QueryProcessingStage::WithMergeableState);
|
||||||
|
|
||||||
auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
|
auto data_mergeable_stream = std::make_shared<MaterializingBlockInputStream>(
|
||||||
select_block.execute().in);
|
select_block.execute().in);
|
||||||
|
|
||||||
while (Block this_block = data_mergeable_stream->read())
|
while (Block this_block = data_mergeable_stream->read())
|
||||||
new_mergeable_blocks->push_back(this_block);
|
new_mergeable_blocks->push_back(this_block);
|
||||||
|
|
||||||
@ -179,32 +225,12 @@ void StorageLiveView::writeIntoLiveView(
|
|||||||
std::lock_guard lock(live_view.mutex);
|
std::lock_guard lock(live_view.mutex);
|
||||||
|
|
||||||
mergeable_blocks = live_view.getMergeableBlocks();
|
mergeable_blocks = live_view.getMergeableBlocks();
|
||||||
mergeable_blocks->push_back(new_mergeable_blocks);
|
mergeable_blocks->blocks->push_back(new_mergeable_blocks);
|
||||||
|
from = live_view.blocksToInputStreams(mergeable_blocks->blocks, mergeable_blocks->sample_block);
|
||||||
/// Create from streams
|
|
||||||
for (auto & blocks_ : *mergeable_blocks)
|
|
||||||
{
|
|
||||||
if (blocks_->empty())
|
|
||||||
continue;
|
|
||||||
auto sample_block = blocks_->front().cloneEmpty();
|
|
||||||
BlockInputStreamPtr stream = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(blocks_), sample_block);
|
|
||||||
from.push_back(std::move(stream));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id, parent_storage->getColumns(), std::move(from), QueryProcessingStage::WithMergeableState);
|
BlockInputStreamPtr data = live_view.completeQuery(from);
|
||||||
block_context->addExternalTable(blocks_storage_id.table_name, blocks_storage);
|
|
||||||
|
|
||||||
InterpreterSelectQuery select(live_view.getInnerBlocksQuery(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete));
|
|
||||||
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
|
|
||||||
|
|
||||||
/// Squashing is needed here because the view query can generate a lot of blocks
|
|
||||||
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
|
|
||||||
/// and two-level aggregation is triggered).
|
|
||||||
data = std::make_shared<SquashingBlockInputStream>(
|
|
||||||
data, context.getGlobalContext().getSettingsRef().min_insert_block_size_rows, context.getGlobalContext().getSettingsRef().min_insert_block_size_bytes);
|
|
||||||
|
|
||||||
copyData(*data, *output);
|
copyData(*data, *output);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -237,6 +263,8 @@ StorageLiveView::StorageLiveView(
|
|||||||
|
|
||||||
global_context.addDependency(select_table_id, table_id_);
|
global_context.addDependency(select_table_id, table_id_);
|
||||||
|
|
||||||
|
parent_storage = local_context.getTable(select_table_id);
|
||||||
|
|
||||||
is_temporary = query.temporary;
|
is_temporary = query.temporary;
|
||||||
temporary_live_view_timeout = local_context.getSettingsRef().temporary_live_view_timeout.totalSeconds();
|
temporary_live_view_timeout = local_context.getSettingsRef().temporary_live_view_timeout.totalSeconds();
|
||||||
|
|
||||||
@ -288,37 +316,10 @@ bool StorageLiveView::getNewBlocks()
|
|||||||
UInt128 key;
|
UInt128 key;
|
||||||
BlocksPtr new_blocks = std::make_shared<Blocks>();
|
BlocksPtr new_blocks = std::make_shared<Blocks>();
|
||||||
BlocksMetadataPtr new_blocks_metadata = std::make_shared<BlocksMetadata>();
|
BlocksMetadataPtr new_blocks_metadata = std::make_shared<BlocksMetadata>();
|
||||||
BlocksPtr new_mergeable_blocks = std::make_shared<Blocks>();
|
|
||||||
ASTPtr mergeable_query = inner_query;
|
|
||||||
|
|
||||||
if (inner_subquery)
|
mergeable_blocks = collectMergeableBlocks(*live_view_context);
|
||||||
mergeable_query = inner_subquery;
|
BlockInputStreams from = blocksToInputStreams(mergeable_blocks->blocks, mergeable_blocks->sample_block);
|
||||||
|
BlockInputStreamPtr data = completeQuery({from});
|
||||||
InterpreterSelectQuery interpreter(mergeable_query->clone(), *live_view_context, SelectQueryOptions(QueryProcessingStage::WithMergeableState), Names());
|
|
||||||
auto mergeable_stream = std::make_shared<MaterializingBlockInputStream>(interpreter.execute().in);
|
|
||||||
|
|
||||||
while (Block block = mergeable_stream->read())
|
|
||||||
new_mergeable_blocks->push_back(block);
|
|
||||||
|
|
||||||
auto block_context = std::make_unique<Context>(global_context);
|
|
||||||
block_context->makeQueryContext();
|
|
||||||
|
|
||||||
mergeable_blocks = std::make_shared<std::vector<BlocksPtr>>();
|
|
||||||
mergeable_blocks->push_back(new_mergeable_blocks);
|
|
||||||
BlockInputStreamPtr from = std::make_shared<BlocksBlockInputStream>(std::make_shared<BlocksPtr>(new_mergeable_blocks), mergeable_stream->getHeader());
|
|
||||||
|
|
||||||
auto blocks_storage_id = getBlocksStorageID();
|
|
||||||
auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id, global_context.getTable(select_table_id)->getColumns(), {from}, QueryProcessingStage::WithMergeableState);
|
|
||||||
block_context->addExternalTable(blocks_storage_id.table_name, blocks_storage);
|
|
||||||
|
|
||||||
InterpreterSelectQuery select(inner_blocks_query->clone(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete));
|
|
||||||
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
|
|
||||||
|
|
||||||
/// Squashing is needed here because the view query can generate a lot of blocks
|
|
||||||
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY
|
|
||||||
/// and two-level aggregation is triggered).
|
|
||||||
data = std::make_shared<SquashingBlockInputStream>(
|
|
||||||
data, global_context.getSettingsRef().min_insert_block_size_rows, global_context.getSettingsRef().min_insert_block_size_bytes);
|
|
||||||
|
|
||||||
while (Block block = data->read())
|
while (Block block = data->read())
|
||||||
{
|
{
|
||||||
|
@ -27,9 +27,16 @@ struct BlocksMetadata
|
|||||||
UInt64 version;
|
UInt64 version;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct MergeableBlocks
|
||||||
|
{
|
||||||
|
BlocksPtrs blocks;
|
||||||
|
Block sample_block;
|
||||||
|
};
|
||||||
|
|
||||||
class IAST;
|
class IAST;
|
||||||
using ASTPtr = std::shared_ptr<IAST>;
|
using ASTPtr = std::shared_ptr<IAST>;
|
||||||
using BlocksMetadataPtr = std::shared_ptr<BlocksMetadata>;
|
using BlocksMetadataPtr = std::shared_ptr<BlocksMetadata>;
|
||||||
|
using MergeableBlocksPtr = std::shared_ptr<MergeableBlocks>;
|
||||||
|
|
||||||
class StorageLiveView : public ext::shared_ptr_helper<StorageLiveView>, public IStorage
|
class StorageLiveView : public ext::shared_ptr_helper<StorageLiveView>, public IStorage
|
||||||
{
|
{
|
||||||
@ -46,6 +53,7 @@ public:
|
|||||||
{
|
{
|
||||||
return StorageID("", getStorageID().table_name + "_blocks");
|
return StorageID("", getStorageID().table_name + "_blocks");
|
||||||
}
|
}
|
||||||
|
StoragePtr getParentStorage() const { return parent_storage; }
|
||||||
|
|
||||||
NameAndTypePair getColumn(const String & column_name) const override;
|
NameAndTypePair getColumn(const String & column_name) const override;
|
||||||
bool hasColumn(const String & column_name) const override;
|
bool hasColumn(const String & column_name) const override;
|
||||||
@ -139,8 +147,14 @@ public:
|
|||||||
unsigned num_streams) override;
|
unsigned num_streams) override;
|
||||||
|
|
||||||
std::shared_ptr<BlocksPtr> getBlocksPtr() { return blocks_ptr; }
|
std::shared_ptr<BlocksPtr> getBlocksPtr() { return blocks_ptr; }
|
||||||
BlocksPtrs getMergeableBlocks() { return mergeable_blocks; }
|
MergeableBlocksPtr getMergeableBlocks() { return mergeable_blocks; }
|
||||||
void setMergeableBlocks(BlocksPtrs blocks) { mergeable_blocks = blocks; }
|
|
||||||
|
/// Collect mergeable blocks and their sample. Must be called holding mutex
|
||||||
|
MergeableBlocksPtr collectMergeableBlocks(const Context & context);
|
||||||
|
/// Complete query using input streams from mergeable blocks
|
||||||
|
BlockInputStreamPtr completeQuery(BlockInputStreams from);
|
||||||
|
|
||||||
|
void setMergeableBlocks(MergeableBlocksPtr blocks) { mergeable_blocks = blocks; }
|
||||||
std::shared_ptr<bool> getActivePtr() { return active_ptr; }
|
std::shared_ptr<bool> getActivePtr() { return active_ptr; }
|
||||||
|
|
||||||
/// Read new data blocks that store query result
|
/// Read new data blocks that store query result
|
||||||
@ -148,6 +162,9 @@ public:
|
|||||||
|
|
||||||
Block getHeader() const;
|
Block getHeader() const;
|
||||||
|
|
||||||
|
/// convert blocks to input streams
|
||||||
|
static BlockInputStreams blocksToInputStreams(BlocksPtrs blocks, Block & sample_block);
|
||||||
|
|
||||||
static void writeIntoLiveView(
|
static void writeIntoLiveView(
|
||||||
StorageLiveView & live_view,
|
StorageLiveView & live_view,
|
||||||
const Block & block,
|
const Block & block,
|
||||||
@ -160,6 +177,7 @@ private:
|
|||||||
ASTPtr inner_blocks_query; /// query over the mergeable blocks to produce final result
|
ASTPtr inner_blocks_query; /// query over the mergeable blocks to produce final result
|
||||||
Context & global_context;
|
Context & global_context;
|
||||||
std::unique_ptr<Context> live_view_context;
|
std::unique_ptr<Context> live_view_context;
|
||||||
|
StoragePtr parent_storage;
|
||||||
|
|
||||||
bool is_temporary = false;
|
bool is_temporary = false;
|
||||||
/// Mutex to protect access to sample block
|
/// Mutex to protect access to sample block
|
||||||
@ -178,7 +196,7 @@ private:
|
|||||||
std::shared_ptr<BlocksPtr> blocks_ptr;
|
std::shared_ptr<BlocksPtr> blocks_ptr;
|
||||||
/// Current data blocks metadata
|
/// Current data blocks metadata
|
||||||
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
|
std::shared_ptr<BlocksMetadataPtr> blocks_metadata_ptr;
|
||||||
BlocksPtrs mergeable_blocks;
|
MergeableBlocksPtr mergeable_blocks;
|
||||||
|
|
||||||
/// Background thread for temporary tables
|
/// Background thread for temporary tables
|
||||||
/// which drops this table if there are no users
|
/// which drops this table if there are no users
|
||||||
|
Loading…
Reference in New Issue
Block a user