dbms: added calculation of min/max [#CONV-8691].

This commit is contained in:
Alexey Milovidov 2013-09-07 02:03:13 +00:00
parent 35a6b621f4
commit c69f353a17
22 changed files with 291 additions and 46 deletions

View File

@ -152,7 +152,6 @@ private:
SharedPtr<Exception> receiveException(); SharedPtr<Exception> receiveException();
Progress receiveProgress(); Progress receiveProgress();
BlockStreamProfileInfo receiveProfileInfo(); BlockStreamProfileInfo receiveProfileInfo();
Block receiveTotals();
void initBlockInput(); void initBlockInput();
}; };

View File

@ -32,4 +32,4 @@
#define DBMS_MIN_REVISION_WITH_PROFILING_PACKET 32029 #define DBMS_MIN_REVISION_WITH_PROFILING_PACKET 32029
#define DBMS_MIN_REVISION_WITH_HEADER_BLOCK 32881 #define DBMS_MIN_REVISION_WITH_HEADER_BLOCK 32881
#define DBMS_MIN_REVISION_WITH_USER_PASSWORD 34482 #define DBMS_MIN_REVISION_WITH_USER_PASSWORD 34482
#define DBMS_MIN_REVISION_WITH_TOTALS 35265 #define DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES 35265

View File

@ -63,6 +63,7 @@ namespace Protocol
EndOfStream = 5, /// Все пакеты были переданы. EndOfStream = 5, /// Все пакеты были переданы.
ProfileInfo = 6, /// Пакет с профайлинговой информацией. ProfileInfo = 6, /// Пакет с профайлинговой информацией.
Totals = 7, /// Блок данных с тотальными значениями, со сжатием или без. Totals = 7, /// Блок данных с тотальными значениями, со сжатием или без.
Extremes = 8, /// Блок данных с минимумами и максимумами, аналогично.
}; };
/** NOTE: Если бы в качестве типа агрумента функции был бы Enum, то сравнение packet >= 0 && packet < 7 /** NOTE: Если бы в качестве типа агрумента функции был бы Enum, то сравнение packet >= 0 && packet < 7
@ -71,8 +72,8 @@ namespace Protocol
*/ */
inline const char * toString(UInt64 packet) inline const char * toString(UInt64 packet)
{ {
static const char * data[] = { "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream", "ProfileInfo" }; static const char * data[] = { "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream", "ProfileInfo", "Totals", "Extremes" };
return packet >= 0 && packet < 7 return packet >= 0 && packet < 9
? data[packet] ? data[packet]
: "Unknown packet"; : "Unknown packet";
} }

View File

@ -20,6 +20,7 @@ public:
void setRowsBeforeLimit(size_t rows_before_limit); void setRowsBeforeLimit(size_t rows_before_limit);
void setTotals(const Block & totals); void setTotals(const Block & totals);
void setExtremes(const Block & extremes);
private: private:
RowOutputStreamPtr row_output; RowOutputStreamPtr row_output;

View File

@ -36,6 +36,7 @@ public:
*/ */
virtual void setRowsBeforeLimit(size_t rows_before_limit) {} virtual void setRowsBeforeLimit(size_t rows_before_limit) {}
virtual void setTotals(const Block & totals) {} virtual void setTotals(const Block & totals) {}
virtual void setExtremes(const Block & extremes) {}
virtual ~IBlockOutputStream() {} virtual ~IBlockOutputStream() {}

View File

