mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Fix bridge-server interaction in case of metadata inconsistency
This commit is contained in:
parent
d9f55a3f31
commit
130253e3b9
@ -12,8 +12,8 @@ namespace DB
|
||||
Poco::URI uri{request.getURI()};
|
||||
LOG_DEBUG(log, "Request URI: {}", uri.toString());
|
||||
|
||||
if (uri == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
|
||||
return std::make_unique<PingHandler>(keep_alive_timeout);
|
||||
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
|
||||
return std::make_unique<PingHandler>(keep_alive_timeout, getContext());
|
||||
|
||||
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
|
||||
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, getContext());
|
||||
|
@ -19,6 +19,16 @@ namespace DB
|
||||
{
|
||||
namespace
|
||||
{
|
||||
void processError(HTTPServerResponse & response, const std::string & message)
|
||||
{
|
||||
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
|
||||
|
||||
if (!response.sent())
|
||||
*response.send() << message << std::endl;
|
||||
|
||||
LOG_WARNING(&Poco::Logger::get("LibraryBridge"), message);
|
||||
}
|
||||
|
||||
std::shared_ptr<Block> parseColumns(std::string && column_string)
|
||||
{
|
||||
auto sample_block = std::make_shared<Block>();
|
||||
@ -30,13 +40,13 @@ namespace
|
||||
return sample_block;
|
||||
}
|
||||
|
||||
std::vector<uint64_t> parseIdsFromBinary(const std::string & ids_string)
|
||||
{
|
||||
ReadBufferFromString buf(ids_string);
|
||||
std::vector<uint64_t> ids;
|
||||
readVectorBinary(ids, buf);
|
||||
return ids;
|
||||
}
|
||||
// std::vector<uint64_t> parseIdsFromBinary(const std::string & ids_string)
|
||||
// {
|
||||
// ReadBufferFromString buf(ids_string);
|
||||
// std::vector<uint64_t> ids;
|
||||
// readVectorBinary(ids, buf);
|
||||
// return ids;
|
||||
// }
|
||||
|
||||
std::vector<std::string> parseNamesFromBinary(const std::string & names_string)
|
||||
{
|
||||
@ -67,13 +77,36 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
|
||||
std::string method = params.get("method");
|
||||
std::string dictionary_id = params.get("dictionary_id");
|
||||
LOG_TRACE(log, "Library method: '{}', dictionary id: {}", method, dictionary_id);
|
||||
|
||||
LOG_TRACE(log, "Library method: '{}', dictionary id: {}", method, dictionary_id);
|
||||
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
|
||||
|
||||
try
|
||||
{
|
||||
if (method == "libNew")
|
||||
bool lib_new = (method == "libNew");
|
||||
if (method == "libClone")
|
||||
{
|
||||
if (!params.has("from_dictionary_id"))
|
||||
{
|
||||
processError(response, "No 'from_dictionary_id' in request URL");
|
||||
return;
|
||||
}
|
||||
|
||||
std::string from_dictionary_id = params.get("from_dictionary_id");
|
||||
bool cloned = false;
|
||||
cloned = SharedLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);
|
||||
|
||||
if (cloned)
|
||||
{
|
||||
writeStringBinary("1", out);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE(log, "Cannot clone from dictionary with id: {}, will call libNew instead");
|
||||
lib_new = true;
|
||||
}
|
||||
}
|
||||
if (lib_new)
|
||||
{
|
||||
auto & read_buf = request.getStream();
|
||||
params.read(read_buf);
|
||||
@ -92,6 +125,8 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
|
||||
std::string library_path = params.get("library_path");
|
||||
const auto & settings_string = params.get("library_settings");
|
||||
|
||||
LOG_DEBUG(log, "Parsing library settings from binary string");
|
||||
std::vector<std::string> library_settings = parseNamesFromBinary(settings_string);
|
||||
|
||||
/// Needed for library dictionary
|
||||
@ -102,6 +137,8 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
}
|
||||
|
||||
const auto & attributes_string = params.get("attributes_names");
|
||||
|
||||
LOG_DEBUG(log, "Parsing attributes names from binary string");
|
||||
std::vector<std::string> attributes_names = parseNamesFromBinary(attributes_string);
|
||||
|
||||
/// Needed to parse block from binary string format
|
||||
@ -140,54 +177,71 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
SharedLibraryHandlerFactory::instance().create(dictionary_id, library_path, library_settings, sample_block_with_nulls, attributes_names);
|
||||
writeStringBinary("1", out);
|
||||
}
|
||||
else if (method == "libClone")
|
||||
{
|
||||
if (!params.has("from_dictionary_id"))
|
||||
{
|
||||
processError(response, "No 'from_dictionary_id' in request URL");
|
||||
return;
|
||||
}
|
||||
|
||||
std::string from_dictionary_id = params.get("from_dictionary_id");
|
||||
LOG_TRACE(log, "Calling libClone from {} to {}", from_dictionary_id, dictionary_id);
|
||||
SharedLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);
|
||||
writeStringBinary("1", out);
|
||||
}
|
||||
else if (method == "libDelete")
|
||||
{
|
||||
SharedLibraryHandlerFactory::instance().remove(dictionary_id);
|
||||
auto deleted = SharedLibraryHandlerFactory::instance().remove(dictionary_id);
|
||||
|
||||
/// Do not throw, a warning is ok.
|
||||
if (!deleted)
|
||||
LOG_WARNING(log, "Cannot delete library for with dictionary id: {}, because such id was not found.", dictionary_id);
|
||||
|
||||
writeStringBinary("1", out);
|
||||
}
|
||||
else if (method == "isModified")
|
||||
{
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
bool res = library_handler->isModified();
|
||||
writeStringBinary(std::to_string(res), out);
|
||||
}
|
||||
else if (method == "supportsSelectiveLoad")
|
||||
{
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
bool res = library_handler->supportsSelectiveLoad();
|
||||
writeStringBinary(std::to_string(res), out);
|
||||
}
|
||||
else if (method == "loadAll")
|
||||
{
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
const auto & sample_block = library_handler->getSampleBlock();
|
||||
auto input = library_handler->loadAll();
|
||||
|
||||
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
|
||||
copyData(*input, *output);
|
||||
}
|
||||
else if (method == "loadIds")
|
||||
{
|
||||
LOG_DEBUG(log, "Getting diciontary ids for dictionary with id: {}", dictionary_id);
|
||||
String ids_string;
|
||||
readString(ids_string, request.getStream());
|
||||
std::vector<uint64_t> ids = parseIdsFromBinary(ids_string);
|
||||
|
||||
Strings ids_strs;
|
||||
splitInto<'-'>(ids_strs, ids_string);
|
||||
std::vector<uint64_t> ids;
|
||||
for (const auto & id : ids_strs)
|
||||
ids.push_back(parse<uint64_t>(id));
|
||||
if (ids.empty())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Received no ids");
|
||||
|
||||
// std::vector<uint64_t> ids = parseIdsFromBinary(ids_string);
|
||||
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
const auto & sample_block = library_handler->getSampleBlock();
|
||||
auto input = library_handler->loadIds(ids);
|
||||
|
||||
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
|
||||
copyData(*input, *output);
|
||||
}
|
||||
@ -219,8 +273,13 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
auto block = reader->read();
|
||||
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
const auto & sample_block = library_handler->getSampleBlock();
|
||||
auto input = library_handler->loadKeys(block.getColumns());
|
||||
|
||||
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
|
||||
copyData(*input, *output);
|
||||
}
|
||||
@ -228,8 +287,9 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
catch (...)
|
||||
{
|
||||
auto message = getCurrentExceptionMessage(true);
|
||||
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR, message); // can't call process_error, because of too soon response sending
|
||||
LOG_ERROR(log, "Failed to process request for dictionary_id: {}. Error: {}", dictionary_id, message);
|
||||
|
||||
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR, message); // can't call process_error, because of too soon response sending
|
||||
try
|
||||
{
|
||||
writeStringBinary(message, out);
|
||||
@ -239,8 +299,6 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
}
|
||||
|
||||
tryLogCurrentException(log);
|
||||
}
|
||||
|
||||
try
|
||||
@ -254,24 +312,30 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
}
|
||||
|
||||
|
||||
void LibraryRequestHandler::processError(HTTPServerResponse & response, const std::string & message)
|
||||
{
|
||||
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
|
||||
|
||||
if (!response.sent())
|
||||
*response.send() << message << std::endl;
|
||||
|
||||
LOG_WARNING(log, message);
|
||||
}
|
||||
|
||||
|
||||
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response)
|
||||
void PingHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
|
||||
{
|
||||
try
|
||||
{
|
||||
LOG_TRACE(log, "Request URI: {}", request.getURI());
|
||||
HTMLForm params(getContext()->getSettingsRef(), request);
|
||||
|
||||
if (!params.has("dictionary_id"))
|
||||
{
|
||||
processError(response, "No 'dictionary_id in request URL");
|
||||
return;
|
||||
}
|
||||
|
||||
std::string dictionary_id = params.get("dictionary_id");
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
String res;
|
||||
if (library_handler)
|
||||
res = "dictionary=1";
|
||||
else
|
||||
res = "dictionary=0";
|
||||
|
||||
setResponseDefaultHeaders(response, keep_alive_timeout);
|
||||
const char * data = "Ok.\n";
|
||||
response.sendBuffer(data, strlen(data));
|
||||
LOG_TRACE(log, "Senging ping response: {} (dictionary id: {})", res, dictionary_id);
|
||||
response.sendBuffer(res.data(), res.size());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
@ -22,8 +22,7 @@ class LibraryRequestHandler : public HTTPRequestHandler, WithContext
|
||||
public:
|
||||
|
||||
LibraryRequestHandler(
|
||||
size_t keep_alive_timeout_,
|
||||
ContextPtr context_)
|
||||
size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get("LibraryRequestHandler"))
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
@ -35,18 +34,18 @@ public:
|
||||
private:
|
||||
static constexpr inline auto FORMAT = "RowBinary";
|
||||
|
||||
void processError(HTTPServerResponse & response, const std::string & message);
|
||||
|
||||
Poco::Logger * log;
|
||||
size_t keep_alive_timeout;
|
||||
};
|
||||
|
||||
|
||||
class PingHandler : public HTTPRequestHandler
|
||||
class PingHandler : public HTTPRequestHandler, WithContext
|
||||
{
|
||||
public:
|
||||
explicit PingHandler(size_t keep_alive_timeout_)
|
||||
: keep_alive_timeout(keep_alive_timeout_)
|
||||
explicit PingHandler(size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
, log(&Poco::Logger::get("LibraryRequestHandler"))
|
||||
{
|
||||
}
|
||||
|
||||
@ -54,6 +53,8 @@ public:
|
||||
|
||||
private:
|
||||
const size_t keep_alive_timeout;
|
||||
Poco::Logger * log;
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ SharedLibraryHandlerPtr SharedLibraryHandlerFactory::get(const std::string & dic
|
||||
if (library_handler != library_handlers.end())
|
||||
return library_handler->second;
|
||||
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not found dictionary with id: {}", dictionary_id);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
||||
@ -30,32 +30,32 @@ void SharedLibraryHandlerFactory::create(
|
||||
const std::vector<std::string> & attributes_names)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
library_handlers[dictionary_id] = std::make_shared<SharedLibraryHandler>(library_path, library_settings, sample_block, attributes_names);
|
||||
if (!library_handlers.count(dictionary_id))
|
||||
library_handlers.emplace(std::make_pair(dictionary_id, std::make_shared<SharedLibraryHandler>(library_path, library_settings, sample_block, attributes_names)));
|
||||
else
|
||||
LOG_WARNING(&Poco::Logger::get("SharedLibraryHandlerFactory"), "Library handler with dictionary id {} already exists", dictionary_id);
|
||||
}
|
||||
|
||||
|
||||
void SharedLibraryHandlerFactory::clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id)
|
||||
bool SharedLibraryHandlerFactory::clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto from_library_handler = library_handlers.find(from_dictionary_id);
|
||||
|
||||
/// This is not supposed to happen as libClone is called from copy constructor of LibraryDictionarySource
|
||||
/// object, and shared library handler of from_dictionary is removed only in its destructor.
|
||||
/// And if for from_dictionary there was no shared library handler, it would have received and exception in
|
||||
/// its constructor, so no libClone would be made from it.
|
||||
if (from_library_handler == library_handlers.end())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "No shared library handler found");
|
||||
return false;
|
||||
|
||||
/// libClone method will be called in copy constructor
|
||||
library_handlers[to_dictionary_id] = std::make_shared<SharedLibraryHandler>(*from_library_handler->second);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void SharedLibraryHandlerFactory::remove(const std::string & dictionary_id)
|
||||
bool SharedLibraryHandlerFactory::remove(const std::string & dictionary_id)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
/// libDelete is called in destructor.
|
||||
library_handlers.erase(dictionary_id);
|
||||
return library_handlers.erase(dictionary_id);
|
||||
}
|
||||
|
||||
|
||||
|
@ -24,9 +24,9 @@ public:
|
||||
const Block & sample_block,
|
||||
const std::vector<std::string> & attributes_names);
|
||||
|
||||
void clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id);
|
||||
bool clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id);
|
||||
|
||||
void remove(const std::string & dictionary_id);
|
||||
bool remove(const std::string & dictionary_id);
|
||||
|
||||
private:
|
||||
/// map: dict_id -> sharedLibraryHandler
|
||||
|
@ -33,21 +33,6 @@ Poco::URI IBridgeHelper::getPingURI() const
|
||||
}
|
||||
|
||||
|
||||
bool IBridgeHelper::checkBridgeIsRunning() const
|
||||
{
|
||||
try
|
||||
{
|
||||
ReadWriteBufferFromHTTP buf(
|
||||
getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
|
||||
return checkString(PING_OK_ANSWER, buf);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void IBridgeHelper::startBridgeSync() const
|
||||
{
|
||||
if (!checkBridgeIsRunning())
|
||||
|
@ -28,16 +28,18 @@ public:
|
||||
static const inline std::string MAIN_METHOD = Poco::Net::HTTPRequest::HTTP_POST;
|
||||
|
||||
explicit IBridgeHelper(ContextPtr context_) : WithContext(context_) {}
|
||||
virtual ~IBridgeHelper() = default;
|
||||
|
||||
void startBridgeSync() const;
|
||||
virtual ~IBridgeHelper() = default;
|
||||
|
||||
Poco::URI getMainURI() const;
|
||||
|
||||
Poco::URI getPingURI() const;
|
||||
|
||||
void startBridgeSync() const;
|
||||
|
||||
protected:
|
||||
virtual bool checkBridgeIsRunning() const = 0;
|
||||
|
||||
/// clickhouse-odbc-bridge, clickhouse-library-bridge
|
||||
virtual String serviceAlias() const = 0;
|
||||
|
||||
@ -61,8 +63,6 @@ protected:
|
||||
|
||||
|
||||
private:
|
||||
bool checkBridgeIsRunning() const;
|
||||
|
||||
std::unique_ptr<ShellCommand> startBridgeCommand() const;
|
||||
};
|
||||
|
||||
|
@ -23,12 +23,14 @@ namespace DB
|
||||
LibraryBridgeHelper::LibraryBridgeHelper(
|
||||
ContextPtr context_,
|
||||
const Block & sample_block_,
|
||||
const Field & dictionary_id_)
|
||||
const Field & dictionary_id_,
|
||||
const LibraryInitData & library_data_)
|
||||
: IBridgeHelper(context_->getGlobalContext())
|
||||
, log(&Poco::Logger::get("LibraryBridgeHelper"))
|
||||
, sample_block(sample_block_)
|
||||
, config(context_->getConfigRef())
|
||||
, http_timeout(context_->getGlobalContext()->getSettingsRef().http_receive_timeout.value)
|
||||
, library_data(library_data_)
|
||||
, dictionary_id(dictionary_id_)
|
||||
{
|
||||
bridge_port = config.getUInt("library_bridge.port", DEFAULT_PORT);
|
||||
@ -61,26 +63,85 @@ void LibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::initLibrary(const std::string & library_path, const std::string library_settings, const std::string attributes_names)
|
||||
bool LibraryBridgeHelper::checkBridgeIsRunning() const
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LIB_NEW_METHOD);
|
||||
String result;
|
||||
try
|
||||
{
|
||||
ReadWriteBufferFromHTTP buf(createRequestURI(PING), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
|
||||
readString(result, buf);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* When pinging bridge we also pass current dicionary_id. The bridge will check if there is such
|
||||
* dictionary. It is possible that such dictionary_id is not present only in two cases:
|
||||
* 1. It is dictionary source creation and initialization of library handler on bridge side did not happen yet.
|
||||
* 2. Bridge crashed or restarted for some reason while server did not.
|
||||
**/
|
||||
static constexpr auto dictionary_check = "dictionary=";
|
||||
if (result.size() != (std::strlen(dictionary_check) + 1))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {}. Check bridge and server have the same version.",
|
||||
result, std::strlen(dictionary_check));
|
||||
|
||||
UInt8 dictionary_id_exists;
|
||||
auto parsed = tryParse<UInt8>(dictionary_id_exists, result.substr(std::strlen(dictionary_check)));
|
||||
if (!parsed || (dictionary_id_exists != 0 && dictionary_id_exists != 1))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {} ({}). Check bridge and server have the same version.",
|
||||
result, parsed ? toString(dictionary_id_exists) : "failed to parse");
|
||||
|
||||
if (dictionary_id_exists && !library_initialized)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Library was not initialized, but bridge responded to already have dictionary id: {}", dictionary_id);
|
||||
|
||||
if (!dictionary_id_exists && library_initialized)
|
||||
{
|
||||
LOG_WARNING(log, "Library bridge does not have library handler with dictionaty id: {}. It will be reinitialized.", dictionary_id);
|
||||
try
|
||||
{
|
||||
if (!initLibrary(false))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Failed to reinitialize library handler on bridge side for dictionary with id: {}", dictionary_id);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback LibraryBridgeHelper::getInitLibraryCallback() const
|
||||
{
|
||||
/// Sample block must contain null values
|
||||
WriteBufferFromOwnString out;
|
||||
auto output_stream = getContext()->getOutputStream(LibraryBridgeHelper::DEFAULT_FORMAT, out, sample_block);
|
||||
formatBlock(output_stream, sample_block);
|
||||
auto block_string = out.str();
|
||||
|
||||
auto out_stream_callback = [library_path, library_settings, attributes_names, block_string, this](std::ostream & os)
|
||||
return [block_string, this](std::ostream & os)
|
||||
{
|
||||
os << "library_path=" << escapeForFileName(library_path) << "&";
|
||||
os << "library_settings=" << escapeForFileName(library_settings) << "&";
|
||||
os << "attributes_names=" << escapeForFileName(attributes_names) << "&";
|
||||
os << "library_path=" << escapeForFileName(library_data.library_path) << "&";
|
||||
os << "library_settings=" << escapeForFileName(library_data.library_settings) << "&";
|
||||
os << "attributes_names=" << escapeForFileName(library_data.dict_attributes) << "&";
|
||||
os << "sample_block=" << escapeForFileName(sample_block.getNamesAndTypesList().toString()) << "&";
|
||||
os << "null_values=" << escapeForFileName(block_string);
|
||||
};
|
||||
return executeRequest(uri, out_stream_callback);
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::initLibrary(bool check_bridge) const
|
||||
{
|
||||
/// Do not check if we call initLibrary from checkBridgeSync.
|
||||
if (check_bridge)
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LIB_NEW_METHOD);
|
||||
return executeRequest(uri, getInitLibraryCallback());
|
||||
}
|
||||
|
||||
|
||||
@ -89,15 +150,20 @@ bool LibraryBridgeHelper::cloneLibrary(const Field & other_dictionary_id)
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LIB_CLONE_METHOD);
|
||||
uri.addQueryParameter("from_dictionary_id", toString(other_dictionary_id));
|
||||
return executeRequest(uri);
|
||||
return executeRequest(uri, getInitLibraryCallback());
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::removeLibrary()
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LIB_DELETE_METHOD);
|
||||
return executeRequest(uri);
|
||||
/// Do not force bridge restart if it is not running in case of removeLibrary
|
||||
/// because in this case after restart it will not have this dictionaty id in memory anyway.
|
||||
if (checkBridgeIsRunning())
|
||||
{
|
||||
auto uri = createRequestURI(LIB_DELETE_METHOD);
|
||||
return executeRequest(uri);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@ -125,11 +191,25 @@ BlockInputStreamPtr LibraryBridgeHelper::loadAll()
|
||||
}
|
||||
|
||||
|
||||
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string ids_string)
|
||||
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string, const std::vector<uint64_t> ids)
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LOAD_IDS_METHOD);
|
||||
return loadBase(uri, [ids_string](std::ostream & os) { os << ids_string; });
|
||||
|
||||
uri.addQueryParameter("ids_num", toString(ids.size()));
|
||||
String ids_str;
|
||||
for (const auto & id : ids)
|
||||
{
|
||||
if (!ids_str.empty())
|
||||
ids_str += '-';
|
||||
ids_str += toString(id);
|
||||
}
|
||||
|
||||
uri.addQueryParameter("ids", ids_str);
|
||||
std::cerr << "\n\nLibraryBridgeHelper: " << toString(dictionary_id) << " , ids_num: " << ids.size() << " , ids: " << ids_str << std::endl << std::endl;
|
||||
LOG_ERROR(log, "dictionary_id: {}, ids_num: {}, ids: {}", dictionary_id, ids.size(), ids_str);
|
||||
|
||||
return loadBase(uri, [ids_str](std::ostream & os) { os << ids_str; });
|
||||
}
|
||||
|
||||
|
||||
@ -149,7 +229,7 @@ BlockInputStreamPtr LibraryBridgeHelper::loadKeys(const Block & requested_block)
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback)
|
||||
bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback) const
|
||||
{
|
||||
ReadWriteBufferFromHTTP buf(
|
||||
uri,
|
||||
|
@ -15,11 +15,18 @@ class LibraryBridgeHelper : public IBridgeHelper
|
||||
{
|
||||
|
||||
public:
|
||||
struct LibraryInitData
|
||||
{
|
||||
String library_path;
|
||||
String library_settings;
|
||||
String dict_attributes;
|
||||
};
|
||||
|
||||
static constexpr inline size_t DEFAULT_PORT = 9012;
|
||||
|
||||
LibraryBridgeHelper(ContextPtr context_, const Block & sample_block, const Field & dictionary_id_);
|
||||
LibraryBridgeHelper(ContextPtr context_, const Block & sample_block, const Field & dictionary_id_, const LibraryInitData & library_data_);
|
||||
|
||||
bool initLibrary(const std::string & library_path, std::string library_settings, std::string attributes_names);
|
||||
bool initLibrary(bool check_bridge = true) const;
|
||||
|
||||
bool cloneLibrary(const Field & other_dictionary_id);
|
||||
|
||||
@ -31,16 +38,21 @@ public:
|
||||
|
||||
BlockInputStreamPtr loadAll();
|
||||
|
||||
BlockInputStreamPtr loadIds(std::string ids_string);
|
||||
BlockInputStreamPtr loadIds(std::string ids_string, const std::vector<uint64_t> ids);
|
||||
|
||||
BlockInputStreamPtr loadKeys(const Block & requested_block);
|
||||
|
||||
BlockInputStreamPtr loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
|
||||
|
||||
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
|
||||
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {}) const;
|
||||
|
||||
LibraryInitData getLibraryData() const { return library_data; }
|
||||
|
||||
void setInitialized() { library_initialized = true; }
|
||||
|
||||
protected:
|
||||
bool checkBridgeIsRunning() const override;
|
||||
|
||||
void startBridge(std::unique_ptr<ShellCommand> cmd) const override;
|
||||
|
||||
String serviceAlias() const override { return "clickhouse-library-bridge"; }
|
||||
@ -61,6 +73,8 @@ protected:
|
||||
|
||||
Poco::URI createBaseURI() const override;
|
||||
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback getInitLibraryCallback() const;
|
||||
|
||||
private:
|
||||
static constexpr inline auto LIB_NEW_METHOD = "libNew";
|
||||
static constexpr inline auto LIB_CLONE_METHOD = "libClone";
|
||||
@ -69,6 +83,7 @@ private:
|
||||
static constexpr inline auto LOAD_IDS_METHOD = "loadIds";
|
||||
static constexpr inline auto LOAD_KEYS_METHOD = "loadKeys";
|
||||
static constexpr inline auto IS_MODIFIED_METHOD = "isModified";
|
||||
static constexpr inline auto PING = "ping";
|
||||
static constexpr inline auto SUPPORTS_SELECTIVE_LOAD_METHOD = "supportsSelectiveLoad";
|
||||
|
||||
Poco::URI createRequestURI(const String & method) const;
|
||||
@ -78,9 +93,11 @@ private:
|
||||
const Poco::Util::AbstractConfiguration & config;
|
||||
const Poco::Timespan http_timeout;
|
||||
|
||||
LibraryInitData library_data;
|
||||
Field dictionary_id;
|
||||
std::string bridge_host;
|
||||
size_t bridge_port;
|
||||
bool library_initialized = false;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -60,20 +60,33 @@ public:
|
||||
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
|
||||
|
||||
XDBCBridgeHelper(
|
||||
ContextPtr context_,
|
||||
Poco::Timespan http_timeout_,
|
||||
const std::string & connection_string_)
|
||||
: IXDBCBridgeHelper(context_->getGlobalContext())
|
||||
, log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"))
|
||||
, connection_string(connection_string_)
|
||||
, http_timeout(http_timeout_)
|
||||
, config(context_->getGlobalContext()->getConfigRef())
|
||||
{
|
||||
bridge_host = config.getString(BridgeHelperMixin::configPrefix() + ".host", DEFAULT_HOST);
|
||||
bridge_port = config.getUInt(BridgeHelperMixin::configPrefix() + ".port", DEFAULT_PORT);
|
||||
}
|
||||
ContextPtr context_,
|
||||
Poco::Timespan http_timeout_,
|
||||
const std::string & connection_string_)
|
||||
: IXDBCBridgeHelper(context_->getGlobalContext())
|
||||
, log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"))
|
||||
, connection_string(connection_string_)
|
||||
, http_timeout(http_timeout_)
|
||||
, config(context_->getGlobalContext()->getConfigRef())
|
||||
{
|
||||
bridge_host = config.getString(BridgeHelperMixin::configPrefix() + ".host", DEFAULT_HOST);
|
||||
bridge_port = config.getUInt(BridgeHelperMixin::configPrefix() + ".port", DEFAULT_PORT);
|
||||
}
|
||||
|
||||
protected:
|
||||
bool checkBridgeIsRunning() const override
|
||||
{
|
||||
try
|
||||
{
|
||||
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
|
||||
return checkString(PING_OK_ANSWER, buf);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
auto getConnectionString() const { return connection_string; }
|
||||
|
||||
String getName() const override { return BridgeHelperMixin::getName(); }
|
||||
|
@ -48,17 +48,34 @@ LibraryDictionarySource::LibraryDictionarySource(
|
||||
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "LibraryDictionarySource: Can't load library {}: file doesn't exist", path);
|
||||
|
||||
description.init(sample_block);
|
||||
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id);
|
||||
auto res = bridge_helper->initLibrary(path, getLibrarySettingsString(config, config_prefix + ".settings"), getDictAttributesString());
|
||||
|
||||
LibraryBridgeHelper::LibraryInitData library_data
|
||||
{
|
||||
.library_path = path,
|
||||
.library_settings = getLibrarySettingsString(config, config_prefix + ".settings"),
|
||||
.dict_attributes = getDictAttributesString()
|
||||
};
|
||||
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id, library_data);
|
||||
auto res = bridge_helper->initLibrary();
|
||||
|
||||
if (!res)
|
||||
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR, "Failed to create shared library from path: {}", path);
|
||||
else
|
||||
bridge_helper->setInitialized();
|
||||
}
|
||||
|
||||
|
||||
LibraryDictionarySource::~LibraryDictionarySource()
|
||||
{
|
||||
bridge_helper->removeLibrary();
|
||||
try
|
||||
{
|
||||
bridge_helper->removeLibrary();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException("LibraryDictionarySource");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -72,7 +89,7 @@ LibraryDictionarySource::LibraryDictionarySource(const LibraryDictionarySource &
|
||||
, context(other.context)
|
||||
, description{other.description}
|
||||
{
|
||||
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id);
|
||||
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id, other.bridge_helper->getLibraryData());
|
||||
bridge_helper->cloneLibrary(other.dictionary_id);
|
||||
}
|
||||
|
||||
@ -99,7 +116,7 @@ BlockInputStreamPtr LibraryDictionarySource::loadAll()
|
||||
BlockInputStreamPtr LibraryDictionarySource::loadIds(const std::vector<UInt64> & ids)
|
||||
{
|
||||
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
|
||||
return bridge_helper->loadIds(getDictIdsString(ids));
|
||||
return bridge_helper->loadIds(getDictIdsString(ids), ids);
|
||||
}
|
||||
|
||||
|
||||
|
@ -8,8 +8,23 @@ from helpers.cluster import ClickHouseCluster, run_and_check
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
instance = cluster.add_instance('instance',
|
||||
dictionaries=['configs/dictionaries/dict1.xml'],
|
||||
main_configs=['configs/config.d/config.xml'])
|
||||
dictionaries=['configs/dictionaries/dict1.xml'], main_configs=['configs/config.d/config.xml'], stay_alive=True)
|
||||
|
||||
|
||||
def create_dict_simple():
|
||||
instance.query('DROP DICTIONARY IF EXISTS lib_dict_c')
|
||||
instance.query('''
|
||||
CREATE DICTIONARY lib_dict_c (key UInt64, value1 UInt64, value2 UInt64, value3 UInt64)
|
||||
PRIMARY KEY key SOURCE(library(PATH '/etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.so'))
|
||||
LAYOUT(CACHE(
|
||||
SIZE_IN_CELLS 10000000
|
||||
BLOCK_SIZE 4096
|
||||
FILE_SIZE 16777216
|
||||
READ_BUFFER_SIZE 1048576
|
||||
MAX_STORED_KEYS 1048576))
|
||||
LIFETIME(2) ;
|
||||
''')
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def ch_cluster():
|
||||
@ -160,6 +175,52 @@ def test_null_values(ch_cluster):
|
||||
assert(result == expected)
|
||||
|
||||
|
||||
def test_recover_after_bridge_crash(ch_cluster):
|
||||
if instance.is_built_with_memory_sanitizer():
|
||||
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
|
||||
|
||||
create_dict_simple()
|
||||
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(0));''')
|
||||
assert(result.strip() == '100')
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
|
||||
assert(result.strip() == '101')
|
||||
|
||||
instance.exec_in_container(['bash', '-c', 'kill -9 `pidof clickhouse-library-bridge`'], user='root')
|
||||
instance.query('SYSTEM RELOAD DICTIONARY lib_dict_c')
|
||||
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(0));''')
|
||||
assert(result.strip() == '100')
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
|
||||
assert(result.strip() == '101')
|
||||
|
||||
instance.exec_in_container(['bash', '-c', 'kill -9 `pidof clickhouse-library-bridge`'], user='root')
|
||||
instance.query('DROP DICTIONARY lib_dict_c')
|
||||
|
||||
|
||||
def test_server_restart_bridge_might_be_stil_alive(ch_cluster):
|
||||
if instance.is_built_with_memory_sanitizer():
|
||||
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
|
||||
|
||||
create_dict_simple()
|
||||
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
|
||||
assert(result.strip() == '101')
|
||||
|
||||
instance.restart_clickhouse()
|
||||
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
|
||||
assert(result.strip() == '101')
|
||||
|
||||
instance.exec_in_container(['bash', '-c', 'kill -9 `pidof clickhouse-library-bridge`'], user='root')
|
||||
instance.restart_clickhouse()
|
||||
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
|
||||
assert(result.strip() == '101')
|
||||
|
||||
instance.query('DROP DICTIONARY lib_dict_c')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
cluster.start()
|
||||
input("Cluster created, press any key to destroy...")
|
||||
|
Loading…
Reference in New Issue
Block a user