This commit is contained in:
Michael Kolupaev 2014-07-17 12:29:10 +04:00
commit 29e85b5fbc
8 changed files with 132 additions and 155 deletions

View File

@ -72,7 +72,7 @@ protected:
} }
/// Функция для отладочного вывода информации /// Функция для отладочного вывода информации
virtual void write() void write()
{ {
std::cerr << "file " << file << std::endl; std::cerr << "file " << file << std::endl;
std::cerr << "name " << name << std::endl; std::cerr << "name " << name << std::endl;

View File

@ -2,8 +2,8 @@
#include <list> #include <list>
#include <queue> #include <queue>
#include <atomic>
#include <Poco/Thread.h> #include <thread>
#include <Yandex/logger_useful.h> #include <Yandex/logger_useful.h>
@ -33,21 +33,15 @@ using Poco::SharedPtr;
*/ */
class UnionBlockInputStream : public IProfilingBlockInputStream class UnionBlockInputStream : public IProfilingBlockInputStream
{ {
class Thread;
public: public:
UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1) UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1)
: max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))), : max_threads(std::min(inputs_.size(), static_cast<size_t>(max_threads_))),
output_queue(max_threads), exhausted_inputs(0), finish(false), output_queue(max_threads)
pushed_end_of_output_queue(false), all_read(false), log(&Logger::get("UnionBlockInputStream"))
{ {
children.insert(children.end(), inputs_.begin(), inputs_.end()); children.insert(children.end(), inputs_.begin(), inputs_.end());
for (size_t i = 0; i < inputs_.size(); ++i) for (size_t i = 0; i < inputs_.size(); ++i)
{ input_queue.emplace(inputs_[i], i);
input_queue.push(InputData());
input_queue.back().in = inputs_[i];
input_queue.back().i = i;
}
} }
String getName() const { return "UnionBlockInputStream"; } String getName() const { return "UnionBlockInputStream"; }
@ -116,7 +110,7 @@ public:
protected: protected:
void finalize() void finalize()
{ {
if (threads_data.empty()) if (threads.empty())
return; return;
LOG_TRACE(log, "Waiting for threads to finish"); LOG_TRACE(log, "Waiting for threads to finish");
@ -124,14 +118,11 @@ protected:
/// Вынем всё, что есть в очереди готовых данных. /// Вынем всё, что есть в очереди готовых данных.
output_queue.clear(); output_queue.clear();
/** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится. /// В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится.
* PS. Может быть, для переменной finish нужен барьер? for (auto & thread : threads)
*/ thread.join();
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it) threads.clear();
it->thread->join();
threads_data.clear();
LOG_TRACE(log, "Waited for threads to finish"); LOG_TRACE(log, "Waited for threads to finish");
} }
@ -143,15 +134,11 @@ protected:
return res.block; return res.block;
/// Запускаем потоки, если это ещё не было сделано. /// Запускаем потоки, если это ещё не было сделано.
if (threads_data.empty()) if (threads.empty())
{ {
threads_data.resize(max_threads); threads.reserve(max_threads);
for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it) for (size_t i = 0; i < max_threads; ++i)
{ threads.emplace_back([=] { thread(current_memory_tracker); });
it->runnable = new Thread(*this, current_memory_tracker);
it->thread = new Poco::Thread;
it->thread->start(*it->runnable);
}
} }
/// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение. /// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение.
@ -173,7 +160,8 @@ protected:
/// Может быть, в очереди есть ещё эксепшен. /// Может быть, в очереди есть ещё эксепшен.
OutputData res; OutputData res;
while (output_queue.tryPop(res) && res.exception) while (output_queue.tryPop(res))
if (res.exception)
res.exception->rethrow(); res.exception->rethrow();
finalize(); finalize();
@ -188,26 +176,13 @@ private:
{ {
BlockInputStreamPtr in; BlockInputStreamPtr in;
size_t i; /// Порядковый номер источника (для отладки). size_t i; /// Порядковый номер источника (для отладки).
InputData() {}
InputData(BlockInputStreamPtr & in_, size_t i_) : in(in_), i(i_) {}
}; };
/// Данные отдельного потока void thread(MemoryTracker * memory_tracker)
struct ThreadData
{
SharedPtr<Poco::Thread> thread;
SharedPtr<Thread> runnable;
};
class Thread : public Poco::Runnable
{
public:
Thread(UnionBlockInputStream & parent_, MemoryTracker * memory_tracker_)
: parent(parent_), memory_tracker(memory_tracker_)
{
}
void run()
{ {
current_memory_tracker = memory_tracker; current_memory_tracker = memory_tracker;
ExceptionPtr exception; ExceptionPtr exception;
@ -223,9 +198,12 @@ private:
if (exception) if (exception)
{ {
/// Отдаём эксепшен в основной поток.
output_queue.push(exception);
try try
{ {
parent.cancel(); cancel();
} }
catch (...) catch (...)
{ {
@ -234,86 +212,72 @@ private:
* - то пофиг. * - то пофиг.
*/ */
} }
/// Отдаём эксепшен в основной поток.
parent.output_queue.push(exception);
} }
} }
void loop() void loop()
{ {
while (!parent.finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут. while (!finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут.
{ {
InputData input; InputData input;
/// Выбираем следующий источник. /// Выбираем следующий источник.
{ {
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex); Poco::ScopedLock<Poco::FastMutex> lock(mutex);
/// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.) /// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.)
if (parent.input_queue.empty()) if (input_queue.empty())
break; break;
input = parent.input_queue.front(); input = input_queue.front();
/// Убираем источник из очереди доступных источников. /// Убираем источник из очереди доступных источников.
parent.input_queue.pop(); input_queue.pop();
} }
/// Основная работа. /// Основная работа.
Block block = input.in->read(); Block block = input.in->read();
{ {
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex); Poco::ScopedLock<Poco::FastMutex> lock(mutex);
/// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых. /// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых.
if (block) if (block)
{ {
parent.input_queue.push(input); input_queue.push(input);
if (parent.finish) if (finish)
break; break;
parent.output_queue.push(block); output_queue.push(block);
} }
else else
{ {
++parent.exhausted_inputs; ++exhausted_inputs;
/// Если все источники иссякли. /// Если все источники иссякли.
if (parent.exhausted_inputs == parent.children.size()) if (exhausted_inputs == children.size())
{ {
parent.finish = true; finish = true;
break; break;
} }
} }
} }
} }
if (parent.finish) if (finish)
{ {
Poco::ScopedLock<Poco::FastMutex> lock(parent.mutex); /// Отдаём в основной поток пустой блок, что означает, что данных больше нет; только один раз.
if (false == pushed_end_of_output_queue.exchange(true))
/// Отдаём в основной поток пустой блок, что означает, что данных больше нет. output_queue.push(OutputData());
if (!parent.pushed_end_of_output_queue)
{
parent.pushed_end_of_output_queue = true;
parent.output_queue.push(OutputData());
} }
} }
}
private:
UnionBlockInputStream & parent;
MemoryTracker * memory_tracker;
};
unsigned max_threads; unsigned max_threads;
/// Потоки. /// Потоки.
typedef std::list<ThreadData> ThreadsData; typedef std::vector<std::thread> ThreadsData;
ThreadsData threads_data; ThreadsData threads;
/// Очередь доступных источников, которые не заняты каким-либо потоком в данный момент. /// Очередь доступных источников, которые не заняты каким-либо потоком в данный момент.
typedef std::queue<InputData> InputQueue; typedef std::queue<InputData> InputQueue;
@ -334,19 +298,20 @@ private:
typedef ConcurrentBoundedQueue<OutputData> OutputQueue; typedef ConcurrentBoundedQueue<OutputData> OutputQueue;
OutputQueue output_queue; OutputQueue output_queue;
/// Для операций с очередями. /// Для операций с input_queue.
Poco::FastMutex mutex; Poco::FastMutex mutex;
/// Сколько источников иссякло. /// Сколько источников иссякло.
size_t exhausted_inputs; size_t exhausted_inputs = 0;
/// Завершить работу потоков (раньше, чем иссякнут источники). /// Завершить работу потоков (раньше, чем иссякнут источники).
volatile bool finish; std::atomic<bool> finish { false };
/// Положили ли в output_queue пустой блок. /// Положили ли в output_queue пустой блок.
volatile bool pushed_end_of_output_queue; std::atomic<bool> pushed_end_of_output_queue { false };
bool all_read;
Logger * log; bool all_read { false };
Logger * log = &Logger::get("UnionBlockInputStream");
}; };
} }

