fix LiveView dependencies

This commit is contained in:
Alexander Tokmakov 2020-01-24 23:14:42 +03:00
parent b3d34da57b
commit f9cfc6be3a
6 changed files with 47 additions and 19 deletions

View File

@ -140,12 +140,12 @@ BlockInputStreamPtr StorageLiveView::completeQuery(BlockInputStreams from)
block_context->makeQueryContext();
auto blocks_storage_id = getBlocksStorageID();
auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id, parent_storage->getColumns(),
auto blocks_storage = StorageBlocks::createStorage(blocks_storage_id, getParentStorage()->getColumns(),
std::move(from), QueryProcessingStage::WithMergeableState);
block_context->addExternalTable(blocks_storage_id.table_name, blocks_storage);
InterpreterSelectQuery select(inner_blocks_query->clone(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete));
InterpreterSelectQuery select(getInnerBlocksQuery(), *block_context, StoragePtr(), SelectQueryOptions(QueryProcessingStage::Complete));
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
/// Squashing is needed here because the view query can generate a lot of blocks
@ -255,16 +255,12 @@ StorageLiveView::StorageLiveView(
throw Exception("UNION is not supported for LIVE VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW);
inner_query = query.select->list_of_selects->children.at(0);
inner_blocks_query = inner_query->clone();
InterpreterSelectQuery(inner_blocks_query, *live_view_context, SelectQueryOptions().modify().analyze());
select_table_id = extractDependentTable(inner_blocks_query, global_context, table_id_.table_name, inner_subquery);
auto inner_query_tmp = inner_query->clone();
select_table_id = extractDependentTable(inner_query_tmp, global_context, table_id_.table_name, inner_subquery);
global_context.addDependency(select_table_id, table_id_);
parent_storage = local_context.getTable(select_table_id);
is_temporary = query.temporary;
temporary_live_view_timeout = local_context.getSettingsRef().temporary_live_view_timeout.totalSeconds();
@ -310,6 +306,22 @@ Block StorageLiveView::getHeader() const
return sample_block;
}
ASTPtr StorageLiveView::getInnerBlocksQuery()
{
std::lock_guard lock(sample_block_lock);
if (!inner_blocks_query)
{
inner_blocks_query = inner_query->clone();
/// Rewrite inner query with right aliases for JOIN.
/// It cannot be done in constructor or startup() because InterpreterSelectQuery may access table,
/// which is not loaded yet during server startup, so we do it lazily
InterpreterSelectQuery(inner_blocks_query, *live_view_context, SelectQueryOptions().modify().analyze());
auto table_id = getStorageID();
extractDependentTable(inner_blocks_query, global_context, table_id.table_name, inner_subquery);
}
return inner_blocks_query->clone();
}
bool StorageLiveView::getNewBlocks()
{
SipHash hash;
@ -458,6 +470,7 @@ void StorageLiveView::startup()
void StorageLiveView::shutdown()
{
global_context.removeDependency(select_table_id, getStorageID());
bool expected = false;
if (!shutdown_called.compare_exchange_strong(expected, true))
return;

View File

@ -53,7 +53,7 @@ public:
{
return StorageID("", getStorageID().table_name + "_blocks");
}
StoragePtr getParentStorage() const { return parent_storage; }
StoragePtr getParentStorage() const { return global_context.getTable(select_table_id); }
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;
@ -65,12 +65,7 @@ public:
return inner_subquery->clone();
return nullptr;
}
ASTPtr getInnerBlocksQuery() const
{
if (inner_blocks_query)
return inner_blocks_query->clone();
return nullptr;
}
ASTPtr getInnerBlocksQuery();
/// It is passed inside the query and solved at its level.
bool supportsSampling() const override { return true; }
@ -177,10 +172,9 @@ private:
ASTPtr inner_blocks_query; /// query over the mergeable blocks to produce final result
Context & global_context;
std::unique_ptr<Context> live_view_context;
StoragePtr parent_storage;
bool is_temporary = false;
/// Mutex to protect access to sample block
/// Mutex to protect access to sample block and inner_blocks_query
mutable std::mutex sample_block_lock;
mutable Block sample_block;

View File

@ -1,3 +1,4 @@
import time
import pytest
from helpers.cluster import ClickHouseCluster
@ -6,7 +7,7 @@ from helpers.cluster import ClickHouseCluster
def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('dummy', clickhouse_path_dir='clickhouse_path')
instance = cluster.add_instance('dummy', clickhouse_path_dir='clickhouse_path', stay_alive=True)
cluster.start()
cluster_fail = ClickHouseCluster(__file__, name='fail')
@ -34,3 +35,14 @@ def test_partially_dropped_tables(started_cluster):
"./var/lib/clickhouse/metadata/default/sophisticated_default.sql\n"
assert instance.query("SELECT n FROM should_be_restored") == "1\n2\n3\n"
assert instance.query("SELECT count() FROM system.tables WHERE name='should_be_dropped'") == "0\n"
def test_live_view_dependency(started_cluster):
instance = started_cluster.instances['dummy']
instance.query("CREATE DATABASE a_load_first")
instance.query("CREATE DATABASE b_load_second")
instance.query("CREATE TABLE b_load_second.mt (a Int32) Engine=MergeTree order by tuple()")
instance.query("CREATE LIVE VIEW a_load_first.lv AS SELECT sum(a) FROM b_load_second.mt", settings={'allow_experimental_live_view': 1})
instance.restart_clickhouse()
time.sleep(5)
instance.query("SELECT 1")

View File

@ -1,4 +1,5 @@
CREATE TABLE IF NOT EXISTS test Engine = MergeTree ORDER BY number AS SELECT number, toString(rand()) x from numbers(10000000);
DROP TABLE IF EXISTS test;
CREATE TABLE test Engine = MergeTree ORDER BY number AS SELECT number, toString(rand()) x from numbers(10000000);
SELECT count() FROM test;

View File

@ -0,0 +1,8 @@
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test;
DROP TABLE IF EXISTS lv;
CREATE TABLE test (n Int8) ENGINE = Memory;
CREATE LIVE VIEW lv AS SELECT * FROM test;
DETACH TABLE lv;
INSERT INTO test VALUES (42);
DROP TABLE test;