dbms: improved performance on small blocks [#METR-2944].
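
The hunks below share one theme: fixed per-block bookkeeping (a time(0) syscall, a Stopwatch read) is negligible for large blocks but dominates when each block carries only a few rows, so the commit defers that work to the branches that actually need it. As a rough, hypothetical illustration of the fixed cost alone (this micro-benchmark is not part of the commit):

#include <chrono>
#include <cstddef>
#include <ctime>
#include <iostream>

int main()
{
    /// 1M rows arriving in 10-row blocks means 100000 per-block calls.
    const size_t blocks = 100000;

    auto start = std::chrono::steady_clock::now();
    volatile time_t sink = 0;
    for (size_t i = 0; i < blocks; ++i)
        sink = time(0);    /// the fixed cost formerly paid for every block
    auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
        std::chrono::steady_clock::now() - start).count();

    std::cout << "time(0): " << ns / blocks << " ns per block" << std::endl;
    return 0;
}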

Alexey Milovidov 2014-08-22 06:31:54 +04:00
parent aae80fa8c5
commit 5523bdeaca
8 changed files with 55 additions and 68 deletions

View File

@@ -1,7 +1,5 @@
#pragma once
#include <statdaemons/Stopwatch.h>
#include <Yandex/logger_useful.h>
#include <DB/Parsers/ASTJoin.h>

View File

@@ -2,8 +2,6 @@
#include <set>
#include <statdaemons/Stopwatch.h>
#include <Yandex/logger_useful.h>
#include <DB/Core/ColumnNumbers.h>

View File

@@ -111,6 +111,8 @@ void CreatingSetsBlockInputStream::create(SubqueryForSet & subquery)
size_t rows = 0;
size_t bytes = 0;
watch.stop();
subquery.source->getLeafRowsBytes(rows, bytes);
size_t head_rows = 0;

View File

@@ -18,7 +18,7 @@ void BlockStreamProfileInfo::read(ReadBuffer & in)
readVarUInt(bytes, in);
readBinary(applied_limit, in);
readVarUInt(rows_before_limit, in);
readBinary(calculated_rows_before_limit, in);
}
@@ -46,7 +46,7 @@ bool BlockStreamProfileInfo::hasAppliedLimit() const
if (!calculated_rows_before_limit)
calculateRowsBeforeLimit();
return applied_limit;
}
void BlockStreamProfileInfo::update(Block & block)
@@ -76,7 +76,7 @@ void BlockStreamProfileInfo::collectInfosForStreamsWithName(const String & name,
void BlockStreamProfileInfo::calculateRowsBeforeLimit() const
{
calculated_rows_before_limit = true;
/// Is there a Limit?
BlockStreamProfileInfos limits;
collectInfosForStreamsWithName("Limit", limits);
@@ -105,11 +105,11 @@ void BlockStreamProfileInfo::print(std::ostream & ostr) const
UInt64 nested_elapsed = 0;
double elapsed_seconds = work_stopwatch.elapsedSeconds();
double nested_elapsed_seconds = 0;
UInt64 nested_rows = 0;
UInt64 nested_blocks = 0;
UInt64 nested_bytes = 0;
if (!nested_infos.empty())
{
for (BlockStreamProfileInfos::const_iterator it = nested_infos.begin(); it != nested_infos.end(); ++it)
@@ -119,13 +119,13 @@ void BlockStreamProfileInfo::print(std::ostream & ostr) const
nested_elapsed = (*it)->work_stopwatch.elapsed();
nested_elapsed_seconds = (*it)->work_stopwatch.elapsedSeconds();
}
nested_rows += (*it)->rows;
nested_blocks += (*it)->blocks;
nested_bytes += (*it)->bytes;
}
}
ostr << std::fixed << std::setprecision(2)
<< "Columns: " << column_names << std::endl
<< "Elapsed: " << elapsed_seconds << " sec. "
@@ -134,7 +134,7 @@ void BlockStreamProfileInfo::print(std::ostream & ostr) const
if (!nested_infos.empty())
{
double self_percents = (elapsed - nested_elapsed) * 100.0 / total_stopwatch.elapsed();
ostr << "Elapsed (self): " << (elapsed_seconds - nested_elapsed_seconds) << " sec. "
<< "(" << (self_percents >= 50 ? "\033[1;31m" : (self_percents >= 10 ? "\033[1;33m" : "")) /// Highlight large values
<< self_percents << "%"
@@ -148,7 +148,7 @@ void BlockStreamProfileInfo::print(std::ostream & ostr) const
ostr << "Rows per second (in, self): " << (nested_rows / (elapsed_seconds - nested_elapsed_seconds))
<< ", " << (elapsed - nested_elapsed) / nested_rows << " ns/row, " << std::endl;
}
ostr << "Rows (out): " << rows << ", per second: " << rows / elapsed_seconds << ", " << std::endl
<< "Blocks (out): " << blocks << ", per second: " << blocks / elapsed_seconds << ", " << std::endl
<< " " << bytes / 1000000.0 << " MB (memory), " << bytes * 1000 / elapsed << " MB/s (memory), " << std::endl
@@ -166,7 +166,7 @@ Block IProfilingBlockInputStream::read()
for (BlockInputStreams::const_iterator it = children.begin(); it != children.end(); ++it)
if (const IProfilingBlockInputStream * child = dynamic_cast<const IProfilingBlockInputStream *>(&**it))
info.nested_infos.push_back(&child->info);
info.started = true;
}
@@ -194,7 +194,7 @@ Block IProfilingBlockInputStream::read()
std::cerr << ", ";
std::cerr << res.getByPosition(i).name << " (" << res.getByPosition(i).column->size() << ")";
}
std::cerr << std::endl;
}*/
@@ -269,7 +269,7 @@ void IProfilingBlockInputStream::updateExtremes(Block & block)
for (size_t i = 0; i < columns; ++i)
{
ColumnPtr & column = extremes.getByPosition(i).column;
Field min_value = (*column)[0];
Field max_value = (*column)[1];
@@ -330,9 +330,6 @@ bool IProfilingBlockInputStream::checkLimits()
void IProfilingBlockInputStream::checkQuota(Block & block)
{
time_t current_time = time(0);
double total_elapsed = info.total_stopwatch.elapsedSeconds();
switch (limits.mode)
{
case LIMITS_TOTAL:
@@ -340,15 +337,20 @@ void IProfilingBlockInputStream::checkQuota(Block & block)
break;
case LIMITS_CURRENT:
{
time_t current_time = time(0);
double total_elapsed = info.total_stopwatch.elapsedSeconds();
quota->checkAndAddResultRowsBytes(current_time, block.rows(), block.bytes());
quota->checkAndAddExecutionTime(current_time, Poco::Timespan((total_elapsed - prev_elapsed) * 1000000.0));
prev_elapsed = total_elapsed;
break;
}
default:
throw Exception("Logical error: unknown limits mode.", ErrorCodes::LOGICAL_ERROR);
}
prev_elapsed = total_elapsed;
}
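
The hunk above is the core of the change: time(0) and elapsedSeconds() used to run unconditionally for every block, but their results are only consumed under LIMITS_CURRENT. A minimal sketch of the resulting shape, with stand-in types (none of these names are the real interfaces):

#include <ctime>

enum Mode { LIMITS_TOTAL, LIMITS_CURRENT };

struct StreamSketch
{
    Mode mode = LIMITS_TOTAL;
    double prev_elapsed = 0;

    double elapsedSeconds() const { return 0.0; }  /// stand-in for the Stopwatch

    void checkQuota()
    {
        switch (mode)
        {
            case LIMITS_TOTAL:
                break;  /// hot path: no syscall, no stopwatch read per block

            case LIMITS_CURRENT:
            {
                time_t current_time = time(0);           /// paid only when needed
                double total_elapsed = elapsedSeconds();
                (void) current_time;  /// ... account rows, bytes and time against the quota ...
                prev_elapsed = total_elapsed;
                break;
            }
        }
    }
};

int main()
{
    StreamSketch stream;
    stream.checkQuota();  /// LIMITS_TOTAL: returns immediately
    return 0;
}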
@@ -366,10 +368,9 @@ void IProfilingBlockInputStream::progressImpl(size_t rows, size_t bytes)
cancel();
/// The total amount of data processed in all leaf sources, possibly on remote servers.
size_t total_rows = process_list_elem->rows_processed;
size_t total_bytes = process_list_elem->bytes_processed;
double total_elapsed = info.total_stopwatch.elapsedSeconds();
/** Check the limits on the amount of data to read, the query execution speed, and the quota on the amount of data to read.
* NOTE: Perhaps it makes sense to have them checked directly in ProcessList?
@@ -389,13 +390,17 @@ void IProfilingBlockInputStream::progressImpl(size_t rows, size_t bytes)
throw Exception("Logical error: unknown overflow mode", ErrorCodes::LOGICAL_ERROR);
}
if (limits.min_execution_speed
&& total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0
&& total_rows / total_elapsed < limits.min_execution_speed)
if (limits.min_execution_speed)
{
throw Exception("Query is executing too slow: " + toString(total_rows / total_elapsed)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
double total_elapsed = info.total_stopwatch.elapsedSeconds();
if (total_elapsed > limits.timeout_before_checking_execution_speed.totalMicroseconds() / 1000000.0
&& total_rows / total_elapsed < limits.min_execution_speed)
{
throw Exception("Query is executing too slow: " + toString(total_rows / total_elapsed)
+ " rows/sec., minimum: " + toString(limits.min_execution_speed),
ErrorCodes::TOO_SLOW);
}
}
if (quota != nullptr && limits.mode == LIMITS_TOTAL)
@@ -405,7 +410,7 @@ void IProfilingBlockInputStream::progressImpl(size_t rows, size_t bytes)
}
}
}
const BlockStreamProfileInfo & IProfilingBlockInputStream::getInfo() const
{
@@ -427,7 +432,7 @@ void IProfilingBlockInputStream::cancel()
void IProfilingBlockInputStream::setProgressCallback(ProgressCallback callback)
{
progress_callback = callback;
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&**it))
child->setProgressCallback(callback);

View File

@@ -18,7 +18,7 @@ Block MergeSortingBlockInputStream::readImpl()
if (has_been_read)
return Block();
has_been_read = true;
Blocks blocks;
@@ -38,15 +38,15 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks)
if (blocks.size() == 1)
return blocks[0];
Stopwatch watch;
LOG_DEBUG(log, "Merge sorting");
CursorImpls cursors(blocks.size());
bool has_collation = false;
size_t i = 0;
for (Blocks::const_iterator it = blocks.begin(); it != blocks.end(); ++it, ++i)
{
@@ -56,20 +56,22 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks)
cursors[i] = SortCursorImpl(*it, description);
has_collation |= cursors[i].has_collation;
}
Block merged;
if (has_collation)
merged = mergeImpl<SortCursorWithCollation>(blocks, cursors);
else
merged = mergeImpl<SortCursor>(blocks, cursors);
watch.stop();
LOG_DEBUG(log, std::fixed << std::setprecision(2)
<< "Merge sorted " << blocks.size() << " blocks, " << merged.rows() << " rows"
<< " in " << watch.elapsedSeconds() << " sec., "
<< merged.rows() / watch.elapsedSeconds() << " rows/sec., "
<< merged.bytes() / 1000000.0 / watch.elapsedSeconds() << " MiB/sec.");
return merged;
}
@@ -78,13 +80,13 @@ Block MergeSortingBlockInputStream::mergeImpl(Blocks & blocks, CursorImpls & cur
{
Block merged = blocks[0].cloneEmpty();
size_t num_columns = blocks[0].columns();
typedef std::priority_queue<TSortCursor> Queue;
Queue queue;
for (size_t i = 0; i < cursors.size(); ++i)
queue.push(TSortCursor(&cursors[i]));
ColumnPlainPtrs merged_columns;
for (size_t i = 0; i < num_columns; ++i) /// TODO: reserve
merged_columns.push_back(&*merged.getByPosition(i).column);
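
mergeImpl() is a classic k-way merge: one cursor per sorted block sits in a priority queue, the smallest is popped, its value appended to the merged result, and the advanced cursor is pushed back. A self-contained sketch of the same idea over plain sorted vectors of ints (the real code works over SortCursor and Block, and orders the heap via the cursor comparison rather than std::greater):

#include <functional>
#include <iostream>
#include <queue>
#include <tuple>
#include <vector>

std::vector<int> kWayMerge(const std::vector<std::vector<int>> & blocks)
{
    /// (value, block index, position inside the block); min-heap by value.
    using Entry = std::tuple<int, size_t, size_t>;
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> queue;

    for (size_t i = 0; i < blocks.size(); ++i)
        if (!blocks[i].empty())
            queue.push({blocks[i][0], i, 0});

    std::vector<int> merged;
    while (!queue.empty())
    {
        auto [value, block, pos] = queue.top();
        queue.pop();
        merged.push_back(value);
        if (pos + 1 < blocks[block].size())    /// re-insert the advanced cursor
            queue.push({blocks[block][pos + 1], block, pos + 1});
    }
    return merged;
}

int main()
{
    std::vector<std::vector<int>> blocks{{1, 4, 7}, {2, 5}, {3, 6, 8}};
    for (int x : kWayMerge(blocks))
        std::cout << x << ' ';    /// prints: 1 2 3 4 5 6 7 8
    std::cout << std::endl;
    return 0;
}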

View File

@@ -1,7 +1,5 @@
#include <iomanip>
#include <statdaemons/Stopwatch.h>
#include <DB/Columns/ColumnString.h>
#include <DB/Columns/ColumnFixedString.h>
@@ -14,8 +12,6 @@ namespace DB
void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedDataVariants & results)
{
//Stopwatch watch;
/// Read all the data
while (Block block = stream->read())
{
@@ -90,10 +86,6 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
/// Compute hashes and keys in parallel.
// LOG_TRACE(log, "Calculating keys and hashes.");
// watch.start();
for (size_t thread_no = 0; thread_no < threads; ++thread_no)
pool.schedule(boost::bind(&SplittingAggregator::calculateHashesThread, this,
boost::ref(block),
@@ -106,13 +98,8 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
rethrowFirstException(exceptions);
// LOG_TRACE(log, "Calculated keys and hashes in " << std::fixed << std::setprecision(2) << watch.elapsedSeconds() << " sec.");
// watch.restart();
/// Aggregate in parallel into independent hash tables
// LOG_TRACE(log, "Parallel aggregating.");
for (size_t thread_no = 0; thread_no < threads; ++thread_no)
pool.schedule(boost::bind(&SplittingAggregator::aggregateThread, this,
boost::ref(block),
@@ -125,8 +112,6 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
rethrowFirstException(exceptions);
// LOG_TRACE(log, "Parallel aggregated in " << std::fixed << std::setprecision(2) << watch.elapsedSeconds() << " sec.");
/// Check the limits
if (max_rows_to_group_by && size_of_all_results > max_rows_to_group_by && group_by_overflow_mode == OverflowMode::BREAK)
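
The two parallel phases above (hashes, then aggregation) rely on each thread owning an independent hash table selected by key hash, so no synchronization is needed while aggregating. A minimal sketch of that partitioning idea with std::thread (the real code uses a thread pool, computes the hashes once up front, and aggregates real columns rather than a vector of ints):

#include <cstdint>
#include <functional>
#include <iostream>
#include <thread>
#include <unordered_map>
#include <vector>

int main()
{
    const size_t threads = 4;
    std::vector<uint64_t> keys{1, 2, 3, 1, 2, 1, 5, 5, 7};

    /// One independent result table per thread: no locks needed.
    std::vector<std::unordered_map<uint64_t, size_t>> results(threads);
    std::vector<std::thread> pool;

    for (size_t thread_no = 0; thread_no < threads; ++thread_no)
        pool.emplace_back([&, thread_no]
        {
            for (uint64_t key : keys)
                if (std::hash<uint64_t>()(key) % threads == thread_no)
                    ++results[thread_no][key];    /// only this thread touches this table
        });

    for (auto & thread : pool)
        thread.join();

    size_t size_of_all_results = 0;
    for (const auto & table : results)
        size_of_all_results += table.size();
    std::cout << "distinct keys: " << size_of_all_results << std::endl;  /// 5
    return 0;
}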

View File

@@ -370,9 +370,6 @@ int Server::main(const std::vector<std::string> & args)
global_context->setInterserverIOHost(this_host, port);
}
if (config().has("replica_name"))
global_context->setDefaultReplicaName(config().getString("replica_name"));
if (config().has("macros"))
global_context->setMacros(Macros(config(), "macros"));

View File

@@ -42,7 +42,7 @@ void TCPHandler::runImpl()
socket().setReceiveTimeout(global_settings.receive_timeout);
socket().setSendTimeout(global_settings.send_timeout);
socket().setNoDelay(true);
in = new ReadBufferFromPocoSocket(socket());
out = new WriteBufferFromPocoSocket(socket());
@@ -82,7 +82,7 @@ void TCPHandler::runImpl()
connection_context.setCurrentDatabase(default_database);
}
sendHello();
connection_context.setProgressCallback([this] (const size_t rows, const size_t bytes) {
@@ -98,7 +98,7 @@ void TCPHandler::runImpl()
/// If shutdown was requested, or the client has disconnected.
if (Daemon::instance().isCancelled() || in->eof())
break;
Stopwatch watch;
state.reset();
@@ -106,7 +106,7 @@ void TCPHandler::runImpl()
* The client will be able to receive it if it did not occur while another packet was being sent and the client has not yet closed the connection.
*/
SharedPtr<Exception> exception;
try
{
/// Restore the query context.
@@ -271,7 +271,7 @@ void TCPHandler::processOrdinaryQuery()
while (true)
{
Block block;
while (true)
{
if (isQueryCancelled())
@@ -288,7 +288,7 @@ void TCPHandler::processOrdinaryQuery()
after_send_progress.restart();
sendProgress();
}
if (async_in.poll(query_context.getSettingsRef().interactive_delay / 1000))
{
/// The next result block is available.
@@ -308,8 +308,8 @@ void TCPHandler::processOrdinaryQuery()
sendProfileInfo();
sendProgress();
}
sendData(block);
if (!block)
break;
}
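
For context, the send loop above alternates between polling the asynchronous result stream with a bounded delay (interactive_delay), sending progress packets while waiting, and sending data blocks as they arrive; the empty block at the end terminates the stream. A minimal sketch of that control flow with stand-in types (none of these are the real ClickHouse classes):

#include <chrono>
#include <iostream>
#include <optional>
#include <thread>
#include <vector>

struct FakeAsyncStream
{
    std::vector<int> blocks{10, 20, 30};  /// row counts of pending blocks
    int not_ready = 2;                    /// simulate two empty polls first

    bool poll(std::chrono::milliseconds timeout)
    {
        std::this_thread::sleep_for(timeout);  /// pretend to wait for data
        if (not_ready > 0)
        {
            --not_ready;
            return false;  /// nothing yet; the caller may send progress
        }
        return true;
    }

    std::optional<int> read()
    {
        if (blocks.empty())
            return std::nullopt;  /// empty block: end of the result stream
        int rows = blocks.back();
        blocks.pop_back();
        return rows;
    }
};

int main()
{
    FakeAsyncStream async_in;
    const std::chrono::milliseconds interactive_delay{10};

    while (true)
    {
        std::optional<int> block;
        while (true)
        {
            if (async_in.poll(interactive_delay))
            {
                block = async_in.read();  /// the next result block is ready
                break;
            }
            std::cout << "progress packet" << std::endl;  /// keep the client informed
        }

        if (!block)
        {
            std::cout << "final empty block: stream finished" << std::endl;
            break;
        }
        std::cout << "data block: " << *block << " rows" << std::endl;
    }
    return 0;
}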