@ -70,7 +70,7 @@ class IProfilingBlockInputStream : public IBlockInputStream
{ {
public: public:
IProfilingBlockInputStream(StoragePtr owned_storage_ = StoragePtr()) IProfilingBlockInputStream(StoragePtr owned_storage_ = StoragePtr())
: IBlockInputStream(owned_storage_), is_cancelled(false), quota(NULL), quota_mode(QUOTA_READ), prev_elapsed(0) {} : IBlockInputStream(owned_storage_), is_cancelled(false), enabled_extremes(false), quota(NULL), quota_mode(QUOTA_READ), prev_elapsed(0) {}
Block read(); Block read();
@ -79,6 +79,8 @@ public:
/// Получить "тотальные" значения. Берёт их из себя или из первого дочернего источника, в котором они есть. Их может не быть. /// Получить "тотальные" значения. Берёт их из себя или из первого дочернего источника, в котором они есть. Их может не быть.
const Block & getTotals() const; const Block & getTotals() const;
/// То же самое для минимумов и максимумов.
const Block & getExtremes() const;
/** Установить колбэк прогресса выполнения. /** Установить колбэк прогресса выполнения.
@ -150,15 +152,22 @@ public:
quota_mode = quota_mode_; quota_mode = quota_mode_;
} }
/// Включить рассчёт минимумов и максимумов по столбцам результата.
void enableExtremes() { enabled_extremes = true; }
protected: protected:
BlockStreamProfileInfo info; BlockStreamProfileInfo info;
volatile bool is_cancelled; volatile bool is_cancelled;
ProgressCallback progress_callback; ProgressCallback progress_callback;
bool enabled_extremes;
/// Дополнительная информация, которая может образоваться в процессе работы. /// Дополнительная информация, которая может образоваться в процессе работы.
/// Тотальные значения при агрегации. /// Тотальные значения при агрегации.
Block totals; Block totals;
/// Минимумы и максимумы. Первая строчка блока - минимумы, вторая - максимумы.
Block extremes;
/// Ограничения и квоты. /// Ограничения и квоты.
@ -170,6 +179,11 @@ protected:
/// Наследники должны реализовать эту функцию. /// Наследники должны реализовать эту функцию.
virtual Block readImpl() = 0; virtual Block readImpl() = 0;
void updateExtremes(Block & block);
bool checkLimits();
void checkQuota(Block & block);
}; };
} }

View File

@ -36,6 +36,7 @@ public:
*/ */
virtual void setRowsBeforeLimit(size_t rows_before_limit) {} virtual void setRowsBeforeLimit(size_t rows_before_limit) {}
virtual void setTotals(const Block & totals) {} virtual void setTotals(const Block & totals) {}
virtual void setExtremes(const Block & extremes) {}
virtual ~IRowOutputStream() {} virtual ~IRowOutputStream() {}
}; };

View File

@ -25,6 +25,7 @@ public:
protected: protected:
void writeTotals(); void writeTotals();
void writeExtremes();
}; };
} }

View File

@ -32,15 +32,14 @@ public:
rows_before_limit = rows_before_limit_; rows_before_limit = rows_before_limit_;
} }
void setTotals(const Block & totals_) void setTotals(const Block & totals_) { totals = totals_; }
{ void setExtremes(const Block & extremes_) { extremes = extremes_; }
totals = totals_;
}
protected: protected:
void writeRowsBeforeLimitAtLeast(); void writeRowsBeforeLimitAtLeast();
virtual void writeTotals(); virtual void writeTotals();
virtual void writeExtremes();
typedef std::vector<NameAndTypePair> NamesAndTypesVector; typedef std::vector<NameAndTypePair> NamesAndTypesVector;
@ -51,6 +50,7 @@ protected:
size_t rows_before_limit; size_t rows_before_limit;
NamesAndTypesVector fields; NamesAndTypesVector fields;
Block totals; Block totals;
Block extremes;
}; };
} }

View File

@ -123,6 +123,7 @@ public:
case Protocol::Server::Progress: case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo: case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals: case Protocol::Server::Totals:
case Protocol::Server::Extremes:
break; break;
case Protocol::Server::EndOfStream: case Protocol::Server::EndOfStream:
@ -186,6 +187,10 @@ protected:
totals = packet.block; totals = packet.block;
break; break;
case Protocol::Server::Extremes:
extremes = packet.block;
break;
default: default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER); throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
} }

View File

@ -37,9 +37,11 @@ struct Settings
/// Максимальное количество попыток соединения с репликами. /// Максимальное количество попыток соединения с репликами.
size_t connections_with_failover_max_tries; size_t connections_with_failover_max_tries;
/** Переписывать запросы SELECT из CollapsingMergeTree с агрегатными функциями /** Переписывать запросы SELECT из CollapsingMergeTree с агрегатными функциями
* для автоматического учета поля Sign * для автоматического учета поля Sign
*/ */
bool sign_rewrite; bool sign_rewrite;
/// Считать минимумы и максимумы столбцов результата. Они могут выводиться в JSON-форматах.
bool extremes;
/// Всевозможные ограничения на выполнение запроса. /// Всевозможные ограничения на выполнение запроса.
Limits limits; Limits limits;
@ -58,7 +60,7 @@ struct Settings
poll_interval(DBMS_DEFAULT_POLL_INTERVAL), poll_interval(DBMS_DEFAULT_POLL_INTERVAL),
distributed_connections_pool_size(DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE), distributed_connections_pool_size(DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE),
connections_with_failover_max_tries(DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES), connections_with_failover_max_tries(DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES),
sign_rewrite(false) sign_rewrite(false), extremes(false)
{ {
} }

