Merge pull request #21509 from kssenii/library-bridge

clickhouse-library-bridge for library dictionary source
This commit is contained in:
alesapin 2021-04-06 12:26:08 +03:00 committed by GitHub
commit 86a843bb51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 2808 additions and 860 deletions

View File

@ -8,6 +8,7 @@ add_subdirectory (loggers)
add_subdirectory (pcg-random)
add_subdirectory (widechar_width)
add_subdirectory (readpassphrase)
add_subdirectory (bridge)
if (USE_MYSQL)
add_subdirectory (mysqlxx)

View File

@ -0,0 +1,7 @@
add_library (bridge
IBridge.cpp
)
target_include_directories (daemon PUBLIC ..)
target_link_libraries (bridge PRIVATE daemon dbms Poco::Data Poco::Data::ODBC)

238
base/bridge/IBridge.cpp Normal file
View File

@ -0,0 +1,238 @@
#include "IBridge.h"
#include <IO/ReadHelpers.h>
#include <boost/program_options.hpp>
#include <Poco/Net/NetException.h>
#include <Poco/Util/HelpFormatter.h>
#include <Common/StringUtils/StringUtils.h>
#include <Formats/registerFormats.h>
#include <common/logger_useful.h>
#include <Common/SensitiveDataMasker.h>
#include <Server/HTTP/HTTPServer.h>
#if USE_ODBC
# include <Poco/Data/ODBC/Connector.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
}
namespace
{
Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port, Poco::Logger * log)
{
Poco::Net::SocketAddress socket_address;
try
{
socket_address = Poco::Net::SocketAddress(host, port);
}
catch (const Poco::Net::DNSException & e)
{
const auto code = e.code();
if (code == EAI_FAMILY
#if defined(EAI_ADDRFAMILY)
|| code == EAI_ADDRFAMILY
#endif
)
{
LOG_ERROR(log, "Cannot resolve listen_host ({}), error {}: {}. If it is an IPv6 address and your host has disabled IPv6, then consider to specify IPv4 address to listen in <listen_host> element of configuration file. Example: <listen_host>0.0.0.0</listen_host>", host, e.code(), e.message());
}
throw;
}
return socket_address;
}
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, Poco::Logger * log)
{
auto address = makeSocketAddress(host, port, log);
#if POCO_VERSION < 0x01080000
socket.bind(address, /* reuseAddress = */ true);
#else
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ false);
#endif
socket.listen(/* backlog = */ 64);
return address;
}
}
void IBridge::handleHelp(const std::string &, const std::string &)
{
Poco::Util::HelpFormatter help_formatter(options());
help_formatter.setCommand(commandName());
help_formatter.setHeader("HTTP-proxy for odbc requests");
help_formatter.setUsage("--http-port <port>");
help_formatter.format(std::cerr);
stopOptionsProcessing();
}
void IBridge::defineOptions(Poco::Util::OptionSet & options)
{
options.addOption(
Poco::Util::Option("http-port", "", "port to listen").argument("http-port", true) .binding("http-port"));
options.addOption(
Poco::Util::Option("listen-host", "", "hostname or address to listen, default 127.0.0.1").argument("listen-host").binding("listen-host"));
options.addOption(
Poco::Util::Option("http-timeout", "", "http timeout for socket, default 1800").argument("http-timeout").binding("http-timeout"));
options.addOption(
Poco::Util::Option("max-server-connections", "", "max connections to server, default 1024").argument("max-server-connections").binding("max-server-connections"));
options.addOption(
Poco::Util::Option("keep-alive-timeout", "", "keepalive timeout, default 10").argument("keep-alive-timeout").binding("keep-alive-timeout"));
options.addOption(
Poco::Util::Option("log-level", "", "sets log level, default info") .argument("log-level").binding("logger.level"));
options.addOption(
Poco::Util::Option("log-path", "", "log path for all logs, default console").argument("log-path").binding("logger.log"));
options.addOption(
Poco::Util::Option("err-log-path", "", "err log path for all logs, default no").argument("err-log-path").binding("logger.errorlog"));
options.addOption(
Poco::Util::Option("stdout-path", "", "stdout log path, default console").argument("stdout-path").binding("logger.stdout"));
options.addOption(
Poco::Util::Option("stderr-path", "", "stderr log path, default console").argument("stderr-path").binding("logger.stderr"));
using Me = std::decay_t<decltype(*this)>;
options.addOption(
Poco::Util::Option("help", "", "produce this help message").binding("help").callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
ServerApplication::defineOptions(options); // NOLINT Don't need complex BaseDaemon's .xml config
}
void IBridge::initialize(Application & self)
{
BaseDaemon::closeFDs();
is_help = config().has("help");
if (is_help)
return;
config().setString("logger", bridgeName());
/// Redirect stdout, stderr to specified files.
/// Some libraries and sanitizers write to stderr in case of errors.
const auto stdout_path = config().getString("logger.stdout", "");
if (!stdout_path.empty())
{
if (!freopen(stdout_path.c_str(), "a+", stdout))
throw Poco::OpenFileException("Cannot attach stdout to " + stdout_path);
/// Disable buffering for stdout.
setbuf(stdout, nullptr);
}
const auto stderr_path = config().getString("logger.stderr", "");
if (!stderr_path.empty())
{
if (!freopen(stderr_path.c_str(), "a+", stderr))
throw Poco::OpenFileException("Cannot attach stderr to " + stderr_path);
/// Disable buffering for stderr.
setbuf(stderr, nullptr);
}
buildLoggers(config(), logger(), self.commandName());
BaseDaemon::logRevision();
log = &logger();
hostname = config().getString("listen-host", "127.0.0.1");
port = config().getUInt("http-port");
if (port > 0xFFFF)
throw Exception("Out of range 'http-port': " + std::to_string(port), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
http_timeout = config().getUInt("http-timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT);
max_server_connections = config().getUInt("max-server-connections", 1024);
keep_alive_timeout = config().getUInt("keep-alive-timeout", 10);
initializeTerminationAndSignalProcessing();
#if USE_ODBC
if (bridgeName() == "ODBCBridge")
Poco::Data::ODBC::Connector::registerConnector();
#endif
ServerApplication::initialize(self); // NOLINT
}
void IBridge::uninitialize()
{
BaseDaemon::uninitialize();
}
int IBridge::main(const std::vector<std::string> & /*args*/)
{
if (is_help)
return Application::EXIT_OK;
registerFormats();
LOG_INFO(log, "Starting up {} on host: {}, port: {}", bridgeName(), hostname, port);
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, hostname, port, log);
socket.setReceiveTimeout(http_timeout);
socket.setSendTimeout(http_timeout);
Poco::ThreadPool server_pool(3, max_server_connections);
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(http_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
auto shared_context = Context::createShared();
Context context(Context::createGlobal(shared_context.get()));
context.makeGlobalContext();
if (config().has("query_masking_rules"))
SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(config(), "query_masking_rules"));
auto server = HTTPServer(
context,
getHandlerFactoryPtr(context),
server_pool,
socket,
http_params);
SCOPE_EXIT({
LOG_DEBUG(log, "Received termination signal.");
LOG_DEBUG(log, "Waiting for current connections to close.");
server.stop();
for (size_t count : ext::range(1, 6))
{
if (server.currentConnections() == 0)
break;
LOG_DEBUG(log, "Waiting for {} connections, try {}", server.currentConnections(), count);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
});
server.start();
LOG_INFO(log, "Listening http://{}", address.toString());
waitForTerminationRequest();
return Application::EXIT_OK;
}
}

50
base/bridge/IBridge.h Normal file
View File

@ -0,0 +1,50 @@
#pragma once
#include <Interpreters/Context.h>
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <Poco/Util/ServerApplication.h>
#include <Poco/Logger.h>
#include <daemon/BaseDaemon.h>
namespace DB
{
/// Class represents base for clickhouse-odbc-bridge and clickhouse-library-bridge servers.
/// Listens to incoming HTTP POST and GET requests on specified port and host.
/// Has two handlers '/' for all incoming POST requests and /ping for GET request about service status.
class IBridge : public BaseDaemon
{
public:
/// Define command line arguments
void defineOptions(Poco::Util::OptionSet & options) override;
protected:
using HandlerFactoryPtr = std::shared_ptr<HTTPRequestHandlerFactory>;
void initialize(Application & self) override;
void uninitialize() override;
int main(const std::vector<std::string> & args) override;
virtual const std::string bridgeName() const = 0;
virtual HandlerFactoryPtr getHandlerFactoryPtr(Context & context) const = 0;
size_t keep_alive_timeout;
private:
void handleHelp(const std::string &, const std::string &);
bool is_help;
std::string hostname;
size_t port;
std::string log_level;
size_t max_server_connections;
size_t http_timeout;
Poco::Logger * log;
};
}

View File

@ -1,5 +1,6 @@
usr/bin/clickhouse
usr/bin/clickhouse-odbc-bridge
usr/bin/clickhouse-library-bridge
usr/bin/clickhouse-extract-from-config
usr/share/bash-completion/completions
etc/security/limits.d/clickhouse.conf

View File

@ -19,7 +19,8 @@ RUN apt-get update \
tar \
krb5-user \
iproute2 \
lsof
lsof \
g++
RUN rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \

View File

@ -31,6 +31,7 @@ RUN apt-get update \
software-properties-common \
libkrb5-dev \
krb5-user \
g++ \
&& rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \

View File

@ -21,6 +21,7 @@ export CLICKHOUSE_TESTS_SERVER_BIN_PATH=/clickhouse
export CLICKHOUSE_TESTS_CLIENT_BIN_PATH=/clickhouse
export CLICKHOUSE_TESTS_BASE_CONFIG_DIR=/clickhouse-config
export CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH=/clickhouse-odbc-bridge
export CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH=/clickhouse-library-bridge
export DOCKER_MYSQL_GOLANG_CLIENT_TAG=${DOCKER_MYSQL_GOLANG_CLIENT_TAG:=latest}
export DOCKER_MYSQL_JAVA_CLIENT_TAG=${DOCKER_MYSQL_JAVA_CLIENT_TAG:=latest}

View File

@ -36,6 +36,9 @@ option (ENABLE_CLICKHOUSE_OBFUSCATOR "Table data obfuscator (convert real data t
option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "HTTP-server working like a proxy to ODBC driver"
${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE "HTTP-server working like a proxy to Library dictionary source"
${ENABLE_CLICKHOUSE_ALL})
# https://presentations.clickhouse.tech/matemarketing_2020/
option (ENABLE_CLICKHOUSE_GIT_IMPORT "A tool to analyze Git repositories"
${ENABLE_CLICKHOUSE_ALL})
@ -109,6 +112,12 @@ else()
message(STATUS "ODBC bridge mode: OFF")
endif()
if (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE)
message(STATUS "Library bridge mode: ON")
else()
message(STATUS "Library bridge mode: OFF")
endif()
if (ENABLE_CLICKHOUSE_INSTALL)
message(STATUS "ClickHouse install: ON")
else()
@ -194,6 +203,10 @@ if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
add_subdirectory (odbc-bridge)
endif ()
if (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE)
add_subdirectory (library-bridge)
endif ()
if (CLICKHOUSE_ONE_SHARED)
add_library(clickhouse-lib SHARED ${CLICKHOUSE_SERVER_SOURCES} ${CLICKHOUSE_CLIENT_SOURCES} ${CLICKHOUSE_LOCAL_SOURCES} ${CLICKHOUSE_BENCHMARK_SOURCES} ${CLICKHOUSE_COPIER_SOURCES} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_SOURCES} ${CLICKHOUSE_COMPRESSOR_SOURCES} ${CLICKHOUSE_FORMAT_SOURCES} ${CLICKHOUSE_OBFUSCATOR_SOURCES} ${CLICKHOUSE_GIT_IMPORT_SOURCES} ${CLICKHOUSE_ODBC_BRIDGE_SOURCES})
target_link_libraries(clickhouse-lib ${CLICKHOUSE_SERVER_LINK} ${CLICKHOUSE_CLIENT_LINK} ${CLICKHOUSE_LOCAL_LINK} ${CLICKHOUSE_BENCHMARK_LINK} ${CLICKHOUSE_COPIER_LINK} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_LINK} ${CLICKHOUSE_COMPRESSOR_LINK} ${CLICKHOUSE_FORMAT_LINK} ${CLICKHOUSE_OBFUSCATOR_LINK} ${CLICKHOUSE_GIT_IMPORT_LINK} ${CLICKHOUSE_ODBC_BRIDGE_LINK})
@ -209,6 +222,10 @@ if (CLICKHOUSE_SPLIT_BINARY)
list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-odbc-bridge)
endif ()
if (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE)
list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-library-bridge)
endif ()
set_target_properties(${CLICKHOUSE_ALL_TARGETS} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..)
add_custom_target (clickhouse-bundle ALL DEPENDS ${CLICKHOUSE_ALL_TARGETS})

View File

@ -15,3 +15,4 @@
#cmakedefine01 ENABLE_CLICKHOUSE_GIT_IMPORT
#cmakedefine01 ENABLE_CLICKHOUSE_INSTALL
#cmakedefine01 ENABLE_CLICKHOUSE_ODBC_BRIDGE
#cmakedefine01 ENABLE_CLICKHOUSE_LIBRARY_BRIDGE

View File

@ -0,0 +1,26 @@
set (CLICKHOUSE_LIBRARY_BRIDGE_SOURCES
library-bridge.cpp
library-log.cpp
LibraryBridge.cpp
Handlers.cpp
HandlerFactory.cpp
SharedLibraryHandler.cpp
SharedLibraryHandlerFactory.cpp
)
if (OS_LINUX)
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--no-export-dynamic")
endif ()
add_executable(clickhouse-library-bridge ${CLICKHOUSE_LIBRARY_BRIDGE_SOURCES})
target_link_libraries(clickhouse-library-bridge PRIVATE
daemon
dbms
clickhouse_parsers
bridge
)
set_target_properties(clickhouse-library-bridge PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..)
install(TARGETS clickhouse-library-bridge RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)

View File

@ -0,0 +1,23 @@
#include "HandlerFactory.h"
#include <Poco/Net/HTTPServerRequest.h>
#include <Server/HTTP/HTMLForm.h>
#include "Handlers.h"
namespace DB
{
std::unique_ptr<HTTPRequestHandler> LibraryBridgeHandlerFactory::createRequestHandler(const HTTPServerRequest & request)
{
Poco::URI uri{request.getURI()};
LOG_DEBUG(log, "Request URI: {}", uri.toString());
if (uri == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
return std::make_unique<PingHandler>(keep_alive_timeout);
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, context);
return nullptr;
}
}

View File

@ -0,0 +1,38 @@
#pragma once
#include <Interpreters/Context.h>
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <common/logger_useful.h>
namespace DB
{
class SharedLibraryHandler;
using SharedLibraryHandlerPtr = std::shared_ptr<SharedLibraryHandler>;
/// Factory for '/ping', '/' handlers.
class LibraryBridgeHandlerFactory : public HTTPRequestHandlerFactory
{
public:
LibraryBridgeHandlerFactory(
const std::string & name_,
size_t keep_alive_timeout_,
Context & context_)
: log(&Poco::Logger::get(name_))
, name(name_)
, keep_alive_timeout(keep_alive_timeout_)
, context(context_)
{
}
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override;
private:
Poco::Logger * log;
std::string name;
size_t keep_alive_timeout;
Context & context;
};
}

View File

