This commit is contained in:
Konstantin Bogdanov 2024-11-21 07:49:27 +01:00 committed by GitHub
commit aab1ee9cc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 188 additions and 59 deletions

View File

@ -5653,6 +5653,9 @@ Parts virtually divided into segments to be distributed between replicas for par
DECLARE(Bool, parallel_replicas_local_plan, true, R"( DECLARE(Bool, parallel_replicas_local_plan, true, R"(
Build local plan for local replica Build local plan for local replica
)", BETA) \ )", BETA) \
DECLARE(Bool, parallel_replicas_for_cluster_engines, true, R"(
Replace table function engines with their -Cluster alternatives
)", EXPERIMENTAL) \
\ \
DECLARE(Bool, allow_experimental_analyzer, true, R"( DECLARE(Bool, allow_experimental_analyzer, true, R"(
Allow new query analyzer. Allow new query analyzer.

View File

@ -60,6 +60,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{ {
{"24.12", {"24.12",
{ {
{"parallel_replicas_for_cluster_engines", true, true, "New setting"},
} }
}, },
{"24.11", {"24.11",

View File

@ -125,7 +125,7 @@ void IStorageCluster::read(
storage_snapshot->check(column_names); storage_snapshot->check(column_names);
updateBeforeRead(context); updateBeforeRead(context);
auto cluster = getCluster(context); auto cluster = context->getCluster(cluster_name);
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)

View File

@ -10,7 +10,7 @@
#include <Storages/VirtualColumnUtils.h> #include <Storages/VirtualColumnUtils.h>
#include <Storages/ObjectStorage/Utils.h> #include <Storages/ObjectStorage/Utils.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h> #include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h> #include <Storages/extractTableFunctionFromSelectQuery.h>
namespace DB namespace DB
@ -86,7 +86,16 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
const DB::StorageSnapshotPtr & storage_snapshot, const DB::StorageSnapshotPtr & storage_snapshot,
const ContextPtr & context) const ContextPtr & context)
{ {
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); auto * table_function = extractTableFunctionFromSelectQuery(query);
if (!table_function)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Expected SELECT query from table function {}, got '{}'",
configuration->getEngineName(), queryToString(query));
}
auto * expression_list = table_function->arguments->as<ASTExpressionList>();
if (!expression_list) if (!expression_list)
{ {
throw Exception( throw Exception(
@ -105,10 +114,18 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
configuration->getEngineName()); configuration->getEngineName());
} }
ASTPtr cluster_name_arg = args.front(); if (table_function->name == configuration->getTypeName())
args.erase(args.begin()); configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context);
configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context); else if (table_function->name == fmt::format("{}Cluster", configuration->getTypeName()))
args.insert(args.begin(), cluster_name_arg); {
ASTPtr cluster_name_arg = args.front();
args.erase(args.begin());
configuration->addStructureAndFormatToArgsIfNeeded(args, structure, configuration->format, context);
args.insert(args.begin(), cluster_name_arg);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected table function name: {}", table_function->name);
} }
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension( RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(

View File

@ -9,7 +9,7 @@
#include <Storages/StorageFileCluster.h> #include <Storages/StorageFileCluster.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Storages/StorageFile.h> #include <Storages/StorageFile.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h> #include <Storages/extractTableFunctionFromSelectQuery.h>
#include <Storages/VirtualColumnUtils.h> #include <Storages/VirtualColumnUtils.h>
#include <TableFunctions/TableFunctionFileCluster.h> #include <TableFunctions/TableFunctionFileCluster.h>
@ -19,11 +19,6 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
StorageFileCluster::StorageFileCluster( StorageFileCluster::StorageFileCluster(
const ContextPtr & context, const ContextPtr & context,
const String & cluster_name_, const String & cluster_name_,
@ -66,12 +61,14 @@ StorageFileCluster::StorageFileCluster(
void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const DB::ContextPtr & context) void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const DB::ContextPtr & context)
{ {
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); auto * table_function = extractTableFunctionFromSelectQuery(query);
if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function fileCluster, got '{}'", queryToString(query));
TableFunctionFileCluster::updateStructureAndFormatArgumentsIfNeeded( TableFunctionFileCluster::updateStructureAndFormatArgumentsIfNeeded(
expression_list->children, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context); table_function,
storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(),
format_name,
context
);
} }
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const

View File

@ -20,7 +20,7 @@
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h> #include <Storages/SelectQueryInfo.h>
#include <Storages/StorageURL.h> #include <Storages/StorageURL.h>
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h> #include <Storages/extractTableFunctionFromSelectQuery.h>
#include <Storages/VirtualColumnUtils.h> #include <Storages/VirtualColumnUtils.h>
#include <TableFunctions/TableFunctionURLCluster.h> #include <TableFunctions/TableFunctionURLCluster.h>
@ -30,16 +30,17 @@
namespace DB namespace DB
{ {
namespace Setting
{
extern const SettingsUInt64 glob_expansion_max_elements;
}
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
namespace Setting
{
extern const SettingsUInt64 glob_expansion_max_elements;
}
StorageURLCluster::StorageURLCluster( StorageURLCluster::StorageURLCluster(
const ContextPtr & context, const ContextPtr & context,
const String & cluster_name_, const String & cluster_name_,
@ -86,12 +87,20 @@ StorageURLCluster::StorageURLCluster(
void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context)
{ {
ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(query); auto * table_function = extractTableFunctionFromSelectQuery(query);
if (!table_function)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function urlCluster, got '{}'", queryToString(query));
auto * expression_list = table_function->arguments->as<ASTExpressionList>();
if (!expression_list) if (!expression_list)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function urlCluster, got '{}'", queryToString(query)); throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function urlCluster, got '{}'", queryToString(query));
TableFunctionURLCluster::updateStructureAndFormatArgumentsIfNeeded( TableFunctionURLCluster::updateStructureAndFormatArgumentsIfNeeded(
expression_list->children, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context); table_function,
storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(),
format_name,
context
);
} }
RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const

View File

@ -1,7 +1,6 @@
#include <Storages/extractTableFunctionArgumentsFromSelectQuery.h> #include <Storages/extractTableFunctionFromSelectQuery.h>
#include <Parsers/ASTExpressionList.h> #include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h> #include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h> #include <Parsers/ASTTablesInSelectQuery.h>
@ -11,7 +10,7 @@
namespace DB namespace DB
{ {
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query) ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query)
{ {
auto * select_query = query->as<ASTSelectQuery>(); auto * select_query = query->as<ASTSelectQuery>();
if (!select_query || !select_query->tables()) if (!select_query || !select_query->tables())
@ -22,8 +21,7 @@ ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query)
if (!table_expression->table_function) if (!table_expression->table_function)
return nullptr; return nullptr;
auto * table_function = table_expression->table_function->as<ASTFunction>(); return table_expression->table_function->as<ASTFunction>();
return table_function->arguments->as<ASTExpressionList>();
} }
} }

View File

@ -2,10 +2,11 @@
#include <Parsers/IAST_fwd.h> #include <Parsers/IAST_fwd.h>
#include <Parsers/ASTExpressionList.h> #include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
namespace DB namespace DB
{ {
ASTExpressionList * extractTableFunctionArgumentsFromSelectQuery(ASTPtr & query); ASTFunction * extractTableFunctionFromSelectQuery(ASTPtr & query);
} }

View File

@ -2,6 +2,7 @@
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h> #include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Storages/checkAndGetLiteralArgument.h> #include <Storages/checkAndGetLiteralArgument.h>
#include <TableFunctions/ITableFunction.h> #include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionObjectStorage.h> #include <TableFunctions/TableFunctionObjectStorage.h>
@ -24,15 +25,25 @@ class ITableFunctionCluster : public Base
public: public:
String getName() const override = 0; String getName() const override = 0;
static void updateStructureAndFormatArgumentsIfNeeded(ASTs & args, const String & structure_, const String & format_, const ContextPtr & context) static void updateStructureAndFormatArgumentsIfNeeded(ASTFunction * table_function, const String & structure_, const String & format_, const ContextPtr & context)
{ {
auto * expression_list = table_function->arguments->as<ASTExpressionList>();
ASTs args = expression_list->children;
if (args.empty()) if (args.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected empty list of arguments for {}Cluster table function", Base::name); throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected empty list of arguments for {}Cluster table function", Base::name);
ASTPtr cluster_name_arg = args.front(); if (table_function->name == Base::name)
args.erase(args.begin()); Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context);
Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context); else if (table_function->name == fmt::format("{}Cluster", Base::name))
args.insert(args.begin(), cluster_name_arg); {
ASTPtr cluster_name_arg = args.front();
args.erase(args.begin());
Base::updateStructureAndFormatArgumentsIfNeeded(args, structure_, format_, context);
args.insert(args.begin(), cluster_name_arg);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected table function name: {}", table_function->name);
} }
protected: protected:

View File

@ -1,5 +1,8 @@
#include "config.h" #include "config.h"
#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
#include <Access/Common/AccessFlags.h> #include <Access/Common/AccessFlags.h>
#include <Analyzer/FunctionNode.h> #include <Analyzer/FunctionNode.h>
#include <Analyzer/TableFunctionNode.h> #include <Analyzer/TableFunctionNode.h>
@ -19,11 +22,20 @@
#include <Storages/ObjectStorage/Local/Configuration.h> #include <Storages/ObjectStorage/Local/Configuration.h>
#include <Storages/ObjectStorage/S3/Configuration.h> #include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h> #include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
namespace DB namespace DB
{ {
namespace Setting
{
extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas;
extern const SettingsBool parallel_replicas_for_cluster_engines;
extern const SettingsString cluster_for_parallel_replicas;
extern const SettingsParallelReplicasMode parallel_replicas_mode;
}
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
@ -100,8 +112,9 @@ StoragePtr TableFunctionObjectStorage<Definition, Configuration>::executeImpl(
ColumnsDescription cached_columns, ColumnsDescription cached_columns,
bool is_insert_query) const bool is_insert_query) const
{ {
ColumnsDescription columns;
chassert(configuration); chassert(configuration);
ColumnsDescription columns;
if (configuration->structure != "auto") if (configuration->structure != "auto")
columns = parseColumnsListFromString(configuration->structure, context); columns = parseColumnsListFromString(configuration->structure, context);
else if (!structure_hint.empty()) else if (!structure_hint.empty())
@ -109,18 +122,43 @@ StoragePtr TableFunctionObjectStorage<Definition, Configuration>::executeImpl(
else if (!cached_columns.empty()) else if (!cached_columns.empty())
columns = cached_columns; columns = cached_columns;
StoragePtr storage = std::make_shared<StorageObjectStorage>( StoragePtr storage;
const auto & settings = context->getSettingsRef();
auto parallel_replicas_cluster_name = settings[Setting::cluster_for_parallel_replicas].toString();
auto can_use_parallel_replicas = settings[Setting::allow_experimental_parallel_reading_from_replicas] > 0
&& settings[Setting::parallel_replicas_for_cluster_engines]
&& settings[Setting::parallel_replicas_mode] == ParallelReplicasMode::READ_TASKS
&& !parallel_replicas_cluster_name.empty()
&& !context->isDistributed();
if (can_use_parallel_replicas)
{
storage = std::make_shared<StorageObjectStorageCluster>(
parallel_replicas_cluster_name,
configuration,
getObjectStorage(context, !is_insert_query),
StorageID(getDatabaseName(), table_name),
columns,
ConstraintsDescription{},
context);
storage->startup();
return storage;
}
storage = std::make_shared<StorageObjectStorage>(
configuration, configuration,
getObjectStorage(context, !is_insert_query), getObjectStorage(context, !is_insert_query),
context, context,
StorageID(getDatabaseName(), table_name), StorageID(getDatabaseName(), table_name),
columns, columns,
ConstraintsDescription{}, ConstraintsDescription{},
String{}, /* comment */ String{},
/* format_settings */ std::nullopt, /* format_settings */ std::nullopt,
/* mode */ LoadingStrictnessLevel::CREATE, /* mode */ LoadingStrictnessLevel::CREATE,
/* distributed_processing */ false, /* distributed_processing */ false,
nullptr); /* partition_by */ nullptr);
storage->startup(); storage->startup();
return storage; return storage;

View File

@ -6,8 +6,10 @@
#include <Storages/ObjectStorage/StorageObjectStorage.h> #include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/VirtualColumnUtils.h> #include <Storages/VirtualColumnUtils.h>
#include <TableFunctions/ITableFunction.h> #include <TableFunctions/ITableFunction.h>
#include "config.h" #include "config.h"
namespace DB namespace DB
{ {

View File

@ -2,22 +2,34 @@
#include "registerTableFunctions.h" #include "registerTableFunctions.h"
#include <Access/Common/AccessFlags.h> #include <Access/Common/AccessFlags.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/TableFunctionNode.h>
#include <Core/Settings.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/evaluateConstantExpression.h> #include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
#include <Storages/ColumnsDescription.h> #include <Storages/ColumnsDescription.h>
#include <Storages/NamedCollectionsHelpers.h> #include <Storages/NamedCollectionsHelpers.h>
#include <Storages/StorageURLCluster.h>
#include <TableFunctions/TableFunctionFactory.h> #include <TableFunctions/TableFunctionFactory.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/TableFunctionNode.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Formats/FormatFactory.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromVector.h> #include <IO/WriteBufferFromVector.h>
namespace DB namespace DB
{ {
namespace Setting
{
extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas;
extern const SettingsBool parallel_replicas_for_cluster_engines;
extern const SettingsString cluster_for_parallel_replicas;
extern const SettingsParallelReplicasMode parallel_replicas_mode;
}
std::vector<size_t> TableFunctionURL::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const std::vector<size_t> TableFunctionURL::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const
{ {
auto & table_function_node = query_node_table_function->as<TableFunctionNode &>(); auto & table_function_node = query_node_table_function->as<TableFunctionNode &>();
@ -120,6 +132,28 @@ StoragePtr TableFunctionURL::getStorage(
const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context, const String & source, const String & format_, const ColumnsDescription & columns, ContextPtr global_context,
const std::string & table_name, const String & compression_method_) const const std::string & table_name, const String & compression_method_) const
{ {
const auto & settings = global_context->getSettingsRef();
auto parallel_replicas_cluster_name = settings[Setting::cluster_for_parallel_replicas].toString();
auto can_use_parallel_replicas = settings[Setting::allow_experimental_parallel_reading_from_replicas] > 0
&& settings[Setting::parallel_replicas_for_cluster_engines]
&& settings[Setting::parallel_replicas_mode] == ParallelReplicasMode::READ_TASKS
&& !parallel_replicas_cluster_name.empty()
&& !global_context->isDistributed();
if (can_use_parallel_replicas)
{
return std::make_shared<StorageURLCluster>(
global_context,
settings[Setting::cluster_for_parallel_replicas],
filename,
format,
compression_method,
StorageID(getDatabaseName(), table_name),
getActualTableStructure(global_context, /* is_insert_query */ true),
ConstraintsDescription{},
configuration);
}
return std::make_shared<StorageURL>( return std::make_shared<StorageURL>(
source, source,
StorageID(getDatabaseName(), table_name), StorageID(getDatabaseName(), table_name),

View File

@ -10,11 +10,10 @@ StoragePtr TableFunctionURLCluster::getStorage(
const String & /*source*/, const String & /*format_*/, const ColumnsDescription & columns, ContextPtr context, const String & /*source*/, const String & /*format_*/, const ColumnsDescription & columns, ContextPtr context,
const std::string & table_name, const String & /*compression_method_*/) const const std::string & table_name, const String & /*compression_method_*/) const
{ {
StoragePtr storage;
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
{ {
//On worker node this uri won't contain globs //On worker node this uri won't contain globs
storage = std::make_shared<StorageURL>( return std::make_shared<StorageURL>(
filename, filename,
StorageID(getDatabaseName(), table_name), StorageID(getDatabaseName(), table_name),
format, format,
@ -29,20 +28,17 @@ StoragePtr TableFunctionURLCluster::getStorage(
nullptr, nullptr,
/*distributed_processing=*/ true); /*distributed_processing=*/ true);
} }
else
{ return std::make_shared<StorageURLCluster>(
storage = std::make_shared<StorageURLCluster>( context,
context, cluster_name,
cluster_name, filename,
filename, format,
format, compression_method,
compression_method, StorageID(getDatabaseName(), table_name),
StorageID(getDatabaseName(), table_name), getActualTableStructure(context, /* is_insert_query */ true),
getActualTableStructure(context, /* is_insert_query */ true), ConstraintsDescription{},
ConstraintsDescription{}, configuration);
configuration);
}
return storage;
} }
void registerTableFunctionURLCluster(TableFunctionFactory & factory) void registerTableFunctionURLCluster(TableFunctionFactory & factory)

View File

@ -0,0 +1,8 @@
Expression (Project names)
ReadFromCluster
Expression (Project names)
ReadFromCluster
Expression ((Project names + (Projection + Change column names to column identifiers)))
ReadFromURL
Expression ((Project names + (Projection + Change column names to column identifiers)))
S3Source

View File

@ -0,0 +1,14 @@
-- Tags: no-fasttest
-- Tag no-fasttest: Depends on AWS
SET enable_parallel_replicas=1;
SET cluster_for_parallel_replicas='default';
SET parallel_replicas_for_cluster_engines=true;
EXPLAIN SELECT * FROM url('http://localhost:8123');
EXPLAIN SELECT * FROM s3('http://localhost:11111/test/a.tsv', 'TSV');
SET parallel_replicas_for_cluster_engines=false;
EXPLAIN SELECT * FROM url('http://localhost:8123');
EXPLAIN SELECT * FROM s3('http://localhost:11111/test/a.tsv', 'TSV');