Merge pull request #8143 from Akazz/race_condition/process_ordinary_query

Resolved data race in DB::BlockStreamProfileInfo::calculateRowsBeforeLimit()
This commit is contained in:
alexey-milovidov 2019-12-12 15:32:47 +03:00 committed by GitHub
commit 604a31bb66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -484,31 +484,22 @@ void TCPHandler::processOrdinaryQuery()
/// Pull query execution result, if exists, and send it to network.
if (state.io.in)
{
/// Send header-block, to allow client to prepare output format for data to send.
{
Block header = state.io.in->getHeader();
if (header)
/// This allows the client to prepare output format
if (Block header = state.io.in->getHeader())
sendData(header);
}
/// Use of async mode here enables reporting progress and monitoring client cancelling the query
AsynchronousBlockInputStream async_in(state.io.in);
async_in.readPrefix();
while (true)
{
Block block;
while (true)
{
if (isQueryCancelled())
{
/// A packet was received requesting to stop execution of the request.
async_in.cancel(false);
break;
}
else
{
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
{
/// Some time passed and there is a progress.
@ -520,44 +511,36 @@ void TCPHandler::processOrdinaryQuery()
if (async_in.poll(query_context->getSettingsRef().interactive_delay / 1000))
{
/// There is the following result block.
block = async_in.read();
const auto block = async_in.read();
if (!block)
break;
}
}
}
/** If data has run out, we will send the profiling data and total values to
* the last zero block to be able to use
* this information in the suffix output of stream.
* If the request was interrupted, then `sendTotals` and other methods could not be called,
* because we have not read all the data yet,
* and there could be ongoing calculations in other threads at the same time.
if (!state.io.null_format)
sendData(block);
}
}
async_in.readSuffix();
/** When the data has run out, we send the profiling data and totals up to the terminating empty block,
* so that this information can be used in the suffix output of stream.
* If the request has been interrupted, then sendTotals and other methods should not be called,
* because we have not read all the data.
*/
if (!block && !isQueryCancelled())
if (!isQueryCancelled())
{
/// Wait till inner thread finish to avoid possible race with getTotals.
async_in.waitInnerThread();
sendTotals(state.io.in->getTotals());
sendExtremes(state.io.in->getExtremes());
sendProfileInfo(state.io.in->getProfileInfo());
sendProgress();
sendLogs();
}
if (!block || !state.io.null_format)
sendData(block);
if (!block)
break;
}
async_in.readSuffix();
sendData({});
}
state.io.onFinish();
}
void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
{
auto & pipeline = state.io.pipeline;