First version

This commit is contained in:
kssenii 2021-03-05 09:38:00 +00:00
parent 15b3f379a5
commit dd4a7b6e3d
17 changed files with 1434 additions and 203 deletions

View File

@ -36,6 +36,9 @@ option (ENABLE_CLICKHOUSE_OBFUSCATOR "Table data obfuscator (convert real data t
option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "HTTP-server working like a proxy to ODBC driver"
${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE "HTTP-server working like a proxy to Library dictionary source"
${ENABLE_CLICKHOUSE_ALL})
# https://presentations.clickhouse.tech/matemarketing_2020/
option (ENABLE_CLICKHOUSE_GIT_IMPORT "A tool to analyze Git repositories"
${ENABLE_CLICKHOUSE_ALL})
@ -109,6 +112,12 @@ else()
message(STATUS "ODBC bridge mode: OFF")
endif()
if (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE)
message(STATUS "Library bridge mode: ON")
else()
message(STATUS "Library bridge mode: OFF")
endif()
if (ENABLE_CLICKHOUSE_INSTALL)
message(STATUS "ClickHouse install: ON")
else()
@ -193,6 +202,10 @@ if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
add_subdirectory (odbc-bridge)
endif ()
if (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE)
add_subdirectory (library-bridge)
endif ()
if (CLICKHOUSE_ONE_SHARED)
add_library(clickhouse-lib SHARED ${CLICKHOUSE_SERVER_SOURCES} ${CLICKHOUSE_CLIENT_SOURCES} ${CLICKHOUSE_LOCAL_SOURCES} ${CLICKHOUSE_BENCHMARK_SOURCES} ${CLICKHOUSE_COPIER_SOURCES} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_SOURCES} ${CLICKHOUSE_COMPRESSOR_SOURCES} ${CLICKHOUSE_FORMAT_SOURCES} ${CLICKHOUSE_OBFUSCATOR_SOURCES} ${CLICKHOUSE_GIT_IMPORT_SOURCES} ${CLICKHOUSE_ODBC_BRIDGE_SOURCES})
target_link_libraries(clickhouse-lib ${CLICKHOUSE_SERVER_LINK} ${CLICKHOUSE_CLIENT_LINK} ${CLICKHOUSE_LOCAL_LINK} ${CLICKHOUSE_BENCHMARK_LINK} ${CLICKHOUSE_COPIER_LINK} ${CLICKHOUSE_EXTRACT_FROM_CONFIG_LINK} ${CLICKHOUSE_COMPRESSOR_LINK} ${CLICKHOUSE_FORMAT_LINK} ${CLICKHOUSE_OBFUSCATOR_LINK} ${CLICKHOUSE_GIT_IMPORT_LINK} ${CLICKHOUSE_ODBC_BRIDGE_LINK})

View File

@ -0,0 +1,30 @@
set (CLICKHOUSE_LIBRARY_BRIDGE_SOURCES
library-bridge.cpp
library-log.cpp
LibraryBridge.cpp
Handlers.cpp
HandlerFactory.cpp
SharedLibraryHandler.cpp
)
if (OS_LINUX)
# clickhouse-library-bridge is always a separate binary.
# Reason: it must not export symbols from SSL, mariadb-client, etc. to not break ABI compatibility with ODBC drivers.
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--no-export-dynamic")
endif ()
add_executable(clickhouse-library-bridge ${CLICKHOUSE_LIBRARY_BRIDGE_SOURCES})
target_link_libraries(clickhouse-library-bridge PRIVATE
daemon
dbms
clickhouse_parsers
)
set_target_properties(clickhouse-library-bridge PROPERTIES RUNTIME_OUTPUT_DIRECTORY ..)
install(TARGETS clickhouse-library-bridge RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
#if (ENABLE_TESTS)
# add_subdirectory(tests)
#endif()

View File

@ -0,0 +1,38 @@
#include "HandlerFactory.h"
#include <Poco/Net/HTTPServerRequest.h>
#include <common/logger_useful.h>
#include "Handlers.h"
namespace DB
{
std::unique_ptr<HTTPRequestHandler> LibraryBridgeHandlerFactory::createRequestHandler(const HTTPServerRequest & request)
{
Poco::URI uri{request.getURI()};
LOG_DEBUG(log, "Request URI: {}", uri.toString());
if (uri == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
return std::make_unique<PingHandler>(keep_alive_timeout);
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
{
/// Remove '/' in the beginning.
auto dictionary_id = uri.getPath().substr(1);
auto library_handler = library_handlers.find(dictionary_id);
if (library_handler == library_handlers.end())
{
auto library_handler_ptr = std::make_shared<SharedLibraryHandler>(dictionary_id);
library_handlers[dictionary_id] = library_handler_ptr;
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, context, library_handler_ptr);
}
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, context, library_handler->second);
}
return nullptr;
}
}

View File

@ -0,0 +1,44 @@
#pragma once
#include <Interpreters/Context.h>
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <Poco/Logger.h>
namespace DB
{
class SharedLibraryHandler;
using SharedLibraryHandlerPtr = std::shared_ptr<SharedLibraryHandler>;
/// Factory for '/ping', '/' handlers.
class LibraryBridgeHandlerFactory : public HTTPRequestHandlerFactory
{
public:
LibraryBridgeHandlerFactory(
const std::string & name_,
size_t keep_alive_timeout_,
Context & context_)
: log(&Poco::Logger::get(name_))
, name(name_)
, keep_alive_timeout(keep_alive_timeout_)
, context(context_)
{
}
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override;
private:
Poco::Logger * log;
std::string name;
size_t keep_alive_timeout;
Context & context;
/// Map dictionary_id -> SharedLibraryHandler
std::unordered_map<std::string, SharedLibraryHandlerPtr> library_handlers;
};
}

View File