View File

@ -675,6 +675,10 @@ private:
onTotals(packet.block); onTotals(packet.block);
return true; return true;
case Protocol::Server::Extremes:
onExtremes(packet.block);
return true;
case Protocol::Server::Exception: case Protocol::Server::Exception:
onException(*packet.exception); onException(*packet.exception);
last_exception = packet.exception; last_exception = packet.exception;
@ -755,6 +759,11 @@ private:
block_std_out->setTotals(block); block_std_out->setTotals(block);
} }
void onExtremes(Block & block)
{
block_std_out->setExtremes(block);
}
void onProgress(const Progress & progress) void onProgress(const Progress & progress)
{ {

View File

@ -289,7 +289,13 @@ Connection::Packet Connection::receivePacket()
return res; return res;
case Protocol::Server::Totals: case Protocol::Server::Totals:
res.block = receiveTotals(); /// Блок с тотальными значениями передаётся так же, как обычный блок данных. Разница только в идентификаторе пакета.
res.block = receiveData();
return res;
case Protocol::Server::Extremes:
/// Аналогично.
res.block = receiveData();
return res; return res;
case Protocol::Server::EndOfStream: case Protocol::Server::EndOfStream:
@ -364,10 +370,4 @@ BlockStreamProfileInfo Connection::receiveProfileInfo()
} }
Block Connection::receiveTotals()
{
/// Блок с тотальными значениями передаётся так же, как обычный блок данных. Разница только в идентификаторе пакета.
return receiveData();
}
} }

View File

@ -43,4 +43,9 @@ void BlockOutputStreamFromRowOutputStream::setTotals(const Block & totals)
row_output->setTotals(totals); row_output->setTotals(totals);
} }
void BlockOutputStreamFromRowOutputStream::setExtremes(const Block & extremes)
{
row_output->setExtremes(extremes);
}
} }

View File

@ -3,6 +3,7 @@
/*#include <Poco/Mutex.h> /*#include <Poco/Mutex.h>
#include <Poco/Ext/ThreadNumber.h>*/ #include <Poco/Ext/ThreadNumber.h>*/
#include <DB/Columns/ColumnConst.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h> #include <DB/DataStreams/IProfilingBlockInputStream.h>
@ -174,7 +175,18 @@ Block IProfilingBlockInputStream::read()
}*/ }*/
if (res) if (res)
{
info.update(res); info.update(res);
if (enabled_extremes)
updateExtremes(res);
if (!checkLimits())
return Block();
if (quota != NULL)
checkQuota(res);
}
else else
{ {
/** Если поток закончился, то ещё попросим всех детей прервать выполнение. /** Если поток закончился, то ещё попросим всех детей прервать выполнение.
@ -188,6 +200,63 @@ Block IProfilingBlockInputStream::read()
progress(res); progress(res);
return res;
}
void IProfilingBlockInputStream::updateExtremes(Block & block)
{
size_t columns = block.columns();
if (!extremes)
{
extremes = block.cloneEmpty();
for (size_t i = 0; i < columns; ++i)
{
Field min_value;
Field max_value;
block.getByPosition(i).column->getExtremes(min_value, max_value);
ColumnPtr & column = extremes.getByPosition(i).column;
if (column->isConst())
column = dynamic_cast<const IColumnConst &>(*column).convertToFullColumn();
column->insert(min_value);
column->insert(max_value);
}
}
else
{
for (size_t i = 0; i < columns; ++i)
{
ColumnPtr & column = extremes.getByPosition(i).column;
Field min_value = (*column)[0];
Field max_value = (*column)[1];
Field cur_min_value;
Field cur_max_value;
block.getByPosition(i).column->getExtremes(cur_min_value, cur_max_value);
if (cur_min_value < min_value)
min_value = cur_min_value;
if (cur_max_value > max_value)
max_value = cur_max_value;
column = column->cloneEmpty();
column->insert(min_value);
column->insert(max_value);
}
}
}
bool IProfilingBlockInputStream::checkLimits()
{
/// Проверка ограничений. /// Проверка ограничений.
if ((limits.max_rows_to_read && info.rows > limits.max_rows_to_read) if ((limits.max_rows_to_read && info.rows > limits.max_rows_to_read)
|| (limits.max_bytes_to_read && info.bytes > limits.max_bytes_to_read)) || (limits.max_bytes_to_read && info.bytes > limits.max_bytes_to_read))
@ -198,7 +267,7 @@ Block IProfilingBlockInputStream::read()
ErrorCodes::TOO_MUCH_ROWS); ErrorCodes::TOO_MUCH_ROWS);
if (limits.read_overflow_mode == Limits::BREAK) if (limits.read_overflow_mode == Limits::BREAK)
return Block(); return false;
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
} }
@ -212,7 +281,7 @@ Block IProfilingBlockInputStream::read()
ErrorCodes::TIMEOUT_EXCEEDED); ErrorCodes::TIMEOUT_EXCEEDED);
if (limits.timeout_overflow_mode == Limits::BREAK) if (limits.timeout_overflow_mode == Limits::BREAK)
return Block(); return false;
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR); throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
} }
@ -226,31 +295,31 @@ Block IProfilingBlockInputStream::read()
ErrorCodes::TOO_SLOW); ErrorCodes::TOO_SLOW);
} }
/// Проверка квоты. return true;
if (quota != NULL) }
void IProfilingBlockInputStream::checkQuota(Block & block)
{
time_t current_time = time(0);
double total_elapsed = info.total_stopwatch.elapsedSeconds();
switch (quota_mode)
{ {
time_t current_time = time(0); case QUOTA_READ:
double total_elapsed = info.total_stopwatch.elapsedSeconds(); quota->checkAndAddReadRowsBytes(current_time, block.rows(), block.bytes());
break;
switch (quota_mode) case QUOTA_RESULT:
{ quota->checkAndAddResultRowsBytes(current_time, block.rows(), block.bytes());
case QUOTA_READ: quota->checkAndAddExecutionTime(current_time, Poco::Timespan((total_elapsed - prev_elapsed) * 1000000.0));
quota->checkAndAddReadRowsBytes(current_time, res.rows(), res.bytes()); break;
break;
case QUOTA_RESULT: default:
quota->checkAndAddResultRowsBytes(current_time, res.rows(), res.bytes()); throw Exception("Logical error: unknown quota mode.", ErrorCodes::LOGICAL_ERROR);
quota->checkAndAddExecutionTime(current_time, Poco::Timespan((total_elapsed - prev_elapsed) * 1000000.0));
break;
default:
throw Exception("Logical error: unknown quota mode.", ErrorCodes::LOGICAL_ERROR);
}
prev_elapsed = total_elapsed;
} }
return res; prev_elapsed = total_elapsed;
} }
@ -306,5 +375,23 @@ const Block & IProfilingBlockInputStream::getTotals() const
return totals; return totals;
} }
const Block & IProfilingBlockInputStream::getExtremes() const
{
if (extremes)
return extremes;
for (BlockInputStreams::const_iterator it = children.begin(); it != children.end(); ++it)
{
if (const IProfilingBlockInputStream * child = dynamic_cast<const IProfilingBlockInputStream *>(&**it))
{
const Block & res = child->getExtremes();
if (res)
return res;
}
}
return extremes;
}
} }

View File

@ -67,4 +67,42 @@ void JSONCompactRowOutputStream::writeTotals()
} }
static void writeExtremesElement(const char * title, const Block & extremes, size_t row_num, WriteBuffer & ostr)
{
writeCString("\t\t\"", ostr);
writeCString(title, ostr);
writeCString("\": [", ostr);
size_t extremes_columns = extremes.columns();
for (size_t i = 0; i < extremes_columns; ++i)
{
if (i != 0)
writeChar(',', ostr);
const ColumnWithNameAndType & column = extremes.getByPosition(i);
column.type->serializeTextJSON((*column.column)[row_num], ostr);
}
writeChar(']', ostr);
}
void JSONCompactRowOutputStream::writeExtremes()
{
if (extremes)
{
writeCString(",\n", ostr);
writeChar('\n', ostr);
writeCString("\t\"extremes\":\n", ostr);
writeCString("\t{\n", ostr);
writeExtremesElement("min", extremes, 0, ostr);
writeCString(",\n", ostr);
writeExtremesElement("max", extremes, 1, ostr);
writeChar('\n', ostr);
writeCString("\t}", ostr);
}
}
} }

View File

@ -88,6 +88,7 @@ void JSONRowOutputStream::writeSuffix()
writeCString("\t]", ostr); writeCString("\t]", ostr);
writeTotals(); writeTotals();
writeExtremes();
writeCString(",\n\n", ostr); writeCString(",\n\n", ostr);
writeCString("\t\"rows\": ", ostr); writeCString("\t\"rows\": ", ostr);
@ -138,4 +139,48 @@ void JSONRowOutputStream::writeTotals()
} }
} }
static void writeExtremesElement(const char * title, const Block & extremes, size_t row_num, WriteBuffer & ostr)
{
writeCString("\t\t\"", ostr);
writeCString(title, ostr);
writeCString("\":\n", ostr);
writeCString("\t\t{\n", ostr);
size_t extremes_columns = extremes.columns();
for (size_t i = 0; i < extremes_columns; ++i)
{
const ColumnWithNameAndType & column = extremes.getByPosition(i);
if (i != 0)
writeCString(",\n", ostr);
writeCString("\t\t\t", ostr);
writeDoubleQuotedString(column.name, ostr);
writeCString(": ", ostr);
column.type->serializeTextJSON((*column.column)[row_num], ostr);
}
writeChar('\n', ostr);
writeCString("\t\t}", ostr);
}
void JSONRowOutputStream::writeExtremes()
{
if (extremes)
{
writeCString(",\n", ostr);
writeChar('\n', ostr);
writeCString("\t\"extremes\":\n", ostr);
writeCString("\t{\n", ostr);
writeExtremesElement("min", extremes, 0, ostr);
writeCString(",\n", ostr);
writeExtremesElement("max", extremes, 1, ostr);
writeChar('\n', ostr);
writeCString("\t}", ostr);
}
}
} }

View File

@ -22,6 +22,7 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to)
to.setRowsBeforeLimit(input->getInfo().getRowsBeforeLimit()); to.setRowsBeforeLimit(input->getInfo().getRowsBeforeLimit());
to.setTotals(input->getTotals()); to.setTotals(input->getTotals());
to.setExtremes(input->getExtremes());
} }
from.readSuffix(); from.readSuffix();

View File

@ -278,6 +278,12 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
/// Сначала выполняем DISTINCT во всех источниках. /// Сначала выполняем DISTINCT во всех источниках.
executeDistinct(streams, true); executeDistinct(streams, true);
/// На этой стадии можно считать минимумы и максимумы, если надо.
if (settings.extremes)
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
if (IProfilingBlockInputStream * stream = dynamic_cast<IProfilingBlockInputStream *>(&**it))
stream->enableExtremes();
/** Оптимизация - если источников несколько и есть LIMIT, то сначала применим предварительный LIMIT, /** Оптимизация - если источников несколько и есть LIMIT, то сначала применим предварительный LIMIT,
* ограничивающий число записей в каждом до offset + limit. * ограничивающий число записей в каждом до offset + limit.
*/ */

View File

@ -29,6 +29,7 @@ void Settings::set(const String & name, const Field & value)
else if (name == "distributed_connections_pool_size") distributed_connections_pool_size = safeGet<UInt64>(value); else if (name == "distributed_connections_pool_size") distributed_connections_pool_size = safeGet<UInt64>(value);
else if (name == "connections_with_failover_max_tries") connections_with_failover_max_tries = safeGet<UInt64>(value); else if (name == "connections_with_failover_max_tries") connections_with_failover_max_tries = safeGet<UInt64>(value);
else if (name == "sign_rewrite") sign_rewrite = safeGet<UInt64>(value); else if (name == "sign_rewrite") sign_rewrite = safeGet<UInt64>(value);
else if (name == "extremes") extremes = safeGet<UInt64>(value);
else if (name == "profile") setProfile(get<const String &>(value)); else if (name == "profile") setProfile(get<const String &>(value));
else if (!limits.trySet(name, value)) else if (!limits.trySet(name, value))
throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING); throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING);
@ -49,7 +50,8 @@ void Settings::set(const String & name, ReadBuffer & buf)
|| name == "max_distributed_connections" || name == "max_distributed_connections"
|| name == "distributed_connections_pool_size" || name == "distributed_connections_pool_size"
|| name == "connections_with_failover_max_tries" || name == "connections_with_failover_max_tries"
|| name == "sign_rewrite") || name == "sign_rewrite"
|| name == "extremes")
{ {
UInt64 value = 0; UInt64 value = 0;
readVarUInt(value, buf); readVarUInt(value, buf);
@ -80,7 +82,8 @@ void Settings::set(const String & name, const String & value)
|| name == "max_distributed_connections" || name == "max_distributed_connections"
|| name == "distributed_connections_pool_size" || name == "distributed_connections_pool_size"
|| name == "connections_with_failover_max_tries" || name == "connections_with_failover_max_tries"
|| name == "sign_rewrite") || name == "sign_rewrite"
|| name == "extremes")
{ {
set(name, parse<UInt64>(value)); set(name, parse<UInt64>(value));
} }
@ -137,7 +140,8 @@ void Settings::serialize(WriteBuffer & buf) const
writeStringBinary("max_distributed_connections", buf); writeVarUInt(max_distributed_connections, buf); writeStringBinary("max_distributed_connections", buf); writeVarUInt(max_distributed_connections, buf);
writeStringBinary("distributed_connections_pool_size", buf); writeVarUInt(distributed_connections_pool_size, buf); writeStringBinary("distributed_connections_pool_size", buf); writeVarUInt(distributed_connections_pool_size, buf);
writeStringBinary("connections_with_failover_max_tries", buf); writeVarUInt(connections_with_failover_max_tries, buf); writeStringBinary("connections_with_failover_max_tries", buf); writeVarUInt(connections_with_failover_max_tries, buf);
writeStringBinary("sign_rewrite", buf); writeVarUInt(sign_rewrite, buf); writeStringBinary("sign_rewrite", buf); writeVarUInt(sign_rewrite, buf);
writeStringBinary("extremes", buf); writeVarUInt(extremes, buf);
limits.serialize(buf); limits.serialize(buf);

