From 4e4b3832146354082b3e6488f6813b963f7caca4 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Wed, 24 Mar 2021 21:36:31 +0300 Subject: [PATCH] added hash of itiator address --- src/DataStreams/RemoteQueryExecutor.cpp | 22 +- src/IO/S3Common.cpp | 1 - src/Interpreters/Cluster.cpp | 11 + src/Interpreters/Cluster.h | 3 + src/Interpreters/DatabaseAndTableWithAlias.h | 8 +- .../ASTFunctionWithKeyValueArguments.h | 4 +- src/Parsers/ExpressionElementParsers.h | 12 +- src/Storages/StorageS3Distributed.cpp | 208 +++++++++++------- src/Storages/StorageS3Distributed.h | 19 +- .../TableFunctionS3Distributed.cpp | 55 ++--- .../TableFunctionS3Distributed.h | 2 +- 11 files changed, 192 insertions(+), 153 deletions(-) diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index 847baf555ee..4aa659854b9 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -114,7 +114,7 @@ RemoteQueryExecutor::~RemoteQueryExecutor() /** If we receive a block with slightly different column types, or with excessive columns, * we will adapt it to expected structure. */ -[[maybe_unused]] static Block adaptBlockStructure(const Block & block, const Block & header) +static Block adaptBlockStructure(const Block & block, const Block & header) { /// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest. if (!header) @@ -123,9 +123,6 @@ RemoteQueryExecutor::~RemoteQueryExecutor() Block res; res.info = block.info; - std::cout << "block " << block.dumpStructure() << std::endl; - std::cout << "header " << header.dumpStructure() << std::endl; - for (const auto & elem : header) { ColumnPtr column; @@ -156,17 +153,7 @@ RemoteQueryExecutor::~RemoteQueryExecutor() column = elem.column->cloneResized(block.rows()); } else - { - // if (!block.has(elem.name)) - // { - // column = elem.type->createColumn(); - // } - // else - // { - // column = castColumn(block.getByName(elem.name), elem.type); - // } column = castColumn(block.getByName(elem.name), elem.type); - } res.insert({column, elem.type, elem.name}); } @@ -327,12 +314,7 @@ std::optional RemoteQueryExecutor::processPacket(Packet packet) case Protocol::Server::Data: /// If the block is not empty and is not a header block if (packet.block && (packet.block.rows() > 0)) - { - // return packet.block; - Block anime = adaptBlockStructure(packet.block, header); - std::cout << "RemoteQueryExecutor " << anime.dumpStructure() << std::endl; - return anime; - } + return adaptBlockStructure(packet.block, header); break; /// If the block is empty - we will receive other packets before EndOfStream. case Protocol::Server::Exception: diff --git a/src/IO/S3Common.cpp b/src/IO/S3Common.cpp index e0d0709bbab..f9962735ddc 100644 --- a/src/IO/S3Common.cpp +++ b/src/IO/S3Common.cpp @@ -399,7 +399,6 @@ namespace S3 else throw Exception("Bucket or key name are invalid in S3 URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS); } - } } diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp index bac688fe81e..20ec3a794d1 100644 --- a/src/Interpreters/Cluster.cpp +++ b/src/Interpreters/Cluster.cpp @@ -138,6 +138,17 @@ String Cluster::Address::toString() const return toString(host_name, port); } + +String Cluster::Address::getHash() const +{ + SipHash hash; + hash.update(host_name); + hash.update(std::to_string(port)); + hash.update(user); + hash.update(password); + return std::to_string(hash.get64()); +} + String Cluster::Address::toString(const String & host_name, UInt16 port) { return escapeForFileName(host_name) + ':' + DB::toString(port); diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h index 5976074ec7a..89d508396ad 100644 --- a/src/Interpreters/Cluster.h +++ b/src/Interpreters/Cluster.h @@ -122,6 +122,9 @@ public: /// Returns 'escaped_host_name:port' String toString() const; + + /// Returns hash of all fields + String getHash() const; /// Returns 'host_name:port' String readableString() const; diff --git a/src/Interpreters/DatabaseAndTableWithAlias.h b/src/Interpreters/DatabaseAndTableWithAlias.h index d2b1d655de7..a4773ec435b 100644 --- a/src/Interpreters/DatabaseAndTableWithAlias.h +++ b/src/Interpreters/DatabaseAndTableWithAlias.h @@ -26,9 +26,9 @@ struct DatabaseAndTableWithAlias UUID uuid = UUIDHelpers::Nil; DatabaseAndTableWithAlias() = default; - DatabaseAndTableWithAlias(const ASTPtr & identifier_node, const String & current_database = ""); - DatabaseAndTableWithAlias(const ASTIdentifier & identifier, const String & current_database = ""); - DatabaseAndTableWithAlias(const ASTTableExpression & table_expression, const String & current_database = ""); + explicit DatabaseAndTableWithAlias(const ASTPtr & identifier_node, const String & current_database = ""); + explicit DatabaseAndTableWithAlias(const ASTIdentifier & identifier, const String & current_database = ""); + explicit DatabaseAndTableWithAlias(const ASTTableExpression & table_expression, const String & current_database = ""); /// "alias." or "table." if alias is empty String getQualifiedNamePrefix(bool with_dot = true) const; @@ -80,7 +80,7 @@ private: void addAdditionalColumns(NamesAndTypesList & target, const NamesAndTypesList & addition) { target.insert(target.end(), addition.begin(), addition.end()); - for (auto & col : addition) + for (const auto & col : addition) names.insert(col.name); } diff --git a/src/Parsers/ASTFunctionWithKeyValueArguments.h b/src/Parsers/ASTFunctionWithKeyValueArguments.h index 88ab712cc04..f5eaa33bfc7 100644 --- a/src/Parsers/ASTFunctionWithKeyValueArguments.h +++ b/src/Parsers/ASTFunctionWithKeyValueArguments.h @@ -20,7 +20,7 @@ public: bool second_with_brackets; public: - ASTPair(bool second_with_brackets_) + explicit ASTPair(bool second_with_brackets_) : second_with_brackets(second_with_brackets_) { } @@ -49,7 +49,7 @@ public: /// Has brackets around arguments bool has_brackets; - ASTFunctionWithKeyValueArguments(bool has_brackets_ = true) + explicit ASTFunctionWithKeyValueArguments(bool has_brackets_ = true) : has_brackets(has_brackets_) { } diff --git a/src/Parsers/ExpressionElementParsers.h b/src/Parsers/ExpressionElementParsers.h index cbbbd3f6d3b..f8b2408ac16 100644 --- a/src/Parsers/ExpressionElementParsers.h +++ b/src/Parsers/ExpressionElementParsers.h @@ -45,7 +45,7 @@ protected: class ParserIdentifier : public IParserBase { public: - ParserIdentifier(bool allow_query_parameter_ = false) : allow_query_parameter(allow_query_parameter_) {} + explicit ParserIdentifier(bool allow_query_parameter_ = false) : allow_query_parameter(allow_query_parameter_) {} protected: const char * getName() const override { return "identifier"; } bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; @@ -59,7 +59,7 @@ protected: class ParserCompoundIdentifier : public IParserBase { public: - ParserCompoundIdentifier(bool table_name_with_optional_uuid_ = false, bool allow_query_parameter_ = false) + explicit ParserCompoundIdentifier(bool table_name_with_optional_uuid_ = false, bool allow_query_parameter_ = false) : table_name_with_optional_uuid(table_name_with_optional_uuid_), allow_query_parameter(allow_query_parameter_) { } @@ -85,7 +85,7 @@ public: using ColumnTransformers = MultiEnum; static constexpr auto AllTransformers = ColumnTransformers{ColumnTransformer::APPLY, ColumnTransformer::EXCEPT, ColumnTransformer::REPLACE}; - ParserColumnsTransformers(ColumnTransformers allowed_transformers_ = AllTransformers, bool is_strict_ = false) + explicit ParserColumnsTransformers(ColumnTransformers allowed_transformers_ = AllTransformers, bool is_strict_ = false) : allowed_transformers(allowed_transformers_) , is_strict(is_strict_) {} @@ -103,7 +103,7 @@ class ParserAsterisk : public IParserBase { public: using ColumnTransformers = ParserColumnsTransformers::ColumnTransformers; - ParserAsterisk(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers) + explicit ParserAsterisk(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers) : allowed_transformers(allowed_transformers_) {} @@ -129,7 +129,7 @@ class ParserColumnsMatcher : public IParserBase { public: using ColumnTransformers = ParserColumnsTransformers::ColumnTransformers; - ParserColumnsMatcher(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers) + explicit ParserColumnsMatcher(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers) : allowed_transformers(allowed_transformers_) {} @@ -149,7 +149,7 @@ protected: class ParserFunction : public IParserBase { public: - ParserFunction(bool allow_function_parameters_ = true, bool is_table_function_ = false) + explicit ParserFunction(bool allow_function_parameters_ = true, bool is_table_function_ = false) : allow_function_parameters(allow_function_parameters_), is_table_function(is_table_function_) { } diff --git a/src/Storages/StorageS3Distributed.cpp b/src/Storages/StorageS3Distributed.cpp index f64e6fb3622..6254b7f15df 100644 --- a/src/Storages/StorageS3Distributed.cpp +++ b/src/Storages/StorageS3Distributed.cpp @@ -31,10 +31,12 @@ #include #include #include +#include #include #include #include +#include #include #include @@ -61,6 +63,19 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +struct StorageS3SourceBuilder +{ + bool need_path; + bool need_file; + String format; + String name; + Block sample_block; + const Context & context; + const ColumnsDescription & columns; + UInt64 max_block_size; + String compression_method; +}; + class StorageS3SequentialSource : public SourceWithProgress { public: @@ -77,37 +92,23 @@ public: StorageS3SequentialSource( String initial_query_id_, - bool need_path_, - bool need_file_, - const String & format_, - String name_, - const Block & sample_block_, - const Context & context_, - const ColumnsDescription & columns_, - UInt64 max_block_size_, - const CompressionMethod compression_method_, - StorageS3::ClientAuthentificaiton & client_auth_) - : SourceWithProgress(getHeader(sample_block_, need_path_, need_file_)) - , need_path(need_path_) - , need_file(need_file_) - , format(format_) - , name(name_) - , sample_block(sample_block_) - , context(context_) - , columns(columns_) - , max_block_size(max_block_size_) - , compression_method(compression_method_) - , client_auth(client_auth_) + Cluster::Address initiator, + const ClientAuthentificationBuilder & client_auth_builder_, + const StorageS3SourceBuilder & s3_builder_) + : SourceWithProgress(getHeader(s3_builder_.sample_block, s3_builder_.need_path, s3_builder_.need_file)) , initial_query_id(initial_query_id_) + , s3_source_builder(s3_builder_) + , cli_builder(client_auth_builder_) { - initiator_connection = std::make_shared( - /*host*/"127.0.0.1", - /*port*/9000, - /*default_database=*/context.getGlobalContext().getCurrentDatabase(), - /*user=*/context.getClientInfo().initial_user, - /*password=*/"", - /*cluster=*/"", - /*cluster_secret=*/"" + connections = std::make_shared( + /*max_connections*/3, + /*host*/initiator.host_name, + /*port*/initiator.port, + /*default_database=*/s3_builder_.context.getGlobalContext().getCurrentDatabase(), + /*user=*/s3_builder_.context.getClientInfo().initial_user, + /*password=*/initiator.password, + /*cluster=*/initiator.cluster, + /*cluster_secret=*/initiator.cluster_secret ); createOrUpdateInnerSource(); @@ -115,7 +116,7 @@ public: String getName() const override { - return name; + return "StorageS3SequentialSource"; } Chunk generate() override @@ -131,7 +132,6 @@ public: else chunk = inner->generate(); } - std::cout << "generate() " << chunk.dumpStructure() << std::endl; return chunk; } @@ -141,9 +141,9 @@ private: { try { - initiator_connection->connect(timeouts); - initiator_connection->sendNextTaskRequest(initial_query_id); - auto packet = initiator_connection->receivePacket(); + auto connection = connections->get(timeouts); + connection->sendNextTaskRequest(initial_query_id); + auto packet = connection->receivePacket(); assert(packet.type = Protocol::Server::NextTaskReply); LOG_TRACE(&Poco::Logger::get("StorageS3SequentialSource"), "Got new task {}", packet.next_task); return packet.next_task; @@ -155,28 +155,32 @@ private: } } - bool createOrUpdateInnerSource() { auto next_string = askAboutNextKey(); - std::cout << "createOrUpdateInnerSource " << next_string << std::endl; if (next_string.empty()) return false; auto next_uri = S3::URI(Poco::URI(next_string)); - assert(next_uri.bucket == client_auth.uri.bucket); + auto client_auth = StorageS3::ClientAuthentificaiton{ + next_uri, + cli_builder.access_key_id, + cli_builder.secret_access_key, + cli_builder.max_connections, + {}, {}}; + StorageS3::updateClientAndAuthSettings(s3_source_builder.context, client_auth); inner = std::make_unique( - need_path, - need_file, - format, - name, - sample_block, - context, - columns, - max_block_size, - compression_method, + s3_source_builder.need_path, + s3_source_builder.need_file, + s3_source_builder.format, + s3_source_builder.name, + s3_source_builder.sample_block, + s3_source_builder.context, + s3_source_builder.columns, + s3_source_builder.max_block_size, + chooseCompressionMethod(client_auth.uri.key, s3_source_builder.compression_method), client_auth.client, client_auth.uri.bucket, next_uri.key @@ -184,30 +188,24 @@ private: return true; } - bool need_path; - bool need_file; - String format; - String name; - Block sample_block; - const Context & context; - const ColumnsDescription & columns; - UInt64 max_block_size; - const CompressionMethod compression_method; + /// This is used to ask about next task + String initial_query_id; + + StorageS3SourceBuilder s3_source_builder; + ClientAuthentificationBuilder cli_builder; std::unique_ptr inner; - StorageS3::ClientAuthentificaiton client_auth; /// One second just in case ConnectionTimeouts timeouts{{1, 0}, {1, 0}, {1, 0}}; - std::shared_ptr initiator_connection; - /// This is used to ask about next task - String initial_query_id; + std::shared_ptr connections; }; StorageS3Distributed::StorageS3Distributed( - const S3::URI & uri_, + IAST::Hash tree_hash_, + const String & address_hash_or_filename_, const String & access_key_id_, const String & secret_access_key_, const StorageID & table_id_, @@ -219,17 +217,18 @@ StorageS3Distributed::StorageS3Distributed( const Context & context_, const String & compression_method_) : IStorage(table_id_) + , tree_hash(tree_hash_) + , address_hash_or_filename(address_hash_or_filename_) , cluster_name(cluster_name_) , cluster(context_.getCluster(cluster_name)->getClusterWithReplicasAsShards(context_.getSettings())) - , client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}} , format_name(format_name_) , compression_method(compression_method_) + , cli_builder{access_key_id_, secret_access_key_, max_connections_} { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); storage_metadata.setConstraints(constraints_); setInMemoryMetadata(storage_metadata); - StorageS3::updateClientAndAuthSettings(context_, client_auth); } @@ -246,7 +245,16 @@ Pipe StorageS3Distributed::read( /// Secondary query, need to read from S3 if (context.getCurrentQueryId() != context.getInitialQueryId()) { - StorageS3::updateClientAndAuthSettings(context, client_auth); + /// Find initiator in cluster + Cluster::Address initiator; + for (const auto & replicas : cluster->getShardsAddresses()) + for (const auto & node : replicas) + if (node.getHash() == address_hash_or_filename) + { + initiator = node; + break; + } + bool need_path_column = false; bool need_file_column = false; @@ -258,13 +266,8 @@ Pipe StorageS3Distributed::read( need_file_column = true; } - std::cout << need_file_column << std::boolalpha << need_file_column << std::endl; - std::cout << need_path_column << std::boolalpha << need_path_column << std::endl; - - std::cout << "metadata_snapshot->getSampleBlock().dumpStructure() " << metadata_snapshot->getSampleBlock().dumpStructure() << std::endl; - - return Pipe(std::make_shared( - context.getInitialQueryId(), + StorageS3SourceBuilder s3builder + { need_path_column, need_file_column, format_name, @@ -273,24 +276,65 @@ Pipe StorageS3Distributed::read( context, metadata_snapshot->getColumns(), max_block_size, - chooseCompressionMethod(client_auth.uri.key, compression_method), - client_auth + compression_method + }; + + return Pipe(std::make_shared( + context.getInitialQueryId(), + /*initiator*/initiator, + cli_builder, + s3builder )); } - Pipes pipes; - connections.reserve(cluster->getShardCount()); + /// This part of code executes on initiator - std::cout << "StorageS3Distributed::read" << std::endl; - std::cout << "QueryProcessingStage " << processed_stage << std::endl; + String hash_of_address; + for (const auto & replicas : cluster->getShardsAddresses()) + for (const auto & node : replicas) + if (node.is_local && node.port == context.getTCPPort()) + { + hash_of_address = node.getHash(); + break; + } + /// FIXME: better exception + if (hash_of_address.empty()) + throw Exception(fmt::format("Could not find outself in cluster {}", ""), ErrorCodes::LOGICAL_ERROR); + + auto remote_query_ast = query_info.query->clone(); + auto table_expressions_from_whole_query = getTableExpressions(remote_query_ast->as()); + + String remote_query; + for (const auto & table_expression : table_expressions_from_whole_query) + { + const auto & table_function_ast = table_expression->table_function; + if (table_function_ast->getTreeHash() == tree_hash) + { + std::cout << table_function_ast->dumpTree() << std::endl; + auto & arguments = table_function_ast->children.at(0)->children; + auto & bucket = arguments[1]->as().value.safeGet(); + /// We rewrite query, and insert a port to connect as a first parameter + /// So, we write hash_of_address here as buckey name to find initiator node + /// in cluster from config on remote replica + bucket = hash_of_address; + remote_query = queryToString(remote_query_ast); + break; + } + } + + if (remote_query.empty()) + throw Exception("No table function", ErrorCodes::LOGICAL_ERROR); Block header = - InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock(); + InterpreterSelectQuery(remote_query_ast, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock(); const Scalars & scalars = context.hasQueryContext() ? context.getQueryContext().getScalars() : Scalars{}; + Pipes pipes; + connections.reserve(cluster->getShardCount()); + for (const auto & replicas : cluster->getShardsAddresses()) { /// There will be only one replica, because we consider each replica as a shard for (const auto & node : replicas) @@ -306,13 +350,13 @@ Pipe StorageS3Distributed::read( )); auto stream = std::make_shared( /*connection=*/*connections.back(), - /*query=*/queryToString(query_info.query), + /*query=*/remote_query, /*header=*/header, /*context=*/context, nullptr, scalars, Tables(), - QueryProcessingStage::FetchColumns + processed_stage ); pipes.emplace_back(std::make_shared(std::move(stream))); } @@ -322,11 +366,5 @@ Pipe StorageS3Distributed::read( metadata_snapshot->check(column_names, getVirtuals(), getStorageID()); return Pipe::unitePipes(std::move(pipes)); } - - - - - - } diff --git a/src/Storages/StorageS3Distributed.h b/src/Storages/StorageS3Distributed.h index e7c5c96900e..ba7dfb88330 100644 --- a/src/Storages/StorageS3Distributed.h +++ b/src/Storages/StorageS3Distributed.h @@ -20,6 +20,12 @@ namespace ErrorCodes class Context; +struct ClientAuthentificationBuilder +{ + String access_key_id; + String secret_access_key; + UInt64 max_connections; +}; class StorageS3Distributed : public ext::shared_ptr_helper, public IStorage { @@ -39,7 +45,8 @@ public: protected: StorageS3Distributed( - const S3::URI & uri_, + IAST::Hash tree_hash_, + const String & address_hash_or_filename_, const String & access_key_id_, const String & secret_access_key_, const StorageID & table_id_, @@ -49,21 +56,19 @@ protected: const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, const Context & context_, - const String & compression_method_ = ""); + const String & compression_method_); private: /// Connections from initiator to other nodes std::vector> connections; + IAST::Hash tree_hash; + String address_hash_or_filename; std::string cluster_name; ClusterPtr cluster; - /// This will be used on non-initiator nodes. - std::optional initiator; - std::shared_ptr initiator_connection; - StorageS3::ClientAuthentificaiton client_auth; - String format_name; String compression_method; + ClientAuthentificationBuilder cli_builder; }; diff --git a/src/TableFunctions/TableFunctionS3Distributed.cpp b/src/TableFunctions/TableFunctionS3Distributed.cpp index 8717a5aa5bc..3c17faff456 100644 --- a/src/TableFunctions/TableFunctionS3Distributed.cpp +++ b/src/TableFunctions/TableFunctionS3Distributed.cpp @@ -1,6 +1,7 @@ #include #include #include "DataStreams/RemoteBlockInputStream.h" +#include "Parsers/ASTExpressionList.h" #include "Parsers/ASTFunction.h" #include "Parsers/IAST_fwd.h" #include "Processors/Sources/SourceFromInputStream.h" @@ -49,7 +50,7 @@ void TableFunctionS3Distributed::parseArguments(const ASTPtr & ast_function, con arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context); cluster_name = args[0]->as().value.safeGet(); - filename = args[1]->as().value.safeGet(); + filename_or_initiator_hash = args[1]->as().value.safeGet(); if (args.size() < 5) { @@ -78,38 +79,38 @@ StoragePtr TableFunctionS3Distributed::executeImpl( const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const { - Poco::URI uri (filename); - S3::URI s3_uri (uri); - // UInt64 min_upload_part_size = context.getSettingsRef().s3_min_upload_part_size; - // UInt64 max_single_part_upload_size = context.getSettingsRef().s3_max_single_part_upload_size; UInt64 max_connections = context.getSettingsRef().s3_max_connections; - StorageS3::ClientAuthentificaiton client_auth{s3_uri, access_key_id, secret_access_key, max_connections, {}, {}}; - StorageS3::updateClientAndAuthSettings(context, client_auth); - - auto lists = StorageS3::listFilesWithRegexpMatching(*client_auth.client, client_auth.uri); - Strings tasks; - tasks.reserve(lists.size()); - - for (auto & value : lists) + /// Initiator specific logic + while (context.getInitialQueryId() == context.getCurrentQueryId()) { - tasks.emplace_back(client_auth.uri.endpoint + '/' + client_auth.uri.bucket + '/' + value); - std::cout << tasks.back() << std::endl; + auto poco_uri = Poco::URI{filename_or_initiator_hash}; + + /// This is needed, because secondary query on local replica has the same query-id + if (poco_uri.getHost().empty() || poco_uri.getPort() == 0) + break; + + S3::URI s3_uri(poco_uri); + StorageS3::ClientAuthentificaiton client_auth{s3_uri, access_key_id, secret_access_key, max_connections, {}, {}}; + StorageS3::updateClientAndAuthSettings(context, client_auth); + + auto lists = StorageS3::listFilesWithRegexpMatching(*client_auth.client, client_auth.uri); + Strings tasks; + tasks.reserve(lists.size()); + + for (auto & value : lists) + tasks.emplace_back(client_auth.uri.endpoint + '/' + client_auth.uri.bucket + '/' + value); + + /// Register resolver, which will give other nodes a task to execute + TaskSupervisor::instance().registerNextTaskResolver( + std::make_unique(context.getCurrentQueryId(), std::move(tasks))); + + break; } - std::cout << "query_id " << context.getCurrentQueryId() << std::endl; - - std::cout << ast_function->dumpTree() << std::endl; - auto * func = ast_function->as(); - - std::cout << func->arguments->dumpTree() << std::endl; - - /// Register resolver, which will give other nodes a task to execute - TaskSupervisor::instance().registerNextTaskResolver( - std::make_unique(context.getCurrentQueryId(), std::move(tasks))); - StoragePtr storage = StorageS3Distributed::create( - s3_uri, + ast_function->getTreeHash(), + filename_or_initiator_hash, access_key_id, secret_access_key, StorageID(getDatabaseName(), table_name), diff --git a/src/TableFunctions/TableFunctionS3Distributed.h b/src/TableFunctions/TableFunctionS3Distributed.h index d531ef175bb..2fab786dee6 100644 --- a/src/TableFunctions/TableFunctionS3Distributed.h +++ b/src/TableFunctions/TableFunctionS3Distributed.h @@ -37,7 +37,7 @@ protected: void parseArguments(const ASTPtr & ast_function, const Context & context) override; String cluster_name; - String filename; + String filename_or_initiator_hash; String format; String structure; String access_key_id;