Remove leftovers of old _shard_num via identifier implementation

This commit is contained in:
Azat Khuzhin 2022-01-10 21:21:24 +03:00
parent 97acf190ed
commit 1637c41d42
3 changed files with 10 additions and 24 deletions

View File

@ -35,11 +35,9 @@ namespace ClusterProxy
SelectStreamFactory::SelectStreamFactory( SelectStreamFactory::SelectStreamFactory(
const Block & header_, const Block & header_,
QueryProcessingStage::Enum processed_stage_, QueryProcessingStage::Enum processed_stage_)
bool has_virtual_shard_num_column_) : header(header_)
: header(header_), , processed_stage{processed_stage_}
processed_stage{processed_stage_},
has_virtual_shard_num_column(has_virtual_shard_num_column_)
{ {
} }
@ -102,19 +100,15 @@ void SelectStreamFactory::createForShard(
Shards & remote_shards, Shards & remote_shards,
UInt32 shard_count) UInt32 shard_count)
{ {
auto modified_query_ast = query_ast->clone();
if (has_virtual_shard_num_column)
VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, "_shard_num", shard_info.shard_num, "toUInt32");
auto emplace_local_stream = [&]() auto emplace_local_stream = [&]()
{ {
local_plans.emplace_back(createLocalPlan(modified_query_ast, header, context, processed_stage, shard_info.shard_num, shard_count)); local_plans.emplace_back(createLocalPlan(query_ast, header, context, processed_stage, shard_info.shard_num, shard_count));
}; };
auto emplace_remote_stream = [&](bool lazy = false, UInt32 local_delay = 0) auto emplace_remote_stream = [&](bool lazy = false, UInt32 local_delay = 0)
{ {
remote_shards.emplace_back(Shard{ remote_shards.emplace_back(Shard{
.query = modified_query_ast, .query = query_ast,
.header = header, .header = header,
.shard_num = shard_info.shard_num, .shard_num = shard_info.shard_num,
.num_replicas = shard_info.getAllNodeCount(), .num_replicas = shard_info.getAllNodeCount(),

View File

@ -16,8 +16,7 @@ class SelectStreamFactory final : public IStreamFactory
public: public:
SelectStreamFactory( SelectStreamFactory(
const Block & header_, const Block & header_,
QueryProcessingStage::Enum processed_stage_, QueryProcessingStage::Enum processed_stage_);
bool has_virtual_shard_num_column_);
void createForShard( void createForShard(
const Cluster::ShardInfo & shard_info, const Cluster::ShardInfo & shard_info,
@ -32,8 +31,6 @@ public:
private: private:
const Block header; const Block header;
QueryProcessingStage::Enum processed_stage; QueryProcessingStage::Enum processed_stage;
bool has_virtual_shard_num_column = false;
}; };
} }

View File

@ -308,7 +308,7 @@ NamesAndTypesList StorageDistributed::getVirtuals() const
NameAndTypePair("_part_uuid", std::make_shared<DataTypeUUID>()), NameAndTypePair("_part_uuid", std::make_shared<DataTypeUUID>()),
NameAndTypePair("_partition_id", std::make_shared<DataTypeString>()), NameAndTypePair("_partition_id", std::make_shared<DataTypeString>()),
NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()), NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
NameAndTypePair("_shard_num", std::make_shared<DataTypeUInt32>()), NameAndTypePair("_shard_num", std::make_shared<DataTypeUInt32>()), /// deprecated
}; };
} }
@ -605,8 +605,8 @@ Pipe StorageDistributed::read(
void StorageDistributed::read( void StorageDistributed::read(
QueryPlan & query_plan, QueryPlan & query_plan,
const Names & column_names, const Names &,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr &,
SelectQueryInfo & query_info, SelectQueryInfo & query_info,
ContextPtr local_context, ContextPtr local_context,
QueryProcessingStage::Enum processed_stage, QueryProcessingStage::Enum processed_stage,
@ -635,10 +635,6 @@ void StorageDistributed::read(
return; return;
} }
bool has_virtual_shard_num_column = std::find(column_names.begin(), column_names.end(), "_shard_num") != column_names.end();
if (has_virtual_shard_num_column && !isVirtualColumn("_shard_num", metadata_snapshot))
has_virtual_shard_num_column = false;
StorageID main_table = StorageID::createEmpty(); StorageID main_table = StorageID::createEmpty();
if (!remote_table_function_ptr) if (!remote_table_function_ptr)
main_table = StorageID{remote_database, remote_table}; main_table = StorageID{remote_database, remote_table};
@ -646,8 +642,7 @@ void StorageDistributed::read(
ClusterProxy::SelectStreamFactory select_stream_factory = ClusterProxy::SelectStreamFactory select_stream_factory =
ClusterProxy::SelectStreamFactory( ClusterProxy::SelectStreamFactory(
header, header,
processed_stage, processed_stage);
has_virtual_shard_num_column);
ClusterProxy::executeQuery( ClusterProxy::executeQuery(
query_plan, header, processed_stage, query_plan, header, processed_stage,