This commit is contained in:
kssenii 2021-03-05 15:37:43 +00:00
parent e0cda1033a
commit 2c080da51b
8 changed files with 169 additions and 91 deletions

View File

@ -2,6 +2,7 @@
#include <Poco/Net/HTTPServerRequest.h> #include <Poco/Net/HTTPServerRequest.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <Server/HTTP/HTMLForm.h>
#include "Handlers.h" #include "Handlers.h"
@ -19,20 +20,56 @@ namespace DB
{ {
/// Remove '/' in the beginning. /// Remove '/' in the beginning.
auto dictionary_id = uri.getPath().substr(1); auto dictionary_id = uri.getPath().substr(1);
/// Keep a map: dictionary_id -> SharedLibraryHandler.
auto library_handler = library_handlers.find(dictionary_id); auto library_handler = library_handlers.find(dictionary_id);
if (library_handler == library_handlers.end()) HTMLForm params(request);
{ params.read(request.getStream());
auto library_handler_ptr = std::make_shared<SharedLibraryHandler>(dictionary_id); std::string method = params.get("method");
library_handlers[dictionary_id] = library_handler_ptr;
if (!params.has("method"))
return std::make_unique<LibraryErrorResponseHandler>("No 'method' in request URL");
/// If such handler exists, then current method can be: loadAll, loadIds, loadKeys, isModified, supportsSelectiveLoad.
if (library_handler != library_handlers.end())
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, context, library_handler->second);
/// If there is no such dictionary_id in map, then current method is either libNew or libClone or libDelete.
if (method == "libNew")
{
auto library_handler_ptr = std::make_shared<SharedLibraryHandler>();
library_handlers[dictionary_id] = library_handler_ptr;
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, context, library_handler_ptr); return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, context, library_handler_ptr);
} }
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, context, library_handler->second); if (method == "libClone")
{
if (!params.has("other_dictionary_id"))
return std::make_unique<LibraryErrorResponseHandler>("No 'other_dictionary_id' in request URL");
std::string other_dictionary_id = params.get("other_dictionary_id");
LOG_INFO(log, "libClone from dictionaryID {} to {}", other_dictionary_id, dictionary_id);
auto other_library_handler = library_handlers.find(other_dictionary_id);
if (other_library_handler != library_handlers.end())
{
/// libClone method for lib_data will be called in copy constructor.
auto other_library_handler_ptr = other_library_handler->second;
auto library_handler_ptr = std::make_shared<SharedLibraryHandler>(*other_library_handler_ptr);
library_handlers[dictionary_id] = library_handler_ptr;
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, context, library_handler_ptr);
}
/// cloneLibrary is called in copy constructor for LibraryDictionarySource.
/// SharedLibraryHandler is removed from map only in LibraryDictionarySource desctructor.
/// Therefore other_library_handler is supposed to always exist at this moment.
return std::make_unique<LibraryErrorResponseHandler>("SharedLibraryHandler for dictionary to clone from does not exist");
}
return std::make_unique<LibraryErrorResponseHandler>("Unknown 'method' in request URL");
} }
return nullptr; return std::make_unique<LibraryErrorResponseHandler>("Unknown request");
} }
} }

View File