@ -0,0 +1,288 @@
#include "Handlers.h"
#include "SharedLibraryHandlerFactory.h"
#include <DataStreams/copyData.h>
#include <Formats/FormatFactory.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h>
#include <Poco/ThreadPool.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Server/HTTP/HTMLForm.h>
#include <IO/ReadBufferFromString.h>
namespace DB
{
namespace
{
std::shared_ptr<Block> parseColumns(std::string && column_string)
{
auto sample_block = std::make_shared<Block>();
auto names_and_types = NamesAndTypesList::parse(column_string);
for (const NameAndTypePair & column_data : names_and_types)
sample_block->insert({column_data.type, column_data.name});
return sample_block;
}
std::vector<uint64_t> parseIdsFromBinary(const std::string & ids_string)
{
ReadBufferFromString buf(ids_string);
std::vector<uint64_t> ids;
readVectorBinary(ids, buf);
return ids;
}
std::vector<std::string> parseNamesFromBinary(const std::string & names_string)
{
ReadBufferFromString buf(names_string);
std::vector<std::string> names;
readVectorBinary(names, buf);
return names;
}
}
void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(request);
if (!params.has("method"))
{
processError(response, "No 'method' in request URL");
return;
}
if (!params.has("dictionary_id"))
{
processError(response, "No 'dictionary_id in request URL");
return;
}
std::string method = params.get("method");
std::string dictionary_id = params.get("dictionary_id");
LOG_TRACE(log, "Library method: '{}', dictionary id: {}", method, dictionary_id);
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
try
{
if (method == "libNew")
{
auto & read_buf = request.getStream();
params.read(read_buf);
if (!params.has("library_path"))
{
processError(response, "No 'library_path' in request URL");
return;
}
if (!params.has("library_settings"))
{
processError(response, "No 'library_settings' in request URL");
return;
}
std::string library_path = params.get("library_path");
const auto & settings_string = params.get("library_settings");
std::vector<std::string> library_settings = parseNamesFromBinary(settings_string);
/// Needed for library dictionary
if (!params.has("attributes_names"))
{
processError(response, "No 'attributes_names' in request URL");
return;
}
const auto & attributes_string = params.get("attributes_names");
std::vector<std::string> attributes_names = parseNamesFromBinary(attributes_string);
/// Needed to parse block from binary string format
if (!params.has("sample_block"))
{
processError(response, "No 'sample_block' in request URL");
return;
}
std::string sample_block_string = params.get("sample_block");
std::shared_ptr<Block> sample_block;
try
{
sample_block = parseColumns(std::move(sample_block_string));
}
catch (const Exception & ex)
{
processError(response, "Invalid 'sample_block' parameter in request body '" + ex.message() + "'");
LOG_WARNING(log, ex.getStackTraceString());
return;
}
if (!params.has("null_values"))
{
processError(response, "No 'null_values' in request URL");
return;
}
ReadBufferFromString read_block_buf(params.get("null_values"));
auto format = FormatFactory::instance().getInput(FORMAT, read_block_buf, *sample_block, context, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format);
auto sample_block_with_nulls = reader->read();
LOG_DEBUG(log, "Dictionary sample block with null values: {}", sample_block_with_nulls.dumpStructure());
SharedLibraryHandlerFactory::instance().create(dictionary_id, library_path, library_settings, sample_block_with_nulls, attributes_names);
writeStringBinary("1", out);
}
else if (method == "libClone")
{
if (!params.has("from_dictionary_id"))
{
processError(response, "No 'from_dictionary_id' in request URL");
return;
}
std::string from_dictionary_id = params.get("from_dictionary_id");
LOG_TRACE(log, "Calling libClone from {} to {}", from_dictionary_id, dictionary_id);
SharedLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);
writeStringBinary("1", out);
}
else if (method == "libDelete")
{
SharedLibraryHandlerFactory::instance().remove(dictionary_id);
writeStringBinary("1", out);
}
else if (method == "isModified")
{
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
bool res = library_handler->isModified();
writeStringBinary(std::to_string(res), out);
}
else if (method == "supportsSelectiveLoad")
{
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
bool res = library_handler->supportsSelectiveLoad();
writeStringBinary(std::to_string(res), out);
}
else if (method == "loadAll")
{
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
auto input = library_handler->loadAll();
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, context);
copyData(*input, *output);
}
else if (method == "loadIds")
{
params.read(request.getStream());
if (!params.has("ids"))
{
processError(response, "No 'ids' in request URL");
return;
}
std::vector<uint64_t> ids = parseIdsFromBinary(params.get("ids"));
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
auto input = library_handler->loadIds(ids);
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, context);
copyData(*input, *output);
}
else if (method == "loadKeys")
{
if (!params.has("requested_block_sample"))
{
processError(response, "No 'requested_block_sample' in request URL");
return;
}
std::string requested_block_string = params.get("requested_block_sample");
std::shared_ptr<Block> requested_sample_block;
try
{
requested_sample_block = parseColumns(std::move(requested_block_string));
}
catch (const Exception & ex)
{
processError(response, "Invalid 'requested_block' parameter in request body '" + ex.message() + "'");
LOG_WARNING(log, ex.getStackTraceString());
return;
}
auto & read_buf = request.getStream();
auto format = FormatFactory::instance().getInput(FORMAT, read_buf, *requested_sample_block, context, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format);
auto block = reader->read();
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
auto input = library_handler->loadKeys(block.getColumns());
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, context);
copyData(*input, *output);
}
}
catch (...)
{
auto message = getCurrentExceptionMessage(true);
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR, message); // can't call process_error, because of too soon response sending
try
{
writeStringBinary(message, out);
out.finalize();
}
catch (...)
{
tryLogCurrentException(log);
}
tryLogCurrentException(log);
}
try
{
out.finalize();
}
catch (...)
{
tryLogCurrentException(log);
}
}
void LibraryRequestHandler::processError(HTTPServerResponse & response, const std::string & message)
{
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
LOG_WARNING(log, message);
}
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response)
{
try
{
setResponseDefaultHeaders(response, keep_alive_timeout);
const char * data = "Ok.\n";
response.sendBuffer(data, strlen(data));
}
catch (...)
{
tryLogCurrentException("PingHandler");
}
}
}

View File

@ -0,0 +1,60 @@
#pragma once
#include <Interpreters/Context.h>
#include <Server/HTTP/HTTPRequestHandler.h>
#include <common/logger_useful.h>
#include "SharedLibraryHandler.h"
namespace DB
{
/// Handler for requests to Library Dictionary Source, returns response in RowBinary format.
/// When a library dictionary source is created, it sends libNew request to library bridge (which is started on first
/// request to it, if it was not yet started). On this request a new sharedLibrayHandler is added to a
/// sharedLibraryHandlerFactory by a dictionary uuid. With libNew request come: library_path, library_settings,
/// names of dictionary attributes, sample block to parse block of null values, block of null values. Everything is
/// passed in binary format and is urlencoded. When dictionary is cloned, a new handler is created.
/// Each handler is unique to dictionary.
class LibraryRequestHandler : public HTTPRequestHandler
{
public:
LibraryRequestHandler(
size_t keep_alive_timeout_,
Context & context_)
: log(&Poco::Logger::get("LibraryRequestHandler"))
, keep_alive_timeout(keep_alive_timeout_)
, context(context_)
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
private:
static constexpr inline auto FORMAT = "RowBinary";
void processError(HTTPServerResponse & response, const std::string & message);
Poco::Logger * log;
size_t keep_alive_timeout;
Context & context;
};
class PingHandler : public HTTPRequestHandler
{
public:
explicit PingHandler(size_t keep_alive_timeout_)
: keep_alive_timeout(keep_alive_timeout_)
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
private:
const size_t keep_alive_timeout;
};
}

View File

@ -0,0 +1,17 @@
#include "LibraryBridge.h"
#pragma GCC diagnostic ignored "-Wmissing-declarations"
int mainEntryClickHouseLibraryBridge(int argc, char ** argv)
{
DB::LibraryBridge app;
try
{
return app.run(argc, argv);
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
}
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <Interpreters/Context.h>
#include <bridge/IBridge.h>
#include "HandlerFactory.h"
namespace DB
{
class LibraryBridge : public IBridge
{
protected:
const std::string bridgeName() const override
{
return "LibraryBridge";
}
HandlerFactoryPtr getHandlerFactoryPtr(Context & context) const override
{
return std::make_shared<LibraryBridgeHandlerFactory>("LibraryRequestHandlerFactory-factory", keep_alive_timeout, context);
}
};
}

View File

@ -0,0 +1,43 @@
#pragma once
#include <Common/StringUtils/StringUtils.h>
#include <Dictionaries/LibraryDictionarySourceExternal.h>
#include <Core/Block.h>
#include <ext/bit_cast.h>
#include <ext/range.h>
namespace DB
{
class CStringsHolder
{
public:
using Container = std::vector<std::string>;
explicit CStringsHolder(const Container & strings_pass)
{
strings_holder = strings_pass;
strings.size = strings_holder.size();
ptr_holder = std::make_unique<ClickHouseLibrary::CString[]>(strings.size);
strings.data = ptr_holder.get();
size_t i = 0;
for (auto & str : strings_holder)
{
strings.data[i] = str.c_str();
++i;
}
}
ClickHouseLibrary::CStrings strings; // will pass pointer to lib
private:
std::unique_ptr<ClickHouseLibrary::CString[]> ptr_holder = nullptr;
Container strings_holder;
};
}

View File

@ -0,0 +1,219 @@
#include "SharedLibraryHandler.h"
#include <ext/scope_guard.h>
#include <IO/ReadHelpers.h>
#include <common/find_symbols.h>
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_LIBRARY_ERROR;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
}
SharedLibraryHandler::SharedLibraryHandler(
const std::string & library_path_,
const std::vector<std::string> & library_settings,
const Block & sample_block_,
const std::vector<std::string> & attributes_names_)
: library_path(library_path_)
, sample_block(sample_block_)
, attributes_names(attributes_names_)
{
library = std::make_shared<SharedLibrary>(library_path, RTLD_LAZY);
settings_holder = std::make_shared<CStringsHolder>(CStringsHolder(library_settings));
auto lib_new = library->tryGet<ClickHouseLibrary::LibraryNewFunc>(ClickHouseLibrary::LIBRARY_CREATE_NEW_FUNC_NAME);
if (lib_new)
lib_data = lib_new(&settings_holder->strings, ClickHouseLibrary::log);
else
throw Exception("Method libNew failed", ErrorCodes::EXTERNAL_LIBRARY_ERROR);
}
SharedLibraryHandler::SharedLibraryHandler(const SharedLibraryHandler & other)
: library_path{other.library_path}
, sample_block{other.sample_block}
, attributes_names{other.attributes_names}
, library{other.library}
, settings_holder{other.settings_holder}
{
auto lib_clone = library->tryGet<ClickHouseLibrary::LibraryCloneFunc>(ClickHouseLibrary::LIBRARY_CLONE_FUNC_NAME);
if (lib_clone)
{
lib_data = lib_clone(other.lib_data);
}
else
{
auto lib_new = library->tryGet<ClickHouseLibrary::LibraryNewFunc>(ClickHouseLibrary::LIBRARY_CREATE_NEW_FUNC_NAME);
if (lib_new)
lib_data = lib_new(&settings_holder->strings, ClickHouseLibrary::log);
}
}
SharedLibraryHandler::~SharedLibraryHandler()
{
auto lib_delete = library->tryGet<ClickHouseLibrary::LibraryDeleteFunc>(ClickHouseLibrary::LIBRARY_DELETE_FUNC_NAME);
if (lib_delete)
lib_delete(lib_data);
}
bool SharedLibraryHandler::isModified()
{
auto func_is_modified = library->tryGet<ClickHouseLibrary::LibraryIsModifiedFunc>(ClickHouseLibrary::LIBRARY_IS_MODIFIED_FUNC_NAME);
if (func_is_modified)
return func_is_modified(lib_data, &settings_holder->strings);
return true;
}
bool SharedLibraryHandler::supportsSelectiveLoad()
{
auto func_supports_selective_load = library->tryGet<ClickHouseLibrary::LibrarySupportsSelectiveLoadFunc>(ClickHouseLibrary::LIBRARY_SUPPORTS_SELECTIVE_LOAD_FUNC_NAME);
if (func_supports_selective_load)
return func_supports_selective_load(lib_data, &settings_holder->strings);
return true;
}
BlockInputStreamPtr SharedLibraryHandler::loadAll()
{
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(attributes_names.size());
ClickHouseLibrary::CStrings columns{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), attributes_names.size()};
for (size_t i = 0; i < attributes_names.size(); ++i)
columns.data[i] = attributes_names[i].c_str();
auto load_all_func = library->get<ClickHouseLibrary::LibraryLoadAllFunc>(ClickHouseLibrary::LIBRARY_LOAD_ALL_FUNC_NAME);
auto data_new_func = library->get<ClickHouseLibrary::LibraryDataNewFunc>(ClickHouseLibrary::LIBRARY_DATA_NEW_FUNC_NAME);
auto data_delete_func = library->get<ClickHouseLibrary::LibraryDataDeleteFunc>(ClickHouseLibrary::LIBRARY_DATA_DELETE_FUNC_NAME);
ClickHouseLibrary::LibraryData data_ptr = data_new_func(lib_data);
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ClickHouseLibrary::RawClickHouseLibraryTable data = load_all_func(data_ptr, &settings_holder->strings, &columns);
auto block = dataToBlock(data);
return std::make_shared<OneBlockInputStream>(block);
}
BlockInputStreamPtr SharedLibraryHandler::loadIds(const std::vector<uint64_t> & ids)
{
const ClickHouseLibrary::VectorUInt64 ids_data{ext::bit_cast<decltype(ClickHouseLibrary::VectorUInt64::data)>(ids.data()), ids.size()};
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(attributes_names.size());
ClickHouseLibrary::CStrings columns_pass{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), attributes_names.size()};
auto load_ids_func = library->get<ClickHouseLibrary::LibraryLoadIdsFunc>(ClickHouseLibrary::LIBRARY_LOAD_IDS_FUNC_NAME);
auto data_new_func = library->get<ClickHouseLibrary::LibraryDataNewFunc>(ClickHouseLibrary::LIBRARY_DATA_NEW_FUNC_NAME);
auto data_delete_func = library->get<ClickHouseLibrary::LibraryDataDeleteFunc>(ClickHouseLibrary::LIBRARY_DATA_DELETE_FUNC_NAME);
ClickHouseLibrary::LibraryData data_ptr = data_new_func(lib_data);
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ClickHouseLibrary::RawClickHouseLibraryTable data = load_ids_func(data_ptr, &settings_holder->strings, &columns_pass, &ids_data);
auto block = dataToBlock(data);
return std::make_shared<OneBlockInputStream>(block);
}
BlockInputStreamPtr SharedLibraryHandler::loadKeys(const Columns & key_columns)
{
auto holder = std::make_unique<ClickHouseLibrary::Row[]>(key_columns.size());
std::vector<std::unique_ptr<ClickHouseLibrary::Field[]>> column_data_holders;
for (size_t i = 0; i < key_columns.size(); ++i)
{
auto cell_holder = std::make_unique<ClickHouseLibrary::Field[]>(key_columns[i]->size());
for (size_t j = 0; j < key_columns[i]->size(); ++j)
{
auto data_ref = key_columns[i]->getDataAt(j);
cell_holder[j] = ClickHouseLibrary::Field{
.data = static_cast<const void *>(data_ref.data),
.size = data_ref.size};
}
holder[i] = ClickHouseLibrary::Row{
.data = static_cast<ClickHouseLibrary::Field *>(cell_holder.get()),
.size = key_columns[i]->size()};
column_data_holders.push_back(std::move(cell_holder));
}
ClickHouseLibrary::Table request_cols{
.data = static_cast<ClickHouseLibrary::Row *>(holder.get()),
.size = key_columns.size()};
auto load_keys_func = library->get<ClickHouseLibrary::LibraryLoadKeysFunc>(ClickHouseLibrary::LIBRARY_LOAD_KEYS_FUNC_NAME);
auto data_new_func = library->get<ClickHouseLibrary::LibraryDataNewFunc>(ClickHouseLibrary::LIBRARY_DATA_NEW_FUNC_NAME);
auto data_delete_func = library->get<ClickHouseLibrary::LibraryDataDeleteFunc>(ClickHouseLibrary::LIBRARY_DATA_DELETE_FUNC_NAME);
ClickHouseLibrary::LibraryData data_ptr = data_new_func(lib_data);
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ClickHouseLibrary::RawClickHouseLibraryTable data = load_keys_func(data_ptr, &settings_holder->strings, &request_cols);
auto block = dataToBlock(data);
return std::make_shared<OneBlockInputStream>(block);
}
Block SharedLibraryHandler::dataToBlock(const ClickHouseLibrary::RawClickHouseLibraryTable data)
{
if (!data)
throw Exception("LibraryDictionarySource: No data returned", ErrorCodes::EXTERNAL_LIBRARY_ERROR);
const auto * columns_received = static_cast<const ClickHouseLibrary::Table *>(data);
if (columns_received->error_code)
throw Exception(
"LibraryDictionarySource: Returned error: " + std::to_string(columns_received->error_code) + " " + (columns_received->error_string ? columns_received->error_string : ""),
ErrorCodes::EXTERNAL_LIBRARY_ERROR);
MutableColumns columns = sample_block.cloneEmptyColumns();
for (size_t col_n = 0; col_n < columns_received->size; ++col_n)
{
if (columns.size() != columns_received->data[col_n].size)
throw Exception(
"LibraryDictionarySource: Returned unexpected number of columns: " + std::to_string(columns_received->data[col_n].size) + ", must be " + std::to_string(columns.size()),
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
for (size_t row_n = 0; row_n < columns_received->data[col_n].size; ++row_n)
{
const auto & field = columns_received->data[col_n].data[row_n];
if (!field.data)
{
/// sample_block contains null_value (from config) inside corresponding column
const auto & col = sample_block.getByPosition(row_n);
columns[row_n]->insertFrom(*(col.column), 0);
}
else
{
const auto & size = field.size;
columns[row_n]->insertData(static_cast<const char *>(field.data), size);
}
}
}
return sample_block.cloneWithColumns(std::move(columns));
}
}