@ -0,0 +1,175 @@
#include "Handlers.h"
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatFactory.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromIStream.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h>
#include <Poco/ThreadPool.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <common/logger_useful.h>
#include <Server/HTTP/HTMLForm.h>
#include <mutex>
#include <memory>
namespace DB
{
namespace
{
std::shared_ptr<Block> parseColumns(std::string && column_string)
{
auto sample_block = std::make_shared<Block>();
auto names_and_types = NamesAndTypesList::parse(column_string);
for (const NameAndTypePair & column_data : names_and_types)
sample_block->insert({column_data.type, column_data.name});
return sample_block;
}
}
void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
LOG_TRACE(log, "Dictionary ID: {}", library_handler->getDictID());
LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(request);
params.read(request.getStream());
if (!params.has("method"))
{
processError(response, "No 'method' in request URL");
return;
}
std::string method = params.get("method");
LOG_TRACE(log, "Library method: '{}'", method);
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
try
{
if (method == "libNew")
{
if (!params.has("library_path"))
{
processError(response, "No 'library_path' in request URL");
return;
}
if (!params.has("library_settings"))
{
processError(response, "No 'library_settings' in request URL");
return;
}
std::string library_path = params.get("library_path");
std::string library_settings = params.get("library_settings");
LOG_TRACE(log, "Library path: '{}', library_settings: '{}'", library_path, library_settings);
library_handler->libNew(library_path, library_settings);
writeStringBinary("Ok.", out);
}
else if (method == "libDelete")
{
//library_handler->libDelete();
writeStringBinary("Ok.", out);
}
else if (method == "loadAll")
{
if (!params.has("attributes"))
{
processError(response, "No 'attributes' in request URL");
return;
}
std::string attributes = params.get("attributes");
std::string columns = params.get("columns");
std::shared_ptr<Block> sample_block;
try
{
sample_block = parseColumns(std::move(columns));
}
catch (const Exception & ex)
{
processError(response, "Invalid 'columns' parameter in request body '" + ex.message() + "'");
LOG_WARNING(log, ex.getStackTraceString());
return;
}
auto input = library_handler->loadAll(attributes, *sample_block);
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream("RowBinary", out, *sample_block, context);
copyData(*input, *output);
}
}
catch (...)
{
LOG_TRACE(log, "KSSENII caught an error");
auto message = getCurrentExceptionMessage(true);
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error, because of too soon response sending
try
{
writeStringBinary(message, out);
out.finalize();
}
catch (...)
{
tryLogCurrentException(log);
}
tryLogCurrentException(log);
}
try
{
out.finalize();
}
catch (...)
{
tryLogCurrentException(log);
}
}
void LibraryRequestHandler::processError(HTTPServerResponse & response, const std::string & message)
{
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
LOG_WARNING(log, message);
}
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response)
{
try
{
setResponseDefaultHeaders(response, keep_alive_timeout);
const char * data = "Ok.\n";
response.sendBuffer(data, strlen(data));
}
catch (...)
{
tryLogCurrentException("PingHandler");
}
}
}

View File

@ -0,0 +1,60 @@
#pragma once
#include <Interpreters/Context.h>
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Poco/Logger.h>
#include "SharedLibraryHandler.h"
namespace DB
{
/** Handler for requests to Library Dictionary Source, returns response in RowBinary format
*/
class LibraryRequestHandler : public HTTPRequestHandler
{
public:
LibraryRequestHandler(
size_t keep_alive_timeout_,
Context & context_,
SharedLibraryHandlerPtr library_handler_)
: log(&Poco::Logger::get("LibraryRequestHandler"))
, keep_alive_timeout(keep_alive_timeout_)
, context(context_)
, library_handler(library_handler_)
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
private:
void processError(HTTPServerResponse & response, const std::string & message);
Poco::Logger * log;
size_t keep_alive_timeout;
Context & context;
SharedLibraryHandlerPtr library_handler;
};
/// Simple ping handler, answers "Ok." to GET request
class PingHandler : public HTTPRequestHandler
{
public:
explicit PingHandler(size_t keep_alive_timeout_)
: keep_alive_timeout(keep_alive_timeout_)
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
private:
size_t keep_alive_timeout;
};
}

View File

