From dd01eb6b40236f8e12dec9425870ab30300d25f8 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 7 Aug 2018 20:57:44 +0300 Subject: [PATCH 01/16] CLICKHOUSE-3878: Add odbc-bridge first version --- dbms/programs/CMakeLists.txt | 16 +- dbms/programs/main.cpp | 8 + dbms/programs/odbc-bridge/CMakeLists.txt | 12 ++ dbms/programs/odbc-bridge/ODBCBridge.cpp | 193 +++++++++++++++++++++ dbms/programs/odbc-bridge/ODBCBridge.h | 41 +++++ dbms/programs/odbc-bridge/ODBCHandler.cpp | 137 +++++++++++++++ dbms/programs/odbc-bridge/ODBCHandler.h | 70 ++++++++ dbms/programs/odbc-bridge/odbc-bridge.cpp | 2 + dbms/src/Common/ErrorCodes.cpp | 1 + libs/libdaemon/include/daemon/BaseDaemon.h | 3 + libs/libdaemon/src/BaseDaemon.cpp | 19 +- 11 files changed, 495 insertions(+), 7 deletions(-) create mode 100644 dbms/programs/odbc-bridge/CMakeLists.txt create mode 100644 dbms/programs/odbc-bridge/ODBCBridge.cpp create mode 100644 dbms/programs/odbc-bridge/ODBCBridge.h create mode 100644 dbms/programs/odbc-bridge/ODBCHandler.cpp create mode 100644 dbms/programs/odbc-bridge/ODBCHandler.h create mode 100644 dbms/programs/odbc-bridge/odbc-bridge.cpp diff --git a/dbms/programs/CMakeLists.txt b/dbms/programs/CMakeLists.txt index a5692d81c09..bde54c42f25 100644 --- a/dbms/programs/CMakeLists.txt +++ b/dbms/programs/CMakeLists.txt @@ -13,6 +13,10 @@ option (ENABLE_CLICKHOUSE_COMPRESSOR "Enable clickhouse-compressor" ${ENABLE_CLI option (ENABLE_CLICKHOUSE_COPIER "Enable clickhouse-copier" ${ENABLE_CLICKHOUSE_ALL}) option (ENABLE_CLICKHOUSE_FORMAT "Enable clickhouse-format" ${ENABLE_CLICKHOUSE_ALL}) option (ENABLE_CLICKHOUSE_OBFUSCATOR "Enable clickhouse-obfuscator" ${ENABLE_CLICKHOUSE_ALL}) +option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "Enable clickhouse-odbc-bridge" ${ENABLE_CLICKHOUSE_ALL}) + +MESSAGE(STATUS, "ENABLED_ODBC") +MESSAGE(STATUS, ${ENABLE_CLICKHOUSE_ODBC_BRIDGE}) configure_file (config_tools.h.in ${CMAKE_CURRENT_BINARY_DIR}/config_tools.h) @@ -27,10 +31,11 @@ add_subdirectory (copier) add_subdirectory (format) add_subdirectory (clang) add_subdirectory (obfuscator) +add_subdirectory (odbc-bridge) if (CLICKHOUSE_SPLIT_BINARY) set (CLICKHOUSE_ALL_TARGETS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-performance-test - clickhouse-extract-from-config clickhouse-format clickhouse-copier) + clickhouse-extract-from-config clickhouse-format clickhouse-copier clickhouse-odbc-bridge) if (USE_EMBEDDED_COMPILER) list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-clang clickhouse-lld) @@ -83,6 +88,9 @@ else () if (USE_EMBEDDED_COMPILER) target_link_libraries (clickhouse clickhouse-compiler-lib) endif () + if (ENABLE_CLICKHOUSE_ODBC_BRIDGE) + target_link_libraries (clickhouse clickhouse-odbc-bridge-lib) + endif() set (CLICKHOUSE_BUNDLE) if (ENABLE_CLICKHOUSE_SERVER) @@ -135,6 +143,12 @@ else () install (FILES ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-obfuscator DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) list(APPEND CLICKHOUSE_BUNDLE clickhouse-obfuscator) endif () + if (ENABLE_CLICKHOUSE_ODBC_BRIDGE) + add_custom_target (clickhouse-odbc-bridge ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-odbc-bridge DEPENDS clickhouse) + install (FILES ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-odbc-bridge DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) + list(APPEND CLICKHOUSE_BUNDLE clickhouse-odbc-bridge) + endif () + # install always because depian package want this files: add_custom_target (clickhouse-clang ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-clang DEPENDS clickhouse) diff --git a/dbms/programs/main.cpp b/dbms/programs/main.cpp index aba03a87a83..3bf1fa1d6ed 100644 --- a/dbms/programs/main.cpp +++ b/dbms/programs/main.cpp @@ -56,6 +56,10 @@ int mainEntryClickHouseClusterCopier(int argc, char ** argv); #if ENABLE_CLICKHOUSE_OBFUSCATOR int mainEntryClickHouseObfuscator(int argc, char ** argv); #endif +#if ENABLE_CLICKHOUSE_ODBC_BRIDGE || !defined(ENABLE_CLICKHOUSE_ODBC_BRIDGE) +int mainEntryClickHouseODBCBridge(int argc, char ** argv); +#endif + #if USE_EMBEDDED_COMPILER int mainEntryClickHouseClang(int argc, char ** argv); @@ -101,6 +105,10 @@ std::pair clickhouse_applications[] = #if ENABLE_CLICKHOUSE_OBFUSCATOR {"obfuscator", mainEntryClickHouseObfuscator}, #endif +#if ENABLE_CLICKHOUSE_ODBC_BRIDGE || !defined(ENABLE_CLICKHOUSE_ODBC_BRIDGE) + {"odbc-bridge", mainEntryClickHouseODBCBridge}, +#endif + #if USE_EMBEDDED_COMPILER {"clang", mainEntryClickHouseClang}, {"clang++", mainEntryClickHouseClang}, diff --git a/dbms/programs/odbc-bridge/CMakeLists.txt b/dbms/programs/odbc-bridge/CMakeLists.txt new file mode 100644 index 00000000000..4c1df641fd8 --- /dev/null +++ b/dbms/programs/odbc-bridge/CMakeLists.txt @@ -0,0 +1,12 @@ +add_library (clickhouse-odbc-bridge-lib + ODBCBridge.cpp + ODBCHandler.cpp + ) + +target_link_libraries (clickhouse-odbc-bridge-lib clickhouse_common_io daemon) +target_include_directories (clickhouse-odbc-bridge-lib PUBLIC ${ClickHouse_SOURCE_DIR}/libs/libdaemon/include) + +if (CLICKHOUSE_SPLIT_BINARY) + add_executable (clickhouse-odbc-bridge odbc-bridge.cpp) + target_link_libraries (clickhouse-odbc-bridge clickhouse-odbc-bridge-lib) +endif () diff --git a/dbms/programs/odbc-bridge/ODBCBridge.cpp b/dbms/programs/odbc-bridge/ODBCBridge.cpp new file mode 100644 index 00000000000..c9ae3d464fc --- /dev/null +++ b/dbms/programs/odbc-bridge/ODBCBridge.cpp @@ -0,0 +1,193 @@ +#include "ODBCBridge.h" +#include "ODBCHandler.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int ARGUMENT_OUT_OF_BOUND; +} + +namespace +{ + std::string getCanonicalPath(std::string && path) + { + Poco::trimInPlace(path); + if (path.empty()) + throw Exception("path configuration parameter is empty"); + if (path.back() != '/') + path += '/'; + return std::move(path); + } + + Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port, Poco::Logger * log) + { + Poco::Net::SocketAddress socket_address; + try + { + socket_address = Poco::Net::SocketAddress(host, port); + } + catch (const Poco::Net::DNSException & e) + { + const auto code = e.code(); + if (code == EAI_FAMILY +#if defined(EAI_ADDRFAMILY) + || code == EAI_ADDRFAMILY +#endif + ) + { + LOG_ERROR(log, + "Cannot resolve listen_host (" << host << "), error " << e.code() << ": " << e.message() + << ". " + "If it is an IPv6 address and your host has disabled IPv6, then consider to " + "specify IPv4 address to listen in element of configuration " + "file. Example: 0.0.0.0"); + } + + throw; + } + return socket_address; + } + + Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, Poco::Logger * log) + { + auto address = makeSocketAddress(host, port, log); +#if POCO_VERSION < 0x01080000 + socket.bind(address, /* reuseAddress = */ true); +#else + socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ false); +#endif + + socket.listen(/* backlog = */ 64); + + return address; + }; +} + +std::string ODBCBridge::getDefaultCorePath() const +{ + return getCanonicalPath(config().getString("path")) + "cores"; +} + +void ODBCBridge::handleHelp(const std::string &, const std::string &) +{ + Poco::Util::HelpFormatter helpFormatter(options()); + helpFormatter.setCommand(commandName()); + helpFormatter.setHeader("HTTP-proxy for odbc requests"); + helpFormatter.setUsage("--http-port "); + helpFormatter.format(std::cerr); + + stopOptionsProcessing(); +} + + +void ODBCBridge::defineOptions(Poco::Util::OptionSet & options) +{ + options.addOption(Poco::Util::Option("http-port", "", "port to listen").argument("http-port", true).binding("http-port")); + options.addOption( + Poco::Util::Option("http-host", "", "hostname to listen, default localhost").argument("http-host").binding("http-host")); + options.addOption( + Poco::Util::Option("http-timeout", "", "http timout for socket, default 1800").argument("http-timeout").binding("http-timeout")); + options.addOption(Poco::Util::Option("log-level", "", "sets log level, default info").argument("log-level").binding("log-level")); + options.addOption(Poco::Util::Option("max-server-connections", "", "max connections to server, default 1024") + .argument("max-server-connections") + .binding("max-server-connections")); + options.addOption(Poco::Util::Option("keep-alive-timeout", "", "keepalive timeout, default 10") + .argument("keep-alive-timeout") + .binding("keep-alive-timeout")); + + using Me = std::decay_t; + options.addOption(Poco::Util::Option("help", "", "produce this help message") + .binding("help") + .callback(Poco::Util::OptionCallback(this, &Me::handleHelp))); + + ServerApplication::defineOptions(options); /// Don't need complex .xml config +} + +void ODBCBridge::initialize(Application & self) +{ + is_help = config().has("help"); + if (is_help) + return; + + log_level = config().getString("log-level", "info"); + Poco::Logger::root().setLevel(log_level); + log = &logger(); + hostname = config().getString("http-host", "localhost"); + port = config().getUInt("http-port"); + if (port < 0 || port > 0xFFFF) + throw Exception("Out of range 'http-port': " + std::to_string(port), ErrorCodes::ARGUMENT_OUT_OF_BOUND); + + http_timeout = config().getUInt("http-timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT); + max_server_connections = config().getUInt("max-server-connections", 1024); + keep_alive_timeout = config().getUInt("keep-alive-timeout", 10); + + initializeTerminationAndSignalProcessing(); + + ServerApplication::initialize(self); +} + +void ODBCBridge::uninitialize() +{ + LOG_INFO(log, "Shutting down"); + BaseDaemon::uninitialize(); +} + +int ODBCBridge::main(const std::vector & /*args*/) +{ + if (is_help) + return 0; + + LOG_INFO(log, "Starting up"); + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, hostname, port, log); + socket.setReceiveTimeout(http_timeout); + socket.setSendTimeout(http_timeout); + Poco::ThreadPool server_pool(3, max_server_connections); + Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams; + http_params->setTimeout(http_timeout); + http_params->setKeepAliveTimeout(keep_alive_timeout); + + context = std::make_shared(Context::createGlobal()); + context->setGlobalContext(*context); + + auto server = Poco::Net::HTTPServer( + new ODBCRequestHandlerFactory("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context), server_pool, socket, http_params); + server.start(); + + LOG_INFO(log, "Listening http://" + address.toString()); + + waitForTerminationRequest(); + + + return Application::EXIT_OK; +} +} + +int mainEntryClickHouseODBCBridge(int argc, char ** argv) +{ + DB::ODBCBridge app; + try + { + return app.run(argc, argv); + } + catch (...) + { + std::cerr << DB::getCurrentExceptionMessage(true) << "\n"; + auto code = DB::getCurrentExceptionCode(); + return code ? code : 1; + } +} diff --git a/dbms/programs/odbc-bridge/ODBCBridge.h b/dbms/programs/odbc-bridge/ODBCBridge.h new file mode 100644 index 00000000000..59b75133f8e --- /dev/null +++ b/dbms/programs/odbc-bridge/ODBCBridge.h @@ -0,0 +1,41 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ +class ODBCBridge : public BaseDaemon +{ + +public: + void defineOptions(Poco::Util::OptionSet & options) override; + +protected: + void initialize(Application & self) override; + + void uninitialize() override; + + int main(const std::vector & args) override; + + std::string getDefaultCorePath() const override; + +private: + + void handleHelp(const std::string &, const std::string &); + + bool is_help; + std::string hostname; + UInt16 port; + size_t http_timeout; + std::string log_level; + size_t max_server_connections; + size_t keep_alive_timeout; + + Poco::Logger * log; + + std::shared_ptr context; /// need for settings only +}; + +} diff --git a/dbms/programs/odbc-bridge/ODBCHandler.cpp b/dbms/programs/odbc-bridge/ODBCHandler.cpp new file mode 100644 index 00000000000..903af5e346c --- /dev/null +++ b/dbms/programs/odbc-bridge/ODBCHandler.cpp @@ -0,0 +1,137 @@ +#include "ODBCHandler.h" +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_REQUEST_PARAMETER; +} + +namespace +{ + std::string buildConnectionString(const std::string & DSN, const std::string & database) + { + std::stringstream ss; + ss << "DSN=" << DSN << ";DATABASE=" << database; + return ss.str(); + } + NamesAndTypesList parseColumns(std::string && column_string) + { + const auto & factory_instance = DataTypeFactory::instance(); + NamesAndTypesList result; + static boost::char_separator sep(","); + boost::tokenizer> tokens(column_string, sep); + for (const std::string & name_and_type_str : tokens) + { + std::vector name_and_type; + boost::split(name_and_type, name_and_type_str, boost::is_any_of(":")); + if (name_and_type.size() != 2) + throw Exception("ODBCBridge: Invalid columns parameter '" + column_string + "'", ErrorCodes::BAD_REQUEST_PARAMETER); + result.emplace_back(name_and_type[0], factory_instance.get(name_and_type[1])); + } + return result; + } +} +void ODBCHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) +{ + Poco::Net::HTMLForm params(request, request.stream()); + LOG_TRACE(log, "Request URI: " + request.getURI()); + + if (!params.has("query")) + throw Exception("ODBCBridge: No 'query' in request body", ErrorCodes::BAD_REQUEST_PARAMETER); + + std::string query = params.get("query"); + + if (!params.has("columns")) + throw Exception("ODBCBridge: No 'columns' in request body", ErrorCodes::BAD_REQUEST_PARAMETER); + + std::string columns = params.get("columns"); + + auto names_and_types = parseColumns(std::move(columns)); + Block sample_block; + for (const NameAndTypePair & column_data : names_and_types) + { + sample_block.insert({column_data.type, column_data.name}); + } + + ODBCBlockInputStream inp(pool->get(), query, sample_block, max_block_size); + + WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); + std::shared_ptr writer = FormatFactory::instance().getOutput(format, out, sample_block, *context); + + writer->writePrefix(); + while (auto block = inp.read()) + writer->write(block); + + writer->writeSuffix(); + + + writer->flush(); +} + + +void PingHandler::handleRequest(Poco::Net::HTTPServerRequest & /*request*/, Poco::Net::HTTPServerResponse & response) +{ + try + { + setResponseDefaultHeaders(response, keep_alive_timeout); + const char * data = "Ok.\n"; + response.sendBuffer(data, strlen(data)); + } + catch (...) + { + tryLogCurrentException("PingHandler"); + } +} + +Poco::Net::HTTPRequestHandler * ODBCRequestHandlerFactory::createRequestHandler(const Poco::Net::HTTPServerRequest & request) +{ + const auto & uri = request.getURI(); + LOG_TRACE(log, "Request URI: " + uri); + + if (uri == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET) + return new PingHandler(keep_alive_timeout); + + HTMLForm params(request); + std::string DSN = params.get("DSN"); + std::string database = params.get("database"); + std::string max_block_size = params.get("max_block_size", ""); + std::string format = params.get("format", "RowBinary"); + + std::string connection_string = buildConnectionString(DSN, database); + + LOG_TRACE(log, "Connection string:" << connection_string); + + if (!pool_map.count(connection_string)) + pool_map[connection_string] = createAndCheckResizePocoSessionPool( + [&] { return std::make_shared("ODBC", validateODBCConnectionString(connection_string)); }); + + if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) + return new ODBCHTTPHandler(pool_map.at(connection_string), + format, + max_block_size == "" ? DEFAULT_BLOCK_SIZE : parse(max_block_size), + keep_alive_timeout, + context); + + return nullptr; +} +} diff --git a/dbms/programs/odbc-bridge/ODBCHandler.h b/dbms/programs/odbc-bridge/ODBCHandler.h new file mode 100644 index 00000000000..857b89f8d63 --- /dev/null +++ b/dbms/programs/odbc-bridge/ODBCHandler.h @@ -0,0 +1,70 @@ +#pragma once + +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include +#pragma GCC diagnostic pop + +namespace DB +{ +class ODBCHTTPHandler : public Poco::Net::HTTPRequestHandler +{ +public: + ODBCHTTPHandler(std::shared_ptr pool_, + const std::string & format_, + size_t max_block_size_, + size_t keep_alive_timeout_, + std::shared_ptr context_) + : log(&Poco::Logger::get("ODBCHTTPHandler")) + , pool(pool_) + , format(format_) + , max_block_size(max_block_size_) + , keep_alive_timeout(keep_alive_timeout_) + , context(context_) + { + } + + void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; + +private: + Poco::Logger * log; + std::shared_ptr pool; + std::string format; + size_t max_block_size; + size_t keep_alive_timeout; + std::shared_ptr context; +}; + +class PingHandler : public Poco::Net::HTTPRequestHandler +{ +public: + PingHandler(size_t keep_alive_timeout_) : keep_alive_timeout(keep_alive_timeout_) {} + void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override; + +private: + size_t keep_alive_timeout; +}; + +class ODBCRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory +{ +public: + ODBCRequestHandlerFactory(const std::string & name_, size_t keep_alive_timeout_, std::shared_ptr context_) + : log(&Poco::Logger::get(name_)), name(name_), keep_alive_timeout(keep_alive_timeout_), context(context_) + { + } + + Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override; + +private: + Poco::Logger * log; + std::string name; + size_t keep_alive_timeout; + std::shared_ptr context; + std::unordered_map> pool_map; +}; +} diff --git a/dbms/programs/odbc-bridge/odbc-bridge.cpp b/dbms/programs/odbc-bridge/odbc-bridge.cpp new file mode 100644 index 00000000000..2d600b21bf7 --- /dev/null +++ b/dbms/programs/odbc-bridge/odbc-bridge.cpp @@ -0,0 +1,2 @@ +int mainEntryClickHouseODBCBridge(int argc, char ** argv); +int main(int argc_, char ** argv_) { return mainEntryOdbcBridge(argc_, argv_); } diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 9794e32949a..5adeb30e49d 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -379,6 +379,7 @@ namespace ErrorCodes extern const int CANNOT_IOSETUP = 402; extern const int INVALID_JOIN_ON_EXPRESSION = 403; extern const int BAD_ODBC_CONNECTION_STRING = 404; + extern const int BAD_REQUEST_PARAMETER = 405; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/libs/libdaemon/include/daemon/BaseDaemon.h b/libs/libdaemon/include/daemon/BaseDaemon.h index 9d0cbcf19c6..390560f8546 100644 --- a/libs/libdaemon/include/daemon/BaseDaemon.h +++ b/libs/libdaemon/include/daemon/BaseDaemon.h @@ -159,6 +159,9 @@ protected: /// thread safe virtual void handleSignal(int signal_id); + /// initialize termination process and signal handlers + virtual void initializeTerminationAndSignalProcessing(); + /// реализация обработки сигналов завершения через pipe не требует блокировки сигнала с помощью sigprocmask во всех потоках void waitForTerminationRequest() #if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000 // in old upstream poco not vitrual diff --git a/libs/libdaemon/src/BaseDaemon.cpp b/libs/libdaemon/src/BaseDaemon.cpp index f9e87cc4693..d21c7e004ab 100644 --- a/libs/libdaemon/src/BaseDaemon.cpp +++ b/libs/libdaemon/src/BaseDaemon.cpp @@ -1031,6 +1031,19 @@ void BaseDaemon::initialize(Application & self) throw Poco::Exception("Cannot change directory to " + core_path); } + initializeTerminationAndSignalProcessing(); + + logRevision(); + + for (const auto & key : DB::getMultipleKeysFromConfig(config(), "", "graphite")) + { + graphite_writers.emplace(key, std::make_unique(key)); + } +} + + +void BaseDaemon::initializeTerminationAndSignalProcessing() +{ std::set_terminate(terminate_handler); /// We want to avoid SIGPIPE when working with sockets and pipes, and just handle return value/errno instead. @@ -1071,15 +1084,9 @@ void BaseDaemon::initialize(Application & self) static KillingErrorHandler killing_error_handler; Poco::ErrorHandler::set(&killing_error_handler); - logRevision(); - signal_listener.reset(new SignalListener(*this)); signal_listener_thread.start(*signal_listener); - for (const auto & key : DB::getMultipleKeysFromConfig(config(), "", "graphite")) - { - graphite_writers.emplace(key, std::make_unique(key)); - } } void BaseDaemon::logRevision() const From 92f3beb95be1f19c9eda6196e6d98d900ed9f2b3 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 8 Aug 2018 19:15:29 +0300 Subject: [PATCH 02/16] CLICKHOUSE-3878: Better odbc-bridge with ability to handle custom logs and exceptions --- dbms/programs/odbc-bridge/CMakeLists.txt | 3 +- dbms/programs/odbc-bridge/HandlerFactory.cpp | 65 ++++++++ dbms/programs/odbc-bridge/HandlerFactory.h | 32 ++++ dbms/programs/odbc-bridge/Handlers.cpp | 140 ++++++++++++++++++ .../odbc-bridge/{ODBCHandler.h => Handlers.h} | 25 +--- dbms/programs/odbc-bridge/ODBCBridge.cpp | 25 +++- dbms/programs/odbc-bridge/ODBCHandler.cpp | 137 ----------------- 7 files changed, 261 insertions(+), 166 deletions(-) create mode 100644 dbms/programs/odbc-bridge/HandlerFactory.cpp create mode 100644 dbms/programs/odbc-bridge/HandlerFactory.h create mode 100644 dbms/programs/odbc-bridge/Handlers.cpp rename dbms/programs/odbc-bridge/{ODBCHandler.h => Handlers.h} (58%) delete mode 100644 dbms/programs/odbc-bridge/ODBCHandler.cpp diff --git a/dbms/programs/odbc-bridge/CMakeLists.txt b/dbms/programs/odbc-bridge/CMakeLists.txt index 4c1df641fd8..dcdfa7009d4 100644 --- a/dbms/programs/odbc-bridge/CMakeLists.txt +++ b/dbms/programs/odbc-bridge/CMakeLists.txt @@ -1,6 +1,7 @@ add_library (clickhouse-odbc-bridge-lib + Handlers.cpp + HandlerFactory.cpp ODBCBridge.cpp - ODBCHandler.cpp ) target_link_libraries (clickhouse-odbc-bridge-lib clickhouse_common_io daemon) diff --git a/dbms/programs/odbc-bridge/HandlerFactory.cpp b/dbms/programs/odbc-bridge/HandlerFactory.cpp new file mode 100644 index 00000000000..cc1a55b2335 --- /dev/null +++ b/dbms/programs/odbc-bridge/HandlerFactory.cpp @@ -0,0 +1,65 @@ +#include "HandlerFactory.h" +#include +#include "Handlers.h" + +#include +#include +#include +#include + +namespace DB +{ +namespace +{ + std::string buildConnectionString(const std::string & DSN, const std::string & database) + { + std::stringstream ss; + ss << "DSN=" << DSN << ";DATABASE=" << database; + return ss.str(); + } +} +Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco::Net::HTTPServerRequest & request) +{ + const auto & uri = request.getURI(); + LOG_TRACE(log, "Request URI: " + uri); + + if (uri == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET) + return new PingHandler(keep_alive_timeout); + + HTMLForm params(request); + std::string DSN = params.get("DSN", ""); + std::string database = params.get("database", ""); + + std::string max_block_size = params.get("max_block_size", ""); + std::string format = params.get("format", "RowBinary"); + + std::string connection_string = buildConnectionString(DSN, database); + + LOG_TRACE(log, "Connection string:" << connection_string); + + std::shared_ptr pool = nullptr; + if (!pool_map.count(connection_string)) + try + { + std::string validated = validateODBCConnectionString(connection_string); + pool + = createAndCheckResizePocoSessionPool([validated] { return std::make_shared("ODBC", validated); }); + pool_map[connection_string] = pool; + } + catch (const Exception & ex) + { + LOG_WARNING(log, "Connection string validation failed: " + ex.message()); + } + else + { + pool = pool_map[connection_string]; + } + + + if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) + return new ODBCHandler( + pool, format, max_block_size == "" ? DEFAULT_BLOCK_SIZE : parse(max_block_size), keep_alive_timeout, context); + + return nullptr; +} +} diff --git a/dbms/programs/odbc-bridge/HandlerFactory.h b/dbms/programs/odbc-bridge/HandlerFactory.h new file mode 100644 index 00000000000..9df3b7ece9c --- /dev/null +++ b/dbms/programs/odbc-bridge/HandlerFactory.h @@ -0,0 +1,32 @@ +#pragma once +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" + #include +#pragma GCC diagnostic pop + + +namespace DB +{ +class HandlerFactory : public Poco::Net::HTTPRequestHandlerFactory +{ +public: + HandlerFactory(const std::string & name_, size_t keep_alive_timeout_, std::shared_ptr context_) + : log(&Poco::Logger::get(name_)), name(name_), keep_alive_timeout(keep_alive_timeout_), context(context_) + { + } + + Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override; + +private: + Poco::Logger * log; + std::string name; + size_t keep_alive_timeout; + std::shared_ptr context; + std::unordered_map> pool_map; +}; +} diff --git a/dbms/programs/odbc-bridge/Handlers.cpp b/dbms/programs/odbc-bridge/Handlers.cpp new file mode 100644 index 00000000000..22355fb546d --- /dev/null +++ b/dbms/programs/odbc-bridge/Handlers.cpp @@ -0,0 +1,140 @@ +#include "Handlers.h" +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_REQUEST_PARAMETER; +} + +namespace +{ + std::optional parseColumns(std::string && column_string, Poco::Logger * log) + { + const auto & factory_instance = DataTypeFactory::instance(); + NamesAndTypesList result; + static boost::char_separator sep(","); + boost::tokenizer> tokens(column_string, sep); + for (const std::string & name_and_type_str : tokens) + { + std::vector name_and_type; + boost::split(name_and_type, name_and_type_str, boost::is_any_of(":")); + if (name_and_type.size() != 2) + return std::nullopt; + try + { + result.emplace_back(name_and_type[0], factory_instance.get(name_and_type[1])); + } + catch (...) + { + tryLogCurrentException(log); + return std::nullopt; + } + } + return result; + } +} + +void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) +{ + Poco::Net::HTMLForm params(request, request.stream()); + LOG_TRACE(log, "Request URI: " + request.getURI()); + + auto process_error = [&response, this](const std::string & message) { + response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); + if (!response.sent()) + response.send() << message << std::endl; + LOG_WARNING(log, message); + }; + + if (pool == nullptr) + { + process_error("ODBCBridge: DSN or database in URL params is not provided or incorrect"); + return; + } + + if (!params.has("query")) + { + process_error("ODBCBridge: No 'query' in request body"); + return; + } + + if (!params.has("columns")) + { + process_error("ODBCBridge: No 'columns' in request body"); + return; + } + + std::string query = params.get("query"); + std::string columns = params.get("columns"); + + auto names_and_types = parseColumns(std::move(columns), log); + + if (!names_and_types) + { + process_error("ODBCBridge: Invalid 'columns' parameter in request body"); + return; + } + + Block sample_block; + for (const NameAndTypePair & column_data : *names_and_types) + sample_block.insert({column_data.type, column_data.name}); + + WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); + + std::shared_ptr writer = FormatFactory::instance().getOutput(format, out, sample_block, *context); + + try + { + ODBCBlockInputStream inp(pool->get(), query, sample_block, max_block_size); + + writer->writePrefix(); + while (auto block = inp.read()) + writer->write(block); + + writer->writeSuffix(); + + + writer->flush(); + } + catch (...) + { + auto message = "Communication with ODBC-driver failed: \n" + getCurrentExceptionMessage(true); + response.setStatusAndReason( + Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error, bacause of too soon response sending + writeStringBinary(message, out); + LOG_WARNING(log, message); + } +} + + +void PingHandler::handleRequest(Poco::Net::HTTPServerRequest & /*request*/, Poco::Net::HTTPServerResponse & response) +{ + try + { + setResponseDefaultHeaders(response, keep_alive_timeout); + const char * data = "Ok.\n"; + response.sendBuffer(data, strlen(data)); + } + catch (...) + { + tryLogCurrentException("PingHandler"); + } +} +} diff --git a/dbms/programs/odbc-bridge/ODBCHandler.h b/dbms/programs/odbc-bridge/Handlers.h similarity index 58% rename from dbms/programs/odbc-bridge/ODBCHandler.h rename to dbms/programs/odbc-bridge/Handlers.h index 857b89f8d63..4628ce5fa69 100644 --- a/dbms/programs/odbc-bridge/ODBCHandler.h +++ b/dbms/programs/odbc-bridge/Handlers.h @@ -3,7 +3,6 @@ #include #include #include -#include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" @@ -12,15 +11,15 @@ namespace DB { -class ODBCHTTPHandler : public Poco::Net::HTTPRequestHandler +class ODBCHandler : public Poco::Net::HTTPRequestHandler { public: - ODBCHTTPHandler(std::shared_ptr pool_, + ODBCHandler(std::shared_ptr pool_, const std::string & format_, size_t max_block_size_, size_t keep_alive_timeout_, std::shared_ptr context_) - : log(&Poco::Logger::get("ODBCHTTPHandler")) + : log(&Poco::Logger::get("ODBCHandler")) , pool(pool_) , format(format_) , max_block_size(max_block_size_) @@ -49,22 +48,4 @@ public: private: size_t keep_alive_timeout; }; - -class ODBCRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory -{ -public: - ODBCRequestHandlerFactory(const std::string & name_, size_t keep_alive_timeout_, std::shared_ptr context_) - : log(&Poco::Logger::get(name_)), name(name_), keep_alive_timeout(keep_alive_timeout_), context(context_) - { - } - - Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override; - -private: - Poco::Logger * log; - std::string name; - size_t keep_alive_timeout; - std::shared_ptr context; - std::unordered_map> pool_map; -}; } diff --git a/dbms/programs/odbc-bridge/ODBCBridge.cpp b/dbms/programs/odbc-bridge/ODBCBridge.cpp index c9ae3d464fc..9d5047492ca 100644 --- a/dbms/programs/odbc-bridge/ODBCBridge.cpp +++ b/dbms/programs/odbc-bridge/ODBCBridge.cpp @@ -1,5 +1,5 @@ #include "ODBCBridge.h" -#include "ODBCHandler.h" +#include "HandlerFactory.h" #include #include @@ -101,7 +101,6 @@ void ODBCBridge::defineOptions(Poco::Util::OptionSet & options) Poco::Util::Option("http-host", "", "hostname to listen, default localhost").argument("http-host").binding("http-host")); options.addOption( Poco::Util::Option("http-timeout", "", "http timout for socket, default 1800").argument("http-timeout").binding("http-timeout")); - options.addOption(Poco::Util::Option("log-level", "", "sets log level, default info").argument("log-level").binding("log-level")); options.addOption(Poco::Util::Option("max-server-connections", "", "max connections to server, default 1024") .argument("max-server-connections") .binding("max-server-connections")); @@ -109,22 +108,36 @@ void ODBCBridge::defineOptions(Poco::Util::OptionSet & options) .argument("keep-alive-timeout") .binding("keep-alive-timeout")); + options.addOption(Poco::Util::Option("log-level", "", "sets log level, default info").argument("log-level").binding("logger.level")); + + options.addOption( + Poco::Util::Option("log-path", "", "log path for all logs, default console").argument("log-path").binding("logger.log")); + + options.addOption(Poco::Util::Option("err-log-path", "", "err log path for all logs, default no") + .argument("err-log-path") + .binding("logger.errorlog")); + using Me = std::decay_t; options.addOption(Poco::Util::Option("help", "", "produce this help message") .binding("help") .callback(Poco::Util::OptionCallback(this, &Me::handleHelp))); - ServerApplication::defineOptions(options); /// Don't need complex .xml config + ServerApplication::defineOptions(options); /// Don't need complex BaseDaemon's .xml config } void ODBCBridge::initialize(Application & self) { is_help = config().has("help"); + if (is_help) return; - log_level = config().getString("log-level", "info"); - Poco::Logger::root().setLevel(log_level); + if (!config().has("logger.log")) + config().setBool("logger.console", true); + + config().setString("logger", "ODBCBridge"); + + buildLoggers(config()); log = &logger(); hostname = config().getString("http-host", "localhost"); port = config().getUInt("http-port"); @@ -165,7 +178,7 @@ int ODBCBridge::main(const std::vector & /*args*/) context->setGlobalContext(*context); auto server = Poco::Net::HTTPServer( - new ODBCRequestHandlerFactory("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context), server_pool, socket, http_params); + new HandlerFactory("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context), server_pool, socket, http_params); server.start(); LOG_INFO(log, "Listening http://" + address.toString()); diff --git a/dbms/programs/odbc-bridge/ODBCHandler.cpp b/dbms/programs/odbc-bridge/ODBCHandler.cpp deleted file mode 100644 index 903af5e346c..00000000000 --- a/dbms/programs/odbc-bridge/ODBCHandler.cpp +++ /dev/null @@ -1,137 +0,0 @@ -#include "ODBCHandler.h" -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -namespace DB -{ -namespace ErrorCodes -{ - extern const int BAD_REQUEST_PARAMETER; -} - -namespace -{ - std::string buildConnectionString(const std::string & DSN, const std::string & database) - { - std::stringstream ss; - ss << "DSN=" << DSN << ";DATABASE=" << database; - return ss.str(); - } - NamesAndTypesList parseColumns(std::string && column_string) - { - const auto & factory_instance = DataTypeFactory::instance(); - NamesAndTypesList result; - static boost::char_separator sep(","); - boost::tokenizer> tokens(column_string, sep); - for (const std::string & name_and_type_str : tokens) - { - std::vector name_and_type; - boost::split(name_and_type, name_and_type_str, boost::is_any_of(":")); - if (name_and_type.size() != 2) - throw Exception("ODBCBridge: Invalid columns parameter '" + column_string + "'", ErrorCodes::BAD_REQUEST_PARAMETER); - result.emplace_back(name_and_type[0], factory_instance.get(name_and_type[1])); - } - return result; - } -} -void ODBCHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) -{ - Poco::Net::HTMLForm params(request, request.stream()); - LOG_TRACE(log, "Request URI: " + request.getURI()); - - if (!params.has("query")) - throw Exception("ODBCBridge: No 'query' in request body", ErrorCodes::BAD_REQUEST_PARAMETER); - - std::string query = params.get("query"); - - if (!params.has("columns")) - throw Exception("ODBCBridge: No 'columns' in request body", ErrorCodes::BAD_REQUEST_PARAMETER); - - std::string columns = params.get("columns"); - - auto names_and_types = parseColumns(std::move(columns)); - Block sample_block; - for (const NameAndTypePair & column_data : names_and_types) - { - sample_block.insert({column_data.type, column_data.name}); - } - - ODBCBlockInputStream inp(pool->get(), query, sample_block, max_block_size); - - WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); - std::shared_ptr writer = FormatFactory::instance().getOutput(format, out, sample_block, *context); - - writer->writePrefix(); - while (auto block = inp.read()) - writer->write(block); - - writer->writeSuffix(); - - - writer->flush(); -} - - -void PingHandler::handleRequest(Poco::Net::HTTPServerRequest & /*request*/, Poco::Net::HTTPServerResponse & response) -{ - try - { - setResponseDefaultHeaders(response, keep_alive_timeout); - const char * data = "Ok.\n"; - response.sendBuffer(data, strlen(data)); - } - catch (...) - { - tryLogCurrentException("PingHandler"); - } -} - -Poco::Net::HTTPRequestHandler * ODBCRequestHandlerFactory::createRequestHandler(const Poco::Net::HTTPServerRequest & request) -{ - const auto & uri = request.getURI(); - LOG_TRACE(log, "Request URI: " + uri); - - if (uri == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET) - return new PingHandler(keep_alive_timeout); - - HTMLForm params(request); - std::string DSN = params.get("DSN"); - std::string database = params.get("database"); - std::string max_block_size = params.get("max_block_size", ""); - std::string format = params.get("format", "RowBinary"); - - std::string connection_string = buildConnectionString(DSN, database); - - LOG_TRACE(log, "Connection string:" << connection_string); - - if (!pool_map.count(connection_string)) - pool_map[connection_string] = createAndCheckResizePocoSessionPool( - [&] { return std::make_shared("ODBC", validateODBCConnectionString(connection_string)); }); - - if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) - return new ODBCHTTPHandler(pool_map.at(connection_string), - format, - max_block_size == "" ? DEFAULT_BLOCK_SIZE : parse(max_block_size), - keep_alive_timeout, - context); - - return nullptr; -} -} From fe10ccb1f73fed655577424b83e7149763c990e8 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 8 Aug 2018 19:29:09 +0300 Subject: [PATCH 03/16] CLICKHOUSE-3878: Remove some copypaste --- dbms/programs/odbc-bridge/ODBCBridge.cpp | 19 +------------------ dbms/programs/odbc-bridge/ODBCBridge.h | 2 -- 2 files changed, 1 insertion(+), 20 deletions(-) diff --git a/dbms/programs/odbc-bridge/ODBCBridge.cpp b/dbms/programs/odbc-bridge/ODBCBridge.cpp index 9d5047492ca..cc74be9983b 100644 --- a/dbms/programs/odbc-bridge/ODBCBridge.cpp +++ b/dbms/programs/odbc-bridge/ODBCBridge.cpp @@ -23,16 +23,6 @@ namespace ErrorCodes namespace { - std::string getCanonicalPath(std::string && path) - { - Poco::trimInPlace(path); - if (path.empty()) - throw Exception("path configuration parameter is empty"); - if (path.back() != '/') - path += '/'; - return std::move(path); - } - Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port, Poco::Logger * log) { Poco::Net::SocketAddress socket_address; @@ -77,11 +67,6 @@ namespace }; } -std::string ODBCBridge::getDefaultCorePath() const -{ - return getCanonicalPath(config().getString("path")) + "cores"; -} - void ODBCBridge::handleHelp(const std::string &, const std::string &) { Poco::Util::HelpFormatter helpFormatter(options()); @@ -155,14 +140,13 @@ void ODBCBridge::initialize(Application & self) void ODBCBridge::uninitialize() { - LOG_INFO(log, "Shutting down"); BaseDaemon::uninitialize(); } int ODBCBridge::main(const std::vector & /*args*/) { if (is_help) - return 0; + return Application::EXIT_OK; LOG_INFO(log, "Starting up"); Poco::Net::ServerSocket socket; @@ -185,7 +169,6 @@ int ODBCBridge::main(const std::vector & /*args*/) waitForTerminationRequest(); - return Application::EXIT_OK; } } diff --git a/dbms/programs/odbc-bridge/ODBCBridge.h b/dbms/programs/odbc-bridge/ODBCBridge.h index 59b75133f8e..41949e2675e 100644 --- a/dbms/programs/odbc-bridge/ODBCBridge.h +++ b/dbms/programs/odbc-bridge/ODBCBridge.h @@ -19,8 +19,6 @@ protected: int main(const std::vector & args) override; - std::string getDefaultCorePath() const override; - private: void handleHelp(const std::string &, const std::string &); From b31dd7bf1f347d5a892552844cc6689f2b7d9918 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 8 Aug 2018 19:41:08 +0300 Subject: [PATCH 04/16] CLICKHOUSE-3878: Correct max_block_size handling --- dbms/programs/odbc-bridge/HandlerFactory.cpp | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/dbms/programs/odbc-bridge/HandlerFactory.cpp b/dbms/programs/odbc-bridge/HandlerFactory.cpp index cc1a55b2335..6cf79c71312 100644 --- a/dbms/programs/odbc-bridge/HandlerFactory.cpp +++ b/dbms/programs/odbc-bridge/HandlerFactory.cpp @@ -30,7 +30,7 @@ Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco: std::string DSN = params.get("DSN", ""); std::string database = params.get("database", ""); - std::string max_block_size = params.get("max_block_size", ""); + std::string max_block_size_str = params.get("max_block_size", ""); std::string format = params.get("format", "RowBinary"); std::string connection_string = buildConnectionString(DSN, database); @@ -55,10 +55,20 @@ Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco: pool = pool_map[connection_string]; } + size_t max_block_size = DEFAULT_BLOCK_SIZE; + + if (!max_block_size_str.empty()) + try + { + max_block_size = std::stoul(max_block_size_str); + } + catch (...) + { + tryLogCurrentException(log); + } if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) - return new ODBCHandler( - pool, format, max_block_size == "" ? DEFAULT_BLOCK_SIZE : parse(max_block_size), keep_alive_timeout, context); + return new ODBCHandler(pool, format, max_block_size, keep_alive_timeout, context); return nullptr; } From 46e9dc132a572a005387f4536a696686fda941dc Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 8 Aug 2018 19:44:41 +0300 Subject: [PATCH 05/16] CLICKHOUSE-3878: Remove redundant message --- dbms/programs/CMakeLists.txt | 3 --- 1 file changed, 3 deletions(-) diff --git a/dbms/programs/CMakeLists.txt b/dbms/programs/CMakeLists.txt index bde54c42f25..f4de2f4de4d 100644 --- a/dbms/programs/CMakeLists.txt +++ b/dbms/programs/CMakeLists.txt @@ -15,9 +15,6 @@ option (ENABLE_CLICKHOUSE_FORMAT "Enable clickhouse-format" ${ENABLE_CLICKHOUSE_ option (ENABLE_CLICKHOUSE_OBFUSCATOR "Enable clickhouse-obfuscator" ${ENABLE_CLICKHOUSE_ALL}) option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "Enable clickhouse-odbc-bridge" ${ENABLE_CLICKHOUSE_ALL}) -MESSAGE(STATUS, "ENABLED_ODBC") -MESSAGE(STATUS, ${ENABLE_CLICKHOUSE_ODBC_BRIDGE}) - configure_file (config_tools.h.in ${CMAKE_CURRENT_BINARY_DIR}/config_tools.h) add_subdirectory (server) From 65c6a8ff93f657a78574dd98dade1eccadbc4ee5 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 8 Aug 2018 19:49:49 +0300 Subject: [PATCH 06/16] CLICKHOUSE-3878: Try to avoid merge conflict --- dbms/src/Common/ErrorCodes.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 5adeb30e49d..966bbd1dee2 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -379,7 +379,8 @@ namespace ErrorCodes extern const int CANNOT_IOSETUP = 402; extern const int INVALID_JOIN_ON_EXPRESSION = 403; extern const int BAD_ODBC_CONNECTION_STRING = 404; - extern const int BAD_REQUEST_PARAMETER = 405; + extern const int PARTITION_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT = 405; + extern const int BAD_REQUEST_PARAMETER = 406; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; From 97bcdcedb3e7900a152c26057ceaf32c19c08004 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 8 Aug 2018 19:52:11 +0300 Subject: [PATCH 07/16] mistake --- dbms/src/Common/ErrorCodes.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 966bbd1dee2..5adeb30e49d 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -379,8 +379,7 @@ namespace ErrorCodes extern const int CANNOT_IOSETUP = 402; extern const int INVALID_JOIN_ON_EXPRESSION = 403; extern const int BAD_ODBC_CONNECTION_STRING = 404; - extern const int PARTITION_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT = 405; - extern const int BAD_REQUEST_PARAMETER = 406; + extern const int BAD_REQUEST_PARAMETER = 405; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; From 6d40546a9a4f835d1320f7bd04a2524fa1e38448 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 9 Aug 2018 15:57:34 +0300 Subject: [PATCH 08/16] CLICKHOUSE-3878: Remove connection string building and validation, change format of columns in request, more convinient exception messages --- dbms/programs/odbc-bridge/HandlerFactory.cpp | 53 +---------- dbms/programs/odbc-bridge/HandlerFactory.h | 4 +- dbms/programs/odbc-bridge/Handlers.cpp | 97 +++++++++++--------- dbms/programs/odbc-bridge/Handlers.h | 22 +++-- dbms/programs/odbc-bridge/ODBCBridge.cpp | 2 +- 5 files changed, 72 insertions(+), 106 deletions(-) diff --git a/dbms/programs/odbc-bridge/HandlerFactory.cpp b/dbms/programs/odbc-bridge/HandlerFactory.cpp index 6cf79c71312..a5e9f4b014c 100644 --- a/dbms/programs/odbc-bridge/HandlerFactory.cpp +++ b/dbms/programs/odbc-bridge/HandlerFactory.cpp @@ -1,6 +1,5 @@ #include "HandlerFactory.h" #include -#include "Handlers.h" #include #include @@ -9,15 +8,6 @@ namespace DB { -namespace -{ - std::string buildConnectionString(const std::string & DSN, const std::string & database) - { - std::stringstream ss; - ss << "DSN=" << DSN << ";DATABASE=" << database; - return ss.str(); - } -} Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco::Net::HTTPServerRequest & request) { const auto & uri = request.getURI(); @@ -26,49 +16,8 @@ Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco: if (uri == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET) return new PingHandler(keep_alive_timeout); - HTMLForm params(request); - std::string DSN = params.get("DSN", ""); - std::string database = params.get("database", ""); - - std::string max_block_size_str = params.get("max_block_size", ""); - std::string format = params.get("format", "RowBinary"); - - std::string connection_string = buildConnectionString(DSN, database); - - LOG_TRACE(log, "Connection string:" << connection_string); - - std::shared_ptr pool = nullptr; - if (!pool_map.count(connection_string)) - try - { - std::string validated = validateODBCConnectionString(connection_string); - pool - = createAndCheckResizePocoSessionPool([validated] { return std::make_shared("ODBC", validated); }); - pool_map[connection_string] = pool; - } - catch (const Exception & ex) - { - LOG_WARNING(log, "Connection string validation failed: " + ex.message()); - } - else - { - pool = pool_map[connection_string]; - } - - size_t max_block_size = DEFAULT_BLOCK_SIZE; - - if (!max_block_size_str.empty()) - try - { - max_block_size = std::stoul(max_block_size_str); - } - catch (...) - { - tryLogCurrentException(log); - } - if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST) - return new ODBCHandler(pool, format, max_block_size, keep_alive_timeout, context); + return new ODBCHandler(pool_map, keep_alive_timeout, context); return nullptr; } diff --git a/dbms/programs/odbc-bridge/HandlerFactory.h b/dbms/programs/odbc-bridge/HandlerFactory.h index 9df3b7ece9c..b33108766f2 100644 --- a/dbms/programs/odbc-bridge/HandlerFactory.h +++ b/dbms/programs/odbc-bridge/HandlerFactory.h @@ -3,6 +3,7 @@ #include #include #include +#include "Handlers.h" #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" @@ -18,6 +19,7 @@ public: HandlerFactory(const std::string & name_, size_t keep_alive_timeout_, std::shared_ptr context_) : log(&Poco::Logger::get(name_)), name(name_), keep_alive_timeout(keep_alive_timeout_), context(context_) { + pool_map = std::make_shared(); } Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override; @@ -27,6 +29,6 @@ private: std::string name; size_t keep_alive_timeout; std::shared_ptr context; - std::unordered_map> pool_map; + std::shared_ptr pool_map; }; } diff --git a/dbms/programs/odbc-bridge/Handlers.cpp b/dbms/programs/odbc-bridge/Handlers.cpp index 22355fb546d..701cfdfb6e4 100644 --- a/dbms/programs/odbc-bridge/Handlers.cpp +++ b/dbms/programs/odbc-bridge/Handlers.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -13,6 +14,7 @@ #include #include #include +#include #include #include #include @@ -25,32 +27,46 @@ namespace ErrorCodes namespace { - std::optional parseColumns(std::string && column_string, Poco::Logger * log) + std::unique_ptr parseColumns(std::string && column_string) { - const auto & factory_instance = DataTypeFactory::instance(); - NamesAndTypesList result; - static boost::char_separator sep(","); - boost::tokenizer> tokens(column_string, sep); - for (const std::string & name_and_type_str : tokens) + std::unique_ptr sample_block = std::make_unique(); + auto names_and_types = NamesAndTypesList::parse(column_string); + for (const NameAndTypePair & column_data : names_and_types) + sample_block->insert({column_data.type, column_data.name}); + return sample_block; + } + + size_t parseMaxBlockSize(const std::string & max_block_size_str, Poco::Logger * log) + { + size_t max_block_size = DEFAULT_BLOCK_SIZE; + if (!max_block_size_str.empty()) { - std::vector name_and_type; - boost::split(name_and_type, name_and_type_str, boost::is_any_of(":")); - if (name_and_type.size() != 2) - return std::nullopt; try { - result.emplace_back(name_and_type[0], factory_instance.get(name_and_type[1])); + max_block_size = std::stoul(max_block_size_str); } catch (...) { tryLogCurrentException(log); - return std::nullopt; } } - return result; + return max_block_size; } } + +ODBCHandler::PoolPtr ODBCHandler::getPool(const std::string & connection_str) +{ + if (!pool_map->count(connection_str)) + { + std::lock_guard lock(mutex); + pool_map->emplace(connection_str, createAndCheckResizePocoSessionPool([connection_str] { + return std::make_shared("ODBC", connection_str); + })); + } + return pool_map->at(connection_str); +} + void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) { Poco::Net::HTMLForm params(request, request.stream()); @@ -63,12 +79,6 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne LOG_WARNING(log, message); }; - if (pool == nullptr) - { - process_error("ODBCBridge: DSN or database in URL params is not provided or incorrect"); - return; - } - if (!params.has("query")) { process_error("ODBCBridge: No 'query' in request body"); @@ -77,45 +87,49 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne if (!params.has("columns")) { - process_error("ODBCBridge: No 'columns' in request body"); + process_error("ODBCBridge: No 'columns' in request URL"); return; } - std::string query = params.get("query"); - std::string columns = params.get("columns"); - - auto names_and_types = parseColumns(std::move(columns), log); - - if (!names_and_types) + if (!params.has("connection_string")) { - process_error("ODBCBridge: Invalid 'columns' parameter in request body"); + process_error("ODBCBridge: No 'connection_string' in request URL"); return; } - Block sample_block; - for (const NameAndTypePair & column_data : *names_and_types) - sample_block.insert({column_data.type, column_data.name}); - - WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); - - std::shared_ptr writer = FormatFactory::instance().getOutput(format, out, sample_block, *context); + size_t max_block_size = parseMaxBlockSize(params.get("max_block_size", ""), log); + std::string columns = params.get("columns"); + std::unique_ptr sample_block; try { - ODBCBlockInputStream inp(pool->get(), query, sample_block, max_block_size); + sample_block = parseColumns(std::move(columns)); + } + catch (const Exception & ex) + { + process_error("ODBCBridge: Invalid 'columns' parameter in request body '" + ex.message() + "'"); + return; + } - writer->writePrefix(); - while (auto block = inp.read()) - writer->write(block); + std::string format = params.get("format", "RowBinary"); + std::string query = params.get("query"); + LOG_TRACE(log, "ODBCBridge: Query '" << query << "'"); - writer->writeSuffix(); + std::string connection_string = params.get("connection_string"); + LOG_TRACE(log, "ODBCBridge: Connection string '" << connection_string << "'"); + WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); + try + { - writer->flush(); + BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, *context); + auto pool = getPool(connection_string); + ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size); + copyData(inp, *writer); } catch (...) { - auto message = "Communication with ODBC-driver failed: \n" + getCurrentExceptionMessage(true); + auto message = "ODBCBridge:\n" + getCurrentExceptionMessage(true); response.setStatusAndReason( Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error, bacause of too soon response sending writeStringBinary(message, out); @@ -123,7 +137,6 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne } } - void PingHandler::handleRequest(Poco::Net::HTTPServerRequest & /*request*/, Poco::Net::HTTPServerResponse & response) { try diff --git a/dbms/programs/odbc-bridge/Handlers.h b/dbms/programs/odbc-bridge/Handlers.h index 4628ce5fa69..0047acde0bb 100644 --- a/dbms/programs/odbc-bridge/Handlers.h +++ b/dbms/programs/odbc-bridge/Handlers.h @@ -6,7 +6,7 @@ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" -#include + #include #pragma GCC diagnostic pop namespace DB @@ -14,15 +14,14 @@ namespace DB class ODBCHandler : public Poco::Net::HTTPRequestHandler { public: - ODBCHandler(std::shared_ptr pool_, - const std::string & format_, - size_t max_block_size_, + using PoolPtr = std::shared_ptr; + using PoolMap = std::unordered_map; + + ODBCHandler(std::shared_ptr pool_map_, size_t keep_alive_timeout_, std::shared_ptr context_) : log(&Poco::Logger::get("ODBCHandler")) - , pool(pool_) - , format(format_) - , max_block_size(max_block_size_) + , pool_map(pool_map_) , keep_alive_timeout(keep_alive_timeout_) , context(context_) { @@ -32,11 +31,14 @@ public: private: Poco::Logger * log; - std::shared_ptr pool; - std::string format; - size_t max_block_size; + + std::shared_ptr pool_map; size_t keep_alive_timeout; std::shared_ptr context; + + static inline std::mutex mutex; + + PoolPtr getPool(const std::string & connection_str); }; class PingHandler : public Poco::Net::HTTPRequestHandler diff --git a/dbms/programs/odbc-bridge/ODBCBridge.cpp b/dbms/programs/odbc-bridge/ODBCBridge.cpp index cc74be9983b..123fb5efa4b 100644 --- a/dbms/programs/odbc-bridge/ODBCBridge.cpp +++ b/dbms/programs/odbc-bridge/ODBCBridge.cpp @@ -126,7 +126,7 @@ void ODBCBridge::initialize(Application & self) log = &logger(); hostname = config().getString("http-host", "localhost"); port = config().getUInt("http-port"); - if (port < 0 || port > 0xFFFF) + if (port > 0xFFFF) throw Exception("Out of range 'http-port': " + std::to_string(port), ErrorCodes::ARGUMENT_OUT_OF_BOUND); http_timeout = config().getUInt("http-timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT); From 04db4ddc4671c6d19f929fafab4f149e399a66e0 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 9 Aug 2018 21:49:05 +0300 Subject: [PATCH 09/16] CLICKHOUSE-3878: Next iteration in odbc-bridge --- dbms/src/Common/ErrorCodes.cpp | 1 + dbms/src/Storages/StorageODBC.cpp | 166 ++++++++++++++---- dbms/src/Storages/StorageODBC.h | 57 +++--- dbms/src/Storages/StorageURL.cpp | 61 +++++-- dbms/src/Storages/StorageURL.h | 54 ++++-- dbms/src/TableFunctions/TableFunctionODBC.cpp | 2 +- 6 files changed, 248 insertions(+), 93 deletions(-) diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 966bbd1dee2..15e6bba45f2 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -381,6 +381,7 @@ namespace ErrorCodes extern const int BAD_ODBC_CONNECTION_STRING = 404; extern const int PARTITION_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT = 405; extern const int BAD_REQUEST_PARAMETER = 406; + extern const int EXTERNAL_EXECUTABLE_NOT_FOUND = 407; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Storages/StorageODBC.cpp b/dbms/src/Storages/StorageODBC.cpp index 2361597e04b..52664447386 100644 --- a/dbms/src/Storages/StorageODBC.cpp +++ b/dbms/src/Storages/StorageODBC.cpp @@ -1,73 +1,161 @@ -#include -#include -#include -#include -#include #include #include +#include +#include #include +#include +#include +#include +#include +#include +#include - +#include +#include +#include +#include +#include +#include namespace DB { - namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int EXTERNAL_EXECUTABLE_NOT_FOUND; } -StorageODBC::StorageODBC( - const std::string & name, +StorageODBC::StorageODBC(const std::string & table_name_, const std::string & connection_string, - const std::string & remote_database_name, - const std::string & remote_table_name, - const ColumnsDescription & columns_) - : IStorage{columns_} - , name(name) - , remote_database_name(remote_database_name) - , remote_table_name(remote_table_name) + const std::string & remote_database_name_, + const std::string & remote_table_name_, + const ColumnsDescription & columns_, + const Context & context_) + : IStorageURLBase(Poco::URI(), context_, table_name_, "RowBinary", columns_) + , connection_string(connection_string) + , remote_database_name(remote_database_name_) + , remote_table_name(remote_table_name_) { - pool = createAndCheckResizePocoSessionPool([&] - { - return std::make_shared("ODBC", validateODBCConnectionString(connection_string)); - }); + const auto & config = context_.getConfigRef(); + size_t bridge_port = config.getUInt("odbc_bridge.port", 9018); + std::string bridge_host = config.getString("odbc_bridge.host", "localhost"); + + uri.setHost(bridge_host); + uri.setPort(bridge_port); + uri.setScheme("http"); + + ping_uri = uri; + ping_uri.setPath("/ping"); } -BlockInputStreams StorageODBC::read( - const Names & column_names, +std::string StorageODBC::getReadMethod() const +{ + return Poco::Net::HTTPRequest::HTTP_POST; +} + +std::vector> StorageODBC::getReadURIParams(const Names & column_names, + const SelectQueryInfo & /*query_info*/, + const Context & /*context*/, + QueryProcessingStage::Enum & /*processed_stage*/, + size_t max_block_size) const +{ + NamesAndTypesList cols; + for (const String & name : column_names) + { + auto column_data = getColumn(name); + cols.emplace_back(column_data.name, column_data.type); + } + std::vector> result; + + result.emplace_back("connection_string", connection_string); + result.emplace_back("columns", cols.toString()); + result.emplace_back("max_block_size", std::to_string(max_block_size)); + + return result; +} + +std::function StorageODBC::getReadPOSTDataCallback(const Names & /*column_names*/, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & /*processed_stage*/, + size_t /*max_block_size*/) const +{ + String query = transformQueryForExternalDatabase( + *query_info.query, getColumns().ordinary, IdentifierQuotingStyle::DoubleQuotes, remote_database_name, remote_table_name, context); + + return [query](std::ostream & os) { os << "query=" << query; }; +} + +bool StorageODBC::checkODBCBridgeIsRunning() const +{ + try + { + ReadWriteBufferFromHTTP buf(ping_uri, Poco::Net::HTTPRequest::HTTP_GET, nullptr); + return checkString("Ok.", buf); + } + catch (...) + { + return false; + } +} + + +void StorageODBC::startODBCBridge() const +{ + const auto & config = context_global.getConfigRef(); + const auto & settings = context_global.getSettingsRef(); + Poco::Path path{config.getString("application.dir", "")}; + path.setFileName("clickhouse-odbc-bridge"); + + if (!path.isFile()) + throw Exception("clickhouse-odbc-bridge is not found", ErrorCodes::EXTERNAL_EXECUTABLE_NOT_FOUND); + + std::stringstream command; + command << path.toString() << ' '; + command << "--daemon" << ' '; + command << "--http-port " << config.getUInt("odbc_bridge.port", 9018) << ' '; + command << "--http-host " << config.getString("odbc_bridge.host", "localhost") << ' '; + command << "--http-timeout " << settings.http_receive_timeout.value.totalSeconds() << ' '; + if (config.has("logger.odbc_bridge_log")) + command << "--log-path " << config.getString("logger.odbc_bridge_log") << ' '; + if (config.has("logger.odbc_bridge_errlog")) + command << "--err-log-path " << config.getString("logger.odbc_bridge_errlog") << ' '; + if (config.has("logger.odbc_bridge_level")) + command << "--log-level " << config.getString("logger.odbc_bridge_level") << ' '; + + auto cmd = ShellCommand::execute(command.str()); + cmd->wait(); +} +BlockInputStreams StorageODBC::read(const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, - unsigned /*num_streams*/) + unsigned num_streams) { - check(column_names); - processed_stage = QueryProcessingStage::FetchColumns; - String query = transformQueryForExternalDatabase( - *query_info.query, getColumns().ordinary, IdentifierQuotingStyle::DoubleQuotes, remote_database_name, remote_table_name, context); - - Block sample_block; - for (const String & name : column_names) + if (!checkODBCBridgeIsRunning()) { - auto column_data = getColumn(name); - sample_block.insert({ column_data.type, column_data.name }); + startODBCBridge(); + size_t counter = 0; + while (!checkODBCBridgeIsRunning() && counter <= 5) + { + sleep(1); + counter++; + } } - return { std::make_shared(pool->get(), query, sample_block, max_block_size) }; + return IStorageURLBase::read(column_names, query_info, context, processed_stage, max_block_size, num_streams); } void registerStorageODBC(StorageFactory & factory) { - factory.registerStorage("ODBC", [](const StorageFactory::Arguments & args) - { + factory.registerStorage("ODBC", [](const StorageFactory::Arguments & args) { ASTs & engine_args = args.engine_args; if (engine_args.size() != 3) throw Exception( - "Storage ODBC requires exactly 3 parameters: ODBC('DSN', database, table).", - ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + "Storage ODBC requires exactly 3 parameters: ODBC('DSN', database, table).", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); for (size_t i = 0; i < 2; ++i) engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context); @@ -76,8 +164,8 @@ void registerStorageODBC(StorageFactory & factory) static_cast(*engine_args[0]).value.safeGet(), static_cast(*engine_args[1]).value.safeGet(), static_cast(*engine_args[2]).value.safeGet(), - args.columns); + args.columns, + args.context); }); } - } diff --git a/dbms/src/Storages/StorageODBC.h b/dbms/src/Storages/StorageODBC.h index 605d35b0202..38ed5ba8285 100644 --- a/dbms/src/Storages/StorageODBC.h +++ b/dbms/src/Storages/StorageODBC.h @@ -1,49 +1,60 @@ #pragma once +#include #include -#include - -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-parameter" - #include -#pragma GCC diagnostic pop - - namespace DB { - /** Implements storage in the ODBC database. * Use ENGINE = odbc(connection_string, table_name) * Example ENGINE = odbc('dsn=test', table) * Read only. */ -class StorageODBC : public ext::shared_ptr_helper, public IStorage +class StorageODBC : public ext::shared_ptr_helper, public IStorageURLBase { public: - StorageODBC( - const std::string & name, - const std::string & connection_string, - const std::string & remote_database_name, - const std::string & remote_table_name, - const ColumnsDescription & columns_); + std::string getName() const override + { + return "ODBC"; + } - std::string getName() const override { return "ODBC"; } - std::string getTableName() const override { return name; } - - BlockInputStreams read( - const Names & column_names, + BlockInputStreams read(const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned num_streams) override; + +protected: + StorageODBC(const std::string & table_name_, + const std::string & connection_string, + const std::string & remote_database_name, + const std::string & remote_table_name, + const ColumnsDescription & columns_, + const Context & context_); + private: - std::string name; + std::string connection_string; std::string remote_database_name; std::string remote_table_name; + Poco::URI ping_uri; - std::shared_ptr pool; + std::string getReadMethod() const override; + + std::vector> getReadURIParams(const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size) const override; + + std::function getReadPOSTDataCallback(const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size) const override; + + bool checkODBCBridgeIsRunning() const; + void startODBCBridge() const; }; } diff --git a/dbms/src/Storages/StorageURL.cpp b/dbms/src/Storages/StorageURL.cpp index 1c3b8246492..b94c0b8390d 100644 --- a/dbms/src/Storages/StorageURL.cpp +++ b/dbms/src/Storages/StorageURL.cpp @@ -24,12 +24,12 @@ namespace ErrorCodes extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; }; -StorageURL::StorageURL(const Poco::URI & uri_, +IStorageURLBase::IStorageURLBase(const Poco::URI & uri_, + const Context & context_, const std::string & table_name_, const String & format_name_, - const ColumnsDescription & columns_, - Context & context_) - : IStorage(columns_), uri(uri_), format_name(format_name_), table_name(table_name_), context_global(context_) + const ColumnsDescription & columns_) + : IStorage(columns_), uri(uri_), context_global(context_), format_name(format_name_), table_name(table_name_) { } @@ -39,6 +39,8 @@ namespace { public: StorageURLBlockInputStream(const Poco::URI & uri, + const std::string & method, + std::function callback, const String & format, const String & name_, const Block & sample_block, @@ -47,7 +49,7 @@ namespace const ConnectionTimeouts & timeouts) : name(name_) { - read_buf = std::make_unique(uri, Poco::Net::HTTPRequest::HTTP_GET, nullptr, timeouts); + read_buf = std::make_unique(uri, method, callback, timeouts); reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); } @@ -89,7 +91,7 @@ namespace StorageURLBlockOutputStream(const Poco::URI & uri, const String & format, const Block & sample_block_, - Context & context, + const Context & context, const ConnectionTimeouts & timeouts) : sample_block(sample_block_) { @@ -127,16 +129,45 @@ namespace } -BlockInputStreams StorageURL::read( - const Names & /*column_names*/, +std::string IStorageURLBase::getReadMethod() const +{ + return Poco::Net::HTTPRequest::HTTP_GET; +} + +std::vector> IStorageURLBase::getReadURIParams(const Names & /*column_names*/, const SelectQueryInfo & /*query_info*/, - const Context & context, + const Context & /*context*/, QueryProcessingStage::Enum & /*processed_stage*/, + size_t /*max_block_size*/) const +{ + return {}; +} + +std::function IStorageURLBase::getReadPOSTDataCallback(const Names & /*column_names*/, + const SelectQueryInfo & /*query_info*/, + const Context & /*context*/, + QueryProcessingStage::Enum & /*processed_stage*/, + size_t /*max_block_size*/) const +{ + return nullptr; +} + + +BlockInputStreams IStorageURLBase::read(const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned /*num_streams*/) { - return {std::make_shared( - uri, + auto request_uri = uri; + auto params = getReadURIParams(column_names, query_info, context, processed_stage, max_block_size); + for (const auto & [param, value] : params) + request_uri.addQueryParameter(param, value); + + return {std::make_shared(request_uri, + getReadMethod(), + getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size), format_name, getName(), getSampleBlock(), @@ -145,9 +176,9 @@ BlockInputStreams StorageURL::read( ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()))}; } -void StorageURL::rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/) {} +void IStorageURLBase::rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/) {} -BlockOutputStreamPtr StorageURL::write(const ASTPtr & /*query*/, const Settings & /*settings*/) +BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const Settings & /*settings*/) { return std::make_shared( uri, format_name, getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global.getSettingsRef())); @@ -155,8 +186,7 @@ BlockOutputStreamPtr StorageURL::write(const ASTPtr & /*query*/, const Settings void registerStorageURL(StorageFactory & factory) { - factory.registerStorage("URL", [](const StorageFactory::Arguments & args) - { + factory.registerStorage("URL", [](const StorageFactory::Arguments & args) { ASTs & engine_args = args.engine_args; if (!(engine_args.size() == 1 || engine_args.size() == 2)) @@ -175,5 +205,4 @@ void registerStorageURL(StorageFactory & factory) return StorageURL::create(uri, args.table_name, format_name, args.columns, args.context); }); } - } diff --git a/dbms/src/Storages/StorageURL.h b/dbms/src/Storages/StorageURL.h index 80bec4e8d35..b375f800b4a 100644 --- a/dbms/src/Storages/StorageURL.h +++ b/dbms/src/Storages/StorageURL.h @@ -13,14 +13,9 @@ namespace DB * HTTP POST when insert is called. In POST request the data is send * using Chunked transfer encoding, so server have to support it. */ -class StorageURL : public ext::shared_ptr_helper, public IStorage +class IStorageURLBase : public IStorage { public: - String getName() const override - { - return "URL"; - } - String getTableName() const override { return table_name; @@ -38,18 +33,49 @@ public: void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override; protected: + IStorageURLBase(const Poco::URI & uri_, + const Context & context_, + const std::string & table_name_, + const String & format_name_, + const ColumnsDescription & columns_); + + Poco::URI uri; + const Context & context_global; + +private: + String format_name; + String table_name; + + virtual std::string getReadMethod() const; + + virtual std::vector> getReadURIParams(const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size) const; + + virtual std::function getReadPOSTDataCallback(const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size) const; +}; + +class StorageURL : public ext::shared_ptr_helper, public IStorageURLBase +{ +public: StorageURL(const Poco::URI & uri_, const std::string & table_name_, const String & format_name_, const ColumnsDescription & columns_, - Context & context_); + Context & context_) + : IStorageURLBase(uri_, context_, table_name_, format_name_, columns_) + { + } -private: - Poco::URI uri; - String format_name; - String table_name; - Context & context_global; - - Logger * log = &Logger::get("StorageURL"); + String getName() const override + { + return "URL"; + } }; } diff --git a/dbms/src/TableFunctions/TableFunctionODBC.cpp b/dbms/src/TableFunctions/TableFunctionODBC.cpp index a45283c65f2..228c00d1893 100644 --- a/dbms/src/TableFunctions/TableFunctionODBC.cpp +++ b/dbms/src/TableFunctions/TableFunctionODBC.cpp @@ -114,7 +114,7 @@ StoragePtr TableFunctionODBC::executeImpl(const ASTPtr & ast_function, const Con columns.emplace_back(reinterpret_cast(column_name), getDataType(type)); } - auto result = StorageODBC::create(table_name, connection_string, "", table_name, ColumnsDescription{columns}); + auto result = StorageODBC::create(table_name, connection_string, "", table_name, ColumnsDescription{columns}, context); result->startup(); return result; } From dde09bd8a5311a041bd3e042d106111e38553eca Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 10 Aug 2018 14:42:12 +0300 Subject: [PATCH 10/16] CLICKHOUSE-3878: Start bridge not like daemon but background child, more explicit logging, fix mutex bug, add SCOPE_EXIT in bridge --- dbms/programs/odbc-bridge/Handlers.cpp | 35 ++++++++++-------------- dbms/programs/odbc-bridge/ODBCBridge.cpp | 21 ++++++++++++-- dbms/src/Common/ErrorCodes.cpp | 1 + dbms/src/Storages/StorageODBC.cpp | 30 ++++++++++++++------ dbms/src/Storages/StorageODBC.h | 2 ++ 5 files changed, 57 insertions(+), 32 deletions(-) diff --git a/dbms/programs/odbc-bridge/Handlers.cpp b/dbms/programs/odbc-bridge/Handlers.cpp index 701cfdfb6e4..1f60eddeeb5 100644 --- a/dbms/programs/odbc-bridge/Handlers.cpp +++ b/dbms/programs/odbc-bridge/Handlers.cpp @@ -35,31 +35,14 @@ namespace sample_block->insert({column_data.type, column_data.name}); return sample_block; } - - size_t parseMaxBlockSize(const std::string & max_block_size_str, Poco::Logger * log) - { - size_t max_block_size = DEFAULT_BLOCK_SIZE; - if (!max_block_size_str.empty()) - { - try - { - max_block_size = std::stoul(max_block_size_str); - } - catch (...) - { - tryLogCurrentException(log); - } - } - return max_block_size; - } } ODBCHandler::PoolPtr ODBCHandler::getPool(const std::string & connection_str) { + std::lock_guard lock(mutex); if (!pool_map->count(connection_str)) { - std::lock_guard lock(mutex); pool_map->emplace(connection_str, createAndCheckResizePocoSessionPool([connection_str] { return std::make_shared("ODBC", connection_str); })); @@ -97,7 +80,17 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne return; } - size_t max_block_size = parseMaxBlockSize(params.get("max_block_size", ""), log); + size_t max_block_size = DEFAULT_BLOCK_SIZE; + if (params.has("max_block_size")) + { + std::string max_block_size_str = params.get("max_block_size", ""); + if (max_block_size_str.empty()) + { + process_error("ODBCBridge: Empty max_block_size specified"); + return; + } + max_block_size = parse(max_block_size_str); + } std::string columns = params.get("columns"); std::unique_ptr sample_block; @@ -108,12 +101,13 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne catch (const Exception & ex) { process_error("ODBCBridge: Invalid 'columns' parameter in request body '" + ex.message() + "'"); + LOG_WARNING(log, ex.getStackTrace().toString()); return; } std::string format = params.get("format", "RowBinary"); std::string query = params.get("query"); - LOG_TRACE(log, "ODBCBridge: Query '" << query << "'"); + LOG_TRACE(log, "ODBCBridge: " << query); std::string connection_string = params.get("connection_string"); LOG_TRACE(log, "ODBCBridge: Connection string '" << connection_string << "'"); @@ -121,7 +115,6 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); try { - BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, *context); auto pool = getPool(connection_string); ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size); diff --git a/dbms/programs/odbc-bridge/ODBCBridge.cpp b/dbms/programs/odbc-bridge/ODBCBridge.cpp index 123fb5efa4b..46a6e33a386 100644 --- a/dbms/programs/odbc-bridge/ODBCBridge.cpp +++ b/dbms/programs/odbc-bridge/ODBCBridge.cpp @@ -13,6 +13,8 @@ #include #include #include +#include +#include namespace DB { @@ -83,9 +85,10 @@ void ODBCBridge::defineOptions(Poco::Util::OptionSet & options) { options.addOption(Poco::Util::Option("http-port", "", "port to listen").argument("http-port", true).binding("http-port")); options.addOption( - Poco::Util::Option("http-host", "", "hostname to listen, default localhost").argument("http-host").binding("http-host")); + Poco::Util::Option("listen-host", "", "hostname to listen, default localhost").argument("listen-host").binding("listen-host")); options.addOption( Poco::Util::Option("http-timeout", "", "http timout for socket, default 1800").argument("http-timeout").binding("http-timeout")); + options.addOption(Poco::Util::Option("max-server-connections", "", "max connections to server, default 1024") .argument("max-server-connections") .binding("max-server-connections")); @@ -124,7 +127,7 @@ void ODBCBridge::initialize(Application & self) buildLoggers(config()); log = &logger(); - hostname = config().getString("http-host", "localhost"); + hostname = config().getString("listen-host", "localhost"); port = config().getUInt("http-port"); if (port > 0xFFFF) throw Exception("Out of range 'http-port': " + std::to_string(port), ErrorCodes::ARGUMENT_OUT_OF_BOUND); @@ -167,8 +170,20 @@ int ODBCBridge::main(const std::vector & /*args*/) LOG_INFO(log, "Listening http://" + address.toString()); - waitForTerminationRequest(); + SCOPE_EXIT({ + LOG_DEBUG(log, "Received termination signal."); + LOG_DEBUG(log, "Waiting for current connections to close."); + server.stop(); + for (size_t count : ext::range(1, 6)) + { + if (server.currentConnections() == 0) + break; + LOG_DEBUG(log, "Waiting for " << server.currentConnections() << " connections, try " << count); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + }); + waitForTerminationRequest(); return Application::EXIT_OK; } } diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 15e6bba45f2..004a92af1a3 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -382,6 +382,7 @@ namespace ErrorCodes extern const int PARTITION_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT = 405; extern const int BAD_REQUEST_PARAMETER = 406; extern const int EXTERNAL_EXECUTABLE_NOT_FOUND = 407; + extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING = 408; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Storages/StorageODBC.cpp b/dbms/src/Storages/StorageODBC.cpp index 52664447386..1cbbc2a1540 100644 --- a/dbms/src/Storages/StorageODBC.cpp +++ b/dbms/src/Storages/StorageODBC.cpp @@ -16,12 +16,14 @@ #include #include #include +#include namespace DB { namespace ErrorCodes { extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int EXTERNAL_EXECUTABLE_NOT_FOUND; + extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING; } @@ -35,6 +37,7 @@ StorageODBC::StorageODBC(const std::string & table_name_, , connection_string(connection_string) , remote_database_name(remote_database_name_) , remote_table_name(remote_table_name_) + , log(&Poco::Logger::get("StorageODBC")) { const auto & config = context_.getConfigRef(); size_t bridge_port = config.getUInt("odbc_bridge.port", 9018); @@ -99,7 +102,6 @@ bool StorageODBC::checkODBCBridgeIsRunning() const } } - void StorageODBC::startODBCBridge() const { const auto & config = context_global.getConfigRef(); @@ -112,9 +114,8 @@ void StorageODBC::startODBCBridge() const std::stringstream command; command << path.toString() << ' '; - command << "--daemon" << ' '; command << "--http-port " << config.getUInt("odbc_bridge.port", 9018) << ' '; - command << "--http-host " << config.getString("odbc_bridge.host", "localhost") << ' '; + command << "--listen-host " << config.getString("odbc_bridge.listen_host", "localhost") << ' '; command << "--http-timeout " << settings.http_receive_timeout.value.totalSeconds() << ' '; if (config.has("logger.odbc_bridge_log")) command << "--log-path " << config.getString("logger.odbc_bridge_log") << ' '; @@ -122,10 +123,15 @@ void StorageODBC::startODBCBridge() const command << "--err-log-path " << config.getString("logger.odbc_bridge_errlog") << ' '; if (config.has("logger.odbc_bridge_level")) command << "--log-level " << config.getString("logger.odbc_bridge_level") << ' '; + command << "&"; // we don't want to wait this process - auto cmd = ShellCommand::execute(command.str()); + auto command_str = command.str(); + LOG_TRACE(log, "Starting clickhouse-odbc-bridge with command: " << command_str); + + auto cmd = ShellCommand::execute(command_str); cmd->wait(); } + BlockInputStreams StorageODBC::read(const Names & column_names, const SelectQueryInfo & query_info, const Context & context, @@ -135,13 +141,21 @@ BlockInputStreams StorageODBC::read(const Names & column_names, { if (!checkODBCBridgeIsRunning()) { + LOG_TRACE(log, "clickhouse-odbc-bridge is not running, will try to start it"); startODBCBridge(); - size_t counter = 0; - while (!checkODBCBridgeIsRunning() && counter <= 5) + bool started = false; + for (size_t counter : ext::range(1, 6)) { - sleep(1); - counter++; + LOG_TRACE(log, "Checking clickhouse-odbc-bridge is running, try " << counter); + if (checkODBCBridgeIsRunning()) + { + started = true; + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); } + if (!started) + throw Exception("StorageODBC: clickhouse-odbc-bridge is not responding", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING); } return IStorageURLBase::read(column_names, query_info, context, processed_stage, max_block_size, num_streams); diff --git a/dbms/src/Storages/StorageODBC.h b/dbms/src/Storages/StorageODBC.h index 38ed5ba8285..4108887aa9b 100644 --- a/dbms/src/Storages/StorageODBC.h +++ b/dbms/src/Storages/StorageODBC.h @@ -40,6 +40,8 @@ private: std::string remote_table_name; Poco::URI ping_uri; + Poco::Logger * log; + std::string getReadMethod() const override; std::vector> getReadURIParams(const Names & column_names, From c3588a582cc3233c9c2cea0ee560e8592934ae83 Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 10 Aug 2018 17:46:12 +0300 Subject: [PATCH 11/16] CLICKHOUSE-3878: Add some comments and small readme --- dbms/programs/odbc-bridge/HandlerFactory.h | 3 ++ dbms/programs/odbc-bridge/Handlers.h | 7 ++++ dbms/programs/odbc-bridge/ODBCBridge.h | 12 ++++--- dbms/programs/odbc-bridge/README.md | 38 ++++++++++++++++++++++ 4 files changed, 55 insertions(+), 5 deletions(-) create mode 100644 dbms/programs/odbc-bridge/README.md diff --git a/dbms/programs/odbc-bridge/HandlerFactory.h b/dbms/programs/odbc-bridge/HandlerFactory.h index b33108766f2..92a0267a16c 100644 --- a/dbms/programs/odbc-bridge/HandlerFactory.h +++ b/dbms/programs/odbc-bridge/HandlerFactory.h @@ -13,6 +13,9 @@ namespace DB { +/** Factory for '/ping' and '/' handlers. + * Also stores Session pools for ODBC connections + */ class HandlerFactory : public Poco::Net::HTTPRequestHandlerFactory { public: diff --git a/dbms/programs/odbc-bridge/Handlers.h b/dbms/programs/odbc-bridge/Handlers.h index 0047acde0bb..a8cb65015d7 100644 --- a/dbms/programs/odbc-bridge/Handlers.h +++ b/dbms/programs/odbc-bridge/Handlers.h @@ -11,6 +11,11 @@ namespace DB { +/** Main handler for requests to ODBC driver + * requires connection_string and columns in request params + * and also query in request body + * response in RowBinary format + */ class ODBCHandler : public Poco::Net::HTTPRequestHandler { public: @@ -41,6 +46,8 @@ private: PoolPtr getPool(const std::string & connection_str); }; +/** Simple ping handler, answers "Ok." to GET request + */ class PingHandler : public Poco::Net::HTTPRequestHandler { public: diff --git a/dbms/programs/odbc-bridge/ODBCBridge.h b/dbms/programs/odbc-bridge/ODBCBridge.h index 41949e2675e..730dcda07d6 100644 --- a/dbms/programs/odbc-bridge/ODBCBridge.h +++ b/dbms/programs/odbc-bridge/ODBCBridge.h @@ -1,14 +1,18 @@ #pragma once -#include -#include #include +#include +#include namespace DB { +/** Class represents clickhouse-odbc-bridge server, which listen + * incoming HTTP POST and GET requests on specified port and host. + * Has two handlers '/' for all incoming POST requests to ODBC driver + * and /ping for GET request about service status + */ class ODBCBridge : public BaseDaemon { - public: void defineOptions(Poco::Util::OptionSet & options) override; @@ -20,7 +24,6 @@ protected: int main(const std::vector & args) override; private: - void handleHelp(const std::string &, const std::string &); bool is_help; @@ -35,5 +38,4 @@ private: std::shared_ptr context; /// need for settings only }; - } diff --git a/dbms/programs/odbc-bridge/README.md b/dbms/programs/odbc-bridge/README.md new file mode 100644 index 00000000000..91a6e476733 --- /dev/null +++ b/dbms/programs/odbc-bridge/README.md @@ -0,0 +1,38 @@ +# clickhouse-odbc-bridge + +Simple HTTP-server which works like a proxy for ODBC driver. The main motivation +was possible segfaults or another faults in ODBC implementations, which can +crash whole clickhouse-server process. + +This tool works via HTTP, not via pipes, shared memory, or TCP because: +- It's simplier to implement +- It's simplier to debug +- jdbc-bridge can be implemented in the same way + +## Usage + +`clickhouse-server` use this tool inside odbc table function and StorageODBC. +However it can be used as standalone tool from command line with the following +parameters in POST-request URL: +- `connection_string` -- ODBC connection string. +- `columns` -- columns in ClickHouse NamesAndTypesList format, name in backticks, + type as string. Name and type are space separated, rows separated with + newline. +- `max_block_size` -- optional parameter, sets maximum size of single block. +Query is send in post body. Response is returned in RowBinary format. + +## Example: + +```bash +$ clickhouse-odbc-bridge --http-port 9018 --daemon + +$ curl -d "query=SELECT PageID, ImpID, AdType FROM Keys ORDER BY PageID, ImpID" --data-urlencode "connection_string=DSN=ClickHouse;DATABASE=stat" --data-urlencode "columns=columns format version: 1 +3 columns: +\`PageID\` String +\`ImpID\` String +\`AdType\` String +" "http://localhost:9018/" > result.txt + +$ cat result.txt +12246623837185725195925621517 +``` From f11574cbfbfe6d5a6df5faac1f6cd0161c2644ac Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 10 Aug 2018 18:07:54 +0300 Subject: [PATCH 12/16] CLICKHOUSE-3878: Sleep optimization --- dbms/src/Storages/StorageODBC.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/StorageODBC.cpp b/dbms/src/Storages/StorageODBC.cpp index 1cbbc2a1540..594ae19abe5 100644 --- a/dbms/src/Storages/StorageODBC.cpp +++ b/dbms/src/Storages/StorageODBC.cpp @@ -144,7 +144,7 @@ BlockInputStreams StorageODBC::read(const Names & column_names, LOG_TRACE(log, "clickhouse-odbc-bridge is not running, will try to start it"); startODBCBridge(); bool started = false; - for (size_t counter : ext::range(1, 6)) + for (size_t counter : ext::range(1, 20)) { LOG_TRACE(log, "Checking clickhouse-odbc-bridge is running, try " << counter); if (checkODBCBridgeIsRunning()) @@ -152,7 +152,7 @@ BlockInputStreams StorageODBC::read(const Names & column_names, started = true; break; } - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } if (!started) throw Exception("StorageODBC: clickhouse-odbc-bridge is not responding", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING); From 53b23e0113697c05fdfadd867a0f11795b484b30 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 12 Aug 2018 15:23:22 +0300 Subject: [PATCH 13/16] CLICKHOUSE-3878: Add inherited fd's closing function --- dbms/programs/odbc-bridge/ODBCBridge.cpp | 1 + dbms/programs/odbc-bridge/odbc-bridge.cpp | 2 +- libs/libdaemon/include/daemon/BaseDaemon.h | 5 ++++ libs/libdaemon/src/BaseDaemon.cpp | 34 ++++++++++++++++++++++ 4 files changed, 41 insertions(+), 1 deletion(-) diff --git a/dbms/programs/odbc-bridge/ODBCBridge.cpp b/dbms/programs/odbc-bridge/ODBCBridge.cpp index 46a6e33a386..bab58250fa4 100644 --- a/dbms/programs/odbc-bridge/ODBCBridge.cpp +++ b/dbms/programs/odbc-bridge/ODBCBridge.cpp @@ -115,6 +115,7 @@ void ODBCBridge::defineOptions(Poco::Util::OptionSet & options) void ODBCBridge::initialize(Application & self) { + BaseDaemon::closeFDs(); is_help = config().has("help"); if (is_help) diff --git a/dbms/programs/odbc-bridge/odbc-bridge.cpp b/dbms/programs/odbc-bridge/odbc-bridge.cpp index 2d600b21bf7..af42eef8647 100644 --- a/dbms/programs/odbc-bridge/odbc-bridge.cpp +++ b/dbms/programs/odbc-bridge/odbc-bridge.cpp @@ -1,2 +1,2 @@ int mainEntryClickHouseODBCBridge(int argc, char ** argv); -int main(int argc_, char ** argv_) { return mainEntryOdbcBridge(argc_, argv_); } +int main(int argc_, char ** argv_) { return mainEntryClickHouseODBCBridge(argc_, argv_); } diff --git a/libs/libdaemon/include/daemon/BaseDaemon.h b/libs/libdaemon/include/daemon/BaseDaemon.h index 390560f8546..7a77e74cc34 100644 --- a/libs/libdaemon/include/daemon/BaseDaemon.h +++ b/libs/libdaemon/include/daemon/BaseDaemon.h @@ -145,6 +145,11 @@ public: return layer; /// layer выставляется в классе-наследнике BaseDaemonApplication. } + /// close all process FDs except + /// 0-2 -- stdin, stdout, stderr + /// also doesn't close global internal pipes for signal handling + void closeFDs(); + protected: /// Возвращает TaskManager приложения /// все методы task_manager следует вызывать из одного потока diff --git a/libs/libdaemon/src/BaseDaemon.cpp b/libs/libdaemon/src/BaseDaemon.cpp index d21c7e004ab..8e6a885cf05 100644 --- a/libs/libdaemon/src/BaseDaemon.cpp +++ b/libs/libdaemon/src/BaseDaemon.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #if USE_UNWIND #define UNW_LOCAL_ONLY @@ -54,6 +55,7 @@ #include #include #include +#include #include #include #include @@ -845,8 +847,40 @@ std::string BaseDaemon::getDefaultCorePath() const return "/opt/cores/"; } +void BaseDaemon::closeFDs() +{ +#if defined(__FreeBSD__) || (defined(__APPLE__) && defined(__MACH__)) + Poco::File proc_path{"/dev/fd"}; +#else + Poco::File proc_path{"/proc/self/fd"}; +#endif + if (proc_path.isDirectory()) /// Hooray, proc exists + { + Poco::DirectoryIterator itr(proc_path), end; + for (; itr != end; ++itr) + { + long fd = DB::parse(itr.name()); + if (fd > 2 && fd != signal_pipe.read_fd && fd != signal_pipe.write_fd) + ::close(fd); + } + } + else + { + long max_fd = -1; +#ifdef _SC_OPEN_MAX + max_fd = sysconf(_SC_OPEN_MAX); + if (max_fd == -1) +#endif + max_fd = 256; /// bad fallback + for (long fd = 3; fd < max_fd; ++fd) + if (fd != signal_pipe.read_fd && fd != signal_pipe.write_fd) + ::close(fd); + } +} + void BaseDaemon::initialize(Application & self) { + closeFDs(); task_manager.reset(new Poco::TaskManager); ServerApplication::initialize(self); From 76baaf96207a68beab2d7fae09853d8f40b72113 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 13 Aug 2018 18:00:41 +0300 Subject: [PATCH 14/16] CLICKHOUSE-3878: Remove redundant prefix --- dbms/programs/odbc-bridge/Handlers.cpp | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/dbms/programs/odbc-bridge/Handlers.cpp b/dbms/programs/odbc-bridge/Handlers.cpp index 1f60eddeeb5..9e8cb5dee1a 100644 --- a/dbms/programs/odbc-bridge/Handlers.cpp +++ b/dbms/programs/odbc-bridge/Handlers.cpp @@ -64,19 +64,19 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne if (!params.has("query")) { - process_error("ODBCBridge: No 'query' in request body"); + process_error("No 'query' in request body"); return; } if (!params.has("columns")) { - process_error("ODBCBridge: No 'columns' in request URL"); + process_error("No 'columns' in request URL"); return; } if (!params.has("connection_string")) { - process_error("ODBCBridge: No 'connection_string' in request URL"); + process_error("No 'connection_string' in request URL"); return; } @@ -86,7 +86,7 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne std::string max_block_size_str = params.get("max_block_size", ""); if (max_block_size_str.empty()) { - process_error("ODBCBridge: Empty max_block_size specified"); + process_error("Empty max_block_size specified"); return; } max_block_size = parse(max_block_size_str); @@ -100,17 +100,17 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne } catch (const Exception & ex) { - process_error("ODBCBridge: Invalid 'columns' parameter in request body '" + ex.message() + "'"); + process_error("Invalid 'columns' parameter in request body '" + ex.message() + "'"); LOG_WARNING(log, ex.getStackTrace().toString()); return; } std::string format = params.get("format", "RowBinary"); std::string query = params.get("query"); - LOG_TRACE(log, "ODBCBridge: " << query); + LOG_TRACE(log, "Query: " << query); std::string connection_string = params.get("connection_string"); - LOG_TRACE(log, "ODBCBridge: Connection string '" << connection_string << "'"); + LOG_TRACE(log, "Connection string: '" << connection_string << "'"); WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout); try @@ -122,11 +122,11 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne } catch (...) { - auto message = "ODBCBridge:\n" + getCurrentExceptionMessage(true); + auto message = getCurrentExceptionMessage(true); response.setStatusAndReason( Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error, bacause of too soon response sending writeStringBinary(message, out); - LOG_WARNING(log, message); + tryLogCurrentException(log); } } From 83d5dba53b530756ad8487e9f6500f5d8400a5ba Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 13 Aug 2018 21:10:26 +0300 Subject: [PATCH 15/16] CLICKHOUSE-3878: Move ODBCDictionary to odbc-bridge --- dbms/src/Common/ODBCBridgeHelper.cpp | 106 ++++++++++++++++++ dbms/src/Common/ODBCBridgeHelper.h | 47 ++++++++ .../src/Dictionaries/ODBCDictionarySource.cpp | 102 +++++++++++++---- dbms/src/Dictionaries/ODBCDictionarySource.h | 14 ++- dbms/src/Storages/StorageODBC.cpp | 85 ++------------ dbms/src/Storages/StorageODBC.h | 7 +- 6 files changed, 259 insertions(+), 102 deletions(-) create mode 100644 dbms/src/Common/ODBCBridgeHelper.cpp create mode 100644 dbms/src/Common/ODBCBridgeHelper.h diff --git a/dbms/src/Common/ODBCBridgeHelper.cpp b/dbms/src/Common/ODBCBridgeHelper.cpp new file mode 100644 index 00000000000..92405a9270f --- /dev/null +++ b/dbms/src/Common/ODBCBridgeHelper.cpp @@ -0,0 +1,106 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING; +} +ODBCBridgeHelper::ODBCBridgeHelper(const Context & context_global_, const std::string & connection_string_) + : context_global(context_global_), connection_string(validateODBCConnectionString(connection_string_)) +{ + const auto & config = context_global.getConfigRef(); + size_t bridge_port = config.getUInt("odbc_bridge.port", DEFAULT_PORT); + std::string bridge_host = config.getString("odbc_bridge.host", DEFAULT_HOST); + + ping_url.setHost(bridge_host); + ping_url.setPort(bridge_port); + ping_url.setScheme("http"); + ping_url.setPath(PING_HANDLER); +} +void ODBCBridgeHelper::startODBCBridge() const +{ + const auto & config = context_global.getConfigRef(); + const auto & settings = context_global.getSettingsRef(); + Poco::Path path{config.getString("application.dir", "")}; + path.setFileName("clickhouse-odbc-bridge"); + + if (!path.isFile()) + throw Exception("clickhouse-odbc-bridge is not found", ErrorCodes::EXTERNAL_EXECUTABLE_NOT_FOUND); + + std::stringstream command; + command << path.toString() << ' '; + command << "--http-port " << config.getUInt("odbc_bridge.port", DEFAULT_PORT) << ' '; + command << "--listen-host " << config.getString("odbc_bridge.listen_host", DEFAULT_HOST) << ' '; + command << "--http-timeout " << settings.http_receive_timeout.value.totalSeconds() << ' '; + if (config.has("logger.odbc_bridge_log")) + command << "--log-path " << config.getString("logger.odbc_bridge_log") << ' '; + if (config.has("logger.odbc_bridge_errlog")) + command << "--err-log-path " << config.getString("logger.odbc_bridge_errlog") << ' '; + if (config.has("logger.odbc_bridge_level")) + command << "--log-level " << config.getString("logger.odbc_bridge_level") << ' '; + command << "&"; /// we don't want to wait this process + + auto command_str = command.str(); + LOG_TRACE(log, "Starting clickhouse-odbc-bridge with command: " << command_str); + + auto cmd = ShellCommand::execute(command_str); + cmd->wait(); +} + +std::vector> ODBCBridgeHelper::getURLParams(const NamesAndTypesList & cols, size_t max_block_size) const +{ + std::vector> result; + + result.emplace_back("connection_string", connection_string); /// already validated + result.emplace_back("columns", cols.toString()); + result.emplace_back("max_block_size", std::to_string(max_block_size)); + + return result; +} + +bool ODBCBridgeHelper::checkODBCBridgeIsRunning() const +{ + try + { + ReadWriteBufferFromHTTP buf(ping_url, ODBCBridgeHelper::PING_METHOD, nullptr); + return checkString(ODBCBridgeHelper::PING_OK_ANSWER, buf); + } + catch (...) + { + return false; + } +} + +void ODBCBridgeHelper::startODBCBridgeSync() const +{ + if (!checkODBCBridgeIsRunning()) + { + LOG_TRACE(log, "clickhouse-odbc-bridge is not running, will try to start it"); + startODBCBridge(); + bool started = false; + for (size_t counter : ext::range(1, 20)) + { + LOG_TRACE(log, "Checking clickhouse-odbc-bridge is running, try " << counter); + if (checkODBCBridgeIsRunning()) + { + started = true; + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + if (!started) + throw Exception("ODBCBridgeHelper: clickhouse-odbc-bridge is not responding", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING); + } +} +} diff --git a/dbms/src/Common/ODBCBridgeHelper.h b/dbms/src/Common/ODBCBridgeHelper.h new file mode 100644 index 00000000000..4ef0cc871dd --- /dev/null +++ b/dbms/src/Common/ODBCBridgeHelper.h @@ -0,0 +1,47 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int EXTERNAL_EXECUTABLE_NOT_FOUND; +} +/** Helper for odbc-bridge, provide utility methods, not main request + */ +class ODBCBridgeHelper +{ +private: + const Context & context_global; + + std::string connection_string; + + Poco::URI ping_url; + + Poco::Logger * log = &Poco::Logger::get("ODBCBridgeHelper"); + +public: + static constexpr inline size_t DEFAULT_PORT = 9018; + + static constexpr inline auto DEFAULT_HOST = "localhost"; + static constexpr inline auto DEFAULT_FORMAT = "RowBinary"; + static constexpr inline auto PING_HANDLER = "/ping"; + static constexpr inline auto MAIN_HANDLER = "/"; + static constexpr inline auto PING_OK_ANSWER = "Ok."; + + static const inline std::string PING_METHOD = Poco::Net::HTTPRequest::HTTP_GET; + static const inline std::string MAIN_METHOD = Poco::Net::HTTPRequest::HTTP_POST; + + ODBCBridgeHelper(const Context & context_global_, const std::string & connection_string_); + + std::vector> getURLParams(const NamesAndTypesList & cols, size_t max_block_size) const; + bool checkODBCBridgeIsRunning() const; + + void startODBCBridge() const; + void startODBCBridgeSync() const; +}; +} diff --git a/dbms/src/Dictionaries/ODBCDictionarySource.cpp b/dbms/src/Dictionaries/ODBCDictionarySource.cpp index 2224bb8e223..7dde01bb080 100644 --- a/dbms/src/Dictionaries/ODBCDictionarySource.cpp +++ b/dbms/src/Dictionaries/ODBCDictionarySource.cpp @@ -10,11 +10,50 @@ #include #include #include +#include +#include namespace DB { +namespace +{ + class ODBCBridgeBlockInputStream : public IProfilingBlockInputStream + { + public: + ODBCBridgeBlockInputStream(const Poco::URI & uri, + std::function callback, + const Block & sample_block, + const Context & context, + size_t max_block_size, + const ConnectionTimeouts & timeouts) + { + read_buf = std::make_unique(uri, ODBCBridgeHelper::MAIN_METHOD, callback, timeouts); + reader = FormatFactory::instance().getInput(ODBCBridgeHelper::DEFAULT_FORMAT, *read_buf, sample_block, context, max_block_size); + } + + Block readImpl() override + { + return reader->read(); + } + + Block getHeader() const override + { + return reader->getHeader(); + } + + String getName() const override + { + return "ODBCBridgeBlockInputStream"; + } + + private: + std::unique_ptr read_buf; + BlockInputStreamPtr reader; + }; + +} static const size_t max_block_size = 8192; @@ -32,20 +71,22 @@ ODBCDictionarySource::ODBCDictionarySource(const DictionaryStructure & dict_stru sample_block{sample_block}, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::None}, /// NOTE Better to obtain quoting style via ODBC interface. load_all_query{query_builder.composeLoadAllQuery()}, - invalidate_query{config.getString(config_prefix + ".invalidate_query", "")} + invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}, + odbc_bridge_helper{context, config.getString(config_prefix + ".connection_string")}, + timeouts{ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef())}, + global_context(context) { - std::size_t field_size = context.getSettingsRef().odbc_max_field_size; + const auto & global_config = context.getConfigRef(); + size_t bridge_port = global_config.getUInt("odbc_bridge.port", ODBCBridgeHelper::DEFAULT_PORT); + std::string bridge_host = global_config.getString("odbc_bridge.host", ODBCBridgeHelper::DEFAULT_HOST); - pool = createAndCheckResizePocoSessionPool([&] - { - auto session = std::make_shared( - config.getString(config_prefix + ".connector", "ODBC"), - validateODBCConnectionString(config.getString(config_prefix + ".connection_string"))); + bridge_url.setHost(bridge_host); + bridge_url.setPort(bridge_port); + bridge_url.setScheme("http"); - /// Default POCO value is 1024. Set property manually to make possible reading of longer strings. - session->setProperty("maxFieldSize", Poco::Any(field_size)); - return session; - }); + auto url_params = odbc_bridge_helper.getURLParams(sample_block.getNamesAndTypesList(), max_block_size); + for (const auto & [name, value] : url_params) + bridge_url.addQueryParameter(name, value); } /// copy-constructor is provided in order to support cloneability @@ -58,11 +99,14 @@ ODBCDictionarySource::ODBCDictionarySource(const ODBCDictionarySource & other) where{other.where}, update_field{other.update_field}, sample_block{other.sample_block}, - pool{other.pool}, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::None}, load_all_query{other.load_all_query}, - invalidate_query{other.invalidate_query}, invalidate_query_response{other.invalidate_query_response} + invalidate_query{other.invalidate_query}, + invalidate_query_response{other.invalidate_query_response}, + odbc_bridge_helper{other.odbc_bridge_helper}, + global_context{other.global_context} { + } std::string ODBCDictionarySource::getUpdateFieldAndDate() @@ -86,7 +130,7 @@ std::string ODBCDictionarySource::getUpdateFieldAndDate() BlockInputStreamPtr ODBCDictionarySource::loadAll() { LOG_TRACE(log, load_all_query); - return std::make_shared(pool->get(), load_all_query, sample_block, max_block_size); + return loadBase(load_all_query); } BlockInputStreamPtr ODBCDictionarySource::loadUpdatedAll() @@ -94,20 +138,20 @@ BlockInputStreamPtr ODBCDictionarySource::loadUpdatedAll() std::string load_query_update = getUpdateFieldAndDate(); LOG_TRACE(log, load_query_update); - return std::make_shared(pool->get(), load_query_update, sample_block, max_block_size); + return loadBase(load_query_update); } BlockInputStreamPtr ODBCDictionarySource::loadIds(const std::vector & ids) { const auto query = query_builder.composeLoadIdsQuery(ids); - return std::make_shared(pool->get(), query, sample_block, max_block_size); + return loadBase(query); } BlockInputStreamPtr ODBCDictionarySource::loadKeys( const Columns & key_columns, const std::vector & requested_rows) { const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN); - return std::make_shared(pool->get(), query, sample_block, max_block_size); + return loadBase(query); } bool ODBCDictionarySource::supportsSelectiveLoad() const @@ -148,8 +192,28 @@ std::string ODBCDictionarySource::doInvalidateQuery(const std::string & request) Block invalidate_sample_block; ColumnPtr column(ColumnString::create()); invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared(), "Sample Block")); - ODBCBlockInputStream block_input_stream(pool->get(), request, invalidate_sample_block, 1); - return readInvalidateQuery(block_input_stream); + odbc_bridge_helper.startODBCBridgeSync(); + ReadWriteBufferFromHTTP buf(bridge_url, ODBCBridgeHelper::MAIN_METHOD, [request](std::ostream & os) { os << "query=" << request; }, timeouts); + BlockInputStreamPtr reader = FormatFactory::instance().getInput(ODBCBridgeHelper::DEFAULT_FORMAT, buf, invalidate_sample_block, global_context, max_block_size); + ODBCBridgeBlockInputStream stream( + bridge_url, + [request](std::ostream & os) { os << "query=" << request; }, + invalidate_sample_block, + global_context, + max_block_size, + timeouts); + return readInvalidateQuery(stream); +} + +BlockInputStreamPtr ODBCDictionarySource::loadBase(const std::string & query) const +{ + odbc_bridge_helper.startODBCBridgeSync(); + return std::make_shared(bridge_url, + [query](std::ostream & os) { os << "query=" << query; }, + sample_block, + global_context, + max_block_size, + timeouts); } } diff --git a/dbms/src/Dictionaries/ODBCDictionarySource.h b/dbms/src/Dictionaries/ODBCDictionarySource.h index 419a02f0ace..7d7a0ca51e0 100644 --- a/dbms/src/Dictionaries/ODBCDictionarySource.h +++ b/dbms/src/Dictionaries/ODBCDictionarySource.h @@ -1,11 +1,16 @@ #pragma once #include +#include + +#include #include #include #include +#include + namespace Poco { @@ -58,6 +63,8 @@ private: // execute invalidate_query. expects single cell in result std::string doInvalidateQuery(const std::string & request) const; + BlockInputStreamPtr loadBase(const std::string & query) const; + Poco::Logger * log; std::chrono::time_point update_time; @@ -67,11 +74,16 @@ private: const std::string where; const std::string update_field; Block sample_block; - std::shared_ptr pool = nullptr; ExternalQueryBuilder query_builder; const std::string load_all_query; std::string invalidate_query; mutable std::string invalidate_query_response; + + ODBCBridgeHelper odbc_bridge_helper; + Poco::URI bridge_url; + ConnectionTimeouts timeouts; + const Context & global_context; + }; diff --git a/dbms/src/Storages/StorageODBC.cpp b/dbms/src/Storages/StorageODBC.cpp index 594ae19abe5..8e46f743c08 100644 --- a/dbms/src/Storages/StorageODBC.cpp +++ b/dbms/src/Storages/StorageODBC.cpp @@ -33,27 +33,24 @@ StorageODBC::StorageODBC(const std::string & table_name_, const std::string & remote_table_name_, const ColumnsDescription & columns_, const Context & context_) - : IStorageURLBase(Poco::URI(), context_, table_name_, "RowBinary", columns_) - , connection_string(connection_string) + : IStorageURLBase(Poco::URI(), context_, table_name_, ODBCBridgeHelper::DEFAULT_FORMAT, columns_) + , odbc_bridge_helper(context_, connection_string) , remote_database_name(remote_database_name_) , remote_table_name(remote_table_name_) , log(&Poco::Logger::get("StorageODBC")) { - const auto & config = context_.getConfigRef(); - size_t bridge_port = config.getUInt("odbc_bridge.port", 9018); - std::string bridge_host = config.getString("odbc_bridge.host", "localhost"); + const auto & config = context_global.getConfigRef(); + size_t bridge_port = config.getUInt("odbc_bridge.port", ODBCBridgeHelper::DEFAULT_PORT); + std::string bridge_host = config.getString("odbc_bridge.host", ODBCBridgeHelper::DEFAULT_HOST); uri.setHost(bridge_host); uri.setPort(bridge_port); uri.setScheme("http"); - - ping_uri = uri; - ping_uri.setPath("/ping"); } std::string StorageODBC::getReadMethod() const { - return Poco::Net::HTTPRequest::HTTP_POST; + return ODBCBridgeHelper::MAIN_METHOD; } std::vector> StorageODBC::getReadURIParams(const Names & column_names, @@ -68,13 +65,7 @@ std::vector> StorageODBC::getReadURIParams(c auto column_data = getColumn(name); cols.emplace_back(column_data.name, column_data.type); } - std::vector> result; - - result.emplace_back("connection_string", connection_string); - result.emplace_back("columns", cols.toString()); - result.emplace_back("max_block_size", std::to_string(max_block_size)); - - return result; + return odbc_bridge_helper.getURLParams(cols, max_block_size); } std::function StorageODBC::getReadPOSTDataCallback(const Names & /*column_names*/, @@ -89,49 +80,6 @@ std::function StorageODBC::getReadPOSTDataCallback(const N return [query](std::ostream & os) { os << "query=" << query; }; } -bool StorageODBC::checkODBCBridgeIsRunning() const -{ - try - { - ReadWriteBufferFromHTTP buf(ping_uri, Poco::Net::HTTPRequest::HTTP_GET, nullptr); - return checkString("Ok.", buf); - } - catch (...) - { - return false; - } -} - -void StorageODBC::startODBCBridge() const -{ - const auto & config = context_global.getConfigRef(); - const auto & settings = context_global.getSettingsRef(); - Poco::Path path{config.getString("application.dir", "")}; - path.setFileName("clickhouse-odbc-bridge"); - - if (!path.isFile()) - throw Exception("clickhouse-odbc-bridge is not found", ErrorCodes::EXTERNAL_EXECUTABLE_NOT_FOUND); - - std::stringstream command; - command << path.toString() << ' '; - command << "--http-port " << config.getUInt("odbc_bridge.port", 9018) << ' '; - command << "--listen-host " << config.getString("odbc_bridge.listen_host", "localhost") << ' '; - command << "--http-timeout " << settings.http_receive_timeout.value.totalSeconds() << ' '; - if (config.has("logger.odbc_bridge_log")) - command << "--log-path " << config.getString("logger.odbc_bridge_log") << ' '; - if (config.has("logger.odbc_bridge_errlog")) - command << "--err-log-path " << config.getString("logger.odbc_bridge_errlog") << ' '; - if (config.has("logger.odbc_bridge_level")) - command << "--log-level " << config.getString("logger.odbc_bridge_level") << ' '; - command << "&"; // we don't want to wait this process - - auto command_str = command.str(); - LOG_TRACE(log, "Starting clickhouse-odbc-bridge with command: " << command_str); - - auto cmd = ShellCommand::execute(command_str); - cmd->wait(); -} - BlockInputStreams StorageODBC::read(const Names & column_names, const SelectQueryInfo & query_info, const Context & context, @@ -139,25 +87,8 @@ BlockInputStreams StorageODBC::read(const Names & column_names, size_t max_block_size, unsigned num_streams) { - if (!checkODBCBridgeIsRunning()) - { - LOG_TRACE(log, "clickhouse-odbc-bridge is not running, will try to start it"); - startODBCBridge(); - bool started = false; - for (size_t counter : ext::range(1, 20)) - { - LOG_TRACE(log, "Checking clickhouse-odbc-bridge is running, try " << counter); - if (checkODBCBridgeIsRunning()) - { - started = true; - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - if (!started) - throw Exception("StorageODBC: clickhouse-odbc-bridge is not responding", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING); - } + odbc_bridge_helper.startODBCBridgeSync(); return IStorageURLBase::read(column_names, query_info, context, processed_stage, max_block_size, num_streams); } diff --git a/dbms/src/Storages/StorageODBC.h b/dbms/src/Storages/StorageODBC.h index 4108887aa9b..ae33dedf827 100644 --- a/dbms/src/Storages/StorageODBC.h +++ b/dbms/src/Storages/StorageODBC.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include namespace DB @@ -35,10 +36,9 @@ protected: const Context & context_); private: - std::string connection_string; + ODBCBridgeHelper odbc_bridge_helper; std::string remote_database_name; std::string remote_table_name; - Poco::URI ping_uri; Poco::Logger * log; @@ -55,8 +55,5 @@ private: const Context & context, QueryProcessingStage::Enum & processed_stage, size_t max_block_size) const override; - - bool checkODBCBridgeIsRunning() const; - void startODBCBridge() const; }; } From af19d4133d2a4f0d201807b0b220de8346f97987 Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 14 Aug 2018 13:33:41 +0300 Subject: [PATCH 16/16] CLICKHOUSE-3878: Move connection string validation to common, remove redundant headers, fix compilation issues --- dbms/programs/odbc-bridge/HandlerFactory.cpp | 1 - dbms/programs/odbc-bridge/ODBCBridge.h | 2 +- dbms/src/Common/ODBCBridgeHelper.cpp | 2 +- dbms/src/Common/tests/CMakeLists.txt | 3 +++ .../tests/validate-odbc-connection-string.cpp | 2 +- .../validate-odbc-connection-string.reference | 0 .../tests/validate-odbc-connection-string.sh | 0 .../validateODBCConnectionString.cpp | 2 +- .../validateODBCConnectionString.h | 0 dbms/src/Dictionaries/CMakeLists.txt | 3 --- .../src/Dictionaries/ODBCDictionarySource.cpp | 19 +++++++++---------- dbms/src/Storages/StorageODBC.cpp | 1 - dbms/src/TableFunctions/TableFunctionODBC.cpp | 3 +-- 13 files changed, 17 insertions(+), 21 deletions(-) rename dbms/src/{Dictionaries => Common}/tests/validate-odbc-connection-string.cpp (88%) rename dbms/src/{Dictionaries => Common}/tests/validate-odbc-connection-string.reference (100%) rename dbms/src/{Dictionaries => Common}/tests/validate-odbc-connection-string.sh (100%) rename dbms/src/{Dictionaries => Common}/validateODBCConnectionString.cpp (99%) rename dbms/src/{Dictionaries => Common}/validateODBCConnectionString.h (100%) diff --git a/dbms/programs/odbc-bridge/HandlerFactory.cpp b/dbms/programs/odbc-bridge/HandlerFactory.cpp index a5e9f4b014c..9a3824cef79 100644 --- a/dbms/programs/odbc-bridge/HandlerFactory.cpp +++ b/dbms/programs/odbc-bridge/HandlerFactory.cpp @@ -1,7 +1,6 @@ #include "HandlerFactory.h" #include -#include #include #include #include diff --git a/dbms/programs/odbc-bridge/ODBCBridge.h b/dbms/programs/odbc-bridge/ODBCBridge.h index 730dcda07d6..4ae11ad7301 100644 --- a/dbms/programs/odbc-bridge/ODBCBridge.h +++ b/dbms/programs/odbc-bridge/ODBCBridge.h @@ -28,7 +28,7 @@ private: bool is_help; std::string hostname; - UInt16 port; + size_t port; size_t http_timeout; std::string log_level; size_t max_server_connections; diff --git a/dbms/src/Common/ODBCBridgeHelper.cpp b/dbms/src/Common/ODBCBridgeHelper.cpp index 92405a9270f..87c7e0a3b30 100644 --- a/dbms/src/Common/ODBCBridgeHelper.cpp +++ b/dbms/src/Common/ODBCBridgeHelper.cpp @@ -1,7 +1,7 @@ #include #include -#include +#include #include #include #include diff --git a/dbms/src/Common/tests/CMakeLists.txt b/dbms/src/Common/tests/CMakeLists.txt index f4d01e85bd2..1c4330b97ce 100644 --- a/dbms/src/Common/tests/CMakeLists.txt +++ b/dbms/src/Common/tests/CMakeLists.txt @@ -71,3 +71,6 @@ target_link_libraries (cow_columns clickhouse_common_io) add_executable (stopwatch stopwatch.cpp) target_link_libraries (stopwatch clickhouse_common_io) + +add_executable (validate-odbc-connection-string validate-odbc-connection-string.cpp) +target_link_libraries (validate-odbc-connection-string dbms) diff --git a/dbms/src/Dictionaries/tests/validate-odbc-connection-string.cpp b/dbms/src/Common/tests/validate-odbc-connection-string.cpp similarity index 88% rename from dbms/src/Dictionaries/tests/validate-odbc-connection-string.cpp rename to dbms/src/Common/tests/validate-odbc-connection-string.cpp index 766a709d8fd..f668cd07fcd 100644 --- a/dbms/src/Dictionaries/tests/validate-odbc-connection-string.cpp +++ b/dbms/src/Common/tests/validate-odbc-connection-string.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include using namespace DB; diff --git a/dbms/src/Dictionaries/tests/validate-odbc-connection-string.reference b/dbms/src/Common/tests/validate-odbc-connection-string.reference similarity index 100% rename from dbms/src/Dictionaries/tests/validate-odbc-connection-string.reference rename to dbms/src/Common/tests/validate-odbc-connection-string.reference diff --git a/dbms/src/Dictionaries/tests/validate-odbc-connection-string.sh b/dbms/src/Common/tests/validate-odbc-connection-string.sh similarity index 100% rename from dbms/src/Dictionaries/tests/validate-odbc-connection-string.sh rename to dbms/src/Common/tests/validate-odbc-connection-string.sh diff --git a/dbms/src/Dictionaries/validateODBCConnectionString.cpp b/dbms/src/Common/validateODBCConnectionString.cpp similarity index 99% rename from dbms/src/Dictionaries/validateODBCConnectionString.cpp rename to dbms/src/Common/validateODBCConnectionString.cpp index b37a15c3a70..313de4885af 100644 --- a/dbms/src/Dictionaries/validateODBCConnectionString.cpp +++ b/dbms/src/Common/validateODBCConnectionString.cpp @@ -5,7 +5,7 @@ #include #include #include -#include +#include namespace DB diff --git a/dbms/src/Dictionaries/validateODBCConnectionString.h b/dbms/src/Common/validateODBCConnectionString.h similarity index 100% rename from dbms/src/Dictionaries/validateODBCConnectionString.h rename to dbms/src/Common/validateODBCConnectionString.h diff --git a/dbms/src/Dictionaries/CMakeLists.txt b/dbms/src/Dictionaries/CMakeLists.txt index 65172356645..e69de29bb2d 100644 --- a/dbms/src/Dictionaries/CMakeLists.txt +++ b/dbms/src/Dictionaries/CMakeLists.txt @@ -1,3 +0,0 @@ -if (ENABLE_TESTS) - add_subdirectory (tests) -endif () diff --git a/dbms/src/Dictionaries/ODBCDictionarySource.cpp b/dbms/src/Dictionaries/ODBCDictionarySource.cpp index 7dde01bb080..b4e88ff60e1 100644 --- a/dbms/src/Dictionaries/ODBCDictionarySource.cpp +++ b/dbms/src/Dictionaries/ODBCDictionarySource.cpp @@ -1,12 +1,11 @@ +#include #include #include #include #include #include #include -#include -#include -#include +#include #include #include #include @@ -33,11 +32,6 @@ namespace reader = FormatFactory::instance().getInput(ODBCBridgeHelper::DEFAULT_FORMAT, *read_buf, sample_block, context, max_block_size); } - Block readImpl() override - { - return reader->read(); - } - Block getHeader() const override { return reader->getHeader(); @@ -49,6 +43,11 @@ namespace } private: + Block readImpl() override + { + return reader->read(); + } + std::unique_ptr read_buf; BlockInputStreamPtr reader; }; @@ -193,8 +192,7 @@ std::string ODBCDictionarySource::doInvalidateQuery(const std::string & request) ColumnPtr column(ColumnString::create()); invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared(), "Sample Block")); odbc_bridge_helper.startODBCBridgeSync(); - ReadWriteBufferFromHTTP buf(bridge_url, ODBCBridgeHelper::MAIN_METHOD, [request](std::ostream & os) { os << "query=" << request; }, timeouts); - BlockInputStreamPtr reader = FormatFactory::instance().getInput(ODBCBridgeHelper::DEFAULT_FORMAT, buf, invalidate_sample_block, global_context, max_block_size); + ODBCBridgeBlockInputStream stream( bridge_url, [request](std::ostream & os) { os << "query=" << request; }, @@ -202,6 +200,7 @@ std::string ODBCDictionarySource::doInvalidateQuery(const std::string & request) global_context, max_block_size, timeouts); + return readInvalidateQuery(stream); } diff --git a/dbms/src/Storages/StorageODBC.cpp b/dbms/src/Storages/StorageODBC.cpp index 8e46f743c08..657456e3f6f 100644 --- a/dbms/src/Storages/StorageODBC.cpp +++ b/dbms/src/Storages/StorageODBC.cpp @@ -1,5 +1,4 @@ #include -#include #include #include #include diff --git a/dbms/src/TableFunctions/TableFunctionODBC.cpp b/dbms/src/TableFunctions/TableFunctionODBC.cpp index 0748fa4a0cd..94c2a1f3a25 100644 --- a/dbms/src/TableFunctions/TableFunctionODBC.cpp +++ b/dbms/src/TableFunctions/TableFunctionODBC.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include #include #include @@ -72,7 +71,7 @@ StoragePtr TableFunctionODBC::executeImpl(const ASTPtr & ast_function, const Con for (int i = 0; i < 2; ++i) args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context); - std::string connection_string = validateODBCConnectionString(static_cast(*args[0]).value.safeGet()); + std::string connection_string = static_cast(*args[0]).value.safeGet(); std::string table_name = static_cast(*args[1]).value.safeGet(); Poco::Data::ODBC::SessionImpl session(connection_string, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC);