Merge pull request #35132 from azat/parallel_distributed_insert_select-view

Support view() for parallel_distributed_insert_select
This commit is contained in:
Nikolai Kochetov 2022-03-09 09:10:34 +01:00 committed by GitHub
commit 6bfee7aca2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 100 additions and 22 deletions

View File

@ -144,7 +144,6 @@ list (APPEND dbms_sources
AggregateFunctions/AggregateFunctionState.cpp AggregateFunctions/AggregateFunctionState.cpp
AggregateFunctions/AggregateFunctionCount.cpp AggregateFunctions/AggregateFunctionCount.cpp
AggregateFunctions/parseAggregateFunctionParameters.cpp) AggregateFunctions/parseAggregateFunctionParameters.cpp)
list (APPEND dbms_headers list (APPEND dbms_headers
AggregateFunctions/IAggregateFunction.h AggregateFunctions/IAggregateFunction.h
AggregateFunctions/IAggregateFunctionCombinator.h AggregateFunctions/IAggregateFunctionCombinator.h
@ -155,10 +154,25 @@ list (APPEND dbms_headers
AggregateFunctions/FactoryHelpers.h AggregateFunctions/FactoryHelpers.h
AggregateFunctions/parseAggregateFunctionParameters.h) AggregateFunctions/parseAggregateFunctionParameters.h)
list (APPEND dbms_sources TableFunctions/ITableFunction.cpp TableFunctions/TableFunctionFactory.cpp) list (APPEND dbms_sources
list (APPEND dbms_headers TableFunctions/ITableFunction.h TableFunctions/TableFunctionFactory.h) TableFunctions/ITableFunction.cpp
list (APPEND dbms_sources Dictionaries/DictionaryFactory.cpp Dictionaries/DictionarySourceFactory.cpp Dictionaries/DictionaryStructure.cpp Dictionaries/getDictionaryConfigurationFromAST.cpp) TableFunctions/TableFunctionView.cpp
list (APPEND dbms_headers Dictionaries/DictionaryFactory.h Dictionaries/DictionarySourceFactory.h Dictionaries/DictionaryStructure.h Dictionaries/getDictionaryConfigurationFromAST.h) TableFunctions/TableFunctionFactory.cpp)
list (APPEND dbms_headers
TableFunctions/ITableFunction.h
TableFunctions/TableFunctionView.h
TableFunctions/TableFunctionFactory.h)
list (APPEND dbms_sources
Dictionaries/DictionaryFactory.cpp
Dictionaries/DictionarySourceFactory.cpp
Dictionaries/DictionaryStructure.cpp
Dictionaries/getDictionaryConfigurationFromAST.cpp)
list (APPEND dbms_headers
Dictionaries/DictionaryFactory.h
Dictionaries/DictionarySourceFactory.h
Dictionaries/DictionaryStructure.h
Dictionaries/getDictionaryConfigurationFromAST.h)
if (NOT ENABLE_SSL) if (NOT ENABLE_SSL)
list (REMOVE_ITEM clickhouse_common_io_sources Common/OpenSSLHelpers.cpp) list (REMOVE_ITEM clickhouse_common_io_sources Common/OpenSSLHelpers.cpp)
@ -253,18 +267,16 @@ if (TARGET ch_contrib::nuraft)
add_object_library(clickhouse_coordination Coordination) add_object_library(clickhouse_coordination Coordination)
endif() endif()
set (DBMS_COMMON_LIBRARIES)
if (USE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES) if (USE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES)
add_library (dbms STATIC ${dbms_headers} ${dbms_sources}) add_library (dbms STATIC ${dbms_headers} ${dbms_sources})
target_link_libraries (dbms PRIVATE ch_contrib::libdivide ${DBMS_COMMON_LIBRARIES}) target_link_libraries (dbms PRIVATE ch_contrib::libdivide)
if (TARGET ch_contrib::jemalloc) if (TARGET ch_contrib::jemalloc)
target_link_libraries (dbms PRIVATE ch_contrib::jemalloc) target_link_libraries (dbms PRIVATE ch_contrib::jemalloc)
endif() endif()
set (all_modules dbms) set (all_modules dbms)
else() else()
add_library (dbms SHARED ${dbms_headers} ${dbms_sources}) add_library (dbms SHARED ${dbms_headers} ${dbms_sources})
target_link_libraries (dbms PUBLIC ${all_modules} ${DBMS_COMMON_LIBRARIES}) target_link_libraries (dbms PUBLIC ${all_modules})
target_link_libraries (clickhouse_interpreters PRIVATE ch_contrib::libdivide) target_link_libraries (clickhouse_interpreters PRIVATE ch_contrib::libdivide)
if (TARGET ch_contrib::jemalloc) if (TARGET ch_contrib::jemalloc)
target_link_libraries (clickhouse_interpreters PRIVATE ch_contrib::jemalloc) target_link_libraries (clickhouse_interpreters PRIVATE ch_contrib::jemalloc)

View File

@ -56,6 +56,8 @@
#include <Interpreters/getClusterName.h> #include <Interpreters/getClusterName.h>
#include <Interpreters/getTableExpressions.h> #include <Interpreters/getTableExpressions.h>
#include <Functions/IFunction.h> #include <Functions/IFunction.h>
#include <TableFunctions/TableFunctionView.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Processors/Executors/PushingPipelineExecutor.h> #include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h> #include <Processors/QueryPlan/QueryPlan.h>
@ -723,15 +725,27 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer
storage_src = std::dynamic_pointer_cast<StorageDistributed>(joined_tables.getLeftTableStorage()); storage_src = std::dynamic_pointer_cast<StorageDistributed>(joined_tables.getLeftTableStorage());
if (storage_src) if (storage_src)
{ {
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>(); /// Unwrap view() function.
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>(); if (storage_src->remote_table_function_ptr)
{
const TableFunctionPtr src_table_function =
TableFunctionFactory::instance().get(storage_src->remote_table_function_ptr, local_context);
const TableFunctionView * view_function =
assert_cast<const TableFunctionView *>(src_table_function.get());
new_query->select = view_function->getSelectQuery().clone();
}
else
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query->clone()); auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query); select_with_union_query->list_of_selects->children.push_back(new_select_query);
new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName()); new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName());
new_query->select = select_with_union_query; new_query->select = select_with_union_query;
}
} }
} }
} }

