#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { IStorageCluster::IStorageCluster( const String & cluster_name_, const StorageID & table_id_, LoggerPtr log_) : IStorage(table_id_) , log(log_) , cluster_name(cluster_name_) { } class ReadFromCluster : public SourceStepWithFilter { public: std::string getName() const override { return "ReadFromCluster"; } void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; void applyFilters(ActionDAGNodes added_filter_nodes) override; ReadFromCluster( const Names & column_names_, const SelectQueryInfo & query_info_, const StorageSnapshotPtr & storage_snapshot_, const ContextPtr & context_, Block sample_block, std::shared_ptr storage_, ASTPtr query_to_send_, QueryProcessingStage::Enum processed_stage_, ClusterPtr cluster_, LoggerPtr log_) : SourceStepWithFilter( DataStream{.header = std::move(sample_block)}, column_names_, query_info_, storage_snapshot_, context_) , storage(std::move(storage_)) , query_to_send(std::move(query_to_send_)) , processed_stage(processed_stage_) , cluster(std::move(cluster_)) , log(log_) { } private: std::shared_ptr storage; ASTPtr query_to_send; QueryProcessingStage::Enum processed_stage; ClusterPtr cluster; LoggerPtr log; std::optional extension; void createExtension(const ActionsDAG::Node * predicate); ContextPtr updateSettings(const Settings & settings); }; void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes) { filter_actions_dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes); const ActionsDAG::Node * predicate = nullptr; if (filter_actions_dag) predicate = filter_actions_dag->getOutputs().at(0); createExtension(predicate); } void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate) { if (extension) return; extension = storage->getTaskIteratorExtension(predicate, context); } /// The code executes on initiator void IStorageCluster::read( QueryPlan & query_plan, const Names & column_names, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info, ContextPtr context, QueryProcessingStage::Enum processed_stage, size_t /*max_block_size*/, size_t /*num_streams*/) { storage_snapshot->check(column_names); updateBeforeRead(context); auto cluster = getCluster(context); /// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*) Block sample_block; ASTPtr query_to_send = query_info.query; if (context->getSettingsRef().allow_experimental_analyzer) { sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage)); } else { auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()); sample_block = interpreter.getSampleBlock(); query_to_send = interpreter.getQueryInfo().query->clone(); } updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context); RestoreQualifiedNamesVisitor::Data data; data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as(), 0)); data.remote_table.database = context->getCurrentDatabase(); data.remote_table.table = getName(); RestoreQualifiedNamesVisitor(data).visit(query_to_send); AddDefaultDatabaseVisitor visitor(context, context->getCurrentDatabase(), /* only_replace_current_database_function_= */false, /* only_replace_in_join_= */true); visitor.visit(query_to_send); auto this_ptr = std::static_pointer_cast(shared_from_this()); auto reading = std::make_unique( column_names, query_info, storage_snapshot, context, sample_block, std::move(this_ptr), std::move(query_to_send), processed_stage, cluster, log); query_plan.addStep(std::move(reading)); } void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { createExtension(nullptr); const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{}; const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState; Pipes pipes; auto new_context = updateSettings(context->getSettingsRef()); const auto & current_settings = new_context->getSettingsRef(); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); for (const auto & shard_info : cluster->getShardsInfo()) { auto try_results = shard_info.pool->getMany(timeouts, current_settings, PoolMode::GET_MANY); for (auto & try_result : try_results) { auto remote_query_executor = std::make_shared( std::vector{try_result}, queryToString(query_to_send), getOutputStream().header, new_context, /*throttler=*/nullptr, scalars, Tables(), processed_stage, extension); remote_query_executor->setLogger(log); pipes.emplace_back( std::make_shared(remote_query_executor, add_agg_info, current_settings.async_socket_for_remote, false)); } } auto pipe = Pipe::unitePipes(std::move(pipes)); if (pipe.empty()) pipe = Pipe(std::make_shared(getOutputStream().header)); for (const auto & processor : pipe.getProcessors()) processors.emplace_back(processor); pipeline.init(std::move(pipe)); } QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage( ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const { /// Initiator executes query on remote node. if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY) if (to_stage >= QueryProcessingStage::Enum::WithMergeableState) return QueryProcessingStage::Enum::WithMergeableState; /// Follower just reads the data. return QueryProcessingStage::Enum::FetchColumns; } ContextPtr ReadFromCluster::updateSettings(const Settings & settings) { Settings new_settings = settings; /// Cluster table functions should always skip unavailable shards. new_settings.skip_unavailable_shards = true; auto new_context = Context::createCopy(context); new_context->setSettings(new_settings); return new_context; } ClusterPtr IStorageCluster::getCluster(ContextPtr context) const { return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef()); } }