dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-07-21 07:02:55 +00:00
parent d76d37f0fe
commit 5ee98f1937
4 changed files with 39 additions and 8 deletions

View File

@ -105,6 +105,7 @@ private:
SharedPtr<Expression> primary_expr;
SortDescription sort_descr;
Block primary_key_sample;
Increment increment;

View File

@ -143,6 +143,7 @@ void TCPHandler::processOrdinaryQuery()
LOG_DEBUG(log, "Query pipeline:\n" << query_pipeline.rdbuf());
}
Stopwatch watch;
while (true)
{
Block block = state.io.in->read();
@ -150,6 +151,35 @@ void TCPHandler::processOrdinaryQuery()
if (!block)
break;
}
watch.stop();
logProfileInfo(watch, *state.io.in);
}
}
void TCPHandler::logProfileInfo(Stopwatch & watch, IBlockInputStream & in)
{
/// Выведем информацию о том, сколько считано строк и байт.
BlockInputStreams leaves = in.getLeaves();
size_t rows = 0;
size_t bytes = 0;
for (BlockInputStreams::const_iterator it = leaves.begin(); it != leaves.end(); ++it)
{
if (const IProfilingBlockInputStream * profiling = dynamic_cast<const IProfilingBlockInputStream *>(&**it))
{
const BlockStreamProfileInfo & info = profiling->getInfo();
rows += info.rows;
bytes += info.bytes;
}
}
if (rows != 0)
{
LOG_INFO(log, std::fixed << std::setprecision(3)
<< "Read " << rows << " rows, " << bytes / 1048576.0 << " MB in " << watch.elapsedSeconds() << " sec., "
<< static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., " << bytes / 1048576.0 / watch.elapsedSeconds() << " MB/sec.");
}
}

View File

@ -127,6 +127,9 @@ private:
void sendEndOfStream();
bool isQueryCancelled();
/// Вывести информацию о скорости выполнения SELECT запроса.
void logProfileInfo(Stopwatch & watch, IBlockInputStream & in);
};

View File

@ -315,20 +315,16 @@ public:
String index_path = path + "primary.idx";
ReadBufferFromFile index(index_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
DataTypes primary_column_types;
for (size_t i = 0, size = storage.sort_descr.size(); i < size; ++i)
primary_column_types.push_back(storage.getDataTypeByName(storage.sort_descr[i].column_name));
size_t prefix_size = requested_pk_prefix.size();
Row pk(primary_column_types.size());
Row pk(storage.sort_descr.size());
Row pk_prefix(prefix_size);
for (size_t current_mark_number = 0; !index.eof(); ++current_mark_number)
{
/// Читаем очередное значение PK
Row pk(primary_column_types.size());
for (size_t i = 0, size = primary_column_types.size(); i < size; ++i)
primary_column_types[i]->deserializeBinary(pk[i], index);
Row pk(storage.sort_descr.size());
for (size_t i = 0, size = pk.size(); i < size; ++i)
storage.primary_key_sample.getByPosition(i).type->deserializeBinary(pk[i], index);
pk_prefix.assign(pk.begin(), pk.begin() + pk_prefix.size());
@ -484,6 +480,7 @@ StorageMergeTree::StorageMergeTree(
context.columns = *columns;
primary_expr = new Expression(primary_expr_ast, context);
primary_key_sample = primary_expr->getSampleBlock();
loadDataParts();
}