mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
Merge pull request #27060 from kssenii/library-bridge-fixes
library bridge fixes
This commit is contained in:
commit
f58727b50f
@ -12,8 +12,8 @@ namespace DB
|
||||
Poco::URI uri{request.getURI()};
|
||||
LOG_DEBUG(log, "Request URI: {}", uri.toString());
|
||||
|
||||
if (uri == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
|
||||
return std::make_unique<PingHandler>(keep_alive_timeout);
|
||||
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
|
||||
return std::make_unique<LibraryExistsHandler>(keep_alive_timeout, getContext());
|
||||
|
||||
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
|
||||
return std::make_unique<LibraryRequestHandler>(keep_alive_timeout, getContext());
|
||||
|
@ -17,8 +17,24 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_REQUEST_PARAMETER;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
void processError(HTTPServerResponse & response, const std::string & message)
|
||||
{
|
||||
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
|
||||
|
||||
if (!response.sent())
|
||||
*response.send() << message << std::endl;
|
||||
|
||||
LOG_WARNING(&Poco::Logger::get("LibraryBridge"), message);
|
||||
}
|
||||
|
||||
std::shared_ptr<Block> parseColumns(std::string && column_string)
|
||||
{
|
||||
auto sample_block = std::make_shared<Block>();
|
||||
@ -30,9 +46,8 @@ namespace
|
||||
return sample_block;
|
||||
}
|
||||
|
||||
std::vector<uint64_t> parseIdsFromBinary(const std::string & ids_string)
|
||||
std::vector<uint64_t> parseIdsFromBinary(ReadBuffer & buf)
|
||||
{
|
||||
ReadBufferFromString buf(ids_string);
|
||||
std::vector<uint64_t> ids;
|
||||
readVectorBinary(ids, buf);
|
||||
return ids;
|
||||
@ -67,13 +82,36 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
|
||||
std::string method = params.get("method");
|
||||
std::string dictionary_id = params.get("dictionary_id");
|
||||
LOG_TRACE(log, "Library method: '{}', dictionary id: {}", method, dictionary_id);
|
||||
|
||||
LOG_TRACE(log, "Library method: '{}', dictionary id: {}", method, dictionary_id);
|
||||
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
|
||||
|
||||
try
|
||||
{
|
||||
if (method == "libNew")
|
||||
bool lib_new = (method == "libNew");
|
||||
if (method == "libClone")
|
||||
{
|
||||
if (!params.has("from_dictionary_id"))
|
||||
{
|
||||
processError(response, "No 'from_dictionary_id' in request URL");
|
||||
return;
|
||||
}
|
||||
|
||||
std::string from_dictionary_id = params.get("from_dictionary_id");
|
||||
bool cloned = false;
|
||||
cloned = SharedLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);
|
||||
|
||||
if (cloned)
|
||||
{
|
||||
writeStringBinary("1", out);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE(log, "Cannot clone from dictionary with id: {}, will call libNew instead");
|
||||
lib_new = true;
|
||||
}
|
||||
}
|
||||
if (lib_new)
|
||||
{
|
||||
auto & read_buf = request.getStream();
|
||||
params.read(read_buf);
|
||||
@ -92,6 +130,8 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
|
||||
std::string library_path = params.get("library_path");
|
||||
const auto & settings_string = params.get("library_settings");
|
||||
|
||||
LOG_DEBUG(log, "Parsing library settings from binary string");
|
||||
std::vector<std::string> library_settings = parseNamesFromBinary(settings_string);
|
||||
|
||||
/// Needed for library dictionary
|
||||
@ -102,6 +142,8 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
}
|
||||
|
||||
const auto & attributes_string = params.get("attributes_names");
|
||||
|
||||
LOG_DEBUG(log, "Parsing attributes names from binary string");
|
||||
std::vector<std::string> attributes_names = parseNamesFromBinary(attributes_string);
|
||||
|
||||
/// Needed to parse block from binary string format
|
||||
@ -140,54 +182,63 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
SharedLibraryHandlerFactory::instance().create(dictionary_id, library_path, library_settings, sample_block_with_nulls, attributes_names);
|
||||
writeStringBinary("1", out);
|
||||
}
|
||||
else if (method == "libClone")
|
||||
{
|
||||
if (!params.has("from_dictionary_id"))
|
||||
{
|
||||
processError(response, "No 'from_dictionary_id' in request URL");
|
||||
return;
|
||||
}
|
||||
|
||||
std::string from_dictionary_id = params.get("from_dictionary_id");
|
||||
LOG_TRACE(log, "Calling libClone from {} to {}", from_dictionary_id, dictionary_id);
|
||||
SharedLibraryHandlerFactory::instance().clone(from_dictionary_id, dictionary_id);
|
||||
writeStringBinary("1", out);
|
||||
}
|
||||
else if (method == "libDelete")
|
||||
{
|
||||
SharedLibraryHandlerFactory::instance().remove(dictionary_id);
|
||||
auto deleted = SharedLibraryHandlerFactory::instance().remove(dictionary_id);
|
||||
|
||||
/// Do not throw, a warning is ok.
|
||||
if (!deleted)
|
||||
LOG_WARNING(log, "Cannot delete library for with dictionary id: {}, because such id was not found.", dictionary_id);
|
||||
|
||||
writeStringBinary("1", out);
|
||||
}
|
||||
else if (method == "isModified")
|
||||
{
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
bool res = library_handler->isModified();
|
||||
writeStringBinary(std::to_string(res), out);
|
||||
}
|
||||
else if (method == "supportsSelectiveLoad")
|
||||
{
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
bool res = library_handler->supportsSelectiveLoad();
|
||||
writeStringBinary(std::to_string(res), out);
|
||||
}
|
||||
else if (method == "loadAll")
|
||||
{
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
const auto & sample_block = library_handler->getSampleBlock();
|
||||
LOG_DEBUG(log, "Calling loadAll() for dictionary id: {}", dictionary_id);
|
||||
auto input = library_handler->loadAll();
|
||||
|
||||
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
|
||||
copyData(*input, *output);
|
||||
}
|
||||
else if (method == "loadIds")
|
||||
{
|
||||
LOG_DEBUG(log, "Getting diciontary ids for dictionary with id: {}", dictionary_id);
|
||||
String ids_string;
|
||||
readString(ids_string, request.getStream());
|
||||
std::vector<uint64_t> ids = parseIdsFromBinary(ids_string);
|
||||
std::vector<uint64_t> ids = parseIdsFromBinary(request.getStream());
|
||||
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
const auto & sample_block = library_handler->getSampleBlock();
|
||||
LOG_DEBUG(log, "Calling loadIds() for dictionary id: {}", dictionary_id);
|
||||
auto input = library_handler->loadIds(ids);
|
||||
|
||||
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
|
||||
copyData(*input, *output);
|
||||
}
|
||||
@ -219,8 +270,14 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
auto block = reader->read();
|
||||
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
if (!library_handler)
|
||||
throw Exception(ErrorCodes::BAD_REQUEST_PARAMETER, "Not found dictionary with id: {}", dictionary_id);
|
||||
|
||||
const auto & sample_block = library_handler->getSampleBlock();
|
||||
LOG_DEBUG(log, "Calling loadKeys() for dictionary id: {}", dictionary_id);
|
||||
auto input = library_handler->loadKeys(block.getColumns());
|
||||
|
||||
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
|
||||
BlockOutputStreamPtr output = FormatFactory::instance().getOutputStream(FORMAT, out, sample_block, getContext());
|
||||
copyData(*input, *output);
|
||||
}
|
||||
@ -228,8 +285,9 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
catch (...)
|
||||
{
|
||||
auto message = getCurrentExceptionMessage(true);
|
||||
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR, message); // can't call process_error, because of too soon response sending
|
||||
LOG_ERROR(log, "Failed to process request for dictionary_id: {}. Error: {}", dictionary_id, message);
|
||||
|
||||
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR, message); // can't call process_error, because of too soon response sending
|
||||
try
|
||||
{
|
||||
writeStringBinary(message, out);
|
||||
@ -239,8 +297,6 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
}
|
||||
|
||||
tryLogCurrentException(log);
|
||||
}
|
||||
|
||||
try
|
||||
@ -254,24 +310,30 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
|
||||
}
|
||||
|
||||
|
||||
void LibraryRequestHandler::processError(HTTPServerResponse & response, const std::string & message)
|
||||
{
|
||||
response.setStatusAndReason(HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
|
||||
|
||||
if (!response.sent())
|
||||
*response.send() << message << std::endl;
|
||||
|
||||
LOG_WARNING(log, message);
|
||||
}
|
||||
|
||||
|
||||
void PingHandler::handleRequest(HTTPServerRequest & /* request */, HTTPServerResponse & response)
|
||||
void LibraryExistsHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
|
||||
{
|
||||
try
|
||||
{
|
||||
LOG_TRACE(log, "Request URI: {}", request.getURI());
|
||||
HTMLForm params(getContext()->getSettingsRef(), request);
|
||||
|
||||
if (!params.has("dictionary_id"))
|
||||
{
|
||||
processError(response, "No 'dictionary_id' in request URL");
|
||||
return;
|
||||
}
|
||||
|
||||
std::string dictionary_id = params.get("dictionary_id");
|
||||
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
|
||||
String res;
|
||||
if (library_handler)
|
||||
res = "1";
|
||||
else
|
||||
res = "0";
|
||||
|
||||
setResponseDefaultHeaders(response, keep_alive_timeout);
|
||||
const char * data = "Ok.\n";
|
||||
response.sendBuffer(data, strlen(data));
|
||||
LOG_TRACE(log, "Senging ping response: {} (dictionary id: {})", res, dictionary_id);
|
||||
response.sendBuffer(res.data(), res.size());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
@ -22,8 +22,7 @@ class LibraryRequestHandler : public HTTPRequestHandler, WithContext
|
||||
public:
|
||||
|
||||
LibraryRequestHandler(
|
||||
size_t keep_alive_timeout_,
|
||||
ContextPtr context_)
|
||||
size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, log(&Poco::Logger::get("LibraryRequestHandler"))
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
@ -35,18 +34,18 @@ public:
|
||||
private:
|
||||
static constexpr inline auto FORMAT = "RowBinary";
|
||||
|
||||
void processError(HTTPServerResponse & response, const std::string & message);
|
||||
|
||||
Poco::Logger * log;
|
||||
size_t keep_alive_timeout;
|
||||
};
|
||||
|
||||
|
||||
class PingHandler : public HTTPRequestHandler
|
||||
class LibraryExistsHandler : public HTTPRequestHandler, WithContext
|
||||
{
|
||||
public:
|
||||
explicit PingHandler(size_t keep_alive_timeout_)
|
||||
: keep_alive_timeout(keep_alive_timeout_)
|
||||
explicit LibraryExistsHandler(size_t keep_alive_timeout_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, keep_alive_timeout(keep_alive_timeout_)
|
||||
, log(&Poco::Logger::get("LibraryRequestHandler"))
|
||||
{
|
||||
}
|
||||
|
||||
@ -54,6 +53,8 @@ public:
|
||||
|
||||
private:
|
||||
const size_t keep_alive_timeout;
|
||||
Poco::Logger * log;
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -4,12 +4,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
SharedLibraryHandlerPtr SharedLibraryHandlerFactory::get(const std::string & dictionary_id)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
@ -18,7 +12,7 @@ SharedLibraryHandlerPtr SharedLibraryHandlerFactory::get(const std::string & dic
|
||||
if (library_handler != library_handlers.end())
|
||||
return library_handler->second;
|
||||
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Not found dictionary with id: {}", dictionary_id);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
||||
@ -30,32 +24,32 @@ void SharedLibraryHandlerFactory::create(
|
||||
const std::vector<std::string> & attributes_names)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
library_handlers[dictionary_id] = std::make_shared<SharedLibraryHandler>(library_path, library_settings, sample_block, attributes_names);
|
||||
if (!library_handlers.count(dictionary_id))
|
||||
library_handlers.emplace(std::make_pair(dictionary_id, std::make_shared<SharedLibraryHandler>(library_path, library_settings, sample_block, attributes_names)));
|
||||
else
|
||||
LOG_WARNING(&Poco::Logger::get("SharedLibraryHandlerFactory"), "Library handler with dictionary id {} already exists", dictionary_id);
|
||||
}
|
||||
|
||||
|
||||
void SharedLibraryHandlerFactory::clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id)
|
||||
bool SharedLibraryHandlerFactory::clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto from_library_handler = library_handlers.find(from_dictionary_id);
|
||||
|
||||
/// This is not supposed to happen as libClone is called from copy constructor of LibraryDictionarySource
|
||||
/// object, and shared library handler of from_dictionary is removed only in its destructor.
|
||||
/// And if for from_dictionary there was no shared library handler, it would have received and exception in
|
||||
/// its constructor, so no libClone would be made from it.
|
||||
if (from_library_handler == library_handlers.end())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "No shared library handler found");
|
||||
return false;
|
||||
|
||||
/// libClone method will be called in copy constructor
|
||||
library_handlers[to_dictionary_id] = std::make_shared<SharedLibraryHandler>(*from_library_handler->second);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void SharedLibraryHandlerFactory::remove(const std::string & dictionary_id)
|
||||
bool SharedLibraryHandlerFactory::remove(const std::string & dictionary_id)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
/// libDelete is called in destructor.
|
||||
library_handlers.erase(dictionary_id);
|
||||
return library_handlers.erase(dictionary_id);
|
||||
}
|
||||
|
||||
|
||||
|
@ -24,9 +24,9 @@ public:
|
||||
const Block & sample_block,
|
||||
const std::vector<std::string> & attributes_names);
|
||||
|
||||
void clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id);
|
||||
bool clone(const std::string & from_dictionary_id, const std::string & to_dictionary_id);
|
||||
|
||||
void remove(const std::string & dictionary_id);
|
||||
bool remove(const std::string & dictionary_id);
|
||||
|
||||
private:
|
||||
/// map: dict_id -> sharedLibraryHandler
|
||||
|
@ -33,24 +33,9 @@ Poco::URI IBridgeHelper::getPingURI() const
|
||||
}
|
||||
|
||||
|
||||
bool IBridgeHelper::checkBridgeIsRunning() const
|
||||
void IBridgeHelper::startBridgeSync()
|
||||
{
|
||||
try
|
||||
{
|
||||
ReadWriteBufferFromHTTP buf(
|
||||
getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
|
||||
return checkString(PING_OK_ANSWER, buf);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void IBridgeHelper::startBridgeSync() const
|
||||
{
|
||||
if (!checkBridgeIsRunning())
|
||||
if (!bridgeHandShake())
|
||||
{
|
||||
LOG_TRACE(getLog(), "{} is not running, will try to start it", serviceAlias());
|
||||
startBridge(startBridgeCommand());
|
||||
@ -64,7 +49,7 @@ void IBridgeHelper::startBridgeSync() const
|
||||
++counter;
|
||||
LOG_TRACE(getLog(), "Checking {} is running, try {}", serviceAlias(), counter);
|
||||
|
||||
if (checkBridgeIsRunning())
|
||||
if (bridgeHandShake())
|
||||
{
|
||||
started = true;
|
||||
break;
|
||||
@ -81,7 +66,7 @@ void IBridgeHelper::startBridgeSync() const
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<ShellCommand> IBridgeHelper::startBridgeCommand() const
|
||||
std::unique_ptr<ShellCommand> IBridgeHelper::startBridgeCommand()
|
||||
{
|
||||
if (startBridgeManually())
|
||||
throw Exception(serviceAlias() + " is not running. Please, start it manually", ErrorCodes::EXTERNAL_SERVER_IS_NOT_RESPONDING);
|
||||
|
@ -28,16 +28,19 @@ public:
|
||||
static const inline std::string MAIN_METHOD = Poco::Net::HTTPRequest::HTTP_POST;
|
||||
|
||||
explicit IBridgeHelper(ContextPtr context_) : WithContext(context_) {}
|
||||
virtual ~IBridgeHelper() = default;
|
||||
|
||||
void startBridgeSync() const;
|
||||
virtual ~IBridgeHelper() = default;
|
||||
|
||||
Poco::URI getMainURI() const;
|
||||
|
||||
Poco::URI getPingURI() const;
|
||||
|
||||
void startBridgeSync();
|
||||
|
||||
protected:
|
||||
/// Check bridge is running. Can also check something else in the mean time.
|
||||
virtual bool bridgeHandShake() = 0;
|
||||
|
||||
/// clickhouse-odbc-bridge, clickhouse-library-bridge
|
||||
virtual String serviceAlias() const = 0;
|
||||
|
||||
@ -61,9 +64,7 @@ protected:
|
||||
|
||||
|
||||
private:
|
||||
bool checkBridgeIsRunning() const;
|
||||
|
||||
std::unique_ptr<ShellCommand> startBridgeCommand() const;
|
||||
std::unique_ptr<ShellCommand> startBridgeCommand();
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,5 @@
|
||||
#include "LibraryBridgeHelper.h"
|
||||
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <DataStreams/OneBlockInputStream.h>
|
||||
#include <DataStreams/OwningBlockInputStream.h>
|
||||
#include <DataStreams/formatBlock.h>
|
||||
@ -8,6 +7,8 @@
|
||||
#include <Processors/Formats/InputStreamFromInputFormat.h>
|
||||
#include <IO/WriteBufferFromOStream.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Common/ShellCommand.h>
|
||||
@ -20,16 +21,25 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int EXTERNAL_LIBRARY_ERROR;
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
LibraryBridgeHelper::LibraryBridgeHelper(
|
||||
ContextPtr context_,
|
||||
const Block & sample_block_,
|
||||
const Field & dictionary_id_)
|
||||
const Field & dictionary_id_,
|
||||
const LibraryInitData & library_data_)
|
||||
: IBridgeHelper(context_->getGlobalContext())
|
||||
, log(&Poco::Logger::get("LibraryBridgeHelper"))
|
||||
, sample_block(sample_block_)
|
||||
, config(context_->getConfigRef())
|
||||
, http_timeout(context_->getGlobalContext()->getSettingsRef().http_receive_timeout.value)
|
||||
, library_data(library_data_)
|
||||
, dictionary_id(dictionary_id_)
|
||||
, http_timeouts(ConnectionTimeouts::getHTTPTimeouts(context_))
|
||||
{
|
||||
bridge_port = config.getUInt("library_bridge.port", DEFAULT_PORT);
|
||||
bridge_host = config.getString("library_bridge.host", DEFAULT_HOST);
|
||||
@ -61,26 +71,91 @@ void LibraryBridgeHelper::startBridge(std::unique_ptr<ShellCommand> cmd) const
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::initLibrary(const std::string & library_path, const std::string library_settings, const std::string attributes_names)
|
||||
bool LibraryBridgeHelper::bridgeHandShake()
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LIB_NEW_METHOD);
|
||||
String result;
|
||||
try
|
||||
{
|
||||
ReadWriteBufferFromHTTP buf(createRequestURI(PING), Poco::Net::HTTPRequest::HTTP_GET, {}, http_timeouts);
|
||||
readString(result, buf);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* When pinging bridge we also pass current dicionary_id. The bridge will check if there is such
|
||||
* dictionary. It is possible that such dictionary_id is not present only in two cases:
|
||||
* 1. It is dictionary source creation and initialization of library handler on bridge side did not happen yet.
|
||||
* 2. Bridge crashed or restarted for some reason while server did not.
|
||||
**/
|
||||
if (result.size() != 1)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {}. Check bridge and server have the same version.", result);
|
||||
|
||||
UInt8 dictionary_id_exists;
|
||||
auto parsed = tryParse<UInt8>(dictionary_id_exists, result);
|
||||
if (!parsed || (dictionary_id_exists != 0 && dictionary_id_exists != 1))
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {} ({}). Check bridge and server have the same version.",
|
||||
result, parsed ? toString(dictionary_id_exists) : "failed to parse");
|
||||
|
||||
LOG_TRACE(log, "dictionary_id: {}, dictionary_id_exists on bridge side: {}, library confirmed to be initialized on server side: {}",
|
||||
toString(dictionary_id), toString(dictionary_id_exists), library_initialized);
|
||||
|
||||
if (dictionary_id_exists && !library_initialized)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Library was not initialized, but bridge responded to already have dictionary id: {}", dictionary_id);
|
||||
|
||||
/// Here we want to say bridge to recreate a new library handler for current dictionary,
|
||||
/// because it responded to have lost it, but we know that it has already been created. (It is a direct result of bridge crash).
|
||||
if (!dictionary_id_exists && library_initialized)
|
||||
{
|
||||
LOG_WARNING(log, "Library bridge does not have library handler with dictionaty id: {}. It will be reinitialized.", dictionary_id);
|
||||
bool reinitialized = false;
|
||||
try
|
||||
{
|
||||
auto uri = createRequestURI(LIB_NEW_METHOD);
|
||||
reinitialized = executeRequest(uri, getInitLibraryCallback());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!reinitialized)
|
||||
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR,
|
||||
"Failed to reinitialize library handler on bridge side for dictionary with id: {}", dictionary_id);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback LibraryBridgeHelper::getInitLibraryCallback() const
|
||||
{
|
||||
/// Sample block must contain null values
|
||||
WriteBufferFromOwnString out;
|
||||
auto output_stream = getContext()->getOutputStream(LibraryBridgeHelper::DEFAULT_FORMAT, out, sample_block);
|
||||
formatBlock(output_stream, sample_block);
|
||||
auto block_string = out.str();
|
||||
|
||||
auto out_stream_callback = [library_path, library_settings, attributes_names, block_string, this](std::ostream & os)
|
||||
return [block_string, this](std::ostream & os)
|
||||
{
|
||||
os << "library_path=" << escapeForFileName(library_path) << "&";
|
||||
os << "library_settings=" << escapeForFileName(library_settings) << "&";
|
||||
os << "attributes_names=" << escapeForFileName(attributes_names) << "&";
|
||||
os << "library_path=" << escapeForFileName(library_data.library_path) << "&";
|
||||
os << "library_settings=" << escapeForFileName(library_data.library_settings) << "&";
|
||||
os << "attributes_names=" << escapeForFileName(library_data.dict_attributes) << "&";
|
||||
os << "sample_block=" << escapeForFileName(sample_block.getNamesAndTypesList().toString()) << "&";
|
||||
os << "null_values=" << escapeForFileName(block_string);
|
||||
};
|
||||
return executeRequest(uri, out_stream_callback);
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::initLibrary()
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LIB_NEW_METHOD);
|
||||
library_initialized = executeRequest(uri, getInitLibraryCallback());
|
||||
return library_initialized;
|
||||
}
|
||||
|
||||
|
||||
@ -89,15 +164,23 @@ bool LibraryBridgeHelper::cloneLibrary(const Field & other_dictionary_id)
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LIB_CLONE_METHOD);
|
||||
uri.addQueryParameter("from_dictionary_id", toString(other_dictionary_id));
|
||||
return executeRequest(uri);
|
||||
/// We also pass initialization settings in order to create a library handler
|
||||
/// in case from_dictionary_id does not exist in bridge side (possible in case of bridge crash).
|
||||
library_initialized = executeRequest(uri, getInitLibraryCallback());
|
||||
return library_initialized;
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::removeLibrary()
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LIB_DELETE_METHOD);
|
||||
return executeRequest(uri);
|
||||
/// Do not force bridge restart if it is not running in case of removeLibrary
|
||||
/// because in this case after restart it will not have this dictionaty id in memory anyway.
|
||||
if (bridgeHandShake())
|
||||
{
|
||||
auto uri = createRequestURI(LIB_DELETE_METHOD);
|
||||
return executeRequest(uri);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@ -125,10 +208,12 @@ BlockInputStreamPtr LibraryBridgeHelper::loadAll()
|
||||
}
|
||||
|
||||
|
||||
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string ids_string)
|
||||
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::vector<uint64_t> & ids)
|
||||
{
|
||||
startBridgeSync();
|
||||
auto uri = createRequestURI(LOAD_IDS_METHOD);
|
||||
uri.addQueryParameter("ids_num", toString(ids.size())); /// Not used parameter, but helpful
|
||||
auto ids_string = getDictIdsString(ids);
|
||||
return loadBase(uri, [ids_string](std::ostream & os) { os << ids_string; });
|
||||
}
|
||||
|
||||
@ -149,13 +234,13 @@ BlockInputStreamPtr LibraryBridgeHelper::loadKeys(const Block & requested_block)
|
||||
}
|
||||
|
||||
|
||||
bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback)
|
||||
bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback) const
|
||||
{
|
||||
ReadWriteBufferFromHTTP buf(
|
||||
uri,
|
||||
Poco::Net::HTTPRequest::HTTP_POST,
|
||||
std::move(out_stream_callback),
|
||||
ConnectionTimeouts::getHTTPTimeouts(getContext()));
|
||||
http_timeouts);
|
||||
|
||||
bool res;
|
||||
readBoolText(res, buf);
|
||||
@ -169,7 +254,7 @@ BlockInputStreamPtr LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWri
|
||||
uri,
|
||||
Poco::Net::HTTPRequest::HTTP_POST,
|
||||
std::move(out_stream_callback),
|
||||
ConnectionTimeouts::getHTTPTimeouts(getContext()),
|
||||
http_timeouts,
|
||||
0,
|
||||
Poco::Net::HTTPBasicCredentials{},
|
||||
DBMS_DEFAULT_BUFFER_SIZE,
|
||||
@ -179,4 +264,13 @@ BlockInputStreamPtr LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWri
|
||||
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(read_buf_ptr));
|
||||
}
|
||||
|
||||
|
||||
String LibraryBridgeHelper::getDictIdsString(const std::vector<UInt64> & ids)
|
||||
{
|
||||
WriteBufferFromOwnString out;
|
||||
writeVectorBinary(ids, out);
|
||||
return out.str();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -15,11 +15,18 @@ class LibraryBridgeHelper : public IBridgeHelper
|
||||
{
|
||||
|
||||
public:
|
||||
struct LibraryInitData
|
||||
{
|
||||
String library_path;
|
||||
String library_settings;
|
||||
String dict_attributes;
|
||||
};
|
||||
|
||||
static constexpr inline size_t DEFAULT_PORT = 9012;
|
||||
|
||||
LibraryBridgeHelper(ContextPtr context_, const Block & sample_block, const Field & dictionary_id_);
|
||||
LibraryBridgeHelper(ContextPtr context_, const Block & sample_block, const Field & dictionary_id_, const LibraryInitData & library_data_);
|
||||
|
||||
bool initLibrary(const std::string & library_path, std::string library_settings, std::string attributes_names);
|
||||
bool initLibrary();
|
||||
|
||||
bool cloneLibrary(const Field & other_dictionary_id);
|
||||
|
||||
@ -31,16 +38,19 @@ public:
|
||||
|
||||
BlockInputStreamPtr loadAll();
|
||||
|
||||
BlockInputStreamPtr loadIds(std::string ids_string);
|
||||
BlockInputStreamPtr loadIds(const std::vector<uint64_t> & ids);
|
||||
|
||||
BlockInputStreamPtr loadKeys(const Block & requested_block);
|
||||
|
||||
BlockInputStreamPtr loadBase(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
|
||||
|
||||
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {});
|
||||
bool executeRequest(const Poco::URI & uri, ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback = {}) const;
|
||||
|
||||
LibraryInitData getLibraryData() const { return library_data; }
|
||||
|
||||
protected:
|
||||
bool bridgeHandShake() override;
|
||||
|
||||
void startBridge(std::unique_ptr<ShellCommand> cmd) const override;
|
||||
|
||||
String serviceAlias() const override { return "clickhouse-library-bridge"; }
|
||||
@ -61,6 +71,8 @@ protected:
|
||||
|
||||
Poco::URI createBaseURI() const override;
|
||||
|
||||
ReadWriteBufferFromHTTP::OutStreamCallback getInitLibraryCallback() const;
|
||||
|
||||
private:
|
||||
static constexpr inline auto LIB_NEW_METHOD = "libNew";
|
||||
static constexpr inline auto LIB_CLONE_METHOD = "libClone";
|
||||
@ -69,18 +81,24 @@ private:
|
||||
static constexpr inline auto LOAD_IDS_METHOD = "loadIds";
|
||||
static constexpr inline auto LOAD_KEYS_METHOD = "loadKeys";
|
||||
static constexpr inline auto IS_MODIFIED_METHOD = "isModified";
|
||||
static constexpr inline auto PING = "ping";
|
||||
static constexpr inline auto SUPPORTS_SELECTIVE_LOAD_METHOD = "supportsSelectiveLoad";
|
||||
|
||||
Poco::URI createRequestURI(const String & method) const;
|
||||
|
||||
static String getDictIdsString(const std::vector<UInt64> & ids);
|
||||
|
||||
Poco::Logger * log;
|
||||
const Block sample_block;
|
||||
const Poco::Util::AbstractConfiguration & config;
|
||||
const Poco::Timespan http_timeout;
|
||||
|
||||
LibraryInitData library_data;
|
||||
Field dictionary_id;
|
||||
std::string bridge_host;
|
||||
size_t bridge_port;
|
||||
bool library_initialized = false;
|
||||
ConnectionTimeouts http_timeouts;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -60,20 +60,33 @@ public:
|
||||
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
|
||||
|
||||
XDBCBridgeHelper(
|
||||
ContextPtr context_,
|
||||
Poco::Timespan http_timeout_,
|
||||
const std::string & connection_string_)
|
||||
: IXDBCBridgeHelper(context_->getGlobalContext())
|
||||
, log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"))
|
||||
, connection_string(connection_string_)
|
||||
, http_timeout(http_timeout_)
|
||||
, config(context_->getGlobalContext()->getConfigRef())
|
||||
{
|
||||
bridge_host = config.getString(BridgeHelperMixin::configPrefix() + ".host", DEFAULT_HOST);
|
||||
bridge_port = config.getUInt(BridgeHelperMixin::configPrefix() + ".port", DEFAULT_PORT);
|
||||
}
|
||||
ContextPtr context_,
|
||||
Poco::Timespan http_timeout_,
|
||||
const std::string & connection_string_)
|
||||
: IXDBCBridgeHelper(context_->getGlobalContext())
|
||||
, log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"))
|
||||
, connection_string(connection_string_)
|
||||
, http_timeout(http_timeout_)
|
||||
, config(context_->getGlobalContext()->getConfigRef())
|
||||
{
|
||||
bridge_host = config.getString(BridgeHelperMixin::configPrefix() + ".host", DEFAULT_HOST);
|
||||
bridge_port = config.getUInt(BridgeHelperMixin::configPrefix() + ".port", DEFAULT_PORT);
|
||||
}
|
||||
|
||||
protected:
|
||||
bool bridgeHandShake() override
|
||||
{
|
||||
try
|
||||
{
|
||||
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
|
||||
return checkString(PING_OK_ANSWER, buf);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
auto getConnectionString() const { return connection_string; }
|
||||
|
||||
String getName() const override { return BridgeHelperMixin::getName(); }
|
||||
|
@ -41,6 +41,9 @@ LibraryDictionarySource::LibraryDictionarySource(
|
||||
, sample_block{sample_block_}
|
||||
, context(Context::createCopy(context_))
|
||||
{
|
||||
if (fs::path(path).is_relative())
|
||||
path = fs::canonical(path);
|
||||
|
||||
if (created_from_ddl && !pathStartsWith(path, context->getDictionariesLibPath()))
|
||||
throw Exception(ErrorCodes::PATH_ACCESS_DENIED, "File path {} is not inside {}", path, context->getDictionariesLibPath());
|
||||
|
||||
@ -48,17 +51,32 @@ LibraryDictionarySource::LibraryDictionarySource(
|
||||
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "LibraryDictionarySource: Can't load library {}: file doesn't exist", path);
|
||||
|
||||
description.init(sample_block);
|
||||
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id);
|
||||
auto res = bridge_helper->initLibrary(path, getLibrarySettingsString(config, config_prefix + ".settings"), getDictAttributesString());
|
||||
|
||||
if (!res)
|
||||
LibraryBridgeHelper::LibraryInitData library_data
|
||||
{
|
||||
.library_path = path,
|
||||
.library_settings = getLibrarySettingsString(config, config_prefix + ".settings"),
|
||||
.dict_attributes = getDictAttributesString()
|
||||
};
|
||||
|
||||
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id, library_data);
|
||||
|
||||
if (!bridge_helper->initLibrary())
|
||||
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR, "Failed to create shared library from path: {}", path);
|
||||
}
|
||||
|
||||
|
||||
LibraryDictionarySource::~LibraryDictionarySource()
|
||||
{
|
||||
bridge_helper->removeLibrary();
|
||||
try
|
||||
{
|
||||
bridge_helper->removeLibrary();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException("LibraryDictionarySource");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -72,8 +90,9 @@ LibraryDictionarySource::LibraryDictionarySource(const LibraryDictionarySource &
|
||||
, context(other.context)
|
||||
, description{other.description}
|
||||
{
|
||||
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id);
|
||||
bridge_helper->cloneLibrary(other.dictionary_id);
|
||||
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id, other.bridge_helper->getLibraryData());
|
||||
if (!bridge_helper->cloneLibrary(other.dictionary_id))
|
||||
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR, "Failed to clone library");
|
||||
}
|
||||
|
||||
|
||||
@ -99,7 +118,7 @@ BlockInputStreamPtr LibraryDictionarySource::loadAll()
|
||||
BlockInputStreamPtr LibraryDictionarySource::loadIds(const std::vector<UInt64> & ids)
|
||||
{
|
||||
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
|
||||
return bridge_helper->loadIds(getDictIdsString(ids));
|
||||
return bridge_helper->loadIds(ids);
|
||||
}
|
||||
|
||||
|
||||
@ -147,14 +166,6 @@ String LibraryDictionarySource::getLibrarySettingsString(const Poco::Util::Abstr
|
||||
}
|
||||
|
||||
|
||||
String LibraryDictionarySource::getDictIdsString(const std::vector<UInt64> & ids)
|
||||
{
|
||||
WriteBufferFromOwnString out;
|
||||
writeVectorBinary(ids, out);
|
||||
return out.str();
|
||||
}
|
||||
|
||||
|
||||
String LibraryDictionarySource::getDictAttributesString()
|
||||
{
|
||||
std::vector<String> attributes_names(dict_struct.attributes.size());
|
||||
|
@ -70,8 +70,6 @@ public:
|
||||
std::string toString() const override;
|
||||
|
||||
private:
|
||||
static String getDictIdsString(const std::vector<UInt64> & ids);
|
||||
|
||||
String getDictAttributesString();
|
||||
|
||||
static String getLibrarySettingsString(const Poco::Util::AbstractConfiguration & config, const std::string & config_root);
|
||||
@ -82,7 +80,7 @@ private:
|
||||
|
||||
const DictionaryStructure dict_struct;
|
||||
const std::string config_prefix;
|
||||
const std::string path;
|
||||
std::string path;
|
||||
const Field dictionary_id;
|
||||
|
||||
Block sample_block;
|
||||
|
@ -8,5 +8,9 @@
|
||||
<count>10</count>
|
||||
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
|
||||
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
|
||||
|
||||
<library_bridge_log>/var/log/clickhouse-server/clickhouse-library-bridge.log</library_bridge_log>
|
||||
<library_bridge_errlog>/var/log/clickhouse-server/clickhouse-library-bridge.err.log</library_bridge_errlog>
|
||||
<library_bridge_level>trace</library_bridge_level>
|
||||
</logger>
|
||||
</yandex>
|
||||
|
@ -2,14 +2,30 @@ import os
|
||||
import os.path as p
|
||||
import pytest
|
||||
import time
|
||||
import logging
|
||||
|
||||
from helpers.cluster import ClickHouseCluster, run_and_check
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
instance = cluster.add_instance('instance',
|
||||
dictionaries=['configs/dictionaries/dict1.xml'],
|
||||
main_configs=['configs/config.d/config.xml'])
|
||||
dictionaries=['configs/dictionaries/dict1.xml'], main_configs=['configs/config.d/config.xml'], stay_alive=True)
|
||||
|
||||
|
||||
def create_dict_simple():
|
||||
instance.query('DROP DICTIONARY IF EXISTS lib_dict_c')
|
||||
instance.query('''
|
||||
CREATE DICTIONARY lib_dict_c (key UInt64, value1 UInt64, value2 UInt64, value3 UInt64)
|
||||
PRIMARY KEY key SOURCE(library(PATH '/etc/clickhouse-server/config.d/dictionaries_lib/dict_lib.so'))
|
||||
LAYOUT(CACHE(
|
||||
SIZE_IN_CELLS 10000000
|
||||
BLOCK_SIZE 4096
|
||||
FILE_SIZE 16777216
|
||||
READ_BUFFER_SIZE 1048576
|
||||
MAX_STORED_KEYS 1048576))
|
||||
LIFETIME(2) ;
|
||||
''')
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def ch_cluster():
|
||||
@ -98,6 +114,10 @@ def test_load_ids(ch_cluster):
|
||||
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(0));''')
|
||||
assert(result.strip() == '100')
|
||||
|
||||
# Just check bridge is ok with a large vector of random ids
|
||||
instance.query('''select number, dictGet(lib_dict_c, 'value1', toUInt64(rand())) from numbers(1000);''')
|
||||
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
|
||||
assert(result.strip() == '101')
|
||||
instance.query('DROP DICTIONARY lib_dict_c')
|
||||
@ -160,6 +180,91 @@ def test_null_values(ch_cluster):
|
||||
assert(result == expected)
|
||||
|
||||
|
||||
def test_recover_after_bridge_crash(ch_cluster):
|
||||
if instance.is_built_with_memory_sanitizer():
|
||||
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
|
||||
|
||||
create_dict_simple()
|
||||
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(0));''')
|
||||
assert(result.strip() == '100')
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
|
||||
assert(result.strip() == '101')
|
||||
|
||||
instance.exec_in_container(['bash', '-c', 'kill -9 `pidof clickhouse-library-bridge`'], user='root')
|
||||
instance.query('SYSTEM RELOAD DICTIONARY lib_dict_c')
|
||||
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(0));''')
|
||||
assert(result.strip() == '100')
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
|
||||
assert(result.strip() == '101')
|
||||
|
||||
instance.exec_in_container(['bash', '-c', 'kill -9 `pidof clickhouse-library-bridge`'], user='root')
|
||||
instance.query('DROP DICTIONARY lib_dict_c')
|
||||
|
||||
|
||||
def test_server_restart_bridge_might_be_stil_alive(ch_cluster):
|
||||
if instance.is_built_with_memory_sanitizer():
|
||||
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
|
||||
|
||||
create_dict_simple()
|
||||
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
|
||||
assert(result.strip() == '101')
|
||||
|
||||
instance.restart_clickhouse()
|
||||
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
|
||||
assert(result.strip() == '101')
|
||||
|
||||
instance.exec_in_container(['bash', '-c', 'kill -9 `pidof clickhouse-library-bridge`'], user='root')
|
||||
instance.restart_clickhouse()
|
||||
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
|
||||
assert(result.strip() == '101')
|
||||
|
||||
instance.query('DROP DICTIONARY lib_dict_c')
|
||||
|
||||
|
||||
def test_bridge_dies_with_parent(ch_cluster):
|
||||
if instance.is_built_with_memory_sanitizer():
|
||||
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
|
||||
if instance.is_built_with_address_sanitizer():
|
||||
pytest.skip("Leak sanitizer falsely reports about a leak of 16 bytes in clickhouse-odbc-bridge")
|
||||
|
||||
create_dict_simple()
|
||||
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
|
||||
assert(result.strip() == '101')
|
||||
|
||||
clickhouse_pid = instance.get_process_pid("clickhouse server")
|
||||
bridge_pid = instance.get_process_pid("library-bridge")
|
||||
assert clickhouse_pid is not None
|
||||
assert bridge_pid is not None
|
||||
|
||||
while clickhouse_pid is not None:
|
||||
try:
|
||||
instance.exec_in_container(["kill", str(clickhouse_pid)], privileged=True, user='root')
|
||||
except:
|
||||
pass
|
||||
clickhouse_pid = instance.get_process_pid("clickhouse server")
|
||||
time.sleep(1)
|
||||
|
||||
for i in range(30):
|
||||
time.sleep(1)
|
||||
bridge_pid = instance.get_process_pid("library-bridge")
|
||||
if bridge_pid is None:
|
||||
break
|
||||
|
||||
if bridge_pid:
|
||||
out = instance.exec_in_container(["gdb", "-p", str(bridge_pid), "--ex", "thread apply all bt", "--ex", "q"],
|
||||
privileged=True, user='root')
|
||||
logging.debug(f"Bridge is running, gdb output:\n{out}")
|
||||
|
||||
assert clickhouse_pid is None
|
||||
assert bridge_pid is None
|
||||
instance.start_clickhouse(20)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
cluster.start()
|
||||
input("Cluster created, press any key to destroy...")
|
||||
|
Loading…
Reference in New Issue
Block a user