View File

@ -4,14 +4,18 @@ if (TARGET ch_contrib::hivemetastore)
add_headers_and_sources(clickhouse_table_functions Hive) add_headers_and_sources(clickhouse_table_functions Hive)
endif () endif ()
list(REMOVE_ITEM clickhouse_table_functions_sources ITableFunction.cpp TableFunctionFactory.cpp) list(REMOVE_ITEM clickhouse_table_functions_sources
list(REMOVE_ITEM clickhouse_table_functions_headers ITableFunction.h TableFunctionFactory.h) ITableFunction.cpp
TableFunctionView.cpp
TableFunctionFactory.cpp)
list(REMOVE_ITEM clickhouse_table_functions_headers
ITableFunction.h
TableFunctionView.h
TableFunctionFactory.h)
add_library(clickhouse_table_functions ${clickhouse_table_functions_sources}) add_library(clickhouse_table_functions ${clickhouse_table_functions_sources})
target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms)
if (TARGET ch_contrib::hivemetastore) if (TARGET ch_contrib::hivemetastore)
target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms ch_contrib::hivemetastore ch_contrib::hdfs) target_link_libraries(clickhouse_table_functions PRIVATE ch_contrib::hivemetastore ch_contrib::hdfs)
else ()
target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms)
endif () endif ()

View File

@ -15,6 +15,12 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
} }
const ASTSelectWithUnionQuery & TableFunctionView::getSelectQuery() const
{
return *create.select;
}
void TableFunctionView::parseArguments(const ASTPtr & ast_function, ContextPtr /*context*/) void TableFunctionView::parseArguments(const ASTPtr & ast_function, ContextPtr /*context*/)
{ {
const auto * function = ast_function->as<ASTFunction>(); const auto * function = ast_function->as<ASTFunction>();

View File

@ -16,6 +16,9 @@ class TableFunctionView : public ITableFunction
public: public:
static constexpr auto name = "view"; static constexpr auto name = "view";
std::string getName() const override { return name; } std::string getName() const override { return name; }
const ASTSelectWithUnionQuery & getSelectQuery() const;
private: private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns) const override; StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "View"; } const char * getStorageTypeName() const override { return "View"; }

View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
# NOTE: sh test is required since view() does not have current database
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "
drop table if exists dst_02225;
drop table if exists src_02225;
create table dst_02225 (key Int) engine=Memory();
create table src_02225 (key Int) engine=Memory();
insert into src_02225 values (1);
"
$CLICKHOUSE_CLIENT --param_database=$CLICKHOUSE_DATABASE -nm -q "
truncate table dst_02225;
insert into function remote('127.{1,2}', currentDatabase(), dst_02225, key)
select * from remote('127.{1,2}', view(select * from {database:Identifier}.src_02225), key)
settings parallel_distributed_insert_select=2, max_distributed_depth=1;
select * from dst_02225;
-- w/o sharding key
truncate table dst_02225;
insert into function remote('127.{1,2}', currentDatabase(), dst_02225, key)
select * from remote('127.{1,2}', view(select * from {database:Identifier}.src_02225))
settings parallel_distributed_insert_select=2, max_distributed_depth=1;
select * from dst_02225;
"
$CLICKHOUSE_CLIENT -nm -q "
drop table src_02225;
drop table dst_02225;
"