mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Common base for bridges part 3
This commit is contained in:
parent
1c4d4c8e54
commit
9e9bf2bb75
@ -1,236 +1,5 @@
|
|||||||
#include "LibraryBridge.h"
|
#include "LibraryBridge.h"
|
||||||
|
|
||||||
#include "HandlerFactory.h"
|
|
||||||
|
|
||||||
#include <string>
|
|
||||||
#include <errno.h>
|
|
||||||
#include <IO/ReadHelpers.h>
|
|
||||||
#include <boost/program_options.hpp>
|
|
||||||
|
|
||||||
#include <Poco/Net/NetException.h>
|
|
||||||
#include <Poco/String.h>
|
|
||||||
#include <Poco/Util/HelpFormatter.h>
|
|
||||||
#include <Common/Exception.h>
|
|
||||||
#include <Common/StringUtils/StringUtils.h>
|
|
||||||
#include <Common/config.h>
|
|
||||||
#include <Formats/registerFormats.h>
|
|
||||||
#include <common/logger_useful.h>
|
|
||||||
#include <ext/scope_guard.h>
|
|
||||||
#include <ext/range.h>
|
|
||||||
#include <Common/SensitiveDataMasker.h>
|
|
||||||
#include <Server/HTTP/HTTPServer.h>
|
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
|
||||||
{
|
|
||||||
|
|
||||||
namespace ErrorCodes
|
|
||||||
{
|
|
||||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace
|
|
||||||
{
|
|
||||||
Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port, Poco::Logger * log)
|
|
||||||
{
|
|
||||||
Poco::Net::SocketAddress socket_address;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
socket_address = Poco::Net::SocketAddress(host, port);
|
|
||||||
}
|
|
||||||
catch (const Poco::Net::DNSException & e)
|
|
||||||
{
|
|
||||||
const auto code = e.code();
|
|
||||||
if (code == EAI_FAMILY
|
|
||||||
#if defined(EAI_ADDRFAMILY)
|
|
||||||
|| code == EAI_ADDRFAMILY
|
|
||||||
#endif
|
|
||||||
)
|
|
||||||
{
|
|
||||||
LOG_ERROR(log, "Cannot resolve listen_host ({}), error {}: {}. If it is an IPv6 address and your host has disabled IPv6, then consider to specify IPv4 address to listen in <listen_host> element of configuration file. Example: <listen_host>0.0.0.0</listen_host>", host, e.code(), e.message());
|
|
||||||
}
|
|
||||||
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
return socket_address;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, Poco::Logger * log)
|
|
||||||
{
|
|
||||||
auto address = makeSocketAddress(host, port, log);
|
|
||||||
#if POCO_VERSION < 0x01080000
|
|
||||||
socket.bind(address, /* reuseAddress = */ true);
|
|
||||||
#else
|
|
||||||
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ false);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
socket.listen(/* backlog = */ 64);
|
|
||||||
|
|
||||||
return address;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void LibraryBridge::handleHelp(const std::string &, const std::string &)
|
|
||||||
{
|
|
||||||
Poco::Util::HelpFormatter help_formatter(options());
|
|
||||||
help_formatter.setCommand(commandName());
|
|
||||||
help_formatter.setHeader("HTTP-proxy for library dictionary requests");
|
|
||||||
help_formatter.setUsage("--http-port <port>");
|
|
||||||
help_formatter.format(std::cerr);
|
|
||||||
|
|
||||||
stopOptionsProcessing();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void LibraryBridge::defineOptions(Poco::Util::OptionSet & options)
|
|
||||||
{
|
|
||||||
options.addOption(Poco::Util::Option("http-port", "", "port to listen").argument("http-port", true).binding("http-port"));
|
|
||||||
options.addOption(
|
|
||||||
Poco::Util::Option("listen-host", "", "hostname or address to listen, default 127.0.0.1").argument("listen-host").binding("listen-host"));
|
|
||||||
options.addOption(
|
|
||||||
Poco::Util::Option("http-timeout", "", "http timeout for socket, default 1800").argument("http-timeout").binding("http-timeout"));
|
|
||||||
|
|
||||||
options.addOption(Poco::Util::Option("max-server-connections", "", "max connections to server, default 1024")
|
|
||||||
.argument("max-server-connections")
|
|
||||||
.binding("max-server-connections"));
|
|
||||||
options.addOption(Poco::Util::Option("keep-alive-timeout", "", "keepalive timeout, default 10")
|
|
||||||
.argument("keep-alive-timeout")
|
|
||||||
.binding("keep-alive-timeout"));
|
|
||||||
|
|
||||||
options.addOption(Poco::Util::Option("log-level", "", "sets log level, default info").argument("log-level").binding("logger.level"));
|
|
||||||
|
|
||||||
options.addOption(
|
|
||||||
Poco::Util::Option("log-path", "", "log path for all logs, default console").argument("log-path").binding("logger.log"));
|
|
||||||
|
|
||||||
options.addOption(Poco::Util::Option("err-log-path", "", "err log path for all logs, default no")
|
|
||||||
.argument("err-log-path")
|
|
||||||
.binding("logger.errorlog"));
|
|
||||||
|
|
||||||
options.addOption(Poco::Util::Option("stdout-path", "", "stdout log path, default console")
|
|
||||||
.argument("stdout-path")
|
|
||||||
.binding("logger.stdout"));
|
|
||||||
|
|
||||||
options.addOption(Poco::Util::Option("stderr-path", "", "stderr log path, default console")
|
|
||||||
.argument("stderr-path")
|
|
||||||
.binding("logger.stderr"));
|
|
||||||
|
|
||||||
using Me = std::decay_t<decltype(*this)>;
|
|
||||||
options.addOption(Poco::Util::Option("help", "", "produce this help message")
|
|
||||||
.binding("help")
|
|
||||||
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
|
|
||||||
|
|
||||||
ServerApplication::defineOptions(options); // NOLINT Don't need complex BaseDaemon's .xml config
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void LibraryBridge::initialize(Application & self)
|
|
||||||
{
|
|
||||||
BaseDaemon::closeFDs();
|
|
||||||
|
|
||||||
config().setString("logger", "LibraryBridge");
|
|
||||||
|
|
||||||
/// Redirect stdout, stderr to specified files.
|
|
||||||
/// Some libraries and sanitizers write to stderr in case of errors.
|
|
||||||
const auto stdout_path = config().getString("logger.stdout", "");
|
|
||||||
if (!stdout_path.empty())
|
|
||||||
{
|
|
||||||
if (!freopen(stdout_path.c_str(), "a+", stdout))
|
|
||||||
throw Poco::OpenFileException("Cannot attach stdout to " + stdout_path);
|
|
||||||
|
|
||||||
/// Disable buffering for stdout.
|
|
||||||
setbuf(stdout, nullptr);
|
|
||||||
}
|
|
||||||
const auto stderr_path = config().getString("logger.stderr", "");
|
|
||||||
if (!stderr_path.empty())
|
|
||||||
{
|
|
||||||
if (!freopen(stderr_path.c_str(), "a+", stderr))
|
|
||||||
throw Poco::OpenFileException("Cannot attach stderr to " + stderr_path);
|
|
||||||
|
|
||||||
/// Disable buffering for stderr.
|
|
||||||
setbuf(stderr, nullptr);
|
|
||||||
}
|
|
||||||
|
|
||||||
buildLoggers(config(), logger(), self.commandName());
|
|
||||||
|
|
||||||
BaseDaemon::logRevision();
|
|
||||||
|
|
||||||
log = &logger();
|
|
||||||
hostname = config().getString("listen-host", "127.0.0.1");
|
|
||||||
port = config().getUInt("http-port");
|
|
||||||
if (port > 0xFFFF)
|
|
||||||
throw Exception("Out of range 'http-port': " + std::to_string(port), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
|
||||||
|
|
||||||
http_timeout = config().getUInt("http-timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT);
|
|
||||||
max_server_connections = config().getUInt("max-server-connections", 1024);
|
|
||||||
keep_alive_timeout = config().getUInt("keep-alive-timeout", 10);
|
|
||||||
|
|
||||||
initializeTerminationAndSignalProcessing();
|
|
||||||
|
|
||||||
ServerApplication::initialize(self); // NOLINT
|
|
||||||
}
|
|
||||||
|
|
||||||
void LibraryBridge::uninitialize()
|
|
||||||
{
|
|
||||||
BaseDaemon::uninitialize();
|
|
||||||
}
|
|
||||||
|
|
||||||
int LibraryBridge::main(const std::vector<std::string> & /*args*/)
|
|
||||||
{
|
|
||||||
registerFormats();
|
|
||||||
|
|
||||||
Poco::Net::ServerSocket socket;
|
|
||||||
auto address = socketBindListen(socket, hostname, port, log);
|
|
||||||
|
|
||||||
socket.setReceiveTimeout(http_timeout);
|
|
||||||
socket.setSendTimeout(http_timeout);
|
|
||||||
|
|
||||||
Poco::ThreadPool server_pool(3, max_server_connections);
|
|
||||||
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
|
|
||||||
|
|
||||||
http_params->setTimeout(http_timeout);
|
|
||||||
http_params->setKeepAliveTimeout(keep_alive_timeout);
|
|
||||||
|
|
||||||
auto shared_context = Context::createShared();
|
|
||||||
Context context(Context::createGlobal(shared_context.get()));
|
|
||||||
context.makeGlobalContext();
|
|
||||||
|
|
||||||
if (config().has("query_masking_rules"))
|
|
||||||
SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(config(), "query_masking_rules"));
|
|
||||||
|
|
||||||
auto server = HTTPServer(
|
|
||||||
context,
|
|
||||||
std::make_shared<LibraryBridgeHandlerFactory>("LibraryRequestHandlerFactory-factory", keep_alive_timeout, context),
|
|
||||||
server_pool,
|
|
||||||
socket,
|
|
||||||
http_params);
|
|
||||||
|
|
||||||
server.start();
|
|
||||||
|
|
||||||
LOG_INFO(log, "Listening http://{}", address.toString());
|
|
||||||
|
|
||||||
SCOPE_EXIT({
|
|
||||||
LOG_DEBUG(log, "Received termination signal.");
|
|
||||||
LOG_DEBUG(log, "Waiting for current connections to close.");
|
|
||||||
|
|
||||||
server.stop();
|
|
||||||
|
|
||||||
for (size_t count : ext::range(1, 6))
|
|
||||||
{
|
|
||||||
if (server.currentConnections() == 0)
|
|
||||||
break;
|
|
||||||
LOG_DEBUG(log, "Waiting for {} connections, try {}", server.currentConnections(), count);
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
waitForTerminationRequest();
|
|
||||||
return Application::EXIT_OK;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
#pragma GCC diagnostic ignored "-Wmissing-declarations"
|
#pragma GCC diagnostic ignored "-Wmissing-declarations"
|
||||||
int mainEntryClickHouseLibraryBridge(int argc, char ** argv)
|
int mainEntryClickHouseLibraryBridge(int argc, char ** argv)
|
||||||
{
|
{
|
||||||
|
@ -1,39 +1,26 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <daemon/BaseDaemon.h>
|
#include <Common/Bridge/IBridge.h>
|
||||||
#include <common/logger_useful.h>
|
#include "HandlerFactory.h"
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
class LibraryBridge : public BaseDaemon
|
class LibraryBridge : public IBridge
|
||||||
{
|
{
|
||||||
|
|
||||||
public:
|
|
||||||
void defineOptions(Poco::Util::OptionSet & options) override;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void initialize(Application & self) override;
|
const std::string bridgeName() const override
|
||||||
|
{
|
||||||
|
return "LibraryBridge";
|
||||||
|
}
|
||||||
|
|
||||||
void uninitialize() override;
|
HandlerFactoryPtr getHandlerFactoryPtr(Context & context) const override
|
||||||
|
{
|
||||||
int main(const std::vector<std::string> & args) override;
|
return std::make_shared<LibraryBridgeHandlerFactory>("LibraryRequestHandlerFactory-factory", keep_alive_timeout, context);
|
||||||
|
}
|
||||||
private:
|
|
||||||
void handleHelp(const std::string &, const std::string &);
|
|
||||||
|
|
||||||
std::string hostname;
|
|
||||||
size_t port;
|
|
||||||
|
|
||||||
size_t http_timeout;
|
|
||||||
std::string log_level;
|
|
||||||
size_t max_server_connections;
|
|
||||||
size_t keep_alive_timeout;
|
|
||||||
|
|
||||||
Poco::Logger * log;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8,7 +8,7 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
std::unique_ptr<HTTPRequestHandler> HandlerFactory::createRequestHandler(const HTTPServerRequest & request)
|
std::unique_ptr<HTTPRequestHandler> ODBCBridgeHandlerFactory::createRequestHandler(const HTTPServerRequest & request)
|
||||||
{
|
{
|
||||||
Poco::URI uri{request.getURI()};
|
Poco::URI uri{request.getURI()};
|
||||||
LOG_TRACE(log, "Request URI: {}", uri.toString());
|
LOG_TRACE(log, "Request URI: {}", uri.toString());
|
||||||
|
@ -20,10 +20,10 @@ namespace DB
|
|||||||
/** Factory for '/ping', '/', '/columns_info', '/identifier_quote', '/schema_allowed' handlers.
|
/** Factory for '/ping', '/', '/columns_info', '/identifier_quote', '/schema_allowed' handlers.
|
||||||
* Also stores Session pools for ODBC connections
|
* Also stores Session pools for ODBC connections
|
||||||
*/
|
*/
|
||||||
class HandlerFactory : public HTTPRequestHandlerFactory
|
class ODBCBridgeHandlerFactory : public HTTPRequestHandlerFactory
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
HandlerFactory(const std::string & name_, size_t keep_alive_timeout_, Context & context_)
|
ODBCBridgeHandlerFactory(const std::string & name_, size_t keep_alive_timeout_, Context & context_)
|
||||||
: log(&Poco::Logger::get(name_)), name(name_), keep_alive_timeout(keep_alive_timeout_), context(context_)
|
: log(&Poco::Logger::get(name_)), name(name_), keep_alive_timeout(keep_alive_timeout_), context(context_)
|
||||||
{
|
{
|
||||||
pool_map = std::make_shared<ODBCHandler::PoolMap>();
|
pool_map = std::make_shared<ODBCHandler::PoolMap>();
|
||||||
@ -38,4 +38,5 @@ private:
|
|||||||
Context & context;
|
Context & context;
|
||||||
std::shared_ptr<ODBCHandler::PoolMap> pool_map;
|
std::shared_ptr<ODBCHandler::PoolMap> pool_map;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,244 +1,4 @@
|
|||||||
#include "ODBCBridge.h"
|
#include "ODBCBridge.h"
|
||||||
#include "HandlerFactory.h"
|
|
||||||
|
|
||||||
#include <string>
|
|
||||||
#include <errno.h>
|
|
||||||
#include <IO/ReadHelpers.h>
|
|
||||||
#include <boost/program_options.hpp>
|
|
||||||
|
|
||||||
#if USE_ODBC
|
|
||||||
// It doesn't make much sense to build this bridge without ODBC, but we still do this.
|
|
||||||
# include <Poco/Data/ODBC/Connector.h>
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#include <Poco/Net/NetException.h>
|
|
||||||
#include <Poco/String.h>
|
|
||||||
#include <Poco/Util/HelpFormatter.h>
|
|
||||||
#include <Common/Exception.h>
|
|
||||||
#include <Common/StringUtils/StringUtils.h>
|
|
||||||
#include <Common/config.h>
|
|
||||||
#include <Formats/registerFormats.h>
|
|
||||||
#include <common/logger_useful.h>
|
|
||||||
#include <ext/scope_guard.h>
|
|
||||||
#include <ext/range.h>
|
|
||||||
#include <Common/SensitiveDataMasker.h>
|
|
||||||
#include <Server/HTTP/HTTPServer.h>
|
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
|
||||||
{
|
|
||||||
namespace ErrorCodes
|
|
||||||
{
|
|
||||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
|
||||||
}
|
|
||||||
|
|
||||||
namespace
|
|
||||||
{
|
|
||||||
Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port, Poco::Logger * log)
|
|
||||||
{
|
|
||||||
Poco::Net::SocketAddress socket_address;
|
|
||||||
try
|
|
||||||
{
|
|
||||||
socket_address = Poco::Net::SocketAddress(host, port);
|
|
||||||
}
|
|
||||||
catch (const Poco::Net::DNSException & e)
|
|
||||||
{
|
|
||||||
const auto code = e.code();
|
|
||||||
if (code == EAI_FAMILY
|
|
||||||
#if defined(EAI_ADDRFAMILY)
|
|
||||||
|| code == EAI_ADDRFAMILY
|
|
||||||
#endif
|
|
||||||
)
|
|
||||||
{
|
|
||||||
LOG_ERROR(log, "Cannot resolve listen_host ({}), error {}: {}. If it is an IPv6 address and your host has disabled IPv6, then consider to specify IPv4 address to listen in <listen_host> element of configuration file. Example: <listen_host>0.0.0.0</listen_host>", host, e.code(), e.message());
|
|
||||||
}
|
|
||||||
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
return socket_address;
|
|
||||||
}
|
|
||||||
|
|
||||||
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, Poco::Logger * log)
|
|
||||||
{
|
|
||||||
auto address = makeSocketAddress(host, port, log);
|
|
||||||
#if POCO_VERSION < 0x01080000
|
|
||||||
socket.bind(address, /* reuseAddress = */ true);
|
|
||||||
#else
|
|
||||||
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ false);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
socket.listen(/* backlog = */ 64);
|
|
||||||
|
|
||||||
return address;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void ODBCBridge::handleHelp(const std::string &, const std::string &)
|
|
||||||
{
|
|
||||||
Poco::Util::HelpFormatter help_formatter(options());
|
|
||||||
help_formatter.setCommand(commandName());
|
|
||||||
help_formatter.setHeader("HTTP-proxy for odbc requests");
|
|
||||||
help_formatter.setUsage("--http-port <port>");
|
|
||||||
help_formatter.format(std::cerr);
|
|
||||||
|
|
||||||
stopOptionsProcessing();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void ODBCBridge::defineOptions(Poco::Util::OptionSet & options)
|
|
||||||
{
|
|
||||||
options.addOption(Poco::Util::Option("http-port", "", "port to listen").argument("http-port", true).binding("http-port"));
|
|
||||||
options.addOption(
|
|
||||||
Poco::Util::Option("listen-host", "", "hostname or address to listen, default 127.0.0.1").argument("listen-host").binding("listen-host"));
|
|
||||||
options.addOption(
|
|
||||||
Poco::Util::Option("http-timeout", "", "http timeout for socket, default 1800").argument("http-timeout").binding("http-timeout"));
|
|
||||||
|
|
||||||
options.addOption(Poco::Util::Option("max-server-connections", "", "max connections to server, default 1024")
|
|
||||||
.argument("max-server-connections")
|
|
||||||
.binding("max-server-connections"));
|
|
||||||
options.addOption(Poco::Util::Option("keep-alive-timeout", "", "keepalive timeout, default 10")
|
|
||||||
.argument("keep-alive-timeout")
|
|
||||||
.binding("keep-alive-timeout"));
|
|
||||||
|
|
||||||
options.addOption(Poco::Util::Option("log-level", "", "sets log level, default info").argument("log-level").binding("logger.level"));
|
|
||||||
|
|
||||||
options.addOption(
|
|
||||||
Poco::Util::Option("log-path", "", "log path for all logs, default console").argument("log-path").binding("logger.log"));
|
|
||||||
|
|
||||||
options.addOption(Poco::Util::Option("err-log-path", "", "err log path for all logs, default no")
|
|
||||||
.argument("err-log-path")
|
|
||||||
.binding("logger.errorlog"));
|
|
||||||
|
|
||||||
options.addOption(Poco::Util::Option("stdout-path", "", "stdout log path, default console")
|
|
||||||
.argument("stdout-path")
|
|
||||||
.binding("logger.stdout"));
|
|
||||||
|
|
||||||
options.addOption(Poco::Util::Option("stderr-path", "", "stderr log path, default console")
|
|
||||||
.argument("stderr-path")
|
|
||||||
.binding("logger.stderr"));
|
|
||||||
|
|
||||||
using Me = std::decay_t<decltype(*this)>;
|
|
||||||
options.addOption(Poco::Util::Option("help", "", "produce this help message")
|
|
||||||
.binding("help")
|
|
||||||
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
|
|
||||||
|
|
||||||
ServerApplication::defineOptions(options); // NOLINT Don't need complex BaseDaemon's .xml config
|
|
||||||
}
|
|
||||||
|
|
||||||
void ODBCBridge::initialize(Application & self)
|
|
||||||
{
|
|
||||||
BaseDaemon::closeFDs();
|
|
||||||
is_help = config().has("help");
|
|
||||||
|
|
||||||
if (is_help)
|
|
||||||
return;
|
|
||||||
|
|
||||||
config().setString("logger", "ODBCBridge");
|
|
||||||
|
|
||||||
/// Redirect stdout, stderr to specified files.
|
|
||||||
/// Some libraries and sanitizers write to stderr in case of errors.
|
|
||||||
const auto stdout_path = config().getString("logger.stdout", "");
|
|
||||||
if (!stdout_path.empty())
|
|
||||||
{
|
|
||||||
if (!freopen(stdout_path.c_str(), "a+", stdout))
|
|
||||||
throw Poco::OpenFileException("Cannot attach stdout to " + stdout_path);
|
|
||||||
|
|
||||||
/// Disable buffering for stdout.
|
|
||||||
setbuf(stdout, nullptr);
|
|
||||||
}
|
|
||||||
const auto stderr_path = config().getString("logger.stderr", "");
|
|
||||||
if (!stderr_path.empty())
|
|
||||||
{
|
|
||||||
if (!freopen(stderr_path.c_str(), "a+", stderr))
|
|
||||||
throw Poco::OpenFileException("Cannot attach stderr to " + stderr_path);
|
|
||||||
|
|
||||||
/// Disable buffering for stderr.
|
|
||||||
setbuf(stderr, nullptr);
|
|
||||||
}
|
|
||||||
|
|
||||||
buildLoggers(config(), logger(), self.commandName());
|
|
||||||
|
|
||||||
BaseDaemon::logRevision();
|
|
||||||
|
|
||||||
log = &logger();
|
|
||||||
hostname = config().getString("listen-host", "127.0.0.1");
|
|
||||||
port = config().getUInt("http-port");
|
|
||||||
if (port > 0xFFFF)
|
|
||||||
throw Exception("Out of range 'http-port': " + std::to_string(port), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
|
||||||
|
|
||||||
http_timeout = config().getUInt("http-timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT);
|
|
||||||
max_server_connections = config().getUInt("max-server-connections", 1024);
|
|
||||||
keep_alive_timeout = config().getUInt("keep-alive-timeout", 10);
|
|
||||||
|
|
||||||
initializeTerminationAndSignalProcessing();
|
|
||||||
|
|
||||||
#if USE_ODBC
|
|
||||||
// It doesn't make much sense to build this bridge without ODBC, but we
|
|
||||||
// still do this.
|
|
||||||
Poco::Data::ODBC::Connector::registerConnector();
|
|
||||||
#endif
|
|
||||||
|
|
||||||
ServerApplication::initialize(self); // NOLINT
|
|
||||||
}
|
|
||||||
|
|
||||||
void ODBCBridge::uninitialize()
|
|
||||||
{
|
|
||||||
BaseDaemon::uninitialize();
|
|
||||||
}
|
|
||||||
|
|
||||||
int ODBCBridge::main(const std::vector<std::string> & /*args*/)
|
|
||||||
{
|
|
||||||
if (is_help)
|
|
||||||
return Application::EXIT_OK;
|
|
||||||
|
|
||||||
registerFormats();
|
|
||||||
|
|
||||||
LOG_INFO(log, "Starting up");
|
|
||||||
Poco::Net::ServerSocket socket;
|
|
||||||
auto address = socketBindListen(socket, hostname, port, log);
|
|
||||||
socket.setReceiveTimeout(http_timeout);
|
|
||||||
socket.setSendTimeout(http_timeout);
|
|
||||||
Poco::ThreadPool server_pool(3, max_server_connections);
|
|
||||||
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
|
|
||||||
http_params->setTimeout(http_timeout);
|
|
||||||
http_params->setKeepAliveTimeout(keep_alive_timeout);
|
|
||||||
|
|
||||||
auto shared_context = Context::createShared();
|
|
||||||
Context context(Context::createGlobal(shared_context.get()));
|
|
||||||
context.makeGlobalContext();
|
|
||||||
|
|
||||||
if (config().has("query_masking_rules"))
|
|
||||||
{
|
|
||||||
SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(config(), "query_masking_rules"));
|
|
||||||
}
|
|
||||||
|
|
||||||
auto server = HTTPServer(
|
|
||||||
context,
|
|
||||||
std::make_shared<HandlerFactory>("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context),
|
|
||||||
server_pool,
|
|
||||||
socket,
|
|
||||||
http_params);
|
|
||||||
server.start();
|
|
||||||
|
|
||||||
LOG_INFO(log, "Listening http://{}", address.toString());
|
|
||||||
|
|
||||||
SCOPE_EXIT({
|
|
||||||
LOG_DEBUG(log, "Received termination signal.");
|
|
||||||
LOG_DEBUG(log, "Waiting for current connections to close.");
|
|
||||||
server.stop();
|
|
||||||
for (size_t count : ext::range(1, 6))
|
|
||||||
{
|
|
||||||
if (server.currentConnections() == 0)
|
|
||||||
break;
|
|
||||||
LOG_DEBUG(log, "Waiting for {} connections, try {}", server.currentConnections(), count);
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
waitForTerminationRequest();
|
|
||||||
return Application::EXIT_OK;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#pragma GCC diagnostic ignored "-Wmissing-declarations"
|
#pragma GCC diagnostic ignored "-Wmissing-declarations"
|
||||||
int mainEntryClickHouseODBCBridge(int argc, char ** argv)
|
int mainEntryClickHouseODBCBridge(int argc, char ** argv)
|
||||||
|
@ -2,38 +2,39 @@
|
|||||||
|
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
#include <Poco/Logger.h>
|
#include <Poco/Logger.h>
|
||||||
#include <daemon/BaseDaemon.h>
|
#include <Common/Bridge/IBridge.h>
|
||||||
|
#include "HandlerFactory.h"
|
||||||
|
|
||||||
|
#if USE_ODBC
|
||||||
|
# include <Poco/Data/ODBC/Connector.h>
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
/** Class represents clickhouse-odbc-bridge server, which listen
|
|
||||||
* incoming HTTP POST and GET requests on specified port and host.
|
class ODBCBridge : public IBridge
|
||||||
* Has two handlers '/' for all incoming POST requests to ODBC driver
|
|
||||||
* and /ping for GET request about service status
|
|
||||||
*/
|
|
||||||
class ODBCBridge : public BaseDaemon
|
|
||||||
{
|
{
|
||||||
public:
|
|
||||||
void defineOptions(Poco::Util::OptionSet & options) override;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void initialize(Application & self) override;
|
const std::string bridgeName() const override
|
||||||
|
{
|
||||||
|
return "ODBCBridge";
|
||||||
|
}
|
||||||
|
|
||||||
void uninitialize() override;
|
HandlerFactoryPtr getHandlerFactoryPtr(Context & context) const override
|
||||||
|
{
|
||||||
|
return std::make_shared<ODBCBridgeHandlerFactory>("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context);
|
||||||
|
}
|
||||||
|
|
||||||
int main(const std::vector<std::string> & args) override;
|
void registerODBCConnector() const override
|
||||||
|
{
|
||||||
|
#if USE_ODBC
|
||||||
|
// It doesn't make much sense to build this bridge without ODBC, but we
|
||||||
|
// still do this.
|
||||||
|
Poco::Data::ODBC::Connector::registerConnector();
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
|
||||||
void handleHelp(const std::string &, const std::string &);
|
|
||||||
|
|
||||||
bool is_help;
|
|
||||||
std::string hostname;
|
|
||||||
size_t port;
|
|
||||||
size_t http_timeout;
|
|
||||||
std::string log_level;
|
|
||||||
size_t max_server_connections;
|
|
||||||
size_t keep_alive_timeout;
|
|
||||||
|
|
||||||
Poco::Logger * log;
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
240
src/Common/Bridge/IBridge.cpp
Normal file
240
src/Common/Bridge/IBridge.cpp
Normal file
@ -0,0 +1,240 @@
|
|||||||
|
#include "IBridge.h"
|
||||||
|
//#include "HandlerFactory.h"
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include <errno.h>
|
||||||
|
#include <IO/ReadHelpers.h>
|
||||||
|
#include <boost/program_options.hpp>
|
||||||
|
|
||||||
|
#include <Poco/Net/NetException.h>
|
||||||
|
#include <Poco/String.h>
|
||||||
|
#include <Poco/Util/HelpFormatter.h>
|
||||||
|
#include <Common/Exception.h>
|
||||||
|
#include <Common/StringUtils/StringUtils.h>
|
||||||
|
#include <Common/config.h>
|
||||||
|
#include <Formats/registerFormats.h>
|
||||||
|
#include <common/logger_useful.h>
|
||||||
|
#include <ext/scope_guard.h>
|
||||||
|
#include <ext/range.h>
|
||||||
|
#include <Common/SensitiveDataMasker.h>
|
||||||
|
#include <Server/HTTP/HTTPServer.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port, Poco::Logger * log)
|
||||||
|
{
|
||||||
|
Poco::Net::SocketAddress socket_address;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
socket_address = Poco::Net::SocketAddress(host, port);
|
||||||
|
}
|
||||||
|
catch (const Poco::Net::DNSException & e)
|
||||||
|
{
|
||||||
|
const auto code = e.code();
|
||||||
|
if (code == EAI_FAMILY
|
||||||
|
#if defined(EAI_ADDRFAMILY)
|
||||||
|
|| code == EAI_ADDRFAMILY
|
||||||
|
#endif
|
||||||
|
)
|
||||||
|
{
|
||||||
|
LOG_ERROR(log, "Cannot resolve listen_host ({}), error {}: {}. If it is an IPv6 address and your host has disabled IPv6, then consider to specify IPv4 address to listen in <listen_host> element of configuration file. Example: <listen_host>0.0.0.0</listen_host>", host, e.code(), e.message());
|
||||||
|
}
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
return socket_address;
|
||||||
|
}
|
||||||
|
|
||||||
|
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, Poco::Logger * log)
|
||||||
|
{
|
||||||
|
auto address = makeSocketAddress(host, port, log);
|
||||||
|
#if POCO_VERSION < 0x01080000
|
||||||
|
socket.bind(address, /* reuseAddress = */ true);
|
||||||
|
#else
|
||||||
|
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ false);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
socket.listen(/* backlog = */ 64);
|
||||||
|
|
||||||
|
return address;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void IBridge::handleHelp(const std::string &, const std::string &)
|
||||||
|
{
|
||||||
|
Poco::Util::HelpFormatter help_formatter(options());
|
||||||
|
help_formatter.setCommand(commandName());
|
||||||
|
help_formatter.setHeader("HTTP-proxy for odbc requests");
|
||||||
|
help_formatter.setUsage("--http-port <port>");
|
||||||
|
help_formatter.format(std::cerr);
|
||||||
|
|
||||||
|
stopOptionsProcessing();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void IBridge::defineOptions(Poco::Util::OptionSet & options)
|
||||||
|
{
|
||||||
|
options.addOption(
|
||||||
|
Poco::Util::Option("http-port", "", "port to listen").argument("http-port", true) .binding("http-port"));
|
||||||
|
|
||||||
|
options.addOption(
|
||||||
|
Poco::Util::Option("listen-host", "", "hostname or address to listen, default 127.0.0.1").argument("listen-host").binding("listen-host"));
|
||||||
|
|
||||||
|
options.addOption(
|
||||||
|
Poco::Util::Option("http-timeout", "", "http timeout for socket, default 1800").argument("http-timeout").binding("http-timeout"));
|
||||||
|
|
||||||
|
options.addOption(
|
||||||
|
Poco::Util::Option("max-server-connections", "", "max connections to server, default 1024").argument("max-server-connections").binding("max-server-connections"));
|
||||||
|
|
||||||
|
options.addOption(
|
||||||
|
Poco::Util::Option("keep-alive-timeout", "", "keepalive timeout, default 10").argument("keep-alive-timeout").binding("keep-alive-timeout"));
|
||||||
|
|
||||||
|
options.addOption(
|
||||||
|
Poco::Util::Option("log-level", "", "sets log level, default info") .argument("log-level").binding("logger.level"));
|
||||||
|
|
||||||
|
options.addOption(
|
||||||
|
Poco::Util::Option("log-path", "", "log path for all logs, default console").argument("log-path").binding("logger.log"));
|
||||||
|
|
||||||
|
options.addOption(
|
||||||
|
Poco::Util::Option("err-log-path", "", "err log path for all logs, default no").argument("err-log-path").binding("logger.errorlog"));
|
||||||
|
|
||||||
|
options.addOption(
|
||||||
|
Poco::Util::Option("stdout-path", "", "stdout log path, default console").argument("stdout-path").binding("logger.stdout"));
|
||||||
|
|
||||||
|
options.addOption(
|
||||||
|
Poco::Util::Option("stderr-path", "", "stderr log path, default console").argument("stderr-path").binding("logger.stderr"));
|
||||||
|
|
||||||
|
using Me = std::decay_t<decltype(*this)>;
|
||||||
|
|
||||||
|
options.addOption(
|
||||||
|
Poco::Util::Option("help", "", "produce this help message").binding("help").callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
|
||||||
|
|
||||||
|
ServerApplication::defineOptions(options); // NOLINT Don't need complex BaseDaemon's .xml config
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void IBridge::initialize(Application & self)
|
||||||
|
{
|
||||||
|
BaseDaemon::closeFDs();
|
||||||
|
is_help = config().has("help");
|
||||||
|
|
||||||
|
if (is_help)
|
||||||
|
return;
|
||||||
|
|
||||||
|
config().setString("logger", bridgeName());
|
||||||
|
|
||||||
|
/// Redirect stdout, stderr to specified files.
|
||||||
|
/// Some libraries and sanitizers write to stderr in case of errors.
|
||||||
|
const auto stdout_path = config().getString("logger.stdout", "");
|
||||||
|
if (!stdout_path.empty())
|
||||||
|
{
|
||||||
|
if (!freopen(stdout_path.c_str(), "a+", stdout))
|
||||||
|
throw Poco::OpenFileException("Cannot attach stdout to " + stdout_path);
|
||||||
|
|
||||||
|
/// Disable buffering for stdout.
|
||||||
|
setbuf(stdout, nullptr);
|
||||||
|
}
|
||||||
|
const auto stderr_path = config().getString("logger.stderr", "");
|
||||||
|
if (!stderr_path.empty())
|
||||||
|
{
|
||||||
|
if (!freopen(stderr_path.c_str(), "a+", stderr))
|
||||||
|
throw Poco::OpenFileException("Cannot attach stderr to " + stderr_path);
|
||||||
|
|
||||||
|
/// Disable buffering for stderr.
|
||||||
|
setbuf(stderr, nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
buildLoggers(config(), logger(), self.commandName());
|
||||||
|
|
||||||
|
BaseDaemon::logRevision();
|
||||||
|
|
||||||
|
log = &logger();
|
||||||
|
hostname = config().getString("listen-host", "127.0.0.1");
|
||||||
|
port = config().getUInt("http-port");
|
||||||
|
if (port > 0xFFFF)
|
||||||
|
throw Exception("Out of range 'http-port': " + std::to_string(port), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||||
|
|
||||||
|
http_timeout = config().getUInt("http-timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT);
|
||||||
|
max_server_connections = config().getUInt("max-server-connections", 1024);
|
||||||
|
keep_alive_timeout = config().getUInt("keep-alive-timeout", 10);
|
||||||
|
|
||||||
|
initializeTerminationAndSignalProcessing();
|
||||||
|
|
||||||
|
/// Will do nothing in case it is not ODBCBridge.
|
||||||
|
registerODBCConnector();
|
||||||
|
|
||||||
|
ServerApplication::initialize(self); // NOLINT
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void IBridge::uninitialize()
|
||||||
|
{
|
||||||
|
BaseDaemon::uninitialize();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int IBridge::main(const std::vector<std::string> & /*args*/)
|
||||||
|
{
|
||||||
|
if (is_help)
|
||||||
|
return Application::EXIT_OK;
|
||||||
|
|
||||||
|
registerFormats();
|
||||||
|
LOG_INFO(log, "Starting up {} on host: {}, port: {}", bridgeName(), hostname, port);
|
||||||
|
|
||||||
|
Poco::Net::ServerSocket socket;
|
||||||
|
auto address = socketBindListen(socket, hostname, port, log);
|
||||||
|
socket.setReceiveTimeout(http_timeout);
|
||||||
|
socket.setSendTimeout(http_timeout);
|
||||||
|
|
||||||
|
Poco::ThreadPool server_pool(3, max_server_connections);
|
||||||
|
|
||||||
|
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
|
||||||
|
http_params->setTimeout(http_timeout);
|
||||||
|
http_params->setKeepAliveTimeout(keep_alive_timeout);
|
||||||
|
|
||||||
|
auto shared_context = Context::createShared();
|
||||||
|
Context context(Context::createGlobal(shared_context.get()));
|
||||||
|
context.makeGlobalContext();
|
||||||
|
|
||||||
|
if (config().has("query_masking_rules"))
|
||||||
|
SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(config(), "query_masking_rules"));
|
||||||
|
|
||||||
|
auto server = HTTPServer(
|
||||||
|
context,
|
||||||
|
getHandlerFactoryPtr(context),
|
||||||
|
server_pool,
|
||||||
|
socket,
|
||||||
|
http_params);
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
LOG_INFO(log, "Listening http://{}", address.toString());
|
||||||
|
|
||||||
|
SCOPE_EXIT({
|
||||||
|
LOG_DEBUG(log, "Received termination signal.");
|
||||||
|
LOG_DEBUG(log, "Waiting for current connections to close.");
|
||||||
|
server.stop();
|
||||||
|
for (size_t count : ext::range(1, 6))
|
||||||
|
{
|
||||||
|
if (server.currentConnections() == 0)
|
||||||
|
break;
|
||||||
|
LOG_DEBUG(log, "Waiting for {} connections, try {}", server.currentConnections(), count);
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
waitForTerminationRequest();
|
||||||
|
return Application::EXIT_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
51
src/Common/Bridge/IBridge.h
Normal file
51
src/Common/Bridge/IBridge.h
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <Interpreters/Context.h>
|
||||||
|
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
|
||||||
|
#include <Poco/Util/ServerApplication.h>
|
||||||
|
#include <Poco/Logger.h>
|
||||||
|
#include <daemon/BaseDaemon.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
/// Class represents base for clickhouse-odbc-bridge and clickhouse-library-bridge servers.
|
||||||
|
/// Listens to incoming HTTP POST and GET requests on specified port and host.
|
||||||
|
/// Has two handlers '/' for all incoming POST requests and /ping for GET request about service status.
|
||||||
|
class IBridge : public BaseDaemon
|
||||||
|
{
|
||||||
|
|
||||||
|
public:
|
||||||
|
void defineOptions(Poco::Util::OptionSet & options) override;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
using HandlerFactoryPtr = std::shared_ptr<HTTPRequestHandlerFactory>;
|
||||||
|
|
||||||
|
void initialize(Application & self) override;
|
||||||
|
|
||||||
|
void uninitialize() override;
|
||||||
|
|
||||||
|
int main(const std::vector<std::string> & args) override;
|
||||||
|
|
||||||
|
virtual const std::string bridgeName() const = 0;
|
||||||
|
|
||||||
|
virtual HandlerFactoryPtr getHandlerFactoryPtr(Context & context) const = 0;
|
||||||
|
|
||||||
|
virtual void registerODBCConnector() const {}
|
||||||
|
|
||||||
|
size_t keep_alive_timeout;
|
||||||
|
|
||||||
|
private:
|
||||||
|
void handleHelp(const std::string &, const std::string &);
|
||||||
|
|
||||||
|
bool is_help;
|
||||||
|
std::string hostname;
|
||||||
|
size_t port;
|
||||||
|
std::string log_level;
|
||||||
|
size_t max_server_connections;
|
||||||
|
size_t http_timeout;
|
||||||
|
|
||||||
|
Poco::Logger * log;
|
||||||
|
};
|
||||||
|
}
|
@ -14,7 +14,7 @@ class LibraryBridgeHelper : public IBridgeHelper
|
|||||||
{
|
{
|
||||||
|
|
||||||
public:
|
public:
|
||||||
static constexpr inline size_t DEFAULT_PORT = 9018;
|
static constexpr inline size_t DEFAULT_PORT = 9012;
|
||||||
|
|
||||||
LibraryBridgeHelper(const Context & context_, const std::string & dictionary_id_);
|
LibraryBridgeHelper(const Context & context_, const std::string & dictionary_id_);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user