Modify Materialized View query

This commit is contained in:
Nicolae Vartolomei 2020-01-31 17:12:18 +00:00
parent c2c5b81b70
commit 0939a9460f
9 changed files with 205 additions and 29 deletions

View File

@ -8,7 +8,6 @@
#include <Parsers/ASTInsertQuery.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/ThreadPool.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/StorageValues.h>
@ -51,8 +50,10 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
ASTPtr query;
BlockOutputStreamPtr out;
if (auto * materialized_view = dynamic_cast<const StorageMaterializedView *>(dependent_table.get()))
if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(dependent_table.get()))
{
addTableLock(materialized_view->lockStructureForShare(true, context.getInitialQueryId()));
StoragePtr inner_table = materialized_view->getTargetTable();
auto inner_table_id = inner_table->getStorageID();
query = materialized_view->getInnerQuery();

View File

@ -252,20 +252,23 @@ void DatabaseOrdinary::alterTable(
ast->replace(ast_create_query.select, metadata.select);
}
ASTStorage & storage_ast = *ast_create_query.storage;
/// ORDER BY may change, but cannot appear, it's required construction
if (metadata.order_by_ast && storage_ast.order_by)
storage_ast.set(storage_ast.order_by, metadata.order_by_ast);
/// MaterializedView is one type of CREATE query without storage.
if (ast_create_query.storage)
{
ASTStorage & storage_ast = *ast_create_query.storage;
/// ORDER BY may change, but cannot appear, it's required construction
if (metadata.order_by_ast && storage_ast.order_by)
storage_ast.set(storage_ast.order_by, metadata.order_by_ast);
if (metadata.primary_key_ast)
storage_ast.set(storage_ast.primary_key, metadata.primary_key_ast);
if (metadata.primary_key_ast)
storage_ast.set(storage_ast.primary_key, metadata.primary_key_ast);
if (metadata.ttl_for_table_ast)
storage_ast.set(storage_ast.ttl_table, metadata.ttl_for_table_ast);
if (metadata.settings_ast)
storage_ast.set(storage_ast.settings, metadata.settings_ast);
if (metadata.ttl_for_table_ast)
storage_ast.set(storage_ast.ttl_table, metadata.ttl_for_table_ast);
if (metadata.settings_ast)
storage_ast.set(storage_ast.settings, metadata.settings_ast);
}
statement = getObjectDefinitionFromCreateQuery(ast);
{

View File

@ -37,7 +37,7 @@ static inline String generateInnerTableName(const String & table_name)
return ".inner." + table_name;
}
static StorageID extractDependentTableFromSelectQuery(ASTSelectQuery & query, Context & context, bool add_default_db = true)
static StorageID extractDependentTableFromSelectQuery(ASTSelectQuery & query, const Context & context, bool add_default_db = true)
{
if (add_default_db)
{
@ -117,22 +117,23 @@ StorageMaterializedView::StorageMaterializedView(
inner_query = query.select->list_of_selects->children.at(0);
auto & select_query = inner_query->as<ASTSelectQuery &>();
select_table_id = extractDependentTableFromSelectQuery(select_query, local_context);
checkAllowedQueries(select_query);
select_table_id = extractDependentTableFromSelectQuery(select_query, local_context);
if (!has_inner_table)
target_table_id = StorageID(query.to_database, query.to_table);
else if (attach_)
{
/// If there is an ATTACH request, then the internal table must already be created.
target_table_id = StorageID(table_id_.database_name, generateInnerTableName(table_id_.table_name));
target_table_id = StorageID(getStorageID().database_name, generateInnerTableName(getStorageID().table_name));
}
else
{
/// We will create a query to create an internal table.
auto manual_create_query = std::make_shared<ASTCreateQuery>();
manual_create_query->database = table_id_.database_name;
manual_create_query->table = generateInnerTableName(table_id_.table_name);
manual_create_query->database = getStorageID().database_name;
manual_create_query->table = generateInnerTableName(getStorageID().table_name);
auto new_columns_list = std::make_shared<ASTColumns>();
new_columns_list->set(new_columns_list->columns, query.columns_list->columns->ptr());
@ -148,7 +149,7 @@ StorageMaterializedView::StorageMaterializedView(
}
if (!select_table_id.empty())
global_context.addDependency(select_table_id, table_id_);
global_context.addDependency(select_table_id, getStorageID());
}
NameAndTypePair StorageMaterializedView::getColumn(const String & column_name) const
@ -261,6 +262,37 @@ void StorageMaterializedView::alter(
auto table_id = getStorageID();
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
/// start modify query
if (context.getSettingsRef().allow_experimental_alter_materialized_view_structure)
{
auto & new_select = metadata.select->as<ASTSelectWithUnionQuery &>();
if (new_select.list_of_selects->children.size() != 1)
throw Exception("UNION is not supported for MATERIALIZED VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_MATERIALIZED_VIEW);
auto & new_inner_query = new_select.list_of_selects->children.at(0);
auto & select_query = new_inner_query->as<ASTSelectQuery &>();
checkAllowedQueries(select_query);
auto new_select_table_id = extractDependentTableFromSelectQuery(select_query, context);
{
auto context_lock = global_context.getLock();
if (!select_table_id.empty())
global_context.removeDependency(select_table_id, getStorageID());
if (!new_select_table_id.empty())
global_context.addDependency(new_select_table_id, getStorageID());
select_table_id = new_select_table_id;
select = metadata.select;
inner_query = new_inner_query;
}
}
/// end modify query
context.getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata);
setColumns(std::move(metadata.columns));
}
@ -270,7 +302,13 @@ void StorageMaterializedView::checkAlterIsPossible(const AlterCommands & command
{
if (settings.allow_experimental_alter_materialized_view_structure)
{
throw Exception("work in progress", ErrorCodes::NOT_IMPLEMENTED);
for (const auto & command : commands)
{
if (!command.isCommentAlter() && command.type != AlterCommand::MODIFY_QUERY)
throw Exception(
"Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
}
else
{

View File

@ -0,0 +1,60 @@
#!/usr/bin/env bash
# Stop at the first failing command so that a broken setup fails the test early.
set -e

# Absolute path of the directory that contains this script.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Quoted so that a checkout path containing spaces or glob characters is not
# word-split. CLICKHOUSE_CLIENT is used below but never assigned in this script,
# so it is expected to come from this sourced file.
. "$CURDIR"/../shell_config.sh

# Recreate the fixture: a Null-engine source table and a MergeTree-backed
# materialized view selecting from it.
$CLICKHOUSE_CLIENT --multiquery <<EOF
DROP TABLE IF EXISTS src;
DROP TABLE IF EXISTS mv;
CREATE TABLE src(v UInt64) ENGINE = Null;
CREATE MATERIALIZED VIEW mv(v UInt8) Engine = MergeTree() ORDER BY v AS SELECT v FROM src;
EOF
# Test that ALTER doesn't cause data loss or duplication.
#
# Idea for future:
#
#      null
#     /    \
#   mv1    mv2
#     \    /
#      \  /
#    mv sink
#
# Insert N times into null while altering sink query and switching it from mv1 to mv2.
# Endlessly issues ALTER ... MODIFY QUERY against `mv`, picking one of two
# SELECT statements at random on every iteration, with a random pause between
# iterations. Intended to be started in the background and stopped with SIGINT.
function alter_thread()
{
    # Leave the loop (and this background shell) when SIGINT is delivered.
    trap 'exit' INT

    ALTERS[0]="ALTER TABLE mv MODIFY QUERY SELECT v FROM src;"
    ALTERS[1]="ALTER TABLE mv MODIFY QUERY SELECT v * 2 as v FROM src;"
    while true; do
        # The experimental setting is passed per invocation; the script never
        # enables it anywhere else.
        $CLICKHOUSE_CLIENT --allow_experimental_alter_materialized_view_structure=1 -q "${ALTERS[$RANDOM % 2]}"
        # "0.$RANDOM" is already a valid fractional duration below one second,
        # so no `echo` subshell is needed to build it.
        sleep "0.$RANDOM"
    done
}
# Run the ALTER loop concurrently and keep its PID so it can be signalled later.
alter_thread &
alter_pid=$!

# Issue 100 single-row inserts sequentially. An insert that fails (for example
# with a retriable "deadlock avoided" error) is simply repeated until the
# client exits successfully; its stderr is discarded.
for round in $(seq 1 100); do
    until $CLICKHOUSE_CLIENT -q "INSERT INTO src VALUES (1);" 2>/dev/null; do
        :
    done
done
# Enough alters: signal the background ALTER loop, whose INT trap makes it exit.
kill -INT $alter_pid
# Block until the background job has terminated before reading the result.
wait
# This was a fun ride.
# Print the number of rows that reached mv. 100 single-row inserts were issued
# above, so presumably the expected value is 100 -- confirm against the
# reference file, which is not visible here.
$CLICKHOUSE_CLIENT -q "SELECT count() FROM mv;"

View File

@ -0,0 +1 @@
inconsistencies 0

View File

@ -0,0 +1,75 @@
#!/usr/bin/env bash
# Stop at the first failing command so that a broken setup fails the test early.
set -e

# Absolute path of the directory that contains this script.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Quoted so that a checkout path containing spaces or glob characters is not
# word-split. CLICKHOUSE_CLIENT is used below but never assigned in this script,
# so it is expected to come from this sourced file.
. "$CURDIR"/../shell_config.sh

# Recreate the fixture: two Null-engine source tables and a materialized view
# that initially reads from src_a.
$CLICKHOUSE_CLIENT --multiquery <<EOF
DROP TABLE IF EXISTS src_a;
DROP TABLE IF EXISTS src_b;
DROP TABLE IF EXISTS mv;
CREATE TABLE src_a(v UInt64) ENGINE = Null;
CREATE TABLE src_b(v UInt64) ENGINE = Null;
CREATE MATERIALIZED VIEW mv(test UInt8, case UInt8)
Engine = MergeTree()
ORDER BY test AS
SELECT v == 1 as test, v as case FROM src_a;
EOF
# The purpose of this test is to ensure that MV query A is always used against source table A. Same for query/table B.
# Also, helps detect data races.
# Endlessly fires batches of 100 concurrent single-row inserts, each one going
# at random either to src_a (value 1) or to src_b (value 2). Intended to be
# started in the background and stopped with SIGINT.
function insert_thread() {
    # On SIGINT, reap the INSERT clients that are still running before exiting.
    # A bare `exit` would orphan them: the caller's `wait` only sees this
    # function's shell, so the final consistency query could run while inserts
    # are still landing in mv.
    trap 'wait; exit' INT

    INSERT[0]="INSERT INTO TABLE src_a VALUES (1);"
    INSERT[1]="INSERT INTO TABLE src_b VALUES (2);"
    while true; do
        # trigger 100 concurrent inserts at a time
        for i in $(seq 1 100); do
            # ignore `Possible deadlock avoided. Client should retry`
            $CLICKHOUSE_CLIENT -q "${INSERT[$RANDOM % 2]}" 2>/dev/null &
        done
        # Let the whole batch finish before starting the next one.
        wait
    done
}
# Keeps switching the query of `mv` between the src_a variant and the src_b
# variant, chosen at random each time, pausing briefly after every ALTER.
# Runs until SIGINT is received.
function alter_thread() {
    trap 'exit' INT

    local -a statements
    statements[0]="ALTER TABLE mv MODIFY QUERY SELECT v == 1 as test, v as case FROM src_a;"
    statements[1]="ALTER TABLE mv MODIFY QUERY SELECT v == 2 as test, v as case FROM src_b;"

    local pick
    while :; do
        pick=$(( RANDOM % 2 ))
        $CLICKHOUSE_CLIENT --allow_experimental_alter_materialized_view_structure=1 -q "${statements[$pick]}"
        sleep "0.0$RANDOM"
    done
}
# Start both workers in the background and remember their PIDs for the kill below.
insert_thread &
insert_thread_pid=$!
alter_thread &
alter_thread_pid=$!
# Poll mv until it holds at least one row with case = 1 and at least one with
# case = 2, i.e. rows produced from both inserted values have been observed.
while true; do
is_done=$($CLICKHOUSE_CLIENT -q "SELECT countIf(case = 1) > 0 AND countIf(case = 2) > 0 FROM mv;")
if [ "$is_done" -eq "1" ]; then
break
fi
done
# Signal both workers (each installs an INT trap) and wait for them to exit.
kill -INT $insert_thread_pid
kill -INT $alter_thread_pid
wait
# Report how many rows have test == 0. Each query variant yields test = 1 for the
# value inserted into its own source table, so a non-zero count presumably means a
# query was evaluated against the other table's rows -- verify against the
# reference output.
$CLICKHOUSE_CLIENT -q "SELECT 'inconsistencies', count() FROM mv WHERE test == 0;"

View File

@ -1,13 +1,13 @@
1
1
2
2
3
3
1 0
4
6
1 0
2 0
2 0
3 0
3 0
42 0
4 0
6 0
84 1

View File

@ -1,5 +1,3 @@
-- Just testing syntax for now.
DROP TABLE IF EXISTS src;
DROP TABLE IF EXISTS dest;
DROP TABLE IF EXISTS pipe;
@ -15,13 +13,12 @@ INSERT INTO src VALUES (1), (2), (3);
SET allow_experimental_alter_materialized_view_structure = 1;
-- Live alter which changes query logic and adds an extra column.
-- This is not implemented yet and this test is just a draft.
ALTER TABLE pipe
MODIFY QUERY
SELECT
v * 2 as v,
1 as v2
FROM src; -- { serverError 48 }
FROM src;
INSERT INTO src VALUES (1), (2), (3);