Merge pull request #2828 from alesapin/master

CLICKHOUSE-3878: ODBC-Bridge tool implementation
This commit is contained in:
alexey-milovidov 2018-08-14 17:25:43 +03:00 committed by GitHub
commit c5b90717c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1093 additions and 128 deletions

View File

@ -13,6 +13,7 @@ option (ENABLE_CLICKHOUSE_COMPRESSOR "Enable clickhouse-compressor" ${ENABLE_CLI
option (ENABLE_CLICKHOUSE_COPIER "Enable clickhouse-copier" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_FORMAT "Enable clickhouse-format" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_OBFUSCATOR "Enable clickhouse-obfuscator" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "Enable clickhouse-odbc-bridge" ${ENABLE_CLICKHOUSE_ALL})
configure_file (config_tools.h.in ${CMAKE_CURRENT_BINARY_DIR}/config_tools.h)
@ -27,10 +28,11 @@ add_subdirectory (copier)
add_subdirectory (format)
add_subdirectory (clang)
add_subdirectory (obfuscator)
add_subdirectory (odbc-bridge)
if (CLICKHOUSE_SPLIT_BINARY)
set (CLICKHOUSE_ALL_TARGETS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-performance-test
clickhouse-extract-from-config clickhouse-compressor clickhouse-format clickhouse-copier)
clickhouse-extract-from-config clickhouse-compressor clickhouse-format clickhouse-copier clickhouse-odbc-bridge)
if (USE_EMBEDDED_COMPILER)
list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-clang clickhouse-lld)
@ -83,6 +85,9 @@ else ()
if (USE_EMBEDDED_COMPILER)
target_link_libraries (clickhouse clickhouse-compiler-lib)
endif ()
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
target_link_libraries (clickhouse clickhouse-odbc-bridge-lib)
endif()
set (CLICKHOUSE_BUNDLE)
if (ENABLE_CLICKHOUSE_SERVER)
@ -135,6 +140,12 @@ else ()
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-obfuscator DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-obfuscator)
endif ()
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
add_custom_target (clickhouse-odbc-bridge ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-odbc-bridge DEPENDS clickhouse)
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-odbc-bridge DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-odbc-bridge)
endif ()
# install always because depian package want this files:
add_custom_target (clickhouse-clang ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-clang DEPENDS clickhouse)

View File

@ -56,6 +56,10 @@ int mainEntryClickHouseClusterCopier(int argc, char ** argv);
#if ENABLE_CLICKHOUSE_OBFUSCATOR
int mainEntryClickHouseObfuscator(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_ODBC_BRIDGE || !defined(ENABLE_CLICKHOUSE_ODBC_BRIDGE)
int mainEntryClickHouseODBCBridge(int argc, char ** argv);
#endif
#if USE_EMBEDDED_COMPILER
int mainEntryClickHouseClang(int argc, char ** argv);
@ -101,6 +105,10 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
#if ENABLE_CLICKHOUSE_OBFUSCATOR
{"obfuscator", mainEntryClickHouseObfuscator},
#endif
#if ENABLE_CLICKHOUSE_ODBC_BRIDGE || !defined(ENABLE_CLICKHOUSE_ODBC_BRIDGE)
{"odbc-bridge", mainEntryClickHouseODBCBridge},
#endif
#if USE_EMBEDDED_COMPILER
{"clang", mainEntryClickHouseClang},
{"clang++", mainEntryClickHouseClang},

View File

@ -0,0 +1,13 @@
add_library (clickhouse-odbc-bridge-lib
Handlers.cpp
HandlerFactory.cpp
ODBCBridge.cpp
)
target_link_libraries (clickhouse-odbc-bridge-lib clickhouse_common_io daemon)
target_include_directories (clickhouse-odbc-bridge-lib PUBLIC ${ClickHouse_SOURCE_DIR}/libs/libdaemon/include)
if (CLICKHOUSE_SPLIT_BINARY)
add_executable (clickhouse-odbc-bridge odbc-bridge.cpp)
target_link_libraries (clickhouse-odbc-bridge clickhouse-odbc-bridge-lib)
endif ()

View File

@ -0,0 +1,23 @@
#include "HandlerFactory.h"
#include <Common/HTMLForm.h>
#include <Poco/Ext/SessionPoolHelpers.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <common/logger_useful.h>
namespace DB
{
Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco::Net::HTTPServerRequest & request)
{
const auto & uri = request.getURI();
LOG_TRACE(log, "Request URI: " + uri);
if (uri == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
return new PingHandler(keep_alive_timeout);
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
return new ODBCHandler(pool_map, keep_alive_timeout, context);
return nullptr;
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include "Handlers.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <Poco/Data/SessionPool.h>
#pragma GCC diagnostic pop
namespace DB
{
/** Factory for '/ping' and '/' handlers.
* Also stores Session pools for ODBC connections
*/
class HandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
{
public:
HandlerFactory(const std::string & name_, size_t keep_alive_timeout_, std::shared_ptr<Context> context_)
: log(&Poco::Logger::get(name_)), name(name_), keep_alive_timeout(keep_alive_timeout_), context(context_)
{
pool_map = std::make_shared<ODBCHandler::PoolMap>();
}
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override;
private:
Poco::Logger * log;
std::string name;
size_t keep_alive_timeout;
std::shared_ptr<Context> context;
std::shared_ptr<ODBCHandler::PoolMap> pool_map;
};
}

View File

@ -0,0 +1,146 @@
#include "Handlers.h"
#include <Common/HTMLForm.h>
#include <memory>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeFactory.h>
#include <Dictionaries/ODBCBlockInputStream.h>
#include <Formats/BinaryRowInputStream.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <boost/algorithm/string.hpp>
#include <boost/tokenizer.hpp>
#include <Poco/Ext/SessionPoolHelpers.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_REQUEST_PARAMETER;
}
namespace
{
std::unique_ptr<Block> parseColumns(std::string && column_string)
{
std::unique_ptr<Block> sample_block = std::make_unique<Block>();
auto names_and_types = NamesAndTypesList::parse(column_string);
for (const NameAndTypePair & column_data : names_and_types)
sample_block->insert({column_data.type, column_data.name});
return sample_block;
}
}
ODBCHandler::PoolPtr ODBCHandler::getPool(const std::string & connection_str)
{
std::lock_guard lock(mutex);
if (!pool_map->count(connection_str))
{
pool_map->emplace(connection_str, createAndCheckResizePocoSessionPool([connection_str] {
return std::make_shared<Poco::Data::SessionPool>("ODBC", connection_str);
}));
}
return pool_map->at(connection_str);
}
void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
Poco::Net::HTMLForm params(request, request.stream());
LOG_TRACE(log, "Request URI: " + request.getURI());
auto process_error = [&response, this](const std::string & message) {
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;
LOG_WARNING(log, message);
};
if (!params.has("query"))
{
process_error("No 'query' in request body");
return;
}
if (!params.has("columns"))
{
process_error("No 'columns' in request URL");
return;
}
if (!params.has("connection_string"))
{
process_error("No 'connection_string' in request URL");
return;
}
size_t max_block_size = DEFAULT_BLOCK_SIZE;
if (params.has("max_block_size"))
{
std::string max_block_size_str = params.get("max_block_size", "");
if (max_block_size_str.empty())
{
process_error("Empty max_block_size specified");
return;
}
max_block_size = parse<size_t>(max_block_size_str);
}
std::string columns = params.get("columns");
std::unique_ptr<Block> sample_block;
try
{
sample_block = parseColumns(std::move(columns));
}
catch (const Exception & ex)
{
process_error("Invalid 'columns' parameter in request body '" + ex.message() + "'");
LOG_WARNING(log, ex.getStackTrace().toString());
return;
}
std::string format = params.get("format", "RowBinary");
std::string query = params.get("query");
LOG_TRACE(log, "Query: " << query);
std::string connection_string = params.get("connection_string");
LOG_TRACE(log, "Connection string: '" << connection_string << "'");
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
try
{
BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, *context);
auto pool = getPool(connection_string);
ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size);
copyData(inp, *writer);
}
catch (...)
{
auto message = getCurrentExceptionMessage(true);
response.setStatusAndReason(
Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error, bacause of too soon response sending
writeStringBinary(message, out);
tryLogCurrentException(log);
}
}
void PingHandler::handleRequest(Poco::Net::HTTPServerRequest & /*request*/, Poco::Net::HTTPServerResponse & response)
{
try
{
setResponseDefaultHeaders(response, keep_alive_timeout);
const char * data = "Ok.\n";
response.sendBuffer(data, strlen(data));
}
catch (...)
{
tryLogCurrentException("PingHandler");
}
}
}

