diff --git a/docker/test/integration/runner/compose/docker_compose_meili.yml b/docker/test/integration/runner/compose/docker_compose_meili.yml index 196324bfcfb..55f1cfad76e 100644 --- a/docker/test/integration/runner/compose/docker_compose_meili.yml +++ b/docker/test/integration/runner/compose/docker_compose_meili.yml @@ -3,14 +3,14 @@ services: meili1: image: getmeili/meilisearch:latest restart: always - command: ./meilisearch ports: - ${MEILI_EXTERNAL_PORT}:${MEILI_INTERNAL_PORT} meili_secure: image: getmeili/meilisearch:latest restart: always - command: ./meilisearch --master-key="password" ports: - ${MEILI_SECURE_EXTERNAL_PORT}:${MEILI_SECURE_INTERNAL_PORT} + environment: + MEILI_MASTER_KEY: "password" diff --git a/src/Storages/MeiliSearch/MeiliSearchConnection.cpp b/src/Storages/MeiliSearch/MeiliSearchConnection.cpp index 8bc3447969c..a4f4594397d 100644 --- a/src/Storages/MeiliSearch/MeiliSearchConnection.cpp +++ b/src/Storages/MeiliSearch/MeiliSearchConnection.cpp @@ -5,6 +5,7 @@ #include #include +#include namespace DB { @@ -20,7 +21,7 @@ MeiliSearchConnection::MeiliSearchConnection(const MeiliConfig & conf) : config{ session.setPort(uri.getPort()); } -void MeiliSearchConnection::execQuery(const String & url, std::string_view post_fields, std::string & response_buffer) const +void MeiliSearchConnection::execPostQuery(const String & url, std::string_view post_fields, std::string & response_buffer) const { Poco::URI uri(url); @@ -45,11 +46,43 @@ void MeiliSearchConnection::execQuery(const String & url, std::string_view post_ // need to separate MeiliSearch response from other situations // in order to handle it properly if (res.getStatus() / 100 == 2 || res.getStatus() / 100 == 4) - response_buffer = String(std::istreambuf_iterator(is), {}); + Poco::StreamCopier::copyToString(is, response_buffer); else throw Exception(ErrorCodes::NETWORK_ERROR, res.getReason()); } +void MeiliSearchConnection::execGetQuery( + const String & url, const std::unordered_map & query_params, std::string & response_buffer) const +{ + Poco::URI uri(url); + for (const auto & kv : query_params) + { + uri.addQueryParameter(kv.first, kv.second); + } + + String path(uri.getPathAndQuery()); + if (path.empty()) + path = "/"; + + Poco::Net::HTTPRequest req(Poco::Net::HTTPRequest::HTTP_GET, path, Poco::Net::HTTPMessage::HTTP_1_1); + + if (!config.key.empty()) + req.add("Authorization", "Bearer " + config.key); + + session.sendRequest(req); + + Poco::Net::HTTPResponse res; + std::istream & is = session.receiveResponse(res); + + // need to separate MeiliSearch response from other situations + // in order to handle it properly + if (res.getStatus() / 100 == 2 || res.getStatus() / 100 == 4) + Poco::StreamCopier::copyToString(is, response_buffer); + else + throw Exception(ErrorCodes::NETWORK_ERROR, res.getReason()); +} + + String MeiliSearchConnection::searchQuery(const std::unordered_map & query_params) const { std::string response_buffer; @@ -71,7 +104,7 @@ String MeiliSearchConnection::searchQuery(const std::unordered_map & query_params) const +{ + String response_buffer; + + String url = config.connection_string + "documents"; + + execGetQuery(url, query_params, response_buffer); return response_buffer; } diff --git a/src/Storages/MeiliSearch/MeiliSearchConnection.h b/src/Storages/MeiliSearch/MeiliSearchConnection.h index 0e26bf02328..687471e6fea 100644 --- a/src/Storages/MeiliSearch/MeiliSearchConnection.h +++ b/src/Storages/MeiliSearch/MeiliSearchConnection.h @@ -35,10 +35,14 @@ public: String searchQuery(const std::unordered_map & query_params) const; + String getDocumentsQuery(const std::unordered_map & query_params) const; + String updateQuery(std::string_view data) const; private: - void execQuery(const String & url, std::string_view post_fields, std::string & response_buffer) const; + void execPostQuery(const String & url, std::string_view post_fields, std::string & response_buffer) const; + + void execGetQuery(const String & url, const std::unordered_map & query_params, std::string & response_buffer) const; MeiliConfig config; mutable Poco::Net::HTTPClientSession session; diff --git a/src/Storages/MeiliSearch/SourceMeiliSearch.cpp b/src/Storages/MeiliSearch/SourceMeiliSearch.cpp index 0c01f7dd3c4..bb53ff425ab 100644 --- a/src/Storages/MeiliSearch/SourceMeiliSearch.cpp +++ b/src/Storages/MeiliSearch/SourceMeiliSearch.cpp @@ -21,6 +21,7 @@ #include #include #include +#include "Interpreters/ProcessList.h" namespace DB { @@ -31,36 +32,54 @@ namespace ErrorCodes extern const int MEILISEARCH_MISSING_SOME_COLUMNS; } -MeiliSearchSource::MeiliSearchSource( - const MeiliSearchConfiguration & config, - const Block & sample_block, - UInt64 max_block_size_, - std::unordered_map query_params_) - : SourceWithProgress(sample_block.cloneEmpty()) - , connection(config) - , max_block_size{max_block_size_} - , query_params{query_params_} - , offset{0} +String MeiliSearchSource::doubleQuoteIfNeed(const String & param) const { - description.init(sample_block); + if (route == QueryRoute::search) + return doubleQuoteString(param); + return param; +} +String MeiliSearchSource::constructAttributesToRetrieve() const +{ WriteBufferFromOwnString columns_to_get; - columns_to_get << "["; - auto it = description.sample_block.begin(); + if (route == QueryRoute::search) + columns_to_get << "["; + auto it = description.sample_block.begin(); while (it != description.sample_block.end()) { - columns_to_get << doubleQuoteString(it->name); + columns_to_get << doubleQuoteIfNeed(it->name); ++it; if (it != description.sample_block.end()) columns_to_get << ","; } - columns_to_get << "]"; + if (route == QueryRoute::search) + columns_to_get << "]"; - query_params[doubleQuoteString("attributesToRetrieve")] = columns_to_get.str(); - query_params[doubleQuoteString("limit")] = std::to_string(max_block_size); + return columns_to_get.str(); +} + +MeiliSearchSource::MeiliSearchSource( + const MeiliSearchConfiguration & config, + const Block & sample_block, + UInt64 max_block_size_, + QueryRoute route_, + std::unordered_map query_params_) + : SourceWithProgress(sample_block.cloneEmpty()) + , connection(config) + , max_block_size{max_block_size_} + , route{route_} + , query_params{query_params_} + , offset{0} +{ + description.init(sample_block); + + auto attributes_to_retrieve = constructAttributesToRetrieve(); + + query_params[doubleQuoteIfNeed("attributesToRetrieve")] = attributes_to_retrieve; + query_params[doubleQuoteIfNeed("limit")] = std::to_string(max_block_size); } @@ -143,25 +162,11 @@ void insertWithTypeId(MutableColumnPtr & column, JSON value, DataTypePtr type_pt column->insert(getField(value, type_ptr)); } -Chunk MeiliSearchSource::generate() +size_t MeiliSearchSource::parseJSON(MutableColumns & columns, const JSON & jres) const { - if (all_read) - return {}; - - MutableColumns columns = description.sample_block.cloneEmptyColumns(); - - query_params[doubleQuoteString("offset")] = std::to_string(offset); - auto response = connection.searchQuery(query_params); - - JSON jres = JSON(response).begin(); - - if (jres.getName() == "message") - throw Exception(ErrorCodes::MEILISEARCH_EXCEPTION, jres.getValue().toString()); - size_t cnt_match = 0; - String def; - for (const auto json : jres.getValue()) + for (const auto json : jres) { ++cnt_match; size_t cnt_fields = 0; @@ -178,7 +183,39 @@ Chunk MeiliSearchSource::generate() throw Exception( ErrorCodes::MEILISEARCH_MISSING_SOME_COLUMNS, "Some columns were not found in the table, json = " + json.toString()); } + return cnt_match; +} +Chunk MeiliSearchSource::generate() +{ + if (all_read) + return {}; + + MutableColumns columns = description.sample_block.cloneEmptyColumns(); + query_params[doubleQuoteIfNeed("offset")] = std::to_string(offset); + + size_t cnt_match = 0; + + if (route == QueryRoute::search) + { + auto response = connection.searchQuery(query_params); + JSON jres = JSON(response).begin(); + if (jres.getName() == "message") + throw Exception(ErrorCodes::MEILISEARCH_EXCEPTION, jres.toString()); + + cnt_match = parseJSON(columns, jres.getValue()); + } + else + { + auto response = connection.getDocumentsQuery(query_params); + JSON jres(response); + if (!jres.isArray()) + { + auto error = jres.getWithDefault("message"); + throw Exception(ErrorCodes::MEILISEARCH_EXCEPTION, error); + } + cnt_match = parseJSON(columns, jres); + } offset += cnt_match; diff --git a/src/Storages/MeiliSearch/SourceMeiliSearch.h b/src/Storages/MeiliSearch/SourceMeiliSearch.h index ed976b6c628..695982068b6 100644 --- a/src/Storages/MeiliSearch/SourceMeiliSearch.h +++ b/src/Storages/MeiliSearch/SourceMeiliSearch.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -13,10 +14,17 @@ namespace DB class MeiliSearchSource final : public SourceWithProgress { public: + enum QueryRoute + { + search, + documents + }; + MeiliSearchSource( const MeiliSearchConfiguration & config, const Block & sample_block, UInt64 max_block_size_, + QueryRoute route, std::unordered_map query_params_); ~MeiliSearchSource() override; @@ -24,10 +32,17 @@ public: String getName() const override { return "MeiliSearchSource"; } private: + String doubleQuoteIfNeed(const String & param) const; + + String constructAttributesToRetrieve() const; + + size_t parseJSON(MutableColumns & columns, const JSON & jres) const; + Chunk generate() override; MeiliSearchConnection connection; const UInt64 max_block_size; + const QueryRoute route; ExternalResultDescription description; std::unordered_map query_params; diff --git a/src/Storages/MeiliSearch/StorageMeiliSearch.cpp b/src/Storages/MeiliSearch/StorageMeiliSearch.cpp index 717697f3b29..00b255bee4b 100644 --- a/src/Storages/MeiliSearch/StorageMeiliSearch.cpp +++ b/src/Storages/MeiliSearch/StorageMeiliSearch.cpp @@ -87,10 +87,12 @@ Pipe StorageMeiliSearch::read( ASTPtr original_where = query_info.query->clone()->as().where(); ASTPtr query_params = getFunctionParams(original_where, "meiliMatch"); + MeiliSearchSource::QueryRoute route = MeiliSearchSource::QueryRoute::documents; std::unordered_map kv_pairs_params; if (query_params) { + route = MeiliSearchSource::QueryRoute::search; LOG_TRACE(log, "Query params: {}", convertASTtoStr(query_params)); for (const auto & el : query_params->children) { @@ -114,7 +116,7 @@ Pipe StorageMeiliSearch::read( auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names); - return Pipe(std::make_shared(config, sample_block, max_block_size, kv_pairs_params)); + return Pipe(std::make_shared(config, sample_block, max_block_size, route, kv_pairs_params)); } SinkToStoragePtr StorageMeiliSearch::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) @@ -166,12 +168,7 @@ void registerStorageMeiliSearch(StorageFactory & factory) [](const StorageFactory::Arguments & args) { auto config = StorageMeiliSearch::getConfiguration(args.engine_args, args.getLocalContext()); - return std::make_shared( - args.table_id, - config, - args.columns, - args.constraints, - args.comment); + return std::make_shared(args.table_id, config, args.columns, args.constraints, args.comment); }, { .source_access_type = AccessType::MEILISEARCH, diff --git a/src/TableFunctions/TableFunctionMeiliSearch.cpp b/src/TableFunctions/TableFunctionMeiliSearch.cpp index 39c7a2af7db..abfb7181a62 100644 --- a/src/TableFunctions/TableFunctionMeiliSearch.cpp +++ b/src/TableFunctions/TableFunctionMeiliSearch.cpp @@ -14,11 +14,7 @@ StoragePtr TableFunctionMeiliSearch::executeImpl( auto columns = getActualTableStructure(context); return std::make_shared( - StorageID(getDatabaseName(), table_name), - configuration.value(), - columns, - ConstraintsDescription{}, - String{}); + StorageID(getDatabaseName(), table_name), configuration.value(), columns, ConstraintsDescription{}, String{}); } ColumnsDescription TableFunctionMeiliSearch::getActualTableStructure(ContextPtr /* context */) const diff --git a/tests/integration/test_storage_meilisearch/test.py b/tests/integration/test_storage_meilisearch/test.py index bcf61f9fa45..9ca67d2c3ab 100644 --- a/tests/integration/test_storage_meilisearch/test.py +++ b/tests/integration/test_storage_meilisearch/test.py @@ -88,12 +88,9 @@ def test_insert(started_cluster): sleep(5) ans = big_table.update_sortable_attributes(['id']) client.wait_for_task(ans['uid']) - docs = big_table.search("", {"limit":50000, 'sort': ['id:asc']})["hits"] + docs = big_table.get_documents({'limit':40010}) assert len(docs) == 40000 - for i in range(1, 40001): - assert docs[i - 1] == {"id": i, "data": str(i)} - node.query("DROP TABLE new_table") node.query("DROP TABLE big_table") new_table.delete() @@ -113,9 +110,7 @@ def test_meilimatch(started_cluster): node.query("CREATE TABLE movies_table(id String, title String, release_date Int64) ENGINE = MeiliSearch('http://meili1:7700', 'movies', '')") assert node.query("SELECT COUNT() FROM movies_table") == '19546\n' - assert node.query("SELECT COUNT() FROM movies_table WHERE meiliMatch('\"q\"=\"abaca\"')") == '13\n' - assert node.query("SELECT sum(release_date) FROM movies_table WHERE meiliMatch('\"q\"=\"abaca\"')") == '12887532000\n' - + real_json = table.search("abaca", {"attributesToRetrieve":["id", "title", "release_date"], "limit":20000})["hits"] click_ans = "[" + ", ".join(node.query("SELECT * FROM movies_table WHERE \ meiliMatch('\"q\"=\"abaca\"') \ @@ -137,7 +132,6 @@ def test_meilimatch(started_cluster): click_json = json.loads(click_ans) assert real_json == click_json - node.query("DROP TABLE movies_table") table.delete() @@ -211,8 +205,6 @@ def test_meilimatch_secure(started_cluster): node.query("CREATE TABLE movies_table(id String, title String, release_date Int64) ENGINE = MeiliSearch('http://meili_secure:7700', 'movies', 'password')") assert node.query("SELECT COUNT() FROM movies_table") == '19546\n' - assert node.query("SELECT COUNT() FROM movies_table WHERE meiliMatch('\"q\"=\"abaca\"')") == '13\n' - assert node.query("SELECT sum(release_date) FROM movies_table WHERE meiliMatch('\"q\"=\"abaca\"')") == '12887532000\n' real_json = table.search("abaca", {"attributesToRetrieve":["id", "title", "release_date"], "limit":20000})["hits"] click_ans = "[" + ", ".join(node.query("SELECT * FROM movies_table WHERE \ @@ -282,12 +274,9 @@ def test_insert_secure(started_cluster): sleep(5) ans = big_table.update_sortable_attributes(['id']) client.wait_for_task(ans['uid']) - docs = big_table.search("", {"limit":50000, 'sort': ['id:asc']})["hits"] + docs = big_table.get_documents({'limit':40010}) assert len(docs) == 40000 - for i in range(1, 40001): - assert docs[i - 1] == {"id": i, "data": str(i)} - node.query("DROP TABLE new_table") node.query("DROP TABLE big_table") new_table.delete() @@ -296,6 +285,7 @@ def test_insert_secure(started_cluster): @pytest.mark.parametrize('started_cluster', [False], indirect=['started_cluster']) def test_security_levels(started_cluster): client = get_meili_secure_client(started_cluster) + new_table = client.index("new_table") search_key = client.get_keys()['results'][0]['key'] admin_key = client.get_keys()['results'][1]['key'] @@ -312,8 +302,9 @@ def test_security_levels(started_cluster): node.query("INSERT INTO write_table (id, data) VALUES " + values) sleep(1) + assert len(new_table.get_documents({'limit':40010})) == 100 - ans1 = "[" + ", ".join(node.query("SELECT * FROM read_table \ + ans1 = "[" + ", ".join(node.query("SELECT * FROM read_table where meiliMatch('\"q\"=\"\"') \ format JSONEachRow settings output_format_json_quote_64bit_integers=0").split("\n")[:-1]) + "]" ans2 = "[" + ", ".join(node.query("SELECT * FROM write_table \ format JSONEachRow settings output_format_json_quote_64bit_integers=0").split("\n")[:-1]) + "]"