Check table status for parallel replicas

This commit is contained in:
Igor Nikonov 2024-04-17 16:47:52 +00:00
parent 912592ca78
commit 1e43e3515e
10 changed files with 25 additions and 35 deletions

View File

@ -617,7 +617,6 @@ TablesStatusResponse Connection::getTablesStatus(const ConnectionTimeouts & time
out->next();
fiu_do_on(FailPoints::receive_timeout_on_table_status_response, {
sleepForSeconds(10);
throw NetException(ErrorCodes::SOCKET_TIMEOUT, "Injected timeout exceeded while reading from socket ({}:{})", host, port);
});

View File

@ -364,7 +364,9 @@ void executeQuery(
void executeQueryWithParallelReplicas(
QueryPlan & query_plan,
SelectStreamFactory & stream_factory,
const StorageID & storage_id,
const Block & header,
QueryProcessingStage::Enum processed_stage,
const ASTPtr & query_ast,
ContextPtr context,
std::shared_ptr<const StorageLimitsList> storage_limits)
@ -453,9 +455,10 @@ void executeQueryWithParallelReplicas(
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
query_ast,
new_cluster,
storage_id,
std::move(coordinator),
stream_factory.header,
stream_factory.processed_stage,
header,
processed_stage,
new_context,
getThrottler(new_context),
std::move(scalars),

View File

@ -68,7 +68,9 @@ void executeQuery(
void executeQueryWithParallelReplicas(
QueryPlan & query_plan,
SelectStreamFactory & stream_factory,
const StorageID & storage_id,
const Block & header,
QueryProcessingStage::Enum processed_stage,
const ASTPtr & query_ast,
ContextPtr context,
std::shared_ptr<const StorageLimitsList> storage_limits);

View File

@ -412,17 +412,12 @@ JoinTreeQueryPlan buildQueryPlanForParallelReplicas(
Block header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, context, SelectQueryOptions(processed_stage).analyze());
ClusterProxy::SelectStreamFactory select_stream_factory =
ClusterProxy::SelectStreamFactory(
header,
{},
{},
processed_stage);
QueryPlan query_plan;
ClusterProxy::executeQueryWithParallelReplicas(
query_plan,
select_stream_factory,
StorageID::createEmpty(),
header,
processed_stage,
modified_query_ast,
context,
storage_limits);

View File

@ -360,6 +360,7 @@ void ReadFromRemote::initializePipeline(QueryPipelineBuilder & pipeline, const B
ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
ASTPtr query_ast_,
ClusterPtr cluster_,
const StorageID & storage_id_,
ParallelReplicasReadingCoordinatorPtr coordinator_,
Block header_,
QueryProcessingStage::Enum stage_,
@ -372,6 +373,7 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
: ISourceStep(DataStream{.header = std::move(header_)})
, cluster(cluster_)
, query_ast(query_ast_)
, storage_id(storage_id_)
, coordinator(std::move(coordinator_))
, stage(std::move(stage_))
, context(context_)
@ -419,7 +421,6 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
all_replicas_count = shard.getAllNodeCount();
}
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> shuffled_pool;
if (all_replicas_count < shard.getAllNodeCount())
{
@ -452,7 +453,6 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
processor->setStorageLimits(storage_limits);
pipeline.init(std::move(pipe));
}
@ -488,6 +488,7 @@ void ReadFromParallelRemoteReplicasStep::addPipeForSingeReplica(
RemoteQueryExecutor::Extension{.parallel_reading_coordinator = coordinator, .replica_info = std::move(replica_info)});
remote_query_executor->setLogger(log);
remote_query_executor->setMainTable(storage_id);
pipes.emplace_back(createRemoteSourcePipe(std::move(remote_query_executor), add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header);

View File

@ -69,6 +69,7 @@ public:
ReadFromParallelRemoteReplicasStep(
ASTPtr query_ast_,
ClusterPtr cluster_,
const StorageID & storage_id_,
ParallelReplicasReadingCoordinatorPtr coordinator_,
Block header_,
QueryProcessingStage::Enum stage_,
@ -91,6 +92,7 @@ private:
ClusterPtr cluster;
ASTPtr query_ast;
StorageID storage_id;
ParallelReplicasReadingCoordinatorPtr coordinator;
QueryProcessingStage::Enum stage;
ContextMutablePtr context;

View File

@ -2,7 +2,6 @@
#if defined(OS_LINUX)
#include <mutex>
#include <atomic>
#include <Common/Fiber.h>
#include <Common/TimerDescriptor.h>

View File

@ -235,16 +235,11 @@ void StorageMergeTree::read(
= InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
ClusterProxy::SelectStreamFactory select_stream_factory =
ClusterProxy::SelectStreamFactory(
header,
{},
storage_snapshot,
processed_stage);
ClusterProxy::executeQueryWithParallelReplicas(
query_plan,
select_stream_factory,
getStorageID(),
header,
processed_stage,
modified_query_ast,
local_context,
query_info.storage_limits);

View File

@ -5449,7 +5449,7 @@ void StorageReplicatedMergeTree::read(
return readLocalSequentialConsistencyImpl(query_plan, column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams);
if (local_context->canUseParallelReplicasOnInitiator())
return readParallelReplicasImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage);
return readParallelReplicasImpl(query_plan, column_names, query_info, local_context, processed_stage);
readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, max_block_size, num_streams);
}
@ -5476,13 +5476,13 @@ void StorageReplicatedMergeTree::readLocalSequentialConsistencyImpl(
void StorageReplicatedMergeTree::readParallelReplicasImpl(
QueryPlan & query_plan,
const Names & /*column_names*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage)
{
ASTPtr modified_query_ast;
Block header;
const auto table_id = getStorageID();
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
@ -5496,22 +5496,17 @@ void StorageReplicatedMergeTree::readParallelReplicasImpl(
}
else
{
const auto table_id = getStorageID();
modified_query_ast = ClusterProxy::rewriteSelectQuery(local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
header
= InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
ClusterProxy::SelectStreamFactory select_stream_factory = ClusterProxy::SelectStreamFactory(
header,
{},
storage_snapshot,
processed_stage);
ClusterProxy::executeQueryWithParallelReplicas(
query_plan,
select_stream_factory,
table_id,
header,
processed_stage,
modified_query_ast,
local_context,
query_info.storage_limits);

View File

@ -567,7 +567,6 @@ private:
void readParallelReplicasImpl(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage);