dbms: sending totals as separate packet [#CONV-8366].

This commit is contained in:
Alexey Milovidov 2013-09-05 20:22:43 +00:00
parent a933ed4e43
commit c7d8723a54
8 changed files with 114 additions and 35 deletions

View File

@ -152,6 +152,9 @@ private:
SharedPtr<Exception> receiveException();
Progress receiveProgress();
BlockStreamProfileInfo receiveProfileInfo();
Block receiveTotals();
void initBlockInput();
};

View File

@ -32,3 +32,4 @@
#define DBMS_MIN_REVISION_WITH_PROFILING_PACKET 32029
#define DBMS_MIN_REVISION_WITH_HEADER_BLOCK 32881
#define DBMS_MIN_REVISION_WITH_USER_PASSWORD 34482
#define DBMS_MIN_REVISION_WITH_TOTALS 35265

View File

@ -36,8 +36,9 @@ namespace DB
* но клиент всё равно должен читать все пакеты до EndOfStream.
*
* Перед пакетом EndOfStream, если есть профайлинговая информация и ревизия клиента достаточно новая,
* может быть отправлен пакет ProfileInfo. После него передаются данные профайлинга -
* сериализованная структура BlockStreamProfileInfo.
* может быть отправлен пакет Totals и/или ProfileInfo.
* Totals - блок с тотальными значениями.
* ProfileInfo - данные профайлинга - сериализованная структура BlockStreamProfileInfo.
*
* При запросах, которые возвращают данные, сервер, перед обработкой запроса,
* отправляет заголовочный блок, содержащий описание столбцов из запроса, но с нулем строк.
@ -60,7 +61,8 @@ namespace Protocol
Progress = 3, /// Прогресс выполнения запроса: строк считано, байт считано.
Pong = 4, /// Ответ на Ping.
EndOfStream = 5, /// Все пакеты были переданы.
ProfileInfo = 6, /// Пакет с профайлинговой информацией
ProfileInfo = 6, /// Пакет с профайлинговой информацией.
Totals = 7, /// Блок данных с тотальными значениями, со сжатием или без.
};
/** NOTE: Если бы в качестве типа агрумента функции был бы Enum, то сравнение packet >= 0 && packet < 7

View File

@ -122,6 +122,7 @@ public:
case Protocol::Server::Data:
case Protocol::Server::Progress:
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
break;
case Protocol::Server::EndOfStream:
@ -180,6 +181,10 @@ protected:
case Protocol::Server::ProfileInfo:
break;
case Protocol::Server::Totals:
totals = packet.block;
break;
default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}

View File

@ -671,6 +671,10 @@ private:
onProfileInfo(packet.profile_info);
return true;
case Protocol::Server::Totals:
onTotals(packet.block);
return true;
case Protocol::Server::Exception:
onException(*packet.exception);
last_exception = packet.exception;
@ -743,13 +747,12 @@ private:
std_out.next();
}
else
{
if (block_std_out)
block_std_out->writeSuffix();
std_out.next();
}
void onTotals(Block & block)
{
block_std_out->setTotals(block);
}
@ -811,6 +814,11 @@ private:
void onEndOfStream()
{
if (block_std_out)
block_std_out->writeSuffix();
std_out.next();
if (is_interactive && !written_first_block)
std::cout << "Ok." << std::endl;
}

View File

@ -288,6 +288,10 @@ Connection::Packet Connection::receivePacket()
res.profile_info = receiveProfileInfo();
return res;
case Protocol::Server::Totals:
res.block = receiveTotals();
return res;
case Protocol::Server::EndOfStream:
return res;
@ -305,6 +309,15 @@ Block Connection::receiveData()
{
//LOG_TRACE(log, "Receiving data (" << getServerAddress() << ")");
initBlockInput();
/// Прочитать из сети один блок
return block_in->read();
}
void Connection::initBlockInput()
{
if (!block_in)
{
if (compression == Protocol::Compression::Enable)
@ -314,9 +327,6 @@ Block Connection::receiveData()
block_in = new NativeBlockInputStream(*maybe_compressed_in, data_type_factory);
}
/// Прочитать из сети один блок
return block_in->read();
}
@ -353,4 +363,11 @@ BlockStreamProfileInfo Connection::receiveProfileInfo()
return profile_info;
}
Block Connection::receiveTotals()
{
/// Блок с тотальными значениями передаётся так же, как обычный блок данных. Разница только в идентификаторе пакета.
return receiveData();
}
}

View File

@ -239,11 +239,14 @@ void TCPHandler::processOrdinaryQuery()
}
}
/// Если закончились данные, то отправим данные профайлинга до
/// Если закончились данные, то отправим данные профайлинга и тотальные значения до
/// последнего нулевого блока, чтобы иметь возможность использовать
/// эту информацию в выводе суффикса output stream'а
if (!block)
{
sendTotals();
sendProfileInfo();
}
sendData(block);
if (!block)
@ -270,6 +273,29 @@ void TCPHandler::sendProfileInfo()
}
void TCPHandler::sendTotals()
{
if (client_revision < DBMS_MIN_REVISION_WITH_TOTALS)
return;
if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(&*state.io.in))
{
const Block & totals = input->getTotals();
if (totals)
{
initBlockOutput();
writeVarUInt(Protocol::Server::Totals, *out);
state.block_out->write(input->getTotals());
state.maybe_compressed_out->next();
out->next();
}
}
}
void TCPHandler::logProfileInfo(Stopwatch & watch, IBlockInputStream & in)
{
/// Выведем информацию о том, сколько считано строк и байт.
@ -412,6 +438,22 @@ void TCPHandler::receiveQuery()
bool TCPHandler::receiveData()
{
initBlockInput();
/// Прочитать из сети один блок и засунуть его в state.io.out (данные для INSERT-а)
Block block = state.block_in->read();
if (block)
{
state.io.out->write(block);
return true;
}
else
return false;
}
void TCPHandler::initBlockInput()
{
if (!state.block_in)
{
@ -427,16 +469,23 @@ bool TCPHandler::receiveData()
query_context.getSettingsRef().max_block_size,
query_context.getDataTypeFactory());
}
/// Прочитать из сети один блок и засунуть его в state.io.out (данные для INSERT-а)
Block block = state.block_in->read();
if (block)
{
state.io.out->write(block);
return true;
}
void TCPHandler::initBlockOutput()
{
if (!state.block_out)
{
if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_out = new CompressedWriteBuffer(*out);
else
return false;
state.maybe_compressed_out = out;
state.block_out = query_context.getFormatFactory().getOutput(
"Native",
*state.maybe_compressed_out,
state.io.in_sample);
}
}
@ -478,18 +527,7 @@ void TCPHandler::sendData(Block & block)
{
Poco::ScopedLock<Poco::FastMutex> lock(send_mutex);
if (!state.block_out)
{
if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_out = new CompressedWriteBuffer(*out);
else
state.maybe_compressed_out = out;
state.block_out = query_context.getFormatFactory().getOutput(
"Native",
*state.maybe_compressed_out,
state.io.in_sample);
}
initBlockOutput();
writeVarUInt(Protocol::Server::Data, *out);

View File

@ -121,6 +121,11 @@ private:
void sendProgress(size_t rows, size_t bytes);
void sendEndOfStream();
void sendProfileInfo();
void sendTotals();
/// Создаёт state.block_in/block_out для чтения/записи блоков, в зависимости от того, включено ли сжатие.
void initBlockInput();
void initBlockOutput();
bool isQueryCancelled();