Prepare library-bridge for catboost integration

- Rename generic file and identifier names in library-bridge to
  something more dictionary-specific. This is needed because later on,
  catboost will be integrated into library-bridge.

- Also: Some smaller fixes like typos and un-inlining non-performance
  critical code.

- The logic remains unchanged in this commit.
This commit is contained in:
Robert Schulze 2022-08-03 11:19:13 +00:00
parent b33fe26d8c
commit ea73b98fb9
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A
30 changed files with 598 additions and 607 deletions

View File

@ -44,15 +44,13 @@ option (ENABLE_CLICKHOUSE_OBFUSCATOR "Table data obfuscator (convert real data t
# https://clickhouse.com/docs/en/operations/utilities/odbc-bridge/
# TODO Also needs NANODBC.
if (ENABLE_ODBC AND NOT USE_MUSL)
option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "HTTP-server working like a proxy to ODBC driver"
${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "HTTP-server working like a proxy to ODBC driver" ${ENABLE_CLICKHOUSE_ALL})
else ()
option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "HTTP-server working like a proxy to ODBC driver" OFF)
endif ()
if (NOT USE_MUSL)
option (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE "HTTP-server working like a proxy to Library dictionary source"
${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE "HTTP-server working like a proxy to Library dictionary source" ${ENABLE_CLICKHOUSE_ALL})
endif ()
# https://presentations.clickhouse.com/matemarketing_2020/

View File

@ -1,13 +1,13 @@
include(${ClickHouse_SOURCE_DIR}/cmake/split_debug_symbols.cmake)
set (CLICKHOUSE_LIBRARY_BRIDGE_SOURCES
library-bridge.cpp
LibraryInterface.cpp
ExternalDictionaryLibraryAPI.cpp
ExternalDictionaryLibraryHandler.cpp
ExternalDictionaryLibraryHandlerFactory.cpp
LibraryBridge.cpp
Handlers.cpp
HandlerFactory.cpp
SharedLibraryHandler.cpp
SharedLibraryHandlerFactory.cpp
LibraryBridgeHandlerFactory.cpp
LibraryBridgeHandlers.cpp
library-bridge.cpp
)
if (OS_LINUX)

View File

@ -1,4 +1,4 @@
#include "LibraryInterface.h"
#include "ExternalDictionaryLibraryAPI.h"
#include <Common/logger_useful.h>
@ -7,24 +7,7 @@ namespace
const char DICT_LOGGER_NAME[] = "LibraryDictionarySourceExternal";
}
namespace ClickHouseLibrary
{
std::string_view LIBRARY_CREATE_NEW_FUNC_NAME = "ClickHouseDictionary_v3_libNew";
std::string_view LIBRARY_CLONE_FUNC_NAME = "ClickHouseDictionary_v3_libClone";
std::string_view LIBRARY_DELETE_FUNC_NAME = "ClickHouseDictionary_v3_libDelete";
std::string_view LIBRARY_DATA_NEW_FUNC_NAME = "ClickHouseDictionary_v3_dataNew";
std::string_view LIBRARY_DATA_DELETE_FUNC_NAME = "ClickHouseDictionary_v3_dataDelete";
std::string_view LIBRARY_LOAD_ALL_FUNC_NAME = "ClickHouseDictionary_v3_loadAll";
std::string_view LIBRARY_LOAD_IDS_FUNC_NAME = "ClickHouseDictionary_v3_loadIds";
std::string_view LIBRARY_LOAD_KEYS_FUNC_NAME = "ClickHouseDictionary_v3_loadKeys";
std::string_view LIBRARY_IS_MODIFIED_FUNC_NAME = "ClickHouseDictionary_v3_isModified";
std::string_view LIBRARY_SUPPORTS_SELECTIVE_LOAD_FUNC_NAME = "ClickHouseDictionary_v3_supportsSelectiveLoad";
void log(LogLevel level, CString msg)
void ExternalDictionaryLibraryAPI::log(LogLevel level, CString msg)
{
auto & logger = Poco::Logger::get(DICT_LOGGER_NAME);
switch (level)
@ -63,5 +46,3 @@ void log(LogLevel level, CString msg)
break;
}
}
}

View File

@ -0,0 +1,106 @@
#pragma once
#include <cstdint>
#include <string>
#define CLICKHOUSE_DICTIONARY_LIBRARY_API 1
struct ExternalDictionaryLibraryAPI
{
using CString = const char *;
using ColumnName = CString;
using ColumnNames = ColumnName[];
struct CStrings
{
CString * data = nullptr;
uint64_t size = 0;
};
struct VectorUInt64
{
const uint64_t * data = nullptr;
uint64_t size = 0;
};
struct ColumnsUInt64
{
VectorUInt64 * data = nullptr;
uint64_t size = 0;
};
struct Field
{
const void * data = nullptr;
uint64_t size = 0;
};
struct Row
{
const Field * data = nullptr;
uint64_t size = 0;
};
struct Table
{
const Row * data = nullptr;
uint64_t size = 0;
uint64_t error_code = 0; // 0 = ok; !0 = error, with message in error_string
const char * error_string = nullptr;
};
enum LogLevel
{
FATAL = 1,
CRITICAL,
ERROR,
WARNING,
NOTICE,
INFORMATION,
DEBUG,
TRACE,
};
static void log(LogLevel level, CString msg);
using LibraryContext = void *;
using LibraryLoggerFunc = void (*)(LogLevel, CString /* message */);
using LibrarySettings = CStrings *;
using LibraryData = void *;
using RawClickHouseLibraryTable = void *;
/// Can be safely casted into const Table * with static_cast<const ClickHouseLibrary::Table *>
using RequestedColumnsNames = CStrings *;
using RequestedIds = const VectorUInt64 *;
using RequestedKeys = Table *;
using LibraryNewFunc = LibraryContext (*)(LibrarySettings, LibraryLoggerFunc);
static constexpr const char * LIBRARY_CREATE_NEW_FUNC_NAME = "ClickHouseDictionary_v3_libNew";
using LibraryCloneFunc = LibraryContext (*)(LibraryContext);
static constexpr const char * LIBRARY_CLONE_FUNC_NAME = "ClickHouseDictionary_v3_libClone";
using LibraryDeleteFunc = void (*)(LibraryContext);
static constexpr const char * LIBRARY_DELETE_FUNC_NAME = "ClickHouseDictionary_v3_libDelete";
using LibraryDataNewFunc = LibraryData (*)(LibraryContext);
static constexpr const char * LIBRARY_DATA_NEW_FUNC_NAME = "ClickHouseDictionary_v3_dataNew";
using LibraryDataDeleteFunc = void (*)(LibraryContext, LibraryData);
static constexpr const char * LIBRARY_DATA_DELETE_FUNC_NAME = "ClickHouseDictionary_v3_dataDelete";
using LibraryLoadAllFunc = RawClickHouseLibraryTable (*)(LibraryData, LibrarySettings, RequestedColumnsNames);
static constexpr const char * LIBRARY_LOAD_ALL_FUNC_NAME = "ClickHouseDictionary_v3_loadAll";
using LibraryLoadIdsFunc = RawClickHouseLibraryTable (*)(LibraryData, LibrarySettings, RequestedColumnsNames, RequestedIds);
static constexpr const char * LIBRARY_LOAD_IDS_FUNC_NAME = "ClickHouseDictionary_v3_loadIds";
/// There are no requested column names for load keys func
using LibraryLoadKeysFunc = RawClickHouseLibraryTable (*)(LibraryData, LibrarySettings, RequestedKeys);
static constexpr const char * LIBRARY_LOAD_KEYS_FUNC_NAME = "ClickHouseDictionary_v3_loadKeys";
using LibraryIsModifiedFunc = bool (*)(LibraryContext, LibrarySettings);
static constexpr const char * LIBRARY_IS_MODIFIED_FUNC_NAME = "ClickHouseDictionary_v3_isModified";
using LibrarySupportsSelectiveLoadFunc = bool (*)(LibraryContext, LibrarySettings);
static constexpr const char * LIBRARY_SUPPORTS_SELECTIVE_LOAD_FUNC_NAME = "ClickHouseDictionary_v3_supportsSelectiveLoad";
};

