diff --git a/dbms/include/DB/Common/ExternalTable.h b/dbms/include/DB/Common/ExternalTable.h index 55ef51b2189..554b5c1adf1 100644 --- a/dbms/include/DB/Common/ExternalTable.h +++ b/dbms/include/DB/Common/ExternalTable.h @@ -72,7 +72,7 @@ protected: } /// Функция для отладочного вывода информации - virtual void write() + void write() { std::cerr << "file " << file << std::endl; std::cerr << "name " << name << std::endl; diff --git a/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h b/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h index acf8d8c3f36..f47a32eef13 100644 --- a/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h +++ b/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h @@ -83,7 +83,7 @@ protected: Block block; ExceptionPtr exception; - + Block readImpl() { /// Если вычислений ещё не было - вычислим первый блок синхронно @@ -108,14 +108,14 @@ protected: return res; } - + void next() { ready.reset(); pool.schedule(boost::bind(&AsynchronousBlockInputStream::calculate, this, current_memory_tracker)); } - + /// Вычисления, которые могут выполняться в отдельном потоке void calculate(MemoryTracker * memory_tracker) diff --git a/dbms/include/DB/DataStreams/UnionBlockInputStream.h b/dbms/include/DB/DataStreams/UnionBlockInputStream.h index 59073d512db..64d59102f0f 100644 --- a/dbms/include/DB/DataStreams/UnionBlockInputStream.h +++ b/dbms/include/DB/DataStreams/UnionBlockInputStream.h @@ -2,8 +2,8 @@ #include #include - -#include +#include +#include #include @@ -33,21 +33,15 @@ using Poco::SharedPtr; */ class UnionBlockInputStream : public IProfilingBlockInputStream { - class Thread; public: UnionBlockInputStream(BlockInputStreams inputs_, unsigned max_threads_ = 1) : max_threads(std::min(inputs_.size(), static_cast(max_threads_))), - output_queue(max_threads), exhausted_inputs(0), finish(false), - pushed_end_of_output_queue(false), all_read(false), log(&Logger::get("UnionBlockInputStream")) + output_queue(max_threads) { children.insert(children.end(), inputs_.begin(), inputs_.end()); for (size_t i = 0; i < inputs_.size(); ++i) - { - input_queue.push(InputData()); - input_queue.back().in = inputs_[i]; - input_queue.back().i = i; - } + input_queue.emplace(inputs_[i], i); } String getName() const { return "UnionBlockInputStream"; } @@ -116,7 +110,7 @@ public: protected: void finalize() { - if (threads_data.empty()) + if (threads.empty()) return; LOG_TRACE(log, "Waiting for threads to finish"); @@ -124,14 +118,11 @@ protected: /// Вынем всё, что есть в очереди готовых данных. output_queue.clear(); - /** В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится. - * PS. Может быть, для переменной finish нужен барьер? - */ + /// В этот момент, запоздавшие потоки ещё могут вставить в очередь какие-нибудь блоки, но очередь не переполнится. + for (auto & thread : threads) + thread.join(); - for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it) - it->thread->join(); - - threads_data.clear(); + threads.clear(); LOG_TRACE(log, "Waited for threads to finish"); } @@ -143,15 +134,11 @@ protected: return res.block; /// Запускаем потоки, если это ещё не было сделано. - if (threads_data.empty()) + if (threads.empty()) { - threads_data.resize(max_threads); - for (ThreadsData::iterator it = threads_data.begin(); it != threads_data.end(); ++it) - { - it->runnable = new Thread(*this, current_memory_tracker); - it->thread = new Poco::Thread; - it->thread->start(*it->runnable); - } + threads.reserve(max_threads); + for (size_t i = 0; i < max_threads; ++i) + threads.emplace_back([=] { thread(current_memory_tracker); }); } /// Будем ждать, пока будет готов следующий блок или будет выкинуто исключение. @@ -173,8 +160,9 @@ protected: /// Может быть, в очереди есть ещё эксепшен. OutputData res; - while (output_queue.tryPop(res) && res.exception) - res.exception->rethrow(); + while (output_queue.tryPop(res)) + if (res.exception) + res.exception->rethrow(); finalize(); @@ -188,132 +176,108 @@ private: { BlockInputStreamPtr in; size_t i; /// Порядковый номер источника (для отладки). + + InputData() {} + InputData(BlockInputStreamPtr & in_, size_t i_) : in(in_), i(i_) {} }; - /// Данные отдельного потока - struct ThreadData + void thread(MemoryTracker * memory_tracker) { - SharedPtr thread; - SharedPtr runnable; - }; + current_memory_tracker = memory_tracker; + ExceptionPtr exception; - - class Thread : public Poco::Runnable - { - public: - Thread(UnionBlockInputStream & parent_, MemoryTracker * memory_tracker_) - : parent(parent_), memory_tracker(memory_tracker_) + try { + loop(); + } + catch (...) + { + exception = cloneCurrentException(); } - void run() + if (exception) { - current_memory_tracker = memory_tracker; - ExceptionPtr exception; - + /// Отдаём эксепшен в основной поток. + output_queue.push(exception); + try { - loop(); + cancel(); } catch (...) { - exception = cloneCurrentException(); - } - - if (exception) - { - try - { - parent.cancel(); - } - catch (...) - { - /** Если не удалось попросить остановиться одного или несколько источников. - * (например, разорвано соединение при распределённой обработке запроса) - * - то пофиг. - */ - } - - /// Отдаём эксепшен в основной поток. - parent.output_queue.push(exception); + /** Если не удалось попросить остановиться одного или несколько источников. + * (например, разорвано соединение при распределённой обработке запроса) + * - то пофиг. + */ } } + } - void loop() + void loop() + { + while (!finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут. { - while (!parent.finish) /// Может потребоваться прекратить работу раньше, чем все источники иссякнут. + InputData input; + + /// Выбираем следующий источник. { - InputData input; + Poco::ScopedLock lock(mutex); - /// Выбираем следующий источник. + /// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.) + if (input_queue.empty()) + break; + + input = input_queue.front(); + + /// Убираем источник из очереди доступных источников. + input_queue.pop(); + } + + /// Основная работа. + Block block = input.in->read(); + + { + Poco::ScopedLock lock(mutex); + + /// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых. + if (block) { - Poco::ScopedLock lock(parent.mutex); + input_queue.push(input); - /// Если свободных источников нет, то этот поток больше не нужен. (Но другие потоки могут работать со своими источниками.) - if (parent.input_queue.empty()) + if (finish) break; - input = parent.input_queue.front(); - - /// Убираем источник из очереди доступных источников. - parent.input_queue.pop(); + output_queue.push(block); } - - /// Основная работа. - Block block = input.in->read(); - + else { - Poco::ScopedLock lock(parent.mutex); + ++exhausted_inputs; - /// Если этот источник ещё не иссяк, то положим полученный блок в очередь готовых. - if (block) + /// Если все источники иссякли. + if (exhausted_inputs == children.size()) { - parent.input_queue.push(input); - - if (parent.finish) - break; - - parent.output_queue.push(block); + finish = true; + break; } - else - { - ++parent.exhausted_inputs; - - /// Если все источники иссякли. - if (parent.exhausted_inputs == parent.children.size()) - { - parent.finish = true; - break; - } - } - } - } - - if (parent.finish) - { - Poco::ScopedLock lock(parent.mutex); - - /// Отдаём в основной поток пустой блок, что означает, что данных больше нет. - if (!parent.pushed_end_of_output_queue) - { - parent.pushed_end_of_output_queue = true; - parent.output_queue.push(OutputData()); } } } - private: - UnionBlockInputStream & parent; - MemoryTracker * memory_tracker; - }; - + if (finish) + { + /// Отдаём в основной поток пустой блок, что означает, что данных больше нет; только один раз. + if (false == pushed_end_of_output_queue.exchange(true)) + output_queue.push(OutputData()); + } + } unsigned max_threads; /// Потоки. - typedef std::list ThreadsData; - ThreadsData threads_data; + typedef std::vector ThreadsData; + ThreadsData threads; /// Очередь доступных источников, которые не заняты каким-либо потоком в данный момент. typedef std::queue InputQueue; @@ -334,19 +298,20 @@ private: typedef ConcurrentBoundedQueue OutputQueue; OutputQueue output_queue; - /// Для операций с очередями. + /// Для операций с input_queue. Poco::FastMutex mutex; /// Сколько источников иссякло. - size_t exhausted_inputs; + size_t exhausted_inputs = 0; /// Завершить работу потоков (раньше, чем иссякнут источники). - volatile bool finish; + std::atomic finish { false }; /// Положили ли в output_queue пустой блок. - volatile bool pushed_end_of_output_queue; - bool all_read; + std::atomic pushed_end_of_output_queue { false }; - Logger * log; + bool all_read { false }; + + Logger * log = &Logger::get("UnionBlockInputStream"); }; } diff --git a/dbms/src/Interpreters/ExpressionActions.cpp b/dbms/src/Interpreters/ExpressionActions.cpp index 125677ab074..ad09f269290 100644 --- a/dbms/src/Interpreters/ExpressionActions.cpp +++ b/dbms/src/Interpreters/ExpressionActions.cpp @@ -151,8 +151,6 @@ void ExpressionAction::prepare(Block & sample_block) { for (const auto & col : columns_added_by_join) sample_block.insert(ColumnWithNameAndType(col.type->createColumn(), col.type, col.name)); - - std::cerr << sample_block.dumpNames() << std::endl; } else if (type == ADD_COLUMN) { @@ -264,8 +262,6 @@ void ExpressionAction::execute(Block & block) const { Block new_block; - //std::cerr << block.dumpNames() << std::endl; - for (size_t i = 0; i < projection.size(); ++i) { const std::string & name = projection[i].first; diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index ab959ccfa06..3d6ab474c89 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -1245,8 +1245,8 @@ bool ExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, bool only_ty Names join_key_names_right(join_key_names_right_set.begin(), join_key_names_right_set.end()); JoinPtr join = new Join(join_key_names_left, join_key_names_right, settings.limits, ast_join.kind, ast_join.strictness); - for (const auto & name_type : columns_added_by_join) - std::cerr << "! Column added by JOIN: " << name_type.name << std::endl; +/* for (const auto & name_type : columns_added_by_join) + std::cerr << "! Column added by JOIN: " << name_type.name << std::endl;*/ Names required_joined_columns(join_key_names_right.begin(), join_key_names_right.end()); for (const auto & name_type : columns_added_by_join) @@ -1596,8 +1596,6 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAnd if (!select_query || !select_query->join) return; - std::cerr << "collectJoinedColumns" << std::endl; - auto & node = typeid_cast(*select_query->join); auto & keys = typeid_cast(*node.using_expr_list); auto & table = node.table->children.at(0); /// TODO: поддержка идентификаторов. @@ -1626,14 +1624,14 @@ void ExpressionAnalyzer::collectJoinedColumns(NameSet & joined_columns, NamesAnd } } - for (const auto & name : join_key_names_left_set) +/* for (const auto & name : join_key_names_left_set) std::cerr << "JOIN key (left): " << name << std::endl; for (const auto & name : join_key_names_right_set) std::cerr << "JOIN key (right): " << name << std::endl; std::cerr << std::endl; for (const auto & name : joined_columns) std::cerr << "JOINed column: " << name << std::endl; - std::cerr << std::endl; + std::cerr << std::endl;*/ } Names ExpressionAnalyzer::getRequiredColumns() diff --git a/dbms/src/Server/HTTPHandler.cpp b/dbms/src/Server/HTTPHandler.cpp index 0c234dbfb54..9f4f3690bb4 100644 --- a/dbms/src/Server/HTTPHandler.cpp +++ b/dbms/src/Server/HTTPHandler.cpp @@ -45,7 +45,7 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net std::string query_param = params.get("query", ""); if (!query_param.empty()) query_param += '\n'; - + /// Если указано compress, то будем сжимать результат. SharedPtr out = new WriteBufferFromHTTPServerResponse(response); SharedPtr out_maybe_compressed; @@ -69,7 +69,7 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net std::string quota_key = params.get("quota_key", ""); std::string query_id = params.get("query_id", ""); - + Context context = *server.global_context; context.setGlobalContext(*server.global_context); @@ -171,10 +171,20 @@ void HTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net } -void HTTPHandler::trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerResponse & response) +void HTTPHandler::trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) { try { + /** Если POST и Keep-Alive, прочитаем тело до конца. + * Иначе вместо следующего запроса, будет прочитан кусок этого тела. + */ + if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST + && response.getKeepAlive() + && !request.stream().eof()) + { + request.stream().ignore(std::numeric_limits::max()); + } + response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); if (!response.sent()) response.send() << s.str() << std::endl; @@ -214,26 +224,26 @@ void HTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne s << "Code: " << e.code() << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what(); LOG_ERROR(log, s.str()); - trySendExceptionToClient(s, response); + trySendExceptionToClient(s, request, response); } catch (Poco::Exception & e) { std::stringstream s; s << "Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code() << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what(); - trySendExceptionToClient(s, response); + trySendExceptionToClient(s, request, response); } catch (std::exception & e) { std::stringstream s; s << "Code: " << ErrorCodes::STD_EXCEPTION << ". " << e.what(); - trySendExceptionToClient(s, response); + trySendExceptionToClient(s, request, response); } catch (...) { std::stringstream s; s << "Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ". Unknown exception."; - trySendExceptionToClient(s, response); + trySendExceptionToClient(s, request, response); } } diff --git a/dbms/src/Server/HTTPHandler.h b/dbms/src/Server/HTTPHandler.h index fa7a2021e6e..03c917e57c3 100644 --- a/dbms/src/Server/HTTPHandler.h +++ b/dbms/src/Server/HTTPHandler.h @@ -17,7 +17,7 @@ public: } void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response); - void trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerResponse & response); + void trySendExceptionToClient(std::stringstream & s, Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response); private: Server & server; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index b403220f05b..f2d3183ad7c 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -35,7 +35,15 @@ public: void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) { - response.send() << "Ok." << std::endl; + try + { + const char * data = "Ok.\n"; + response.sendBuffer(data, strlen(data)); + } + catch (...) + { + tryLogCurrentException("PingRequestHandler"); + } } }; @@ -47,11 +55,11 @@ private: Server & server; Logger * log; std::string name; - + public: HTTPRequestHandlerFactory(Server & server_, const std::string & name_) : server(server_), log(&Logger::get(name_)), name(name_) {} - + Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) { LOG_TRACE(log, "HTTP Request for " << name << ". " @@ -74,14 +82,14 @@ class TCPConnectionFactory : public Poco::Net::TCPServerConnectionFactory private: Server & server; Logger * log; - + public: TCPConnectionFactory(Server & server_) : server(server_), log(&Logger::get("TCPConnectionFactory")) {} Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) { LOG_TRACE(log, "TCP Request. " << "Address: " << socket.peerAddress().toString()); - + return new TCPHandler(server, socket); } }; @@ -282,14 +290,14 @@ int Server::main(const std::vector & args) /// Создаём системные таблицы. global_context->addDatabase("system"); - + global_context->addTable("system", "one", StorageSystemOne::create("one")); global_context->addTable("system", "numbers", StorageSystemNumbers::create("numbers")); global_context->addTable("system", "tables", StorageSystemTables::create("tables", *global_context)); global_context->addTable("system", "databases", StorageSystemDatabases::create("databases", *global_context)); global_context->addTable("system", "processes", StorageSystemProcesses::create("processes", *global_context)); global_context->addTable("system", "events", StorageSystemEvents::create("events")); - + global_context->setCurrentDatabase(config().getString("default_database", "default")); { @@ -363,9 +371,9 @@ int Server::main(const std::vector & args) olap_http_server->start(); LOG_INFO(log, "Ready for connections."); - + waitForTerminationRequest(); - + LOG_DEBUG(log, "Received termination signal. Waiting for current connections to close."); users_config_reloader = nullptr; @@ -394,7 +402,7 @@ int Server::main(const std::vector & args) global_context = nullptr; LOG_DEBUG(log, "Destroyed global context."); - + return Application::EXIT_OK; }