#include "Storages/StorageS3Cluster.h" #include "config.h" #if USE_AWS_S3 #include "Common/Exception.h" #include "Client/Connection.h" #include "Core/QueryProcessingStage.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { StorageS3Cluster::StorageS3Cluster( const Configuration & configuration_, const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, ContextPtr context_, bool structure_argument_was_provided_) : IStorageCluster(table_id_) , s3_configuration{configuration_} , cluster_name(configuration_.cluster_name) , format_name(configuration_.format) , compression_method(configuration_.compression_method) , structure_argument_was_provided(structure_argument_was_provided_) { context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration_.url.uri); StorageInMemoryMetadata storage_metadata; StorageS3::updateS3Configuration(context_, s3_configuration); if (columns_.empty()) { const auto & filename = configuration_.url.uri.getPath(); const bool is_key_with_globs = filename.find_first_of("*?{") != std::string::npos; /// `distributed_processing` is set to false, because this code is executed on the initiator, so there is no callback set /// for asking for the next tasks. /// `format_settings` is set to std::nullopt, because StorageS3Cluster is used only as table function auto columns = StorageS3::getTableStructureFromDataImpl(format_name, s3_configuration, compression_method, /*distributed_processing_*/false, is_key_with_globs, /*format_settings=*/std::nullopt, context_); storage_metadata.setColumns(columns); } else storage_metadata.setColumns(columns_); storage_metadata.setConstraints(constraints_); setInMemoryMetadata(storage_metadata); auto default_virtuals = NamesAndTypesList{ {"_path", std::make_shared(std::make_shared())}, {"_file", std::make_shared(std::make_shared())}}; auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList(); virtual_columns = getVirtualsForStorage(columns, default_virtuals); for (const auto & column : virtual_columns) virtual_block.insert({column.type->createColumn(), column.type, column.name}); } /// The code executes on initiator Pipe StorageS3Cluster::read( const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr context, QueryProcessingStage::Enum processed_stage, size_t /*max_block_size*/, size_t /*num_streams*/) { StorageS3::updateS3Configuration(context, s3_configuration); auto cluster = getCluster(context); auto extension = getTaskIteratorExtension(query_info.query, context); /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()); const Scalars & scalars = context->hasQueryContext() ? 
    const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};

    Pipes pipes;

    const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;

    ASTPtr query_to_send = interpreter.getQueryInfo().query->clone();
    if (!structure_argument_was_provided)
        addColumnsStructureToQueryWithClusterEngine(
            query_to_send, StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()), 5, getName());

    RestoreQualifiedNamesVisitor::Data data;
    data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
    data.remote_table.database = context->getCurrentDatabase();
    data.remote_table.table = getName();
    RestoreQualifiedNamesVisitor(data).visit(query_to_send);
    AddDefaultDatabaseVisitor visitor(context, context->getCurrentDatabase(),
        /* only_replace_current_database_function_= */ false,
        /* only_replace_in_join_= */ true);
    visitor.visit(query_to_send);

    const auto & current_settings = context->getSettingsRef();
    auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
    for (const auto & shard_info : cluster->getShardsInfo())
    {
        auto try_results = shard_info.pool->getMany(timeouts, &current_settings, PoolMode::GET_MANY);
        for (auto & try_result : try_results)
        {
            auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
                shard_info.pool,
                std::vector<IConnectionPool::Entry>{try_result},
                queryToString(query_to_send),
                interpreter.getSampleBlock(),
                context,
                /*throttler=*/nullptr,
                scalars,
                Tables(),
                processed_stage,
                extension);

            pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
        }
    }

    storage_snapshot->check(column_names);
    return Pipe::unitePipes(std::move(pipes));
}

QueryProcessingStage::Enum StorageS3Cluster::getQueryProcessingStage(
    ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const
{
    /// Initiator executes query on remote node.
    if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
        if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)
            return QueryProcessingStage::Enum::WithMergeableState;

    /// Follower just reads the data.
    return QueryProcessingStage::Enum::FetchColumns;
}

ClusterPtr StorageS3Cluster::getCluster(ContextPtr context) const
{
    return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
}

RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ASTPtr query, ContextPtr context) const
{
    /// The glob over S3 keys is expanded once, on the initiator; the callback returned here hands out
    /// the next key to whichever worker asks for a new task.
    auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
        *s3_configuration.client, s3_configuration.url, query, virtual_block, context);
    auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String { return iterator->next().key; });
    return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}

NamesAndTypesList StorageS3Cluster::getVirtuals() const
{
    return virtual_columns;
}

}

#endif