diff --git a/dbms/include/DB/Client/Connection.h b/dbms/include/DB/Client/Connection.h index fef6fde2958..e286a254f0e 100644 --- a/dbms/include/DB/Client/Connection.h +++ b/dbms/include/DB/Client/Connection.h @@ -152,7 +152,6 @@ private: SharedPtr receiveException(); Progress receiveProgress(); BlockStreamProfileInfo receiveProfileInfo(); - Block receiveTotals(); void initBlockInput(); }; diff --git a/dbms/include/DB/Core/Defines.h b/dbms/include/DB/Core/Defines.h index 62d1b20ef38..8c0ea6ef5fd 100644 --- a/dbms/include/DB/Core/Defines.h +++ b/dbms/include/DB/Core/Defines.h @@ -32,4 +32,4 @@ #define DBMS_MIN_REVISION_WITH_PROFILING_PACKET 32029 #define DBMS_MIN_REVISION_WITH_HEADER_BLOCK 32881 #define DBMS_MIN_REVISION_WITH_USER_PASSWORD 34482 -#define DBMS_MIN_REVISION_WITH_TOTALS 35265 +#define DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES 35265 diff --git a/dbms/include/DB/Core/Protocol.h b/dbms/include/DB/Core/Protocol.h index d84aedcf748..8bcb5822da5 100644 --- a/dbms/include/DB/Core/Protocol.h +++ b/dbms/include/DB/Core/Protocol.h @@ -63,6 +63,7 @@ namespace Protocol EndOfStream = 5, /// Все пакеты были переданы. ProfileInfo = 6, /// Пакет с профайлинговой информацией. Totals = 7, /// Блок данных с тотальными значениями, со сжатием или без. + Extremes = 8, /// Блок данных с минимумами и максимумами, аналогично. }; /** NOTE: Если бы в качестве типа агрумента функции был бы Enum, то сравнение packet >= 0 && packet < 7 @@ -71,8 +72,8 @@ namespace Protocol */ inline const char * toString(UInt64 packet) { - static const char * data[] = { "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream", "ProfileInfo" }; - return packet >= 0 && packet < 7 + static const char * data[] = { "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream", "ProfileInfo", "Totals", "Extremes" }; + return packet >= 0 && packet < 9 ? data[packet] : "Unknown packet"; } diff --git a/dbms/include/DB/DataStreams/BlockOutputStreamFromRowOutputStream.h b/dbms/include/DB/DataStreams/BlockOutputStreamFromRowOutputStream.h index 449f9a2b4ee..70213b5d2d4 100644 --- a/dbms/include/DB/DataStreams/BlockOutputStreamFromRowOutputStream.h +++ b/dbms/include/DB/DataStreams/BlockOutputStreamFromRowOutputStream.h @@ -20,6 +20,7 @@ public: void setRowsBeforeLimit(size_t rows_before_limit); void setTotals(const Block & totals); + void setExtremes(const Block & extremes); private: RowOutputStreamPtr row_output; diff --git a/dbms/include/DB/DataStreams/IBlockOutputStream.h b/dbms/include/DB/DataStreams/IBlockOutputStream.h index 3a76436e57e..f9100557d1f 100644 --- a/dbms/include/DB/DataStreams/IBlockOutputStream.h +++ b/dbms/include/DB/DataStreams/IBlockOutputStream.h @@ -36,6 +36,7 @@ public: */ virtual void setRowsBeforeLimit(size_t rows_before_limit) {} virtual void setTotals(const Block & totals) {} + virtual void setExtremes(const Block & extremes) {} virtual ~IBlockOutputStream() {} diff --git a/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h b/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h index 20a6714525f..7f283d05888 100644 --- a/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h @@ -70,7 +70,7 @@ class IProfilingBlockInputStream : public IBlockInputStream { public: IProfilingBlockInputStream(StoragePtr owned_storage_ = StoragePtr()) - : IBlockInputStream(owned_storage_), is_cancelled(false), quota(NULL), quota_mode(QUOTA_READ), prev_elapsed(0) {} + : IBlockInputStream(owned_storage_), is_cancelled(false), enabled_extremes(false), quota(NULL), quota_mode(QUOTA_READ), prev_elapsed(0) {} Block read(); @@ -79,6 +79,8 @@ public: /// Получить "тотальные" значения. Берёт их из себя или из первого дочернего источника, в котором они есть. Их может не быть. const Block & getTotals() const; + /// То же самое для минимумов и максимумов. + const Block & getExtremes() const; /** Установить колбэк прогресса выполнения. @@ -150,15 +152,22 @@ public: quota_mode = quota_mode_; } + /// Включить рассчёт минимумов и максимумов по столбцам результата. + void enableExtremes() { enabled_extremes = true; } + protected: BlockStreamProfileInfo info; volatile bool is_cancelled; ProgressCallback progress_callback; + bool enabled_extremes; + /// Дополнительная информация, которая может образоваться в процессе работы. /// Тотальные значения при агрегации. Block totals; + /// Минимумы и максимумы. Первая строчка блока - минимумы, вторая - максимумы. + Block extremes; /// Ограничения и квоты. @@ -170,6 +179,11 @@ protected: /// Наследники должны реализовать эту функцию. virtual Block readImpl() = 0; + + + void updateExtremes(Block & block); + bool checkLimits(); + void checkQuota(Block & block); }; } diff --git a/dbms/include/DB/DataStreams/IRowOutputStream.h b/dbms/include/DB/DataStreams/IRowOutputStream.h index 394578f86a9..d58fb50ae64 100644 --- a/dbms/include/DB/DataStreams/IRowOutputStream.h +++ b/dbms/include/DB/DataStreams/IRowOutputStream.h @@ -36,6 +36,7 @@ public: */ virtual void setRowsBeforeLimit(size_t rows_before_limit) {} virtual void setTotals(const Block & totals) {} + virtual void setExtremes(const Block & extremes) {} virtual ~IRowOutputStream() {} }; diff --git a/dbms/include/DB/DataStreams/JSONCompactRowOutputStream.h b/dbms/include/DB/DataStreams/JSONCompactRowOutputStream.h index 6f1f9263efe..c1db0a9c59e 100644 --- a/dbms/include/DB/DataStreams/JSONCompactRowOutputStream.h +++ b/dbms/include/DB/DataStreams/JSONCompactRowOutputStream.h @@ -25,6 +25,7 @@ public: protected: void writeTotals(); + void writeExtremes(); }; } diff --git a/dbms/include/DB/DataStreams/JSONRowOutputStream.h b/dbms/include/DB/DataStreams/JSONRowOutputStream.h index 1606a74e85b..d993edc6905 100644 --- a/dbms/include/DB/DataStreams/JSONRowOutputStream.h +++ b/dbms/include/DB/DataStreams/JSONRowOutputStream.h @@ -32,15 +32,14 @@ public: rows_before_limit = rows_before_limit_; } - void setTotals(const Block & totals_) - { - totals = totals_; - } + void setTotals(const Block & totals_) { totals = totals_; } + void setExtremes(const Block & extremes_) { extremes = extremes_; } protected: void writeRowsBeforeLimitAtLeast(); virtual void writeTotals(); + virtual void writeExtremes(); typedef std::vector NamesAndTypesVector; @@ -51,6 +50,7 @@ protected: size_t rows_before_limit; NamesAndTypesVector fields; Block totals; + Block extremes; }; } diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index 05a1acc3825..4db3eb54385 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -123,6 +123,7 @@ public: case Protocol::Server::Progress: case Protocol::Server::ProfileInfo: case Protocol::Server::Totals: + case Protocol::Server::Extremes: break; case Protocol::Server::EndOfStream: @@ -186,6 +187,10 @@ protected: totals = packet.block; break; + case Protocol::Server::Extremes: + extremes = packet.block; + break; + default: throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); } diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index 4533df6808f..25fc6ea6923 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -37,9 +37,11 @@ struct Settings /// Максимальное количество попыток соединения с репликами. size_t connections_with_failover_max_tries; /** Переписывать запросы SELECT из CollapsingMergeTree с агрегатными функциями - * для автоматического учета поля Sign - */ + * для автоматического учета поля Sign + */ bool sign_rewrite; + /// Считать минимумы и максимумы столбцов результата. Они могут выводиться в JSON-форматах. + bool extremes; /// Всевозможные ограничения на выполнение запроса. Limits limits; @@ -58,7 +60,7 @@ struct Settings poll_interval(DBMS_DEFAULT_POLL_INTERVAL), distributed_connections_pool_size(DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE), connections_with_failover_max_tries(DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES), - sign_rewrite(false) + sign_rewrite(false), extremes(false) { } diff --git a/dbms/src/Client/Client.cpp b/dbms/src/Client/Client.cpp index 569dabdd18c..4655d4be980 100644 --- a/dbms/src/Client/Client.cpp +++ b/dbms/src/Client/Client.cpp @@ -675,6 +675,10 @@ private: onTotals(packet.block); return true; + case Protocol::Server::Extremes: + onExtremes(packet.block); + return true; + case Protocol::Server::Exception: onException(*packet.exception); last_exception = packet.exception; @@ -755,6 +759,11 @@ private: block_std_out->setTotals(block); } + void onExtremes(Block & block) + { + block_std_out->setExtremes(block); + } + void onProgress(const Progress & progress) { diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index e7c152a444a..2f62abbf7e3 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -289,7 +289,13 @@ Connection::Packet Connection::receivePacket() return res; case Protocol::Server::Totals: - res.block = receiveTotals(); + /// Блок с тотальными значениями передаётся так же, как обычный блок данных. Разница только в идентификаторе пакета. + res.block = receiveData(); + return res; + + case Protocol::Server::Extremes: + /// Аналогично. + res.block = receiveData(); return res; case Protocol::Server::EndOfStream: @@ -364,10 +370,4 @@ BlockStreamProfileInfo Connection::receiveProfileInfo() } -Block Connection::receiveTotals() -{ - /// Блок с тотальными значениями передаётся так же, как обычный блок данных. Разница только в идентификаторе пакета. - return receiveData(); -} - } diff --git a/dbms/src/DataStreams/BlockOutputStreamFromRowOutputStream.cpp b/dbms/src/DataStreams/BlockOutputStreamFromRowOutputStream.cpp index be985614f45..bdbfab5548d 100644 --- a/dbms/src/DataStreams/BlockOutputStreamFromRowOutputStream.cpp +++ b/dbms/src/DataStreams/BlockOutputStreamFromRowOutputStream.cpp @@ -43,4 +43,9 @@ void BlockOutputStreamFromRowOutputStream::setTotals(const Block & totals) row_output->setTotals(totals); } +void BlockOutputStreamFromRowOutputStream::setExtremes(const Block & extremes) +{ + row_output->setExtremes(extremes); +} + } diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp index b1492364b1d..21f33ba647e 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp @@ -3,6 +3,7 @@ /*#include #include */ +#include #include @@ -174,7 +175,18 @@ Block IProfilingBlockInputStream::read() }*/ if (res) + { info.update(res); + + if (enabled_extremes) + updateExtremes(res); + + if (!checkLimits()) + return Block(); + + if (quota != NULL) + checkQuota(res); + } else { /** Если поток закончился, то ещё попросим всех детей прервать выполнение. @@ -188,6 +200,63 @@ Block IProfilingBlockInputStream::read() progress(res); + return res; +} + + +void IProfilingBlockInputStream::updateExtremes(Block & block) +{ + size_t columns = block.columns(); + + if (!extremes) + { + extremes = block.cloneEmpty(); + + for (size_t i = 0; i < columns; ++i) + { + Field min_value; + Field max_value; + + block.getByPosition(i).column->getExtremes(min_value, max_value); + + ColumnPtr & column = extremes.getByPosition(i).column; + + if (column->isConst()) + column = dynamic_cast(*column).convertToFullColumn(); + + column->insert(min_value); + column->insert(max_value); + } + } + else + { + for (size_t i = 0; i < columns; ++i) + { + ColumnPtr & column = extremes.getByPosition(i).column; + + Field min_value = (*column)[0]; + Field max_value = (*column)[1]; + + Field cur_min_value; + Field cur_max_value; + + block.getByPosition(i).column->getExtremes(cur_min_value, cur_max_value); + + if (cur_min_value < min_value) + min_value = cur_min_value; + if (cur_max_value > max_value) + max_value = cur_max_value; + + column = column->cloneEmpty(); + column->insert(min_value); + column->insert(max_value); + } + } +} + + +bool IProfilingBlockInputStream::checkLimits() +{ /// Проверка ограничений. if ((limits.max_rows_to_read && info.rows > limits.max_rows_to_read) || (limits.max_bytes_to_read && info.bytes > limits.max_bytes_to_read)) @@ -198,7 +267,7 @@ Block IProfilingBlockInputStream::read() ErrorCodes::TOO_MUCH_ROWS); if (limits.read_overflow_mode == Limits::BREAK) - return Block(); + return false; throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR); } @@ -212,7 +281,7 @@ Block IProfilingBlockInputStream::read() ErrorCodes::TIMEOUT_EXCEEDED); if (limits.timeout_overflow_mode == Limits::BREAK) - return Block(); + return false; throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR); } @@ -226,31 +295,31 @@ Block IProfilingBlockInputStream::read() ErrorCodes::TOO_SLOW); } - /// Проверка квоты. - if (quota != NULL) + return true; +} + + +void IProfilingBlockInputStream::checkQuota(Block & block) +{ + time_t current_time = time(0); + double total_elapsed = info.total_stopwatch.elapsedSeconds(); + + switch (quota_mode) { - time_t current_time = time(0); - double total_elapsed = info.total_stopwatch.elapsedSeconds(); + case QUOTA_READ: + quota->checkAndAddReadRowsBytes(current_time, block.rows(), block.bytes()); + break; - switch (quota_mode) - { - case QUOTA_READ: - quota->checkAndAddReadRowsBytes(current_time, res.rows(), res.bytes()); - break; + case QUOTA_RESULT: + quota->checkAndAddResultRowsBytes(current_time, block.rows(), block.bytes()); + quota->checkAndAddExecutionTime(current_time, Poco::Timespan((total_elapsed - prev_elapsed) * 1000000.0)); + break; - case QUOTA_RESULT: - quota->checkAndAddResultRowsBytes(current_time, res.rows(), res.bytes()); - quota->checkAndAddExecutionTime(current_time, Poco::Timespan((total_elapsed - prev_elapsed) * 1000000.0)); - break; - - default: - throw Exception("Logical error: unknown quota mode.", ErrorCodes::LOGICAL_ERROR); - } - - prev_elapsed = total_elapsed; + default: + throw Exception("Logical error: unknown quota mode.", ErrorCodes::LOGICAL_ERROR); } - - return res; + + prev_elapsed = total_elapsed; } @@ -306,5 +375,23 @@ const Block & IProfilingBlockInputStream::getTotals() const return totals; } +const Block & IProfilingBlockInputStream::getExtremes() const +{ + if (extremes) + return extremes; + + for (BlockInputStreams::const_iterator it = children.begin(); it != children.end(); ++it) + { + if (const IProfilingBlockInputStream * child = dynamic_cast(&**it)) + { + const Block & res = child->getExtremes(); + if (res) + return res; + } + } + + return extremes; +} + } diff --git a/dbms/src/DataStreams/JSONCompactRowOutputStream.cpp b/dbms/src/DataStreams/JSONCompactRowOutputStream.cpp index 3bbef181689..41874dede7b 100644 --- a/dbms/src/DataStreams/JSONCompactRowOutputStream.cpp +++ b/dbms/src/DataStreams/JSONCompactRowOutputStream.cpp @@ -67,4 +67,42 @@ void JSONCompactRowOutputStream::writeTotals() } +static void writeExtremesElement(const char * title, const Block & extremes, size_t row_num, WriteBuffer & ostr) +{ + writeCString("\t\t\"", ostr); + writeCString(title, ostr); + writeCString("\": [", ostr); + + size_t extremes_columns = extremes.columns(); + for (size_t i = 0; i < extremes_columns; ++i) + { + if (i != 0) + writeChar(',', ostr); + + const ColumnWithNameAndType & column = extremes.getByPosition(i); + column.type->serializeTextJSON((*column.column)[row_num], ostr); + } + + writeChar(']', ostr); +} + +void JSONCompactRowOutputStream::writeExtremes() +{ + if (extremes) + { + writeCString(",\n", ostr); + writeChar('\n', ostr); + writeCString("\t\"extremes\":\n", ostr); + writeCString("\t{\n", ostr); + + writeExtremesElement("min", extremes, 0, ostr); + writeCString(",\n", ostr); + writeExtremesElement("max", extremes, 1, ostr); + + writeChar('\n', ostr); + writeCString("\t}", ostr); + } +} + + } diff --git a/dbms/src/DataStreams/JSONRowOutputStream.cpp b/dbms/src/DataStreams/JSONRowOutputStream.cpp index 664e01919a9..d0128204a83 100644 --- a/dbms/src/DataStreams/JSONRowOutputStream.cpp +++ b/dbms/src/DataStreams/JSONRowOutputStream.cpp @@ -88,6 +88,7 @@ void JSONRowOutputStream::writeSuffix() writeCString("\t]", ostr); writeTotals(); + writeExtremes(); writeCString(",\n\n", ostr); writeCString("\t\"rows\": ", ostr); @@ -138,4 +139,48 @@ void JSONRowOutputStream::writeTotals() } } + +static void writeExtremesElement(const char * title, const Block & extremes, size_t row_num, WriteBuffer & ostr) +{ + writeCString("\t\t\"", ostr); + writeCString(title, ostr); + writeCString("\":\n", ostr); + writeCString("\t\t{\n", ostr); + + size_t extremes_columns = extremes.columns(); + for (size_t i = 0; i < extremes_columns; ++i) + { + const ColumnWithNameAndType & column = extremes.getByPosition(i); + + if (i != 0) + writeCString(",\n", ostr); + + writeCString("\t\t\t", ostr); + writeDoubleQuotedString(column.name, ostr); + writeCString(": ", ostr); + column.type->serializeTextJSON((*column.column)[row_num], ostr); + } + + writeChar('\n', ostr); + writeCString("\t\t}", ostr); +} + +void JSONRowOutputStream::writeExtremes() +{ + if (extremes) + { + writeCString(",\n", ostr); + writeChar('\n', ostr); + writeCString("\t\"extremes\":\n", ostr); + writeCString("\t{\n", ostr); + + writeExtremesElement("min", extremes, 0, ostr); + writeCString(",\n", ostr); + writeExtremesElement("max", extremes, 1, ostr); + + writeChar('\n', ostr); + writeCString("\t}", ostr); + } +} + } diff --git a/dbms/src/DataStreams/copyData.cpp b/dbms/src/DataStreams/copyData.cpp index a604599fdc7..8249e28e2aa 100644 --- a/dbms/src/DataStreams/copyData.cpp +++ b/dbms/src/DataStreams/copyData.cpp @@ -22,6 +22,7 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to) to.setRowsBeforeLimit(input->getInfo().getRowsBeforeLimit()); to.setTotals(input->getTotals()); + to.setExtremes(input->getExtremes()); } from.readSuffix(); diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 9939d135d5c..a1d1436a70b 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -278,6 +278,12 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() /// Сначала выполняем DISTINCT во всех источниках. executeDistinct(streams, true); + /// На этой стадии можно считать минимумы и максимумы, если надо. + if (settings.extremes) + for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it) + if (IProfilingBlockInputStream * stream = dynamic_cast(&**it)) + stream->enableExtremes(); + /** Оптимизация - если источников несколько и есть LIMIT, то сначала применим предварительный LIMIT, * ограничивающий число записей в каждом до offset + limit. */ diff --git a/dbms/src/Interpreters/Settings.cpp b/dbms/src/Interpreters/Settings.cpp index 32750c22559..47c2d028022 100644 --- a/dbms/src/Interpreters/Settings.cpp +++ b/dbms/src/Interpreters/Settings.cpp @@ -29,6 +29,7 @@ void Settings::set(const String & name, const Field & value) else if (name == "distributed_connections_pool_size") distributed_connections_pool_size = safeGet(value); else if (name == "connections_with_failover_max_tries") connections_with_failover_max_tries = safeGet(value); else if (name == "sign_rewrite") sign_rewrite = safeGet(value); + else if (name == "extremes") extremes = safeGet(value); else if (name == "profile") setProfile(get(value)); else if (!limits.trySet(name, value)) throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING); @@ -49,7 +50,8 @@ void Settings::set(const String & name, ReadBuffer & buf) || name == "max_distributed_connections" || name == "distributed_connections_pool_size" || name == "connections_with_failover_max_tries" - || name == "sign_rewrite") + || name == "sign_rewrite" + || name == "extremes") { UInt64 value = 0; readVarUInt(value, buf); @@ -80,7 +82,8 @@ void Settings::set(const String & name, const String & value) || name == "max_distributed_connections" || name == "distributed_connections_pool_size" || name == "connections_with_failover_max_tries" - || name == "sign_rewrite") + || name == "sign_rewrite" + || name == "extremes") { set(name, parse(value)); } @@ -137,7 +140,8 @@ void Settings::serialize(WriteBuffer & buf) const writeStringBinary("max_distributed_connections", buf); writeVarUInt(max_distributed_connections, buf); writeStringBinary("distributed_connections_pool_size", buf); writeVarUInt(distributed_connections_pool_size, buf); writeStringBinary("connections_with_failover_max_tries", buf); writeVarUInt(connections_with_failover_max_tries, buf); - writeStringBinary("sign_rewrite", buf); writeVarUInt(sign_rewrite, buf); + writeStringBinary("sign_rewrite", buf); writeVarUInt(sign_rewrite, buf); + writeStringBinary("extremes", buf); writeVarUInt(extremes, buf); limits.serialize(buf); diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index 5bb5769b23a..5376839c59f 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -245,6 +245,7 @@ void TCPHandler::processOrdinaryQuery() if (!block) { sendTotals(); + sendExtremes(); sendProfileInfo(); } @@ -275,7 +276,7 @@ void TCPHandler::sendProfileInfo() void TCPHandler::sendTotals() { - if (client_revision < DBMS_MIN_REVISION_WITH_TOTALS) + if (client_revision < DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES) return; if (const IProfilingBlockInputStream * input = dynamic_cast(&*state.io.in)) @@ -296,6 +297,29 @@ void TCPHandler::sendTotals() } +void TCPHandler::sendExtremes() +{ + if (client_revision < DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES) + return; + + if (const IProfilingBlockInputStream * input = dynamic_cast(&*state.io.in)) + { + const Block & extremes = input->getExtremes(); + + if (extremes) + { + initBlockOutput(); + + writeVarUInt(Protocol::Server::Extremes, *out); + + state.block_out->write(extremes); + state.maybe_compressed_out->next(); + out->next(); + } + } +} + + void TCPHandler::logProfileInfo(Stopwatch & watch, IBlockInputStream & in) { /// Выведем информацию о том, сколько считано строк и байт. diff --git a/dbms/src/Server/TCPHandler.h b/dbms/src/Server/TCPHandler.h index 7897293c973..d42469d3d4f 100644 --- a/dbms/src/Server/TCPHandler.h +++ b/dbms/src/Server/TCPHandler.h @@ -122,6 +122,7 @@ private: void sendEndOfStream(); void sendProfileInfo(); void sendTotals(); + void sendExtremes(); /// Создаёт state.block_in/block_out для чтения/записи блоков, в зависимости от того, включено ли сжатие. void initBlockInput();