View File

@ -0,0 +1,214 @@
#include "ExternalDictionaryLibraryHandler.h"
#include <base/scope_guard.h>
#include <base/bit_cast.h>
#include <base/find_symbols.h>
#include <IO/ReadHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_LIBRARY_ERROR;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
}
ExternalDictionaryLibraryHandler::ExternalDictionaryLibraryHandler(
const std::string & library_path_,
const std::vector<std::string> & library_settings,
const Block & sample_block_,
const std::vector<std::string> & attributes_names_)
: library_path(library_path_)
, sample_block(sample_block_)
, attributes_names(attributes_names_)
{
library = std::make_shared<SharedLibrary>(library_path);
settings_holder = std::make_shared<CStringsHolder>(CStringsHolder(library_settings));
auto lib_new = library->tryGet<ExternalDictionaryLibraryAPI::LibraryNewFunc>(ExternalDictionaryLibraryAPI::LIBRARY_CREATE_NEW_FUNC_NAME);
if (lib_new)
lib_data = lib_new(&settings_holder->strings, ExternalDictionaryLibraryAPI::log);
else
throw Exception("Method extDict_libNew failed", ErrorCodes::EXTERNAL_LIBRARY_ERROR);
}
ExternalDictionaryLibraryHandler::ExternalDictionaryLibraryHandler(const ExternalDictionaryLibraryHandler & other)
: library_path{other.library_path}
, sample_block{other.sample_block}
, attributes_names{other.attributes_names}
, library{other.library}
, settings_holder{other.settings_holder}
{
auto lib_clone = library->tryGet<ExternalDictionaryLibraryAPI::LibraryCloneFunc>(ExternalDictionaryLibraryAPI::LIBRARY_CLONE_FUNC_NAME);
if (lib_clone)
{
lib_data = lib_clone(other.lib_data);
}
else
{
auto lib_new = library->tryGet<ExternalDictionaryLibraryAPI::LibraryNewFunc>(ExternalDictionaryLibraryAPI::LIBRARY_CREATE_NEW_FUNC_NAME);
if (lib_new)
lib_data = lib_new(&settings_holder->strings, ExternalDictionaryLibraryAPI::log);
}
}
ExternalDictionaryLibraryHandler::~ExternalDictionaryLibraryHandler()
{
auto lib_delete = library->tryGet<ExternalDictionaryLibraryAPI::LibraryDeleteFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DELETE_FUNC_NAME);
if (lib_delete)
lib_delete(lib_data);
}
bool ExternalDictionaryLibraryHandler::isModified()
{
auto func_is_modified = library->tryGet<ExternalDictionaryLibraryAPI::LibraryIsModifiedFunc>(ExternalDictionaryLibraryAPI::LIBRARY_IS_MODIFIED_FUNC_NAME);
if (func_is_modified)
return func_is_modified(lib_data, &settings_holder->strings);
return true;
}
bool ExternalDictionaryLibraryHandler::supportsSelectiveLoad()
{
auto func_supports_selective_load = library->tryGet<ExternalDictionaryLibraryAPI::LibrarySupportsSelectiveLoadFunc>(ExternalDictionaryLibraryAPI::LIBRARY_SUPPORTS_SELECTIVE_LOAD_FUNC_NAME);
if (func_supports_selective_load)
return func_supports_selective_load(lib_data, &settings_holder->strings);
return true;
}
Block ExternalDictionaryLibraryHandler::loadAll()
{
auto columns_holder = std::make_unique<ExternalDictionaryLibraryAPI::CString[]>(attributes_names.size());
ExternalDictionaryLibraryAPI::CStrings columns{static_cast<decltype(ExternalDictionaryLibraryAPI::CStrings::data)>(columns_holder.get()), attributes_names.size()};
for (size_t i = 0; i < attributes_names.size(); ++i)
columns.data[i] = attributes_names[i].c_str();
auto load_all_func = library->get<ExternalDictionaryLibraryAPI::LibraryLoadAllFunc>(ExternalDictionaryLibraryAPI::LIBRARY_LOAD_ALL_FUNC_NAME);
auto data_new_func = library->get<ExternalDictionaryLibraryAPI::LibraryDataNewFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DATA_NEW_FUNC_NAME);
auto data_delete_func = library->get<ExternalDictionaryLibraryAPI::LibraryDataDeleteFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DATA_DELETE_FUNC_NAME);
ExternalDictionaryLibraryAPI::LibraryData data_ptr = data_new_func(lib_data);
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ExternalDictionaryLibraryAPI::RawClickHouseLibraryTable data = load_all_func(data_ptr, &settings_holder->strings, &columns);
return dataToBlock(data);
}
Block ExternalDictionaryLibraryHandler::loadIds(const std::vector<uint64_t> & ids)
{
const ExternalDictionaryLibraryAPI::VectorUInt64 ids_data{bit_cast<decltype(ExternalDictionaryLibraryAPI::VectorUInt64::data)>(ids.data()), ids.size()};
auto columns_holder = std::make_unique<ExternalDictionaryLibraryAPI::CString[]>(attributes_names.size());
ExternalDictionaryLibraryAPI::CStrings columns_pass{static_cast<decltype(ExternalDictionaryLibraryAPI::CStrings::data)>(columns_holder.get()), attributes_names.size()};
auto load_ids_func = library->get<ExternalDictionaryLibraryAPI::LibraryLoadIdsFunc>(ExternalDictionaryLibraryAPI::LIBRARY_LOAD_IDS_FUNC_NAME);
auto data_new_func = library->get<ExternalDictionaryLibraryAPI::LibraryDataNewFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DATA_NEW_FUNC_NAME);
auto data_delete_func = library->get<ExternalDictionaryLibraryAPI::LibraryDataDeleteFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DATA_DELETE_FUNC_NAME);
ExternalDictionaryLibraryAPI::LibraryData data_ptr = data_new_func(lib_data);
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ExternalDictionaryLibraryAPI::RawClickHouseLibraryTable data = load_ids_func(data_ptr, &settings_holder->strings, &columns_pass, &ids_data);
return dataToBlock(data);
}
Block ExternalDictionaryLibraryHandler::loadKeys(const Columns & key_columns)
{
auto holder = std::make_unique<ExternalDictionaryLibraryAPI::Row[]>(key_columns.size());
std::vector<std::unique_ptr<ExternalDictionaryLibraryAPI::Field[]>> column_data_holders;
for (size_t i = 0; i < key_columns.size(); ++i)
{
auto cell_holder = std::make_unique<ExternalDictionaryLibraryAPI::Field[]>(key_columns[i]->size());
for (size_t j = 0; j < key_columns[i]->size(); ++j)
{
auto data_ref = key_columns[i]->getDataAt(j);
cell_holder[j] = ExternalDictionaryLibraryAPI::Field{
.data = static_cast<const void *>(data_ref.data),
.size = data_ref.size};
}
holder[i] = ExternalDictionaryLibraryAPI::Row{
.data = static_cast<ExternalDictionaryLibraryAPI::Field *>(cell_holder.get()),
.size = key_columns[i]->size()};
column_data_holders.push_back(std::move(cell_holder));
}
ExternalDictionaryLibraryAPI::Table request_cols{
.data = static_cast<ExternalDictionaryLibraryAPI::Row *>(holder.get()),
.size = key_columns.size()};
auto load_keys_func = library->get<ExternalDictionaryLibraryAPI::LibraryLoadKeysFunc>(ExternalDictionaryLibraryAPI::LIBRARY_LOAD_KEYS_FUNC_NAME);
auto data_new_func = library->get<ExternalDictionaryLibraryAPI::LibraryDataNewFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DATA_NEW_FUNC_NAME);
auto data_delete_func = library->get<ExternalDictionaryLibraryAPI::LibraryDataDeleteFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DATA_DELETE_FUNC_NAME);
ExternalDictionaryLibraryAPI::LibraryData data_ptr = data_new_func(lib_data);
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ExternalDictionaryLibraryAPI::RawClickHouseLibraryTable data = load_keys_func(data_ptr, &settings_holder->strings, &request_cols);
return dataToBlock(data);
}
Block ExternalDictionaryLibraryHandler::dataToBlock(ExternalDictionaryLibraryAPI::RawClickHouseLibraryTable data)
{
if (!data)
throw Exception("LibraryDictionarySource: No data returned", ErrorCodes::EXTERNAL_LIBRARY_ERROR);
const auto * columns_received = static_cast<const ExternalDictionaryLibraryAPI::Table *>(data);
if (columns_received->error_code)
throw Exception(
"LibraryDictionarySource: Returned error: " + std::to_string(columns_received->error_code) + " " + (columns_received->error_string ? columns_received->error_string : ""),
ErrorCodes::EXTERNAL_LIBRARY_ERROR);
MutableColumns columns = sample_block.cloneEmptyColumns();
for (size_t col_n = 0; col_n < columns_received->size; ++col_n)
{
if (columns.size() != columns_received->data[col_n].size)
throw Exception(
"LibraryDictionarySource: Returned unexpected number of columns: " + std::to_string(columns_received->data[col_n].size) + ", must be " + std::to_string(columns.size()),
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
for (size_t row_n = 0; row_n < columns_received->data[col_n].size; ++row_n)
{
const auto & field = columns_received->data[col_n].data[row_n];
if (!field.data)
{
/// sample_block contains null_value (from config) inside corresponding column
const auto & col = sample_block.getByPosition(row_n);
columns[row_n]->insertFrom(*(col.column), 0);
}
else
{
const auto & size = field.size;
columns[row_n]->insertData(static_cast<const char *>(field.data), size);
}
}
}
return sample_block.cloneWithColumns(std::move(columns));
}
}

