dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-06-24 23:17:06 +00:00
parent e6cc5e7b56
commit f8800fdc21
5 changed files with 85 additions and 82 deletions

View File

@ -24,101 +24,99 @@ public:
UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1) UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1)
: max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))), : max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))),
pool(max_threads), pool(max_threads),
threads_data(inputs_.size()),
ready_any(0, inputs_.size()) ready_any(0, inputs_.size())
{ {
children.insert(children.end(), inputs_.begin(), inputs_.end()); children.insert(children.end(), inputs_.begin(), inputs_.end());
for (size_t i = 0; i < inputs_.size(); ++i) for (size_t i = 0; i < inputs_.size(); ++i)
threads_data[i].in = inputs_[i]; {
threads_data.push_back(ThreadData());
threads_data.back().in = inputs_[i];
threads_data.back().i = i;
}
} }
Block readImpl() Block readImpl()
{ {
Block res; Block res;
// time_t current_time = time(0);
// std::cerr << std::endl << ctime(&current_time) << std::endl;
while (1) while (1)
{ {
{ {
Poco::ScopedLock<Poco::FastMutex> lock(mutex); Poco::ScopedLock<Poco::FastMutex> lock(mutex);
if (isEnd()) if (threads_data.empty())
return res; return res;
// std::cerr << "Starting initial threads" << std::endl; if (pool.pending() + pool.active() < pool.size())
/// Запустим вычисления для как можно большего количества источников, которые ещё ни разу не брались
size_t started_threads = 0;
for (size_t i = 0; i < threads_data.size(); ++i)
{ {
if (0 == threads_data[i].count) std::cerr << "Starting initial threads" << std::endl;
{
if (pool.pending() + pool.active() >= pool.size())
break;
// std::cerr << "Scheduling " << i << std::endl;
++threads_data[i].count;
++started_threads;
pool.schedule(boost::bind(&UnionBlockInputStream::calculate, this, boost::ref(threads_data[i])/*, i*/));
if (started_threads == max_threads) /// Запустим вычисления для как можно большего количества источников, которые ещё ни разу не брались
size_t started_threads = 0;
size_t max_threads_to_start = pool.size() - pool.pending() + pool.active();
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end() && 0 == it->count; ++it)
{
std::cerr << "Scheduling initial " << it->i << std::endl;
++it->count;
++started_threads;
pool.schedule(boost::bind(&UnionBlockInputStream::calculate, this, boost::ref(*it)));
if (started_threads == max_threads_to_start)
break; break;
} }
} }
} }
// std::cerr << "Waiting for one thread to finish" << std::endl; std::cerr << "Waiting for one thread to finish" << std::endl;
ready_any.wait(); ready_any.wait();
{ {
Poco::ScopedLock<Poco::FastMutex> lock(mutex); Poco::ScopedLock<Poco::FastMutex> lock(mutex);
/* std::cerr << std::endl << "pool.pending: " << pool.pending() << ", pool.active: " << pool.active() << ", pool.size: " << pool.size() << std::endl; std::cerr << std::endl << "pool.pending: " << pool.pending() << ", pool.active: " << pool.active() << ", pool.size: " << pool.size() << std::endl;
for (size_t i = 0; i < threads_data.size(); ++i)
{ if (threads_data.empty())
std::cerr << "\t" << "i: " << i << ", count: " << threads_data[i].count << ", ready: " << threads_data[i].ready << ", block: " << !!threads_data[i].block << std::endl;
}
*/
if (isEnd())
return res; return res;
// std::cerr << "Searching for first ready block" << std::endl; std::cerr << "Searching for first ready block" << std::endl;
/** Найдём и вернём готовый непустой блок, если такой есть. /** Найдём и вернём готовый непустой блок, если такой есть.
* При чём, выберем блок из источника, из которого было получено меньше всего блоков. * При чём, выберем блок из источника, из которого было получено меньше всего блоков.
*/ */
unsigned min_count = 0; ThreadsData::iterator it = threads_data.begin();
ssize_t argmin_i = -1; while (it != threads_data.end())
for (size_t i = 0; i < threads_data.size(); ++i)
{ {
if (threads_data[i].exception) if (it->exception)
threads_data[i].exception->rethrow(); it->exception->rethrow();
if (threads_data[i].ready && threads_data[i].block if (it->ready)
&& (argmin_i == -1 || threads_data[i].count < min_count))
{ {
min_count = threads_data[i].count; if (!it->block)
argmin_i = i; threads_data.erase(it++);
else
break;
} }
else
++it;
} }
if (argmin_i == -1) if (it == threads_data.end())
{ {
// std::cerr << "Continue" << std::endl; std::cerr << "Continue" << std::endl;
continue; continue;
} }
// std::cerr << "Returning found block " << argmin_i << std::endl; std::cerr << "Found block " << it->i << std::endl;
res = threads_data[argmin_i].block; res = it->block;
/// Запустим получение следующего блока /// Запустим получение следующего блока
threads_data[argmin_i].reset(); it->reset();
// std::cerr << "Scheduling " << argmin_i << std::endl; std::cerr << "Scheduling again " << it->i << std::endl;
++threads_data[argmin_i].count; ++it->count;
pool.schedule(boost::bind(&UnionBlockInputStream::calculate, this, boost::ref(threads_data[argmin_i])/*, argmin_i*/)); pool.schedule(boost::bind(&UnionBlockInputStream::calculate, this, boost::ref(*it)));
return res; return res;
} }
@ -148,6 +146,7 @@ private:
bool ready; /// Блок уже вычислен bool ready; /// Блок уже вычислен
Block block; Block block;
ExceptionPtr exception; ExceptionPtr exception;
size_t i; /// Порядковый номер источника.
void reset() void reset()
{ {
@ -156,25 +155,21 @@ private:
exception = NULL; exception = NULL;
} }
ThreadData() : count(0), ready(false) {} ThreadData() : count(0), ready(false), i(0) {}
}; };
typedef std::vector<ThreadData> ThreadsData; /// Список упорядочен по количеству полученных из источника блоков.
typedef std::list<ThreadData> ThreadsData;
ThreadsData threads_data; ThreadsData threads_data;
Poco::FastMutex mutex; Poco::FastMutex mutex;
Poco::Semaphore ready_any; Poco::Semaphore ready_any;
/// Вычисления, которые выполняться в отдельном потоке /// Вычисления, которые выполняться в отдельном потоке
void calculate(ThreadData & data/*, int i*/) void calculate(ThreadData & data)
{ {
try try
{ {
/* {
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::cerr << "\033[1;37m" << "Calculating " << i << "\033[0m" << std::endl;
}
sleep(i);*/
Block block = data.in->read(); Block block = data.in->read();
{ {
@ -184,11 +179,6 @@ private:
} }
ready_any.set(); ready_any.set();
/* {
Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::cerr << "\033[1;37m" << "Done " << i << "\033[0m" << std::endl;
}*/
} }
catch (const Exception & e) catch (const Exception & e)
{ {
@ -196,7 +186,6 @@ private:
} }
catch (const Poco::Exception & e) catch (const Poco::Exception & e)
{ {
//std::cerr << e.message() << std::endl;
data.exception = e.clone(); data.exception = e.clone();
} }
catch (const std::exception & e) catch (const std::exception & e)
@ -208,22 +197,6 @@ private:
data.exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION); data.exception = new Exception("Unknown exception", ErrorCodes::UNKNOWN_EXCEPTION);
} }
} }
/** Проверить, что во всех потоках были получены все блоки.
* Объект может быть уничтожен и раньше. Например, если он находится внутри LIMIT-а.
*/
bool isEnd()
{
// std::cerr << "Checking end" << std::endl;
/// Если все блоки готовы и пустые
size_t i = 0;
for (; i < threads_data.size(); ++i)
if (!threads_data[i].ready || threads_data[i].block)
break;
return i == threads_data.size();
}
}; };
} }

