diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index cbfe703a843..5ed6b09e6f0 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -72,7 +72,7 @@ void TCPHandler::runImpl() sendHello(); - connection_context.setProgressCallback(boost::bind(&TCPHandler::sendProgress, this, _1, _2)); + connection_context.setProgressCallback(boost::bind(&TCPHandler::updateProgress, this, _1, _2)); while (1) { @@ -233,14 +233,22 @@ void TCPHandler::processOrdinaryQuery() { if (isQueryCancelled()) { + /// Получен пакет с просьбой прекратить выполнение запроса. async_in.cancel(); break; } else if (async_in.poll(query_context.getSettingsRef().interactive_delay / 1000)) { + /// Есть следующий блок результата. block = async_in.read(); break; } + else if (state.rows_processed && after_send_progress.elapsed() / 1000 >= query_context.getSettingsRef().interactive_delay) + { + /// Прошло некоторое время, пока нет следующего блока результата, но есть прогресс. + after_send_progress.restart(); + sendProgress(); + } } /// Если закончились данные, то отправим данные профайлинга и тотальные значения до @@ -271,8 +279,6 @@ void TCPHandler::sendProfileInfo() if (client_revision < DBMS_MIN_REVISION_WITH_PROFILING_PACKET) return; - Poco::ScopedLock lock(send_mutex); - if (const IProfilingBlockInputStream * input = dynamic_cast(&*state.io.in)) { writeVarUInt(Protocol::Server::ProfileInfo, *out); @@ -287,8 +293,6 @@ void TCPHandler::sendTotals() if (client_revision < DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES) return; - Poco::ScopedLock lock(send_mutex); - if (IProfilingBlockInputStream * input = dynamic_cast(&*state.io.in)) { const Block & totals = input->getTotals(); @@ -312,8 +316,6 @@ void TCPHandler::sendExtremes() if (client_revision < DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES) return; - Poco::ScopedLock lock(send_mutex); - if (const IProfilingBlockInputStream * input = dynamic_cast(&*state.io.in)) { const Block & extremes = input->getExtremes(); @@ -557,8 +559,6 @@ bool TCPHandler::isQueryCancelled() void TCPHandler::sendData(Block & block) { - Poco::ScopedLock lock(send_mutex); - initBlockOutput(); writeVarUInt(Protocol::Server::Data, *out); @@ -571,8 +571,6 @@ void TCPHandler::sendData(Block & block) void TCPHandler::sendException(const Exception & e) { - Poco::ScopedLock lock(send_mutex); - writeVarUInt(Protocol::Server::Exception, *out); writeException(e, *out); out->next(); @@ -581,41 +579,28 @@ void TCPHandler::sendException(const Exception & e) void TCPHandler::sendEndOfStream() { - Poco::ScopedLock lock(send_mutex); - state.sent_all_data = true; writeVarUInt(Protocol::Server::EndOfStream, *out); out->next(); } -void TCPHandler::sendProgress(size_t rows, size_t bytes) +void TCPHandler::updateProgress(size_t rows, size_t bytes) { - /// Не отправляем прогресс, если сейчас отправляются данные. - Poco::ScopedTry lock; + __sync_fetch_and_add(&state.rows_processed, rows); + __sync_fetch_and_add(&state.bytes_processed, bytes); +} - if (!lock.lock(&send_mutex)) - return; - state.rows_processed += rows; - state.bytes_processed += bytes; - - /// Не будем отправлять прогресс после того, как отправлены все данные. - if (state.sent_all_data) - return; - - if (after_send_progress.elapsed() / 1000 < query_context.getSettingsRef().interactive_delay) - return; - - after_send_progress.restart(); +void TCPHandler::sendProgress() +{ + size_t rows_processed = __sync_fetch_and_and(&state.rows_processed, 0); + size_t bytes_processed = __sync_fetch_and_and(&state.bytes_processed, 0); writeVarUInt(Protocol::Server::Progress, *out); - Progress progress(state.rows_processed, state.bytes_processed); + Progress progress(rows_processed, bytes_processed); progress.write(*out); out->next(); - - state.rows_processed = 0; - state.bytes_processed = 0; } diff --git a/dbms/src/Server/TCPHandler.h b/dbms/src/Server/TCPHandler.h index 038f146e824..ca70eff4238 100644 --- a/dbms/src/Server/TCPHandler.h +++ b/dbms/src/Server/TCPHandler.h @@ -46,9 +46,9 @@ struct QueryState bool sent_all_data; /// Для вывода прогресса - разница после предыдущей отправки прогресса. - size_t rows_processed; - size_t bytes_processed; - + volatile size_t rows_processed; + volatile size_t bytes_processed; + QueryState() : query_id(0), stage(QueryProcessingStage::Complete), compression(Protocol::Compression::Disable), is_cancelled(false), sent_all_data(false), rows_processed(0), bytes_processed(0) {} @@ -89,9 +89,6 @@ private: SharedPtr in; SharedPtr out; - /// Для сериализации пакетов "данные" и "прогресс" (пакет типа "прогресс" может отправляться из другого потока). - Poco::FastMutex send_mutex; - /// Время после последней проверки остановки запроса и отправки прогресса. Stopwatch after_check_cancelled; Stopwatch after_send_progress; @@ -118,7 +115,7 @@ private: void sendHello(); void sendData(Block & block); /// Записать в сеть блок. void sendException(const Exception & e); - void sendProgress(size_t rows, size_t bytes); + void sendProgress(); void sendEndOfStream(); void sendProfileInfo(); void sendTotals(); @@ -130,6 +127,9 @@ private: bool isQueryCancelled(); + /// Эта функция вызывается из разных потоков. + void updateProgress(size_t rows, size_t bytes); + /// Вывести информацию о скорости выполнения SELECT запроса. void logProfileInfo(Stopwatch & watch, IBlockInputStream & in); };