View File

@ -2,7 +2,7 @@
#include <Common/SharedLibrary.h>
#include <Common/logger_useful.h>
#include "LibraryUtils.h"
#include "ExternalDictionaryLibraryUtils.h"
namespace DB
@ -10,21 +10,21 @@ namespace DB
/// A class that manages all operations with library dictionary.
/// Every library dictionary source has its own object of this class, accessed by UUID.
class SharedLibraryHandler
class ExternalDictionaryLibraryHandler
{
public:
SharedLibraryHandler(
ExternalDictionaryLibraryHandler(
const std::string & library_path_,
const std::vector<std::string> & library_settings,
const Block & sample_block_,
const std::vector<std::string> & attributes_names_);
SharedLibraryHandler(const SharedLibraryHandler & other);
ExternalDictionaryLibraryHandler(const ExternalDictionaryLibraryHandler & other);
SharedLibraryHandler & operator=(const SharedLibraryHandler & other) = delete;
ExternalDictionaryLibraryHandler & operator=(const ExternalDictionaryLibraryHandler & other) = delete;
~SharedLibraryHandler();
~ExternalDictionaryLibraryHandler();
Block loadAll();
@ -39,7 +39,7 @@ public:
const Block & getSampleBlock() { return sample_block; }
private:
Block dataToBlock(const ClickHouseLibrary::RawClickHouseLibraryTable data);
Block dataToBlock(ExternalDictionaryLibraryAPI::RawClickHouseLibraryTable data);
std::string library_path;
const Block sample_block;
@ -50,6 +50,6 @@ private:
void * lib_data;
};
using SharedLibraryHandlerPtr = std::shared_ptr<SharedLibraryHandler>;
using SharedLibraryHandlerPtr = std::shared_ptr<ExternalDictionaryLibraryHandler>;
}

View File

@ -0,0 +1,62 @@
#include "ExternalDictionaryLibraryHandlerFactory.h"
namespace DB
{
SharedLibraryHandlerPtr ExternalDictionaryLibraryHandlerFactory::get(const std::string & dictionary_id)
{
std::lock_guard lock(mutex);
auto library_handler = library_handlers.find(dictionary_id);
if (library_handler != library_handlers.end())
return library_handler->second;
return nullptr;
}
void ExternalDictionaryLibraryHandlerFactory::create(
const std::string & dictionary_id,
const std::string & library_path,
const std::vector<std::string> & library_settings,
const Block & sample_block,
const std::vector<std::string> & attributes_names)
{
std::lock_guard lock(mutex);
if (!library_handlers.contains(dictionary_id))
library_handlers.emplace(std::make_pair(dictionary_id, std::make_shared<ExternalDictionaryLibraryHandler>(library_path, library_settings, sample_block, attributes_names)));
else
LOG_WARNING(&Poco::Logger::get("ExternalDictionaryLibraryHandlerFactory"), "Library handler with dictionary id {} already exists", dictionary_id);
}
bool ExternalDictionaryLibraryHandlerFactory::clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id)
{
std::lock_guard lock(mutex);
auto from_library_handler = library_handlers.find(from_dictionary_id);
if (from_library_handler == library_handlers.end())
return false;
/// extDict_libClone method will be called in copy constructor
library_handlers[to_dictionary_id] = std::make_shared<ExternalDictionaryLibraryHandler>(*from_library_handler->second);
return true;
}
bool ExternalDictionaryLibraryHandlerFactory::remove(const std::string & dictionary_id)
{
std::lock_guard lock(mutex);
/// extDict_libDelete is called in destructor.
return library_handlers.erase(dictionary_id);
}
ExternalDictionaryLibraryHandlerFactory & ExternalDictionaryLibraryHandlerFactory::instance()
{
static ExternalDictionaryLibraryHandlerFactory instance;
return instance;
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include "SharedLibraryHandler.h"
#include "ExternalDictionaryLibraryHandler.h"
#include <base/defines.h>
#include <unordered_map>
@ -11,11 +11,11 @@ namespace DB
{
/// Each library dictionary source has unique UUID. When clone() method is called, a new UUID is generated.
/// There is a unique mapping from diciotnary UUID to sharedLibraryHandler.
class SharedLibraryHandlerFactory final : private boost::noncopyable
/// There is a unique mapping from dictionary UUID to sharedLibraryHandler.
class ExternalDictionaryLibraryHandlerFactory final : private boost::noncopyable
{
public:
static SharedLibraryHandlerFactory & instance();
static ExternalDictionaryLibraryHandlerFactory & instance();
SharedLibraryHandlerPtr get(const std::string & dictionary_id);

View File

@ -5,7 +5,7 @@
#include <base/bit_cast.h>
#include <base/range.h>
#include "LibraryInterface.h"
#include "ExternalDictionaryLibraryAPI.h"
namespace DB
@ -22,7 +22,7 @@ public:
strings_holder = strings_pass;
strings.size = strings_holder.size();
ptr_holder = std::make_unique<ClickHouseLibrary::CString[]>(strings.size);
ptr_holder = std::make_unique<ExternalDictionaryLibraryAPI::CString[]>(strings.size);
strings.data = ptr_holder.get();
size_t i = 0;
@ -33,10 +33,10 @@ public:
}
}
ClickHouseLibrary::CStrings strings; // will pass pointer to lib
ExternalDictionaryLibraryAPI::CStrings strings; // will pass pointer to lib
private:
std::unique_ptr<ClickHouseLibrary::CString[]> ptr_holder = nullptr;
std::unique_ptr<ExternalDictionaryLibraryAPI::CString[]> ptr_holder = nullptr;
Container strings_holder;
};

