Add ability to pass table for connections checks per-shard to ReadFromRemote

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2023-06-23 14:19:08 +02:00
parent 67095d2150
commit b222476090
4 changed files with 8 additions and 2 deletions

View File

@ -124,6 +124,7 @@ void SelectStreamFactory::createForShard(
{ {
remote_shards.emplace_back(Shard{ remote_shards.emplace_back(Shard{
.query = query_ast, .query = query_ast,
.main_table = main_table,
.header = header, .header = header,
.shard_info = shard_info, .shard_info = shard_info,
.lazy = lazy, .lazy = lazy,

View File

@ -50,6 +50,8 @@ public:
{ {
/// Query and header may be changed depending on shard. /// Query and header may be changed depending on shard.
ASTPtr query; ASTPtr query;
/// Used to check the table existence on remote node
StorageID main_table;
Block header; Block header;
Cluster::ShardInfo shard_info; Cluster::ShardInfo shard_info;

View File

@ -162,7 +162,9 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
if (my_table_func_ptr) if (my_table_func_ptr)
try_results = my_shard.shard_info.pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY); try_results = my_shard.shard_info.pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY);
else else
try_results = my_shard.shard_info.pool->getManyChecked(timeouts, &current_settings, PoolMode::GET_MANY, my_main_table.getQualifiedName()); try_results = my_shard.shard_info.pool->getManyChecked(
timeouts, &current_settings, PoolMode::GET_MANY,
my_shard.main_table ? my_shard.main_table.getQualifiedName() : my_main_table.getQualifiedName());
} }
catch (const Exception & ex) catch (const Exception & ex)
{ {
@ -241,7 +243,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
remote_query_executor->setPoolMode(PoolMode::GET_MANY); remote_query_executor->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr) if (!table_func_ptr)
remote_query_executor->setMainTable(main_table); remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending)); pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending));
addConvertingActions(pipes.back(), output_stream->header); addConvertingActions(pipes.back(), output_stream->header);

View File

@ -22,6 +22,7 @@ using ThrottlerPtr = std::shared_ptr<Throttler>;
class ReadFromRemote final : public ISourceStep class ReadFromRemote final : public ISourceStep
{ {
public: public:
/// @param main_table_ if Shards contains main_table then this parameter will be ignored
ReadFromRemote( ReadFromRemote(
ClusterProxy::SelectStreamFactory::Shards shards_, ClusterProxy::SelectStreamFactory::Shards shards_,
Block header_, Block header_,