dbms: better synchronization of sending data [#METR-2944].

This commit is contained in:
Alexey Milovidov 2013-11-02 21:18:54 +00:00
parent 790f224e56
commit 831ec21083
2 changed files with 25 additions and 40 deletions

View File

@ -72,7 +72,7 @@ void TCPHandler::runImpl()
sendHello(); sendHello();
connection_context.setProgressCallback(boost::bind(&TCPHandler::sendProgress, this, _1, _2)); connection_context.setProgressCallback(boost::bind(&TCPHandler::updateProgress, this, _1, _2));
while (1) while (1)
{ {
@ -233,14 +233,22 @@ void TCPHandler::processOrdinaryQuery()
{ {
if (isQueryCancelled()) if (isQueryCancelled())
{ {
/// Получен пакет с просьбой прекратить выполнение запроса.
async_in.cancel(); async_in.cancel();
break; break;
} }
else if (async_in.poll(query_context.getSettingsRef().interactive_delay / 1000)) else if (async_in.poll(query_context.getSettingsRef().interactive_delay / 1000))
{ {
/// Есть следующий блок результата.
block = async_in.read(); block = async_in.read();
break; break;
} }
else if (state.rows_processed && after_send_progress.elapsed() / 1000 >= query_context.getSettingsRef().interactive_delay)
{
/// Прошло некоторое время, пока нет следующего блока результата, но есть прогресс.
after_send_progress.restart();
sendProgress();
}
} }
/// Если закончились данные, то отправим данные профайлинга и тотальные значения до /// Если закончились данные, то отправим данные профайлинга и тотальные значения до
@ -271,8 +279,6 @@ void TCPHandler::sendProfileInfo()
if (client_revision < DBMS_MIN_REVISION_WITH_PROFILING_PACKET) if (client_revision < DBMS_MIN_REVISION_WITH_PROFILING_PACKET)
return; return;
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(&*state.io.in)) if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(&*state.io.in))
{ {
writeVarUInt(Protocol::Server::ProfileInfo, *out); writeVarUInt(Protocol::Server::ProfileInfo, *out);
@ -287,8 +293,6 @@ void TCPHandler::sendTotals()
if (client_revision < DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES) if (client_revision < DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES)
return; return;
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
if (IProfilingBlockInputStream * input = dynamic_cast<IProfilingBlockInputStream *>(&*state.io.in)) if (IProfilingBlockInputStream * input = dynamic_cast<IProfilingBlockInputStream *>(&*state.io.in))
{ {
const Block & totals = input->getTotals(); const Block & totals = input->getTotals();
@ -312,8 +316,6 @@ void TCPHandler::sendExtremes()
if (client_revision < DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES) if (client_revision < DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES)
return; return;
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(&*state.io.in)) if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(&*state.io.in))
{ {
const Block & extremes = input->getExtremes(); const Block & extremes = input->getExtremes();
@ -557,8 +559,6 @@ bool TCPHandler::isQueryCancelled()
void TCPHandler::sendData(Block & block) void TCPHandler::sendData(Block & block)
{ {
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
initBlockOutput(); initBlockOutput();
writeVarUInt(Protocol::Server::Data, *out); writeVarUInt(Protocol::Server::Data, *out);
@ -571,8 +571,6 @@ void TCPHandler::sendData(Block & block)
void TCPHandler::sendException(const Exception & e) void TCPHandler::sendException(const Exception & e)
{ {
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
writeVarUInt(Protocol::Server::Exception, *out); writeVarUInt(Protocol::Server::Exception, *out);
writeException(e, *out); writeException(e, *out);
out->next(); out->next();
@ -581,41 +579,28 @@ void TCPHandler::sendException(const Exception & e)
void TCPHandler::sendEndOfStream() void TCPHandler::sendEndOfStream()
{ {
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
state.sent_all_data = true; state.sent_all_data = true;
writeVarUInt(Protocol::Server::EndOfStream, *out); writeVarUInt(Protocol::Server::EndOfStream, *out);
out->next(); out->next();
} }
void TCPHandler::sendProgress(size_t rows, size_t bytes) void TCPHandler::updateProgress(size_t rows, size_t bytes)
{ {
/// Не отправляем прогресс, если сейчас отправляются данные. __sync_fetch_and_add(&state.rows_processed, rows);
Poco::ScopedTry<Poco::FastMutex> lock; __sync_fetch_and_add(&state.bytes_processed, bytes);
}
if (!lock.lock(&send_mutex))
return;
state.rows_processed += rows; void TCPHandler::sendProgress()
state.bytes_processed += bytes; {
size_t rows_processed = __sync_fetch_and_and(&state.rows_processed, 0);
/// Не будем отправлять прогресс после того, как отправлены все данные. size_t bytes_processed = __sync_fetch_and_and(&state.bytes_processed, 0);
if (state.sent_all_data)
return;
if (after_send_progress.elapsed() / 1000 < query_context.getSettingsRef().interactive_delay)
return;
after_send_progress.restart();
writeVarUInt(Protocol::Server::Progress, *out); writeVarUInt(Protocol::Server::Progress, *out);
Progress progress(state.rows_processed, state.bytes_processed); Progress progress(rows_processed, bytes_processed);
progress.write(*out); progress.write(*out);
out->next(); out->next();
state.rows_processed = 0;
state.bytes_processed = 0;
} }

View File

@ -46,8 +46,8 @@ struct QueryState
bool sent_all_data; bool sent_all_data;
/// Для вывода прогресса - разница после предыдущей отправки прогресса. /// Для вывода прогресса - разница после предыдущей отправки прогресса.
size_t rows_processed; volatile size_t rows_processed;
size_t bytes_processed; volatile size_t bytes_processed;
QueryState() : query_id(0), stage(QueryProcessingStage::Complete), compression(Protocol::Compression::Disable), QueryState() : query_id(0), stage(QueryProcessingStage::Complete), compression(Protocol::Compression::Disable),
@ -89,9 +89,6 @@ private:
SharedPtr<ReadBufferFromPocoSocket> in; SharedPtr<ReadBufferFromPocoSocket> in;
SharedPtr<WriteBufferFromPocoSocket> out; SharedPtr<WriteBufferFromPocoSocket> out;
/// Для сериализации пакетов "данные" и "прогресс" (пакет типа "прогресс" может отправляться из другого потока).
Poco::FastMutex send_mutex;
/// Время после последней проверки остановки запроса и отправки прогресса. /// Время после последней проверки остановки запроса и отправки прогресса.
Stopwatch after_check_cancelled; Stopwatch after_check_cancelled;
Stopwatch after_send_progress; Stopwatch after_send_progress;
@ -118,7 +115,7 @@ private:
void sendHello(); void sendHello();
void sendData(Block & block); /// Записать в сеть блок. void sendData(Block & block); /// Записать в сеть блок.
void sendException(const Exception & e); void sendException(const Exception & e);
void sendProgress(size_t rows, size_t bytes); void sendProgress();
void sendEndOfStream(); void sendEndOfStream();
void sendProfileInfo(); void sendProfileInfo();
void sendTotals(); void sendTotals();
@ -130,6 +127,9 @@ private:
bool isQueryCancelled(); bool isQueryCancelled();
/// Эта функция вызывается из разных потоков.
void updateProgress(size_t rows, size_t bytes);
/// Вывести информацию о скорости выполнения SELECT запроса. /// Вывести информацию о скорости выполнения SELECT запроса.
void logProfileInfo(Stopwatch & watch, IBlockInputStream & in); void logProfileInfo(Stopwatch & watch, IBlockInputStream & in);
}; };