View File

@ -1,23 +0,0 @@
#include "HandlerFactory.h"
#include <Poco/Net/HTTPServerRequest.h>
#include <Server/HTTP/HTMLForm.h>
#include "Handlers.h"
namespace DB
{
std::unique_ptr<HTTPRequestHandler> LibraryBridgeHandlerFactory::createRequestHandler(const HTTPServerRequest & request)
{
Poco::URI uri{request.getURI()};
LOG_DEBUG(log, "Request URI: {}", uri.toString());
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
return std::make_unique<LibraryExistsHandler>(keep_alive_timeout, getContext());
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, getContext());
return nullptr;
}
}

View File

@ -1,37 +0,0 @@
#pragma once
#include <Interpreters/Context.h>
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <Common/logger_useful.h>
namespace DB
{
class SharedLibraryHandler;
using SharedLibraryHandlerPtr = std::shared_ptr<SharedLibraryHandler>;
/// Factory for '/ping', '/' handlers.
class LibraryBridgeHandlerFactory : public HTTPRequestHandlerFactory, WithContext
{
public:
LibraryBridgeHandlerFactory(
const std::string & name_,
size_t keep_alive_timeout_,
ContextPtr context_)
: WithContext(context_)
, log(&Poco::Logger::get(name_))
, name(name_)
, keep_alive_timeout(keep_alive_timeout_)
{
}
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override;
private:
Poco::Logger * log;
std::string name;
size_t keep_alive_timeout;
};
}

View File

@ -1,6 +1,5 @@
#include "LibraryBridge.h"
#pragma GCC diagnostic ignored "-Wmissing-declarations"
int mainEntryClickHouseLibraryBridge(int argc, char ** argv)
{
DB::LibraryBridge app;
@ -15,3 +14,18 @@ int mainEntryClickHouseLibraryBridge(int argc, char ** argv)
return code ? code : 1;
}
}
namespace DB
{
std::string LibraryBridge::bridgeName() const
{
return "LibraryBridge";
}
LibraryBridge::HandlerFactoryPtr LibraryBridge::getHandlerFactoryPtr(ContextPtr context) const
{
return std::make_shared<LibraryBridgeHandlerFactory>("LibraryRequestHandlerFactory", keep_alive_timeout, context);
}
}

View File

@ -2,7 +2,7 @@
#include <Interpreters/Context.h>
#include <Bridge/IBridge.h>
#include "HandlerFactory.h"
#include "LibraryBridgeHandlerFactory.h"
namespace DB
@ -12,15 +12,8 @@ class LibraryBridge : public IBridge
{
protected:
std::string bridgeName() const override
{
return "LibraryBridge";
}
HandlerFactoryPtr getHandlerFactoryPtr(ContextPtr context) const override
{
return std::make_shared<LibraryBridgeHandlerFactory>("LibraryRequestHandlerFactory-factory", keep_alive_timeout, context);
}
std::string bridgeName() const override;
HandlerFactoryPtr getHandlerFactoryPtr(ContextPtr context) const override;
};
}

View File

@ -0,0 +1,34 @@
#include "LibraryBridgeHandlerFactory.h"
#include <Poco/Net/HTTPServerRequest.h>
#include <Server/HTTP/HTMLForm.h>
#include "LibraryBridgeHandlers.h"
namespace DB
{
LibraryBridgeHandlerFactory::LibraryBridgeHandlerFactory(
const std::string & name_,
size_t keep_alive_timeout_,
ContextPtr context_)
: WithContext(context_)
, log(&Poco::Logger::get(name_))
, name(name_)
, keep_alive_timeout(keep_alive_timeout_)
{
}
std::unique_ptr<HTTPRequestHandler> LibraryBridgeHandlerFactory::createRequestHandler(const HTTPServerRequest & request)
{
Poco::URI uri{request.getURI()};
LOG_DEBUG(log, "Request URI: {}", uri.toString());
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
return std::make_unique<LibraryBridgeExistsHandler>(keep_alive_timeout, getContext());
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
return std::make_unique<LibraryBridgeRequestHandler>(keep_alive_timeout, getContext());
return nullptr;
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <Interpreters/Context.h>
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <Common/logger_useful.h>
namespace DB
{
class LibraryBridgeHandlerFactory : public HTTPRequestHandlerFactory, WithContext
{
public:
LibraryBridgeHandlerFactory(
const std::string & name_,
size_t keep_alive_timeout_,
ContextPtr context_);
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override;
private:
Poco::Logger * log;
const std::string name;
const size_t keep_alive_timeout;
};
}

View File

@ -1,5 +1,5 @@
#include "Handlers.h"
#include "SharedLibraryHandlerFactory.h"
#include "LibraryBridgeHandlers.h"
#include "ExternalDictionaryLibraryHandlerFactory.h"
#include <Formats/FormatFactory.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
@ -78,8 +78,14 @@ static void writeData(Block data, OutputFormatPtr format)
executor.execute();
}
LibraryBridgeRequestHandler::LibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_)
: WithContext(context_)
, log(&Poco::Logger::get("LibraryBridgeRequestHandler"))
, keep_alive_timeout(keep_alive_timeout_)
{
}
void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void LibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(getContext()->getSettingsRef(), request);
@ -104,8 +110,8 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
try
{
bool lib_new = (method == "libNew");
if (method == "libClone")
bool lib_new = (method == "extDict_libNew");
if (method == "extDict_libClone")
{
if (!params.has("from_dictionary_id"))
{
@ -115,7 +121,7 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
std::string from_dictionary_id = params.get("from_dictionary_id");
bool cloned = false;
cloned = SharedLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);
cloned = ExternalDictionaryLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);
if (cloned)
{
@ -123,7 +129,7 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
}
else
{
LOG_TRACE(log, "Cannot clone from dictionary with id: {}, will call libNew instead", from_dictionary_id);
LOG_TRACE(log, "Cannot clone from dictionary with id: {}, will call extDict_libNew instead", from_dictionary_id);
lib_new = true;
}
}
@ -138,13 +144,14 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
return;
}
std::string library_path = params.get("library_path");
if (!params.has("library_settings"))
{
processError(response, "No 'library_settings' in request URL");
return;
}
std::string library_path = params.get("library_path");
const auto & settings_string = params.get("library_settings");
LOG_DEBUG(log, "Parsing library settings from binary string");
@ -197,12 +204,12 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
LOG_DEBUG(log, "Dictionary sample block with null values: {}", sample_block_with_nulls.dumpStructure());
SharedLibraryHandlerFactory::instance().create(dictionary_id, library_path, library_settings, sample_block_with_nulls, attributes_names);
ExternalDictionaryLibraryHandlerFactory::instance().create(dictionary_id, library_path, library_settings, sample_block_with_nulls, attributes_names);
writeStringBinary("1", out);
}
else if (method == "libDelete")
else if (method == "extDict_libDelete")
{
auto deleted = SharedLibraryHandlerFactory::instance().remove(dictionary_id);
bool deleted = ExternalDictionaryLibraryHandlerFactory::instance().remove(dictionary_id);
/// Do not throw, a warning is ok.
if (!deleted)
@ -210,57 +217,57 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
writeStringBinary("1", out);
}
else if (method == "isModified")
else if (method == "extDict_isModified")
{
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
bool res = library_handler->isModified();
writeStringBinary(std::to_string(res), out);
}
else if (method == "supportsSelectiveLoad")
else if (method == "extDict_supportsSelectiveLoad")
{
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
bool res = library_handler->supportsSelectiveLoad();
writeStringBinary(std::to_string(res), out);
}
else if (method == "loadAll")
else if (method == "extDict_loadAll")
{
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
LOG_DEBUG(log, "Calling loadAll() for dictionary id: {}", dictionary_id);
LOG_DEBUG(log, "Calling extDict_loadAll() for dictionary id: {}", dictionary_id);
auto input = library_handler->loadAll();
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
writeData(std::move(input), std::move(output));
}
else if (method == "loadIds")
else if (method == "extDict_loadIds")
{
LOG_DEBUG(log, "Getting diciontary ids for dictionary with id: {}", dictionary_id);
String ids_string;
std::vector<uint64_t> ids = parseIdsFromBinary(request.getStream());
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
LOG_DEBUG(log, "Calling loadIds() for dictionary id: {}", dictionary_id);
LOG_DEBUG(log, "Calling extDict_loadIds() for dictionary id: {}", dictionary_id);
auto input = library_handler->loadIds(ids);
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
writeData(std::move(input), std::move(output));
}
else if (method == "loadKeys")
else if (method == "extDict_loadKeys")
{
if (!params.has("requested_block_sample"))
{
@ -289,18 +296,22 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
Block block;
executor.pull(block);
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
LOG_DEBUG(log, "Calling loadKeys() for dictionary id: {}", dictionary_id);
LOG_DEBUG(log, "Calling extDict_loadKeys() for dictionary id: {}", dictionary_id);
auto input = library_handler->loadKeys(block.getColumns());
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
writeData(std::move(input), std::move(output));
}
else
{
LOG_WARNING(log, "Unknown library method: '{}'", method);
}
}
catch (...)
{
@ -329,8 +340,14 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
}
}
LibraryBridgeExistsHandler::LibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_)
: WithContext(context_)
, keep_alive_timeout(keep_alive_timeout_)
, log(&Poco::Logger::get("LibraryBridgeExistsHandler"))
{
}
void LibraryExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
void LibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
try
{
@ -344,15 +361,12 @@ void LibraryExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServer
}
std::string dictionary_id = params.get("dictionary_id");
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
String res;
if (library_handler)
res = "1";
else
res = "0";
auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
String res = library_handler ? "1" : "0";
setResponseDefaultHeaders(response, keep_alive_timeout);
LOG_TRACE(log, "Senging ping response: {} (dictionary id: {})", res, dictionary_id);
LOG_TRACE(log, "Sending ping response: {} (dictionary id: {})", res, dictionary_id);
response.sendBuffer(res.data(), res.size());
}
catch (...)

