Add setting async_socket_for_remote

This commit is contained in:
Nikolai Kochetov 2020-12-18 16:15:03 +03:00
parent a860f128f5
commit c7ef57c6fd
6 changed files with 36 additions and 25 deletions

View File

@ -405,6 +405,8 @@ class IColumn;
\
M(Bool, use_antlr_parser, false, "Parse incoming queries using ANTLR-generated parser", 0) \
\
M(Bool, async_socket_for_remote, true, "Asynchronously read from socket executing remote query", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\
M(UInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \

View File

@ -199,9 +199,10 @@ Block RemoteQueryExecutor::read()
}
}
#if defined(OS_LINUX)
std::variant<Block, int> RemoteQueryExecutor::read(std::unique_ptr<ReadContext> & read_context)
{
#if defined(OS_LINUX)
if (!sent_query)
{
sendQuery();
@ -236,8 +237,10 @@ std::variant<Block, int> RemoteQueryExecutor::read(std::unique_ptr<ReadContext>
}
}
while (true);
}
#else
return read();
#endif
}
std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
{

View File

@ -59,11 +59,10 @@ public:
/// Read next block of data. Returns empty block if query is finished.
Block read();
#if defined(OS_LINUX)
/// Async variant of read. Returns ready block or file descriptor which may be used for polling.
/// ReadContext is an internal read state. Pass an empty ptr the first time, then reuse the created one for every subsequent call.
std::variant<Block, int> read(std::unique_ptr<ReadContext> & read_context);
#endif
/// Receive all remaining packets and finish the query.
/// It should be cancelled after read returned empty block.
void finish(std::unique_ptr<ReadContext> * read_context = nullptr);

View File

@ -126,6 +126,7 @@ void SelectStreamFactory::createForShard(
bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
bool add_extremes = false;
bool async_read = context_ptr->getSettingsRef().async_socket_for_remote;
if (processed_stage == QueryProcessingStage::Complete)
{
add_totals = query_ast->as<ASTSelectQuery &>().group_by_with_totals;
@ -153,7 +154,7 @@ void SelectStreamFactory::createForShard(
if (!table_func_ptr)
remote_query_executor->setMainTable(main_table);
remote_pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes));
remote_pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read));
remote_pipes.back().addInterpreterContext(context_ptr);
};
@ -249,7 +250,7 @@ void SelectStreamFactory::createForShard(
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = header, modified_query_ast,
&context, context_ptr, throttler,
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
stage = processed_stage, local_delay, add_agg_info, add_totals, add_extremes]()
stage = processed_stage, local_delay, add_agg_info, add_totals, add_extremes, async_read]()
-> Pipe
{
auto current_settings = context.getSettingsRef();
@ -295,7 +296,7 @@ void SelectStreamFactory::createForShard(
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::move(connections), modified_query, header, context, throttler, scalars, external_tables, stage);
return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes);
return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read);
}
};

View File

@ -7,9 +7,10 @@
namespace DB
{
RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_)
RemoteSource::RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_, bool async_read_)
: SourceWithProgress(executor->getHeader(), false)
, add_aggregation_info(add_aggregation_info_), query_executor(std::move(executor))
, async_read(async_read_)
{
/// Add AggregatedChunkInfo if we expect DataTypeAggregateFunction as a result.
const auto & sample = getPort().getHeader();
@ -65,21 +66,25 @@ std::optional<Chunk> RemoteSource::tryGenerate()
was_query_sent = true;
}
#if defined(OS_LINUX)
auto res = query_executor->read(read_context);
if (std::holds_alternative<int>(res))
Block block;
if (async_read)
{
fd = std::get<int>(res);
is_async_state = true;
return Chunk();
auto res = query_executor->read(read_context);
if (std::holds_alternative<int>(res))
{
fd = std::get<int>(res);
is_async_state = true;
return Chunk();
}
is_async_state = false;
block = std::get<Block>(std::move(res));
}
is_async_state = false;
auto block = std::get<Block>(std::move(res));
#else
auto block = query_executor->read();
#endif
else
block = query_executor->read();
if (!block)
{
@ -161,9 +166,9 @@ Chunk RemoteExtremesSource::generate()
Pipe createRemoteSourcePipe(
RemoteQueryExecutorPtr query_executor,
bool add_aggregation_info, bool add_totals, bool add_extremes)
bool add_aggregation_info, bool add_totals, bool add_extremes, bool async_read)
{
Pipe pipe(std::make_shared<RemoteSource>(query_executor, add_aggregation_info));
Pipe pipe(std::make_shared<RemoteSource>(query_executor, add_aggregation_info, async_read));
if (add_totals)
pipe.addTotalsSource(std::make_shared<RemoteTotalsSource>(query_executor));

View File

@ -20,7 +20,7 @@ public:
/// Flag add_aggregation_info tells if AggregatedChunkInfo should be added to result chunk.
/// AggregatedChunkInfo stores the bucket number used for two-level aggregation.
/// This flag should typically be enabled for queries with GROUP BY that are executed up to the WithMergeableState stage.
RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_);
RemoteSource(RemoteQueryExecutorPtr executor, bool add_aggregation_info_, bool async_read_);
~RemoteSource() override;
Status prepare() override;
@ -44,6 +44,7 @@ private:
RemoteQueryExecutorPtr query_executor;
RowsBeforeLimitCounterPtr rows_before_limit;
const bool async_read;
bool is_async_state = false;
std::unique_ptr<RemoteQueryExecutorReadContext> read_context;
int fd = -1;
@ -84,6 +85,6 @@ private:
/// Create pipe with remote sources.
Pipe createRemoteSourcePipe(
RemoteQueryExecutorPtr query_executor,
bool add_aggregation_info, bool add_totals, bool add_extremes);
bool add_aggregation_info, bool add_totals, bool add_extremes, bool async_read);
}