View File

@ -0,0 +1,60 @@
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <Poco/Data/SessionPool.h>
#pragma GCC diagnostic pop
namespace DB
{
/** Main handler for requests to ODBC driver
* requires connection_string and columns in request params
* and also query in request body
* response in RowBinary format
*/
class ODBCHandler : public Poco::Net::HTTPRequestHandler
{
public:
using PoolPtr = std::shared_ptr<Poco::Data::SessionPool>;
using PoolMap = std::unordered_map<std::string, PoolPtr>;
ODBCHandler(std::shared_ptr<PoolMap> pool_map_,
size_t keep_alive_timeout_,
std::shared_ptr<Context> context_)
: log(&Poco::Logger::get("ODBCHandler"))
, pool_map(pool_map_)
, keep_alive_timeout(keep_alive_timeout_)
, context(context_)
{
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
private:
Poco::Logger * log;
std::shared_ptr<PoolMap> pool_map;
size_t keep_alive_timeout;
std::shared_ptr<Context> context;
static inline std::mutex mutex;
PoolPtr getPool(const std::string & connection_str);
};
/** Simple ping handler, answers "Ok." to GET request
*/
class PingHandler : public Poco::Net::HTTPRequestHandler
{
public:
PingHandler(size_t keep_alive_timeout_) : keep_alive_timeout(keep_alive_timeout_) {}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
private:
size_t keep_alive_timeout;
};
}

View File

@ -0,0 +1,205 @@
#include "ODBCBridge.h"
#include "HandlerFactory.h"
#include <string>
#include <errno.h>
#include <IO/ReadHelpers.h>
#include <boost/program_options.hpp>
#include <Poco/Net/HTTPServer.h>
#include <Poco/Net/NetException.h>
#include <Poco/String.h>
#include <Poco/Util/HelpFormatter.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/config.h>
#include <common/logger_useful.h>
#include <ext/scope_guard.h>
#include <ext/range.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
}
namespace
{
Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port, Poco::Logger * log)
{
Poco::Net::SocketAddress socket_address;
try
{
socket_address = Poco::Net::SocketAddress(host, port);
}
catch (const Poco::Net::DNSException & e)
{
const auto code = e.code();
if (code == EAI_FAMILY
#if defined(EAI_ADDRFAMILY)
|| code == EAI_ADDRFAMILY
#endif
)
{
LOG_ERROR(log,
"Cannot resolve listen_host (" << host << "), error " << e.code() << ": " << e.message()
<< ". "
"If it is an IPv6 address and your host has disabled IPv6, then consider to "
"specify IPv4 address to listen in <listen_host> element of configuration "
"file. Example: <listen_host>0.0.0.0</listen_host>");
}
throw;
}
return socket_address;
}
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, Poco::Logger * log)
{
auto address = makeSocketAddress(host, port, log);
#if POCO_VERSION < 0x01080000
socket.bind(address, /* reuseAddress = */ true);
#else
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ false);
#endif
socket.listen(/* backlog = */ 64);
return address;
};
}
void ODBCBridge::handleHelp(const std::string &, const std::string &)
{
Poco::Util::HelpFormatter helpFormatter(options());
helpFormatter.setCommand(commandName());
helpFormatter.setHeader("HTTP-proxy for odbc requests");
helpFormatter.setUsage("--http-port <port>");
helpFormatter.format(std::cerr);
stopOptionsProcessing();
}
void ODBCBridge::defineOptions(Poco::Util::OptionSet & options)
{
options.addOption(Poco::Util::Option("http-port", "", "port to listen").argument("http-port", true).binding("http-port"));
options.addOption(
Poco::Util::Option("listen-host", "", "hostname to listen, default localhost").argument("listen-host").binding("listen-host"));
options.addOption(
Poco::Util::Option("http-timeout", "", "http timout for socket, default 1800").argument("http-timeout").binding("http-timeout"));
options.addOption(Poco::Util::Option("max-server-connections", "", "max connections to server, default 1024")
.argument("max-server-connections")
.binding("max-server-connections"));
options.addOption(Poco::Util::Option("keep-alive-timeout", "", "keepalive timeout, default 10")
.argument("keep-alive-timeout")
.binding("keep-alive-timeout"));
options.addOption(Poco::Util::Option("log-level", "", "sets log level, default info").argument("log-level").binding("logger.level"));
options.addOption(
Poco::Util::Option("log-path", "", "log path for all logs, default console").argument("log-path").binding("logger.log"));
options.addOption(Poco::Util::Option("err-log-path", "", "err log path for all logs, default no")
.argument("err-log-path")
.binding("logger.errorlog"));
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("help", "", "produce this help message")
.binding("help")
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
ServerApplication::defineOptions(options); /// Don't need complex BaseDaemon's .xml config
}
void ODBCBridge::initialize(Application & self)
{
BaseDaemon::closeFDs();
is_help = config().has("help");
if (is_help)
return;
if (!config().has("logger.log"))
config().setBool("logger.console", true);
config().setString("logger", "ODBCBridge");
buildLoggers(config());
log = &logger();
hostname = config().getString("listen-host", "localhost");
port = config().getUInt("http-port");
if (port > 0xFFFF)
throw Exception("Out of range 'http-port': " + std::to_string(port), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
http_timeout = config().getUInt("http-timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT);
max_server_connections = config().getUInt("max-server-connections", 1024);
keep_alive_timeout = config().getUInt("keep-alive-timeout", 10);
initializeTerminationAndSignalProcessing();
ServerApplication::initialize(self);
}
void ODBCBridge::uninitialize()
{
BaseDaemon::uninitialize();
}
int ODBCBridge::main(const std::vector<std::string> & /*args*/)
{
if (is_help)
return Application::EXIT_OK;
LOG_INFO(log, "Starting up");
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, hostname, port, log);
socket.setReceiveTimeout(http_timeout);
socket.setSendTimeout(http_timeout);
Poco::ThreadPool server_pool(3, max_server_connections);
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(http_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
context = std::make_shared<Context>(Context::createGlobal());
context->setGlobalContext(*context);
auto server = Poco::Net::HTTPServer(
new HandlerFactory("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context), server_pool, socket, http_params);
server.start();
LOG_INFO(log, "Listening http://" + address.toString());
SCOPE_EXIT({
LOG_DEBUG(log, "Received termination signal.");
LOG_DEBUG(log, "Waiting for current connections to close.");
server.stop();
for (size_t count : ext::range(1, 6))
{
if (server.currentConnections() == 0)
break;
LOG_DEBUG(log, "Waiting for " << server.currentConnections() << " connections, try " << count);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
});
waitForTerminationRequest();
return Application::EXIT_OK;
}
}
int mainEntryClickHouseODBCBridge(int argc, char ** argv)
{
DB::ODBCBridge app;
try
{
return app.run(argc, argv);
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
}
}

View File

@ -0,0 +1,41 @@
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <daemon/BaseDaemon.h>
namespace DB
{
/** Class represents clickhouse-odbc-bridge server, which listen
* incoming HTTP POST and GET requests on specified port and host.
* Has two handlers '/' for all incoming POST requests to ODBC driver
* and /ping for GET request about service status
*/
class ODBCBridge : public BaseDaemon
{
public:
void defineOptions(Poco::Util::OptionSet & options) override;
protected:
void initialize(Application & self) override;
void uninitialize() override;
int main(const std::vector<std::string> & args) override;
private:
void handleHelp(const std::string &, const std::string &);
bool is_help;
std::string hostname;
size_t port;
size_t http_timeout;
std::string log_level;
size_t max_server_connections;
size_t keep_alive_timeout;
Poco::Logger * log;
std::shared_ptr<Context> context; /// need for settings only
};
}

View File

@ -0,0 +1,38 @@
# clickhouse-odbc-bridge
Simple HTTP-server which works like a proxy for ODBC driver. The main motivation
was possible segfaults or another faults in ODBC implementations, which can
crash whole clickhouse-server process.
This tool works via HTTP, not via pipes, shared memory, or TCP because:
- It's simplier to implement
- It's simplier to debug
- jdbc-bridge can be implemented in the same way
## Usage
`clickhouse-server` use this tool inside odbc table function and StorageODBC.
However it can be used as standalone tool from command line with the following
parameters in POST-request URL:
- `connection_string` -- ODBC connection string.
- `columns` -- columns in ClickHouse NamesAndTypesList format, name in backticks,
type as string. Name and type are space separated, rows separated with
newline.
- `max_block_size` -- optional parameter, sets maximum size of single block.
Query is send in post body. Response is returned in RowBinary format.
## Example:
```bash
$ clickhouse-odbc-bridge --http-port 9018 --daemon
$ curl -d "query=SELECT PageID, ImpID, AdType FROM Keys ORDER BY PageID, ImpID" --data-urlencode "connection_string=DSN=ClickHouse;DATABASE=stat" --data-urlencode "columns=columns format version: 1
3 columns:
\`PageID\` String
\`ImpID\` String
\`AdType\` String
" "http://localhost:9018/" > result.txt
$ cat result.txt
12246623837185725195925621517
```

View File

@ -0,0 +1,2 @@
int mainEntryClickHouseODBCBridge(int argc, char ** argv);
int main(int argc_, char ** argv_) { return mainEntryClickHouseODBCBridge(argc_, argv_); }

View File

@ -382,6 +382,9 @@ namespace ErrorCodes
extern const int PARTITION_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT = 405;
extern const int TOP_AND_LIMIT_TOGETHER = 406;
extern const int DECIMAL_OVERFLOW = 407;
extern const int BAD_REQUEST_PARAMETER = 408;
extern const int EXTERNAL_EXECUTABLE_NOT_FOUND = 409;
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING = 410;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -0,0 +1,106 @@
#include <Common/ODBCBridgeHelper.h>
#include <sstream>
#include <Common/validateODBCConnectionString.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/Path.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h>
#include <common/logger_useful.h>
#include <ext/range.h>
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
}
ODBCBridgeHelper::ODBCBridgeHelper(const Context & context_global_, const std::string & connection_string_)
: context_global(context_global_), connection_string(validateODBCConnectionString(connection_string_))
{
const auto & config = context_global.getConfigRef();
size_t bridge_port = config.getUInt("odbc_bridge.port", DEFAULT_PORT);
std::string bridge_host = config.getString("odbc_bridge.host", DEFAULT_HOST);
ping_url.setHost(bridge_host);
ping_url.setPort(bridge_port);
ping_url.setScheme("http");
ping_url.setPath(PING_HANDLER);
}
void ODBCBridgeHelper::startODBCBridge() const
{
const auto & config = context_global.getConfigRef();
const auto & settings = context_global.getSettingsRef();
Poco::Path path{config.getString("application.dir", "")};
path.setFileName("clickhouse-odbc-bridge");
if (!path.isFile())
throw Exception("clickhouse-odbc-bridge is not found", ErrorCodes::EXTERNAL_EXECUTABLE_NOT_FOUND);
std::stringstream command;
command << path.toString() << ' ';
command << "--http-port " << config.getUInt("odbc_bridge.port", DEFAULT_PORT) << ' ';
command << "--listen-host " << config.getString("odbc_bridge.listen_host", DEFAULT_HOST) << ' ';
command << "--http-timeout " << settings.http_receive_timeout.value.totalSeconds() << ' ';
if (config.has("logger.odbc_bridge_log"))
command << "--log-path " << config.getString("logger.odbc_bridge_log") << ' ';
if (config.has("logger.odbc_bridge_errlog"))
command << "--err-log-path " << config.getString("logger.odbc_bridge_errlog") << ' ';
if (config.has("logger.odbc_bridge_level"))
command << "--log-level " << config.getString("logger.odbc_bridge_level") << ' ';
command << "&"; /// we don't want to wait this process
auto command_str = command.str();
LOG_TRACE(log, "Starting clickhouse-odbc-bridge with command: " << command_str);
auto cmd = ShellCommand::execute(command_str);
cmd->wait();
}
std::vector<std::pair<std::string, std::string>> ODBCBridgeHelper::getURLParams(const NamesAndTypesList & cols, size_t max_block_size) const
{
std::vector<std::pair<std::string, std::string>> result;
result.emplace_back("connection_string", connection_string); /// already validated
result.emplace_back("columns", cols.toString());
result.emplace_back("max_block_size", std::to_string(max_block_size));
return result;
}
bool ODBCBridgeHelper::checkODBCBridgeIsRunning() const
{
try
{
ReadWriteBufferFromHTTP buf(ping_url, ODBCBridgeHelper::PING_METHOD, nullptr);
return checkString(ODBCBridgeHelper::PING_OK_ANSWER, buf);
}
catch (...)
{
return false;
}
}
void ODBCBridgeHelper::startODBCBridgeSync() const
{
if (!checkODBCBridgeIsRunning())
{
LOG_TRACE(log, "clickhouse-odbc-bridge is not running, will try to start it");
startODBCBridge();
bool started = false;
for (size_t counter : ext::range(1, 20))
{
LOG_TRACE(log, "Checking clickhouse-odbc-bridge is running, try " << counter);
if (checkODBCBridgeIsRunning())
{
started = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
if (!started)
throw Exception("ODBCBridgeHelper: clickhouse-odbc-bridge is not responding", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);
}
}
}

View File

@ -0,0 +1,47 @@
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/URI.h>
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_EXECUTABLE_NOT_FOUND;
}
/** Helper for odbc-bridge, provide utility methods, not main request
*/
class ODBCBridgeHelper
{
private:
const Context & context_global;
std::string connection_string;
Poco::URI ping_url;
Poco::Logger * log = &Poco::Logger::get("ODBCBridgeHelper");
public:
static constexpr inline size_t DEFAULT_PORT = 9018;
static constexpr inline auto DEFAULT_HOST = "localhost";
static constexpr inline auto DEFAULT_FORMAT = "RowBinary";
static constexpr inline auto PING_HANDLER = "/ping";
static constexpr inline auto MAIN_HANDLER = "/";
static constexpr inline auto PING_OK_ANSWER = "Ok.";
static const inline std::string PING_METHOD = Poco::Net::HTTPRequest::HTTP_GET;
static const inline std::string MAIN_METHOD = Poco::Net::HTTPRequest::HTTP_POST;
ODBCBridgeHelper(const Context & context_global_, const std::string & connection_string_);
std::vector<std::pair<std::string, std::string>> getURLParams(const NamesAndTypesList & cols, size_t max_block_size) const;
bool checkODBCBridgeIsRunning() const;
void startODBCBridge() const;
void startODBCBridgeSync() const;
};
}

View File

@ -71,3 +71,6 @@ target_link_libraries (cow_columns clickhouse_common_io)
add_executable (stopwatch stopwatch.cpp)
target_link_libraries (stopwatch clickhouse_common_io)
add_executable (validate-odbc-connection-string validate-odbc-connection-string.cpp)
target_link_libraries (validate-odbc-connection-string dbms)

View File

@ -1,6 +1,6 @@
#include <iostream>
#include <Common/Exception.h>
#include <Dictionaries/validateODBCConnectionString.h>
#include <Common/validateODBCConnectionString.h>
using namespace DB;

View File

@ -5,7 +5,7 @@
#include <common/find_first_symbols.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <Dictionaries/validateODBCConnectionString.h>
#include <Common/validateODBCConnectionString.h>
namespace DB

View File

@ -1,3 +0,0 @@
if (ENABLE_TESTS)
add_subdirectory (tests)
endif ()

View File

@ -1,20 +1,58 @@
#include <Dictionaries/ODBCDictionarySource.h>
#include <common/logger_useful.h>
#include <common/LocalDateTime.h>
#include <Poco/Ext/SessionPoolHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Dictionaries/ODBCDictionarySource.h>
#include <Dictionaries/ODBCBlockInputStream.h>
#include <Dictionaries/validateODBCConnectionString.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Dictionaries/readInvalidateQuery.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Formats/FormatFactory.h>
namespace DB
{
namespace
{
class ODBCBridgeBlockInputStream : public IProfilingBlockInputStream
{
public:
ODBCBridgeBlockInputStream(const Poco::URI & uri,
std::function<void(std::ostream &)> callback,
const Block & sample_block,
const Context & context,
size_t max_block_size,
const ConnectionTimeouts & timeouts)
{
read_buf = std::make_unique<ReadWriteBufferFromHTTP>(uri, ODBCBridgeHelper::MAIN_METHOD, callback, timeouts);
reader = FormatFactory::instance().getInput(ODBCBridgeHelper::DEFAULT_FORMAT, *read_buf, sample_block, context, max_block_size);
}
Block getHeader() const override
{
return reader->getHeader();
}
String getName() const override
{
return "ODBCBridgeBlockInputStream";
}
private:
Block readImpl() override
{
return reader->read();
}
std::unique_ptr<ReadWriteBufferFromHTTP> read_buf;
BlockInputStreamPtr reader;
};
}
static const size_t max_block_size = 8192;
@ -32,20 +70,22 @@ ODBCDictionarySource::ODBCDictionarySource(const DictionaryStructure & dict_stru
sample_block{sample_block},
query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::None}, /// NOTE Better to obtain quoting style via ODBC interface.
load_all_query{query_builder.composeLoadAllQuery()},
invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
invalidate_query{config.getString(config_prefix + ".invalidate_query", "")},
odbc_bridge_helper{context, config.getString(config_prefix + ".connection_string")},
timeouts{ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef())},
global_context(context)
{
std::size_t field_size = context.getSettingsRef().odbc_max_field_size;
const auto & global_config = context.getConfigRef();
size_t bridge_port = global_config.getUInt("odbc_bridge.port", ODBCBridgeHelper::DEFAULT_PORT);
std::string bridge_host = global_config.getString("odbc_bridge.host", ODBCBridgeHelper::DEFAULT_HOST);
pool = createAndCheckResizePocoSessionPool([&]
{
auto session = std::make_shared<Poco::Data::SessionPool>(
config.getString(config_prefix + ".connector", "ODBC"),
validateODBCConnectionString(config.getString(config_prefix + ".connection_string")));
bridge_url.setHost(bridge_host);
bridge_url.setPort(bridge_port);
bridge_url.setScheme("http");
/// Default POCO value is 1024. Set property manually to make possible reading of longer strings.
session->setProperty("maxFieldSize", Poco::Any(field_size));
return session;
});
auto url_params = odbc_bridge_helper.getURLParams(sample_block.getNamesAndTypesList(), max_block_size);
for (const auto & [name, value] : url_params)
bridge_url.addQueryParameter(name, value);
}
/// copy-constructor is provided in order to support cloneability
@ -58,11 +98,14 @@ ODBCDictionarySource::ODBCDictionarySource(const ODBCDictionarySource & other)
where{other.where},
update_field{other.update_field},
sample_block{other.sample_block},
pool{other.pool},
query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::None},
load_all_query{other.load_all_query},
invalidate_query{other.invalidate_query}, invalidate_query_response{other.invalidate_query_response}
invalidate_query{other.invalidate_query},
invalidate_query_response{other.invalidate_query_response},
odbc_bridge_helper{other.odbc_bridge_helper},
global_context{other.global_context}
{
}
std::string ODBCDictionarySource::getUpdateFieldAndDate()
@ -86,7 +129,7 @@ std::string ODBCDictionarySource::getUpdateFieldAndDate()
BlockInputStreamPtr ODBCDictionarySource::loadAll()
{
LOG_TRACE(log, load_all_query);
return std::make_shared<ODBCBlockInputStream>(pool->get(), load_all_query, sample_block, max_block_size);
return loadBase(load_all_query);
}
BlockInputStreamPtr ODBCDictionarySource::loadUpdatedAll()
@ -94,20 +137,20 @@ BlockInputStreamPtr ODBCDictionarySource::loadUpdatedAll()
std::string load_query_update = getUpdateFieldAndDate();
LOG_TRACE(log, load_query_update);
return std::make_shared<ODBCBlockInputStream>(pool->get(), load_query_update, sample_block, max_block_size);
return loadBase(load_query_update);
}
BlockInputStreamPtr ODBCDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
const auto query = query_builder.composeLoadIdsQuery(ids);
return std::make_shared<ODBCBlockInputStream>(pool->get(), query, sample_block, max_block_size);
return loadBase(query);
}
BlockInputStreamPtr ODBCDictionarySource::loadKeys(
const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return std::make_shared<ODBCBlockInputStream>(pool->get(), query, sample_block, max_block_size);
return loadBase(query);
}
bool ODBCDictionarySource::supportsSelectiveLoad() const
@ -148,8 +191,28 @@ std::string ODBCDictionarySource::doInvalidateQuery(const std::string & request)
Block invalidate_sample_block;
ColumnPtr column(ColumnString::create());
invalidate_sample_block.insert(ColumnWithTypeAndName(column, std::make_shared<DataTypeString>(), "Sample Block"));
ODBCBlockInputStream block_input_stream(pool->get(), request, invalidate_sample_block, 1);
return readInvalidateQuery(block_input_stream);
odbc_bridge_helper.startODBCBridgeSync();
ODBCBridgeBlockInputStream stream(
bridge_url,
[request](std::ostream & os) { os << "query=" << request; },
invalidate_sample_block,
global_context,
max_block_size,
timeouts);
return readInvalidateQuery(stream);
}
BlockInputStreamPtr ODBCDictionarySource::loadBase(const std::string & query) const
{
odbc_bridge_helper.startODBCBridgeSync();
return std::make_shared<ODBCBridgeBlockInputStream>(bridge_url,
[query](std::ostream & os) { os << "query=" << query; },
sample_block,
global_context,
max_block_size,
timeouts);
}
}

