mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #39904 from ClickHouse/library-bridge-refactoring
Prepare library-bridge for catboost integration
This commit is contained in:
commit
ad0d060dc1
@ -49,15 +49,13 @@ option (ENABLE_CLICKHOUSE_OBFUSCATOR "Table data obfuscator (convert real data t
|
||||
# https://clickhouse.com/docs/en/operations/utilities/odbc-bridge/
|
||||
# TODO Also needs NANODBC.
|
||||
if (ENABLE_ODBC AND NOT USE_MUSL)
|
||||
option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "HTTP-server working like a proxy to ODBC driver"
|
||||
${ENABLE_CLICKHOUSE_ALL})
|
||||
option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "HTTP-server working like a proxy to ODBC driver" ${ENABLE_CLICKHOUSE_ALL})
|
||||
else ()
|
||||
option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "HTTP-server working like a proxy to ODBC driver" OFF)
|
||||
endif ()
|
||||
|
||||
if (NOT USE_MUSL)
|
||||
option (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE "HTTP-server working like a proxy to Library dictionary source"
|
||||
${ENABLE_CLICKHOUSE_ALL})
|
||||
option (ENABLE_CLICKHOUSE_LIBRARY_BRIDGE "HTTP-server working like a proxy to Library dictionary source" ${ENABLE_CLICKHOUSE_ALL})
|
||||
endif ()
|
||||
|
||||
# https://presentations.clickhouse.com/matemarketing_2020/
|
||||
|
@ -1,13 +1,13 @@
|
||||
include(${ClickHouse_SOURCE_DIR}/cmake/split_debug_symbols.cmake)
|
||||
|
||||
set (CLICKHOUSE_LIBRARY_BRIDGE_SOURCES
|
||||
library-bridge.cpp
|
||||
LibraryInterface.cpp
|
||||
ExternalDictionaryLibraryAPI.cpp
|
||||
ExternalDictionaryLibraryHandler.cpp
|
||||
ExternalDictionaryLibraryHandlerFactory.cpp
|
||||
LibraryBridge.cpp
|
||||
Handlers.cpp
|
||||
HandlerFactory.cpp
|
||||
SharedLibraryHandler.cpp
|
||||
SharedLibraryHandlerFactory.cpp
|
||||
LibraryBridgeHandlerFactory.cpp
|
||||
LibraryBridgeHandlers.cpp
|
||||
library-bridge.cpp
|
||||
)
|
||||
|
||||
if (OS_LINUX)
|
||||
|
@ -1,4 +1,4 @@
|
||||
#include "LibraryInterface.h"
|
||||
#include "ExternalDictionaryLibraryAPI.h"
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
@ -7,24 +7,7 @@ namespace
|
||||
const char DICT_LOGGER_NAME[] = "LibraryDictionarySourceExternal";
|
||||
}
|
||||
|
||||
namespace ClickHouseLibrary
|
||||
{
|
||||
|
||||
std::string_view LIBRARY_CREATE_NEW_FUNC_NAME = "ClickHouseDictionary_v3_libNew";
|
||||
std::string_view LIBRARY_CLONE_FUNC_NAME = "ClickHouseDictionary_v3_libClone";
|
||||
std::string_view LIBRARY_DELETE_FUNC_NAME = "ClickHouseDictionary_v3_libDelete";
|
||||
|
||||
std::string_view LIBRARY_DATA_NEW_FUNC_NAME = "ClickHouseDictionary_v3_dataNew";
|
||||
std::string_view LIBRARY_DATA_DELETE_FUNC_NAME = "ClickHouseDictionary_v3_dataDelete";
|
||||
|
||||
std::string_view LIBRARY_LOAD_ALL_FUNC_NAME = "ClickHouseDictionary_v3_loadAll";
|
||||
std::string_view LIBRARY_LOAD_IDS_FUNC_NAME = "ClickHouseDictionary_v3_loadIds";
|
||||
std::string_view LIBRARY_LOAD_KEYS_FUNC_NAME = "ClickHouseDictionary_v3_loadKeys";
|
||||
|
||||
std::string_view LIBRARY_IS_MODIFIED_FUNC_NAME = "ClickHouseDictionary_v3_isModified";
|
||||
std::string_view LIBRARY_SUPPORTS_SELECTIVE_LOAD_FUNC_NAME = "ClickHouseDictionary_v3_supportsSelectiveLoad";
|
||||
|
||||
void log(LogLevel level, CString msg)
|
||||
void ExternalDictionaryLibraryAPI::log(LogLevel level, CString msg)
|
||||
{
|
||||
auto & logger = Poco::Logger::get(DICT_LOGGER_NAME);
|
||||
switch (level)
|
||||
@ -63,5 +46,3 @@ void log(LogLevel level, CString msg)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
106
programs/library-bridge/ExternalDictionaryLibraryAPI.h
Normal file
106
programs/library-bridge/ExternalDictionaryLibraryAPI.h
Normal file
@ -0,0 +1,106 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
|
||||
#define CLICKHOUSE_DICTIONARY_LIBRARY_API 1
|
||||
|
||||
struct ExternalDictionaryLibraryAPI
|
||||
{
|
||||
using CString = const char *;
|
||||
using ColumnName = CString;
|
||||
using ColumnNames = ColumnName[];
|
||||
|
||||
struct CStrings
|
||||
{
|
||||
CString * data = nullptr;
|
||||
uint64_t size = 0;
|
||||
};
|
||||
|
||||
struct VectorUInt64
|
||||
{
|
||||
const uint64_t * data = nullptr;
|
||||
uint64_t size = 0;
|
||||
};
|
||||
|
||||
struct ColumnsUInt64
|
||||
{
|
||||
VectorUInt64 * data = nullptr;
|
||||
uint64_t size = 0;
|
||||
};
|
||||
|
||||
struct Field
|
||||
{
|
||||
const void * data = nullptr;
|
||||
uint64_t size = 0;
|
||||
};
|
||||
|
||||
struct Row
|
||||
{
|
||||
const Field * data = nullptr;
|
||||
uint64_t size = 0;
|
||||
};
|
||||
|
||||
struct Table
|
||||
{
|
||||
const Row * data = nullptr;
|
||||
uint64_t size = 0;
|
||||
uint64_t error_code = 0; // 0 = ok; !0 = error, with message in error_string
|
||||
const char * error_string = nullptr;
|
||||
};
|
||||
|
||||
enum LogLevel
|
||||
{
|
||||
FATAL = 1,
|
||||
CRITICAL,
|
||||
ERROR,
|
||||
WARNING,
|
||||
NOTICE,
|
||||
INFORMATION,
|
||||
DEBUG,
|
||||
TRACE,
|
||||
};
|
||||
|
||||
static void log(LogLevel level, CString msg);
|
||||
|
||||
using LibraryContext = void *;
|
||||
using LibraryLoggerFunc = void (*)(LogLevel, CString /* message */);
|
||||
using LibrarySettings = CStrings *;
|
||||
using LibraryData = void *;
|
||||
using RawClickHouseLibraryTable = void *;
|
||||
/// Can be safely casted into const Table * with static_cast<const ClickHouseLibrary::Table *>
|
||||
using RequestedColumnsNames = CStrings *;
|
||||
using RequestedIds = const VectorUInt64 *;
|
||||
using RequestedKeys = Table *;
|
||||
|
||||
using LibraryNewFunc = LibraryContext (*)(LibrarySettings, LibraryLoggerFunc);
|
||||
static constexpr const char * LIBRARY_CREATE_NEW_FUNC_NAME = "ClickHouseDictionary_v3_libNew";
|
||||
|
||||
using LibraryCloneFunc = LibraryContext (*)(LibraryContext);
|
||||
static constexpr const char * LIBRARY_CLONE_FUNC_NAME = "ClickHouseDictionary_v3_libClone";
|
||||
|
||||
using LibraryDeleteFunc = void (*)(LibraryContext);
|
||||
static constexpr const char * LIBRARY_DELETE_FUNC_NAME = "ClickHouseDictionary_v3_libDelete";
|
||||
|
||||
using LibraryDataNewFunc = LibraryData (*)(LibraryContext);
|
||||
static constexpr const char * LIBRARY_DATA_NEW_FUNC_NAME = "ClickHouseDictionary_v3_dataNew";
|
||||
|
||||
using LibraryDataDeleteFunc = void (*)(LibraryContext, LibraryData);
|
||||
static constexpr const char * LIBRARY_DATA_DELETE_FUNC_NAME = "ClickHouseDictionary_v3_dataDelete";
|
||||
|
||||
using LibraryLoadAllFunc = RawClickHouseLibraryTable (*)(LibraryData, LibrarySettings, RequestedColumnsNames);
|
||||
static constexpr const char * LIBRARY_LOAD_ALL_FUNC_NAME = "ClickHouseDictionary_v3_loadAll";
|
||||
|
||||
using LibraryLoadIdsFunc = RawClickHouseLibraryTable (*)(LibraryData, LibrarySettings, RequestedColumnsNames, RequestedIds);
|
||||
static constexpr const char * LIBRARY_LOAD_IDS_FUNC_NAME = "ClickHouseDictionary_v3_loadIds";
|
||||
|
||||
/// There are no requested column names for load keys func
|
||||
using LibraryLoadKeysFunc = RawClickHouseLibraryTable (*)(LibraryData, LibrarySettings, RequestedKeys);
|
||||
static constexpr const char * LIBRARY_LOAD_KEYS_FUNC_NAME = "ClickHouseDictionary_v3_loadKeys";
|
||||
|
||||
using LibraryIsModifiedFunc = bool (*)(LibraryContext, LibrarySettings);
|
||||
static constexpr const char * LIBRARY_IS_MODIFIED_FUNC_NAME = "ClickHouseDictionary_v3_isModified";
|
||||
|
||||
using LibrarySupportsSelectiveLoadFunc = bool (*)(LibraryContext, LibrarySettings);
|
||||
static constexpr const char * LIBRARY_SUPPORTS_SELECTIVE_LOAD_FUNC_NAME = "ClickHouseDictionary_v3_supportsSelectiveLoad";
|
||||
};
|
214
programs/library-bridge/ExternalDictionaryLibraryHandler.cpp
Normal file
214
programs/library-bridge/ExternalDictionaryLibraryHandler.cpp
Normal file
@ -0,0 +1,214 @@
|
||||
#include "ExternalDictionaryLibraryHandler.h"
|
||||
|
||||
#include <base/scope_guard.h>
|
||||
#include <base/bit_cast.h>
|
||||
#include <base/find_symbols.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int EXTERNAL_LIBRARY_ERROR;
|
||||
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
|
||||
ExternalDictionaryLibraryHandler::ExternalDictionaryLibraryHandler(
|
||||
const std::string & library_path_,
|
||||
const std::vector<std::string> & library_settings,
|
||||
const Block & sample_block_,
|
||||
const std::vector<std::string> & attributes_names_)
|
||||
: library_path(library_path_)
|
||||
, sample_block(sample_block_)
|
||||
, attributes_names(attributes_names_)
|
||||
{
|
||||
library = std::make_shared<SharedLibrary>(library_path);
|
||||
settings_holder = std::make_shared<CStringsHolder>(CStringsHolder(library_settings));
|
||||
|
||||
auto lib_new = library->tryGet<ExternalDictionaryLibraryAPI::LibraryNewFunc>(ExternalDictionaryLibraryAPI::LIBRARY_CREATE_NEW_FUNC_NAME);
|
||||
|
||||
if (lib_new)
|
||||
lib_data = lib_new(&settings_holder->strings, ExternalDictionaryLibraryAPI::log);
|
||||
else
|
||||
throw Exception("Method extDict_libNew failed", ErrorCodes::EXTERNAL_LIBRARY_ERROR);
|
||||
}
|
||||
|
||||
|
||||
ExternalDictionaryLibraryHandler::ExternalDictionaryLibraryHandler(const ExternalDictionaryLibraryHandler & other)
|
||||
: library_path{other.library_path}
|
||||
, sample_block{other.sample_block}
|
||||
, attributes_names{other.attributes_names}
|
||||
, library{other.library}
|
||||
, settings_holder{other.settings_holder}
|
||||
{
|
||||
|
||||
auto lib_clone = library->tryGet<ExternalDictionaryLibraryAPI::LibraryCloneFunc>(ExternalDictionaryLibraryAPI::LIBRARY_CLONE_FUNC_NAME);
|
||||
|
||||
if (lib_clone)
|
||||
{
|
||||
lib_data = lib_clone(other.lib_data);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto lib_new = library->tryGet<ExternalDictionaryLibraryAPI::LibraryNewFunc>(ExternalDictionaryLibraryAPI::LIBRARY_CREATE_NEW_FUNC_NAME);
|
||||
|
||||
if (lib_new)
|
||||
lib_data = lib_new(&settings_holder->strings, ExternalDictionaryLibraryAPI::log);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ExternalDictionaryLibraryHandler::~ExternalDictionaryLibraryHandler()
|
||||
{
|
||||
auto lib_delete = library->tryGet<ExternalDictionaryLibraryAPI::LibraryDeleteFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DELETE_FUNC_NAME);
|
||||
|
||||
if (lib_delete)
|
||||
lib_delete(lib_data);
|
||||
}
|
||||
|
||||
|
||||
bool ExternalDictionaryLibraryHandler::isModified()
|
||||
{
|
||||
auto func_is_modified = library->tryGet<ExternalDictionaryLibraryAPI::LibraryIsModifiedFunc>(ExternalDictionaryLibraryAPI::LIBRARY_IS_MODIFIED_FUNC_NAME);
|
||||
|
||||
if (func_is_modified)
|
||||
return func_is_modified(lib_data, &settings_holder->strings);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool ExternalDictionaryLibraryHandler::supportsSelectiveLoad()
|
||||
{
|
||||
auto func_supports_selective_load = library->tryGet<ExternalDictionaryLibraryAPI::LibrarySupportsSelectiveLoadFunc>(ExternalDictionaryLibraryAPI::LIBRARY_SUPPORTS_SELECTIVE_LOAD_FUNC_NAME);
|
||||
|
||||
if (func_supports_selective_load)
|
||||
return func_supports_selective_load(lib_data, &settings_holder->strings);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Block ExternalDictionaryLibraryHandler::loadAll()
|
||||
{
|
||||
auto columns_holder = std::make_unique<ExternalDictionaryLibraryAPI::CString[]>(attributes_names.size());
|
||||
ExternalDictionaryLibraryAPI::CStrings columns{static_cast<decltype(ExternalDictionaryLibraryAPI::CStrings::data)>(columns_holder.get()), attributes_names.size()};
|
||||
for (size_t i = 0; i < attributes_names.size(); ++i)
|
||||
columns.data[i] = attributes_names[i].c_str();
|
||||
|
||||
auto load_all_func = library->get<ExternalDictionaryLibraryAPI::LibraryLoadAllFunc>(ExternalDictionaryLibraryAPI::LIBRARY_LOAD_ALL_FUNC_NAME);
|
||||
auto data_new_func = library->get<ExternalDictionaryLibraryAPI::LibraryDataNewFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DATA_NEW_FUNC_NAME);
|
||||
auto data_delete_func = library->get<ExternalDictionaryLibraryAPI::LibraryDataDeleteFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DATA_DELETE_FUNC_NAME);
|
||||
|
||||
ExternalDictionaryLibraryAPI::LibraryData data_ptr = data_new_func(lib_data);
|
||||
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
|
||||
|
||||
ExternalDictionaryLibraryAPI::RawClickHouseLibraryTable data = load_all_func(data_ptr, &settings_holder->strings, &columns);
|
||||
return dataToBlock(data);
|
||||
}
|
||||
|
||||
|
||||
Block ExternalDictionaryLibraryHandler::loadIds(const std::vector<uint64_t> & ids)
|
||||
{
|
||||
const ExternalDictionaryLibraryAPI::VectorUInt64 ids_data{bit_cast<decltype(ExternalDictionaryLibraryAPI::VectorUInt64::data)>(ids.data()), ids.size()};
|
||||
|
||||
auto columns_holder = std::make_unique<ExternalDictionaryLibraryAPI::CString[]>(attributes_names.size());
|
||||
ExternalDictionaryLibraryAPI::CStrings columns_pass{static_cast<decltype(ExternalDictionaryLibraryAPI::CStrings::data)>(columns_holder.get()), attributes_names.size()};
|
||||
|
||||
auto load_ids_func = library->get<ExternalDictionaryLibraryAPI::LibraryLoadIdsFunc>(ExternalDictionaryLibraryAPI::LIBRARY_LOAD_IDS_FUNC_NAME);
|
||||
auto data_new_func = library->get<ExternalDictionaryLibraryAPI::LibraryDataNewFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DATA_NEW_FUNC_NAME);
|
||||
auto data_delete_func = library->get<ExternalDictionaryLibraryAPI::LibraryDataDeleteFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DATA_DELETE_FUNC_NAME);
|
||||
|
||||
ExternalDictionaryLibraryAPI::LibraryData data_ptr = data_new_func(lib_data);
|
||||
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
|
||||
|
||||
ExternalDictionaryLibraryAPI::RawClickHouseLibraryTable data = load_ids_func(data_ptr, &settings_holder->strings, &columns_pass, &ids_data);
|
||||
return dataToBlock(data);
|
||||
}
|
||||
|
||||
|
||||
Block ExternalDictionaryLibraryHandler::loadKeys(const Columns & key_columns)
|
||||
{
|
||||
auto holder = std::make_unique<ExternalDictionaryLibraryAPI::Row[]>(key_columns.size());
|
||||
std::vector<std::unique_ptr<ExternalDictionaryLibraryAPI::Field[]>> column_data_holders;
|
||||
|
||||
for (size_t i = 0; i < key_columns.size(); ++i)
|
||||
{
|
||||
auto cell_holder = std::make_unique<ExternalDictionaryLibraryAPI::Field[]>(key_columns[i]->size());
|
||||
|
||||
for (size_t j = 0; j < key_columns[i]->size(); ++j)
|
||||
{
|
||||
auto data_ref = key_columns[i]->getDataAt(j);
|
||||
|
||||
cell_holder[j] = ExternalDictionaryLibraryAPI::Field{
|
||||
.data = static_cast<const void *>(data_ref.data),
|
||||
.size = data_ref.size};
|
||||
}
|
||||
|
||||
holder[i] = ExternalDictionaryLibraryAPI::Row{
|
||||
.data = static_cast<ExternalDictionaryLibraryAPI::Field *>(cell_holder.get()),
|
||||
.size = key_columns[i]->size()};
|
||||
|
||||
column_data_holders.push_back(std::move(cell_holder));
|
||||
}
|
||||
|
||||
ExternalDictionaryLibraryAPI::Table request_cols{
|
||||
.data = static_cast<ExternalDictionaryLibraryAPI::Row *>(holder.get()),
|
||||
.size = key_columns.size()};
|
||||
|
||||
auto load_keys_func = library->get<ExternalDictionaryLibraryAPI::LibraryLoadKeysFunc>(ExternalDictionaryLibraryAPI::LIBRARY_LOAD_KEYS_FUNC_NAME);
|
||||
auto data_new_func = library->get<ExternalDictionaryLibraryAPI::LibraryDataNewFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DATA_NEW_FUNC_NAME);
|
||||
auto data_delete_func = library->get<ExternalDictionaryLibraryAPI::LibraryDataDeleteFunc>(ExternalDictionaryLibraryAPI::LIBRARY_DATA_DELETE_FUNC_NAME);
|
||||
|
||||
ExternalDictionaryLibraryAPI::LibraryData data_ptr = data_new_func(lib_data);
|
||||
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
|
||||
|
||||
ExternalDictionaryLibraryAPI::RawClickHouseLibraryTable data = load_keys_func(data_ptr, &settings_holder->strings, &request_cols);
|
||||
return dataToBlock(data);
|
||||
}
|
||||
|
||||
|
||||
Block ExternalDictionaryLibraryHandler::dataToBlock(ExternalDictionaryLibraryAPI::RawClickHouseLibraryTable data)
|
||||
{
|
||||
if (!data)
|
||||
throw Exception("LibraryDictionarySource: No data returned", ErrorCodes::EXTERNAL_LIBRARY_ERROR);
|
||||
|
||||
const auto * columns_received = static_cast<const ExternalDictionaryLibraryAPI::Table *>(data);
|
||||
if (columns_received->error_code)
|
||||
throw Exception(
|
||||
"LibraryDictionarySource: Returned error: " + std::to_string(columns_received->error_code) + " " + (columns_received->error_string ? columns_received->error_string : ""),
|
||||
ErrorCodes::EXTERNAL_LIBRARY_ERROR);
|
||||
|
||||
MutableColumns columns = sample_block.cloneEmptyColumns();
|
||||
|
||||
for (size_t col_n = 0; col_n < columns_received->size; ++col_n)
|
||||
{
|
||||
if (columns.size() != columns_received->data[col_n].size)
|
||||
throw Exception(
|
||||
"LibraryDictionarySource: Returned unexpected number of columns: " + std::to_string(columns_received->data[col_n].size) + ", must be " + std::to_string(columns.size()),
|
||||
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
|
||||
|
||||
for (size_t row_n = 0; row_n < columns_received->data[col_n].size; ++row_n)
|
||||
{
|
||||
const auto & field = columns_received->data[col_n].data[row_n];
|
||||
if (!field.data)
|
||||
{
|
||||
/// sample_block contains null_value (from config) inside corresponding column
|
||||
const auto & col = sample_block.getByPosition(row_n);
|
||||
columns[row_n]->insertFrom(*(col.column), 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto & size = field.size;
|
||||
columns[row_n]->insertData(static_cast<const char *>(field.data), size);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return sample_block.cloneWithColumns(std::move(columns));
|
||||
}
|
||||
|
||||
}
|
@ -2,7 +2,7 @@
|
||||
|
||||
#include <Common/SharedLibrary.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include "LibraryUtils.h"
|
||||
#include "ExternalDictionaryLibraryUtils.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -10,21 +10,21 @@ namespace DB
|
||||
|
||||
/// A class that manages all operations with library dictionary.
|
||||
/// Every library dictionary source has its own object of this class, accessed by UUID.
|
||||
class SharedLibraryHandler
|
||||
class ExternalDictionaryLibraryHandler
|
||||
{
|
||||
|
||||
public:
|
||||
SharedLibraryHandler(
|
||||
ExternalDictionaryLibraryHandler(
|
||||
const std::string & library_path_,
|
||||
const std::vector<std::string> & library_settings,
|
||||
const Block & sample_block_,
|
||||
const std::vector<std::string> & attributes_names_);
|
||||
|
||||
SharedLibraryHandler(const SharedLibraryHandler & other);
|
||||
ExternalDictionaryLibraryHandler(const ExternalDictionaryLibraryHandler & other);
|
||||
|
||||
SharedLibraryHandler & operator=(const SharedLibraryHandler & other) = delete;
|
||||
ExternalDictionaryLibraryHandler & operator=(const ExternalDictionaryLibraryHandler & other) = delete;
|
||||
|
||||
~SharedLibraryHandler();
|
||||
~ExternalDictionaryLibraryHandler();
|
||||
|
||||
Block loadAll();
|
||||
|
||||
@ -39,7 +39,7 @@ public:
|
||||
const Block & getSampleBlock() { return sample_block; }
|
||||
|
||||
private:
|
||||
Block dataToBlock(const ClickHouseLibrary::RawClickHouseLibraryTable data);
|
||||
Block dataToBlock(ExternalDictionaryLibraryAPI::RawClickHouseLibraryTable data);
|
||||
|
||||
std::string library_path;
|
||||
const Block sample_block;
|
||||
@ -50,6 +50,6 @@ private:
|
||||
void * lib_data;
|
||||
};
|
||||
|
||||
using SharedLibraryHandlerPtr = std::shared_ptr<SharedLibraryHandler>;
|
||||
using SharedLibraryHandlerPtr = std::shared_ptr<ExternalDictionaryLibraryHandler>;
|
||||
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
#include "ExternalDictionaryLibraryHandlerFactory.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
SharedLibraryHandlerPtr ExternalDictionaryLibraryHandlerFactory::get(const std::string & dictionary_id)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto library_handler = library_handlers.find(dictionary_id);
|
||||
|
||||
if (library_handler != library_handlers.end())
|
||||
return library_handler->second;
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
||||
void ExternalDictionaryLibraryHandlerFactory::create(
|
||||
const std::string & dictionary_id,
|
||||
const std::string & library_path,
|
||||
const std::vector<std::string> & library_settings,
|
||||
const Block & sample_block,
|
||||
const std::vector<std::string> & attributes_names)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
if (!library_handlers.contains(dictionary_id))
|
||||
library_handlers.emplace(std::make_pair(dictionary_id, std::make_shared<ExternalDictionaryLibraryHandler>(library_path, library_settings, sample_block, attributes_names)));
|
||||
else
|
||||
LOG_WARNING(&Poco::Logger::get("ExternalDictionaryLibraryHandlerFactory"), "Library handler with dictionary id {} already exists", dictionary_id);
|
||||
}
|
||||
|
||||
|
||||
bool ExternalDictionaryLibraryHandlerFactory::clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto from_library_handler = library_handlers.find(from_dictionary_id);
|
||||
|
||||
if (from_library_handler == library_handlers.end())
|
||||
return false;
|
||||
|
||||
/// extDict_libClone method will be called in copy constructor
|
||||
library_handlers[to_dictionary_id] = std::make_shared<ExternalDictionaryLibraryHandler>(*from_library_handler->second);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool ExternalDictionaryLibraryHandlerFactory::remove(const std::string & dictionary_id)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
/// extDict_libDelete is called in destructor.
|
||||
return library_handlers.erase(dictionary_id);
|
||||
}
|
||||
|
||||
|
||||
ExternalDictionaryLibraryHandlerFactory & ExternalDictionaryLibraryHandlerFactory::instance()
|
||||
{
|
||||
static ExternalDictionaryLibraryHandlerFactory instance;
|
||||
return instance;
|
||||
}
|
||||
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include "SharedLibraryHandler.h"
|
||||
#include "ExternalDictionaryLibraryHandler.h"
|
||||
#include <base/defines.h>
|
||||
|
||||
#include <unordered_map>
|
||||
@ -11,11 +11,11 @@ namespace DB
|
||||
{
|
||||
|
||||
/// Each library dictionary source has unique UUID. When clone() method is called, a new UUID is generated.
|
||||
/// There is a unique mapping from diciotnary UUID to sharedLibraryHandler.
|
||||
class SharedLibraryHandlerFactory final : private boost::noncopyable
|
||||
/// There is a unique mapping from dictionary UUID to sharedLibraryHandler.
|
||||
class ExternalDictionaryLibraryHandlerFactory final : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
static SharedLibraryHandlerFactory & instance();
|
||||
static ExternalDictionaryLibraryHandlerFactory & instance();
|
||||
|
||||
SharedLibraryHandlerPtr get(const std::string & dictionary_id);
|
||||
|
@ -5,7 +5,7 @@
|
||||
#include <base/bit_cast.h>
|
||||
#include <base/range.h>
|
||||
|
||||
#include "LibraryInterface.h"
|
||||
#include "ExternalDictionaryLibraryAPI.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -22,7 +22,7 @@ public:
|
||||
strings_holder = strings_pass;
|
||||
strings.size = strings_holder.size();
|
||||
|
||||
ptr_holder = std::make_unique<ClickHouseLibrary::CString[]>(strings.size);
|
||||
ptr_holder = std::make_unique<ExternalDictionaryLibraryAPI::CString[]>(strings.size);
|
||||
strings.data = ptr_holder.get();
|
||||
|
||||
size_t i = 0;
|
||||
@ -33,10 +33,10 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
ClickHouseLibrary::CStrings strings; // will pass pointer to lib
|
||||
ExternalDictionaryLibraryAPI::CStrings strings; // will pass pointer to lib
|
||||
|
||||
private:
|
||||
std::unique_ptr<ClickHouseLibrary::CString[]> ptr_holder = nullptr;
|
||||
std::unique_ptr<ExternalDictionaryLibraryAPI::CString[]> ptr_holder = nullptr;
|
||||
Container strings_holder;
|
||||
};
|
||||
|
@ -1,23 +0,0 @@
|
||||
#include "HandlerFactory.h"
|
||||
|
||||
#include <Poco/Net/HTTPServerRequest.h>
|
||||
#include <Server/HTTP/HTMLForm.h>
|
||||
#include "Handlers.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
std::unique_ptr<HTTPRequestHandler> LibraryBridgeHandlerFactory::createRequestHandler(const HTTPServerRequest & request)
|
||||
{
|
||||
Poco::URI uri{request.getURI()};
|
||||
LOG_DEBUG(log, "Request URI: {}", uri.toString());
|
||||
|
||||
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
|
||||
return std::make_unique<LibraryExistsHandler>(keep_alive_timeout, getContext());
|
||||
|
||||
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
|
||||
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, getContext());
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
}
|
@ -1,37 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class SharedLibraryHandler;
|
||||
using SharedLibraryHandlerPtr = std::shared_ptr<SharedLibraryHandler>;
|
||||
|
||||
/// Factory for '/ping', '/' handlers.
|
||||
class LibraryBridgeHandlerFactory : public HTTPRequestHandlerFactory, WithContext
|
||||
{
|
||||
public:
|
||||
LibraryBridgeHandlerFactory(
|
||||
const std::string & name_,
|
||||
size_t keep_alive_timeout_,
|
||||
ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get(name_))
|
||||
, name(name_)
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
{
|
||||
}
|
||||
|
||||
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override;
|
||||
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
std::string name;
|
||||
size_t keep_alive_timeout;
|
||||
};
|
||||
|
||||
}
|
@ -1,6 +1,5 @@
|
||||
#include "LibraryBridge.h"
|
||||
|
||||
#pragma GCC diagnostic ignored "-Wmissing-declarations"
|
||||
int mainEntryClickHouseLibraryBridge(int argc, char ** argv)
|
||||
{
|
||||
DB::LibraryBridge app;
|
||||
@ -15,3 +14,18 @@ int mainEntryClickHouseLibraryBridge(int argc, char ** argv)
|
||||
return code ? code : 1;
|
||||
}
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
std::string LibraryBridge::bridgeName() const
|
||||
{
|
||||
return "LibraryBridge";
|
||||
}
|
||||
|
||||
LibraryBridge::HandlerFactoryPtr LibraryBridge::getHandlerFactoryPtr(ContextPtr context) const
|
||||
{
|
||||
return std::make_shared<LibraryBridgeHandlerFactory>("LibraryRequestHandlerFactory", keep_alive_timeout, context);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Bridge/IBridge.h>
|
||||
#include "HandlerFactory.h"
|
||||
#include "LibraryBridgeHandlerFactory.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -12,15 +12,8 @@ class LibraryBridge : public IBridge
|
||||
{
|
||||
|
||||
protected:
|
||||
std::string bridgeName() const override
|
||||
{
|
||||
return "LibraryBridge";
|
||||
}
|
||||
|
||||
HandlerFactoryPtr getHandlerFactoryPtr(ContextPtr context) const override
|
||||
{
|
||||
return std::make_shared<LibraryBridgeHandlerFactory>("LibraryRequestHandlerFactory-factory", keep_alive_timeout, context);
|
||||
}
|
||||
std::string bridgeName() const override;
|
||||
HandlerFactoryPtr getHandlerFactoryPtr(ContextPtr context) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
40
programs/library-bridge/LibraryBridgeHandlerFactory.cpp
Normal file
40
programs/library-bridge/LibraryBridgeHandlerFactory.cpp
Normal file
@ -0,0 +1,40 @@
|
||||
#include "LibraryBridgeHandlerFactory.h"
|
||||
|
||||
#include <Poco/Net/HTTPServerRequest.h>
|
||||
#include <Server/HTTP/HTMLForm.h>
|
||||
#include "LibraryBridgeHandlers.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
LibraryBridgeHandlerFactory::LibraryBridgeHandlerFactory(
|
||||
const std::string & name_,
|
||||
size_t keep_alive_timeout_,
|
||||
ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get(name_))
|
||||
, name(name_)
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
{
|
||||
}
|
||||
|
||||
std::unique_ptr<HTTPRequestHandler> LibraryBridgeHandlerFactory::createRequestHandler(const HTTPServerRequest & request)
|
||||
{
|
||||
Poco::URI uri{request.getURI()};
|
||||
LOG_DEBUG(log, "Request URI: {}", uri.toString());
|
||||
|
||||
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
|
||||
{
|
||||
if (uri.getPath() == "/extdict_ping")
|
||||
return std::make_unique<LibraryBridgeExistsHandler>(keep_alive_timeout, getContext());
|
||||
}
|
||||
|
||||
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
|
||||
{
|
||||
if (uri.getPath() == "/extdict_request")
|
||||
return std::make_unique<LibraryBridgeRequestHandler>(keep_alive_timeout, getContext());
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
}
|
27
programs/library-bridge/LibraryBridgeHandlerFactory.h
Normal file
27
programs/library-bridge/LibraryBridgeHandlerFactory.h
Normal file
@ -0,0 +1,27 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class LibraryBridgeHandlerFactory : public HTTPRequestHandlerFactory, WithContext
|
||||
{
|
||||
public:
|
||||
LibraryBridgeHandlerFactory(
|
||||
const std::string & name_,
|
||||
size_t keep_alive_timeout_,
|
||||
ContextPtr context_);
|
||||
|
||||
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override;
|
||||
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
const std::string name;
|
||||
const size_t keep_alive_timeout;
|
||||
};
|
||||
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
#include "Handlers.h"
|
||||
#include "SharedLibraryHandlerFactory.h"
|
||||
#include "LibraryBridgeHandlers.h"
|
||||
#include "ExternalDictionaryLibraryHandlerFactory.h"
|
||||
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
|
||||
@ -78,8 +78,14 @@ static void writeData(Block data, OutputFormatPtr format)
|
||||
executor.execute();
|
||||
}
|
||||
|
||||
LibraryBridgeRequestHandler::LibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get("LibraryBridgeRequestHandler"))
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
{
|
||||
}
|
||||
|
||||
void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
|
||||
void LibraryBridgeRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
|
||||
{
|
||||
LOG_TRACE(log, "Request URI: {}", request.getURI());
|
||||
HTMLForm params(getContext()->getSettingsRef(), request);
|
||||
@ -104,8 +110,8 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
|
||||
try
|
||||
{
|
||||
bool lib_new = (method == "libNew");
|
||||
if (method == "libClone")
|
||||
bool lib_new = (method == "extDict_libNew");
|
||||
if (method == "extDict_libClone")
|
||||
{
|
||||
if (!params.has("from_dictionary_id"))
|
||||
{
|
||||
@ -115,7 +121,7 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
|
||||
std::string from_dictionary_id = params.get("from_dictionary_id");
|
||||
bool cloned = false;
|
||||
cloned = SharedLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);
|
||||
cloned = ExternalDictionaryLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);
|
||||
|
||||
if (cloned)
|
||||
{
|
||||
@ -123,7 +129,7 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE(log, "Cannot clone from dictionary with id: {}, will call libNew instead", from_dictionary_id);
|
||||
LOG_TRACE(log, "Cannot clone from dictionary with id: {}, will call extDict_libNew instead", from_dictionary_id);
|
||||
lib_new = true;
|
||||
}
|
||||
}
|
||||
@ -138,13 +144,14 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
return;
|
||||
}
|
||||
|
||||
std::string library_path = params.get("library_path");
|
||||
|
||||
if (!params.has("library_settings"))
|
||||
{
|
||||
processError(response, "No 'library_settings' in request URL");
|
||||
return;
|
||||
}
|
||||
|
||||
std::string library_path = params.get("library_path");
|
||||
const auto & settings_string = params.get("library_settings");
|
||||
|
||||
LOG_DEBUG(log, "Parsing library settings from binary string");
|
||||
@ -197,12 +204,12 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
|
||||
LOG_DEBUG(log, "Dictionary sample block with null values: {}", sample_block_with_nulls.dumpStructure());
|
||||
|
||||
SharedLibraryHandlerFactory::instance().create(dictionary_id, library_path, library_settings, sample_block_with_nulls, attributes_names);
|
||||
ExternalDictionaryLibraryHandlerFactory::instance().create(dictionary_id, library_path, library_settings, sample_block_with_nulls, attributes_names);
|
||||
writeStringBinary("1", out);
|
||||
}
|
||||
else if (method == "libDelete")
|
||||
else if (method == "extDict_libDelete")
|
||||
{
|
||||
auto deleted = SharedLibraryHandlerFactory::instance().remove(dictionary_id);
|
||||
bool deleted = ExternalDictionaryLibraryHandlerFactory::instance().remove(dictionary_id);
|
||||
|
||||
/// Do not throw, a warning is ok.
|
||||
if (!deleted)
|
||||
@ -210,57 +217,57 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
|
||||
writeStringBinary("1", out);
|
||||
}
|
||||
else if (method == "isModified")
|
||||
else if (method == "extDict_isModified")
|
||||
{
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
bool res = library_handler->isModified();
|
||||
writeStringBinary(std::to_string(res), out);
|
||||
}
|
||||
else if (method == "supportsSelectiveLoad")
|
||||
else if (method == "extDict_supportsSelectiveLoad")
|
||||
{
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
bool res = library_handler->supportsSelectiveLoad();
|
||||
writeStringBinary(std::to_string(res), out);
|
||||
}
|
||||
else if (method == "loadAll")
|
||||
else if (method == "extDict_loadAll")
|
||||
{
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
const auto & sample_block = library_handler->getSampleBlock();
|
||||
LOG_DEBUG(log, "Calling loadAll() for dictionary id: {}", dictionary_id);
|
||||
LOG_DEBUG(log, "Calling extDict_loadAll() for dictionary id: {}", dictionary_id);
|
||||
auto input = library_handler->loadAll();
|
||||
|
||||
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
|
||||
auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
|
||||
writeData(std::move(input), std::move(output));
|
||||
}
|
||||
else if (method == "loadIds")
|
||||
else if (method == "extDict_loadIds")
|
||||
{
|
||||
LOG_DEBUG(log, "Getting diciontary ids for dictionary with id: {}", dictionary_id);
|
||||
String ids_string;
|
||||
std::vector<uint64_t> ids = parseIdsFromBinary(request.getStream());
|
||||
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
const auto & sample_block = library_handler->getSampleBlock();
|
||||
LOG_DEBUG(log, "Calling loadIds() for dictionary id: {}", dictionary_id);
|
||||
LOG_DEBUG(log, "Calling extDict_loadIds() for dictionary id: {}", dictionary_id);
|
||||
auto input = library_handler->loadIds(ids);
|
||||
|
||||
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
|
||||
auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
|
||||
writeData(std::move(input), std::move(output));
|
||||
}
|
||||
else if (method == "loadKeys")
|
||||
else if (method == "extDict_loadKeys")
|
||||
{
|
||||
if (!params.has("requested_block_sample"))
|
||||
{
|
||||
@ -289,18 +296,22 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
Block block;
|
||||
executor.pull(block);
|
||||
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
const auto & sample_block = library_handler->getSampleBlock();
|
||||
LOG_DEBUG(log, "Calling loadKeys() for dictionary id: {}", dictionary_id);
|
||||
LOG_DEBUG(log, "Calling extDict_loadKeys() for dictionary id: {}", dictionary_id);
|
||||
auto input = library_handler->loadKeys(block.getColumns());
|
||||
|
||||
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
|
||||
auto output = FormatFactory::instance().getOutputFormat(FORMAT, out, sample_block, getContext());
|
||||
writeData(std::move(input), std::move(output));
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_WARNING(log, "Unknown library method: '{}'", method);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -329,8 +340,14 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
}
|
||||
}
|
||||
|
||||
LibraryBridgeExistsHandler::LibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
, log(&Poco::Logger::get("LibraryBridgeExistsHandler"))
|
||||
{
|
||||
}
|
||||
|
||||
void LibraryExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
|
||||
void LibraryBridgeExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -344,15 +361,12 @@ void LibraryExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServer
|
||||
}
|
||||
|
||||
std::string dictionary_id = params.get("dictionary_id");
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
String res;
|
||||
if (library_handler)
|
||||
res = "1";
|
||||
else
|
||||
res = "0";
|
||||
auto library_handler = ExternalDictionaryLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
|
||||
String res = library_handler ? "1" : "0";
|
||||
|
||||
setResponseDefaultHeaders(response, keep_alive_timeout);
|
||||
LOG_TRACE(log, "Senging ping response: {} (dictionary id: {})", res, dictionary_id);
|
||||
LOG_TRACE(log, "Sending ping response: {} (dictionary id: {})", res, dictionary_id);
|
||||
response.sendBuffer(res.data(), res.size());
|
||||
}
|
||||
catch (...)
|
@ -3,7 +3,7 @@
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Server/HTTP/HTTPRequestHandler.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include "SharedLibraryHandler.h"
|
||||
#include "ExternalDictionaryLibraryHandler.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -11,23 +11,16 @@ namespace DB
|
||||
|
||||
|
||||
/// Handler for requests to Library Dictionary Source, returns response in RowBinary format.
|
||||
/// When a library dictionary source is created, it sends libNew request to library bridge (which is started on first
|
||||
/// When a library dictionary source is created, it sends 'extDict_libNew' request to library bridge (which is started on first
|
||||
/// request to it, if it was not yet started). On this request a new sharedLibrayHandler is added to a
|
||||
/// sharedLibraryHandlerFactory by a dictionary uuid. With libNew request come: library_path, library_settings,
|
||||
/// sharedLibraryHandlerFactory by a dictionary uuid. With 'extDict_libNew' request come: library_path, library_settings,
|
||||
/// names of dictionary attributes, sample block to parse block of null values, block of null values. Everything is
|
||||
/// passed in binary format and is urlencoded. When dictionary is cloned, a new handler is created.
|
||||
/// Each handler is unique to dictionary.
|
||||
class LibraryRequestHandler : public HTTPRequestHandler, WithContext
|
||||
class LibraryBridgeRequestHandler : public HTTPRequestHandler, WithContext
|
||||
{
|
||||
public:
|
||||
|
||||
LibraryRequestHandler(
|
||||
size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get("LibraryRequestHandler"))
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
{
|
||||
}
|
||||
LibraryBridgeRequestHandler(size_t keep_alive_timeout_, ContextPtr context_);
|
||||
|
||||
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
|
||||
|
||||
@ -39,22 +32,16 @@ private:
|
||||
};
|
||||
|
||||
|
||||
class LibraryExistsHandler : public HTTPRequestHandler, WithContext
|
||||
class LibraryBridgeExistsHandler : public HTTPRequestHandler, WithContext
|
||||
{
|
||||
public:
|
||||
explicit LibraryExistsHandler(size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
, log(&Poco::Logger::get("LibraryRequestHandler"))
|
||||
{
|
||||
}
|
||||
LibraryBridgeExistsHandler(size_t keep_alive_timeout_, ContextPtr context_);
|
||||
|
||||
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
|
||||
|
||||
private:
|
||||
const size_t keep_alive_timeout;
|
||||
Poco::Logger * log;
|
||||
|
||||
};
|
||||
|
||||
}
|
@ -1,110 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
|
||||
#define CLICKHOUSE_DICTIONARY_LIBRARY_API 1
|
||||
|
||||
namespace ClickHouseLibrary
|
||||
{
|
||||
using CString = const char *;
|
||||
using ColumnName = CString;
|
||||
using ColumnNames = ColumnName[];
|
||||
|
||||
struct CStrings
|
||||
{
|
||||
CString * data = nullptr;
|
||||
uint64_t size = 0;
|
||||
};
|
||||
|
||||
struct VectorUInt64
|
||||
{
|
||||
const uint64_t * data = nullptr;
|
||||
uint64_t size = 0;
|
||||
};
|
||||
|
||||
struct ColumnsUInt64
|
||||
{
|
||||
VectorUInt64 * data = nullptr;
|
||||
uint64_t size = 0;
|
||||
};
|
||||
|
||||
struct Field
|
||||
{
|
||||
const void * data = nullptr;
|
||||
uint64_t size = 0;
|
||||
};
|
||||
|
||||
struct Row
|
||||
{
|
||||
const Field * data = nullptr;
|
||||
uint64_t size = 0;
|
||||
};
|
||||
|
||||
struct Table
|
||||
{
|
||||
const Row * data = nullptr;
|
||||
uint64_t size = 0;
|
||||
uint64_t error_code = 0; // 0 = ok; !0 = error, with message in error_string
|
||||
const char * error_string = nullptr;
|
||||
};
|
||||
|
||||
enum LogLevel
|
||||
{
|
||||
FATAL = 1,
|
||||
CRITICAL,
|
||||
ERROR,
|
||||
WARNING,
|
||||
NOTICE,
|
||||
INFORMATION,
|
||||
DEBUG,
|
||||
TRACE,
|
||||
};
|
||||
|
||||
void log(LogLevel level, CString msg);
|
||||
|
||||
extern std::string_view LIBRARY_CREATE_NEW_FUNC_NAME;
|
||||
extern std::string_view LIBRARY_CLONE_FUNC_NAME;
|
||||
extern std::string_view LIBRARY_DELETE_FUNC_NAME;
|
||||
|
||||
extern std::string_view LIBRARY_DATA_NEW_FUNC_NAME;
|
||||
extern std::string_view LIBRARY_DATA_DELETE_FUNC_NAME;
|
||||
|
||||
extern std::string_view LIBRARY_LOAD_ALL_FUNC_NAME;
|
||||
extern std::string_view LIBRARY_LOAD_IDS_FUNC_NAME;
|
||||
extern std::string_view LIBRARY_LOAD_KEYS_FUNC_NAME;
|
||||
|
||||
extern std::string_view LIBRARY_IS_MODIFIED_FUNC_NAME;
|
||||
extern std::string_view LIBRARY_SUPPORTS_SELECTIVE_LOAD_FUNC_NAME;
|
||||
|
||||
using LibraryContext = void *;
|
||||
|
||||
using LibraryLoggerFunc = void (*)(LogLevel, CString /* message */);
|
||||
|
||||
using LibrarySettings = CStrings *;
|
||||
|
||||
using LibraryNewFunc = LibraryContext (*)(LibrarySettings, LibraryLoggerFunc);
|
||||
using LibraryCloneFunc = LibraryContext (*)(LibraryContext);
|
||||
using LibraryDeleteFunc = void (*)(LibraryContext);
|
||||
|
||||
using LibraryData = void *;
|
||||
using LibraryDataNewFunc = LibraryData (*)(LibraryContext);
|
||||
using LibraryDataDeleteFunc = void (*)(LibraryContext, LibraryData);
|
||||
|
||||
/// Can be safely casted into const Table * with static_cast<const ClickHouseLibrary::Table *>
|
||||
using RawClickHouseLibraryTable = void *;
|
||||
using RequestedColumnsNames = CStrings *;
|
||||
|
||||
using LibraryLoadAllFunc = RawClickHouseLibraryTable (*)(LibraryData, LibrarySettings, RequestedColumnsNames);
|
||||
|
||||
using RequestedIds = const VectorUInt64 *;
|
||||
using LibraryLoadIdsFunc = RawClickHouseLibraryTable (*)(LibraryData, LibrarySettings, RequestedColumnsNames, RequestedIds);
|
||||
|
||||
using RequestedKeys = Table *;
|
||||
/// There are no requested column names for load keys func
|
||||
using LibraryLoadKeysFunc = RawClickHouseLibraryTable (*)(LibraryData, LibrarySettings, RequestedKeys);
|
||||
|
||||
using LibraryIsModifiedFunc = bool (*)(LibraryContext, LibrarySettings);
|
||||
using LibrarySupportsSelectiveLoadFunc = bool (*)(LibraryContext, LibrarySettings);
|
||||
|
||||
}
|
@ -1,214 +0,0 @@
|
||||
#include "SharedLibraryHandler.h"
|
||||
|
||||
#include <base/scope_guard.h>
|
||||
#include <base/bit_cast.h>
|
||||
#include <base/find_symbols.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int EXTERNAL_LIBRARY_ERROR;
|
||||
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
|
||||
}
|
||||
|
||||
|
||||
SharedLibraryHandler::SharedLibraryHandler(
|
||||
const std::string & library_path_,
|
||||
const std::vector<std::string> & library_settings,
|
||||
const Block & sample_block_,
|
||||
const std::vector<std::string> & attributes_names_)
|
||||
: library_path(library_path_)
|
||||
, sample_block(sample_block_)
|
||||
, attributes_names(attributes_names_)
|
||||
{
|
||||
library = std::make_shared<SharedLibrary>(library_path, RTLD_LAZY);
|
||||
settings_holder = std::make_shared<CStringsHolder>(CStringsHolder(library_settings));
|
||||
|
||||
auto lib_new = library->tryGet<ClickHouseLibrary::LibraryNewFunc>(ClickHouseLibrary::LIBRARY_CREATE_NEW_FUNC_NAME);
|
||||
|
||||
if (lib_new)
|
||||
lib_data = lib_new(&settings_holder->strings, ClickHouseLibrary::log);
|
||||
else
|
||||
throw Exception("Method libNew failed", ErrorCodes::EXTERNAL_LIBRARY_ERROR);
|
||||
}
|
||||
|
||||
|
||||
SharedLibraryHandler::SharedLibraryHandler(const SharedLibraryHandler & other)
|
||||
: library_path{other.library_path}
|
||||
, sample_block{other.sample_block}
|
||||
, attributes_names{other.attributes_names}
|
||||
, library{other.library}
|
||||
, settings_holder{other.settings_holder}
|
||||
{
|
||||
|
||||
auto lib_clone = library->tryGet<ClickHouseLibrary::LibraryCloneFunc>(ClickHouseLibrary::LIBRARY_CLONE_FUNC_NAME);
|
||||
|
||||
if (lib_clone)
|
||||
{
|
||||
lib_data = lib_clone(other.lib_data);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto lib_new = library->tryGet<ClickHouseLibrary::LibraryNewFunc>(ClickHouseLibrary::LIBRARY_CREATE_NEW_FUNC_NAME);
|
||||
|
||||
if (lib_new)
|
||||
lib_data = lib_new(&settings_holder->strings, ClickHouseLibrary::log);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
SharedLibraryHandler::~SharedLibraryHandler()
|
||||
{
|
||||
auto lib_delete = library->tryGet<ClickHouseLibrary::LibraryDeleteFunc>(ClickHouseLibrary::LIBRARY_DELETE_FUNC_NAME);
|
||||
|
||||
if (lib_delete)
|
||||
lib_delete(lib_data);
|
||||
}
|
||||
|
||||
|
||||
bool SharedLibraryHandler::isModified()
|
||||
{
|
||||
auto func_is_modified = library->tryGet<ClickHouseLibrary::LibraryIsModifiedFunc>(ClickHouseLibrary::LIBRARY_IS_MODIFIED_FUNC_NAME);
|
||||
|
||||
if (func_is_modified)
|
||||
return func_is_modified(lib_data, &settings_holder->strings);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool SharedLibraryHandler::supportsSelectiveLoad()
|
||||
{
|
||||
auto func_supports_selective_load = library->tryGet<ClickHouseLibrary::LibrarySupportsSelectiveLoadFunc>(ClickHouseLibrary::LIBRARY_SUPPORTS_SELECTIVE_LOAD_FUNC_NAME);
|
||||
|
||||
if (func_supports_selective_load)
|
||||
return func_supports_selective_load(lib_data, &settings_holder->strings);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Block SharedLibraryHandler::loadAll()
|
||||
{
|
||||
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(attributes_names.size());
|
||||
ClickHouseLibrary::CStrings columns{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), attributes_names.size()};
|
||||
for (size_t i = 0; i < attributes_names.size(); ++i)
|
||||
columns.data[i] = attributes_names[i].c_str();
|
||||
|
||||
auto load_all_func = library->get<ClickHouseLibrary::LibraryLoadAllFunc>(ClickHouseLibrary::LIBRARY_LOAD_ALL_FUNC_NAME);
|
||||
auto data_new_func = library->get<ClickHouseLibrary::LibraryDataNewFunc>(ClickHouseLibrary::LIBRARY_DATA_NEW_FUNC_NAME);
|
||||
auto data_delete_func = library->get<ClickHouseLibrary::LibraryDataDeleteFunc>(ClickHouseLibrary::LIBRARY_DATA_DELETE_FUNC_NAME);
|
||||
|
||||
ClickHouseLibrary::LibraryData data_ptr = data_new_func(lib_data);
|
||||
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
|
||||
|
||||
ClickHouseLibrary::RawClickHouseLibraryTable data = load_all_func(data_ptr, &settings_holder->strings, &columns);
|
||||
return dataToBlock(data);
|
||||
}
|
||||
|
||||
|
||||
Block SharedLibraryHandler::loadIds(const std::vector<uint64_t> & ids)
|
||||
{
|
||||
const ClickHouseLibrary::VectorUInt64 ids_data{bit_cast<decltype(ClickHouseLibrary::VectorUInt64::data)>(ids.data()), ids.size()};
|
||||
|
||||
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(attributes_names.size());
|
||||
ClickHouseLibrary::CStrings columns_pass{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), attributes_names.size()};
|
||||
|
||||
auto load_ids_func = library->get<ClickHouseLibrary::LibraryLoadIdsFunc>(ClickHouseLibrary::LIBRARY_LOAD_IDS_FUNC_NAME);
|
||||
auto data_new_func = library->get<ClickHouseLibrary::LibraryDataNewFunc>(ClickHouseLibrary::LIBRARY_DATA_NEW_FUNC_NAME);
|
||||
auto data_delete_func = library->get<ClickHouseLibrary::LibraryDataDeleteFunc>(ClickHouseLibrary::LIBRARY_DATA_DELETE_FUNC_NAME);
|
||||
|
||||
ClickHouseLibrary::LibraryData data_ptr = data_new_func(lib_data);
|
||||
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
|
||||
|
||||
ClickHouseLibrary::RawClickHouseLibraryTable data = load_ids_func(data_ptr, &settings_holder->strings, &columns_pass, &ids_data);
|
||||
return dataToBlock(data);
|
||||
}
|
||||
|
||||
|
||||
Block SharedLibraryHandler::loadKeys(const Columns & key_columns)
|
||||
{
|
||||
auto holder = std::make_unique<ClickHouseLibrary::Row[]>(key_columns.size());
|
||||
std::vector<std::unique_ptr<ClickHouseLibrary::Field[]>> column_data_holders;
|
||||
|
||||
for (size_t i = 0; i < key_columns.size(); ++i)
|
||||
{
|
||||
auto cell_holder = std::make_unique<ClickHouseLibrary::Field[]>(key_columns[i]->size());
|
||||
|
||||
for (size_t j = 0; j < key_columns[i]->size(); ++j)
|
||||
{
|
||||
auto data_ref = key_columns[i]->getDataAt(j);
|
||||
|
||||
cell_holder[j] = ClickHouseLibrary::Field{
|
||||
.data = static_cast<const void *>(data_ref.data),
|
||||
.size = data_ref.size};
|
||||
}
|
||||
|
||||
holder[i] = ClickHouseLibrary::Row{
|
||||
.data = static_cast<ClickHouseLibrary::Field *>(cell_holder.get()),
|
||||
.size = key_columns[i]->size()};
|
||||
|
||||
column_data_holders.push_back(std::move(cell_holder));
|
||||
}
|
||||
|
||||
ClickHouseLibrary::Table request_cols{
|
||||
.data = static_cast<ClickHouseLibrary::Row *>(holder.get()),
|
||||
.size = key_columns.size()};
|
||||
|
||||
auto load_keys_func = library->get<ClickHouseLibrary::LibraryLoadKeysFunc>(ClickHouseLibrary::LIBRARY_LOAD_KEYS_FUNC_NAME);
|
||||
auto data_new_func = library->get<ClickHouseLibrary::LibraryDataNewFunc>(ClickHouseLibrary::LIBRARY_DATA_NEW_FUNC_NAME);
|
||||
auto data_delete_func = library->get<ClickHouseLibrary::LibraryDataDeleteFunc>(ClickHouseLibrary::LIBRARY_DATA_DELETE_FUNC_NAME);
|
||||
|
||||
ClickHouseLibrary::LibraryData data_ptr = data_new_func(lib_data);
|
||||
SCOPE_EXIT(data_delete_func(lib_data, data_ptr));
|
||||
|
||||
ClickHouseLibrary::RawClickHouseLibraryTable data = load_keys_func(data_ptr, &settings_holder->strings, &request_cols);
|
||||
return dataToBlock(data);
|
||||
}
|
||||
|
||||
|
||||
Block SharedLibraryHandler::dataToBlock(const ClickHouseLibrary::RawClickHouseLibraryTable data)
|
||||
{
|
||||
if (!data)
|
||||
throw Exception("LibraryDictionarySource: No data returned", ErrorCodes::EXTERNAL_LIBRARY_ERROR);
|
||||
|
||||
const auto * columns_received = static_cast<const ClickHouseLibrary::Table *>(data);
|
||||
if (columns_received->error_code)
|
||||
throw Exception(
|
||||
"LibraryDictionarySource: Returned error: " + std::to_string(columns_received->error_code) + " " + (columns_received->error_string ? columns_received->error_string : ""),
|
||||
ErrorCodes::EXTERNAL_LIBRARY_ERROR);
|
||||
|
||||
MutableColumns columns = sample_block.cloneEmptyColumns();
|
||||
|
||||
for (size_t col_n = 0; col_n < columns_received->size; ++col_n)
|
||||
{
|
||||
if (columns.size() != columns_received->data[col_n].size)
|
||||
throw Exception(
|
||||
"LibraryDictionarySource: Returned unexpected number of columns: " + std::to_string(columns_received->data[col_n].size) + ", must be " + std::to_string(columns.size()),
|
||||
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
|
||||
|
||||
for (size_t row_n = 0; row_n < columns_received->data[col_n].size; ++row_n)
|
||||
{
|
||||
const auto & field = columns_received->data[col_n].data[row_n];
|
||||
if (!field.data)
|
||||
{
|
||||
/// sample_block contains null_value (from config) inside corresponding column
|
||||
const auto & col = sample_block.getByPosition(row_n);
|
||||
columns[row_n]->insertFrom(*(col.column), 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto & size = field.size;
|
||||
columns[row_n]->insertData(static_cast<const char *>(field.data), size);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return sample_block.cloneWithColumns(std::move(columns));
|
||||
}
|
||||
|
||||
}
|
@ -1,62 +0,0 @@
|
||||
#include "SharedLibraryHandlerFactory.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
SharedLibraryHandlerPtr SharedLibraryHandlerFactory::get(const std::string & dictionary_id)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto library_handler = library_handlers.find(dictionary_id);
|
||||
|
||||
if (library_handler != library_handlers.end())
|
||||
return library_handler->second;
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
||||
void SharedLibraryHandlerFactory::create(
|
||||
const std::string & dictionary_id,
|
||||
const std::string & library_path,
|
||||
const std::vector<std::string> & library_settings,
|
||||
const Block & sample_block,
|
||||
const std::vector<std::string> & attributes_names)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
if (!library_handlers.contains(dictionary_id))
|
||||
library_handlers.emplace(std::make_pair(dictionary_id, std::make_shared<SharedLibraryHandler>(library_path, library_settings, sample_block, attributes_names)));
|
||||
else
|
||||
LOG_WARNING(&Poco::Logger::get("SharedLibraryHandlerFactory"), "Library handler with dictionary id {} already exists", dictionary_id);
|
||||
}
|
||||
|
||||
|
||||
bool SharedLibraryHandlerFactory::clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto from_library_handler = library_handlers.find(from_dictionary_id);
|
||||
|
||||
if (from_library_handler == library_handlers.end())
|
||||
return false;
|
||||
|
||||
/// libClone method will be called in copy constructor
|
||||
library_handlers[to_dictionary_id] = std::make_shared<SharedLibraryHandler>(*from_library_handler->second);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool SharedLibraryHandlerFactory::remove(const std::string & dictionary_id)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
/// libDelete is called in destructor.
|
||||
return library_handlers.erase(dictionary_id);
|
||||
}
|
||||
|
||||
|
||||
SharedLibraryHandlerFactory & SharedLibraryHandlerFactory::instance()
|
||||
{
|
||||
static SharedLibraryHandlerFactory ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
@ -1,3 +1,2 @@
|
||||
int mainEntryClickHouseLibraryBridge(int argc, char ** argv);
|
||||
int main(int argc_, char ** argv_) { return mainEntryClickHouseLibraryBridge(argc_, argv_); }
|
||||
|
||||
|
@ -2,17 +2,17 @@ include(${ClickHouse_SOURCE_DIR}/cmake/split_debug_symbols.cmake)
|
||||
|
||||
set (CLICKHOUSE_ODBC_BRIDGE_SOURCES
|
||||
ColumnInfoHandler.cpp
|
||||
getIdentifierQuote.cpp
|
||||
HandlerFactory.cpp
|
||||
IdentifierQuoteHandler.cpp
|
||||
MainHandler.cpp
|
||||
ODBCBlockInputStream.cpp
|
||||
ODBCBlockOutputStream.cpp
|
||||
ODBCBridge.cpp
|
||||
ODBCHandlerFactory.cpp
|
||||
PingHandler.cpp
|
||||
SchemaAllowedHandler.cpp
|
||||
validateODBCConnectionString.cpp
|
||||
getIdentifierQuote.cpp
|
||||
odbc-bridge.cpp
|
||||
validateODBCConnectionString.cpp
|
||||
)
|
||||
|
||||
if (OS_LINUX)
|
||||
|
@ -1,6 +1,5 @@
|
||||
#include "ODBCBridge.h"
|
||||
|
||||
#pragma GCC diagnostic ignored "-Wmissing-declarations"
|
||||
int mainEntryClickHouseODBCBridge(int argc, char ** argv)
|
||||
{
|
||||
DB::ODBCBridge app;
|
||||
@ -15,3 +14,18 @@ int mainEntryClickHouseODBCBridge(int argc, char ** argv)
|
||||
return code ? code : 1;
|
||||
}
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
std::string ODBCBridge::bridgeName() const
|
||||
{
|
||||
return "ODBCBridge";
|
||||
}
|
||||
|
||||
ODBCBridge::HandlerFactoryPtr ODBCBridge::getHandlerFactoryPtr(ContextPtr context) const
|
||||
{
|
||||
return std::make_shared<ODBCBridgeHandlerFactory>("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Poco/Logger.h>
|
||||
#include <Bridge/IBridge.h>
|
||||
#include "HandlerFactory.h"
|
||||
#include "ODBCHandlerFactory.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -13,14 +13,7 @@ class ODBCBridge : public IBridge
|
||||
{
|
||||
|
||||
protected:
|
||||
std::string bridgeName() const override
|
||||
{
|
||||
return "ODBCBridge";
|
||||
}
|
||||
|
||||
HandlerFactoryPtr getHandlerFactoryPtr(ContextPtr context) const override
|
||||
{
|
||||
return std::make_shared<ODBCBridgeHandlerFactory>("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context);
|
||||
}
|
||||
std::string bridgeName() const override;
|
||||
HandlerFactoryPtr getHandlerFactoryPtr(ContextPtr context) const override;
|
||||
};
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
#include "HandlerFactory.h"
|
||||
#include "ODBCHandlerFactory.h"
|
||||
#include "PingHandler.h"
|
||||
#include "ColumnInfoHandler.h"
|
||||
#include <Common/config.h>
|
||||
@ -9,6 +9,14 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
ODBCBridgeHandlerFactory::ODBCBridgeHandlerFactory(const std::string & name_, size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get(name_))
|
||||
, name(name_)
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
{
|
||||
}
|
||||
|
||||
std::unique_ptr<HTTPRequestHandler> ODBCBridgeHandlerFactory::createRequestHandler(const HTTPServerRequest & request)
|
||||
{
|
||||
Poco::URI uri{request.getURI()};
|
@ -17,13 +17,7 @@ namespace DB
|
||||
class ODBCBridgeHandlerFactory : public HTTPRequestHandlerFactory, WithContext
|
||||
{
|
||||
public:
|
||||
ODBCBridgeHandlerFactory(const std::string & name_, size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get(name_))
|
||||
, name(name_)
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
{
|
||||
}
|
||||
ODBCBridgeHandlerFactory(const std::string & name_, size_t keep_alive_timeout_, ContextPtr context_);
|
||||
|
||||
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override;
|
||||
|
@ -1,4 +1,4 @@
|
||||
#include "LibraryBridgeHelper.h"
|
||||
#include "ExternalDictionaryLibraryBridgeHelper.h"
|
||||
|
||||
#include <Formats/formatBlock.h>
|
||||
#include <Dictionaries/DictionarySourceHelpers.h>
|
||||
@ -26,26 +26,43 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
LibraryBridgeHelper::LibraryBridgeHelper(
|
||||
ExternalDictionaryLibraryBridgeHelper::ExternalDictionaryLibraryBridgeHelper(
|
||||
ContextPtr context_,
|
||||
const Block & sample_block_,
|
||||
const Field & dictionary_id_,
|
||||
const LibraryInitData & library_data_)
|
||||
: IBridgeHelper(context_->getGlobalContext())
|
||||
, log(&Poco::Logger::get("LibraryBridgeHelper"))
|
||||
, log(&Poco::Logger::get("ExternalDictionaryLibraryBridgeHelper"))
|
||||
, sample_block(sample_block_)
|
||||
, config(context_->getConfigRef())
|
||||
, http_timeout(context_->getGlobalContext()->getSettingsRef().http_receive_timeout.value)
|
||||
, library_data(library_data_)
|
||||
, dictionary_id(dictionary_id_)
|
||||
, bridge_host(config.getString("library_bridge.host", DEFAULT_HOST))
|
||||
, bridge_port(config.getUInt("library_bridge.port", DEFAULT_PORT))
|
||||
, http_timeouts(ConnectionTimeouts::getHTTPTimeouts(context_))
|
||||
{
|
||||
bridge_port = config.getUInt("library_bridge.port", DEFAULT_PORT);
|
||||
bridge_host = config.getString("library_bridge.host", DEFAULT_HOST);
|
||||
}
|
||||
|
||||
|
||||
Poco::URI LibraryBridgeHelper::createRequestURI(const String & method) const
|
||||
Poco::URI ExternalDictionaryLibraryBridgeHelper::getPingURI() const
|
||||
{
|
||||
auto uri = createBaseURI();
|
||||
uri.setPath(PING_HANDLER);
|
||||
uri.addQueryParameter("dictionary_id", toString(dictionary_id));
|
||||
return uri;
|
||||
}
|
||||
|
||||
|
||||
Poco::URI ExternalDictionaryLibraryBridgeHelper::getMainURI() const
|
||||
{
|
||||
auto uri = createBaseURI();
|
||||
uri.setPath(MAIN_HANDLER);
|
||||
return uri;
|
||||
}
|
||||
|
||||
|
||||
Poco::URI ExternalDictionaryLibraryBridgeHelper::createRequestURI(const String & method) const
|
||||
{
|
||||
auto uri = getMainURI();
|
||||
uri.addQueryParameter("dictionary_id", toString(dictionary_id));
|
||||
@ -54,7 +71,7 @@ Poco::URI LibraryBridgeHelper::createRequestURI(const String & method) const
|
||||
}
|
||||
|
||||
|
||||
Poco::URI LibraryBridgeHelper::createBaseURI() const
|
||||
Poco::URI ExternalDictionaryLibraryBridgeHelper::createBaseURI() const
|
||||
{
|
||||
Poco::URI uri;
|
||||
uri.setHost(bridge_host);
|
||||
@ -64,18 +81,18 @@ Poco::URI LibraryBridgeHelper::createBaseURI() const
|
||||
}
|
||||
|
||||
|
||||
void LibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
|
||||
void ExternalDictionaryLibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
|
||||
{
|
||||
getContext()->addBridgeCommand(std::move(cmd));
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::bridgeHandShake()
|
||||
bool ExternalDictionaryLibraryBridgeHelper::bridgeHandShake()
|
||||
{
|
||||
String result;
|
||||
try
|
||||
{
|
||||
ReadWriteBufferFromHTTP buf(createRequestURI(PING), Poco::Net::HTTPRequest::HTTP_GET, {}, http_timeouts, credentials);
|
||||
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, http_timeouts, credentials);
|
||||
readString(result, buf);
|
||||
}
|
||||
catch (...)
|
||||
@ -91,12 +108,12 @@ bool LibraryBridgeHelper::bridgeHandShake()
|
||||
* 2. Bridge crashed or restarted for some reason while server did not.
|
||||
**/
|
||||
if (result.size() != 1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {}. Check bridge and server have the same version.", result);
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {}. Check that bridge and server have the same version.", result);
|
||||
|
||||
UInt8 dictionary_id_exists;
|
||||
auto parsed = tryParse<UInt8>(dictionary_id_exists, result);
|
||||
if (!parsed || (dictionary_id_exists != 0 && dictionary_id_exists != 1))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {} ({}). Check bridge and server have the same version.",
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {} ({}). Check that bridge and server have the same version.",
|
||||
result, parsed ? toString(dictionary_id_exists) : "failed to parse");
|
||||
|
||||
LOG_TRACE(log, "dictionary_id: {}, dictionary_id_exists on bridge side: {}, library confirmed to be initialized on server side: {}",
|
||||
@ -113,7 +130,7 @@ bool LibraryBridgeHelper::bridgeHandShake()
|
||||
bool reinitialized = false;
|
||||
try
|
||||
{
|
||||
auto uri = createRequestURI(LIB_NEW_METHOD);
|
||||
auto uri = createRequestURI(EXT_DICT_LIB_NEW_METHOD);
|
||||
reinitialized = executeRequest(uri, getInitLibraryCallback());
|
||||
}
|
||||
catch (...)
|
||||
@ -131,11 +148,11 @@ bool LibraryBridgeHelper::bridgeHandShake()
|
||||
}
|
||||
|
||||
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback LibraryBridgeHelper::getInitLibraryCallback() const
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback ExternalDictionaryLibraryBridgeHelper::getInitLibraryCallback() const
|
||||
{
|
||||
/// Sample block must contain null values
|
||||
WriteBufferFromOwnString out;
|
||||
auto output_format = getContext()->getOutputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, out, sample_block);
|
||||
auto output_format = getContext()->getOutputFormat(ExternalDictionaryLibraryBridgeHelper::DEFAULT_FORMAT, out, sample_block);
|
||||
formatBlock(output_format, sample_block);
|
||||
auto block_string = out.str();
|
||||
|
||||
@ -150,19 +167,19 @@ ReadWriteBufferFromHTTP::OutStreamCallback LibraryBridgeHelper::getInitLibraryCa
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::initLibrary()
|
||||
bool ExternalDictionaryLibraryBridgeHelper::initLibrary()
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LIB_NEW_METHOD);
|
||||
auto uri = createRequestURI(EXT_DICT_LIB_NEW_METHOD);
|
||||
library_initialized = executeRequest(uri, getInitLibraryCallback());
|
||||
return library_initialized;
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::cloneLibrary(const Field & other_dictionary_id)
|
||||
bool ExternalDictionaryLibraryBridgeHelper::cloneLibrary(const Field & other_dictionary_id)
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LIB_CLONE_METHOD);
|
||||
auto uri = createRequestURI(EXT_DICT_LIB_CLONE_METHOD);
|
||||
uri.addQueryParameter("from_dictionary_id", toString(other_dictionary_id));
|
||||
/// We also pass initialization settings in order to create a library handler
|
||||
/// in case from_dictionary_id does not exist in bridge side (possible in case of bridge crash).
|
||||
@ -171,70 +188,70 @@ bool LibraryBridgeHelper::cloneLibrary(const Field & other_dictionary_id)
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::removeLibrary()
|
||||
bool ExternalDictionaryLibraryBridgeHelper::removeLibrary()
|
||||
{
|
||||
/// Do not force bridge restart if it is not running in case of removeLibrary
|
||||
/// because in this case after restart it will not have this dictionaty id in memory anyway.
|
||||
if (bridgeHandShake())
|
||||
{
|
||||
auto uri = createRequestURI(LIB_DELETE_METHOD);
|
||||
auto uri = createRequestURI(EXT_DICT_LIB_DELETE_METHOD);
|
||||
return executeRequest(uri);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::isModified()
|
||||
bool ExternalDictionaryLibraryBridgeHelper::isModified()
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(IS_MODIFIED_METHOD);
|
||||
auto uri = createRequestURI(EXT_DICT_IS_MODIFIED_METHOD);
|
||||
return executeRequest(uri);
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::supportsSelectiveLoad()
|
||||
bool ExternalDictionaryLibraryBridgeHelper::supportsSelectiveLoad()
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(SUPPORTS_SELECTIVE_LOAD_METHOD);
|
||||
auto uri = createRequestURI(EXT_DICT_SUPPORTS_SELECTIVE_LOAD_METHOD);
|
||||
return executeRequest(uri);
|
||||
}
|
||||
|
||||
|
||||
QueryPipeline LibraryBridgeHelper::loadAll()
|
||||
QueryPipeline ExternalDictionaryLibraryBridgeHelper::loadAll()
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LOAD_ALL_METHOD);
|
||||
auto uri = createRequestURI(EXT_DICT_LOAD_ALL_METHOD);
|
||||
return QueryPipeline(loadBase(uri));
|
||||
}
|
||||
|
||||
|
||||
QueryPipeline LibraryBridgeHelper::loadIds(const std::vector<uint64_t> & ids)
|
||||
QueryPipeline ExternalDictionaryLibraryBridgeHelper::loadIds(const std::vector<uint64_t> & ids)
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LOAD_IDS_METHOD);
|
||||
auto uri = createRequestURI(EXT_DICT_LOAD_IDS_METHOD);
|
||||
uri.addQueryParameter("ids_num", toString(ids.size())); /// Not used parameter, but helpful
|
||||
auto ids_string = getDictIdsString(ids);
|
||||
return QueryPipeline(loadBase(uri, [ids_string](std::ostream & os) { os << ids_string; }));
|
||||
}
|
||||
|
||||
|
||||
QueryPipeline LibraryBridgeHelper::loadKeys(const Block & requested_block)
|
||||
QueryPipeline ExternalDictionaryLibraryBridgeHelper::loadKeys(const Block & requested_block)
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LOAD_KEYS_METHOD);
|
||||
auto uri = createRequestURI(EXT_DICT_LOAD_KEYS_METHOD);
|
||||
/// Sample block to parse block from callback
|
||||
uri.addQueryParameter("requested_block_sample", requested_block.getNamesAndTypesList().toString());
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = [requested_block, this](std::ostream & os)
|
||||
{
|
||||
WriteBufferFromOStream out_buffer(os);
|
||||
auto output_format = getContext()->getOutputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, out_buffer, requested_block.cloneEmpty());
|
||||
auto output_format = getContext()->getOutputFormat(ExternalDictionaryLibraryBridgeHelper::DEFAULT_FORMAT, out_buffer, requested_block.cloneEmpty());
|
||||
formatBlock(output_format, requested_block);
|
||||
};
|
||||
return QueryPipeline(loadBase(uri, out_stream_callback));
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback) const
|
||||
bool ExternalDictionaryLibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback) const
|
||||
{
|
||||
ReadWriteBufferFromHTTP buf(
|
||||
uri,
|
||||
@ -248,7 +265,7 @@ bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferF
|
||||
}
|
||||
|
||||
|
||||
QueryPipeline LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback)
|
||||
QueryPipeline ExternalDictionaryLibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback)
|
||||
{
|
||||
auto read_buf_ptr = std::make_unique<ReadWriteBufferFromHTTP>(
|
||||
uri,
|
||||
@ -261,13 +278,13 @@ QueryPipeline LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBuff
|
||||
getContext()->getReadSettings(),
|
||||
ReadWriteBufferFromHTTP::HTTPHeaderEntries{});
|
||||
|
||||
auto source = FormatFactory::instance().getInput(LibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, getContext(), DEFAULT_BLOCK_SIZE);
|
||||
auto source = FormatFactory::instance().getInput(ExternalDictionaryLibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, getContext(), DEFAULT_BLOCK_SIZE);
|
||||
source->addBuffer(std::move(read_buf_ptr));
|
||||
return QueryPipeline(std::move(source));
|
||||
}
|
||||
|
||||
|
||||
String LibraryBridgeHelper::getDictIdsString(const std::vector<UInt64> & ids)
|
||||
String ExternalDictionaryLibraryBridgeHelper::getDictIdsString(const std::vector<UInt64> & ids)
|
||||
{
|
||||
WriteBufferFromOwnString out;
|
||||
writeVectorBinary(ids, out);
|
@ -14,7 +14,7 @@ namespace DB
|
||||
|
||||
class Pipe;
|
||||
|
||||
class LibraryBridgeHelper : public IBridgeHelper
|
||||
class ExternalDictionaryLibraryBridgeHelper : public IBridgeHelper
|
||||
{
|
||||
|
||||
public:
|
||||
@ -26,8 +26,10 @@ public:
|
||||
};
|
||||
|
||||
static constexpr inline size_t DEFAULT_PORT = 9012;
|
||||
static constexpr inline auto PING_HANDLER = "/extdict_ping";
|
||||
static constexpr inline auto MAIN_HANDLER = "/extdict_request";
|
||||
|
||||
LibraryBridgeHelper(ContextPtr context_, const Block & sample_block, const Field & dictionary_id_, const LibraryInitData & library_data_);
|
||||
ExternalDictionaryLibraryBridgeHelper(ContextPtr context_, const Block & sample_block, const Field & dictionary_id_, const LibraryInitData & library_data_);
|
||||
|
||||
bool initLibrary();
|
||||
|
||||
@ -45,13 +47,13 @@ public:
|
||||
|
||||
QueryPipeline loadKeys(const Block & requested_block);
|
||||
|
||||
QueryPipeline loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
|
||||
|
||||
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {}) const;
|
||||
|
||||
LibraryInitData getLibraryData() const { return library_data; }
|
||||
|
||||
protected:
|
||||
Poco::URI getPingURI() const override;
|
||||
|
||||
Poco::URI getMainURI() const override;
|
||||
|
||||
bool bridgeHandShake() override;
|
||||
|
||||
void startBridge(std::unique_ptr<ShellCommand> cmd) const override;
|
||||
@ -74,18 +76,21 @@ protected:
|
||||
|
||||
Poco::URI createBaseURI() const override;
|
||||
|
||||
QueryPipeline loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
|
||||
|
||||
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {}) const;
|
||||
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback getInitLibraryCallback() const;
|
||||
|
||||
private:
|
||||
static constexpr inline auto LIB_NEW_METHOD = "libNew";
|
||||
static constexpr inline auto LIB_CLONE_METHOD = "libClone";
|
||||
static constexpr inline auto LIB_DELETE_METHOD = "libDelete";
|
||||
static constexpr inline auto LOAD_ALL_METHOD = "loadAll";
|
||||
static constexpr inline auto LOAD_IDS_METHOD = "loadIds";
|
||||
static constexpr inline auto LOAD_KEYS_METHOD = "loadKeys";
|
||||
static constexpr inline auto IS_MODIFIED_METHOD = "isModified";
|
||||
static constexpr inline auto PING = "ping";
|
||||
static constexpr inline auto SUPPORTS_SELECTIVE_LOAD_METHOD = "supportsSelectiveLoad";
|
||||
static constexpr inline auto EXT_DICT_LIB_NEW_METHOD = "extDict_libNew";
|
||||
static constexpr inline auto EXT_DICT_LIB_CLONE_METHOD = "extDict_libClone";
|
||||
static constexpr inline auto EXT_DICT_LIB_DELETE_METHOD = "extDict_libDelete";
|
||||
static constexpr inline auto EXT_DICT_LOAD_ALL_METHOD = "extDict_loadAll";
|
||||
static constexpr inline auto EXT_DICT_LOAD_IDS_METHOD = "extDict_loadIds";
|
||||
static constexpr inline auto EXT_DICT_LOAD_KEYS_METHOD = "extDict_loadKeys";
|
||||
static constexpr inline auto EXT_DICT_IS_MODIFIED_METHOD = "extDict_isModified";
|
||||
static constexpr inline auto EXT_DICT_SUPPORTS_SELECTIVE_LOAD_METHOD = "extDict_supportsSelectiveLoad";
|
||||
|
||||
Poco::URI createRequestURI(const String & method) const;
|
||||
|
@ -18,22 +18,6 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
Poco::URI IBridgeHelper::getMainURI() const
|
||||
{
|
||||
auto uri = createBaseURI();
|
||||
uri.setPath(MAIN_HANDLER);
|
||||
return uri;
|
||||
}
|
||||
|
||||
|
||||
Poco::URI IBridgeHelper::getPingURI() const
|
||||
{
|
||||
auto uri = createBaseURI();
|
||||
uri.setPath(PING_HANDLER);
|
||||
return uri;
|
||||
}
|
||||
|
||||
|
||||
void IBridgeHelper::startBridgeSync()
|
||||
{
|
||||
if (!bridgeHandShake())
|
||||
|
@ -19,8 +19,6 @@ class IBridgeHelper: protected WithContext
|
||||
|
||||
public:
|
||||
static constexpr inline auto DEFAULT_HOST = "127.0.0.1";
|
||||
static constexpr inline auto PING_HANDLER = "/ping";
|
||||
static constexpr inline auto MAIN_HANDLER = "/";
|
||||
static constexpr inline auto DEFAULT_FORMAT = "RowBinary";
|
||||
static constexpr inline auto PING_OK_ANSWER = "Ok.";
|
||||
|
||||
@ -31,9 +29,9 @@ public:
|
||||
|
||||
virtual ~IBridgeHelper() = default;
|
||||
|
||||
Poco::URI getMainURI() const;
|
||||
virtual Poco::URI getMainURI() const = 0;
|
||||
|
||||
Poco::URI getPingURI() const;
|
||||
virtual Poco::URI getPingURI() const = 0;
|
||||
|
||||
void startBridgeSync();
|
||||
|
||||
@ -41,7 +39,6 @@ protected:
|
||||
/// Check bridge is running. Can also check something else in the mean time.
|
||||
virtual bool bridgeHandShake() = 0;
|
||||
|
||||
/// clickhouse-odbc-bridge, clickhouse-library-bridge
|
||||
virtual String serviceAlias() const = 0;
|
||||
|
||||
virtual String serviceFileName() const = 0;
|
||||
|
@ -53,6 +53,8 @@ class XDBCBridgeHelper : public IXDBCBridgeHelper
|
||||
|
||||
public:
|
||||
static constexpr inline auto DEFAULT_PORT = BridgeHelperMixin::DEFAULT_PORT;
|
||||
static constexpr inline auto PING_HANDLER = "/ping";
|
||||
static constexpr inline auto MAIN_HANDLER = "/";
|
||||
static constexpr inline auto COL_INFO_HANDLER = "/columns_info";
|
||||
static constexpr inline auto IDENTIFIER_QUOTE_HANDLER = "/identifier_quote";
|
||||
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
|
||||
@ -72,6 +74,22 @@ public:
|
||||
}
|
||||
|
||||
protected:
|
||||
Poco::URI getPingURI() const override
|
||||
{
|
||||
auto uri = createBaseURI();
|
||||
uri.setPath(PING_HANDLER);
|
||||
return uri;
|
||||
}
|
||||
|
||||
|
||||
Poco::URI getMainURI() const override
|
||||
{
|
||||
auto uri = createBaseURI();
|
||||
uri.setPath(MAIN_HANDLER);
|
||||
return uri;
|
||||
}
|
||||
|
||||
|
||||
bool bridgeHandShake() override
|
||||
{
|
||||
try
|
||||
|
@ -12,8 +12,6 @@
|
||||
#include <Dictionaries/DictionaryStructure.h>
|
||||
#include <Dictionaries/registerDictionaries.h>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -44,19 +42,21 @@ LibraryDictionarySource::LibraryDictionarySource(
|
||||
if (created_from_ddl && !fileOrSymlinkPathStartsWith(path, dictionaries_lib_path))
|
||||
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", path, dictionaries_lib_path);
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
if (!fs::exists(path))
|
||||
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "LibraryDictionarySource: Can't load library {}: file doesn't exist", path);
|
||||
|
||||
description.init(sample_block);
|
||||
|
||||
LibraryBridgeHelper::LibraryInitData library_data
|
||||
ExternalDictionaryLibraryBridgeHelper::LibraryInitData library_data
|
||||
{
|
||||
.library_path = path,
|
||||
.library_settings = getLibrarySettingsString(config, config_prefix + ".settings"),
|
||||
.dict_attributes = getDictAttributesString()
|
||||
};
|
||||
|
||||
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id, library_data);
|
||||
bridge_helper = std::make_shared<ExternalDictionaryLibraryBridgeHelper>(context, description.sample_block, dictionary_id, library_data);
|
||||
|
||||
if (!bridge_helper->initLibrary())
|
||||
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR, "Failed to create shared library from path: {}", path);
|
||||
@ -87,7 +87,7 @@ LibraryDictionarySource::LibraryDictionarySource(const LibraryDictionarySource &
|
||||
, context(other.context)
|
||||
, description{other.description}
|
||||
{
|
||||
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id, other.bridge_helper->getLibraryData());
|
||||
bridge_helper = std::make_shared<ExternalDictionaryLibraryBridgeHelper>(context, description.sample_block, dictionary_id, other.bridge_helper->getLibraryData());
|
||||
if (!bridge_helper->cloneLibrary(other.dictionary_id))
|
||||
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR, "Failed to clone library");
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <BridgeHelper/LibraryBridgeHelper.h>
|
||||
#include <BridgeHelper/ExternalDictionaryLibraryBridgeHelper.h>
|
||||
#include <Common/LocalDateTime.h>
|
||||
#include <Core/UUID.h>
|
||||
#include "DictionaryStructure.h"
|
||||
@ -28,7 +28,7 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
class CStringsHolder;
|
||||
using LibraryBridgeHelperPtr = std::shared_ptr<LibraryBridgeHelper>;
|
||||
using ExternalDictionaryLibraryBridgeHelperPtr = std::shared_ptr<ExternalDictionaryLibraryBridgeHelper>;
|
||||
|
||||
class LibraryDictionarySource final : public IDictionarySource
|
||||
{
|
||||
@ -85,7 +85,7 @@ private:
|
||||
Block sample_block;
|
||||
ContextPtr context;
|
||||
|
||||
LibraryBridgeHelperPtr bridge_helper;
|
||||
ExternalDictionaryLibraryBridgeHelperPtr bridge_helper;
|
||||
ExternalResultDescription description;
|
||||
};
|
||||
|
||||
|
@ -31,6 +31,7 @@ sudo -H pip install \
|
||||
kafka-python \
|
||||
kazoo \
|
||||
minio \
|
||||
lz4 \
|
||||
protobuf \
|
||||
psycopg2-binary \
|
||||
pymongo \
|
||||
@ -147,7 +148,7 @@ will automagically detect the types of variables and only the small diff of two
|
||||
|
||||
### Troubleshooting
|
||||
|
||||
If tests failing for misterious reasons, this may help:
|
||||
If tests failing for mysterious reasons, this may help:
|
||||
|
||||
```
|
||||
sudo service docker stop
|
||||
|
@ -345,7 +345,7 @@ if __name__ == "__main__":
|
||||
f"docker volume create {VOLUME_NAME}_volume", shell=True
|
||||
)
|
||||
except Exception as ex:
|
||||
print("Volume creationg failed, probably it already exists, exception", ex)
|
||||
print("Volume creation failed, probably it already exists, exception", ex)
|
||||
# TODO: this part cleans out stale volumes produced by container name
|
||||
# randomizer, we should remove it after Sep 2022
|
||||
try:
|
||||
|
Loading…
Reference in New Issue
Block a user