View File

@ -3,7 +3,7 @@
#include <Interpreters/Context.h>
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Common/logger_useful.h>
#include "SharedLibraryHandler.h"
#include "ExternalDictionaryLibraryHandler.h"
namespace DB
@ -11,23 +11,16 @@ namespace DB
/// Handler for requests to Library Dictionary Source, returns response in RowBinary format.
/// When a library dictionary source is created, it sends libNew request to library bridge (which is started on first
/// When a library dictionary source is created, it sends 'extDict_libNew' request to library bridge (which is started on first
/// request to it, if it was not yet started). On this request a new sharedLibrayHandler is added to a
/// sharedLibraryHandlerFactory by a dictionary uuid. With libNew request come: library_path, library_settings,
/// sharedLibraryHandlerFactory by a dictionary uuid. With 'extDict_libNew' request come: library_path, library_settings,
/// names of dictionary attributes, sample block to parse block of null values, block of null values. Everything is
/// passed in binary format and is urlencoded. When dictionary is cloned, a new handler is created.
/// Each handler is unique to dictionary.
class LibraryRequestHandler : public HTTPRequestHandler, WithContext
class LibraryBridgeRequestHandler : public HTTPRequestHandler, WithContext
{
public:
LibraryRequestHandler(
size_t keep_alive_timeout_, ContextPtr context_)
: WithContext(context_)
, log(&Poco::Logger::get("LibraryRequestHandler"))
, keep_alive_timeout(keep_alive_timeout_)
{
}
LibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
@ -39,22 +32,16 @@ private:
};
class LibraryExistsHandler : public HTTPRequestHandler, WithContext
class LibraryBridgeExistsHandler : public HTTPRequestHandler, WithContext
{
public:
explicit LibraryExistsHandler(size_t keep_alive_timeout_, ContextPtr context_)
: WithContext(context_)
, keep_alive_timeout(keep_alive_timeout_)
, log(&Poco::Logger::get("LibraryRequestHandler"))
{
}
LibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_);
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
private:
const size_t keep_alive_timeout;
Poco::Logger * log;
};
}

View File

@ -1,110 +0,0 @@
#pragma once
#include <cstdint>
#include <string>
#define CLICKHOUSE_DICTIONARY_LIBRARY_API 1
namespace ClickHouseLibrary
{
using CString = const char *;
using ColumnName = CString;
using ColumnNames = ColumnName[];
struct CStrings
{
CString * data = nullptr;
uint64_t size = 0;
};
struct VectorUInt64
{
const uint64_t * data = nullptr;
uint64_t size = 0;
};
struct ColumnsUInt64
{
VectorUInt64 * data = nullptr;
uint64_t size = 0;
};
struct Field
{
const void * data = nullptr;
uint64_t size = 0;
};
struct Row
{
const Field * data = nullptr;
uint64_t size = 0;
};
struct Table
{
const Row * data = nullptr;
uint64_t size = 0;
uint64_t error_code = 0; // 0 = ok; !0 = error, with message in error_string
const char * error_string = nullptr;
};
enum LogLevel
{
FATAL = 1,
CRITICAL,
ERROR,
WARNING,
NOTICE,
INFORMATION,
DEBUG,
TRACE,
};
void log(LogLevel level, CString msg);
extern std::string_view LIBRARY_CREATE_NEW_FUNC_NAME;
extern std::string_view LIBRARY_CLONE_FUNC_NAME;
extern std::string_view LIBRARY_DELETE_FUNC_NAME;
extern std::string_view LIBRARY_DATA_NEW_FUNC_NAME;
extern std::string_view LIBRARY_DATA_DELETE_FUNC_NAME;
extern std::string_view LIBRARY_LOAD_ALL_FUNC_NAME;
extern std::string_view LIBRARY_LOAD_IDS_FUNC_NAME;
extern std::string_view LIBRARY_LOAD_KEYS_FUNC_NAME;
extern std::string_view LIBRARY_IS_MODIFIED_FUNC_NAME;
extern std::string_view LIBRARY_SUPPORTS_SELECTIVE_LOAD_FUNC_NAME;
using LibraryContext = void *;
using LibraryLoggerFunc = void (*)(LogLevel, CString /* message */);
using LibrarySettings = CStrings *;
using LibraryNewFunc = LibraryContext (*)(LibrarySettings, LibraryLoggerFunc);
using LibraryCloneFunc = LibraryContext (*)(LibraryContext);
using LibraryDeleteFunc = void (*)(LibraryContext);
using LibraryData = void *;
using LibraryDataNewFunc = LibraryData (*)(LibraryContext);
using LibraryDataDeleteFunc = void (*)(LibraryContext, LibraryData);
/// Can be safely casted into const Table * with static_cast<const ClickHouseLibrary::Table *>
using RawClickHouseLibraryTable = void *;
using RequestedColumnsNames = CStrings *;
using LibraryLoadAllFunc = RawClickHouseLibraryTable (*)(LibraryData, LibrarySettings, RequestedColumnsNames);
using RequestedIds = const VectorUInt64 *;
using LibraryLoadIdsFunc = RawClickHouseLibraryTable (*)(LibraryData, LibrarySettings, RequestedColumnsNames, RequestedIds);
using RequestedKeys = Table *;
/// There are no requested column names for load keys func
using LibraryLoadKeysFunc = RawClickHouseLibraryTable (*)(LibraryData, LibrarySettings, RequestedKeys);
using LibraryIsModifiedFunc = bool (*)(LibraryContext, LibrarySettings);
using LibrarySupportsSelectiveLoadFunc = bool (*)(LibraryContext, LibrarySettings);
}

