Some small changes

This commit is contained in:
kssenii 2021-08-01 09:56:48 +00:00
parent 9c6a8b0059
commit 2bdb97d5e0
5 changed files with 45 additions and 25 deletions

View File

@ -1,6 +1,5 @@
#include "LibraryBridgeHelper.h"
#include <IO/ReadHelpers.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <DataStreams/formatBlock.h>
@ -8,6 +7,8 @@
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/ShellCommand.h>
@ -20,6 +21,12 @@
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_LIBRARY_ERROR;
extern const int LOGICAL_ERROR;
}
LibraryBridgeHelper::LibraryBridgeHelper(
ContextPtr context_,
const Block & sample_block_,
@ -93,23 +100,29 @@ bool LibraryBridgeHelper::checkBridgeIsRunning() const
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected message from library bridge: {} ({}). Check bridge and server have the same version.",
result, parsed ? toString(dictionary_id_exists) : "failed to parse");
LOG_TRACE(log, "dictionary_id: {}, dictionary_id_exists on bridge side: {}, library confirmed to be initialized on server side: {}",
toString(dictionary_id), toString(dictionary_id_exists), library_initialized);
if (dictionary_id_exists && !library_initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Library was not initialized, but bridge responded to already have dictionary id: {}", dictionary_id);
if (!dictionary_id_exists && library_initialized)
{
LOG_WARNING(log, "Library bridge does not have library handler with dictionaty id: {}. It will be reinitialized.", dictionary_id);
bool reinitialized = false;
try
{
if (!initLibrary(false))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Failed to reinitialize library handler on bridge side for dictionary with id: {}", dictionary_id);
reinitialized = initLibrary(false);
}
catch (...)
{
tryLogCurrentException(log);
return false;
}
if (!reinitialized)
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR,
"Failed to reinitialize library handler on bridge side for dictionary with id: {}", dictionary_id);
}
return true;
@ -191,12 +204,13 @@ BlockInputStreamPtr LibraryBridgeHelper::loadAll()
}
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string ids_str, const std::vector<uint64_t> ids)
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::vector<uint64_t> & ids)
{
startBridgeSync();
auto uri = createRequestURI(LOAD_IDS_METHOD);
uri.addQueryParameter("ids_num", toString(ids.size())); /// Not used parameter, but helpful
return loadBase(uri, [ids_str](std::ostream & os) { os << ids_str; });
auto ids_string = getDictIdsString(ids);
return loadBase(uri, [ids_string](std::ostream & os) { os << ids_string; });
}
@ -246,4 +260,13 @@ BlockInputStreamPtr LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWri
return std::make_shared<OwningBlockInputStream<ReadWriteBufferFromHTTP>>(input_stream, std::move(read_buf_ptr));
}
String LibraryBridgeHelper::getDictIdsString(const std::vector<UInt64> & ids)
{
WriteBufferFromOwnString out;
writeVectorBinary(ids, out);
return out.str();
}
}

View File

@ -38,7 +38,7 @@ public:
BlockInputStreamPtr loadAll();
BlockInputStreamPtr loadIds(std::string ids_string, const std::vector<uint64_t> ids);
BlockInputStreamPtr loadIds(const std::vector<uint64_t> & ids);
BlockInputStreamPtr loadKeys(const Block & requested_block);
@ -88,6 +88,8 @@ private:
Poco::URI createRequestURI(const String & method) const;
static String getDictIdsString(const std::vector<UInt64> & ids);
Poco::Logger * log;
const Block sample_block;
const Poco::Util::AbstractConfiguration & config;

View File

@ -55,13 +55,14 @@ LibraryDictionarySource::LibraryDictionarySource(
.library_settings = getLibrarySettingsString(config, config_prefix + ".settings"),
.dict_attributes = getDictAttributesString()
};
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id, library_data);
auto res = bridge_helper->initLibrary();
if (!res)
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR, "Failed to create shared library from path: {}", path);
else
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id, library_data);
bool initialized = bridge_helper->initLibrary();
if (initialized)
bridge_helper->setInitialized();
else
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR, "Failed to create shared library from path: {}", path);
}
@ -90,7 +91,11 @@ LibraryDictionarySource::LibraryDictionarySource(const LibraryDictionarySource &
, description{other.description}
{
bridge_helper = std::make_shared<LibraryBridgeHelper>(context, description.sample_block, dictionary_id, other.bridge_helper->getLibraryData());
bridge_helper->cloneLibrary(other.dictionary_id);
bool cloned = bridge_helper->cloneLibrary(other.dictionary_id);
if (cloned)
bridge_helper->setInitialized();
else
throw Exception(ErrorCodes::EXTERNAL_LIBRARY_ERROR, "Failed to clone library");
}
@ -116,7 +121,7 @@ BlockInputStreamPtr LibraryDictionarySource::loadAll()
BlockInputStreamPtr LibraryDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
return bridge_helper->loadIds(getDictIdsString(ids), ids);
return bridge_helper->loadIds(ids);
}
@ -164,14 +169,6 @@ String LibraryDictionarySource::getLibrarySettingsString(const Poco::Util::Abstr
}
String LibraryDictionarySource::getDictIdsString(const std::vector<UInt64> & ids)
{
WriteBufferFromOwnString out;
writeVectorBinary(ids, out);
return out.str();
}
String LibraryDictionarySource::getDictAttributesString()
{
std::vector<String> attributes_names(dict_struct.attributes.size());

View File

@ -70,8 +70,6 @@ public:
std::string toString() const override;
private:
static String getDictIdsString(const std::vector<UInt64> & ids);
String getDictAttributesString();
static String getLibrarySettingsString(const Poco::Util::AbstractConfiguration & config, const std::string & config_root);

View File

@ -115,7 +115,7 @@ def test_load_ids(ch_cluster):
assert(result.strip() == '100')
# Just check bridge is ok with a large vector of random ids
instance.query('''select number, dictGet(lib_dict_c, 'value1', toUInt64(rand())) from numbers(5000);''')
instance.query('''select number, dictGet(lib_dict_c, 'value1', toUInt64(rand())) from numbers(1000);''')
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
assert(result.strip() == '101')