mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
remove shard multiplexing code, simplify [#CLICKHOUSE-3151]
This commit is contained in:
parent
133be4d739
commit
ee457eca8a
@ -34,15 +34,6 @@ RemoteBlockInputStream::RemoteBlockInputStream(const ConnectionPoolWithFailoverP
|
||||
init(settings_);
|
||||
}
|
||||
|
||||
RemoteBlockInputStream::RemoteBlockInputStream(ConnectionPoolWithFailoverPtrs && pools_, const String & query_,
|
||||
const Settings * settings_, const Context & context_, ThrottlerPtr throttler_,
|
||||
const Tables & external_tables_, QueryProcessingStage::Enum stage_)
|
||||
: pools(std::move(pools_)), query(query_), throttler(throttler_), external_tables(external_tables_),
|
||||
stage(stage_), context(context_)
|
||||
{
|
||||
init(settings_);
|
||||
}
|
||||
|
||||
RemoteBlockInputStream::~RemoteBlockInputStream()
|
||||
{
|
||||
/** If interrupted in the middle of the loop of communication with replicas, then interrupt
|
||||
@ -233,10 +224,6 @@ void RemoteBlockInputStream::createMultiplexedConnections()
|
||||
multiplexed_connections = std::make_unique<MultiplexedConnections>(
|
||||
*pool, multiplexed_connections_settings, throttler,
|
||||
append_extra_info, pool_mode, main_table_ptr);
|
||||
else if (!pools.empty())
|
||||
multiplexed_connections = std::make_unique<MultiplexedConnections>(
|
||||
pools, multiplexed_connections_settings, throttler,
|
||||
append_extra_info, pool_mode, main_table_ptr);
|
||||
else
|
||||
throw Exception("Internal error", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
|
@ -30,11 +30,6 @@ public:
|
||||
const Context & context_, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
|
||||
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
|
||||
|
||||
/// Takes a pool for each shard and gets one or several connections from it
|
||||
RemoteBlockInputStream(ConnectionPoolWithFailoverPtrs && pools_, const String & query_, const Settings * settings_,
|
||||
const Context & context_, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
|
||||
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
|
||||
|
||||
~RemoteBlockInputStream() override;
|
||||
|
||||
/// Specify how we allocate connections on a shard.
|
||||
@ -101,9 +96,6 @@ private:
|
||||
/// One shard's connections pool
|
||||
ConnectionPoolWithFailoverPtr pool = nullptr;
|
||||
|
||||
/// Connections pools of one or several shards
|
||||
ConnectionPoolWithFailoverPtrs pools;
|
||||
|
||||
std::unique_ptr<MultiplexedConnections> multiplexed_connections;
|
||||
|
||||
const String query;
|
||||
|
@ -6,54 +6,35 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
constexpr PoolMode pool_mode = PoolMode::GET_ONE;
|
||||
|
||||
}
|
||||
|
||||
namespace ClusterProxy
|
||||
{
|
||||
|
||||
BlockInputStreamPtr AlterStreamFactory::createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address)
|
||||
void AlterStreamFactory::createForShard(
|
||||
const Cluster::ShardInfo & shard_info,
|
||||
const String & query, const ASTPtr & query_ast,
|
||||
const Context & context, const ThrottlerPtr & throttler,
|
||||
BlockInputStreams & res)
|
||||
{
|
||||
/// The ALTER query may be a resharding query that is a part of a distributed
|
||||
/// job. Since the latter heavily relies on synchronization among its participating
|
||||
/// nodes, it is very important to defer the execution of a local query so as
|
||||
/// to prevent any deadlock.
|
||||
auto interpreter = std::make_shared<InterpreterAlterQuery>(query_ast, context);
|
||||
auto stream = std::make_shared<LazyBlockInputStream>(
|
||||
[interpreter]() mutable
|
||||
{
|
||||
return interpreter->execute().in;
|
||||
});
|
||||
return stream;
|
||||
}
|
||||
|
||||
BlockInputStreamPtr AlterStreamFactory::createRemote(
|
||||
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
|
||||
const Settings & settings, ThrottlerPtr throttler, const Context & context)
|
||||
{
|
||||
auto stream = std::make_shared<RemoteBlockInputStream>(pool, query, &settings, context, throttler);
|
||||
stream->setPoolMode(pool_mode);
|
||||
return stream;
|
||||
}
|
||||
|
||||
BlockInputStreamPtr AlterStreamFactory::createRemote(
|
||||
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
|
||||
const Settings & settings, ThrottlerPtr throttler, const Context & context)
|
||||
{
|
||||
auto stream = std::make_shared<RemoteBlockInputStream>(std::move(pools), query, &settings, context, throttler);
|
||||
stream->setPoolMode(pool_mode);
|
||||
return stream;
|
||||
}
|
||||
|
||||
PoolMode AlterStreamFactory::getPoolMode() const
|
||||
{
|
||||
return pool_mode;
|
||||
if (shard_info.isLocal())
|
||||
{
|
||||
/// The ALTER query may be a resharding query that is a part of a distributed
|
||||
/// job. Since the latter heavily relies on synchronization among its participating
|
||||
/// nodes, it is very important to defer the execution of a local query so as
|
||||
/// to prevent any deadlock.
|
||||
auto interpreter = std::make_shared<InterpreterAlterQuery>(query_ast, context);
|
||||
res.emplace_back(std::make_shared<LazyBlockInputStream>(
|
||||
[interpreter]() mutable
|
||||
{
|
||||
return interpreter->execute().in;
|
||||
}));
|
||||
}
|
||||
else
|
||||
{
|
||||
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, &context.getSettingsRef(), context, throttler);
|
||||
stream->setPoolMode(PoolMode::GET_ONE);
|
||||
res.emplace_back(std::move(stream));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -13,14 +13,11 @@ class AlterStreamFactory final : public IStreamFactory
|
||||
public:
|
||||
AlterStreamFactory() = default;
|
||||
|
||||
BlockInputStreamPtr createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address) override;
|
||||
BlockInputStreamPtr createRemote(
|
||||
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
|
||||
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
|
||||
BlockInputStreamPtr createRemote(
|
||||
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
|
||||
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
|
||||
PoolMode getPoolMode() const override;
|
||||
virtual void createForShard(
|
||||
const Cluster::ShardInfo & shard_info,
|
||||
const String & query, const ASTPtr & query_ast,
|
||||
const Context & context, const ThrottlerPtr & throttler,
|
||||
BlockInputStreams & res) override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -10,8 +10,6 @@ namespace DB
|
||||
namespace
|
||||
{
|
||||
|
||||
constexpr PoolMode pool_mode = PoolMode::GET_ALL;
|
||||
|
||||
BlockExtraInfo toBlockExtraInfo(const Cluster::Address & address)
|
||||
{
|
||||
BlockExtraInfo block_extra_info;
|
||||
@ -28,46 +26,31 @@ BlockExtraInfo toBlockExtraInfo(const Cluster::Address & address)
|
||||
namespace ClusterProxy
|
||||
{
|
||||
|
||||
BlockInputStreamPtr DescribeStreamFactory::createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address)
|
||||
void DescribeStreamFactory::createForShard(
|
||||
const Cluster::ShardInfo & shard_info,
|
||||
const String & query, const ASTPtr & query_ast,
|
||||
const Context & context, const ThrottlerPtr & throttler,
|
||||
BlockInputStreams & res)
|
||||
{
|
||||
InterpreterDescribeQuery interpreter{query_ast, context};
|
||||
BlockInputStreamPtr stream = interpreter.execute().in;
|
||||
|
||||
/** Materialization is needed, since from remote servers the constants come materialized.
|
||||
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
|
||||
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
|
||||
*/
|
||||
BlockInputStreamPtr materialized_stream = std::make_shared<MaterializingBlockInputStream>(stream);
|
||||
|
||||
return std::make_shared<BlockExtraInfoInputStream>(materialized_stream, toBlockExtraInfo(address));
|
||||
}
|
||||
|
||||
BlockInputStreamPtr DescribeStreamFactory::createRemote(
|
||||
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
|
||||
const Settings & settings, ThrottlerPtr throttler, const Context & context)
|
||||
{
|
||||
auto stream = std::make_shared<RemoteBlockInputStream>(pool, query, &settings, context, throttler);
|
||||
stream->setPoolMode(pool_mode);
|
||||
stream->appendExtraInfo();
|
||||
return stream;
|
||||
}
|
||||
|
||||
BlockInputStreamPtr DescribeStreamFactory::createRemote(
|
||||
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
|
||||
const Settings & settings, ThrottlerPtr throttler, const Context & context)
|
||||
{
|
||||
auto stream = std::make_shared<RemoteBlockInputStream>(std::move(pools), query, &settings, context, throttler);
|
||||
stream->setPoolMode(pool_mode);
|
||||
stream->appendExtraInfo();
|
||||
return stream;
|
||||
}
|
||||
|
||||
PoolMode DescribeStreamFactory::getPoolMode() const
|
||||
{
|
||||
return pool_mode;
|
||||
}
|
||||
for (const Cluster::Address & local_address : shard_info.local_addresses)
|
||||
{
|
||||
InterpreterDescribeQuery interpreter{query_ast, context};
|
||||
BlockInputStreamPtr stream = interpreter.execute().in;
|
||||
|
||||
/** Materialization is needed, since from remote servers the constants come materialized.
|
||||
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
|
||||
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
|
||||
*/
|
||||
BlockInputStreamPtr materialized_stream = std::make_shared<MaterializingBlockInputStream>(stream);
|
||||
res.emplace_back(std::make_shared<BlockExtraInfoInputStream>(materialized_stream, toBlockExtraInfo(local_address)));
|
||||
}
|
||||
|
||||
auto remote_stream = std::make_shared<RemoteBlockInputStream>(
|
||||
shard_info.pool, query, &context.getSettingsRef(), context, throttler);
|
||||
remote_stream->setPoolMode(PoolMode::GET_ALL);
|
||||
remote_stream->appendExtraInfo();
|
||||
res.emplace_back(std::move(remote_stream));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -13,14 +13,11 @@ class DescribeStreamFactory final : public IStreamFactory
|
||||
public:
|
||||
DescribeStreamFactory() = default;
|
||||
|
||||
BlockInputStreamPtr createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address) override;
|
||||
BlockInputStreamPtr createRemote(
|
||||
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
|
||||
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
|
||||
BlockInputStreamPtr createRemote(
|
||||
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
|
||||
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
|
||||
PoolMode getPoolMode() const override;
|
||||
virtual void createForShard(
|
||||
const Cluster::ShardInfo & shard_info,
|
||||
const String & query, const ASTPtr & query_ast,
|
||||
const Context & context, const ThrottlerPtr & throttler,
|
||||
BlockInputStreams & res) override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -23,18 +23,11 @@ class IStreamFactory
|
||||
public:
|
||||
virtual ~IStreamFactory() {}
|
||||
|
||||
/// Create an input stream for local query execution.
|
||||
virtual BlockInputStreamPtr createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address) = 0;
|
||||
/// Create an input stream for remote query execution on one shard.
|
||||
virtual BlockInputStreamPtr createRemote(
|
||||
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
|
||||
const Settings & settings, ThrottlerPtr throttler, const Context & context) = 0;
|
||||
/// Create an input stream for remote query execution on one or more shards.
|
||||
virtual BlockInputStreamPtr createRemote(
|
||||
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
|
||||
const Settings & new_settings, ThrottlerPtr throttler, const Context & context) = 0;
|
||||
/// Specify how we allocate connections on a shard.
|
||||
virtual PoolMode getPoolMode() const = 0;
|
||||
virtual void createForShard(
|
||||
const Cluster::ShardInfo & shard_info,
|
||||
const String & query, const ASTPtr & query_ast, const Context & context,
|
||||
const ThrottlerPtr & throttler,
|
||||
BlockInputStreams & res) = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -6,13 +6,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
constexpr PoolMode pool_mode = PoolMode::GET_MANY;
|
||||
|
||||
}
|
||||
|
||||
namespace ClusterProxy
|
||||
{
|
||||
|
||||
@ -26,43 +19,31 @@ SelectStreamFactory::SelectStreamFactory(
|
||||
{
|
||||
}
|
||||
|
||||
BlockInputStreamPtr SelectStreamFactory::createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address)
|
||||
void SelectStreamFactory::createForShard(
|
||||
const Cluster::ShardInfo & shard_info,
|
||||
const String & query, const ASTPtr & query_ast,
|
||||
const Context & context, const ThrottlerPtr & throttler,
|
||||
BlockInputStreams & res)
|
||||
{
|
||||
InterpreterSelectQuery interpreter{query_ast, context, processed_stage};
|
||||
BlockInputStreamPtr stream = interpreter.execute().in;
|
||||
if (shard_info.isLocal())
|
||||
{
|
||||
InterpreterSelectQuery interpreter{query_ast, context, processed_stage};
|
||||
BlockInputStreamPtr stream = interpreter.execute().in;
|
||||
|
||||
/** Materialization is needed, since from remote servers the constants come materialized.
|
||||
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
|
||||
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
|
||||
*/
|
||||
return std::make_shared<MaterializingBlockInputStream>(stream);
|
||||
}
|
||||
|
||||
BlockInputStreamPtr SelectStreamFactory::createRemote(
|
||||
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
|
||||
const Settings & settings, ThrottlerPtr throttler, const Context & context)
|
||||
{
|
||||
auto stream = std::make_shared<RemoteBlockInputStream>(pool, query, &settings, context, throttler, external_tables, processed_stage);
|
||||
stream->setPoolMode(pool_mode);
|
||||
stream->setMainTable(main_table);
|
||||
return stream;
|
||||
}
|
||||
|
||||
BlockInputStreamPtr SelectStreamFactory::createRemote(
|
||||
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
|
||||
const Settings & settings, ThrottlerPtr throttler, const Context & context)
|
||||
{
|
||||
auto stream = std::make_shared<RemoteBlockInputStream>(std::move(pools), query, &settings, context, throttler, external_tables, processed_stage);
|
||||
stream->setPoolMode(pool_mode);
|
||||
stream->setMainTable(main_table);
|
||||
return stream;
|
||||
}
|
||||
|
||||
PoolMode SelectStreamFactory::getPoolMode() const
|
||||
{
|
||||
return pool_mode;
|
||||
/** Materialization is needed, since from remote servers the constants come materialized.
|
||||
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
|
||||
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
|
||||
*/
|
||||
res.emplace_back(std::make_shared<MaterializingBlockInputStream>(stream));
|
||||
}
|
||||
else
|
||||
{
|
||||
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, &context.getSettingsRef(), context, throttler, external_tables, processed_stage);
|
||||
stream->setPoolMode(PoolMode::GET_MANY);
|
||||
stream->setMainTable(main_table);
|
||||
res.emplace_back(std::move(stream));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -18,14 +18,11 @@ public:
|
||||
QualifiedTableName main_table,
|
||||
const Tables & external_tables);
|
||||
|
||||
BlockInputStreamPtr createLocal(const ASTPtr & query_ast, const Context & context, const Cluster::Address & address) override;
|
||||
BlockInputStreamPtr createRemote(
|
||||
const ConnectionPoolWithFailoverPtr & pool, const std::string & query,
|
||||
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
|
||||
BlockInputStreamPtr createRemote(
|
||||
ConnectionPoolWithFailoverPtrs && pools, const std::string & query,
|
||||
const Settings & settings, ThrottlerPtr throttler, const Context & context) override;
|
||||
PoolMode getPoolMode() const override;
|
||||
virtual void createForShard(
|
||||
const Cluster::ShardInfo & shard_info,
|
||||
const String & query, const ASTPtr & query_ast,
|
||||
const Context & context, const ThrottlerPtr & throttler,
|
||||
BlockInputStreams & res) override;
|
||||
|
||||
private:
|
||||
QueryProcessingStage::Enum processed_stage;
|
||||
|
@ -16,7 +16,7 @@ namespace ClusterProxy
|
||||
|
||||
BlockInputStreams executeQuery(
|
||||
IStreamFactory & stream_factory, const ClusterPtr & cluster,
|
||||
const ASTPtr & query_ast, const Context & context, const Settings & settings, bool enable_shard_multiplexing)
|
||||
const ASTPtr & query_ast, const Context & context, const Settings & settings)
|
||||
{
|
||||
BlockInputStreams res;
|
||||
|
||||
@ -36,6 +36,9 @@ BlockInputStreams executeQuery(
|
||||
new_settings.limits.max_memory_usage_for_user.changed = false;
|
||||
new_settings.limits.max_memory_usage_for_all_queries.changed = false;
|
||||
|
||||
Context new_context(context);
|
||||
new_context.setSettings(new_settings);
|
||||
|
||||
/// Network bandwidth limit, if needed.
|
||||
ThrottlerPtr throttler;
|
||||
if (settings.limits.max_network_bandwidth || settings.limits.max_network_bytes)
|
||||
@ -44,86 +47,8 @@ BlockInputStreams executeQuery(
|
||||
settings.limits.max_network_bytes,
|
||||
"Limit for bytes to send or receive over network exceeded.");
|
||||
|
||||
/// Spread shards by threads uniformly.
|
||||
|
||||
size_t remote_count = 0;
|
||||
|
||||
if (stream_factory.getPoolMode() == PoolMode::GET_ALL)
|
||||
{
|
||||
for (const auto & shard_info : cluster->getShardsInfo())
|
||||
{
|
||||
if (shard_info.hasRemoteConnections())
|
||||
++remote_count;
|
||||
}
|
||||
}
|
||||
else
|
||||
remote_count = cluster->getRemoteShardCount();
|
||||
|
||||
size_t thread_count;
|
||||
|
||||
if (!enable_shard_multiplexing)
|
||||
thread_count = remote_count;
|
||||
else if (remote_count == 0)
|
||||
thread_count = 0;
|
||||
else if (settings.max_distributed_processing_threads == 0)
|
||||
thread_count = 1;
|
||||
else
|
||||
thread_count = std::min(remote_count, static_cast<size_t>(settings.max_distributed_processing_threads));
|
||||
|
||||
size_t pools_per_thread = (thread_count > 0) ? (remote_count / thread_count) : 0;
|
||||
size_t remainder = (thread_count > 0) ? (remote_count % thread_count) : 0;
|
||||
|
||||
ConnectionPoolWithFailoverPtrs pools;
|
||||
|
||||
/// Loop over shards.
|
||||
size_t current_thread = 0;
|
||||
for (const auto & shard_info : cluster->getShardsInfo())
|
||||
{
|
||||
bool create_local_queries = shard_info.isLocal();
|
||||
|
||||
bool create_remote_queries;
|
||||
if (stream_factory.getPoolMode() == PoolMode::GET_ALL)
|
||||
create_remote_queries = shard_info.hasRemoteConnections();
|
||||
else
|
||||
create_remote_queries = !create_local_queries;
|
||||
|
||||
if (create_local_queries)
|
||||
{
|
||||
/// Add queries to localhost (they are processed in-process, without network communication).
|
||||
|
||||
Context new_context = context;
|
||||
new_context.setSettings(new_settings);
|
||||
|
||||
for (const auto & address : shard_info.local_addresses)
|
||||
{
|
||||
BlockInputStreamPtr stream = stream_factory.createLocal(query_ast, new_context, address);
|
||||
if (stream)
|
||||
res.emplace_back(stream);
|
||||
}
|
||||
}
|
||||
|
||||
if (create_remote_queries)
|
||||
{
|
||||
size_t excess = (current_thread < remainder) ? 1 : 0;
|
||||
size_t actual_pools_per_thread = pools_per_thread + excess;
|
||||
|
||||
if (actual_pools_per_thread == 1)
|
||||
{
|
||||
res.emplace_back(stream_factory.createRemote(shard_info.pool, query, new_settings, throttler, context));
|
||||
++current_thread;
|
||||
}
|
||||
else
|
||||
{
|
||||
pools.push_back(shard_info.pool);
|
||||
if (pools.size() == actual_pools_per_thread)
|
||||
{
|
||||
res.emplace_back(stream_factory.createRemote(std::move(pools), query, new_settings, throttler, context));
|
||||
pools = ConnectionPoolWithFailoverPtrs();
|
||||
++current_thread;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
stream_factory.createForShard(shard_info, query, query_ast, new_context, throttler, res);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ class IStreamFactory;
|
||||
/// (currently SELECT, DESCRIBE, or ALTER (for resharding)).
|
||||
BlockInputStreams executeQuery(
|
||||
IStreamFactory & stream_factory, const ClusterPtr & cluster,
|
||||
const ASTPtr & query_ast, const Context & context, const Settings & settings, bool enable_shard_multiplexing);
|
||||
const ASTPtr & query_ast, const Context & context, const Settings & settings);
|
||||
|
||||
}
|
||||
|
||||
|
@ -213,20 +213,11 @@ BlockInputStreams StorageDistributed::read(
|
||||
if (settings.global_subqueries_method == GlobalSubqueriesMethod::PUSH)
|
||||
external_tables = context.getExternalTables();
|
||||
|
||||
/// Disable multiplexing of shards if there is an ORDER BY without GROUP BY.
|
||||
//const ASTSelectQuery & ast = *(static_cast<const ASTSelectQuery *>(modified_query_ast.get()));
|
||||
|
||||
/** The functionality of shard_multiplexing is not completed - turn it off.
|
||||
* (Because connecting to different shards within a single thread is not done in parallel.)
|
||||
*/
|
||||
//bool enable_shard_multiplexing = !(ast.order_expression_list && !ast.group_expression_list);
|
||||
bool enable_shard_multiplexing = false;
|
||||
|
||||
ClusterProxy::SelectStreamFactory select_stream_factory(
|
||||
processed_stage, QualifiedTableName{remote_database, remote_table}, external_tables);
|
||||
|
||||
return ClusterProxy::executeQuery(
|
||||
select_stream_factory, cluster, modified_query_ast, context, settings, enable_shard_multiplexing);
|
||||
select_stream_factory, cluster, modified_query_ast, context, settings);
|
||||
}
|
||||
|
||||
|
||||
@ -351,15 +342,10 @@ void StorageDistributed::reshardPartitions(
|
||||
|
||||
resharding_worker.registerQuery(coordinator_id, queryToString(alter_query_ptr));
|
||||
|
||||
/** The functionality of shard_multiplexing is not completed - turn it off.
|
||||
* (Because connecting to different shards within a single thread is not done in parallel.)
|
||||
*/
|
||||
bool enable_shard_multiplexing = false;
|
||||
|
||||
ClusterProxy::AlterStreamFactory alter_stream_factory;
|
||||
|
||||
BlockInputStreams streams = ClusterProxy::executeQuery(
|
||||
alter_stream_factory, cluster, alter_query_ptr, context, context.getSettingsRef(), enable_shard_multiplexing);
|
||||
alter_stream_factory, cluster, alter_query_ptr, context, context.getSettingsRef());
|
||||
|
||||
/// This callback is called if an exception has occurred while attempting to read
|
||||
/// a block from a shard. This is to avoid a potential deadlock if other shards are
|
||||
@ -429,15 +415,10 @@ BlockInputStreams StorageDistributed::describe(const Context & context, const Se
|
||||
describe_query.database = remote_database;
|
||||
describe_query.table = remote_table;
|
||||
|
||||
/** The functionality of shard_multiplexing is not completed - turn it off.
|
||||
* (Because connecting connections to different shards within a single thread is not done in parallel.)
|
||||
*/
|
||||
bool enable_shard_multiplexing = false;
|
||||
|
||||
ClusterProxy::DescribeStreamFactory describe_stream_factory;
|
||||
|
||||
return ClusterProxy::executeQuery(
|
||||
describe_stream_factory, cluster, describe_query_ptr, context, settings, enable_shard_multiplexing);
|
||||
describe_stream_factory, cluster, describe_query_ptr, context, settings);
|
||||
}
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user