View File

@ -1,214 +0,0 @@
#include "SharedLibraryHandler.h"
#include <base/scope_guard.h>
#include <base/bit_cast.h>
#include <base/find_symbols.h>
#include <IO/ReadHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_LIBRARY_ERROR;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
}
SharedLibraryHandler::SharedLibraryHandler(
const std::string & library_path_,
const std::vector<std::string> & library_settings,
const Block & sample_block_,
const std::vector<std::string> & attributes_names_)
: library_path(library_path_)
, sample_block(sample_block_)
, attributes_names(attributes_names_)
{
library = std::make_shared<SharedLibrary>(library_path, RTLD_LAZY);
settings_holder = std::make_shared<CStringsHolder>(CStringsHolder(library_settings));
auto lib_new = library->tryGet<ClickHouseLibrary::LibraryNewFunc>(ClickHouseLibrary::LIBRARY_CREATE_NEW_FUNC_NAME);
if (lib_new)
lib_data = lib_new(&settings_holder->strings, ClickHouseLibrary::log);
else
throw Exception("Method libNew failed", ErrorCodes::EXTERNAL_LIBRARY_ERROR);
}
SharedLibraryHandler::SharedLibraryHandler(const SharedLibraryHandler & other)
: library_path{other.library_path}
, sample_block{other.sample_block}
, attributes_names{other.attributes_names}
, library{other.library}
, settings_holder{other.settings_holder}
{
auto lib_clone = library->tryGet<ClickHouseLibrary::LibraryCloneFunc>(ClickHouseLibrary::LIBRARY_CLONE_FUNC_NAME);
if (lib_clone)
{
lib_data = lib_clone(other.lib_data);
}
else
{
auto lib_new = library->tryGet<ClickHouseLibrary::LibraryNewFunc>(ClickHouseLibrary::LIBRARY_CREATE_NEW_FUNC_NAME);
if (lib_new)
lib_data = lib_new(&settings_holder->strings, ClickHouseLibrary::log);
}
}
SharedLibraryHandler::~SharedLibraryHandler()
{
auto lib_delete = library->tryGet<ClickHouseLibrary::LibraryDeleteFunc>(ClickHouseLibrary::LIBRARY_DELETE_FUNC_NAME);
if (lib_delete)
lib_delete(lib_data);
}
bool SharedLibraryHandler::isModified()
{
auto func_is_modified = library->tryGet<ClickHouseLibrary::LibraryIsModifiedFunc>(ClickHouseLibrary::LIBRARY_IS_MODIFIED_FUNC_NAME);
if (func_is_modified)
return func_is_modified(lib_data, &settings_holder->strings);
return true;
}
bool SharedLibraryHandler::supportsSelectiveLoad()
{
auto func_supports_selective_load = library->tryGet<ClickHouseLibrary::LibrarySupportsSelectiveLoadFunc>(ClickHouseLibrary::LIBRARY_SUPPORTS_SELECTIVE_LOAD_FUNC_NAME);
if (func_supports_selective_load)
return func_supports_selective_load(lib_data, &settings_holder->strings);
return true;
}
Block SharedLibraryHandler::loadAll()
{
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(attributes_names.size());
ClickHouseLibrary::CStrings columns{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), attributes_names.size()};
for (size_t i = 0; i < attributes_names.size(); ++i)
columns.data[i] = attributes_names[i].c_str();
auto load_all_func = library->get<ClickHouseLibrary::LibraryLoadAllFunc>(ClickHouseLibrary::LIBRARY_LOAD_ALL_FUNC_NAME);
auto data_new_func = library->get<ClickHouseLibrary::LibraryDataNewFunc>(ClickHouseLibrary::LIBRARY_DATA_NEW_FUNC_NAME);
auto data_delete_func = library->get<ClickHouseLibrary::LibraryDataDeleteFunc>(ClickHouseLibrary::LIBRARY_DATA_DELETE_FUNC_NAME);
ClickHouseLibrary::LibraryData data_ptr = data_new_func(lib_data);
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ClickHouseLibrary::RawClickHouseLibraryTable data = load_all_func(data_ptr, &settings_holder->strings, &columns);
return dataToBlock(data);
}
Block SharedLibraryHandler::loadIds(const std::vector<uint64_t> & ids)
{
const ClickHouseLibrary::VectorUInt64 ids_data{bit_cast<decltype(ClickHouseLibrary::VectorUInt64::data)>(ids.data()), ids.size()};
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(attributes_names.size());
ClickHouseLibrary::CStrings columns_pass{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), attributes_names.size()};
auto load_ids_func = library->get<ClickHouseLibrary::LibraryLoadIdsFunc>(ClickHouseLibrary::LIBRARY_LOAD_IDS_FUNC_NAME);
auto data_new_func = library->get<ClickHouseLibrary::LibraryDataNewFunc>(ClickHouseLibrary::LIBRARY_DATA_NEW_FUNC_NAME);
auto data_delete_func = library->get<ClickHouseLibrary::LibraryDataDeleteFunc>(ClickHouseLibrary::LIBRARY_DATA_DELETE_FUNC_NAME);
ClickHouseLibrary::LibraryData data_ptr = data_new_func(lib_data);
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ClickHouseLibrary::RawClickHouseLibraryTable data = load_ids_func(data_ptr, &settings_holder->strings, &columns_pass, &ids_data);
return dataToBlock(data);
}
Block SharedLibraryHandler::loadKeys(const Columns & key_columns)
{
auto holder = std::make_unique<ClickHouseLibrary::Row[]>(key_columns.size());
std::vector<std::unique_ptr<ClickHouseLibrary::Field[]>> column_data_holders;
for (size_t i = 0; i < key_columns.size(); ++i)
{
auto cell_holder = std::make_unique<ClickHouseLibrary::Field[]>(key_columns[i]->size());
for (size_t j = 0; j < key_columns[i]->size(); ++j)
{
auto data_ref = key_columns[i]->getDataAt(j);
cell_holder[j] = ClickHouseLibrary::Field{
.data = static_cast<const void *>(data_ref.data),
.size = data_ref.size};
}
holder[i] = ClickHouseLibrary::Row{
.data = static_cast<ClickHouseLibrary::Field *>(cell_holder.get()),
.size = key_columns[i]->size()};
column_data_holders.push_back(std::move(cell_holder));
}
ClickHouseLibrary::Table request_cols{
.data = static_cast<ClickHouseLibrary::Row *>(holder.get()),
.size = key_columns.size()};
auto load_keys_func = library->get<ClickHouseLibrary::LibraryLoadKeysFunc>(ClickHouseLibrary::LIBRARY_LOAD_KEYS_FUNC_NAME);
auto data_new_func = library->get<ClickHouseLibrary::LibraryDataNewFunc>(ClickHouseLibrary::LIBRARY_DATA_NEW_FUNC_NAME);
auto data_delete_func = library->get<ClickHouseLibrary::LibraryDataDeleteFunc>(ClickHouseLibrary::LIBRARY_DATA_DELETE_FUNC_NAME);
ClickHouseLibrary::LibraryData data_ptr = data_new_func(lib_data);
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
ClickHouseLibrary::RawClickHouseLibraryTable data = load_keys_func(data_ptr, &settings_holder->strings, &request_cols);
return dataToBlock(data);
}
Block SharedLibraryHandler::dataToBlock(const ClickHouseLibrary::RawClickHouseLibraryTable data)
{
if (!data)
throw Exception("LibraryDictionarySource: No data returned", ErrorCodes::EXTERNAL_LIBRARY_ERROR);
const auto * columns_received = static_cast<const ClickHouseLibrary::Table *>(data);
if (columns_received->error_code)
throw Exception(
"LibraryDictionarySource: Returned error: " + std::to_string(columns_received->error_code) + " " + (columns_received->error_string ? columns_received->error_string : ""),
ErrorCodes::EXTERNAL_LIBRARY_ERROR);
MutableColumns columns = sample_block.cloneEmptyColumns();
for (size_t col_n = 0; col_n < columns_received->size; ++col_n)
{
if (columns.size() != columns_received->data[col_n].size)
throw Exception(
"LibraryDictionarySource: Returned unexpected number of columns: " + std::to_string(columns_received->data[col_n].size) + ", must be " + std::to_string(columns.size()),
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
for (size_t row_n = 0; row_n < columns_received->data[col_n].size; ++row_n)
{
const auto & field = columns_received->data[col_n].data[row_n];
if (!field.data)
{
/// sample_block contains null_value (from config) inside corresponding column
const auto & col = sample_block.getByPosition(row_n);
columns[row_n]->insertFrom(*(col.column), 0);
}
else
{
const auto & size = field.size;
columns[row_n]->insertData(static_cast<const char *>(field.data), size);
}
}
}
return sample_block.cloneWithColumns(std::move(columns));
}
}

