dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-03-05 02:34:20 +00:00
parent 7d9f303599
commit 075f636bb3
10 changed files with 158 additions and 28 deletions

View File

@ -47,6 +47,12 @@ public:
BlockInputStreamPtr clone() { return new AsynchronousBlockInputStream(in); }
~AsynchronousBlockInputStream()
{
if (started)
pool.wait();
}
protected:
BlockInputStreamPtr in;
boost::threadpool::pool pool;

View File

@ -35,6 +35,7 @@ public:
/** Для вывода дерева преобразований потока данных (плана выполнения запроса).
*/
virtual String getName() const = 0;
virtual String getShortName() const; /// То же самое, но без BlockInputStream на конце.
/** Создать копию объекта.
* Предполагается, что функция вызывается только до использования объекта (сразу после создания, до вызова других методов),
@ -43,7 +44,9 @@ public:
virtual BlockInputStreamPtr clone() = 0;
BlockInputStreams & getChildren() { return children; }
void dumpTree(std::ostream & ostr, size_t indent = 0);
void dumpTreeWithProfile(std::ostream & ostr, size_t indent = 0);
protected:
BlockInputStreams children;

View File

@ -14,7 +14,7 @@ class MergeSortingBlockInputStream : public IProfilingBlockInputStream
{
public:
MergeSortingBlockInputStream(BlockInputStreamPtr input_, SortDescription & description_)
: input(input_), description(description_)
: input(input_), description(description_), has_been_read(false)
{
children.push_back(input);
}
@ -29,6 +29,9 @@ private:
BlockInputStreamPtr input;
SortDescription description;
/// Всё было прочитано
bool has_been_read;
void merge(Block & left, Block & right);
};

View File

@ -42,14 +42,17 @@ public:
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
// std::cerr << "Starting initial threads" << std::endl;
if (isEnd())
return res;
// std::cerr << "Starting initial threads" << std::endl;
/// Запустим вычисления для как можно большего количества источников, которые ещё ни разу не брались
for (size_t i = 0; i < threads_data.size(); ++i)
{
if (0 == threads_data[i].count)
{
// std::cerr << "Scheduling " << i << std::endl;
// std::cerr << "Scheduling " << i << std::endl;
++threads_data[i].count;
pool.schedule(boost::bind(&UnionBlockInputStream::calculate, this, boost::ref(threads_data[i])/*, i*/));
}
@ -58,28 +61,22 @@ public:
while (1)
{
// std::cerr << "Waiting for one thread to finish" << std::endl;
// std::cerr << "Waiting for one thread to finish" << std::endl;
ready_any.wait();
/* std::cerr << std::endl << "pool.pending: " << pool.pending() << ", pool.active: " << pool.active() << ", pool.size: " << pool.size() << std::endl;
for (size_t i = 0; i < threads_data.size(); ++i)
{
std::cerr << "\t" << "i: " << i << ", count: " << threads_data[i].count << ", ready: " << threads_data[i].ready << ", block: " << !!threads_data[i].block << std::endl;
}*/
{
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
// std::cerr << "Checking end" << std::endl;
/// Если все блоки готовы и пустые
size_t i = 0;
for (; i < threads_data.size(); ++i)
if (!threads_data[i].ready || threads_data[i].block)
break;
if (i == threads_data.size())
/* std::cerr << std::endl << "pool.pending: " << pool.pending() << ", pool.active: " << pool.active() << ", pool.size: " << pool.size() << std::endl;
for (size_t i = 0; i < threads_data.size(); ++i)
{
std::cerr << "\t" << "i: " << i << ", count: " << threads_data[i].count << ", ready: " << threads_data[i].ready << ", block: " << !!threads_data[i].block << std::endl;
}
*/
if (isEnd())
return res;
// std::cerr << "Searching for first ready block" << std::endl;
// std::cerr << "Searching for first ready block" << std::endl;
/** Найдём и вернём готовый непустой блок, если такой есть.
* При чём, выберем блок из источника, из которого было получено меньше всего блоков.
@ -100,15 +97,18 @@ public:
}
if (argmin_i == -1)
{
// std::cerr << "Continue" << std::endl;
continue;
}
// std::cerr << "Returning found block " << argmin_i << std::endl;
// std::cerr << "Returning found block " << argmin_i << std::endl;
res = threads_data[argmin_i].block;
/// Запустим получение следующего блока
threads_data[argmin_i].reset();
// std::cerr << "Scheduling " << argmin_i << std::endl;
// std::cerr << "Scheduling " << argmin_i << std::endl;
++threads_data[argmin_i].count;
pool.schedule(boost::bind(&UnionBlockInputStream::calculate, this, boost::ref(threads_data[argmin_i])/*, argmin_i*/));
@ -161,8 +161,11 @@ private:
{
try
{
// std::cerr << "\033[1;37m" << "Calculating " << i << "\033[0m" << std::endl;
// sleep(i);
/* {
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::cerr << "\033[1;37m" << "Calculating " << i << "\033[0m" << std::endl;
}
sleep(i);*/
Block block = data.in->read();
{
@ -172,7 +175,11 @@ private:
}
ready_any.set();
// std::cerr << "\033[1;37m" << "Done " << i << "\033[0m" << std::endl;
/* {
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::cerr << "\033[1;37m" << "Done " << i << "\033[0m" << std::endl;
}*/
}
catch (const Exception & e)
{
@ -180,7 +187,7 @@ private:
}
catch (const Poco::Exception & e)
{
std::cerr << e.message() << std::endl;
//std::cerr << e.message() << std::endl;
data.exception = new Exception(e.message(), ErrorCodes::POCO_EXCEPTION);
}
catch (const std::exception & e)
@ -192,6 +199,20 @@ private:
data.exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
}
}
/// Check whether every source thread has delivered all of its blocks.
bool isEnd()
{
	/// We are at the end exactly when each per-thread slot is marked ready
	/// and holds no pending block.
	for (size_t i = 0; i < threads_data.size(); ++i)
	{
		if (!threads_data[i].ready || threads_data[i].block)
			return false;
	}
	return true;
}
};
}

