dbms: fixing error [#METR-19767].

This commit is contained in:
Alexey Milovidov 2016-01-26 00:40:13 +03:00
parent 525f961110
commit 869a521aed
4 changed files with 58 additions and 14 deletions

View File

@ -42,10 +42,13 @@ struct BlockStreamProfileInfo
void update(Block & block);
/// Методы для бинарной [де]сериализации
/// Методы для бинарной [де]сериализации. Передаются не все поля.
void read(ReadBuffer & in);
void write(WriteBuffer & out) const;
/// Установить поля из другого объекта, но только те, которые передаются по сети (методами выше).
void setFrom(const BlockStreamProfileInfo & rhs);
private:
void calculateRowsBeforeLimit() const;

View File

@ -31,6 +31,17 @@ void BlockStreamProfileInfo::write(WriteBuffer & out) const
}
void BlockStreamProfileInfo::setFrom(const BlockStreamProfileInfo & rhs)
{
rows = rhs.rows;
blocks = rhs.blocks;
bytes = rhs.bytes;
applied_limit = rhs.applied_limit;
rows_before_limit = rhs.rows_before_limit;
calculated_rows_before_limit = rhs.calculated_rows_before_limit;
}
size_t BlockStreamProfileInfo::getRowsBeforeLimit() const
{
if (!calculated_rows_before_limit)
@ -75,9 +86,9 @@ void BlockStreamProfileInfo::calculateRowsBeforeLimit() const
/// есть ли Limit?
BlockStreamProfileInfos limits;
collectInfosForStreamsWithName("Limit", limits);
if (limits.empty())
return;
if (!limits.empty())
{
applied_limit = true;
/** Берём количество строчек, прочитанных ниже PartialSorting-а, если есть, или ниже Limit-а.
@ -92,5 +103,31 @@ void BlockStreamProfileInfo::calculateRowsBeforeLimit() const
for (const auto & nested_info : info_limit_or_sort->nested_infos)
rows_before_limit += nested_info->rows;
}
else
{
/// Тогда данные о rows_before_limit могут быть в RemoteBlockInputStream-е (приехать с удалённого сервера).
std::cerr << "Has no Limit\n";
BlockStreamProfileInfos remotes;
collectInfosForStreamsWithName("Remote", remotes);
if (remotes.empty())
return;
std::cerr << "Found Remote\n";
for (const auto & info : remotes)
{
if (info->applied_limit)
{
std::cerr << "Has info->applied_limit\n";
applied_limit = true;
rows_before_limit += info->rows_before_limit;
}
}
}
}
}

View File

@ -175,7 +175,8 @@ Block RemoteBlockInputStream::readImpl()
break;
case Protocol::Server::ProfileInfo:
info = packet.profile_info;
info.setFrom(packet.profile_info);
std::cerr << "received, has_applied_limit: " << info.hasAppliedLimit() << ", rows_before_limit: " << info.getRowsBeforeLimit() << "\n";
break;
case Protocol::Server::Totals:

View File

@ -372,6 +372,9 @@ void TCPHandler::sendProfileInfo()
if (const IProfilingBlockInputStream * input = dynamic_cast<const IProfilingBlockInputStream *>(&*state.io.in))
{
writeVarUInt(Protocol::Server::ProfileInfo, *out);
std::cerr << "Sending profile info, rows_before_limit: " << input->getInfo().getRowsBeforeLimit() << "\n";
input->getInfo().write(*out);
out->next();
}