View File

@ -68,6 +68,7 @@ private:
void executeHaving( BlockInputStreams & streams, ExpressionPtr & expression); void executeHaving( BlockInputStreams & streams, ExpressionPtr & expression);
void executeOuterExpression( BlockInputStreams & streams, ExpressionPtr & expression); void executeOuterExpression( BlockInputStreams & streams, ExpressionPtr & expression);
void executeOrder( BlockInputStreams & streams, ExpressionPtr & expression); void executeOrder( BlockInputStreams & streams, ExpressionPtr & expression);
void executePreLimit( BlockInputStreams & streams, ExpressionPtr & expression);
void executeUnion( BlockInputStreams & streams, ExpressionPtr & expression); void executeUnion( BlockInputStreams & streams, ExpressionPtr & expression);
void executeLimit( BlockInputStreams & streams, ExpressionPtr & expression); void executeLimit( BlockInputStreams & streams, ExpressionPtr & expression);

View File

@ -107,10 +107,14 @@ bool Connection::ping()
readVarUInt(pong, *in); readVarUInt(pong, *in);
/// Можем получить запоздалые пакеты прогресса. /// Можем получить запоздалые пакеты прогресса. TODO: может быть, это можно исправить.
while (pong == Protocol::Server::Progress) while (pong == Protocol::Server::Progress)
{ {
receiveProgress(); receiveProgress();
if (in->eof())
return false;
readVarUInt(pong, *in); readVarUInt(pong, *in);
} }