@ -0,0 +1,250 @@
#include "LibraryBridge.h"
#include "HandlerFactory.h"
#include <string>
#include <errno.h>
#include <IO/ReadHelpers.h>
#include <boost/program_options.hpp>
#include <Poco/Net/NetException.h>
#include <Poco/String.h>
#include <Poco/Util/HelpFormatter.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/config.h>
#include <Formats/registerFormats.h>
#include <common/logger_useful.h>
#include <ext/scope_guard.h>
#include <ext/range.h>
#include <Common/SensitiveDataMasker.h>
#include <Server/HTTP/HTTPServer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
}
namespace
{
Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port, Poco::Logger * log)
{
Poco::Net::SocketAddress socket_address;
try
{
socket_address = Poco::Net::SocketAddress(host, port);
}
catch (const Poco::Net::DNSException & e)
{
const auto code = e.code();
if (code == EAI_FAMILY
#if defined(EAI_ADDRFAMILY)
|| code == EAI_ADDRFAMILY
#endif
)
{
LOG_ERROR(log, "Cannot resolve listen_host ({}), error {}: {}. If it is an IPv6 address and your host has disabled IPv6, then consider to specify IPv4 address to listen in <listen_host> element of configuration file. Example: <listen_host>0.0.0.0</listen_host>", host, e.code(), e.message());
}
throw;
}
return socket_address;
}
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, Poco::Logger * log)
{
auto address = makeSocketAddress(host, port, log);
#if POCO_VERSION < 0x01080000
socket.bind(address, /* reuseAddress = */ true);
#else
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ false);
#endif
socket.listen(/* backlog = */ 64);
return address;
}
}
void LibraryBridge::handleHelp(const std::string &, const std::string &)
{
Poco::Util::HelpFormatter help_formatter(options());
help_formatter.setCommand(commandName());
help_formatter.setHeader("HTTP-proxy for library dictionary requests");
help_formatter.setUsage("--http-port <port>");
help_formatter.format(std::cerr);
stopOptionsProcessing();
}
void LibraryBridge::defineOptions(Poco::Util::OptionSet & options)
{
options.addOption(Poco::Util::Option("http-port", "", "port to listen").argument("http-port", true).binding("http-port"));
options.addOption(
Poco::Util::Option("listen-host", "", "hostname or address to listen, default 127.0.0.1").argument("listen-host").binding("listen-host"));
options.addOption(
Poco::Util::Option("http-timeout", "", "http timeout for socket, default 1800").argument("http-timeout").binding("http-timeout"));
options.addOption(Poco::Util::Option("max-server-connections", "", "max connections to server, default 1024")
.argument("max-server-connections")
.binding("max-server-connections"));
options.addOption(Poco::Util::Option("keep-alive-timeout", "", "keepalive timeout, default 10")
.argument("keep-alive-timeout")
.binding("keep-alive-timeout"));
options.addOption(Poco::Util::Option("log-level", "", "sets log level, default info").argument("log-level").binding("logger.level"));
options.addOption(
Poco::Util::Option("log-path", "", "log path for all logs, default console").argument("log-path").binding("logger.log"));
options.addOption(Poco::Util::Option("err-log-path", "", "err log path for all logs, default no")
.argument("err-log-path")
.binding("logger.errorlog"));
options.addOption(Poco::Util::Option("stdout-path", "", "stdout log path, default console")
.argument("stdout-path")
.binding("logger.stdout"));
options.addOption(Poco::Util::Option("stderr-path", "", "stderr log path, default console")
.argument("stderr-path")
.binding("logger.stderr"));
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("help", "", "produce this help message")
.binding("help")
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
ServerApplication::defineOptions(options); // NOLINT Don't need complex BaseDaemon's .xml config
}
void LibraryBridge::initialize(Application & self)
{
BaseDaemon::closeFDs();
config().setString("logger", "LibraryBridge");
/// Redirect stdout, stderr to specified files.
/// Some libraries and sanitizers write to stderr in case of errors.
const auto stdout_path = config().getString("logger.stdout", "");
if (!stdout_path.empty())
{
if (!freopen(stdout_path.c_str(), "a+", stdout))
throw Poco::OpenFileException("Cannot attach stdout to " + stdout_path);
/// Disable buffering for stdout.
setbuf(stdout, nullptr);
}
const auto stderr_path = config().getString("logger.stderr", "");
if (!stderr_path.empty())
{
if (!freopen(stderr_path.c_str(), "a+", stderr))
throw Poco::OpenFileException("Cannot attach stderr to " + stderr_path);
/// Disable buffering for stderr.
setbuf(stderr, nullptr);
}
buildLoggers(config(), logger(), self.commandName());
BaseDaemon::logRevision();
log = &logger();
hostname = config().getString("listen-host", "127.0.0.1");
port = config().getUInt("http-port");
if (port > 0xFFFF)
throw Exception("Out of range 'http-port': " + std::to_string(port), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
http_timeout = config().getUInt("http-timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT);
max_server_connections = config().getUInt("max-server-connections", 1024);
keep_alive_timeout = config().getUInt("keep-alive-timeout", 10);
initializeTerminationAndSignalProcessing();
ServerApplication::initialize(self); // NOLINT
}
void LibraryBridge::uninitialize()
{
BaseDaemon::uninitialize();
}
int LibraryBridge::main(const std::vector<std::string> & /*args*/)
{
registerFormats();
LOG_INFO(log, "KSSENII Starting up");
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, hostname, port, log);
socket.setReceiveTimeout(http_timeout);
socket.setSendTimeout(http_timeout);
Poco::ThreadPool server_pool(3, max_server_connections);
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(http_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
auto shared_context = Context::createShared();
Context context(Context::createGlobal(shared_context.get()));
context.makeGlobalContext();
if (config().has("query_masking_rules"))
SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(config(), "query_masking_rules"));
auto server = HTTPServer(
context,
std::make_shared<LibraryBridgeHandlerFactory>("LibraryRequestHandlerFactory-factory", keep_alive_timeout, context),
server_pool,
socket,
http_params);
server.start();
LOG_INFO(log, "Listening http://{}", address.toString());
SCOPE_EXIT({
LOG_DEBUG(log, "Received termination signal.");
LOG_DEBUG(log, "Waiting for current connections to close.");
server.stop();
for (size_t count : ext::range(1, 6))
{
if (server.currentConnections() == 0)
break;
LOG_DEBUG(log, "Waiting for {} connections, try {}", server.currentConnections(), count);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
});
waitForTerminationRequest();
return Application::EXIT_OK;
}
}
#pragma GCC diagnostic ignored "-Wmissing-declarations"
int mainEntryClickHouseLibraryBridge(int argc, char ** argv)
{
DB::LibraryBridge app;
try
{
return app.run(argc, argv);
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <Interpreters/Context.h>
#include <daemon/BaseDaemon.h>
#include <common/logger_useful.h>
namespace DB
{
class LibraryBridge : public BaseDaemon
{
public:
void defineOptions(Poco::Util::OptionSet & options) override;
protected:
void initialize(Application & self) override;
void uninitialize() override;
int main(const std::vector<std::string> & args) override;
private:
void handleHelp(const std::string &, const std::string &);
std::string hostname;
size_t port;
size_t http_timeout;
std::string log_level;
size_t max_server_connections;
size_t keep_alive_timeout;
Poco::Logger * log;
};
}

View File

@ -0,0 +1,52 @@
#pragma once
#include <Common/StringUtils/StringUtils.h>
#include <Dictionaries/LibraryDictionarySourceExternal.h>
#include <Core/Block.h>
#include <ext/bit_cast.h>
#include <ext/range.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int FILE_DOESNT_EXIST;
extern const int EXTERNAL_LIBRARY_ERROR;
extern const int PATH_ACCESS_DENIED;
}
class CStringsHolder
{
public:
using Container = std::vector<std::string>;
explicit CStringsHolder(const Container & strings_pass)
{
strings_holder = strings_pass;
strings.size = strings_holder.size();
ptr_holder = std::make_unique<ClickHouseLibrary::CString[]>(strings.size);
strings.data = ptr_holder.get();
size_t i = 0;
for (auto & str : strings_holder)
{
strings.data[i] = str.c_str();
++i;
}
}
ClickHouseLibrary::CStrings strings; // will pass pointer to lib
private:
std::unique_ptr<ClickHouseLibrary::CString[]> ptr_holder = nullptr;
Container strings_holder;
};
}

View File

@ -0,0 +1,159 @@
#include "SharedLibraryHandler.h"
#include <boost/algorithm/string/split.hpp>
#include <ext/scope_guard.h>
#include <IO/ReadHelpers.h>
namespace DB
{
SharedLibraryHandler::SharedLibraryHandler(const std::string & dictionary_id_)
: log(&Poco::Logger::get("SharedLibraryHandler"))
, dictionary_id(dictionary_id_)
{
}
SharedLibraryHandler::~SharedLibraryHandler()
{
auto lib_delete = library->tryGet<void (*)(decltype(lib_data))>("ClickHouseDictionary_v3_libDelete");
if (lib_delete)
lib_delete(lib_data);
}
void SharedLibraryHandler::libNew(const std::string & path, const std::string & settings)
{
library_path = path;
library = std::make_shared<SharedLibrary>(library_path, RTLD_LAZY
#if defined(RTLD_DEEPBIND) && !defined(ADDRESS_SANITIZER) // Does not exists in FreeBSD. Cannot work with Address Sanitizer.
| RTLD_DEEPBIND
#endif
);
std::vector<std::string> lib_settings;
boost::split(lib_settings, settings, [](char c) { return c == ' '; });
settings_holder = std::make_shared<CStringsHolder>(CStringsHolder(lib_settings));
auto lib_new = library->tryGet<decltype(lib_data) (*)(
decltype(&settings_holder->strings), decltype(&ClickHouseLibrary::log))>("ClickHouseDictionary_v3_libNew");
if (lib_new)
lib_data = lib_new(&settings_holder->strings, ClickHouseLibrary::log);
}
//void SharedLibraryHandler::libCloneOrNew()
//{
// if (auto lib_clone = library->tryGet<decltype(lib_data) (*)(decltype(other.lib_data))>("ClickHouseDictionary_v3_libClone"))
// lib_data = lib_clone(other.lib_data);
// else if (auto lib_new = library->tryGet<decltype(lib_data) (*)(decltype(&settings->strings), decltype(&ClickHouseLibrary::log))>("ClickHouseDictionary_v3_libNew"))
// lib_data = lib_new(&settings->strings, ClickHouseLibrary::log);
//}
BlockInputStreamPtr SharedLibraryHandler::loadAll(const std::string & attributes_string, const Block & sample_block)
{
std::vector<std::string> dict_attributes;
boost::split(dict_attributes, attributes_string, [](char c) { return c == ','; });
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(dict_attributes.size());
ClickHouseLibrary::CStrings columns{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), dict_attributes.size()};
size_t i = 0;
for (const auto & attr : dict_attributes)
columns.data[i++] = attr.c_str();
void * data_ptr = nullptr;
/// Get function pointer before dataNew call because library->get may throw.
auto func_load_all = library->get<void * (*)(
decltype(data_ptr), decltype(&settings_holder->strings), decltype(&columns))>("ClickHouseDictionary_v3_loadAll");
data_ptr = library->get<decltype(data_ptr) (*)(decltype(lib_data))>("ClickHouseDictionary_v3_dataNew")(lib_data);
auto * data = func_load_all(data_ptr, &settings_holder->strings, &columns);
auto block = dataToBlock(sample_block, data);
SCOPE_EXIT(library->get<void (*)(decltype(lib_data), decltype(data_ptr))>("ClickHouseDictionary_v3_dataDelete")(lib_data, data_ptr));
return std::make_shared<OneBlockInputStream>(block);
}
BlockInputStreamPtr SharedLibraryHandler::loadIds(const std::string & attributes_string, const std::string & ids_string, const Block & sample_block)
{
std::vector<std::string> dict_string_ids;
boost::split(dict_string_ids, ids_string, [](char c) { return c == ','; });
std::vector<UInt64> dict_ids;
for (const auto & id : dict_string_ids)
dict_ids.push_back(parseFromString<UInt64>(id));
std::vector<std::string> dict_attributes;
boost::split(dict_attributes, attributes_string, [](char c) { return c == ','; });
const ClickHouseLibrary::VectorUInt64 ids_data{ext::bit_cast<decltype(ClickHouseLibrary::VectorUInt64::data)>(dict_ids.data()), dict_ids.size()};
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(dict_attributes.size());
ClickHouseLibrary::CStrings columns_pass{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), dict_attributes.size()};
void * data_ptr = nullptr;
/// Get function pointer before dataNew call because library->get may throw.
auto func_load_ids = library->get<void * (*)(
decltype(data_ptr), decltype(&settings_holder->strings), decltype(&columns_pass), decltype(&ids_data))>("ClickHouseDictionary_v3_loadIds");
data_ptr = library->get<decltype(data_ptr) (*)(decltype(lib_data))>("ClickHouseDictionary_v3_dataNew")(lib_data);
auto * data = func_load_ids(data_ptr, &settings_holder->strings, &columns_pass, &ids_data);
auto block = dataToBlock(sample_block, data);
SCOPE_EXIT(library->get<void (*)(decltype(lib_data), decltype(data_ptr))>("ClickHouseDictionary_v3_dataDelete")(lib_data, data_ptr));
return std::make_shared<OneBlockInputStream>(block);
}
Block SharedLibraryHandler::dataToBlock(const Block & sample_block, const void * data)
{
if (!data)
throw Exception("LibraryDictionarySource: No data returned", ErrorCodes::EXTERNAL_LIBRARY_ERROR);
const auto * columns_received = static_cast<const ClickHouseLibrary::Table *>(data);
if (columns_received->error_code)
throw Exception(
"LibraryDictionarySource: Returned error: " + std::to_string(columns_received->error_code) + " " + (columns_received->error_string ? columns_received->error_string : ""),
ErrorCodes::EXTERNAL_LIBRARY_ERROR);
MutableColumns columns(sample_block.columns());
for (const auto i : ext::range(0, columns.size()))
columns[i] = sample_block.getByPosition(i).column->cloneEmpty();
for (size_t col_n = 0; col_n < columns_received->size; ++col_n)
{
if (columns.size() != columns_received->data[col_n].size)
throw Exception(
"LibraryDictionarySource: Returned unexpected number of columns: " + std::to_string(columns_received->data[col_n].size) + ", must be " + std::to_string(columns.size()),
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
for (size_t row_n = 0; row_n < columns_received->data[col_n].size; ++row_n)
{
const auto & field = columns_received->data[col_n].data[row_n];
if (!field.data)
{
/// sample_block contains null_value (from config) inside corresponding column
const auto & col = sample_block.getByPosition(row_n);
columns[row_n]->insertFrom(*(col.column), 0);
}
else
{
const auto & size = field.size;
columns[row_n]->insertData(static_cast<const char *>(field.data), size);
}
}
}
return sample_block.cloneWithColumns(std::move(columns));
}
}