View File

@ -151,8 +151,6 @@ void ExpressionAction::prepare(Block & sample_block)
{ {
for (const auto & col : columns_added_by_join) for (const auto & col : columns_added_by_join)
sample_block.insert(ColumnWithNameAndType(col.type->createColumn(), col.type, col.name)); sample_block.insert(ColumnWithNameAndType(col.type->createColumn(), col.type, col.name));
std::cerr << sample_block.dumpNames() << std::endl;
} }
else if (type == ADD_COLUMN) else if (type == ADD_COLUMN)
{ {
@ -264,8 +262,6 @@ void ExpressionAction::execute(Block & block) const
{ {
Block new_block; Block new_block;
//std::cerr << block.dumpNames() << std::endl;
for (size_t i = 0; i < projection.size(); ++i) for (size_t i = 0; i < projection.size(); ++i)
{ {
const std::string & name = projection[i].first; const std::string & name = projection[i].first;

View File

@ -1245,8 +1245,8 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty
Names join_key_names_right(join_key_names_right_set.begin(), join_key_names_right_set.end()); Names join_key_names_right(join_key_names_right_set.begin(), join_key_names_right_set.end());
JoinPtr join = new Join(join_key_names_left, join_key_names_right, settings.limits, ast_join.kind, ast_join.strictness); JoinPtr join = new Join(join_key_names_left, join_key_names_right, settings.limits, ast_join.kind, ast_join.strictness);
for (const auto & name_type : columns_added_by_join) /* for (const auto & name_type : columns_added_by_join)
std::cerr << "! Column added by JOIN: " << name_type.name << std::endl; std::cerr << "! Column added by JOIN: " << name_type.name << std::endl;*/
Names required_joined_columns(join_key_names_right.begin(), join_key_names_right.end()); Names required_joined_columns(join_key_names_right.begin(), join_key_names_right.end());
for (const auto & name_type : columns_added_by_join) for (const auto & name_type : columns_added_by_join)
@ -1596,8 +1596,6 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAnd
if (!select_query || !select_query->join) if (!select_query || !select_query->join)
return; return;
std::cerr << "collectJoinedColumns" << std::endl;
auto & node = typeid_cast<ASTJoin &>(*select_query->join); auto & node = typeid_cast<ASTJoin &>(*select_query->join);
auto & keys = typeid_cast<ASTExpressionList &>(*node.using_expr_list); auto & keys = typeid_cast<ASTExpressionList &>(*node.using_expr_list);
auto & table = node.table->children.at(0); /// TODO: поддержка идентификаторов. auto & table = node.table->children.at(0); /// TODO: поддержка идентификаторов.
@ -1626,14 +1624,14 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAnd
} }
} }
for (const auto & name : join_key_names_left_set) /* for (const auto & name : join_key_names_left_set)
std::cerr << "JOIN key (left): " << name << std::endl; std::cerr << "JOIN key (left): " << name << std::endl;
for (const auto & name : join_key_names_right_set) for (const auto & name : join_key_names_right_set)
std::cerr << "JOIN key (right): " << name << std::endl; std::cerr << "JOIN key (right): " << name << std::endl;
std::cerr << std::endl; std::cerr << std::endl;
for (const auto & name : joined_columns) for (const auto & name : joined_columns)
std::cerr << "JOINed column: " << name << std::endl; std::cerr << "JOINed column: " << name << std::endl;
std::cerr << std::endl; std::cerr << std::endl;*/
} }
Names ExpressionAnalyzer::getRequiredColumns() Names ExpressionAnalyzer::getRequiredColumns()