View File

@ -0,0 +1,54 @@
#pragma once
#include <Common/SharedLibrary.h>
#include <common/logger_useful.h>
#include <DataStreams/OneBlockInputStream.h>
#include "LibraryUtils.h"
namespace DB
{
/// A class that manages all operations with library dictionary.
/// Every library dictionary source has its own object of this class, accessed by UUID.
class SharedLibraryHandler
{
public:
SharedLibraryHandler(
const std::string & library_path_,
const std::vector<std::string> & library_settings,
const Block & sample_block_,
const std::vector<std::string> & attributes_names_);
SharedLibraryHandler(const SharedLibraryHandler & other);
~SharedLibraryHandler();
BlockInputStreamPtr loadAll();
BlockInputStreamPtr loadIds(const std::vector<uint64_t> & ids);
BlockInputStreamPtr loadKeys(const Columns & key_columns);
bool isModified();
bool supportsSelectiveLoad();
const Block & getSampleBlock() { return sample_block; }
private:
Block dataToBlock(const ClickHouseLibrary::RawClickHouseLibraryTable data);
std::string library_path;
const Block sample_block;
std::vector<std::string> attributes_names;
SharedLibraryPtr library;
std::shared_ptr<CStringsHolder> settings_holder;
void * lib_data;
};
using SharedLibraryHandlerPtr = std::shared_ptr<SharedLibraryHandler>;
}

View File

@ -0,0 +1,67 @@
#include "SharedLibraryHandlerFactory.h"
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
SharedLibraryHandlerPtr SharedLibraryHandlerFactory::get(const std::string & dictionary_id)
{
std::lock_guard lock(mutex);
auto library_handler = library_handlers.find(dictionary_id);
if (library_handler != library_handlers.end())
return library_handler->second;
return nullptr;
}
void SharedLibraryHandlerFactory::create(
const std::string & dictionary_id,
const std::string & library_path,
const std::vector<std::string> & library_settings,
const Block & sample_block,
const std::vector<std::string> & attributes_names)
{
std::lock_guard lock(mutex);
library_handlers[dictionary_id] = std::make_shared<SharedLibraryHandler>(library_path, library_settings, sample_block, attributes_names);
}
void SharedLibraryHandlerFactory::clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id)
{
std::lock_guard lock(mutex);
auto from_library_handler = library_handlers.find(from_dictionary_id);
/// This is not supposed to happen as libClone is called from copy constructor of LibraryDictionarySource
/// object, and shared library handler of from_dictionary is removed only in its destructor.
/// And if for from_dictionary there was no shared library handler, it would have received and exception in
/// its constructor, so no libClone would be made from it.
if (from_library_handler == library_handlers.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No shared library handler found");
/// libClone method will be called in copy constructor
library_handlers[to_dictionary_id] = std::make_shared<SharedLibraryHandler>(*from_library_handler->second);
}
void SharedLibraryHandlerFactory::remove(const std::string & dictionary_id)
{
std::lock_guard lock(mutex);
/// libDelete is called in destructor.
library_handlers.erase(dictionary_id);
}
SharedLibraryHandlerFactory & SharedLibraryHandlerFactory::instance()
{
static SharedLibraryHandlerFactory ret;
return ret;
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include "SharedLibraryHandler.h"
#include <unordered_map>
#include <mutex>
namespace DB
{
/// Each library dictionary source has unique UUID. When clone() method is called, a new UUID is generated.
/// There is a unique mapping from diciotnary UUID to sharedLibraryHandler.
class SharedLibraryHandlerFactory final : private boost::noncopyable
{
public:
static SharedLibraryHandlerFactory & instance();
SharedLibraryHandlerPtr get(const std::string & dictionary_id);
void create(
const std::string & dictionary_id,
const std::string & library_path,
const std::vector<std::string> & library_settings,
const Block & sample_block,
const std::vector<std::string> & attributes_names);
void clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id);
void remove(const std::string & dictionary_id);
private:
/// map: dict_id -> sharedLibraryHandler
std::unordered_map<std::string, SharedLibraryHandlerPtr> library_handlers;
std::mutex mutex;
};
}

View File

@ -0,0 +1,3 @@
int mainEntryClickHouseLibraryBridge(int argc, char ** argv);
int main(int argc_, char ** argv_) { return mainEntryClickHouseLibraryBridge(argc_, argv_); }

View File

@ -0,0 +1,66 @@
#include <Dictionaries/LibraryDictionarySourceExternal.h>
#include <common/logger_useful.h>
namespace
{
const char DICT_LOGGER_NAME[] = "LibraryDictionarySourceExternal";
}
namespace ClickHouseLibrary
{
std::string_view LIBRARY_CREATE_NEW_FUNC_NAME = "ClickHouseDictionary_v3_libNew";
std::string_view LIBRARY_CLONE_FUNC_NAME = "ClickHouseDictionary_v3_libClone";
std::string_view LIBRARY_DELETE_FUNC_NAME = "ClickHouseDictionary_v3_libDelete";
std::string_view LIBRARY_DATA_NEW_FUNC_NAME = "ClickHouseDictionary_v3_dataNew";
std::string_view LIBRARY_DATA_DELETE_FUNC_NAME = "ClickHouseDictionary_v3_dataDelete";
std::string_view LIBRARY_LOAD_ALL_FUNC_NAME = "ClickHouseDictionary_v3_loadAll";
std::string_view LIBRARY_LOAD_IDS_FUNC_NAME = "ClickHouseDictionary_v3_loadIds";
std::string_view LIBRARY_LOAD_KEYS_FUNC_NAME = "ClickHouseDictionary_v3_loadKeys";
std::string_view LIBRARY_IS_MODIFIED_FUNC_NAME = "ClickHouseDictionary_v3_isModified";
std::string_view LIBRARY_SUPPORTS_SELECTIVE_LOAD_FUNC_NAME = "ClickHouseDictionary_v3_supportsSelectiveLoad";
void log(LogLevel level, CString msg)
{
auto & logger = Poco::Logger::get(DICT_LOGGER_NAME);
switch (level)
{
case LogLevel::TRACE:
if (logger.trace())
logger.trace(msg);
break;
case LogLevel::DEBUG:
if (logger.debug())
logger.debug(msg);
break;
case LogLevel::INFORMATION:
if (logger.information())
logger.information(msg);
break;
case LogLevel::NOTICE:
if (logger.notice())
logger.notice(msg);
break;
case LogLevel::WARNING:
if (logger.warning())
logger.warning(msg);
break;
case LogLevel::ERROR:
if (logger.error())
logger.error(msg);
break;
case LogLevel::CRITICAL:
if (logger.critical())
logger.critical(msg);
break;
case LogLevel::FATAL:
if (logger.fatal())
logger.fatal(msg);
break;
}
}
}

View File

@ -24,6 +24,7 @@ add_executable(clickhouse-odbc-bridge ${CLICKHOUSE_ODBC_BRIDGE_SOURCES})
target_link_libraries(clickhouse-odbc-bridge PRIVATE
daemon
dbms
bridge
clickhouse_parsers
Poco::Data
Poco::Data::ODBC

View File

@ -8,7 +8,7 @@
namespace DB
{
std::unique_ptr<HTTPRequestHandler> HandlerFactory::createRequestHandler(const HTTPServerRequest & request)
std::unique_ptr<HTTPRequestHandler> ODBCBridgeHandlerFactory::createRequestHandler(const HTTPServerRequest & request)
{
Poco::URI uri{request.getURI()};
LOG_TRACE(log, "Request URI: {}", uri.toString());

View File

@ -20,10 +20,10 @@ namespace DB
/** Factory for '/ping', '/', '/columns_info', '/identifier_quote', '/schema_allowed' handlers.
* Also stores Session pools for ODBC connections
*/
class HandlerFactory : public HTTPRequestHandlerFactory
class ODBCBridgeHandlerFactory : public HTTPRequestHandlerFactory
{
public:
HandlerFactory(const std::string & name_, size_t keep_alive_timeout_, Context & context_)
ODBCBridgeHandlerFactory(const std::string & name_, size_t keep_alive_timeout_, Context & context_)
: log(&Poco::Logger::get(name_)), name(name_), keep_alive_timeout(keep_alive_timeout_), context(context_)
{
pool_map = std::make_shared<ODBCHandler::PoolMap>();
@ -38,4 +38,5 @@ private:
Context & context;
std::shared_ptr<ODBCHandler::PoolMap> pool_map;
};
}

View File

@ -1,244 +1,4 @@
#include "ODBCBridge.h"
#include "HandlerFactory.h"
#include <string>
#include <errno.h>
#include <IO/ReadHelpers.h>
#include <boost/program_options.hpp>
#if USE_ODBC
// It doesn't make much sense to build this bridge without ODBC, but we still do this.
# include <Poco/Data/ODBC/Connector.h>
#endif
#include <Poco/Net/NetException.h>
#include <Poco/String.h>
#include <Poco/Util/HelpFormatter.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/config.h>
#include <Formats/registerFormats.h>
#include <common/logger_useful.h>
#include <ext/scope_guard.h>
#include <ext/range.h>
#include <Common/SensitiveDataMasker.h>
#include <Server/HTTP/HTTPServer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
}
namespace
{
Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port, Poco::Logger * log)
{
Poco::Net::SocketAddress socket_address;
try
{
socket_address = Poco::Net::SocketAddress(host, port);
}
catch (const Poco::Net::DNSException & e)
{
const auto code = e.code();
if (code == EAI_FAMILY
#if defined(EAI_ADDRFAMILY)
|| code == EAI_ADDRFAMILY
#endif
)
{
LOG_ERROR(log, "Cannot resolve listen_host ({}), error {}: {}. If it is an IPv6 address and your host has disabled IPv6, then consider to specify IPv4 address to listen in <listen_host> element of configuration file. Example: <listen_host>0.0.0.0</listen_host>", host, e.code(), e.message());
}
throw;
}
return socket_address;
}
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, Poco::Logger * log)
{
auto address = makeSocketAddress(host, port, log);
#if POCO_VERSION < 0x01080000
socket.bind(address, /* reuseAddress = */ true);
#else
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ false);
#endif
socket.listen(/* backlog = */ 64);
return address;
}
}
void ODBCBridge::handleHelp(const std::string &, const std::string &)
{
Poco::Util::HelpFormatter help_formatter(options());
help_formatter.setCommand(commandName());
help_formatter.setHeader("HTTP-proxy for odbc requests");
help_formatter.setUsage("--http-port <port>");
help_formatter.format(std::cerr);
stopOptionsProcessing();
}
void ODBCBridge::defineOptions(Poco::Util::OptionSet & options)
{
options.addOption(Poco::Util::Option("http-port", "", "port to listen").argument("http-port", true).binding("http-port"));
options.addOption(
Poco::Util::Option("listen-host", "", "hostname or address to listen, default 127.0.0.1").argument("listen-host").binding("listen-host"));
options.addOption(
Poco::Util::Option("http-timeout", "", "http timeout for socket, default 1800").argument("http-timeout").binding("http-timeout"));
options.addOption(Poco::Util::Option("max-server-connections", "", "max connections to server, default 1024")
.argument("max-server-connections")
.binding("max-server-connections"));
options.addOption(Poco::Util::Option("keep-alive-timeout", "", "keepalive timeout, default 10")
.argument("keep-alive-timeout")
.binding("keep-alive-timeout"));
options.addOption(Poco::Util::Option("log-level", "", "sets log level, default info").argument("log-level").binding("logger.level"));
options.addOption(
Poco::Util::Option("log-path", "", "log path for all logs, default console").argument("log-path").binding("logger.log"));
options.addOption(Poco::Util::Option("err-log-path", "", "err log path for all logs, default no")
.argument("err-log-path")
.binding("logger.errorlog"));
options.addOption(Poco::Util::Option("stdout-path", "", "stdout log path, default console")
.argument("stdout-path")
.binding("logger.stdout"));
options.addOption(Poco::Util::Option("stderr-path", "", "stderr log path, default console")
.argument("stderr-path")
.binding("logger.stderr"));
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("help", "", "produce this help message")
.binding("help")
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
ServerApplication::defineOptions(options); // NOLINT Don't need complex BaseDaemon's .xml config
}
void ODBCBridge::initialize(Application & self)
{
BaseDaemon::closeFDs();
is_help = config().has("help");
if (is_help)
return;
config().setString("logger", "ODBCBridge");
/// Redirect stdout, stderr to specified files.
/// Some libraries and sanitizers write to stderr in case of errors.
const auto stdout_path = config().getString("logger.stdout", "");
if (!stdout_path.empty())
{
if (!freopen(stdout_path.c_str(), "a+", stdout))
throw Poco::OpenFileException("Cannot attach stdout to " + stdout_path);
/// Disable buffering for stdout.
setbuf(stdout, nullptr);
}
const auto stderr_path = config().getString("logger.stderr", "");
if (!stderr_path.empty())
{
if (!freopen(stderr_path.c_str(), "a+", stderr))
throw Poco::OpenFileException("Cannot attach stderr to " + stderr_path);
/// Disable buffering for stderr.
setbuf(stderr, nullptr);
}
buildLoggers(config(), logger(), self.commandName());
BaseDaemon::logRevision();
log = &logger();
hostname = config().getString("listen-host", "127.0.0.1");
port = config().getUInt("http-port");
if (port > 0xFFFF)
throw Exception("Out of range 'http-port': " + std::to_string(port), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
http_timeout = config().getUInt("http-timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT);
max_server_connections = config().getUInt("max-server-connections", 1024);
keep_alive_timeout = config().getUInt("keep-alive-timeout", 10);
initializeTerminationAndSignalProcessing();
#if USE_ODBC
// It doesn't make much sense to build this bridge without ODBC, but we
// still do this.
Poco::Data::ODBC::Connector::registerConnector();
#endif
ServerApplication::initialize(self); // NOLINT
}
void ODBCBridge::uninitialize()
{
BaseDaemon::uninitialize();
}
int ODBCBridge::main(const std::vector<std::string> & /*args*/)
{
if (is_help)
return Application::EXIT_OK;
registerFormats();
LOG_INFO(log, "Starting up");
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, hostname, port, log);
socket.setReceiveTimeout(http_timeout);
socket.setSendTimeout(http_timeout);
Poco::ThreadPool server_pool(3, max_server_connections);
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(http_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
auto shared_context = Context::createShared();
Context context(Context::createGlobal(shared_context.get()));
context.makeGlobalContext();
if (config().has("query_masking_rules"))
{
SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(config(), "query_masking_rules"));
}
auto server = HTTPServer(
context,
std::make_shared<HandlerFactory>("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context),
server_pool,
socket,
http_params);
server.start();
LOG_INFO(log, "Listening http://{}", address.toString());
SCOPE_EXIT({
LOG_DEBUG(log, "Received termination signal.");
LOG_DEBUG(log, "Waiting for current connections to close.");
server.stop();
for (size_t count : ext::range(1, 6))
{
if (server.currentConnections() == 0)
break;
LOG_DEBUG(log, "Waiting for {} connections, try {}", server.currentConnections(), count);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
});
waitForTerminationRequest();
return Application::EXIT_OK;
}
}
#pragma GCC diagnostic ignored "-Wmissing-declarations"
int mainEntryClickHouseODBCBridge(int argc, char ** argv)

View File

@ -2,38 +2,25 @@
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <daemon/BaseDaemon.h>
#include <bridge/IBridge.h>
#include "HandlerFactory.h"
namespace DB
{
/** Class represents clickhouse-odbc-bridge server, which listen
* incoming HTTP POST and GET requests on specified port and host.
* Has two handlers '/' for all incoming POST requests to ODBC driver
* and /ping for GET request about service status
*/
class ODBCBridge : public BaseDaemon
class ODBCBridge : public IBridge
{
public:
void defineOptions(Poco::Util::OptionSet & options) override;
protected:
void initialize(Application & self) override;
const std::string bridgeName() const override
{
return "ODBCBridge";
}
void uninitialize() override;
int main(const std::vector<std::string> & args) override;
private:
void handleHelp(const std::string &, const std::string &);
bool is_help;
std::string hostname;
size_t port;
size_t http_timeout;
std::string log_level;
size_t max_server_connections;
size_t keep_alive_timeout;
Poco::Logger * log;
HandlerFactoryPtr getHandlerFactoryPtr(Context & context) const override
{
return std::make_shared<ODBCBridgeHandlerFactory>("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context);
}
};
}

View File

@ -0,0 +1,132 @@
#include "IBridgeHelper.h"
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/ReadHelpers.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Path.h>
#include <Poco/URI.h>
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
}
Poco::URI IBridgeHelper::getMainURI() const
{
auto uri = createBaseURI();
uri.setPath(MAIN_HANDLER);
return uri;
}
Poco::URI IBridgeHelper::getPingURI() const
{
auto uri = createBaseURI();
uri.setPath(PING_HANDLER);
return uri;
}
bool IBridgeHelper::checkBridgeIsRunning() const
{
try
{
ReadWriteBufferFromHTTP buf(
getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
return checkString(PING_OK_ANSWER, buf);
}
catch (...)
{
return false;
}
}
void IBridgeHelper::startBridgeSync() const
{
if (!checkBridgeIsRunning())
{
LOG_TRACE(getLog(), "{} is not running, will try to start it", serviceAlias());
startBridge(startBridgeCommand());
bool started = false;
uint64_t milliseconds_to_wait = 10; /// Exponential backoff
uint64_t counter = 0;
while (milliseconds_to_wait < 10000)
{
++counter;
LOG_TRACE(getLog(), "Checking {} is running, try {}", serviceAlias(), counter);
if (checkBridgeIsRunning())
{
started = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds_to_wait));
milliseconds_to_wait *= 2;
}
if (!started)
throw Exception("BridgeHelper: " + serviceAlias() + " is not responding",
ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);
}
}
std::unique_ptr<ShellCommand> IBridgeHelper::startBridgeCommand() const
{
if (startBridgeManually())
throw Exception(serviceAlias() + " is not running. Please, start it manually", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);
const auto & config = getConfig();
/// Path to executable folder
Poco::Path path{config.getString("application.dir", "/usr/bin")};
std::vector<std::string> cmd_args;
path.setFileName(serviceFileName());
cmd_args.push_back("--http-port");
cmd_args.push_back(std::to_string(config.getUInt(configPrefix() + ".port", getDefaultPort())));
cmd_args.push_back("--listen-host");
cmd_args.push_back(config.getString(configPrefix() + ".listen_host", DEFAULT_HOST));
cmd_args.push_back("--http-timeout");
cmd_args.push_back(std::to_string(getHTTPTimeout().totalMicroseconds()));
if (config.has("logger." + configPrefix() + "_log"))
{
cmd_args.push_back("--log-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_log"));
}
if (config.has("logger." + configPrefix() + "_errlog"))
{
cmd_args.push_back("--err-log-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_errlog"));
}
if (config.has("logger." + configPrefix() + "_stdout"))
{
cmd_args.push_back("--stdout-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stdout"));
}
if (config.has("logger." + configPrefix() + "_stderr"))
{
cmd_args.push_back("--stderr-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stderr"));
}
if (config.has("logger." + configPrefix() + "_level"))
{
cmd_args.push_back("--log-level");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_level"));
}
LOG_TRACE(getLog(), "Starting {}", serviceAlias());
return ShellCommand::executeDirect(path.toString(), cmd_args, ShellCommandDestructorStrategy(true));
}
}