View File

@ -0,0 +1,49 @@
#pragma once
#include <Common/SharedLibrary.h>
#include <common/logger_useful.h>
#include <DataStreams/OneBlockInputStream.h>
#include "LibraryUtils.h"
namespace DB
{
class SharedLibraryHandler
{
public:
SharedLibraryHandler(const std::string & dictionary_id_);
~SharedLibraryHandler();
const std::string & getDictID() { return dictionary_id; }
//void libDelete();
void libNew(const std::string & path, const std::string & settings);
BlockInputStreamPtr loadAll(const std::string & attributes_string, const Block & sample_block);
BlockInputStreamPtr loadIds(const std::string & ids_string, const std::string & attributes_string, const Block & sample_block);
private:
Block dataToBlock(const Block & sample_block, const void * data);
Poco::Logger * log;
std::string dictionary_id;
std::string library_path;
SharedLibraryPtr library;
std::shared_ptr<CStringsHolder> settings_holder;
void * lib_data;
};
using SharedLibraryHandlerPtr = std::shared_ptr<SharedLibraryHandler>;
}

View File

@ -0,0 +1,3 @@
int mainEntryClickHouseLibraryBridge(int argc, char ** argv);
int main(int argc_, char ** argv_) { return mainEntryClickHouseLibraryBridge(argc_, argv_); }

View File

