ClickHouse/src/Storages/IStorageCluster.cpp
2024-05-20 22:55:42 +00:00

243 lines
8.2 KiB
C++

#include <Storages/IStorageCluster.h>
#include <Common/Exception.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeString.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Context.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Parsers/queryToString.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDictionary.h>
#include <memory>
#include <string>
namespace DB
{
IStorageCluster::IStorageCluster(
const String & cluster_name_,
const StorageID & table_id_,
LoggerPtr log_)
: IStorage(table_id_)
, log(log_)
, cluster_name(cluster_name_)
{
}
class ReadFromCluster : public SourceStepWithFilter
{
public:
std::string getName() const override { return "ReadFromCluster"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void applyFilters(ActionDAGNodes added_filter_nodes) override;
ReadFromCluster(
const Names & column_names_,
const SelectQueryInfo & query_info_,
const StorageSnapshotPtr & storage_snapshot_,
const ContextPtr & context_,
Block sample_block,
std::shared_ptr<IStorageCluster> storage_,
ASTPtr query_to_send_,
QueryProcessingStage::Enum processed_stage_,
ClusterPtr cluster_,
LoggerPtr log_)
: SourceStepWithFilter(
DataStream{.header = std::move(sample_block)},
column_names_,
query_info_,
storage_snapshot_,
context_)
, storage(std::move(storage_))
, query_to_send(std::move(query_to_send_))
, processed_stage(processed_stage_)
, cluster(std::move(cluster_))
, log(log_)
{
}
private:
std::shared_ptr<IStorageCluster> storage;
ASTPtr query_to_send;
QueryProcessingStage::Enum processed_stage;
ClusterPtr cluster;
LoggerPtr log;
std::optional<RemoteQueryExecutor::Extension> extension;
void createExtension(const ActionsDAG::Node * predicate);
ContextPtr updateSettings(const Settings & settings);
};
void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes)
{
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
const ActionsDAG::Node * predicate = nullptr;
if (filter_actions_dag)
predicate = filter_actions_dag->getOutputs().at(0);
createExtension(predicate);
}
void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
{
if (extension)
return;
extension = storage->getTaskIteratorExtension(predicate, context);
}
/// The code executes on initiator
void IStorageCluster::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t /*max_block_size*/,
size_t /*num_streams*/)
{
storage_snapshot->check(column_names);
updateBeforeRead(context);
auto cluster = getCluster(context);
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
Block sample_block;
ASTPtr query_to_send = query_info.query;
if (context->getSettingsRef().allow_experimental_analyzer)
{
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
}
else
{
auto interpreter = InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze());
sample_block = interpreter.getSampleBlock();
query_to_send = interpreter.getQueryInfo().query->clone();
}
updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);
RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
data.remote_table.database = context->getCurrentDatabase();
data.remote_table.table = getName();
RestoreQualifiedNamesVisitor(data).visit(query_to_send);
AddDefaultDatabaseVisitor visitor(context, context->getCurrentDatabase(),
/* only_replace_current_database_function_= */false,
/* only_replace_in_join_= */true);
visitor.visit(query_to_send);
auto this_ptr = std::static_pointer_cast<IStorageCluster>(shared_from_this());
auto reading = std::make_unique<ReadFromCluster>(
column_names,
query_info,
storage_snapshot,
context,
sample_block,
std::move(this_ptr),
std::move(query_to_send),
processed_stage,
cluster,
log);
query_plan.addStep(std::move(reading));
}
void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
createExtension(nullptr);
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
Pipes pipes;
auto new_context = updateSettings(context->getSettingsRef());
const auto & current_settings = new_context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (const auto & shard_info : cluster->getShardsInfo())
{
auto try_results = shard_info.pool->getMany(timeouts, current_settings, PoolMode::GET_MANY);
for (auto & try_result : try_results)
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::vector<IConnectionPool::Entry>{try_result},
queryToString(query_to_send),
getOutputStream().header,
new_context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
extension);
remote_query_executor->setLogger(log);
pipes.emplace_back(std::make_shared<RemoteSource>(
remote_query_executor,
add_agg_info,
current_settings.async_socket_for_remote,
current_settings.async_query_sending_for_remote));
}
}
auto pipe = Pipe::unitePipes(std::move(pipes));
if (pipe.empty())
pipe = Pipe(std::make_shared<NullSource>(getOutputStream().header));
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);
pipeline.init(std::move(pipe));
}
QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr &, SelectQueryInfo &) const
{
/// Initiator executes query on remote node.
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
if (to_stage >= QueryProcessingStage::Enum::WithMergeableState)
return QueryProcessingStage::Enum::WithMergeableState;
/// Follower just reads the data.
return QueryProcessingStage::Enum::FetchColumns;
}
ContextPtr ReadFromCluster::updateSettings(const Settings & settings)
{
Settings new_settings = settings;
/// Cluster table functions should always skip unavailable shards.
new_settings.skip_unavailable_shards = true;
auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings);
return new_context;
}
ClusterPtr IStorageCluster::getCluster(ContextPtr context) const
{
return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
}
}