View File

@ -1,11 +1,16 @@
#pragma once
#include <Poco/Data/SessionPool.h>
#include <Poco/URI.h>
#include <Common/ODBCBridgeHelper.h>
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/ExternalQueryBuilder.h>
#include <Dictionaries/DictionaryStructure.h>
#include <IO/ConnectionTimeouts.h>
namespace Poco
{
@ -58,6 +63,8 @@ private:
// execute invalidate_query. expects single cell in result
std::string doInvalidateQuery(const std::string & request) const;
BlockInputStreamPtr loadBase(const std::string & query) const;
Poco::Logger * log;
std::chrono::time_point<std::chrono::system_clock> update_time;
@ -67,11 +74,16 @@ private:
const std::string where;
const std::string update_field;
Block sample_block;
std::shared_ptr<Poco::Data::SessionPool> pool = nullptr;
ExternalQueryBuilder query_builder;
const std::string load_all_query;
std::string invalidate_query;
mutable std::string invalidate_query_response;
ODBCBridgeHelper odbc_bridge_helper;
Poco::URI bridge_url;
ConnectionTimeouts timeouts;
const Context & global_context;
};

View File

@ -1,73 +1,105 @@
#include <Poco/Ext/SessionPoolHelpers.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Storages/StorageODBC.h>
#include <Storages/StorageFactory.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Dictionaries/ODBCBlockInputStream.h>
#include <Dictionaries/validateODBCConnectionString.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageODBC.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <Poco/Ext/SessionPoolHelpers.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/File.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Path.h>
#include <Common/ShellCommand.h>
#include <ext/range.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int EXTERNAL_EXECUTABLE_NOT_FOUND;
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
}
StorageODBC::StorageODBC(
const std::string & name,
StorageODBC::StorageODBC(const std::string & table_name_,
const std::string & connection_string,
const std::string & remote_database_name,
const std::string & remote_table_name,
const ColumnsDescription & columns_)
: IStorage{columns_}
, name(name)
, remote_database_name(remote_database_name)
, remote_table_name(remote_table_name)
const std::string & remote_database_name_,
const std::string & remote_table_name_,
const ColumnsDescription & columns_,
const Context & context_)
: IStorageURLBase(Poco::URI(), context_, table_name_, ODBCBridgeHelper::DEFAULT_FORMAT, columns_)
, odbc_bridge_helper(context_, connection_string)
, remote_database_name(remote_database_name_)
, remote_table_name(remote_table_name_)
, log(&Poco::Logger::get("StorageODBC"))
{
pool = createAndCheckResizePocoSessionPool([&]
{
return std::make_shared<Poco::Data::SessionPool>("ODBC", validateODBCConnectionString(connection_string));
});
const auto & config = context_global.getConfigRef();
size_t bridge_port = config.getUInt("odbc_bridge.port", ODBCBridgeHelper::DEFAULT_PORT);
std::string bridge_host = config.getString("odbc_bridge.host", ODBCBridgeHelper::DEFAULT_HOST);
uri.setHost(bridge_host);
uri.setPort(bridge_port);
uri.setScheme("http");
}
BlockInputStreams StorageODBC::read(
const Names & column_names,
std::string StorageODBC::getReadMethod() const
{
return ODBCBridgeHelper::MAIN_METHOD;
}
std::vector<std::pair<std::string, std::string>> StorageODBC::getReadURIParams(const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t max_block_size) const
{
NamesAndTypesList cols;
for (const String & name : column_names)
{
auto column_data = getColumn(name);
cols.emplace_back(column_data.name, column_data.type);
}
return odbc_bridge_helper.getURLParams(cols, max_block_size);
}
std::function<void(std::ostream &)> StorageODBC::getReadPOSTDataCallback(const Names & /*column_names*/,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/) const
{
String query = transformQueryForExternalDatabase(
*query_info.query, getColumns().ordinary, IdentifierQuotingStyle::DoubleQuotes, remote_database_name, remote_table_name, context);
return [query](std::ostream & os) { os << "query=" << query; };
}
BlockInputStreams StorageODBC::read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned /*num_streams*/)
unsigned num_streams)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
String query = transformQueryForExternalDatabase(
*query_info.query, getColumns().ordinary, IdentifierQuotingStyle::DoubleQuotes, remote_database_name, remote_table_name, context);
Block sample_block;
for (const String & name : column_names)
{
auto column_data = getColumn(name);
sample_block.insert({ column_data.type, column_data.name });
}
return { std::make_shared<ODBCBlockInputStream>(pool->get(), query, sample_block, max_block_size) };
odbc_bridge_helper.startODBCBridgeSync();
return IStorageURLBase::read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
}
void registerStorageODBC(StorageFactory & factory)
{
factory.registerStorage("ODBC", [](const StorageFactory::Arguments & args)
{
factory.registerStorage("ODBC", [](const StorageFactory::Arguments & args) {
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 3)
throw Exception(
"Storage ODBC requires exactly 3 parameters: ODBC('DSN', database, table).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
"Storage ODBC requires exactly 3 parameters: ODBC('DSN', database, table).", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (size_t i = 0; i < 2; ++i)
engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[i], args.local_context);
@ -76,8 +108,8 @@ void registerStorageODBC(StorageFactory & factory)
static_cast<const ASTLiteral &>(*engine_args[0]).value.safeGet<String>(),
static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>(),
static_cast<const ASTLiteral &>(*engine_args[2]).value.safeGet<String>(),
args.columns);
args.columns,
args.context);
});
}
}