@ -0,0 +1,49 @@
#include <Dictionaries/LibraryDictionarySourceExternal.h>
#include <common/logger_useful.h>
namespace
{
const char DICT_LOGGER_NAME[] = "LibraryDictionarySourceExternal";
}
void ClickHouseLibrary::log(ClickHouseLibrary::LogLevel level, ClickHouseLibrary::CString msg)
{
using ClickHouseLibrary::LogLevel;
auto & logger = Poco::Logger::get(DICT_LOGGER_NAME);
switch (level)
{
case LogLevel::TRACE:
if (logger.trace())
logger.trace(msg);
break;
case LogLevel::DEBUG:
if (logger.debug())
logger.debug(msg);
break;
case LogLevel::INFORMATION:
if (logger.information())
logger.information(msg);
break;
case LogLevel::NOTICE:
if (logger.notice())
logger.notice(msg);
break;
case LogLevel::WARNING:
if (logger.warning())
logger.warning(msg);
break;
case LogLevel::ERROR:
if (logger.error())
logger.error(msg);
break;
case LogLevel::CRITICAL:
if (logger.critical())
logger.critical(msg);
break;
case LogLevel::FATAL:
if (logger.fatal())
logger.fatal(msg);
break;
}
}

View File

@ -0,0 +1,245 @@
#include <Common/LibraryBridgeHelper.h>
#include <sstream>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Formats/FormatFactory.h>
#include <Poco/Path.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h>
#include <common/logger_useful.h>
#include <ext/range.h>
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
}
LibraryBridgeHelper::LibraryBridgeHelper(
const Context & context_,
const std::string & dictionary_id_)
: log(&Poco::Logger::get("LibraryBridgeHelper"))
, context(context_)
, dictionary_id(dictionary_id_)
{
const auto & config = context.getConfigRef();
bridge_port = config.getUInt("library_bridge.port", DEFAULT_PORT);
bridge_host = config.getString("library_bridge.host", DEFAULT_HOST);
}
Poco::URI LibraryBridgeHelper::getPingURI() const
{
auto uri = getBaseURI();
uri.setPath(PING_HANDLER);
return uri;
}
Poco::URI LibraryBridgeHelper::getDictionaryURI() const
{
auto uri = getBaseURI();
uri.setPath('/' + dictionary_id);
return uri;
}
Poco::URI LibraryBridgeHelper::getBaseURI() const
{
Poco::URI uri;
uri.setHost(bridge_host);
uri.setPort(bridge_port);
uri.setScheme("http");
return uri;
}
bool LibraryBridgeHelper::initLibrary(const std::string & library_path, const std::string library_settings)
{
startLibraryBridgeSync();
auto uri = getDictionaryURI();
uri.addQueryParameter("method", LIBNEW_METHOD);
uri.addQueryParameter("library_path", library_path);
uri.addQueryParameter("library_settings", library_settings);
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
bool res;
readBoolText(res, buf);
return res;
}
bool LibraryBridgeHelper::deleteLibrary()
{
startLibraryBridgeSync();
auto uri = getDictionaryURI();
uri.addQueryParameter("method", LIBDELETE_METHOD);
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
bool res;
readBoolText(res, buf);
return res;
}
BlockInputStreamPtr LibraryBridgeHelper::loadAll(const std::string attributes_string, const Block & sample_block)
{
startLibraryBridgeSync();
auto uri = getDictionaryURI();
uri.addQueryParameter("method", LOADALL_METHOD);
uri.addQueryParameter("attributes", attributes_string);
uri.addQueryParameter("columns", sample_block.getNamesAndTypesList().toString());
/// TODO: timeouts?
ReadWriteBufferFromHTTP read_buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, {});
auto format = FormatFactory::instance().getInput(LibraryBridgeHelper::DEFAULT_FORMAT, read_buf, sample_block, context, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format);
auto block = reader->read();
return std::make_shared<OneBlockInputStream>(block);
}
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string attributes_string, const std::string ids_string, const Block & sample_block)
{
startLibraryBridgeSync();
auto uri = getDictionaryURI();
uri.addQueryParameter("method", LOADALL_METHOD);
uri.addQueryParameter("attributes", attributes_string);
uri.addQueryParameter("ids", ids_string);
uri.addQueryParameter("columns", sample_block.getNamesAndTypesList().toString());
/// TODO: timeouts?
ReadWriteBufferFromHTTP read_buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, {});
auto format = FormatFactory::instance().getInput(LibraryBridgeHelper::DEFAULT_FORMAT, read_buf, sample_block, context, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format);
auto block = reader->read();
return std::make_shared<OneBlockInputStream>(block);
}
bool LibraryBridgeHelper::isLibraryBridgeRunning() const
{
try
{
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(context));
return checkString(LibraryBridgeHelper::PING_OK_ANSWER, buf);
}
catch (...)
{
return false;
}
}
void LibraryBridgeHelper::startLibraryBridgeSync() const
{
if (!isLibraryBridgeRunning())
{
LOG_TRACE(log, "clickhouse-library-bridge is not running, will try to start it");
startLibraryBridge();
bool started = false;
uint64_t milliseconds_to_wait = 10; /// Exponential backoff
uint64_t counter = 0;
while (milliseconds_to_wait < 10000)
{
++counter;
LOG_TRACE(log, "Checking clickhouse-library-bridge is running, try {}", counter);
if (isLibraryBridgeRunning())
{
started = true;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds_to_wait));
milliseconds_to_wait *= 2;
}
if (!started)
throw Exception("LibraryBridgeHelper: clickhouse-library-bridge is not responding", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);
}
}
void LibraryBridgeHelper::startLibraryBridge() const
{
const auto & config = context.getConfigRef();
const auto & settings = context.getSettingsRef();
/// Path to executable folder
Poco::Path path{config.getString("application.dir", "/usr/bin")};
std::vector<std::string> cmd_args;
path.setFileName("clickhouse-library-bridge");
cmd_args.push_back("--http-port");
cmd_args.push_back(std::to_string(config.getUInt("library_bridge.port", DEFAULT_PORT)));
cmd_args.push_back("--listen-host");
cmd_args.push_back(config.getString("librray_bridge.listen_host", DEFAULT_HOST));
cmd_args.push_back("--http-timeout");
cmd_args.push_back(std::to_string(settings.http_receive_timeout.value.totalSeconds()));
if (config.has("logger.library_bridge_log"))
{
cmd_args.push_back("--log-path");
cmd_args.push_back(config.getString("logger.library_bridge_log"));
}
if (config.has("logger.library_bridge_errlog"))
{
cmd_args.push_back("--err-log-path");
cmd_args.push_back(config.getString("logger.library_bridge_errlog"));
}
if (config.has("logger.library_bridge_stdout"))
{
cmd_args.push_back("--stdout-path");
cmd_args.push_back(config.getString("logger.library_bridge_stdout"));
}
if (config.has("logger.library_bridge_stderr"))
{
cmd_args.push_back("--stderr-path");
cmd_args.push_back(config.getString("logger.library_bridge_stderr"));
}
if (config.has("logger.library_level"))
{
cmd_args.push_back("--log-level");
cmd_args.push_back(config.getString("logger.library_bridge_level"));
}
LOG_TRACE(log, "Starting clickhouse-library-bridge");
auto cmd =ShellCommand::executeDirect(path.toString(), cmd_args, true);
cmd->wait();
}
}