@ -41,7 +41,6 @@ namespace
void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{ {
LOG_TRACE(log, "Dictionary ID: {}", library_handler->getDictID());
LOG_TRACE(log, "Request URI: {}", request.getURI()); LOG_TRACE(log, "Request URI: {}", request.getURI());
HTMLForm params(request); HTMLForm params(request);
@ -81,9 +80,9 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
library_handler->libNew(library_path, library_settings); library_handler->libNew(library_path, library_settings);
writeStringBinary("1", out); writeStringBinary("1", out);
} }
else if (method == "libDelete") else if (method == "libClone")
{ {
//library_handler->libDelete(); /// libClone was already called, only need to send the responce.
writeStringBinary("1", out); writeStringBinary("1", out);
} }
else if (method == "isModified") else if (method == "isModified")
@ -203,6 +202,17 @@ void LibraryRequestHandler::processError(HTTPServerResponse & response, const st
} }
void LibraryErrorResponseHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response)
{
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
*response.send() << message << std::endl;
LOG_ERROR(log, message);
}
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response) void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response)
{ {
try try

View File

@ -10,8 +10,7 @@ namespace DB
{ {
/** Handler for requests to Library Dictionary Source, returns response in RowBinary format /// Handler for requests to Library Dictionary Source, returns response in RowBinary format
*/
class LibraryRequestHandler : public HTTPRequestHandler class LibraryRequestHandler : public HTTPRequestHandler
{ {
public: public:
@ -42,7 +41,25 @@ private:
}; };
/// Simple ping handler, answers "Ok." to GET request class LibraryErrorResponseHandler : public HTTPRequestHandler
{
public:
explicit LibraryErrorResponseHandler(std::string message_)
: log(&Poco::Logger::get("LibraryErrorResponseHandler"))
, message(message_)
{
}
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
private:
Poco::Logger * log;
const std::string message;
};
/// Handler to send error responce.
class PingHandler : public HTTPRequestHandler class PingHandler : public HTTPRequestHandler
{ {
public: public:

View File

@ -8,10 +8,15 @@
namespace DB namespace DB
{ {
SharedLibraryHandler::SharedLibraryHandler(const std::string & dictionary_id_) SharedLibraryHandler::SharedLibraryHandler(const SharedLibraryHandler & other)
: log(&Poco::Logger::get("SharedLibraryHandler")) : library_path{other.library_path}
, dictionary_id(dictionary_id_) , library{other.library}
, settings_holder{other.settings_holder}
{ {
if (auto lib_clone = library->tryGet<decltype(lib_data) (*)(decltype(other.lib_data))>("ClickHouseDictionary_v3_libClone"))
lib_data = lib_clone(other.lib_data);
else if (auto lib_new = library->tryGet<decltype(lib_data) (*)(decltype(&settings_holder->strings), decltype(&ClickHouseLibrary::log))>("ClickHouseDictionary_v3_libNew"))
lib_data = lib_new(&settings_holder->strings, ClickHouseLibrary::log);
} }
@ -65,19 +70,11 @@ bool SharedLibraryHandler::supportsSelectiveLoad()
return true; return true;
} }
//void SharedLibraryHandler::libCloneOrNew()
//{
// if (auto lib_clone = library->tryGet<decltype(lib_data) (*)(decltype(other.lib_data))>("ClickHouseDictionary_v3_libClone"))
// lib_data = lib_clone(other.lib_data);
// else if (auto lib_new = library->tryGet<decltype(lib_data) (*)(decltype(&settings->strings), decltype(&ClickHouseLibrary::log))>("ClickHouseDictionary_v3_libNew"))
// lib_data = lib_new(&settings->strings, ClickHouseLibrary::log);
//}
BlockInputStreamPtr SharedLibraryHandler::loadAll(const std::string & attributes_string, const Block & sample_block) BlockInputStreamPtr SharedLibraryHandler::loadAll(const std::string & attributes_string, const Block & sample_block)
{ {
std::vector<std::string> dict_attributes; std::vector<std::string> dict_attributes;
boost::split(dict_attributes, attributes_string, [](char c) { return c == ','; }); boost::split(dict_attributes, attributes_string, [](char c) { return c == ' '; });
auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(dict_attributes.size()); auto columns_holder = std::make_unique<ClickHouseLibrary::CString[]>(dict_attributes.size());
ClickHouseLibrary::CStrings columns{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), dict_attributes.size()}; ClickHouseLibrary::CStrings columns{static_cast<decltype(ClickHouseLibrary::CStrings::data)>(columns_holder.get()), dict_attributes.size()};
@ -105,7 +102,7 @@ BlockInputStreamPtr SharedLibraryHandler::loadAll(const std::string & attributes
BlockInputStreamPtr SharedLibraryHandler::loadIds(const std::string & attributes_string, const std::string & ids_string, const Block & sample_block) BlockInputStreamPtr SharedLibraryHandler::loadIds(const std::string & attributes_string, const std::string & ids_string, const Block & sample_block)
{ {
std::vector<std::string> dict_string_ids; std::vector<std::string> dict_string_ids;
boost::split(dict_string_ids, ids_string, [](char c) { return c == ','; }); boost::split(dict_string_ids, ids_string, [](char c) { return c == ' '; });
std::vector<UInt64> dict_ids; std::vector<UInt64> dict_ids;
for (const auto & id : dict_string_ids) for (const auto & id : dict_string_ids)
dict_ids.push_back(parseFromString<UInt64>(id)); dict_ids.push_back(parseFromString<UInt64>(id));

View File

@ -14,14 +14,12 @@ class SharedLibraryHandler
{ {
public: public:
SharedLibraryHandler(const std::string & dictionary_id_); SharedLibraryHandler() {}
SharedLibraryHandler(const SharedLibraryHandler & other);
~SharedLibraryHandler(); ~SharedLibraryHandler();
const std::string & getDictID() { return dictionary_id; }
//void libDelete();
void libNew(const std::string & path, const std::string & settings); void libNew(const std::string & path, const std::string & settings);
BlockInputStreamPtr loadAll(const std::string & attributes_string, const Block & sample_block); BlockInputStreamPtr loadAll(const std::string & attributes_string, const Block & sample_block);
@ -33,12 +31,10 @@ public:
bool supportsSelectiveLoad(); bool supportsSelectiveLoad();
private: private:
void libDelete();
Block dataToBlock(const Block & sample_block, const void * data); Block dataToBlock(const Block & sample_block, const void * data);
Poco::Logger * log;
std::string dictionary_id;
std::string library_path; std::string library_path;
SharedLibraryPtr library; SharedLibraryPtr library;

View File

@ -77,12 +77,13 @@ bool LibraryBridgeHelper::initLibrary(const std::string & library_path, const st
} }
bool LibraryBridgeHelper::deleteLibrary() bool LibraryBridgeHelper::cloneLibrary(const std::string & other_dictionary_id)
{ {
startLibraryBridgeSync(); startLibraryBridgeSync();
auto uri = getDictionaryURI(); auto uri = getDictionaryURI();
uri.addQueryParameter("method", LIB_DELETE_METHOD); uri.addQueryParameter("method", LIB_CLONE_METHOD);
uri.addQueryParameter("other_dictionary_id", other_dictionary_id);
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context)); ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
bool res; bool res;
@ -91,6 +92,21 @@ bool LibraryBridgeHelper::deleteLibrary()
} }
bool LibraryBridgeHelper::removeLibrary()
{
// startLibraryBridgeSync();
//
// auto uri = getDictionaryURI();
// uri.addQueryParameter("method", LIB_DELETE_METHOD);
//
// ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
// bool res;
// readBoolText(res, buf);
// return res;
return true;
}
bool LibraryBridgeHelper::isModified() bool LibraryBridgeHelper::isModified()
{ {
startLibraryBridgeSync(); startLibraryBridgeSync();

View File

@ -27,7 +27,9 @@ public:
bool initLibrary(const std::string & library_path, const std::string librray_settings); bool initLibrary(const std::string & library_path, const std::string librray_settings);
bool deleteLibrary(); bool removeLibrary();
bool cloneLibrary(const std::string & other_dictionary_id);
BlockInputStreamPtr loadAll(const std::string attributes_string, const Block & sample_block); BlockInputStreamPtr loadAll(const std::string attributes_string, const Block & sample_block);
@ -43,6 +45,7 @@ public:
static constexpr inline auto PING_HANDLER = "/ping"; static constexpr inline auto PING_HANDLER = "/ping";
static constexpr inline auto LIB_NEW_METHOD = "libNew"; static constexpr inline auto LIB_NEW_METHOD = "libNew";
static constexpr inline auto LIB_CLONE_METHOD = "libClone";
static constexpr inline auto LIB_DELETE_METHOD = "libDelete"; static constexpr inline auto LIB_DELETE_METHOD = "libDelete";
static constexpr inline auto LOAD_ALL_METHOD = "loadAll"; static constexpr inline auto LOAD_ALL_METHOD = "loadAll";
static constexpr inline auto LOAD_IDS_METHOD = "loadIds"; static constexpr inline auto LOAD_IDS_METHOD = "loadIds";

View File

@ -66,6 +66,7 @@ LibraryDictionarySource::LibraryDictionarySource(
LibraryDictionarySource::~LibraryDictionarySource() LibraryDictionarySource::~LibraryDictionarySource()
{ {
bridge_helper->removeLibrary();
} }
@ -74,64 +75,13 @@ LibraryDictionarySource::LibraryDictionarySource(const LibraryDictionarySource &
, dict_struct{other.dict_struct} , dict_struct{other.dict_struct}
, config_prefix{other.config_prefix} , config_prefix{other.config_prefix}
, path{other.path} , path{other.path}
, dictionary_id{other.dictionary_id} , dictionary_id{createDictID()}
, sample_block{other.sample_block} , sample_block{other.sample_block}
, context(other.context) , context(other.context)
, description{other.description} , description{other.description}
{ {
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, dictionary_id); bridge_helper = std::make_shared<LibraryBridgeHelper>(context, dictionary_id);
} bridge_helper->cloneLibrary(other.dictionary_id);
String LibraryDictionarySource::getLibrarySettingsString(const Poco::Util::AbstractConfiguration & config, const std::string & config_root)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_root, config_keys);
std::string res;
for (const auto & key : config_keys)
{
std::string key_name = key;
auto bracket_pos = key.find('[');
if (bracket_pos != std::string::npos && bracket_pos > 0)
key_name = key.substr(0, bracket_pos);
if (!res.empty())
res += ' ';
res += key_name + ' ' + config.getString(config_root + "." + key);
}
return res;
}
String LibraryDictionarySource::getDictAttributesString()
{
std::string res;
for (const auto & attr : dict_struct.attributes)
{
if (!res.empty())
res += ',';
res += attr.name;
}
return res;
}
String LibraryDictionarySource::getDictIdsString(const std::vector<UInt64> & ids)
{
std::string res;
for (const auto & id : ids)
{
if (!res.empty())
res += ',';
res += std::to_string(id);
}
return res;
} }
@ -211,6 +161,58 @@ std::string LibraryDictionarySource::toString() const
} }
String LibraryDictionarySource::getLibrarySettingsString(const Poco::Util::AbstractConfiguration & config, const std::string & config_root)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_root, config_keys);
std::string res;
for (const auto & key : config_keys)
{
std::string key_name = key;
auto bracket_pos = key.find('[');
if (bracket_pos != std::string::npos && bracket_pos > 0)
key_name = key.substr(0, bracket_pos);
if (!res.empty())
res += ' ';
res += key_name + ' ' + config.getString(config_root + "." + key);
}
return res;
}
String LibraryDictionarySource::getDictAttributesString()
{
std::string res;
for (const auto & attr : dict_struct.attributes)
{
if (!res.empty())
res += ' ';
res += attr.name;
}
return res;
}
String LibraryDictionarySource::getDictIdsString(const std::vector<UInt64> & ids)
{
std::string res;
for (const auto & id : ids)
{
if (!res.empty())
res += ' ';
res += std::to_string(id);
}
return res;
}
void registerDictionarySourceLibrary(DictionarySourceFactory & factory) void registerDictionarySourceLibrary(DictionarySourceFactory & factory)
{ {
auto create_table_source = [=](const DictionaryStructure & dict_struct, auto create_table_source = [=](const DictionaryStructure & dict_struct,