View File

@ -7,12 +7,22 @@
namespace DB
{
void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent)
{
ostr << indent + 1 << ". " << getName() << "." << std::endl;
ostr << String(indent, ' ') << getShortName() << std::endl;
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
(*it)->dumpTree(ostr, indent + 1);
}
void IBlockInputStream::dumpTreeWithProfile(std::ostream & ostr, size_t indent)
{
ostr << indent + 1 << ". " << getShortName() << "." << std::endl;
/// Для красоты
size_t width = log10(indent + 1) + 4 + getName().size();
size_t width = log10(indent + 1) + 4 + getShortName().size();
for (size_t i = 0; i < width; ++i)
ostr << "";
ostr << std::endl;
@ -31,5 +41,14 @@ void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent)
(*it)->dumpTree(ostr, indent + 1);
}
/** Return the stream's name with the trailing "BlockInputStream" suffix stripped,
  * e.g. "AsynchronousBlockInputStream" -> "Asynchronous". Used for compact tree dumps.
  */
String IBlockInputStream::getShortName() const
{
	String res = getName();
	static const char suffix[] = "BlockInputStream";
	const size_t suffix_len = strlen(suffix);

	/// Guard the length first: without it, res.c_str() + res.size() - suffix_len
	/// would point before the start of the buffer when the name is shorter than
	/// the suffix — undefined behavior.
	if (res.size() >= suffix_len
		&& 0 == strcmp(res.c_str() + res.size() - suffix_len, suffix))
		res = res.substr(0, res.size() - suffix_len);

	return res;
}
}

View File

@ -10,6 +10,11 @@ Block MergeSortingBlockInputStream::readImpl()
* - прочитать в оперативку все блоки;
* - объединять по два соседних блока;
*/
if (has_been_read)
return Block();
has_been_read = true;
typedef std::list<Block> Blocks;
Blocks blocks;

View File

@ -0,0 +1,69 @@
#include <iostream>
#include <iomanip>
#include <Poco/SharedPtr.h>
#include <Poco/Stopwatch.h>
#include <Poco/NumberParser.h>
#include <DB/IO/WriteBufferFromFileDescriptor.h>
#include <DB/Storages/StorageSystemNumbers.h>
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/UnionBlockInputStream.h>
#include <DB/DataStreams/AsynchronousBlockInputStream.h>
#include <DB/DataStreams/FormatFactory.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Interpreters/Context.h>
#include <DB/Interpreters/loadMetadata.h>
using Poco::SharedPtr;
int main(int argc, char ** argv)
{
try
{
DB::Context context;
context.path = "./";
context.aggregate_function_factory = new DB::AggregateFunctionFactory;
context.data_type_factory = new DB::DataTypeFactory;
context.storage_factory = new DB::StorageFactory;
DB::loadMetadata(context);
DB::Names column_names;
column_names.push_back("WatchID");
DB::StoragePtr table = (*context.databases)["default"]["hits6"];
DB::BlockInputStreams streams = table->read(column_names, NULL, context.settings.max_block_size, context.settings.max_threads);
for (size_t i = 0, size = streams.size(); i < size; ++i)
streams[i] = new DB::AsynchronousBlockInputStream(streams[i]);
DB::BlockInputStreamPtr stream = new DB::UnionBlockInputStream(streams, context.settings.max_threads);
stream = new DB::LimitBlockInputStream(stream, 10);
DB::FormatFactory format_factory;
DB::WriteBufferFromFileDescriptor wb(STDERR_FILENO);
DB::Block sample = table->getSampleBlock();
DB::BlockOutputStreamPtr out = format_factory.getOutput("TabSeparated", wb, sample);
DB::copyData(*stream, *out);
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.message() << std::endl
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTrace().toString();
return 1;
}
return 0;
}

View File

@ -164,7 +164,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
/// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос?
if (!query.table || !dynamic_cast<ASTSelectQuery *>(&*query.table))
streams = table->read(required_columns, query_ptr, block_size, context.settings.max_threads);
streams = table->read(required_columns, query_ptr, block_size, context.settings.max_threads);
else
streams.push_back(maybeAsynchronous(interpreter_subquery->execute(), context.settings.asynchronous));

View File

@ -198,8 +198,11 @@ int main(int argc, char ** argv)
if (query_plan)
{
std::cerr << std::endl;
query_plan->dumpTreeWithProfile(std::cerr);
std::cerr << std::endl;
query_plan->dumpTree(std::cerr);
std::cerr << std::endl;
}
}
catch (const DB::Exception & e)

View File

@ -45,7 +45,8 @@ Block LogBlockInputStream::readImpl()
res.insert(column);
}
rows_read += res.getByPosition(0).column->size();
if (res)
rows_read += res.getByPosition(0).column->size();
return res;
}