View File

@ -1,62 +0,0 @@
#include "SharedLibraryHandlerFactory.h"
namespace DB
{
SharedLibraryHandlerPtr SharedLibraryHandlerFactory::get(const std::string & dictionary_id)
{
std::lock_guard lock(mutex);
auto library_handler = library_handlers.find(dictionary_id);
if (library_handler != library_handlers.end())
return library_handler->second;
return nullptr;
}
void SharedLibraryHandlerFactory::create(
const std::string & dictionary_id,
const std::string & library_path,
const std::vector<std::string> & library_settings,
const Block & sample_block,
const std::vector<std::string> & attributes_names)
{
std::lock_guard lock(mutex);
if (!library_handlers.contains(dictionary_id))
library_handlers.emplace(std::make_pair(dictionary_id, std::make_shared<SharedLibraryHandler>(library_path, library_settings, sample_block, attributes_names)));
else
LOG_WARNING(&Poco::Logger::get("SharedLibraryHandlerFactory"), "Library handler with dictionary id {} already exists", dictionary_id);
}
bool SharedLibraryHandlerFactory::clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id)
{
std::lock_guard lock(mutex);
auto from_library_handler = library_handlers.find(from_dictionary_id);
if (from_library_handler == library_handlers.end())
return false;
/// libClone method will be called in copy constructor
library_handlers[to_dictionary_id] = std::make_shared<SharedLibraryHandler>(*from_library_handler->second);
return true;
}
bool SharedLibraryHandlerFactory::remove(const std::string & dictionary_id)
{
std::lock_guard lock(mutex);
/// libDelete is called in destructor.
return library_handlers.erase(dictionary_id);
}
SharedLibraryHandlerFactory & SharedLibraryHandlerFactory::instance()
{
static SharedLibraryHandlerFactory ret;
return ret;
}
}

View File

@ -1,3 +1,2 @@
int mainEntryClickHouseLibraryBridge(int argc, char ** argv);
int main(int argc_, char ** argv_) { return mainEntryClickHouseLibraryBridge(argc_, argv_); }

View File

@ -2,17 +2,17 @@ include(${ClickHouse_SOURCE_DIR}/cmake/split_debug_symbols.cmake)
set (CLICKHOUSE_ODBC_BRIDGE_SOURCES
ColumnInfoHandler.cpp
getIdentifierQuote.cpp
HandlerFactory.cpp
IdentifierQuoteHandler.cpp
MainHandler.cpp
ODBCBlockInputStream.cpp
ODBCBlockOutputStream.cpp
ODBCBridge.cpp
ODBCHandlerFactory.cpp
PingHandler.cpp
SchemaAllowedHandler.cpp
validateODBCConnectionString.cpp
getIdentifierQuote.cpp
odbc-bridge.cpp
validateODBCConnectionString.cpp
)
if (OS_LINUX)

View File