View File

@ -0,0 +1,74 @@
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/URI.h>
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_EXECUTABLE_NOT_FOUND;
}
class LibraryBridgeHelper
{
public:
LibraryBridgeHelper(
const Context & context_,
const std::string & dictionary_id_);
std::vector<std::pair<std::string, std::string>> getURLParams(const NamesAndTypesList & cols, size_t max_block_size) const;
bool initLibrary(const std::string & library_path, const std::string librray_settings);
bool deleteLibrary();
BlockInputStreamPtr loadAll(const std::string attributes_string, const Block & sample_block);
BlockInputStreamPtr loadIds(const std::string attributes_string, const std::string ids_string, const Block & sample_block);
static constexpr inline size_t DEFAULT_PORT = 9018;
static constexpr inline auto DEFAULT_HOST = "127.0.0.1";
static constexpr inline auto PING_HANDLER = "/ping";
static constexpr inline auto MAIN_HANDLER = "/";
static constexpr inline auto LIBNEW_METHOD = "libNew";
static constexpr inline auto LIBDELETE_METHOD = "libDelete";
static constexpr inline auto LOADALL_METHOD = "loadAll";
static constexpr inline auto DEFAULT_FORMAT = "RowBinary";
static constexpr inline auto PING_OK_ANSWER = "Ok.";
static const inline std::string PING_METHOD = Poco::Net::HTTPRequest::HTTP_GET;
static const inline std::string MAIN_METHOD = Poco::Net::HTTPRequest::HTTP_POST;
bool isLibraryBridgeRunning() const;
void startLibraryBridge() const;
void startLibraryBridgeSync() const;
private:
Poco::URI getDictionaryURI() const;
Poco::URI getPingURI() const;
Poco::URI getBaseURI() const;
Poco::Logger * log;
const Context & context;
const std::string dictionary_id;
std::string bridge_host;
size_t bridge_port;
};
}

View File

