Split StorageReplicatedMergeTree reading methods

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2023-06-20 17:31:45 +02:00
parent 323128df6f
commit 4a33e027c5
2 changed files with 119 additions and 54 deletions

View File

@ -4902,67 +4902,102 @@ void StorageReplicatedMergeTree::read(
snapshot_data.alter_conversions = {};
});
/** The `select_sequential_consistency` setting has two meanings:
* 1. To throw an exception if on a replica there are not all parts which have been written down on quorum of remaining replicas.
* 2. Do not read parts that have not yet been written to the quorum of the replicas.
* For this you have to synchronously go to ZooKeeper.
*/
if (local_context->getSettingsRef().select_sequential_consistency)
{
auto max_added_blocks = std::make_shared<ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock>(getMaxAddedBlocks());
if (auto plan = reader.read(
column_names, storage_snapshot, query_info, local_context,
max_block_size, num_streams, processed_stage, std::move(max_added_blocks), /*enable_parallel_reading*/false))
query_plan = std::move(*plan);
return;
}
const auto & settings = local_context->getSettingsRef();
/// The `select_sequential_consistency` setting has two meanings:
/// 1. To throw an exception if on a replica there are not all parts which have been written down on quorum of remaining replicas.
/// 2. Do not read parts that have not yet been written to the quorum of the replicas.
/// For this you have to synchronously go to ZooKeeper.
if (settings.select_sequential_consistency)
return readLocalSequentialConsistencyImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
if (local_context->canUseParallelReplicasOnInitiator())
return readParallelReplicasImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
readLocalImpl(query_plan, column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);
}
void StorageReplicatedMergeTree::readLocalSequentialConsistencyImpl(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
auto max_added_blocks = std::make_shared<ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock>(getMaxAddedBlocks());
auto plan = reader.read(column_names, storage_snapshot, query_info, local_context,
max_block_size, num_streams, processed_stage, std::move(max_added_blocks),
/* enable_parallel_reading= */false);
if (plan)
query_plan = std::move(*plan);
}
void StorageReplicatedMergeTree::readParallelReplicasImpl(
QueryPlan & query_plan,
const Names & /*column_names*/,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
const size_t /*max_block_size*/,
const size_t /*num_streams*/)
{
auto table_id = getStorageID();
auto parallel_replicas_cluster = local_context->getCluster(local_context->getSettingsRef().cluster_for_parallel_replicas);
ASTPtr modified_query_ast;
Block header;
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
auto table_id = getStorageID();
auto modified_query_tree = buildQueryTreeForShard(query_info, query_info.query_tree);
ASTPtr modified_query_ast;
Block header;
if (local_context->getSettingsRef().allow_experimental_analyzer)
{
auto modified_query_tree = buildQueryTreeForShard(query_info, query_info.query_tree);
header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, local_context, SelectQueryOptions(processed_stage).analyze());
modified_query_ast = queryNodeToSelectQuery(modified_query_tree);
}
else
{
modified_query_ast = ClusterProxy::rewriteSelectQuery(local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
header
= InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
auto cluster = local_context->getCluster(local_context->getSettingsRef().cluster_for_parallel_replicas);
ClusterProxy::SelectStreamFactory select_stream_factory =
ClusterProxy::SelectStreamFactory(
header,
{},
storage_snapshot,
processed_stage);
ClusterProxy::executeQueryWithParallelReplicas(
query_plan, getStorageID(), /*remove_table_function_ptr*/ nullptr,
select_stream_factory, modified_query_ast,
local_context, query_info, cluster);
header = InterpreterSelectQueryAnalyzer::getSampleBlock(
modified_query_tree, local_context, SelectQueryOptions(processed_stage).analyze());
modified_query_ast = queryNodeToSelectQuery(modified_query_tree);
}
else
{
if (auto plan = reader.read(
column_names, storage_snapshot, query_info,
local_context, max_block_size, num_streams,
processed_stage, nullptr, /*enable_parallel_reading*/local_context->canUseParallelReplicasOnFollower()))
query_plan = std::move(*plan);
modified_query_ast = ClusterProxy::rewriteSelectQuery(local_context, query_info.query,
table_id.database_name, table_id.table_name, /*remote_table_function_ptr*/nullptr);
header
= InterpreterSelectQuery(modified_query_ast, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
ClusterProxy::SelectStreamFactory select_stream_factory = ClusterProxy::SelectStreamFactory(
header,
{},
storage_snapshot,
processed_stage);
ClusterProxy::executeQueryWithParallelReplicas(
query_plan, getStorageID(),
/* table_func_ptr= */ nullptr,
select_stream_factory, modified_query_ast,
local_context, query_info, parallel_replicas_cluster);
}
void StorageReplicatedMergeTree::readLocalImpl(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
const size_t max_block_size,
const size_t num_streams)
{
auto plan = reader.read(
column_names, storage_snapshot, query_info,
local_context, max_block_size, num_streams,
processed_stage,
/* max_block_numbers_to_read= */ nullptr,
/* enable_parallel_reading= */ local_context->canUseParallelReplicasOnFollower());
if (plan)
query_plan = std::move(*plan);
}
template <class Func>

View File

@ -130,7 +130,7 @@ public:
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
@ -513,6 +513,36 @@ private:
static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context);
void readLocalImpl(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams);
void readLocalSequentialConsistencyImpl(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams);
void readParallelReplicasImpl(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams);
template <class Func>
void foreachActiveParts(Func && func, bool select_sequential_consistency) const;