View File

@ -0,0 +1,70 @@
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Net/HTTPRequest.h>
#include <Common/ShellCommand.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <common/logger_useful.h>
namespace DB
{
/// Common base class for XDBC and Library bridge helpers.
/// Contains helper methods to check/start bridge sync.
class IBridgeHelper
{
public:
static constexpr inline auto DEFAULT_HOST = "127.0.0.1";
static constexpr inline auto PING_HANDLER = "/ping";
static constexpr inline auto MAIN_HANDLER = "/";
static constexpr inline auto DEFAULT_FORMAT = "RowBinary";
static constexpr inline auto PING_OK_ANSWER = "Ok.";
static const inline std::string PING_METHOD = Poco::Net::HTTPRequest::HTTP_GET;
static const inline std::string MAIN_METHOD = Poco::Net::HTTPRequest::HTTP_POST;
virtual ~IBridgeHelper() = default;
void startBridgeSync() const;
Poco::URI getMainURI() const;
Poco::URI getPingURI() const;
protected:
/// clickhouse-odbc-bridge, clickhouse-library-bridge
virtual const String serviceAlias() const = 0;
virtual const String serviceFileName() const = 0;
virtual size_t getDefaultPort() const = 0;
virtual bool startBridgeManually() const = 0;
virtual void startBridge(std::unique_ptr<ShellCommand> cmd) const = 0;
virtual const String configPrefix() const = 0;
virtual const Context & getContext() const = 0;
virtual const Poco::Util::AbstractConfiguration & getConfig() const = 0;
virtual Poco::Logger * getLog() const = 0;
virtual const Poco::Timespan & getHTTPTimeout() const = 0;
virtual Poco::URI createBaseURI() const = 0;
private:
bool checkBridgeIsRunning() const;
std::unique_ptr<ShellCommand> startBridgeCommand() const;
};
}

View File

@ -0,0 +1,183 @@
#include "LibraryBridgeHelper.h"
#include <IO/ReadHelpers.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h>
#include <Formats/FormatFactory.h>
#include <Poco/Path.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h>
#include <common/logger_useful.h>
#include <ext/range.h>
#include <Core/Field.h>
#include <Common/escapeForFileName.h>
namespace DB
{
LibraryBridgeHelper::LibraryBridgeHelper(
const Context & context_,
const Block & sample_block_,
const Field & dictionary_id_)
: log(&Poco::Logger::get("LibraryBridgeHelper"))
, context(context_)
, sample_block(sample_block_)
, config(context.getConfigRef())
, http_timeout(context.getSettingsRef().http_receive_timeout.value.totalSeconds())
, dictionary_id(dictionary_id_)
{
bridge_port = config.getUInt("library_bridge.port", DEFAULT_PORT);
bridge_host = config.getString("library_bridge.host", DEFAULT_HOST);
}
Poco::URI LibraryBridgeHelper::createRequestURI(const String & method) const
{
auto uri = getMainURI();
uri.addQueryParameter("dictionary_id", toString(dictionary_id));
uri.addQueryParameter("method", method);
return uri;
}
Poco::URI LibraryBridgeHelper::createBaseURI() const
{
Poco::URI uri;
uri.setHost(bridge_host);
uri.setPort(bridge_port);
uri.setScheme("http");
return uri;
}
void LibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
{
context.addBridgeCommand(std::move(cmd));
}
bool LibraryBridgeHelper::initLibrary(const std::string & library_path, const std::string library_settings, const std::string attributes_names)
{
startBridgeSync();
auto uri = createRequestURI(LIB_NEW_METHOD);
/// Sample block must contain null values
WriteBufferFromOwnString out;
auto output_stream = context.getOutputStream(LibraryBridgeHelper::DEFAULT_FORMAT, out, sample_block);
formatBlock(output_stream, sample_block);
auto block_string = out.str();
auto out_stream_callback = [library_path, library_settings, attributes_names, block_string, this](std::ostream & os)
{
os << "library_path=" << escapeForFileName(library_path) << "&";
os << "library_settings=" << escapeForFileName(library_settings) << "&";
os << "attributes_names=" << escapeForFileName(attributes_names) << "&";
os << "sample_block=" << escapeForFileName(sample_block.getNamesAndTypesList().toString()) << "&";
os << "null_values=" << escapeForFileName(block_string);
};
return executeRequest(uri, out_stream_callback);
}
bool LibraryBridgeHelper::cloneLibrary(const Field & other_dictionary_id)
{
startBridgeSync();
auto uri = createRequestURI(LIB_CLONE_METHOD);
uri.addQueryParameter("from_dictionary_id", toString(other_dictionary_id));
return executeRequest(uri);
}
bool LibraryBridgeHelper::removeLibrary()
{
startBridgeSync();
auto uri = createRequestURI(LIB_DELETE_METHOD);
return executeRequest(uri);
}
bool LibraryBridgeHelper::isModified()
{
startBridgeSync();
auto uri = createRequestURI(IS_MODIFIED_METHOD);
return executeRequest(uri);
}
bool LibraryBridgeHelper::supportsSelectiveLoad()
{
startBridgeSync();
auto uri = createRequestURI(SUPPORTS_SELECTIVE_LOAD_METHOD);
return executeRequest(uri);
}
BlockInputStreamPtr LibraryBridgeHelper::loadAll()
{
startBridgeSync();
auto uri = createRequestURI(LOAD_ALL_METHOD);
return loadBase(uri);
}
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string ids_string)
{
startBridgeSync();
auto uri = createRequestURI(LOAD_IDS_METHOD);
return loadBase(uri, [ids_string](std::ostream & os) { os << "ids=" << ids_string; });
}
BlockInputStreamPtr LibraryBridgeHelper::loadKeys(const Block & requested_block)
{
startBridgeSync();
auto uri = createRequestURI(LOAD_KEYS_METHOD);
/// Sample block to parse block from callback
uri.addQueryParameter("requested_block_sample", requested_block.getNamesAndTypesList().toString());
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [requested_block, this](std::ostream & os)
{
WriteBufferFromOStream out_buffer(os);
auto output_stream = context.getOutputStream(
LibraryBridgeHelper::DEFAULT_FORMAT, out_buffer, sample_block);
formatBlock(output_stream, requested_block);
};
return loadBase(uri, out_stream_callback);
}
bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback)
{
ReadWriteBufferFromHTTP buf(
uri,
Poco::Net::HTTPRequest::HTTP_POST,
std::move(out_stream_callback),
ConnectionTimeouts::getHTTPTimeouts(context));
bool res;
readBoolText(res, buf);
return res;
}
BlockInputStreamPtr LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback)
{
auto read_buf_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
uri,
Poco::Net::HTTPRequest::HTTP_POST,
std::move(out_stream_callback),
ConnectionTimeouts::getHTTPTimeouts(context),
0,
Poco::Net::HTTPBasicCredentials{},
DBMS_DEFAULT_BUFFER_SIZE,
ReadWriteBufferFromHTTP::HTTPHeaderEntries{});
auto input_stream = context.getInputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, DEFAULT_BLOCK_SIZE);
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(read_buf_ptr));
}
}

View File

@ -0,0 +1,88 @@
#pragma once
#include <Interpreters/Context.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/URI.h>
#include <Bridge/IBridgeHelper.h>
namespace DB
{
class LibraryBridgeHelper : public IBridgeHelper
{
public:
static constexpr inline size_t DEFAULT_PORT = 9012;
LibraryBridgeHelper(const Context & context_, const Block & sample_block, const Field & dictionary_id_);
bool initLibrary(const std::string & library_path, const std::string library_settings, const std::string attributes_names);
bool cloneLibrary(const Field & other_dictionary_id);
bool removeLibrary();
bool isModified();
bool supportsSelectiveLoad();
BlockInputStreamPtr loadAll();
BlockInputStreamPtr loadIds(const std::string ids_string);
BlockInputStreamPtr loadKeys(const Block & requested_block);
BlockInputStreamPtr loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
protected:
void startBridge(std::unique_ptr<ShellCommand> cmd) const override;
const String serviceAlias() const override { return "clickhouse-library-bridge"; }
const String serviceFileName() const override { return serviceAlias(); }
size_t getDefaultPort() const override { return DEFAULT_PORT; }
bool startBridgeManually() const override { return false; }
const String configPrefix() const override { return "library_bridge"; }
const Context & getContext() const override { return context; }
const Poco::Util::AbstractConfiguration & getConfig() const override { return config; }
Poco::Logger * getLog() const override { return log; }
const Poco::Timespan & getHTTPTimeout() const override { return http_timeout; }
Poco::URI createBaseURI() const override;
private:
static constexpr inline auto LIB_NEW_METHOD = "libNew";
static constexpr inline auto LIB_CLONE_METHOD = "libClone";
static constexpr inline auto LIB_DELETE_METHOD = "libDelete";
static constexpr inline auto LOAD_ALL_METHOD = "loadAll";
static constexpr inline auto LOAD_IDS_METHOD = "loadIds";
static constexpr inline auto LOAD_KEYS_METHOD = "loadKeys";
static constexpr inline auto IS_MODIFIED_METHOD = "isModified";
static constexpr inline auto SUPPORTS_SELECTIVE_LOAD_METHOD = "supportsSelectiveLoad";
Poco::URI createRequestURI(const String & method) const;
Poco::Logger * log;
const Context & context;
const Block sample_block;
const Poco::Util::AbstractConfiguration & config;
const Poco::Timespan http_timeout;
Field dictionary_id;
std::string bridge_host;
size_t bridge_port;
};
}