@ -1,4 +1,5 @@
#include "LibraryDictionarySource.h"
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Poco/File.h>
@ -11,9 +12,12 @@
#include "DictionaryStructure.h"
#include "LibraryDictionarySourceExternal.h"
#include "registerDictionaries.h"
#include <IO/WriteBufferFromString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
@ -23,274 +27,196 @@ namespace ErrorCodes
}
class CStringsHolder
{
public:
using Container = std::vector<std::string>;
explicit CStringsHolder(const Container & strings_pass)
{
strings_holder = strings_pass;
strings.size = strings_holder.size();
ptr_holder = std::make_unique<ClickHouseLibrary::CString[]>(strings.size);
strings.data = ptr_holder.get();
size_t i = 0;
for (auto & str : strings_holder)
{
strings.data[i] = str.c_str();
++i;
}
}
ClickHouseLibrary::CStrings strings; // will pass pointer to lib
private:
std::unique_ptr<ClickHouseLibrary::CString[]> ptr_holder = nullptr;
Container strings_holder;
};
namespace
{
constexpr auto lib_config_settings = ".settings";
CStringsHolder getLibSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_root)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_root, config_keys);
CStringsHolder::Container strings;
for (const auto & key : config_keys)
{
std::string key_name = key;
auto bracket_pos = key.find('[');
if (bracket_pos != std::string::npos && bracket_pos > 0)
key_name = key.substr(0, bracket_pos);
strings.emplace_back(key_name);
strings.emplace_back(config.getString(config_root + "." + key));
}
return CStringsHolder(strings);
}
Block dataToBlock(const Block & sample_block, const void * data)
{
if (!data)
throw Exception("LibraryDictionarySource: No data returned", ErrorCodes::EXTERNAL_LIBRARY_ERROR);
const auto * columns_received = static_cast<const ClickHouseLibrary::Table *>(data);
if (columns_received->error_code)
throw Exception(
"LibraryDictionarySource: Returned error: " + std::to_string(columns_received->error_code) + " "
+ (columns_received->error_string ? columns_received->error_string : ""),
ErrorCodes::EXTERNAL_LIBRARY_ERROR);
MutableColumns columns(sample_block.columns());
for (const auto i : ext::range(0, columns.size()))
columns[i] = sample_block.getByPosition(i).column->cloneEmpty();
for (size_t col_n = 0; col_n < columns_received->size; ++col_n)
{
if (columns.size() != columns_received->data[col_n].size)
throw Exception(
"LibraryDictionarySource: Returned unexpected number of columns: " + std::to_string(columns_received->data[col_n].size)
+ ", must be " + std::to_string(columns.size()),
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
for (size_t row_n = 0; row_n < columns_received->data[col_n].size; ++row_n)
{
const auto & field = columns_received->data[col_n].data[row_n];
if (!field.data)
{
/// sample_block contains null_value (from config) inside corresponding column
const auto & col = sample_block.getByPosition(row_n);
columns[row_n]->insertFrom(*(col.column), 0);
}
else
{
const auto & size = field.size;
columns[row_n]->insertData(static_cast<const char *>(field.data), size);
}
}
}
return sample_block.cloneWithColumns(std::move(columns));
}
}
LibraryDictionarySource::LibraryDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix_,
Block & sample_block_,
const Context & context,
const Context & context_,
bool check_config)
: log(&Poco::Logger::get("LibraryDictionarySource"))
, dict_struct{dict_struct_}
, config_prefix{config_prefix_}
, path{config.getString(config_prefix + ".path", "")}
, dictionary_id(createDictID())
, sample_block{sample_block_}
, context(context_)
{
if (check_config)
{
const String dictionaries_lib_path = context.getDictionariesLibPath();
if (!startsWith(path, dictionaries_lib_path))
throw Exception("LibraryDictionarySource: Library path " + path + " is not inside " + dictionaries_lib_path, ErrorCodes::PATH_ACCESS_DENIED);
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "LibraryDictionarySource: Library path {} is not inside {}", path, dictionaries_lib_path);
}
if (!Poco::File(path).exists())
throw Exception(
"LibraryDictionarySource: Can't load library " + Poco::File(path).path() + ": file doesn't exist",
ErrorCodes::FILE_DOESNT_EXIST);
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "LibraryDictionarySource: Can't load library {}: file doesn't exist", Poco::File(path).path());
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, dictionary_id);
auto res = bridge_helper->initLibrary(path, getLibrarySettingsString(config, config_prefix + ".settings"));
if (!res)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to create shared library from path: {}", path);
description.init(sample_block);
library = std::make_shared<SharedLibrary>(path, RTLD_LAZY
#if defined(RTLD_DEEPBIND) && !defined(ADDRESS_SANITIZER) // Does not exists in FreeBSD. Cannot work with Address Sanitizer.
| RTLD_DEEPBIND
#endif
);
settings = std::make_shared<CStringsHolder>(getLibSettings(config, config_prefix + lib_config_settings));
if (auto lib_new = library->tryGet<decltype(lib_data) (*)(decltype(&settings->strings), decltype(&ClickHouseLibrary::log))>(
"ClickHouseDictionary_v3_libNew"))
lib_data = lib_new(&settings->strings, ClickHouseLibrary::log);
}
LibraryDictionarySource::~LibraryDictionarySource()
{
}
LibraryDictionarySource::LibraryDictionarySource(const LibraryDictionarySource & other)
: log(&Poco::Logger::get("LibraryDictionarySource"))
, dict_struct{other.dict_struct}
, config_prefix{other.config_prefix}
, path{other.path}
, dictionary_id{other.dictionary_id}
, sample_block{other.sample_block}
, library{other.library}
, context(other.context)
, description{other.description}
, settings{other.settings}
{
if (auto lib_clone = library->tryGet<decltype(lib_data) (*)(decltype(other.lib_data))>("ClickHouseDictionary_v3_libClone"))
lib_data = lib_clone(other.lib_data);
else if (
auto lib_new = library->tryGet<decltype(lib_data) (*)(decltype(&settings->strings), decltype(&ClickHouseLibrary::log))>(
"ClickHouseDictionary_v3_libNew"))
lib_data = lib_new(&settings->strings, ClickHouseLibrary::log);
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, dictionary_id);
}
LibraryDictionarySource::~LibraryDictionarySource()
String LibraryDictionarySource::getLibrarySettingsString(const Poco::Util::AbstractConfiguration & config, const std::string & config_root)
{
if (auto lib_delete = library->tryGet<void (*)(decltype(lib_data))>("ClickHouseDictionary_v3_libDelete"))
lib_delete(lib_data);
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_root, config_keys);
std::string res;
for (const auto & key : config_keys)
{
std::string key_name = key;
auto bracket_pos = key.find('[');
if (bracket_pos != std::string::npos && bracket_pos > 0)
key_name = key.substr(0, bracket_pos);
if (!res.empty())
res += ' ';
res += key_name + ' ' + config.getString(config_root + "." + key);
}
return res;
}
String LibraryDictionarySource::getDictAttributesString()
{
std::string res;
for (const auto & attr : dict_struct.attributes)
{
if (!res.empty())
res += ',';
res += attr.name;
}
return res;
}
String LibraryDictionarySource::getDictIdsString(const std::vector<UInt64> & ids)
{
std::string res;
for (const auto & id : ids)
{
if (!res.empty())
res += ',';
res += std::to_string(id);
}
return res;
}
BlockInputStreamPtr LibraryDictionarySource::loadAll()
{
LOG_TRACE(log, "loadAll {}", toString());
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(dict_struct.attributes.size());
ClickHouseLibrary::CStrings columns{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()),
dict_struct.attributes.size()};
size_t i = 0;
for (const auto & a : dict_struct.attributes)
{
columns.data[i] = a.name.c_str();
++i;
}
void * data_ptr = nullptr;
/// Get function pointer before dataNew call because library->get may throw.
auto func_load_all
= library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&columns))>("ClickHouseDictionary_v3_loadAll");
data_ptr = library->get<decltype(data_ptr) (*)(decltype(lib_data))>("ClickHouseDictionary_v3_dataNew")(lib_data);
auto * data = func_load_all(data_ptr, &settings->strings, &columns);
auto block = dataToBlock(description.sample_block, data);
SCOPE_EXIT(library->get<void (*)(decltype(lib_data), decltype(data_ptr))>("ClickHouseDictionary_v3_dataDelete")(lib_data, data_ptr));
return std::make_shared<OneBlockInputStream>(block);
return bridge_helper->loadAll(getDictAttributesString(), description.sample_block);
}
BlockInputStreamPtr LibraryDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
const ClickHouseLibrary::VectorUInt64 ids_data{ext::bit_cast<decltype(ClickHouseLibrary::VectorUInt64::data)>(ids.data()), ids.size()};
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(dict_struct.attributes.size());
ClickHouseLibrary::CStrings columns_pass{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()),
dict_struct.attributes.size()};
size_t i = 0;
for (const auto & a : dict_struct.attributes)
{
columns_pass.data[i] = a.name.c_str();
++i;
}
void * data_ptr = nullptr;
/// Get function pointer before dataNew call because library->get may throw.
auto func_load_ids
= library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&columns_pass), decltype(&ids_data))>(
"ClickHouseDictionary_v3_loadIds");
data_ptr = library->get<decltype(data_ptr) (*)(decltype(lib_data))>("ClickHouseDictionary_v3_dataNew")(lib_data);
auto * data = func_load_ids(data_ptr, &settings->strings, &columns_pass, &ids_data);
auto block = dataToBlock(description.sample_block, data);
SCOPE_EXIT(library->get<void (*)(decltype(lib_data), decltype(data_ptr))>("ClickHouseDictionary_v3_dataDelete")(lib_data, data_ptr));
return std::make_shared<OneBlockInputStream>(block);
return bridge_helper->loadIds(getDictAttributesString(), getDictIdsString(ids), description.sample_block);
}
BlockInputStreamPtr LibraryDictionarySource::loadKeys(const Columns & key_columns, const std::vector<std::size_t> & requested_rows)
BlockInputStreamPtr LibraryDictionarySource::loadKeys(const Columns &/* key_columns */, const std::vector<std::size_t> & requested_rows)
{
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
return {};
//return bridge_helper->loadIds(getDictAttributesStrig(), getDictIdsString(), escription.sample_block);
auto holder = std::make_unique<ClickHouseLibrary::Row[]>(key_columns.size());
std::vector<std::unique_ptr<ClickHouseLibrary::Field[]>> column_data_holders;
for (size_t i = 0; i < key_columns.size(); ++i)
{
auto cell_holder = std::make_unique<ClickHouseLibrary::Field[]>(requested_rows.size());
for (size_t j = 0; j < requested_rows.size(); ++j)
{
auto data_ref = key_columns[i]->getDataAt(requested_rows[j]);
cell_holder[j] = ClickHouseLibrary::Field{.data = static_cast<const void *>(data_ref.data), .size = data_ref.size};
}
holder[i]
= ClickHouseLibrary::Row{.data = static_cast<ClickHouseLibrary::Field *>(cell_holder.get()), .size = requested_rows.size()};
//auto holder = std::make_unique<ClickHouseLibrary::Row[]>(key_columns.size());
//std::vector<std::unique_ptr<ClickHouseLibrary::Field[]>> column_data_holders;
column_data_holders.push_back(std::move(cell_holder));
}
//for (size_t i = 0; i < key_columns.size(); ++i)
//{
// auto cell_holder = std::make_unique<ClickHouseLibrary::Field[]>(requested_rows.size());
ClickHouseLibrary::Table request_cols{.data = static_cast<ClickHouseLibrary::Row *>(holder.get()), .size = key_columns.size()};
// for (size_t j = 0; j < requested_rows.size(); ++j)
// {
// auto data_ref = key_columns[i]->getDataAt(requested_rows[j]);
// cell_holder[j] = ClickHouseLibrary::Field{.data = static_cast<const void *>(data_ref.data), .size = data_ref.size};
// }
void * data_ptr = nullptr;
/// Get function pointer before dataNew call because library->get may throw.
auto func_load_keys = library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&request_cols))>(
"ClickHouseDictionary_v3_loadKeys");
data_ptr = library->get<decltype(data_ptr) (*)(decltype(lib_data))>("ClickHouseDictionary_v3_dataNew")(lib_data);
auto * data = func_load_keys(data_ptr, &settings->strings, &request_cols);
auto block = dataToBlock(description.sample_block, data);
SCOPE_EXIT(library->get<void (*)(decltype(lib_data), decltype(data_ptr))>("ClickHouseDictionary_v3_dataDelete")(lib_data, data_ptr));
return std::make_shared<OneBlockInputStream>(block);
// holder[i] = ClickHouseLibrary::Row{.data = static_cast<ClickHouseLibrary::Field *>(cell_holder.get()), .size = requested_rows.size()};
// column_data_holders.push_back(std::move(cell_holder));
//}
//ClickHouseLibrary::Table request_cols{.data = static_cast<ClickHouseLibrary::Row *>(holder.get()), .size = key_columns.size()};
//void * data_ptr = nullptr;
///// Get function pointer before dataNew call because library->get may throw.
//auto func_load_keys = library->get<void * (*)(decltype(data_ptr), decltype(&settings->strings), decltype(&request_cols))>("ClickHouseDictionary_v3_loadKeys");
//data_ptr = library->get<decltype(data_ptr) (*)(decltype(lib_data))>("ClickHouseDictionary_v3_dataNew")(lib_data);
//auto * data = func_load_keys(data_ptr, &settings->strings, &request_cols);
//auto block = dataToBlock(description.sample_block, data);
//SCOPE_EXIT(library->get<void (*)(decltype(lib_data), decltype(data_ptr))>("ClickHouseDictionary_v3_dataDelete")(lib_data, data_ptr));
}
bool LibraryDictionarySource::isModified() const
{
if (auto func_is_modified
= library->tryGet<bool (*)(decltype(lib_data), decltype(&settings->strings))>("ClickHouseDictionary_v3_isModified"))
return func_is_modified(lib_data, &settings->strings);
//if (auto func_is_modified = library->tryGet<bool (*)(decltype(lib_data), decltype(&settings->strings))>("ClickHouseDictionary_v3_isModified"))
// return func_is_modified(lib_data, &settings->strings);
return true;
}
bool LibraryDictionarySource::supportsSelectiveLoad() const
{
if (auto func_supports_selective_load
= library->tryGet<bool (*)(decltype(lib_data), decltype(&settings->strings))>("ClickHouseDictionary_v3_supportsSelectiveLoad"))
return func_supports_selective_load(lib_data, &settings->strings);
//if (auto func_supports_selective_load = library->tryGet<bool (*)(decltype(lib_data), decltype(&settings->strings))>("ClickHouseDictionary_v3_supportsSelectiveLoad"))
// return func_supports_selective_load(lib_data, &settings->strings);
return true;
}
DictionarySourcePtr LibraryDictionarySource::clone() const
{
return std::make_unique<LibraryDictionarySource>(*this);
}
std::string LibraryDictionarySource::toString() const
{
return path;
}
void registerDictionarySourceLibrary(DictionarySourceFactory & factory)
{
auto create_table_source = [=](const DictionaryStructure & dict_struct,
@ -303,7 +229,9 @@ void registerDictionarySourceLibrary(DictionarySourceFactory & factory)
{
return std::make_unique<LibraryDictionarySource>(dict_struct, config, config_prefix + ".library", sample_block, context, check_config);
};
factory.registerSource("library", create_table_source);
}
}

