#include "Storages/StorageS3Cluster.h"

#include "config.h"

#if USE_AWS_S3

#include "Common/Exception.h"
#include "Client/Connection.h"
#include "Core/QueryProcessingStage.h"
/// NOTE(review): the include directives below have no target in this copy of the file, and every
/// `std::make_shared` call further down has no template argument list. The angle-bracketed text appears
/// to have been lost; restore it from version control before building.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

/// Returns a clone of `query` in which the argument list of the s3Cluster table function has been
/// extended with a literal built from `structure`. The input query itself is not modified.
///
/// When the argument list holds exactly 2 or 4 arguments, a literal "auto" is appended first
/// (the variable name suggests it stands for the format argument), so the structure literal
/// always ends up after it.
///
/// Throws LOGICAL_ERROR if the table function arguments cannot be extracted from the query,
/// or if the number of arguments is outside the range [2, 5].
static ASTPtr addColumnsStructureToQuery(const ASTPtr & query, const String & structure)
{
    /// Add argument with table structure to s3Cluster table function in select query.
    /// Work on a clone, so the caller's AST stays untouched.
    auto result_query = query->clone();
    ASTExpressionList * expression_list = extractTableFunctionArgumentsFromSelectQuery(result_query);
    if (!expression_list)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query from table function from s3Cluster, got '{}'", queryToString(query));
    auto structure_literal = std::make_shared(structure);

    if (expression_list->children.size() < 2 || expression_list->children.size() > 5)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 2 to 5 arguments in s3Cluster table functions, got {}", expression_list->children.size());

    /// With 2 or 4 arguments an "auto" literal is inserted before the structure.
    if (expression_list->children.size() == 2 || expression_list->children.size() == 4)
    {
        auto format_literal = std::make_shared("auto");
        expression_list->children.push_back(format_literal);
    }

    expression_list->children.push_back(structure_literal);
    return result_query;
}

/// Builds the storage from the given configuration.
///
/// - Checks the configured URL against the global context's remote host filter.
/// - Refreshes the S3 configuration from the context.
/// - If `columns_` is empty, obtains the columns via StorageS3::getTableStructureFromDataImpl and remembers
///   (`need_to_add_structure_to_query`) that the structure has to be added to the query later, in read().
///   Otherwise uses `columns_` as is.
/// - Prepares the virtual columns (defaults: `_path`, `_file`) and a block with one column per virtual column.
StorageS3Cluster::StorageS3Cluster(
    const StorageS3ClusterConfiguration & configuration_,
    const StorageID & table_id_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    ContextPtr context_)
    : IStorage(table_id_)
    , s3_configuration{configuration_.url, configuration_.auth_settings, configuration_.rw_settings, configuration_.headers}
    , filename(configuration_.url)
    , cluster_name(configuration_.cluster_name)
    , format_name(configuration_.format)
    , compression_method(configuration_.compression_method)
{
    context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI{filename});
    StorageInMemoryMetadata storage_metadata;
    StorageS3::updateS3Configuration(context_, s3_configuration);

    if (columns_.empty())
    {
        /// The key is treated as a glob pattern if it contains any of the characters `*`, `?` or `{`.
        const bool is_key_with_globs = filename.find_first_of("*?{") != std::string::npos;

        /// `distributed_processing` is set to false, because this code is executed on the initiator, so there is no callback set
        /// for asking for the next tasks.
        /// `format_settings` is set to std::nullopt, because StorageS3Cluster is used only as table function
        auto columns = StorageS3::getTableStructureFromDataImpl(format_name, s3_configuration, compression_method,
            /*distributed_processing_*/false, is_key_with_globs, /*format_settings=*/std::nullopt, context_);
        storage_metadata.setColumns(columns);
        need_to_add_structure_to_query = true;
    }
    else
        storage_metadata.setColumns(columns_);

    storage_metadata.setConstraints(constraints_);
    setInMemoryMetadata(storage_metadata);

    /// Default virtual columns; the final list is produced by getVirtualsForStorage from these defaults
    /// and the storage's own columns.
    auto default_virtuals = NamesAndTypesList{
        {"_path", std::make_shared(std::make_shared())},
        {"_file", std::make_shared(std::make_shared())}};

    auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
    virtual_columns = getVirtualsForStorage(columns, default_virtuals);
    /// One freshly created column per virtual column; this block is passed to the task iterator in read().
    for (const auto & column : virtual_columns)
        virtual_block.insert({column.type->createColumn(), column.type, column.name});
}