View File

@ -0,0 +1,266 @@
#pragma once
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Interpreters/Context.h>
#include <Access/AccessType.h>
#include <Parsers/IdentifierQuotingStyle.h>
#include <Poco/File.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Path.h>
#include <Poco/URI.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <common/logger_useful.h>
#include <ext/range.h>
#include <Bridge/IBridgeHelper.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
/// Class for Helpers for XDBC-bridges, provide utility methods, not main request.
class IXDBCBridgeHelper : public IBridgeHelper
{
public:
virtual std::vector<std::pair<std::string, std::string>> getURLParams(const std::string & cols, UInt64 max_block_size) const = 0;
virtual Poco::URI getColumnsInfoURI() const = 0;
virtual IdentifierQuotingStyle getIdentifierQuotingStyle() = 0;
virtual bool isSchemaAllowed() = 0;
virtual const String getName() const = 0;
};
using BridgeHelperPtr = std::shared_ptr<IXDBCBridgeHelper>;
template <typename BridgeHelperMixin>
class XDBCBridgeHelper : public IXDBCBridgeHelper
{
public:
static constexpr inline auto DEFAULT_PORT = BridgeHelperMixin::DEFAULT_PORT;
static constexpr inline auto COL_INFO_HANDLER = "/columns_info";
static constexpr inline auto IDENTIFIER_QUOTE_HANDLER = "/identifier_quote";
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
XDBCBridgeHelper(
const Context & global_context_,
const Poco::Timespan & http_timeout_,
const std::string & connection_string_)
: log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"))
, connection_string(connection_string_)
, http_timeout(http_timeout_)
, context(global_context_)
, config(context.getConfigRef())
{
bridge_host = config.getString(BridgeHelperMixin::configPrefix() + ".host", DEFAULT_HOST);
bridge_port = config.getUInt(BridgeHelperMixin::configPrefix() + ".port", DEFAULT_PORT);
}
protected:
auto getConnectionString() const { return connection_string; }
const String getName() const override { return BridgeHelperMixin::getName(); }
size_t getDefaultPort() const override { return DEFAULT_PORT; }
const String serviceAlias() const override { return BridgeHelperMixin::serviceAlias(); }
/// Same for odbc and jdbc
const String serviceFileName() const override { return "clickhouse-odbc-bridge"; }
const String configPrefix() const override { return BridgeHelperMixin::configPrefix(); }
const Context & getContext() const override { return context; }
const Poco::Timespan & getHTTPTimeout() const override { return http_timeout; }
const Poco::Util::AbstractConfiguration & getConfig() const override { return config; }
Poco::Logger * getLog() const override { return log; }
bool startBridgeManually() const override { return BridgeHelperMixin::startBridgeManually(); }
Poco::URI createBaseURI() const override
{
Poco::URI uri;
uri.setHost(bridge_host);
uri.setPort(bridge_port);
uri.setScheme("http");
return uri;
}
void startBridge(std::unique_ptr<ShellCommand> cmd) const override
{
context.addBridgeCommand(std::move(cmd));
}
private:
using Configuration = Poco::Util::AbstractConfiguration;
Poco::Logger * log;
std::string connection_string;
const Poco::Timespan & http_timeout;
std::string bridge_host;
size_t bridge_port;
const Context & context;
const Configuration & config;
std::optional<IdentifierQuotingStyle> quote_style;
std::optional<bool> is_schema_allowed;
protected:
using URLParams = std::vector<std::pair<std::string, std::string>>;
Poco::URI getColumnsInfoURI() const override
{
auto uri = createBaseURI();
uri.setPath(COL_INFO_HANDLER);
return uri;
}
URLParams getURLParams(const std::string & cols, UInt64 max_block_size) const override
{
std::vector<std::pair<std::string, std::string>> result;
result.emplace_back("connection_string", connection_string); /// already validated
result.emplace_back("columns", cols);
result.emplace_back("max_block_size", std::to_string(max_block_size));
return result;
}
bool isSchemaAllowed() override
{
if (!is_schema_allowed.has_value())
{
startBridgeSync();
auto uri = createBaseURI();
uri.setPath(SCHEMA_ALLOWED_HANDLER);
uri.addQueryParameter("connection_string", getConnectionString());
ReadWriteBufferFromHTTP buf(
uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
bool res;
readBoolText(res, buf);
is_schema_allowed = res;
}
return *is_schema_allowed;
}
IdentifierQuotingStyle getIdentifierQuotingStyle() override
{
if (!quote_style.has_value())
{
startBridgeSync();
auto uri = createBaseURI();
uri.setPath(IDENTIFIER_QUOTE_HANDLER);
uri.addQueryParameter("connection_string", getConnectionString());
ReadWriteBufferFromHTTP buf(
uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
std::string character;
readStringBinary(character, buf);
if (character.length() > 1)
throw Exception("Failed to parse quoting style from '" + character + "' for service " + BridgeHelperMixin::serviceAlias(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
else if (character.length() == 0)
quote_style = IdentifierQuotingStyle::None;
else if (character[0] == '`')
quote_style = IdentifierQuotingStyle::Backticks;
else if (character[0] == '"')
quote_style = IdentifierQuotingStyle::DoubleQuotes;
else
throw Exception("Can not map quote identifier '" + character + "' to enum value", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
return *quote_style;
}
};
struct JDBCBridgeMixin
{
static constexpr inline auto DEFAULT_PORT = 9019;
static const String configPrefix()
{
return "jdbc_bridge";
}
static const String serviceAlias()
{
return "clickhouse-jdbc-bridge";
}
static const String getName()
{
return "JDBC";
}
static AccessType getSourceAccessType()
{
return AccessType::JDBC;
}
static bool startBridgeManually()
{
return true;
}
};
struct ODBCBridgeMixin
{
static constexpr inline auto DEFAULT_PORT = 9018;
static const String configPrefix()
{
return "odbc_bridge";
}
static const String serviceAlias()
{
return "clickhouse-odbc-bridge";
}
static const String getName()
{
return "ODBC";
}
static AccessType getSourceAccessType()
{
return AccessType::ODBC;
}
static bool startBridgeManually()
{
return false;
}
};
}

17
src/Bridge/ya.make Normal file
View File

@ -0,0 +1,17 @@
# This file is generated automatically, do not edit. See 'ya.make.in' and use 'utils/generate-ya-make' to regenerate it.
OWNER(g:clickhouse)
LIBRARY()
PEERDIR(
clickhouse/src/Common
)
SRCS(
IBridgeHelper.cpp
LibraryBridgeHelper.cpp
)
END()

14
src/Bridge/ya.make.in Normal file
View File

@ -0,0 +1,14 @@
OWNER(g:clickhouse)
LIBRARY()
PEERDIR(
clickhouse/src/Common
)
SRCS(
<? find . -name '*.cpp' | grep -v -F tests | sed 's/^\.\// /' | sort ?>
)
END()

View File

@ -190,6 +190,7 @@ add_object_library(clickhouse_storages_distributed Storages/Distributed)
add_object_library(clickhouse_storages_mergetree Storages/MergeTree)
add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_client Client)
add_object_library(clickhouse_bridge Bridge)
add_object_library(clickhouse_server Server)
add_object_library(clickhouse_server_http Server/HTTP)
add_object_library(clickhouse_formats Formats)

View File

@ -1,351 +0,0 @@
#pragma once
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Interpreters/Context.h>
#include <Access/AccessType.h>
#include <Parsers/IdentifierQuotingStyle.h>
#include <Poco/File.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Path.h>
#include <Poco/URI.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <common/logger_useful.h>
#include <ext/range.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
/**
* Class for Helpers for Xdbc-bridges, provide utility methods, not main request
*/
class IXDBCBridgeHelper
{
public:
static constexpr inline auto DEFAULT_FORMAT = "RowBinary";
virtual std::vector<std::pair<std::string, std::string>> getURLParams(const std::string & cols, UInt64 max_block_size) const = 0;
virtual void startBridgeSync() const = 0;
virtual Poco::URI getMainURI() const = 0;
virtual Poco::URI getColumnsInfoURI() const = 0;
virtual IdentifierQuotingStyle getIdentifierQuotingStyle() = 0;
virtual bool isSchemaAllowed() = 0;
virtual String getName() const = 0;
virtual ~IXDBCBridgeHelper() = default;
};
using BridgeHelperPtr = std::shared_ptr<IXDBCBridgeHelper>;
template <typename BridgeHelperMixin>
class XDBCBridgeHelper : public IXDBCBridgeHelper
{
private:
Poco::Timespan http_timeout;
std::string connection_string;
Poco::URI ping_url;
Poco::Logger * log = &Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper");
std::optional<IdentifierQuotingStyle> quote_style;
std::optional<bool> is_schema_allowed;
protected:
auto getConnectionString() const
{
return connection_string;
}
public:
using Configuration = Poco::Util::AbstractConfiguration;
const Context & context;
const Configuration & config;
static constexpr inline auto DEFAULT_HOST = "127.0.0.1";
static constexpr inline auto DEFAULT_PORT = BridgeHelperMixin::DEFAULT_PORT;
static constexpr inline auto PING_HANDLER = "/ping";
static constexpr inline auto MAIN_HANDLER = "/";
static constexpr inline auto COL_INFO_HANDLER = "/columns_info";
static constexpr inline auto IDENTIFIER_QUOTE_HANDLER = "/identifier_quote";
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
static constexpr inline auto PING_OK_ANSWER = "Ok.";
XDBCBridgeHelper(const Context & global_context_, const Poco::Timespan & http_timeout_, const std::string & connection_string_)
: http_timeout(http_timeout_), connection_string(connection_string_), context(global_context_), config(context.getConfigRef())
{
size_t bridge_port = config.getUInt(BridgeHelperMixin::configPrefix() + ".port", DEFAULT_PORT);
std::string bridge_host = config.getString(BridgeHelperMixin::configPrefix() + ".host", DEFAULT_HOST);
ping_url.setHost(bridge_host);
ping_url.setPort(bridge_port);
ping_url.setScheme("http");
ping_url.setPath(PING_HANDLER);
}
String getName() const override
{
return BridgeHelperMixin::getName();
}
IdentifierQuotingStyle getIdentifierQuotingStyle() override
{
if (!quote_style.has_value())
{
startBridgeSync();
auto uri = createBaseURI();
uri.setPath(IDENTIFIER_QUOTE_HANDLER);
uri.addQueryParameter("connection_string", getConnectionString());
ReadWriteBufferFromHTTP buf(
uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
std::string character;
readStringBinary(character, buf);
if (character.length() > 1)
throw Exception("Failed to parse quoting style from '" + character + "' for service " + BridgeHelperMixin::serviceAlias(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
else if (character.length() == 0)
quote_style = IdentifierQuotingStyle::None;
else if (character[0] == '`')
quote_style = IdentifierQuotingStyle::Backticks;
else if (character[0] == '"')
quote_style = IdentifierQuotingStyle::DoubleQuotes;
else
throw Exception("Can not map quote identifier '" + character + "' to enum value", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
return *quote_style;
}
bool isSchemaAllowed() override
{
if (!is_schema_allowed.has_value())
{
startBridgeSync();
auto uri = createBaseURI();
uri.setPath(SCHEMA_ALLOWED_HANDLER);
uri.addQueryParameter("connection_string", getConnectionString());
ReadWriteBufferFromHTTP buf(
uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
bool res;
readBoolText(res, buf);
is_schema_allowed = res;
}
return *is_schema_allowed;
}
/**
* @todo leaky abstraction - used by external API's
*/
std::vector<std::pair<std::string, std::string>> getURLParams(const std::string & cols, UInt64 max_block_size) const override
{
std::vector<std::pair<std::string, std::string>> result;
result.emplace_back("connection_string", connection_string); /// already validated
result.emplace_back("columns", cols);
result.emplace_back("max_block_size", std::to_string(max_block_size));
return result;
}
/**
* Performs spawn of external daemon
*/
void startBridgeSync() const override
{
if (!checkBridgeIsRunning())
{
LOG_TRACE(log, "{} is not running, will try to start it", BridgeHelperMixin::serviceAlias());
startBridge();
bool started = false;
uint64_t milliseconds_to_wait = 10; /// Exponential backoff
uint64_t counter = 0;
while (milliseconds_to_wait < 10000)
{
++counter;
LOG_TRACE(log, "Checking {} is running, try {}", BridgeHelperMixin::serviceAlias(), counter);
if (checkBridgeIsRunning())
{
started = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds_to_wait));
milliseconds_to_wait *= 2;
}
if (!started)
throw Exception(BridgeHelperMixin::getName() + "BridgeHelper: " + BridgeHelperMixin::serviceAlias() + " is not responding",
ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);
}
}
/**
* URI to fetch the data from external service
*/
Poco::URI getMainURI() const override
{
auto uri = createBaseURI();
uri.setPath(MAIN_HANDLER);
return uri;
}
/**
* URI to retrieve column description from external service
*/
Poco::URI getColumnsInfoURI() const override
{
auto uri = createBaseURI();
uri.setPath(COL_INFO_HANDLER);
return uri;
}
protected:
Poco::URI createBaseURI() const
{
Poco::URI uri;
uri.setHost(ping_url.getHost());
uri.setPort(ping_url.getPort());
uri.setScheme("http");
return uri;
}
private:
bool checkBridgeIsRunning() const
{
try
{
ReadWriteBufferFromHTTP buf(
ping_url, Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(context));
return checkString(XDBCBridgeHelper::PING_OK_ANSWER, buf);
}
catch (...)
{
return false;
}
}
/* Contains logic for instantiation of the bridge instance */
void startBridge() const
{
auto cmd = BridgeHelperMixin::startBridge(config, log, http_timeout);
context.addXDBCBridgeCommand(std::move(cmd));
}
};
struct JDBCBridgeMixin
{
static constexpr inline auto DEFAULT_PORT = 9019;
static const String configPrefix()
{
return "jdbc_bridge";
}
static const String serviceAlias()
{
return "clickhouse-jdbc-bridge";
}
static const String getName()
{
return "JDBC";
}
static AccessType getSourceAccessType()
{
return AccessType::JDBC;
}
static std::unique_ptr<ShellCommand> startBridge(const Poco::Util::AbstractConfiguration &, const Poco::Logger *, const Poco::Timespan &)
{
throw Exception("jdbc-bridge is not running. Please, start it manually", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);
}
};
struct ODBCBridgeMixin
{
static constexpr inline auto DEFAULT_PORT = 9018;
static const String configPrefix()
{
return "odbc_bridge";
}
static const String serviceAlias()
{
return "clickhouse-odbc-bridge";
}
static const String getName()
{
return "ODBC";
}
static AccessType getSourceAccessType()
{
return AccessType::ODBC;
}
static std::unique_ptr<ShellCommand> startBridge(
const Poco::Util::AbstractConfiguration & config, Poco::Logger * log, const Poco::Timespan & http_timeout)
{
/// Path to executable folder
Poco::Path path{config.getString("application.dir", "/usr/bin")};
std::vector<std::string> cmd_args;
path.setFileName("clickhouse-odbc-bridge");
#if !CLICKHOUSE_SPLIT_BINARY
cmd_args.push_back("odbc-bridge");
#endif
cmd_args.push_back("--http-port");
cmd_args.push_back(std::to_string(config.getUInt(configPrefix() + ".port", DEFAULT_PORT)));
cmd_args.push_back("--listen-host");
cmd_args.push_back(config.getString(configPrefix() + ".listen_host", XDBCBridgeHelper<ODBCBridgeMixin>::DEFAULT_HOST));
cmd_args.push_back("--http-timeout");
cmd_args.push_back(std::to_string(http_timeout.totalMicroseconds()));
if (config.has("logger." + configPrefix() + "_log"))
{
cmd_args.push_back("--log-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_log"));
}
if (config.has("logger." + configPrefix() + "_errlog"))
{
cmd_args.push_back("--err-log-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_errlog"));
}
if (config.has("logger." + configPrefix() + "_stdout"))
{
cmd_args.push_back("--stdout-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stdout"));
}
if (config.has("logger." + configPrefix() + "_stderr"))
{
cmd_args.push_back("--stderr-path");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_stderr"));
}
if (config.has("logger." + configPrefix() + "_level"))
{
cmd_args.push_back("--log-level");
cmd_args.push_back(config.getString("logger." + configPrefix() + "_level"));
}
LOG_TRACE(log, "Starting {}", serviceAlias());
return ShellCommand::executeDirect(path.toString(), cmd_args, ShellCommandDestructorStrategy(true));
}
};
}

View File

@ -1,4 +1,5 @@
#include "LibraryDictionarySource.h"
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Poco/File.h>
@ -6,294 +7,171 @@
#include <ext/bit_cast.h>
#include <ext/range.h>
#include <ext/scope_guard.h>
#include <Common/StringUtils/StringUtils.h>
#include "DictionarySourceFactory.h"
#include "DictionarySourceHelpers.h"
#include "DictionaryStructure.h"
#include "LibraryDictionarySourceExternal.h"
#include "registerDictionaries.h"
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int FILE_DOESNT_EXIST;
extern const int EXTERNAL_LIBRARY_ERROR;
extern const int PATH_ACCESS_DENIED;
}
class CStringsHolder
{
public:
using Container = std::vector<std::string>;
explicit CStringsHolder(const Container & strings_pass)
{
strings_holder = strings_pass;
strings.size = strings_holder.size();
ptr_holder = std::make_unique<ClickHouseLibrary::CString[]>(strings.size);
strings.data = ptr_holder.get();
size_t i = 0;
for (auto & str : strings_holder)
{
strings.data[i] = str.c_str();
++i;
}
}
ClickHouseLibrary::CStrings strings; // will pass pointer to lib
private:
std::unique_ptr<ClickHouseLibrary::CString[]> ptr_holder = nullptr;
Container strings_holder;
};
namespace
{
constexpr auto lib_config_settings = ".settings";
CStringsHolder getLibSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_root)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_root, config_keys);
CStringsHolder::Container strings;
for (const auto & key : config_keys)
{
std::string key_name = key;
auto bracket_pos = key.find('[');
if (bracket_pos != std::string::npos && bracket_pos > 0)
key_name = key.substr(0, bracket_pos);
strings.emplace_back(key_name);
strings.emplace_back(config.getString(config_root + "." + key));
}
return CStringsHolder(strings);
}
Block dataToBlock(const Block & sample_block, const ClickHouseLibrary::RawClickHouseLibraryTable data)
{
if (!data)
throw Exception("LibraryDictionarySource: No data returned", ErrorCodes::EXTERNAL_LIBRARY_ERROR);
const auto * columns_received = static_cast<const ClickHouseLibrary::Table *>(data);
if (columns_received->error_code)
throw Exception(
"LibraryDictionarySource: Returned error: " + std::to_string(columns_received->error_code) + " "
+ (columns_received->error_string ? columns_received->error_string : ""),
ErrorCodes::EXTERNAL_LIBRARY_ERROR);
MutableColumns columns = sample_block.cloneEmptyColumns();
for (size_t col_n = 0; col_n < columns_received->size; ++col_n)
{
if (columns.size() != columns_received->data[col_n].size)
throw Exception(
"LibraryDictionarySource: Returned unexpected number of columns: " + std::to_string(columns_received->data[col_n].size)
+ ", must be " + std::to_string(columns.size()),
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
for (size_t row_n = 0; row_n < columns_received->data[col_n].size; ++row_n)
{
const auto & field = columns_received->data[col_n].data[row_n];
if (!field.data)
{
/// sample_block contains null_value (from config) inside corresponding column
const auto & col = sample_block.getByPosition(row_n);
columns[row_n]->insertFrom(*(col.column), 0);
}
else
{
const auto & size = field.size;
columns[row_n]->insertData(static_cast<const char *>(field.data), size);
}
}
}
return sample_block.cloneWithColumns(std::move(columns));
}
}
LibraryDictionarySource::LibraryDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix_,
Block & sample_block_,
const Context & context,
const Context & context_,
bool check_config)
: log(&Poco::Logger::get("LibraryDictionarySource"))
, dict_struct{dict_struct_}
, config_prefix{config_prefix_}
, path{config.getString(config_prefix + ".path", "")}
, dictionary_id(getDictID())
, sample_block{sample_block_}
, context(context_)
{
if (check_config)
{
const String dictionaries_lib_path = context.getDictionariesLibPath();
if (!startsWith(path, dictionaries_lib_path))
throw Exception("LibraryDictionarySource: Library path " + path + " is not inside " + dictionaries_lib_path, ErrorCodes::PATH_ACCESS_DENIED);
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "LibraryDictionarySource: Library path {} is not inside {}", path, dictionaries_lib_path);
}
if (!Poco::File(path).exists())
throw Exception(
"LibraryDictionarySource: Can't load library " + Poco::File(path).path() + ": file doesn't exist",
ErrorCodes::FILE_DOESNT_EXIST);
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "LibraryDictionarySource: Can't load library {}: file doesn't exist", Poco::File(path).path());
description.init(sample_block);
library = std::make_shared<SharedLibrary>(path, RTLD_LAZY
#if defined(RTLD_DEEPBIND) && !defined(ADDRESS_SANITIZER) // Does not exists in FreeBSD. Cannot work with Address Sanitizer.
| RTLD_DEEPBIND
#endif
);
settings = std::make_shared<CStringsHolder>(getLibSettings(config, config_prefix + lib_config_settings));
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id);
auto res = bridge_helper->initLibrary(path, getLibrarySettingsString(config, config_prefix + ".settings"), getDictAttributesString());
if (auto lib_new = library->tryGet<ClickHouseLibrary::LibraryNewFunc>(ClickHouseLibrary::LIBRARY_CREATE_NEW_FUNC_NAME))
lib_data = lib_new(&settings->strings, ClickHouseLibrary::log);
if (!res)
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR, "Failed to create shared library from path: {}", path);
}
LibraryDictionarySource::~LibraryDictionarySource()
{
bridge_helper->removeLibrary();
}
LibraryDictionarySource::LibraryDictionarySource(const LibraryDictionarySource & other)
: log(&Poco::Logger::get("LibraryDictionarySource"))
, dict_struct{other.dict_struct}
, config_prefix{other.config_prefix}
, path{other.path}
, dictionary_id{getDictID()}
, sample_block{other.sample_block}
, library{other.library}
, context(other.context)
, description{other.description}
, settings{other.settings}
{
if (auto lib_clone = library->tryGet<ClickHouseLibrary::LibraryCloneFunc>(ClickHouseLibrary::LIBRARY_CLONE_FUNC_NAME))
lib_data = lib_clone(other.lib_data);
else if (auto lib_new = library->tryGet<ClickHouseLibrary::LibraryNewFunc>(ClickHouseLibrary::LIBRARY_CREATE_NEW_FUNC_NAME))
lib_data = lib_new(&settings->strings, ClickHouseLibrary::log);
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id);
bridge_helper->cloneLibrary(other.dictionary_id);
}
LibraryDictionarySource::~LibraryDictionarySource()
bool LibraryDictionarySource::isModified() const
{
if (auto lib_delete = library->tryGet<ClickHouseLibrary::LibraryDeleteFunc>(ClickHouseLibrary::LIBRARY_DELETE_FUNC_NAME))
lib_delete(lib_data);
return bridge_helper->isModified();
}
bool LibraryDictionarySource::supportsSelectiveLoad() const
{
return bridge_helper->supportsSelectiveLoad();
}
BlockInputStreamPtr LibraryDictionarySource::loadAll()
{
LOG_TRACE(log, "loadAll {}", toString());
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(dict_struct.attributes.size());
ClickHouseLibrary::CStrings columns{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()),
dict_struct.attributes.size()};
size_t i = 0;
for (const auto & a : dict_struct.attributes)
{
columns.data[i] = a.name.c_str();
++i;
}
auto load_all_func = library->get<ClickHouseLibrary::LibraryLoadAllFunc>(ClickHouseLibrary::LIBRARY_LOAD_ALL_FUNC_NAME);
auto data_new_func = library->get<ClickHouseLibrary::LibraryDataNewFunc>(ClickHouseLibrary::LIBRARY_DATA_NEW_FUNC_NAME);
auto data_delete_func = library->get<ClickHouseLibrary::LibraryDataDeleteFunc>(ClickHouseLibrary::LIBRARY_DATA_DELETE_FUNC_NAME);
ClickHouseLibrary::LibraryData data_ptr = data_new_func(lib_data);
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ClickHouseLibrary::RawClickHouseLibraryTable data = load_all_func(data_ptr, &settings->strings, &columns);
auto block = dataToBlock(description.sample_block, data);
return std::make_shared<OneBlockInputStream>(block);
return bridge_helper->loadAll();
}
BlockInputStreamPtr LibraryDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
const ClickHouseLibrary::VectorUInt64 ids_data{ext::bit_cast<decltype(ClickHouseLibrary::VectorUInt64::data)>(ids.data()), ids.size()};
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(dict_struct.attributes.size());
ClickHouseLibrary::CStrings columns_pass{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()),
dict_struct.attributes.size()};
size_t i = 0;
for (const auto & a : dict_struct.attributes)
{
columns_pass.data[i] = a.name.c_str();
++i;
}
auto load_ids_func = library->get<ClickHouseLibrary::LibraryLoadIdsFunc>(ClickHouseLibrary::LIBRARY_LOAD_IDS_FUNC_NAME);
auto data_new_func = library->get<ClickHouseLibrary::LibraryDataNewFunc>(ClickHouseLibrary::LIBRARY_DATA_NEW_FUNC_NAME);
auto data_delete_func = library->get<ClickHouseLibrary::LibraryDataDeleteFunc>(ClickHouseLibrary::LIBRARY_DATA_DELETE_FUNC_NAME);
ClickHouseLibrary::LibraryData data_ptr = data_new_func(lib_data);
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ClickHouseLibrary::RawClickHouseLibraryTable data = load_ids_func(data_ptr, &settings->strings, &columns_pass, &ids_data);
auto block = dataToBlock(description.sample_block, data);
return std::make_shared<OneBlockInputStream>(block);
return bridge_helper->loadIds(getDictIdsString(ids));
}
BlockInputStreamPtr LibraryDictionarySource::loadKeys(const Columns & key_columns, const std::vector<std::size_t> & requested_rows)
{
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
auto holder = std::make_unique<ClickHouseLibrary::Row[]>(key_columns.size());
std::vector<std::unique_ptr<ClickHouseLibrary::Field[]>> column_data_holders;
for (size_t i = 0; i < key_columns.size(); ++i)
{
auto cell_holder = std::make_unique<ClickHouseLibrary::Field[]>(requested_rows.size());
for (size_t j = 0; j < requested_rows.size(); ++j)
{
auto data_ref = key_columns[i]->getDataAt(requested_rows[j]);
cell_holder[j] = ClickHouseLibrary::Field{.data = static_cast<const void *>(data_ref.data), .size = data_ref.size};
}
holder[i]
= ClickHouseLibrary::Row{.data = static_cast<ClickHouseLibrary::Field *>(cell_holder.get()), .size = requested_rows.size()};
column_data_holders.push_back(std::move(cell_holder));
}
ClickHouseLibrary::Table request_cols{.data = static_cast<ClickHouseLibrary::Row *>(holder.get()), .size = key_columns.size()};
auto load_keys_func = library->get<ClickHouseLibrary::LibraryLoadKeysFunc>(ClickHouseLibrary::LIBRARY_LOAD_KEYS_FUNC_NAME);
auto data_new_func = library->get<ClickHouseLibrary::LibraryDataNewFunc>(ClickHouseLibrary::LIBRARY_DATA_NEW_FUNC_NAME);
auto data_delete_func = library->get<ClickHouseLibrary::LibraryDataDeleteFunc>(ClickHouseLibrary::LIBRARY_DATA_DELETE_FUNC_NAME);
ClickHouseLibrary::LibraryData data_ptr = data_new_func(lib_data);
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ClickHouseLibrary::RawClickHouseLibraryTable data = load_keys_func(data_ptr, &settings->strings, &request_cols);
auto block = dataToBlock(description.sample_block, data);
return std::make_shared<OneBlockInputStream>(block);
auto block = blockForKeys(dict_struct, key_columns, requested_rows);
return bridge_helper->loadKeys(block);
}
bool LibraryDictionarySource::isModified() const
{
if (auto func_is_modified = library->tryGet<ClickHouseLibrary::LibraryIsModifiedFunc>(
ClickHouseLibrary::LIBRARY_IS_MODIFIED_FUNC_NAME))
return func_is_modified(lib_data, &settings->strings);
return true;
}
bool LibraryDictionarySource::supportsSelectiveLoad() const
{
if (auto func_supports_selective_load = library->tryGet<ClickHouseLibrary::LibrarySupportsSelectiveLoadFunc>(
ClickHouseLibrary::LIBRARY_SUPPORTS_SELECTIVE_LOAD_FUNC_NAME))
return func_supports_selective_load(lib_data, &settings->strings);
return true;
}
DictionarySourcePtr LibraryDictionarySource::clone() const
{
return std::make_unique<LibraryDictionarySource>(*this);
}
std::string LibraryDictionarySource::toString() const
{
return path;
}
String LibraryDictionarySource::getLibrarySettingsString(const Poco::Util::AbstractConfiguration & config, const std::string & config_root)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_root, config_keys);
WriteBufferFromOwnString out;
std::vector<std::string> settings;
for (const auto & key : config_keys)
{
std::string key_name = key;
auto bracket_pos = key.find('[');
if (bracket_pos != std::string::npos && bracket_pos > 0)
key_name = key.substr(0, bracket_pos);
settings.push_back(key_name);
settings.push_back(config.getString(config_root + "." + key));
}
writeVectorBinary(settings, out);
return out.str();
}
String LibraryDictionarySource::getDictIdsString(const std::vector<UInt64> & ids)
{
WriteBufferFromOwnString out;
writeVectorBinary(ids, out);
return out.str();
}
String LibraryDictionarySource::getDictAttributesString()
{
std::vector<String> attributes_names(dict_struct.attributes.size());
for (size_t i = 0; i < dict_struct.attributes.size(); ++i)
attributes_names[i] = dict_struct.attributes[i].name;
WriteBufferFromOwnString out;
writeVectorBinary(attributes_names, out);
return out.str();
}
void registerDictionarySourceLibrary(DictionarySourceFactory & factory)
{
auto create_table_source = [=](const DictionaryStructure & dict_struct,
@ -306,7 +184,9 @@ void registerDictionarySourceLibrary(DictionarySourceFactory & factory)
{
return std::make_unique<LibraryDictionarySource>(dict_struct, config, config_prefix + ".library", sample_block, context, check_config);
};
factory.registerSource("library", create_table_source);
}
}