View File

@ -171,10 +171,20 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net
} }
void HTTPHandler::trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerResponse & response) void HTTPHandler::trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{ {
try try
{ {
/** Если POST и Keep-Alive, прочитаем тело до конца.
* Иначе вместо следующего запроса, будет прочитан кусок этого тела.
*/
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST
&& response.getKeepAlive()
&& !request.stream().eof())
{
request.stream().ignore(std::numeric_limits<std::streamsize>::max());
}
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent()) if (!response.sent())
response.send() << s.str() << std::endl; response.send() << s.str() << std::endl;
@ -214,26 +224,26 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
s << "Code: " << e.code() s << "Code: " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what(); << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what();
LOG_ERROR(log, s.str()); LOG_ERROR(log, s.str());
trySendExceptionToClient(s, response); trySendExceptionToClient(s, request, response);
} }
catch (Poco::Exception & e) catch (Poco::Exception & e)
{ {
std::stringstream s; std::stringstream s;
s << "Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code() s << "Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what(); << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what();
trySendExceptionToClient(s, response); trySendExceptionToClient(s, request, response);
} }
catch (std::exception & e) catch (std::exception & e)
{ {
std::stringstream s; std::stringstream s;
s << "Code: " << ErrorCodes::STD_EXCEPTION << ". " << e.what(); s << "Code: " << ErrorCodes::STD_EXCEPTION << ". " << e.what();
trySendExceptionToClient(s, response); trySendExceptionToClient(s, request, response);
} }
catch (...) catch (...)
{ {
std::stringstream s; std::stringstream s;
s << "Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ". Unknown exception."; s << "Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ". Unknown exception.";
trySendExceptionToClient(s, response); trySendExceptionToClient(s, request, response);
} }
} }

View File

@ -17,7 +17,7 @@ public:
} }
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response); void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
void trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerResponse & response); void trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
private: private:
Server & server; Server & server;

View File

@ -35,7 +35,15 @@ public:
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{ {
response.send() << "Ok." << std::endl; try
{
const char * data = "Ok.\n";
response.sendBuffer(data, strlen(data));
}
catch (...)
{
tryLogCurrentException("PingRequestHandler");
}
} }
}; };