View File

@ -1,7 +1,9 @@
#pragma once
#include <Common/SharedLibrary.h>
#include <Common/LibraryBridgeHelper.h>
#include <common/LocalDateTime.h>
#include <Common/thread_local_rng.h>
#include "DictionaryStructure.h"
#include <Core/ExternalResultDescription.h>
#include "IDictionarySource.h"
@ -17,18 +19,17 @@ namespace Util
}
}
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
class CStringsHolder;
/// Allows loading dictionaries from dynamic libraries (.so)
/// Experimental version
/// Example: tests/external_dictionaries/dictionary_library/dictionary_library.cpp
class CStringsHolder;
using LibraryBridgeHelperPtr = std::shared_ptr<LibraryBridgeHelper>;
class LibraryDictionarySource final : public IDictionarySource
{
public:
@ -37,7 +38,7 @@ public:
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix_,
Block & sample_block_,
const Context & context,
const Context & context_,
bool check_config);
LibraryDictionarySource(const LibraryDictionarySource & other);
@ -68,18 +69,40 @@ public:
std::string toString() const override;
private:
Poco::Logger * log;
String getLibrarySettingsString(const Poco::Util::AbstractConfiguration & config, const std::string & config_root);
String getDictAttributesString();
String getDictIdsString(const std::vector<UInt64> & ids);
LocalDateTime getLastModification() const;
Poco::Logger * log;
const DictionaryStructure dict_struct;
const std::string config_prefix;
const std::string path;
const std::string dictionary_id;
Block sample_block;
SharedLibraryPtr library;
Context context;
LibraryBridgeHelperPtr bridge_helper;
ExternalResultDescription description;
std::shared_ptr<CStringsHolder> settings;
void * lib_data = nullptr;
String createDictID()
{
std::uniform_int_distribution<int> distribution('a', 'z');
String random_str(16, ' ');
for (auto & c : random_str)
c = distribution(thread_local_rng);
return random_str;
}
};
}