View File

@ -96,7 +96,7 @@ Block IProfilingBlockInputStream::read()
Poco::ScopedLock<Poco::FastMutex> lock(mutex); Poco::ScopedLock<Poco::FastMutex> lock(mutex);
std::cerr << std::endl; std::cerr << std::endl;
std::cerr << "[ " << Poco::ThreadNumber::get() << " ]\t" << getName() << std::endl; std::cerr << "[ " << Poco::ThreadNumber::get() << " ]\t" << getShortName() << std::endl;
std::cerr << "[ " << Poco::ThreadNumber::get() << " ]\t"; std::cerr << "[ " << Poco::ThreadNumber::get() << " ]\t";
for (size_t i = 0; i < res.columns(); ++i) for (size_t i = 0; i < res.columns(); ++i)

View File

@ -156,6 +156,13 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
executeHaving(streams, expression); executeHaving(streams, expression);
executeOuterExpression(streams, expression); executeOuterExpression(streams, expression);
executeOrder(streams, expression); executeOrder(streams, expression);
/** Оптимизация - если источников несколько и есть LIMIT, то сначала применим предварительный LIMIT,
* ограничивающий число записей в каждом до offset + limit.
*/
if (query.limit_length && streams.size() > 1)
executePreLimit(streams, expression);
executeUnion(streams, expression); executeUnion(streams, expression);
executeLimit(streams, expression); executeLimit(streams, expression);
} }
@ -397,17 +404,35 @@ void InterpreterSelectQuery::executeUnion(BlockInputStreams & streams, Expressio
} }
/// Предварительный LIMIT - применяется в каждом источнике, если источников несколько, до их объединения.
void InterpreterSelectQuery::executePreLimit(BlockInputStreams & streams, ExpressionPtr & expression)
{
size_t limit_length = 0;
size_t limit_offset = 0;
getLimitLengthAndOffset(query, limit_length, limit_offset);
/// Если есть LIMIT
if (query.limit_length)
{
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = new LimitBlockInputStream(stream, limit_length + limit_offset, 0);
}
}
}
void InterpreterSelectQuery::executeLimit(BlockInputStreams & streams, ExpressionPtr & expression) void InterpreterSelectQuery::executeLimit(BlockInputStreams & streams, ExpressionPtr & expression)
{ {
size_t limit_length = 0; size_t limit_length = 0;
size_t limit_offset = 0; size_t limit_offset = 0;
getLimitLengthAndOffset(query, limit_length, limit_offset); getLimitLengthAndOffset(query, limit_length, limit_offset);
BlockInputStreamPtr & stream = streams[0];
/// Если есть LIMIT /// Если есть LIMIT
if (query.limit_length) if (query.limit_length)
{ {
BlockInputStreamPtr & stream = streams[0];
stream = new LimitBlockInputStream(stream, limit_length, limit_offset); stream = new LimitBlockInputStream(stream, limit_length, limit_offset);
} }
} }