@ -1,6 +1,5 @@
#include "ODBCBridge.h"
#pragma GCC diagnostic ignored "-Wmissing-declarations"
int mainEntryClickHouseODBCBridge(int argc, char ** argv)
{
DB::ODBCBridge app;
@ -15,3 +14,18 @@ int mainEntryClickHouseODBCBridge(int argc, char ** argv)
return code ? code : 1;
}
}
namespace DB
{
std::string ODBCBridge::bridgeName() const
{
return "ODBCBridge";
}
ODBCBridge::HandlerFactoryPtr ODBCBridge::getHandlerFactoryPtr(ContextPtr context) const
{
return std::make_shared<ODBCBridgeHandlerFactory>("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context);
}
}

View File

@ -3,7 +3,7 @@
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Bridge/IBridge.h>
#include "HandlerFactory.h"
#include "ODBCHandlerFactory.h"
namespace DB
@ -13,14 +13,7 @@ class ODBCBridge : public IBridge
{
protected:
std::string bridgeName() const override
{
return "ODBCBridge";
}
HandlerFactoryPtr getHandlerFactoryPtr(ContextPtr context) const override
{
return std::make_shared<ODBCBridgeHandlerFactory>("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context);
}
std::string bridgeName() const override;
HandlerFactoryPtr getHandlerFactoryPtr(ContextPtr context) const override;
};
}

View File

@ -1,4 +1,4 @@
#include "HandlerFactory.h"
#include "ODBCHandlerFactory.h"
#include "PingHandler.h"
#include "ColumnInfoHandler.h"
#include <Common/config.h>
@ -9,6 +9,14 @@
namespace DB
{
ODBCBridgeHandlerFactory::ODBCBridgeHandlerFactory(const std::string & name_, size_t keep_alive_timeout_, ContextPtr context_)
: WithContext(context_)
, log(&Poco::Logger::get(name_))
, name(name_)
, keep_alive_timeout(keep_alive_timeout_)
{
}
std::unique_ptr<HTTPRequestHandler> ODBCBridgeHandlerFactory::createRequestHandler(const HTTPServerRequest & request)
{
Poco::URI uri{request.getURI()};

View File

@ -17,13 +17,7 @@ namespace DB
class ODBCBridgeHandlerFactory : public HTTPRequestHandlerFactory, WithContext
{
public:
ODBCBridgeHandlerFactory(const std::string & name_, size_t keep_alive_timeout_, ContextPtr context_)
: WithContext(context_)
, log(&Poco::Logger::get(name_))
, name(name_)
, keep_alive_timeout(keep_alive_timeout_)
{
}
ODBCBridgeHandlerFactory(const std::string & name_, size_t keep_alive_timeout_, ContextPtr context_);
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override;

View File

@ -41,7 +41,6 @@ protected:
/// Check bridge is running. Can also check something else in the mean time.
virtual bool bridgeHandShake() = 0;
/// clickhouse-odbc-bridge, clickhouse-library-bridge
virtual String serviceAlias() const = 0;
virtual String serviceFileName() const = 0;

View File

@ -38,10 +38,10 @@ LibraryBridgeHelper::LibraryBridgeHelper(
, http_timeout(context_->getGlobalContext()->getSettingsRef().http_receive_timeout.value)
, library_data(library_data_)
, dictionary_id(dictionary_id_)
, bridge_host(config.getString("library_bridge.host", DEFAULT_HOST))
, bridge_port(config.getUInt("library_bridge.port", DEFAULT_PORT))
, http_timeouts(ConnectionTimeouts::getHTTPTimeouts(context_))
{
bridge_port = config.getUInt("library_bridge.port", DEFAULT_PORT);
bridge_host = config.getString("library_bridge.host", DEFAULT_HOST);
}
@ -91,12 +91,12 @@ bool LibraryBridgeHelper::bridgeHandShake()
* 2. Bridge crashed or restarted for some reason while server did not.
**/
if (result.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {}. Check bridge and server have the same version.", result);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {}. Check that bridge and server have the same version.", result);
UInt8 dictionary_id_exists;
auto parsed = tryParse<UInt8>(dictionary_id_exists, result);
if (!parsed || (dictionary_id_exists != 0 && dictionary_id_exists != 1))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {} ({}). Check bridge and server have the same version.",
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {} ({}). Check that bridge and server have the same version.",
result, parsed ? toString(dictionary_id_exists) : "failed to parse");
LOG_TRACE(log, "dictionary_id: {}, dictionary_id_exists on bridge side: {}, library confirmed to be initialized on server side: {}",
@ -113,7 +113,7 @@ bool LibraryBridgeHelper::bridgeHandShake()
bool reinitialized = false;
try
{
auto uri = createRequestURI(LIB_NEW_METHOD);
auto uri = createRequestURI(EXT_DICT_LIB_NEW_METHOD);
reinitialized = executeRequest(uri, getInitLibraryCallback());
}
catch (...)
@ -153,7 +153,7 @@ ReadWriteBufferFromHTTP::OutStreamCallback LibraryBridgeHelper::getInitLibraryCa
bool LibraryBridgeHelper::initLibrary()
{
startBridgeSync();
auto uri = createRequestURI(LIB_NEW_METHOD);
auto uri = createRequestURI(EXT_DICT_LIB_NEW_METHOD);
library_initialized = executeRequest(uri, getInitLibraryCallback());
return library_initialized;
}
@ -162,7 +162,7 @@ bool LibraryBridgeHelper::initLibrary()
bool LibraryBridgeHelper::cloneLibrary(const Field & other_dictionary_id)
{
startBridgeSync();
auto uri = createRequestURI(LIB_CLONE_METHOD);
auto uri = createRequestURI(EXT_DICT_LIB_CLONE_METHOD);
uri.addQueryParameter("from_dictionary_id", toString(other_dictionary_id));
/// We also pass initialization settings in order to create a library handler
/// in case from_dictionary_id does not exist in bridge side (possible in case of bridge crash).
@ -177,7 +177,7 @@ bool LibraryBridgeHelper::removeLibrary()
/// because in this case after restart it will not have this dictionaty id in memory anyway.
if (bridgeHandShake())
{
auto uri = createRequestURI(LIB_DELETE_METHOD);
auto uri = createRequestURI(EXT_DICT_LIB_DELETE_METHOD);
return executeRequest(uri);
}
return true;
@ -187,7 +187,7 @@ bool LibraryBridgeHelper::removeLibrary()
bool LibraryBridgeHelper::isModified()
{
startBridgeSync();
auto uri = createRequestURI(IS_MODIFIED_METHOD);
auto uri = createRequestURI(EXT_DICT_IS_MODIFIED_METHOD);
return executeRequest(uri);
}
@ -195,7 +195,7 @@ bool LibraryBridgeHelper::isModified()
bool LibraryBridgeHelper::supportsSelectiveLoad()
{
startBridgeSync();
auto uri = createRequestURI(SUPPORTS_SELECTIVE_LOAD_METHOD);
auto uri = createRequestURI(EXT_DICT_SUPPORTS_SELECTIVE_LOAD_METHOD);
return executeRequest(uri);
}
@ -203,7 +203,7 @@ bool LibraryBridgeHelper::supportsSelectiveLoad()
QueryPipeline LibraryBridgeHelper::loadAll()
{
startBridgeSync();
auto uri = createRequestURI(LOAD_ALL_METHOD);
auto uri = createRequestURI(EXT_DICT_LOAD_ALL_METHOD);
return QueryPipeline(loadBase(uri));
}
@ -211,7 +211,7 @@ QueryPipeline LibraryBridgeHelper::loadAll()
QueryPipeline LibraryBridgeHelper::loadIds(const std::vector<uint64_t> & ids)
{
startBridgeSync();
auto uri = createRequestURI(LOAD_IDS_METHOD);
auto uri = createRequestURI(EXT_DICT_LOAD_IDS_METHOD);
uri.addQueryParameter("ids_num", toString(ids.size())); /// Not used parameter, but helpful
auto ids_string = getDictIdsString(ids);
return QueryPipeline(loadBase(uri, [ids_string](std::ostream & os) { os << ids_string; }));
@ -221,7 +221,7 @@ QueryPipeline LibraryBridgeHelper::loadIds(const std::vector<uint64_t> & ids)
QueryPipeline LibraryBridgeHelper::loadKeys(const Block & requested_block)
{
startBridgeSync();
auto uri = createRequestURI(LOAD_KEYS_METHOD);
auto uri = createRequestURI(EXT_DICT_LOAD_KEYS_METHOD);
/// Sample block to parse block from callback
uri.addQueryParameter("requested_block_sample", requested_block.getNamesAndTypesList().toString());
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [requested_block, this](std::ostream & os)

View File

@ -45,10 +45,6 @@ public:
QueryPipeline loadKeys(const Block & requested_block);
QueryPipeline loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {}) const;
LibraryInitData getLibraryData() const { return library_data; }
protected:
@ -74,18 +70,22 @@ protected:
Poco::URI createBaseURI() const override;
QueryPipeline loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {}) const;
ReadWriteBufferFromHTTP::OutStreamCallback getInitLibraryCallback() const;
private:
static constexpr inline auto LIB_NEW_METHOD = "libNew";
static constexpr inline auto LIB_CLONE_METHOD = "libClone";
static constexpr inline auto LIB_DELETE_METHOD = "libDelete";
static constexpr inline auto LOAD_ALL_METHOD = "loadAll";
static constexpr inline auto LOAD_IDS_METHOD = "loadIds";
static constexpr inline auto LOAD_KEYS_METHOD = "loadKeys";
static constexpr inline auto IS_MODIFIED_METHOD = "isModified";
static constexpr inline auto EXT_DICT_LIB_NEW_METHOD = "extDict_libNew";
static constexpr inline auto EXT_DICT_LIB_CLONE_METHOD = "extDict_libClone";
static constexpr inline auto EXT_DICT_LIB_DELETE_METHOD = "extDict_libDelete";
static constexpr inline auto EXT_DICT_LOAD_ALL_METHOD = "extDict_loadAll";
static constexpr inline auto EXT_DICT_LOAD_IDS_METHOD = "extDict_loadIds";
static constexpr inline auto EXT_DICT_LOAD_KEYS_METHOD = "extDict_loadKeys";
static constexpr inline auto EXT_DICT_IS_MODIFIED_METHOD = "extDict_isModified";
static constexpr inline auto PING = "ping";
static constexpr inline auto SUPPORTS_SELECTIVE_LOAD_METHOD = "supportsSelectiveLoad";
static constexpr inline auto EXT_DICT_SUPPORTS_SELECTIVE_LOAD_METHOD = "extDict_supportsSelectiveLoad";
Poco::URI createRequestURI(const String & method) const;

View File

@ -12,8 +12,6 @@
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/registerDictionaries.h>
namespace fs = std::filesystem;
namespace DB
{
@ -44,6 +42,8 @@ LibraryDictionarySource::LibraryDictionarySource(
if (created_from_ddl && !fileOrSymlinkPathStartsWith(path, dictionaries_lib_path))
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", path, dictionaries_lib_path);
namespace fs = std::filesystem;
if (!fs::exists(path))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "LibraryDictionarySource: Can't load library {}: file doesn't exist", path);