View File

@ -1,7 +1,9 @@
#pragma once
#include <Common/SharedLibrary.h>
#include <Bridge/LibraryBridgeHelper.h>
#include <common/LocalDateTime.h>
#include <Core/UUID.h>
#include "DictionaryStructure.h"
#include <Core/ExternalResultDescription.h>
#include "IDictionarySource.h"
@ -17,18 +19,17 @@ namespace Util
}
}
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
class CStringsHolder;
/// Allows loading dictionaries from dynamic libraries (.so)
/// Experimental version
/// Example: tests/external_dictionaries/dictionary_library/dictionary_library.cpp
class CStringsHolder;
using LibraryBridgeHelperPtr = std::shared_ptr<LibraryBridgeHelper>;
class LibraryDictionarySource final : public IDictionarySource
{
public:
@ -37,7 +38,7 @@ public:
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix_,
Block & sample_block_,
const Context & context,
const Context & context_,
bool check_config);
LibraryDictionarySource(const LibraryDictionarySource & other);
@ -68,18 +69,26 @@ public:
std::string toString() const override;
private:
Poco::Logger * log;
static String getDictIdsString(const std::vector<UInt64> & ids);
LocalDateTime getLastModification() const;
String getDictAttributesString();
static String getLibrarySettingsString(const Poco::Util::AbstractConfiguration & config, const std::string & config_root);
static Field getDictID() { return UUIDHelpers::generateV4(); }
Poco::Logger * log;
const DictionaryStructure dict_struct;
const std::string config_prefix;
const std::string path;
const Field dictionary_id;
Block sample_block;
SharedLibraryPtr library;
Context context;
LibraryBridgeHelperPtr bridge_helper;
ExternalResultDescription description;
std::shared_ptr<CStringsHolder> settings;
void * lib_data = nullptr;
};
}