View File

@ -1,44 +1,59 @@
#pragma once
#include <Storages/StorageURL.h>
#include <Common/ODBCBridgeHelper.h>
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Poco/Data/SessionPool.h>
namespace DB
{
/** Implements storage in the ODBC database.
* Use ENGINE = odbc(connection_string, table_name)
* Example ENGINE = odbc('dsn=test', table)
* Read only.
*/
class StorageODBC : public ext::shared_ptr_helper<StorageODBC>, public IStorage
class StorageODBC : public ext::shared_ptr_helper<StorageODBC>, public IStorageURLBase
{
public:
StorageODBC(
const std::string & name,
const std::string & connection_string,
const std::string & remote_database_name,
const std::string & remote_table_name,
const ColumnsDescription & columns_);
std::string getName() const override
{
return "ODBC";
}
std::string getName() const override { return "ODBC"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
BlockInputStreams read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
protected:
StorageODBC(const std::string & table_name_,
const std::string & connection_string,
const std::string & remote_database_name,
const std::string & remote_table_name,
const ColumnsDescription & columns_,
const Context & context_);
private:
std::string name;
ODBCBridgeHelper odbc_bridge_helper;
std::string remote_database_name;
std::string remote_table_name;
std::shared_ptr<Poco::Data::SessionPool> pool;
Poco::Logger * log;
std::string getReadMethod() const override;
std::vector<std::pair<std::string, std::string>> getReadURIParams(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
std::function<void(std::ostream &)> getReadPOSTDataCallback(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
};
}

View File

@ -24,12 +24,12 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
StorageURL::StorageURL(const Poco::URI & uri_,
IStorageURLBase::IStorageURLBase(const Poco::URI & uri_,
const Context & context_,
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_,
Context & context_)
: IStorage(columns_), uri(uri_), format_name(format_name_), table_name(table_name_), context_global(context_)
const ColumnsDescription & columns_)
: IStorage(columns_), uri(uri_), context_global(context_), format_name(format_name_), table_name(table_name_)
{
}
@ -39,6 +39,8 @@ namespace
{
public:
StorageURLBlockInputStream(const Poco::URI & uri,
const std::string & method,
std::function<void(std::ostream &)> callback,
const String & format,
const String & name_,
const Block & sample_block,
@ -47,7 +49,7 @@ namespace
const ConnectionTimeouts & timeouts)
: name(name_)
{
read_buf = std::make_unique<ReadWriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_GET, nullptr, timeouts);
read_buf = std::make_unique<ReadWriteBufferFromHTTP>(uri, method, callback, timeouts);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
}
@ -89,7 +91,7 @@ namespace
StorageURLBlockOutputStream(const Poco::URI & uri,
const String & format,
const Block & sample_block_,
Context & context,
const Context & context,
const ConnectionTimeouts & timeouts)
: sample_block(sample_block_)
{
@ -127,16 +129,45 @@ namespace
}
BlockInputStreams StorageURL::read(
const Names & /*column_names*/,
std::string IStorageURLBase::getReadMethod() const
{
return Poco::Net::HTTPRequest::HTTP_GET;
}
std::vector<std::pair<std::string, std::string>> IStorageURLBase::getReadURIParams(const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & context,
const Context & /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/) const
{
return {};
}
std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(const Names & /*column_names*/,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t /*max_block_size*/) const
{
return nullptr;
}
BlockInputStreams IStorageURLBase::read(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned /*num_streams*/)
{
return {std::make_shared<StorageURLBlockInputStream>(
uri,
auto request_uri = uri;
auto params = getReadURIParams(column_names, query_info, context, processed_stage, max_block_size);
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
return {std::make_shared<StorageURLBlockInputStream>(request_uri,
getReadMethod(),
getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size),
format_name,
getName(),
getSampleBlock(),
@ -145,9 +176,9 @@ BlockInputStreams StorageURL::read(
ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()))};
}
void StorageURL::rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/) {}
void IStorageURLBase::rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/) {}
BlockOutputStreamPtr StorageURL::write(const ASTPtr & /*query*/, const Settings & /*settings*/)
BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const Settings & /*settings*/)
{
return std::make_shared<StorageURLBlockOutputStream>(
uri, format_name, getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global.getSettingsRef()));
@ -155,8 +186,7 @@ BlockOutputStreamPtr StorageURL::write(const ASTPtr & /*query*/, const Settings
void registerStorageURL(StorageFactory & factory)
{
factory.registerStorage("URL", [](const StorageFactory::Arguments & args)
{
factory.registerStorage("URL", [](const StorageFactory::Arguments & args) {
ASTs & engine_args = args.engine_args;
if (!(engine_args.size() == 1 || engine_args.size() == 2))
@ -175,5 +205,4 @@ void registerStorageURL(StorageFactory & factory)
return StorageURL::create(uri, args.table_name, format_name, args.columns, args.context);
});
}
}

View File

@ -13,14 +13,9 @@ namespace DB
* HTTP POST when insert is called. In POST request the data is send
* using Chunked transfer encoding, so server have to support it.
*/
class StorageURL : public ext::shared_ptr_helper<StorageURL>, public IStorage
class IStorageURLBase : public IStorage
{
public:
String getName() const override
{
return "URL";
}
String getTableName() const override
{
return table_name;
@ -38,18 +33,49 @@ public:
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
protected:
IStorageURLBase(const Poco::URI & uri_,
const Context & context_,
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_);
Poco::URI uri;
const Context & context_global;
private:
String format_name;
String table_name;
virtual std::string getReadMethod() const;
virtual std::vector<std::pair<std::string, std::string>> getReadURIParams(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const;
virtual std::function<void(std::ostream &)> getReadPOSTDataCallback(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const;
};
class StorageURL : public ext::shared_ptr_helper<StorageURL>, public IStorageURLBase
{
public:
StorageURL(const Poco::URI & uri_,
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_,
Context & context_);
Context & context_)
: IStorageURLBase(uri_, context_, table_name_, format_name_, columns_)
{
}
private:
Poco::URI uri;
String format_name;
String table_name;
Context & context_global;
Logger * log = &Logger::get("StorageURL");
String getName() const override
{
return "URL";
}
};
}

View File

@ -9,7 +9,6 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageODBC.h>
#include <Dictionaries/validateODBCConnectionString.h>
#include <TableFunctions/ITableFunction.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/Exception.h>
@ -72,7 +71,7 @@ StoragePtr TableFunctionODBC::executeImpl(const ASTPtr & ast_function, const Con
for (int i = 0; i < 2; ++i)
args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context);
std::string connection_string = validateODBCConnectionString(static_cast<const ASTLiteral &>(*args[0]).value.safeGet<String>());
std::string connection_string = static_cast<const ASTLiteral &>(*args[0]).value.safeGet<String>();
std::string table_name = static_cast<const ASTLiteral &>(*args[1]).value.safeGet<String>();
Poco::Data::ODBC::SessionImpl session(connection_string, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC);
@ -110,7 +109,7 @@ StoragePtr TableFunctionODBC::executeImpl(const ASTPtr & ast_function, const Con
columns.emplace_back(reinterpret_cast<char *>(column_name), getDataType(type));
}
auto result = StorageODBC::create(table_name, connection_string, "", table_name, ColumnsDescription{columns});
auto result = StorageODBC::create(table_name, connection_string, "", table_name, ColumnsDescription{columns}, context);
result->startup();
return result;
}

View File

@ -145,6 +145,11 @@ public:
return layer; /// layer выставляется в классе-наследнике BaseDaemonApplication.
}
/// close all process FDs except
/// 0-2 -- stdin, stdout, stderr
/// also doesn't close global internal pipes for signal handling
void closeFDs();
protected:
/// Возвращает TaskManager приложения
/// все методы task_manager следует вызывать из одного потока
@ -159,6 +164,9 @@ protected:
/// thread safe
virtual void handleSignal(int signal_id);
/// initialize termination process and signal handlers
virtual void initializeTerminationAndSignalProcessing();
/// реализация обработки сигналов завершения через pipe не требует блокировки сигнала с помощью sigprocmask во всех потоках
void waitForTerminationRequest()
#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000 // in old upstream poco not vitrual

View File

@ -11,6 +11,7 @@
#include <signal.h>
#include <cxxabi.h>
#include <execinfo.h>
#include <unistd.h>
#if USE_UNWIND
#define UNW_LOCAL_ONLY
@ -54,6 +55,7 @@
#include <Poco/NumberFormatter.h>
#include <Poco/Condition.h>
#include <Poco/SyslogChannel.h>
#include <Poco/DirectoryIterator.h>
#include <Common/Exception.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromFileDescriptor.h>
@ -845,8 +847,40 @@ std::string BaseDaemon::getDefaultCorePath() const
return "/opt/cores/";
}
void BaseDaemon::closeFDs()
{
#if defined(__FreeBSD__) || (defined(__APPLE__) && defined(__MACH__))
Poco::File proc_path{"/dev/fd"};
#else
Poco::File proc_path{"/proc/self/fd"};
#endif
if (proc_path.isDirectory()) /// Hooray, proc exists
{
Poco::DirectoryIterator itr(proc_path), end;
for (; itr != end; ++itr)
{
long fd = DB::parse<long>(itr.name());
if (fd > 2 && fd != signal_pipe.read_fd && fd != signal_pipe.write_fd)
::close(fd);
}
}
else
{
long max_fd = -1;
#ifdef _SC_OPEN_MAX
max_fd = sysconf(_SC_OPEN_MAX);
if (max_fd == -1)
#endif
max_fd = 256; /// bad fallback
for (long fd = 3; fd < max_fd; ++fd)
if (fd != signal_pipe.read_fd && fd != signal_pipe.write_fd)
::close(fd);
}
}
void BaseDaemon::initialize(Application & self)
{
closeFDs();
task_manager.reset(new Poco::TaskManager);
ServerApplication::initialize(self);
@ -1031,6 +1065,19 @@ void BaseDaemon::initialize(Application & self)
throw Poco::Exception("Cannot change directory to " + core_path);
}
initializeTerminationAndSignalProcessing();
logRevision();
for (const auto & key : DB::getMultipleKeysFromConfig(config(), "", "graphite"))
{
graphite_writers.emplace(key, std::make_unique<GraphiteWriter>(key));
}
}
void BaseDaemon::initializeTerminationAndSignalProcessing()
{
std::set_terminate(terminate_handler);
/// We want to avoid SIGPIPE when working with sockets and pipes, and just handle return value/errno instead.
@ -1071,15 +1118,9 @@ void BaseDaemon::initialize(Application & self)
static KillingErrorHandler killing_error_handler;
Poco::ErrorHandler::set(&killing_error_handler);
logRevision();
signal_listener.reset(new SignalListener(*this));
signal_listener_thread.start(*signal_listener);
for (const auto & key : DB::getMultipleKeysFromConfig(config(), "", "graphite"))
{
graphite_writers.emplace(key, std::make_unique<GraphiteWriter>(key));
}
}
void BaseDaemon::logRevision() const