/// The code executes on initiator.
///
/// Creates one remote query executor (and one pipe) per node of the cluster, where every replica is
/// considered a separate shard. All executors share a single task callback that hands out the next
/// value from the iterator. Returns the union of all created pipes.
/// The `max_block_size` and `num_streams` arguments are not used.
Pipe StorageS3Cluster::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr context,
    QueryProcessingStage::Enum processed_stage,
    size_t /*max_block_size*/,
    unsigned /*num_streams*/)
{
    StorageS3::updateS3Configuration(context, s3_configuration);

    auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());

    /// The callback captures the iterator by value (a shared pointer), so the iterator stays alive
    /// for as long as any copy of the callback exists.
    auto iterator = std::make_shared(
        *s3_configuration.client, s3_configuration.uri, query_info.query, virtual_block, context);
    auto callback = std::make_shared([iterator]() mutable -> String { return iterator->next(); });

    /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
    Block header =
        InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();

    /// Scalars are taken from the query context if there is one; otherwise an empty set is used.
    const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};

    Pipes pipes;

    const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;

    /// If the structure was not given explicitly (see the constructor), add it to the query that is
    /// sent to the other nodes.
    ASTPtr query_to_send = query_info.original_query;
    if (need_to_add_structure_to_query)
        query_to_send = addColumnsStructureToQuery(
            query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()));

    for (const auto & replicas : cluster->getShardsAddresses())
    {
        /// There will be only one replica, because we consider each replica as a shard
        for (const auto & node : replicas)
        {
            /// NOTE(review): the client name literal below looks misspelled ("Inititiator"). It is a runtime
            /// string, so it is left unchanged here — confirm whether anything depends on the exact value.
            auto connection = std::make_shared(
                node.host_name, node.port, context->getGlobalContext()->getCurrentDatabase(),
                node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
                "S3ClusterInititiator",
                node.compression,
                node.secure
            );

            /// For unknown reason global context is passed to IStorage::read() method
            /// So, task_identifier is passed as constructor argument. It is more obvious.
            /// NOTE(review): there is no `task_identifier` in this function; what is actually passed is
            /// `callback`, as the `task_iterator` of the extension. The comment above may be stale.
            auto remote_query_executor = std::make_shared(
                connection,
                queryToString(query_to_send),
                header,
                context,
                /*throttler=*/nullptr,
                scalars,
                Tables(),
                processed_stage,
                RemoteQueryExecutor::Extension{.task_iterator = callback});

            pipes.emplace_back(std::make_shared(remote_query_executor, add_agg_info, false));
        }
    }

    /// NOTE(review): the requested column names are checked only after all connections and executors
    /// have been created — presumably this could be done at the start of the function; confirm.
    storage_snapshot->check(column_names);
    return Pipe::unitePipes(std::move(pipes));
}

/// Returns WithMergeableState when the query is an initial query and `to_stage` is at least
/// WithMergeableState; returns FetchColumns in every other case.
QueryProcessingStage::Enum StorageS3Cluster::getQueryProcessingStage(
    ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const
{
    /// Initiator executes query on remote node.
    if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
        if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)
            return QueryProcessingStage::Enum::WithMergeableState;

    /// Follower just reads the data.
    return QueryProcessingStage::Enum::FetchColumns;
}

/// Returns the virtual columns prepared in the constructor.
NamesAndTypesList StorageS3Cluster::getVirtuals() const
{
    return virtual_columns;
}

}

#endif