View File

@ -11,7 +11,6 @@
#include <Interpreters/Context.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/XDBCBridgeHelper.h>
#include <common/LocalDateTime.h>
#include <common/logger_useful.h>
#include "DictionarySourceFactory.h"

View File

@ -3,7 +3,7 @@
#include <IO/ConnectionTimeouts.h>
#include <Poco/Data/SessionPool.h>
#include <Poco/URI.h>
#include <Common/XDBCBridgeHelper.h>
#include <Bridge/XDBCBridgeHelper.h>
#include "DictionaryStructure.h"
#include "ExternalQueryBuilder.h"
#include "IDictionarySource.h"

View File

@ -2358,7 +2358,7 @@ void Context::setQueryParameter(const String & name, const String & value)
}
void Context::addXDBCBridgeCommand(std::unique_ptr<ShellCommand> cmd) const
void Context::addBridgeCommand(std::unique_ptr<ShellCommand> cmd) const
{
auto lock = getLock();
shared->bridge_commands.emplace_back(std::move(cmd));

View File

@ -747,7 +747,7 @@ public:
void setQueryParameters(const NameToNameMap & parameters) { query_parameters = parameters; }
/// Add started bridge command. It will be killed after context destruction
void addXDBCBridgeCommand(std::unique_ptr<ShellCommand> cmd) const;
void addBridgeCommand(std::unique_ptr<ShellCommand> cmd) const;
IHostContextPtr & getHostContext();
const IHostContextPtr & getHostContext() const;

View File

@ -1,7 +1,7 @@
#pragma once
#include <Storages/StorageURL.h>
#include <Common/XDBCBridgeHelper.h>
#include <Bridge/XDBCBridgeHelper.h>
namespace DB

View File

@ -3,7 +3,7 @@
#include <Storages/StorageXDBC.h>
#include <TableFunctions/ITableFunction.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/XDBCBridgeHelper.h>
#include <Bridge/XDBCBridgeHelper.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>

View File

@ -5,6 +5,7 @@ LIBRARY()
PEERDIR(
clickhouse/src/Access
clickhouse/src/AggregateFunctions
clickhouse/src/Bridge
clickhouse/src/Client
clickhouse/src/Columns
clickhouse/src/Common

View File

@ -15,6 +15,7 @@ MAX_RETRY = 2
SLEEP_BETWEEN_RETRIES = 5
CLICKHOUSE_BINARY_PATH = "/usr/bin/clickhouse"
CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH = "/usr/bin/clickhouse-odbc-bridge"
CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH = "/usr/bin/clickhouse-library-bridge"
TRIES_COUNT = 10
MAX_TIME_SECONDS = 3600
@ -238,10 +239,13 @@ class ClickhouseIntegrationTestsRunner:
logging.info("All packages installed")
os.chmod(CLICKHOUSE_BINARY_PATH, 0o777)
os.chmod(CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH, 0o777)
os.chmod(CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH, 0o777)
result_path_bin = os.path.join(str(self.base_path()), "clickhouse")
result_path_bridge = os.path.join(str(self.base_path()), "clickhouse-odbc-bridge")
result_path_odbc_bridge = os.path.join(str(self.base_path()), "clickhouse-odbc-bridge")
result_path_library_bridge = os.path.join(str(self.base_path()), "clickhouse-library-bridge")
shutil.copy(CLICKHOUSE_BINARY_PATH, result_path_bin)
shutil.copy(CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH, result_path_bridge)
shutil.copy(CLICKHOUSE_ODBC_BRIDGE_BINARY_PATH, result_path_odbc_bridge)
shutil.copy(CLICKHOUSE_LIBRARY_BRIDGE_BINARY_PATH, result_path_library_bridge)
return None, None
def _compress_logs(self, path, result_path):

View File

@ -75,6 +75,15 @@ def get_odbc_bridge_path():
return '/usr/bin/clickhouse-odbc-bridge'
return path
def get_library_bridge_path():
path = os.environ.get('CLICKHOUSE_TESTS_LIBRARY_BRIDGE_BIN_PATH')
if path is None:
server_path = os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH')
if server_path is not None:
return os.path.join(os.path.dirname(server_path), 'clickhouse-library-bridge')
else:
return '/usr/bin/clickhouse-library-bridge'
return path
def get_docker_compose_path():
compose_path = os.environ.get('DOCKER_COMPOSE_DIR')
@ -98,7 +107,7 @@ class ClickHouseCluster:
"""
def __init__(self, base_path, name=None, base_config_dir=None, server_bin_path=None, client_bin_path=None,
odbc_bridge_bin_path=None, zookeeper_config_path=None, custom_dockerd_host=None):
odbc_bridge_bin_path=None, library_bridge_bin_path=None, zookeeper_config_path=None, custom_dockerd_host=None):
for param in list(os.environ.keys()):
print("ENV %40s %s" % (param, os.environ[param]))
self.base_dir = p.dirname(base_path)
@ -109,6 +118,7 @@ class ClickHouseCluster:
self.server_bin_path = p.realpath(
server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse'))
self.odbc_bridge_bin_path = p.realpath(odbc_bridge_bin_path or get_odbc_bridge_path())
self.library_bridge_bin_path = p.realpath(library_bridge_bin_path or get_library_bridge_path())
self.client_bin_path = p.realpath(
client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client'))
self.zookeeper_config_path = p.join(self.base_dir, zookeeper_config_path) if zookeeper_config_path else p.join(
@ -236,6 +246,7 @@ class ClickHouseCluster:
with_cassandra=with_cassandra,
server_bin_path=self.server_bin_path,
odbc_bridge_bin_path=self.odbc_bridge_bin_path,
library_bridge_bin_path=self.library_bridge_bin_path,
clickhouse_path_dir=clickhouse_path_dir,
with_odbc_drivers=with_odbc_drivers,
hostname=hostname,
@ -893,6 +904,7 @@ services:
- /etc/passwd:/etc/passwd:ro
{binary_volume}
{odbc_bridge_volume}
{library_bridge_volume}
{odbc_ini_path}
{keytab_path}
{krb5_conf}
@ -930,7 +942,7 @@ class ClickHouseInstance:
custom_dictionaries,
macros, with_zookeeper, zookeeper_config_path, with_mysql, with_mysql_cluster, with_kafka, with_kerberized_kafka, with_rabbitmq, with_kerberized_hdfs,
with_mongo, with_redis, with_minio,
with_cassandra, server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers,
with_cassandra, server_bin_path, odbc_bridge_bin_path, library_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers,
hostname=None, env_variables=None,
image="yandex/clickhouse-integration-test", tag="latest",
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None):
@ -954,6 +966,7 @@ class ClickHouseInstance:
self.server_bin_path = server_bin_path
self.odbc_bridge_bin_path = odbc_bridge_bin_path
self.library_bridge_bin_path = library_bridge_bin_path
self.with_mysql = with_mysql
self.with_mysql_cluster = with_mysql_cluster
@ -1422,9 +1435,11 @@ class ClickHouseInstance:
if not self.with_installed_binary:
binary_volume = "- " + self.server_bin_path + ":/usr/bin/clickhouse"
odbc_bridge_volume = "- " + self.odbc_bridge_bin_path + ":/usr/bin/clickhouse-odbc-bridge"
library_bridge_volume = "- " + self.library_bridge_bin_path + ":/usr/bin/clickhouse-library-bridge"
else:
binary_volume = "- " + self.server_bin_path + ":/usr/share/clickhouse_fresh"
odbc_bridge_volume = "- " + self.odbc_bridge_bin_path + ":/usr/share/clickhouse-odbc-bridge_fresh"
library_bridge_volume = "- " + self.library_bridge_bin_path + ":/usr/share/clickhouse-library-bridge_fresh"
with open(self.docker_compose_path, 'w') as docker_compose:
docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
@ -1434,6 +1449,7 @@ class ClickHouseInstance:
hostname=self.hostname,
binary_volume=binary_volume,
odbc_bridge_volume=odbc_bridge_volume,
library_bridge_volume=library_bridge_volume,
instance_config_dir=instance_config_dir,
config_d_dir=self.config_d_dir,
db_dir=db_dir,

View File

@ -33,10 +33,15 @@ def check_args_and_update_paths(args):
if not os.path.isabs(args.binary):
args.binary = os.path.abspath(os.path.join(CURRENT_WORK_DIR, args.binary))
if not args.bridge_binary:
args.bridge_binary = os.path.join(os.path.dirname(args.binary), 'clickhouse-odbc-bridge')
elif not os.path.isabs(args.bridge_binary):
args.bridge_binary = os.path.abspath(os.path.join(CURRENT_WORK_DIR, args.bridge_binary))
if not args.odbc_bridge_binary:
args.odbc_bridge_binary = os.path.join(os.path.dirname(args.binary), 'clickhouse-odbc-bridge')
elif not os.path.isabs(args.odbc_bridge_binary):
args.odbc_bridge_binary = os.path.abspath(os.path.join(CURRENT_WORK_DIR, args.odbc_bridge_binary))
if not args.library_bridge_binary:
args.library_bridge_binary = os.path.join(os.path.dirname(args.binary), 'clickhouse-library-bridge')
elif not os.path.isabs(args.library_bridge_binary):
args.library_bridge_binary = os.path.abspath(os.path.join(CURRENT_WORK_DIR, args.library_bridge_binary))
if args.base_configs_dir:
if not os.path.isabs(args.base_configs_dir):
@ -61,7 +66,7 @@ def check_args_and_update_paths(args):
logging.info("base_configs_dir: {}, binary: {}, cases_dir: {} ".format(args.base_configs_dir, args.binary, args.cases_dir))
for path in [args.binary, args.bridge_binary, args.base_configs_dir, args.cases_dir, CLICKHOUSE_ROOT]:
for path in [args.binary, args.odbc_bridge_binary, args.library_bridge_binary, args.base_configs_dir, args.cases_dir, CLICKHOUSE_ROOT]:
if not os.path.exists(path):
raise Exception("Path {} doesn't exist".format(path))
@ -82,7 +87,8 @@ signal.signal(signal.SIGINT, docker_kill_handler_handler)
# To run integration tests following artfacts should be sufficient:
# - clickhouse binaries (env CLICKHOUSE_TESTS_SERVER_BIN_PATH or --binary arg)
# - clickhouse default configs(config.xml, users.xml) from same version as binary (env CLICKHOUSE_TESTS_BASE_CONFIG_DIR or --base-configs-dir arg)
# - odbc bridge binary (env CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH or --bridge-binary arg)
# - odbc bridge binary (env CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH or --odbc-bridge-binary arg)
# - library bridge binary (env CLICKHOUSE_TESTS_LIBRARY_BRIDGE_BIN_PATH or --library-bridge-binary)
# - tests/integration directory with all test cases and configs (env CLICKHOUSE_TESTS_INTEGRATION_PATH or --cases-dir)
#
# 1) --clickhouse-root is only used to determine other paths on default places
@ -98,10 +104,15 @@ if __name__ == "__main__":
help="Path to clickhouse binary. For example /usr/bin/clickhouse")
parser.add_argument(
"--bridge-binary",
"--odbc-bridge-binary",
default=os.environ.get("CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH", ""),
help="Path to clickhouse-odbc-bridge binary. Defaults to clickhouse-odbc-bridge in the same dir as clickhouse.")
parser.add_argument(
"--library-bridge-binary",
default=os.environ.get("CLICKHOUSE_TESTS_LIBRARY_BRIDGE_BIN_PATH", ""),
help="Path to clickhouse-library-bridge binary. Defaults to clickhouse-library-bridge in the same dir as clickhouse.")
parser.add_argument(
"--base-configs-dir",
default=os.environ.get("CLICKHOUSE_TESTS_BASE_CONFIG_DIR"),
@ -185,14 +196,17 @@ if __name__ == "__main__":
if sys.stdout.isatty() and sys.stdin.isatty():
tty = "-it"
cmd = "docker run {net} {tty} --rm --name {name} --privileged --volume={bridge_bin}:/clickhouse-odbc-bridge --volume={bin}:/clickhouse \
cmd = "docker run {net} {tty} --rm --name {name} --privileged \
--volume={odbc_bridge_bin}:/clickhouse-odbc-bridge --volume={bin}:/clickhouse \
--volume={library_bridge_bin}:/clickhouse-library-bridge --volume={bin}:/clickhouse \
--volume={base_cfg}:/clickhouse-config --volume={cases_dir}:/ClickHouse/tests/integration \
--volume={src_dir}/Server/grpc_protos:/ClickHouse/src/Server/grpc_protos \
--volume={name}_volume:/var/lib/docker {env_tags} -e PYTEST_OPTS='{opts}' {img} {command}".format(
net=net,
tty=tty,
bin=args.binary,
bridge_bin=args.bridge_binary,
odbc_bridge_bin=args.odbc_bridge_binary,
library_bridge_bin=args.library_bridge_binary,
base_cfg=args.base_configs_dir,
cases_dir=args.cases_dir,
src_dir=args.src_dir,

View File

@ -0,0 +1,12 @@
<yandex>
<dictionaries_lib_path>/etc/clickhouse-server/config.d/dictionaries_lib</dictionaries_lib_path>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,298 @@
/// c++ sample dictionary library
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <vector>
namespace ClickHouseLibrary
{
using CString = const char *;
using ColumnName = CString;
using ColumnNames = ColumnName[];
struct CStrings
{
CString * data = nullptr;
uint64_t size = 0;
};
struct VectorUInt64
{
const uint64_t * data = nullptr;
uint64_t size = 0;
};
struct ColumnsUInt64
{
VectorUInt64 * data = nullptr;
uint64_t size = 0;
};
struct Field
{
const void * data = nullptr;
uint64_t size = 0;
};
struct Row
{
const Field * data = nullptr;
uint64_t size = 0;
};
struct Table
{
const Row * data = nullptr;
uint64_t size = 0;
uint64_t error_code = 0; // 0 = ok; !0 = error, with message in error_string
const char * error_string = nullptr;
};
enum LogLevel
{
FATAL = 1,
CRITICAL,
ERROR,
WARNING,
NOTICE,
INFORMATION,
DEBUG,
TRACE,
};
void log(LogLevel level, CString msg);
}
#define LOG(logger, message) \
do \
{ \
std::stringstream builder; \
builder << message; \
(logger)(ClickHouseLibrary::INFORMATION, builder.str().c_str()); \
} while (false)
struct LibHolder
{
std::function<void(ClickHouseLibrary::LogLevel, ClickHouseLibrary::CString)> log;
};
struct DataHolder
{
std::vector<std::vector<uint64_t>> dataHolder; // Actual data storage
std::vector<std::vector<ClickHouseLibrary::Field>> fieldHolder; // Pointers and sizes of data
std::unique_ptr<ClickHouseLibrary::Row[]> rowHolder;
ClickHouseLibrary::Table ctable; // Result data prepared for transfer via c-style interface
LibHolder * lib = nullptr;
size_t num_rows;
size_t num_cols;
};
template <typename T>
void MakeColumnsFromVector(T * ptr)
{
if (ptr->dataHolder.empty())
{
LOG(ptr->lib->log, "generating null values, cols: " << ptr->num_cols);
std::vector<ClickHouseLibrary::Field> fields;
for (size_t i = 0; i < ptr->num_cols; ++i)
fields.push_back({nullptr, 0});
ptr->fieldHolder.push_back(fields);
}
else
{
for (const auto & row : ptr->dataHolder)
{
std::vector<ClickHouseLibrary::Field> fields;
for (const auto & field : row)
fields.push_back({&field, sizeof(field)});
ptr->fieldHolder.push_back(fields);
}
}
const auto rows_num = ptr->fieldHolder.size();
ptr->rowHolder = std::make_unique<ClickHouseLibrary::Row[]>(rows_num);
size_t i = 0;
for (auto & row : ptr->fieldHolder)
{
ptr->rowHolder[i].size = row.size();
ptr->rowHolder[i].data = row.data();
++i;
}
ptr->ctable.size = rows_num;
ptr->ctable.data = ptr->rowHolder.get();
}
extern "C"
{
void * ClickHouseDictionary_v3_loadIds(void * data_ptr,
ClickHouseLibrary::CStrings * settings,
ClickHouseLibrary::CStrings * columns,
const struct ClickHouseLibrary::VectorUInt64 * ids)
{
auto ptr = static_cast<DataHolder *>(data_ptr);
if (ids)
LOG(ptr->lib->log, "loadIds lib call ptr=" << data_ptr << " => " << ptr << " size=" << ids->size);
if (!ptr)
return nullptr;
if (settings)
{
LOG(ptr->lib->log, "settings passed: " << settings->size);
for (size_t i = 0; i < settings->size; ++i)
{
LOG(ptr->lib->log, "setting " << i << " :" << settings->data[i]);
}
}
if (columns)
{
LOG(ptr->lib->log, "columns passed:" << columns->size);
for (size_t i = 0; i < columns->size; ++i)
{
LOG(ptr->lib->log, "column " << i << " :" << columns->data[i]);
}
}
if (ids)
{
LOG(ptr->lib->log, "ids passed: " << ids->size);
for (size_t i = 0; i < ids->size; ++i)
{
LOG(ptr->lib->log, "id " << i << " :" << ids->data[i] << " generating.");
ptr->dataHolder.emplace_back(std::vector<uint64_t>{ids->data[i], ids->data[i] + 100, ids->data[i] + 200, ids->data[i] + 300});
}
}
MakeColumnsFromVector(ptr);
return static_cast<void *>(&ptr->ctable);
}
void * ClickHouseDictionary_v3_loadAll(void * data_ptr, ClickHouseLibrary::CStrings * settings, ClickHouseLibrary::CStrings * /*columns*/)
{
auto ptr = static_cast<DataHolder *>(data_ptr);
LOG(ptr->lib->log, "loadAll lib call ptr=" << data_ptr << " => " << ptr);
if (!ptr)
return nullptr;
size_t num_rows = 0, num_cols = 4;
std::string test_type;
std::vector<std::string> settings_values;
if (settings)
{
LOG(ptr->lib->log, "settings size: " << settings->size);
for (size_t i = 0; i < settings->size; ++i)
{
std::string setting_name = settings->data[i];
std::string setting_value = settings->data[++i];
LOG(ptr->lib->log, "setting " + std::to_string(i) + " name " + setting_name + " value " + setting_value);
if (setting_name == "num_rows")
num_rows = std::atoi(setting_value.data());
else if (setting_name == "num_cols")
num_cols = std::atoi(setting_value.data());
else if (setting_name == "test_type")
test_type = setting_value;
else
{
LOG(ptr->lib->log, "Adding setting " + setting_name);
settings_values.push_back(setting_value);
}
}
}
if (test_type == "test_simple")
{
for (size_t i = 0; i < 10; ++i)
{
LOG(ptr->lib->log, "id " << i << " :" << " generating.");
ptr->dataHolder.emplace_back(std::vector<uint64_t>{i, i + 10, i + 20, i + 30});
}
}
else if (test_type == "test_many_rows" && num_rows)
{
for (size_t i = 0; i < num_rows; ++i)
{
ptr->dataHolder.emplace_back(std::vector<uint64_t>{i, i, i, i});
}
}
ptr->num_cols = num_cols;
ptr->num_rows = num_rows;
MakeColumnsFromVector(ptr);
return static_cast<void *>(&ptr->ctable);
}
void * ClickHouseDictionary_v3_loadKeys(void * data_ptr, ClickHouseLibrary::CStrings * settings, ClickHouseLibrary::Table * requested_keys)
{
auto ptr = static_cast<DataHolder *>(data_ptr);
LOG(ptr->lib->log, "loadKeys lib call ptr=" << data_ptr << " => " << ptr);
if (settings)
{
LOG(ptr->lib->log, "settings passed: " << settings->size);
for (size_t i = 0; i < settings->size; ++i)
{
LOG(ptr->lib->log, "setting " << i << " :" << settings->data[i]);
}
}
if (requested_keys)
{
LOG(ptr->lib->log, "requested_keys columns passed: " << requested_keys->size);
for (size_t i = 0; i < requested_keys->size; ++i)
{
LOG(ptr->lib->log, "requested_keys at column " << i << " passed: " << requested_keys->data[i].size);
ptr->dataHolder.emplace_back(std::vector<uint64_t>{i, i + 100, i + 200, i + 300});
}
}
MakeColumnsFromVector(ptr);
return static_cast<void *>(&ptr->ctable);
}
void * ClickHouseDictionary_v3_libNew(
ClickHouseLibrary::CStrings * /*settings*/, void (*logFunc)(ClickHouseLibrary::LogLevel, ClickHouseLibrary::CString))
{
auto lib_ptr = new LibHolder;
lib_ptr->log = logFunc;
return lib_ptr;
}
void ClickHouseDictionary_v3_libDelete(void * lib_ptr)
{
auto ptr = static_cast<LibHolder *>(lib_ptr);
delete ptr;
return;
}
void * ClickHouseDictionary_v3_dataNew(void * lib_ptr)
{
auto data_ptr = new DataHolder;
data_ptr->lib = static_cast<decltype(data_ptr->lib)>(lib_ptr);
return data_ptr;
}
void ClickHouseDictionary_v3_dataDelete(void * /*lib_ptr*/, void * data_ptr)
{
auto ptr = static_cast<DataHolder *>(data_ptr);
delete ptr;
return;
}
}

View File

@ -0,0 +1,86 @@
<?xml version="1.0"?>
<yandex>
<dictionary>
<name>dict1</name>
<source>
<library>
<path>/etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.so</path>
<settings>
<test_type>test_simple</test_type>
<key>nice key</key>
<value>interesting, nice value</value>
<table>//home/interesting-path/to-/interesting_data</table>
<pass>11</pass>
<user>user-u -user</user>
</settings>
</library>
</source>
<layout>
<hashed/>
</layout>
<lifetime>
<max>1</max>
<min>1</min>
</lifetime>
<structure>
<id>
<name>key</name>
<type>UInt64</type>
</id>
<attribute>
<name>value1</name>
<null_value></null_value>
<type>UInt64</type>
</attribute>
<attribute>
<name>value2</name>
<null_value></null_value>
<type>UInt64</type>
</attribute>
<attribute>
<name>value3</name>
<null_value></null_value>
<type>UInt64</type>
</attribute>
</structure>
</dictionary>
<dictionary>
<name>dict2</name>
<source>
<library>
<path>/etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.so</path>
<settings>
<test_type>test_nulls</test_type>
</settings>
</library>
</source>
<layout>
<hashed/>
</layout>
<lifetime>
<max>1</max>
<min>1</min>
</lifetime>
<structure>
<id>
<name>key</name>
<type>UInt64</type>
</id>
<attribute>
<name>value1</name>
<null_value>12</null_value>
<type>UInt64</type>
</attribute>
<attribute>
<name>value2</name>
<null_value>12</null_value>
<type>UInt64</type>
</attribute>
<attribute>
<name>value3</name>
<null_value>12</null_value>
<type>UInt64</type>
</attribute>
</structure>
</dictionary>
</yandex>

View File

@ -0,0 +1,4 @@
<?xml version="1.0"?>
<yandex>
<dictionaries_config>/etc/clickhouse-server/config.d/dict*.xml</dictionaries_config>
</yandex>

View File

@ -0,0 +1,17 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
<library_bridge_log>/var/log/clickhouse-server/clickhouse-library-bridge.log</library_bridge_log>
<library_bridge_errlog>/var/log/clickhouse-server/clickhouse-library-bridge.err.log</library_bridge_errlog>
<library_bridge_stdout>/var/log/clickhouse-server/clickhouse-library-bridge.stdout</library_bridge_stdout>
<library_bridge_stderr>/var/log/clickhouse-server/clickhouse-library-bridge.stderr</library_bridge_stderr>
<library_bridge_level>trace</library_bridge_level>
</logger>
</yandex>

View File

@ -0,0 +1,154 @@
import os
import os.path as p
import pytest
import time
from helpers.cluster import ClickHouseCluster, run_and_check
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=[
'configs/enable_dict.xml',
'configs/config.d/config.xml',
'configs/dictionaries/dict1.xml',
'configs/log_conf.xml'])
@pytest.fixture(scope="module")
def ch_cluster():
try:
cluster.start()
instance.query('CREATE DATABASE test')
container_lib_path = '/etc/clickhouse-server/config.d/dictionarites_lib/dict_lib.cpp'
instance.copy_file_to_container(os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs/dict_lib.cpp"),
"/etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.cpp")
instance.query("SYSTEM RELOAD CONFIG")
instance.exec_in_container(
['bash', '-c',
'/usr/bin/g++ -shared -o /etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.so -fPIC /etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.cpp'],
user='root')
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def setup_teardown():
yield # run test
def test_load_all(ch_cluster):
instance.query('''
CREATE DICTIONARY lib_dict (key UInt64, value1 UInt64, value2 UInt64, value3 UInt64)
PRIMARY KEY key
SOURCE(library(
PATH '/etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.so'
SETTINGS (test_type test_simple)))
LAYOUT(HASHED())
LIFETIME (MIN 0 MAX 10)
''')
result = instance.query('SELECT * FROM lib_dict ORDER BY key')
expected = (
"0\t10\t20\t30\n" +
"1\t11\t21\t31\n" +
"2\t12\t22\t32\n" +
"3\t13\t23\t33\n" +
"4\t14\t24\t34\n" +
"5\t15\t25\t35\n" +
"6\t16\t26\t36\n" +
"7\t17\t27\t37\n" +
"8\t18\t28\t38\n" +
"9\t19\t29\t39\n"
)
instance.query('SYSTEM RELOAD DICTIONARY dict1')
instance.query('DROP DICTIONARY lib_dict')
assert(result == expected)
instance.query("""
CREATE TABLE IF NOT EXISTS `dict1_table` (
key UInt64, value1 UInt64, value2 UInt64, value3 UInt64
) ENGINE = Dictionary(dict1)
""")
result = instance.query('SELECT * FROM dict1_table ORDER BY key')
assert(result == expected)
def test_load_ids(ch_cluster):
instance.query('''
CREATE DICTIONARY lib_dict_c (key UInt64, value1 UInt64, value2 UInt64, value3 UInt64)
PRIMARY KEY key SOURCE(library(PATH '/etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.so'))
LAYOUT(CACHE(
SIZE_IN_CELLS 10000000
BLOCK_SIZE 4096
FILE_SIZE 16777216
READ_BUFFER_SIZE 1048576
MAX_STORED_KEYS 1048576))
LIFETIME(2) ;
''')
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(0));''')
assert(result.strip() == '100')
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
assert(result.strip() == '101')
instance.query('DROP DICTIONARY lib_dict_c')
def test_load_keys(ch_cluster):
instance.query('''
CREATE DICTIONARY lib_dict_ckc (key UInt64, value1 UInt64, value2 UInt64, value3 UInt64)
PRIMARY KEY key
SOURCE(library(PATH '/etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.so'))
LAYOUT(COMPLEX_KEY_CACHE( SIZE_IN_CELLS 10000000))
LIFETIME(2);
''')
result = instance.query('''select dictGet(lib_dict_ckc, 'value1', tuple(toUInt64(0)));''')
assert(result.strip() == '100')
result = instance.query('''select dictGet(lib_dict_ckc, 'value2', tuple(toUInt64(0)));''')
assert(result.strip() == '200')
instance.query('DROP DICTIONARY lib_dict_ckc')
def test_load_all_many_rows(ch_cluster):
num_rows = [1000, 10000, 100000, 1000000]
for num in num_rows:
instance.query('''
CREATE DICTIONARY lib_dict (key UInt64, value1 UInt64, value2 UInt64, value3 UInt64)
PRIMARY KEY key
SOURCE(library(
PATH '/etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.so'
SETTINGS (num_rows {} test_type test_many_rows)))
LAYOUT(HASHED())
LIFETIME (MIN 0 MAX 10)
'''.format(num))
result = instance.query('SELECT * FROM lib_dict ORDER BY key')
expected = instance.query('SELECT number, number, number, number FROM numbers({})'.format(num))
instance.query('DROP DICTIONARY lib_dict')
assert(result == expected)
def test_null_values(ch_cluster):
instance.query('SYSTEM RELOAD DICTIONARY dict2')
instance.query("""
CREATE TABLE IF NOT EXISTS `dict2_table` (
key UInt64, value1 UInt64, value2 UInt64, value3 UInt64
) ENGINE = Dictionary(dict2)
""")
result = instance.query('SELECT * FROM dict2_table ORDER BY key')
expected = "0\t12\t12\t12\n"
assert(result == expected)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()