Restore previous ids passing

This commit is contained in:
kssenii 2021-08-01 08:51:40 +00:00
parent 130253e3b9
commit 9c6a8b0059
3 changed files with 55 additions and 33 deletions

View File

@ -40,13 +40,12 @@ namespace
return sample_block;
}
// std::vector<uint64_t> parseIdsFromBinary(const std::string & ids_string)
// {
// ReadBufferFromString buf(ids_string);
// std::vector<uint64_t> ids;
// readVectorBinary(ids, buf);
// return ids;
// }
std::vector<uint64_t> parseIdsFromBinary(ReadBuffer & buf)
{
std::vector<uint64_t> ids;
readVectorBinary(ids, buf);
return ids;
}
std::vector<std::string> parseNamesFromBinary(const std::string & names_string)
{
@ -212,6 +211,7 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found dictionary with id: {}", dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
LOG_DEBUG(log, "Calling loadAll() for dictionary id: {}", dictionary_id);
auto input = library_handler->loadAll();
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
@ -222,23 +222,14 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
{
LOG_DEBUG(log, "Getting diciontary ids for dictionary with id: {}", dictionary_id);
String ids_string;
readString(ids_string, request.getStream());
Strings ids_strs;
splitInto<'-'>(ids_strs, ids_string);
std::vector<uint64_t> ids;
for (const auto & id : ids_strs)
ids.push_back(parse<uint64_t>(id));
if (ids.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Received no ids");
// std::vector<uint64_t> ids = parseIdsFromBinary(ids_string);
std::vector<uint64_t> ids = parseIdsFromBinary(request.getStream());
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found dictionary with id: {}", dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
LOG_DEBUG(log, "Calling loadIds() for dictionary id: {}", dictionary_id);
auto input = library_handler->loadIds(ids);
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);
@ -277,6 +268,7 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not found dictionary with id: {}", dictionary_id);
const auto & sample_block = library_handler->getSampleBlock();
LOG_DEBUG(log, "Calling loadKeys() for dictionary id: {}", dictionary_id);
auto input = library_handler->loadKeys(block.getColumns());
LOG_DEBUG(log, "Started sending result data for dictionary id: {}", dictionary_id);

View File

@ -191,24 +191,11 @@ BlockInputStreamPtr LibraryBridgeHelper::loadAll()
}
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string, const std::vector<uint64_t> ids)
BlockInputStreamPtr LibraryBridgeHelper::loadIds(const std::string ids_str, const std::vector<uint64_t> ids)
{
startBridgeSync();
auto uri = createRequestURI(LOAD_IDS_METHOD);
uri.addQueryParameter("ids_num", toString(ids.size()));
String ids_str;
for (const auto & id : ids)
{
if (!ids_str.empty())
ids_str += '-';
ids_str += toString(id);
}
uri.addQueryParameter("ids", ids_str);
std::cerr << "\n\nLibraryBridgeHelper: " << toString(dictionary_id) << " , ids_num: " << ids.size() << " , ids: " << ids_str << std::endl << std::endl;
LOG_ERROR(log, "dictionary_id: {}, ids_num: {}, ids: {}", dictionary_id, ids.size(), ids_str);
uri.addQueryParameter("ids_num", toString(ids.size())); /// Not used parameter, but helpful
return loadBase(uri, [ids_str](std::ostream & os) { os << ids_str; });
}

View File

@ -113,6 +113,10 @@ def test_load_ids(ch_cluster):
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(0));''')
assert(result.strip() == '100')
# Just check bridge is ok with a large vector of random ids
instance.query('''select number, dictGet(lib_dict_c, 'value1', toUInt64(rand())) from numbers(5000);''')
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
assert(result.strip() == '101')
instance.query('DROP DICTIONARY lib_dict_c')
@ -221,6 +225,45 @@ def test_server_restart_bridge_might_be_stil_alive(ch_cluster):
instance.query('DROP DICTIONARY lib_dict_c')
def test_bridge_dies_with_parent(ch_cluster):
if instance.is_built_with_memory_sanitizer():
pytest.skip("Memory Sanitizer cannot work with third-party shared libraries")
if instance.is_built_with_address_sanitizer():
pytest.skip("Leak sanitizer falsely reports about a leak of 16 bytes in clickhouse-odbc-bridge")
create_dict_simple()
result = instance.query('''select dictGet(lib_dict_c, 'value1', toUInt64(1));''')
assert(result.strip() == '101')
clickhouse_pid = instance.get_process_pid("clickhouse server")
bridge_pid = instance.get_process_pid("library-bridge")
assert clickhouse_pid is not None
assert bridge_pid is not None
while clickhouse_pid is not None:
try:
instance.exec_in_container(["kill", str(clickhouse_pid)], privileged=True, user='root')
except:
pass
clickhouse_pid = instance.get_process_pid("clickhouse server")
time.sleep(1)
for i in range(30):
time.sleep(1)
bridge_pid = instance.get_process_pid("library-bridge")
if bridge_pid is None:
break
if bridge_pid:
out = instance.exec_in_container(["gdb", "-p", str(bridge_pid), "--ex", "thread apply all bt", "--ex", "q"],
privileged=True, user='root')
logging.debug(f"Bridge is running, gdb output:\n{out}")
assert clickhouse_pid is None
assert bridge_pid is None
instance.start_clickhouse(20)
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")