#include "config.h"

#include "Interpreters/Context_fwd.h"

#include <Storages/StorageURLCluster.h>

#include <Common/RemoteHostFilter.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>
#include <IO/CompressionMethod.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Parsers/queryToString.h>
#include <Processors/Sources/RemoteSource.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageURL.h>
#include <Storages/addColumnsStructureToQueryWithClusterEngine.h>

#include <Poco/URI.h>

#include <memory>
#include <optional>
#include <string>
#include <vector>

namespace DB
{

/// Creates a storage that reads `uri_` through the nodes of cluster `cluster_name_`.
///
/// `uri_` is first checked against the remote host filter of `context_`.
/// If `columns_` is empty, the table structure is inferred from the data at `uri_`
/// (StorageURL::getTableStructureFromData) and `add_columns_structure_to_query` is set,
/// so that read() appends the inferred structure to the query it sends to remote nodes.
StorageURLCluster::StorageURLCluster(
    ContextPtr context_,
    String cluster_name_,
    const String & uri_,
    const StorageID & table_id_,
    const String & format_name_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & compression_method_,
    const StorageURL::Configuration & configuration_)
    : IStorageCluster(table_id_)
    , cluster_name(std::move(cluster_name_))
    , uri(uri_)
    , format_name(format_name_)
    , compression_method(compression_method_)
{
    context_->getRemoteHostFilter().checkURL(Poco::URI(uri_));

    StorageInMemoryMetadata storage_metadata;

    if (columns_.empty())
    {
        auto columns = StorageURL::getTableStructureFromData(
            format_name_,
            uri_,
            chooseCompressionMethod(Poco::URI(uri_).getPath(), compression_method),
            configuration_.headers,
            std::nullopt,
            context_);
        storage_metadata.setColumns(columns);
        /// The structure was inferred here; read() must forward it explicitly in the query sent to remote nodes.
        add_columns_structure_to_query = true;
    }
    else
        storage_metadata.setColumns(columns_);

    storage_metadata.setConstraints(constraints_);
    setInMemoryMetadata(storage_metadata);
}

/// The code executes on initiator.
///
/// For every shard of getCluster(context), acquires connections from the shard pool
/// (PoolMode::GET_MANY) and builds one RemoteSource per acquired connection. Every remote
/// executor receives the same serialized query and the task-iterator extension from
/// getTaskIteratorExtension(). All resulting pipes are united into a single Pipe.
Pipe StorageURLCluster::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr context,
    QueryProcessingStage::Enum processed_stage,
    size_t /*max_block_size*/,
    size_t /*num_streams*/)
{
    /// Validate the requested columns before acquiring any connections from the shard pools below,
    /// so an invalid query fails without touching the network.
    storage_snapshot->check(column_names);

    auto cluster = getCluster(context);
    auto extension = getTaskIteratorExtension(query_info.query, context);

    /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
    Block header = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();

    /// Scalars of the query context, or an empty set when there is no query context.
    /// Held by value: the conditional expression yields a prvalue anyway, so this states the lifetime explicitly.
    const Scalars scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};

    Pipes pipes;

    const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;

    auto query_to_send = query_info.original_query->clone();
    if (add_columns_structure_to_query)
        /// NOTE(review): `3` is passed through as-is; presumably an argument-count limit for the
        /// cluster table function call — confirm against the helper's signature.
        addColumnsStructureToQueryWithClusterEngine(
            query_to_send,
            StorageDictionary::generateNamesAndTypesDescription(storage_snapshot->metadata->getColumns().getAll()),
            3,
            getName());

    /// The query text is identical for every replica, so serialize it once instead of per connection.
    const String query_string = queryToString(query_to_send);

    const auto & current_settings = context->getSettingsRef();
    auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
    for (const auto & shard_info : cluster->getShardsInfo())
    {
        auto try_results = shard_info.pool->getMany(timeouts, &current_settings, PoolMode::GET_MANY);
        for (auto & try_result : try_results)
        {
            auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
                shard_info.pool,
                std::vector<IConnectionPool::Entry>{try_result},
                query_string,
                header,
                context,
                /*throttler=*/nullptr,
                scalars,
                Tables(),
                processed_stage,
                extension);

            pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
        }
    }

    return Pipe::unitePipes(std::move(pipes));
}

/// Returns WithMergeableState for an INITIAL_QUERY whose requested stage is at least
/// WithMergeableState; in every other case returns FetchColumns.
QueryProcessingStage::Enum StorageURLCluster::getQueryProcessingStage(
    ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const
{
    /// Initiator executes query on remote node.
    if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY
        && to_stage >= QueryProcessingStage::Enum::WithMergeableState)
        return QueryProcessingStage::Enum::WithMergeableState;

    /// Follower (or an initiator asking for an earlier stage) just reads the data.
    return QueryProcessingStage::Enum::FetchColumns;
}

/// Looks up `cluster_name` in the context and returns it with replicas turned into shards,
/// so read() treats every replica as a separate shard.
ClusterPtr StorageURLCluster::getCluster(ContextPtr context) const
{
    return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
}

/// Builds the extension passed to each RemoteQueryExecutor: its `task_iterator` callback returns
/// the next String produced by a StorageURLSource::DisclosedGlobIterator created over `uri`.
/// NOTE(review): presumably each call hands one URL expanded from the glob in `uri` to a remote
/// node — confirm against DisclosedGlobIterator.
RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(ASTPtr, ContextPtr context) const
{
    auto iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(context, uri);
    auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)]() mutable -> String { return iter->next(); });
    return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
}

/// Virtual columns: `_path` and `_file`, both LowCardinality(String).
NamesAndTypesList StorageURLCluster::getVirtuals() const
{
    return NamesAndTypesList{
        {"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
        {"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}

} // namespace DB