View File

@ -245,6 +245,7 @@ void TCPHandler::processOrdinaryQuery()
if (!block) if (!block)
{ {
sendTotals(); sendTotals();
sendExtremes();
sendProfileInfo(); sendProfileInfo();
} }
@ -275,7 +276,7 @@ void TCPHandler::sendProfileInfo()
void TCPHandler::sendTotals() void TCPHandler::sendTotals()
{ {
if (client_revision < DBMS_MIN_REVISION_WITH_TOTALS) if (client_revision < DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES)
return; return;
if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(&*state.io.in)) if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(&*state.io.in))
@ -296,6 +297,29 @@ void TCPHandler::sendTotals()
} }
void TCPHandler::sendExtremes()
{
if (client_revision < DBMS_MIN_REVISION_WITH_TOTALS_EXTREMES)
return;
if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(&*state.io.in))
{
const Block & extremes = input->getExtremes();
if (extremes)
{
initBlockOutput();
writeVarUInt(Protocol::Server::Extremes, *out);
state.block_out->write(extremes);
state.maybe_compressed_out->next();
out->next();
}
}
}
void TCPHandler::logProfileInfo(Stopwatch & watch, IBlockInputStream & in) void TCPHandler::logProfileInfo(Stopwatch & watch, IBlockInputStream & in)
{ {
/// Выведем информацию о том, сколько считано строк и байт. /// Выведем информацию о том, сколько считано строк и байт.

View File

@ -122,6 +122,7 @@ private:
void sendEndOfStream(); void sendEndOfStream();
void sendProfileInfo(); void sendProfileInfo();
void sendTotals(); void sendTotals();
void sendExtremes();
/// Создаёт state.block_in/block_out для чтения/записи блоков, в зависимости от того, включено ли сжатие. /// Создаёт state.block_in/block_out для чтения/записи блоков, в зависимости от того, включено ли сжатие.
void initBlockInput(); void initBlockInput();