mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Remove Meilisearch
This commit is contained in:
parent
e39a5e4c63
commit
5ebd8ef41d
@ -80,7 +80,6 @@ RUN python3 -m pip install --no-cache-dir \
|
||||
kafka-python \
|
||||
kazoo \
|
||||
lz4 \
|
||||
meilisearch==0.18.3 \
|
||||
minio \
|
||||
nats-py \
|
||||
protobuf \
|
||||
|
@ -1,15 +0,0 @@
|
||||
version: '2.3'
|
||||
services:
|
||||
meili1:
|
||||
image: getmeili/meilisearch:v0.27.0
|
||||
restart: always
|
||||
ports:
|
||||
- ${MEILI_EXTERNAL_PORT:-7700}:${MEILI_INTERNAL_PORT:-7700}
|
||||
|
||||
meili_secure:
|
||||
image: getmeili/meilisearch:v0.27.0
|
||||
restart: always
|
||||
ports:
|
||||
- ${MEILI_SECURE_EXTERNAL_PORT:-7700}:${MEILI_SECURE_INTERNAL_PORT:-7700}
|
||||
environment:
|
||||
MEILI_MASTER_KEY: "password"
|
@ -207,7 +207,6 @@ enum class AccessType
|
||||
M(REMOTE, "", GLOBAL, SOURCES) \
|
||||
M(MONGO, "", GLOBAL, SOURCES) \
|
||||
M(REDIS, "", GLOBAL, SOURCES) \
|
||||
M(MEILISEARCH, "", GLOBAL, SOURCES) \
|
||||
M(MYSQL, "", GLOBAL, SOURCES) \
|
||||
M(POSTGRES, "", GLOBAL, SOURCES) \
|
||||
M(SQLITE, "", GLOBAL, SOURCES) \
|
||||
|
@ -104,7 +104,6 @@ if (TARGET ch_contrib::nats_io)
|
||||
endif()
|
||||
|
||||
add_headers_and_sources(dbms Storages/DataLakes)
|
||||
add_headers_and_sources(dbms Storages/MeiliSearch)
|
||||
add_headers_and_sources(dbms Common/NamedCollections)
|
||||
|
||||
if (TARGET ch_contrib::amqp_cpp)
|
||||
|
@ -549,9 +549,6 @@
|
||||
M(653, CANNOT_PARSE_BACKUP_SETTINGS) \
|
||||
M(654, WRONG_BACKUP_SETTINGS) \
|
||||
M(655, FAILED_TO_SYNC_BACKUP_OR_RESTORE) \
|
||||
M(656, MEILISEARCH_EXCEPTION) \
|
||||
M(657, UNSUPPORTED_MEILISEARCH_TYPE) \
|
||||
M(658, MEILISEARCH_MISSING_SOME_COLUMNS) \
|
||||
M(659, UNKNOWN_STATUS_OF_TRANSACTION) \
|
||||
M(660, HDFS_ERROR) \
|
||||
M(661, CANNOT_SEND_SIGNAL) \
|
||||
|
@ -1,14 +1,9 @@
|
||||
#include <Disks/ObjectStorages/DiskObjectStorage.h>
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
|
||||
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/ReadBufferFromEmptyFile.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Common/createHardLink.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
@ -16,7 +11,6 @@
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h>
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageTransaction.h>
|
||||
#include <Disks/FakeDiskTransaction.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
|
@ -3,9 +3,7 @@
|
||||
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
|
@ -1,49 +0,0 @@
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Functions/IFunction.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace
|
||||
{
|
||||
// This class is a stub for the meiliMatch function in the where section of the query,
|
||||
// this function is used to pass parameters to the MeiliSearch storage engine
|
||||
class FunctionMeiliMatch : public IFunction
|
||||
{
|
||||
public:
|
||||
static constexpr auto name = "meiliMatch";
|
||||
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionMeiliMatch>(); }
|
||||
|
||||
/// Get the function name.
|
||||
String getName() const override { return name; }
|
||||
|
||||
bool isStateful() const override { return false; }
|
||||
|
||||
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
|
||||
|
||||
size_t getNumberOfArguments() const override { return 0; }
|
||||
|
||||
bool isVariadic() const override { return true; }
|
||||
|
||||
bool isDeterministic() const override { return false; }
|
||||
|
||||
bool isDeterministicInScopeOfQuery() const override { return false; }
|
||||
|
||||
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override { return std::make_shared<DataTypeUInt8>(); }
|
||||
|
||||
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
|
||||
{
|
||||
return ColumnUInt8::create(input_rows_count, 1u);
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
REGISTER_FUNCTION(MeiliMatch)
|
||||
{
|
||||
factory.registerFunction<FunctionMeiliMatch>();
|
||||
}
|
||||
|
||||
}
|
@ -1,87 +0,0 @@
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/Serializations/ISerialization.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h>
|
||||
#include <base/JSON.h>
|
||||
#include <base/types.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_READ_ALL_DATA;
|
||||
extern const int MEILISEARCH_EXCEPTION;
|
||||
}
|
||||
|
||||
/// Creates a fetcher whose connection is set up from `config`.
MeiliSearchColumnDescriptionFetcher::MeiliSearchColumnDescriptionFetcher(const MeiliSearchConfiguration & config)
    : connection(config)
{
}
|
||||
|
||||
void MeiliSearchColumnDescriptionFetcher::addParam(const String & key, const String & val)
|
||||
{
|
||||
query_params[key] = val;
|
||||
}
|
||||
|
||||
/// Returns true when the textual form of a JSON number looks like an integer,
/// i.e. it has neither a fractional part ('.') nor an exponent ('e' / 'E').
/// Exponent notation such as "1e5" is reported as non-integer so that the caller
/// maps it to a floating point type rather than Int64.
bool checkIfInteger(const String & s)
{
    return s.find_first_of(".eE") == String::npos;
}
|
||||
|
||||
/// Deduces a ClickHouse data type from a sample JSON value:
///   string -> String
///   array  -> Array(type of the first element)
///   bool   -> UInt8
///   null   -> Nullable(String)
///   number -> Int64 if checkIfInteger() accepts its text, Float64 otherwise
/// Anything else falls back to String.
DataTypePtr parseTypeOfField(JSON ptr)
{
    if (ptr.isString())
        return std::make_shared<DataTypeString>();

    if (ptr.isArray())
    {
        /// The element type is deduced from the first element only.
        auto nested_type = parseTypeOfField(ptr.begin());
        return std::make_shared<DataTypeArray>(nested_type);
    }

    if (ptr.isBool())
        return std::make_shared<DataTypeUInt8>();

    if (ptr.isNull())
    {
        /// A null sample carries no type information. The previous code passed the
        /// variable being declared to its own initializer, which reads an
        /// uninitialized pointer. Use the same fallback as for unknown values
        /// (String) as the nested type instead.
        return std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
    }

    if (ptr.isNumber())
    {
        if (checkIfInteger(ptr.toString()))
            return std::make_shared<DataTypeInt64>();
        return std::make_shared<DataTypeFloat64>();
    }

    return std::make_shared<DataTypeString>();
}
|
||||
|
||||
/// Runs a search request with the accumulated parameters and derives a column
/// list from the response.
///
/// The first top-level entry of the response is inspected: if it is named
/// "message" its value is thrown as MEILISEARCH_EXCEPTION. Otherwise the first
/// element of its value is walked as name/value pairs, each producing one column
/// whose type comes from parseTypeOfField(). An entry that is not a name/value
/// pair raises CANNOT_READ_ALL_DATA.
ColumnsDescription MeiliSearchColumnDescriptionFetcher::fetchColumnsDescription() const
{
    auto raw_response = connection.searchQuery(query_params);
    JSON first_entry = JSON(raw_response).begin();

    if (first_entry.getName() == "message")
        throw Exception::createRuntime(ErrorCodes::MEILISEARCH_EXCEPTION, first_entry.getValue().toString());

    NamesAndTypesList columns;
    for (const JSON field : first_entry.getValue().begin())
    {
        if (!field.isNameValuePair())
            throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Bad response data");

        columns.emplace_back(field.getName(), parseTypeOfField(field.getValue()));
    }

    return ColumnsDescription(columns);
}
|
||||
|
||||
}
|
@ -1,24 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
|
||||
#include <base/types.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class MeiliSearchColumnDescriptionFetcher
|
||||
{
|
||||
public:
|
||||
explicit MeiliSearchColumnDescriptionFetcher(const MeiliSearchConfiguration & config);
|
||||
|
||||
void addParam(const String & key, const String & val);
|
||||
|
||||
ColumnsDescription fetchColumnsDescription() const;
|
||||
|
||||
private:
|
||||
std::unordered_map<String, String> query_params;
|
||||
MeiliSearchConnection connection;
|
||||
};
|
||||
|
||||
}
|
@ -1,126 +0,0 @@
|
||||
#include <sstream>
|
||||
#include <string_view>
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
#include <Poco/StreamCopier.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NETWORK_ERROR;
|
||||
}
|
||||
|
||||
MeiliSearchConnection::MeiliSearchConnection(const MeiliConfig & conf) : config{conf}
|
||||
{
|
||||
Poco::URI uri(config.connection_string);
|
||||
session.setHost(uri.getHost());
|
||||
session.setPort(uri.getPort());
|
||||
}
|
||||
|
||||
/// Sends `post_fields` as an `application/json` POST body to the path and query of `url`
/// and returns the response body.
///
/// A bearer `Authorization` header is added when a key is configured.
/// Responses with a 2xx or 4xx status are returned to the caller as-is (a 4xx body
/// is a MeiliSearch error the caller has to interpret); any other status raises
/// NETWORK_ERROR carrying the HTTP reason phrase.
String MeiliSearchConnection::execPostQuery(const String & url, std::string_view post_fields) const
{
    const Poco::URI target(url);

    String request_path(target.getPathAndQuery());
    if (request_path.empty())
        request_path = "/";

    Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, request_path, Poco::Net::HTTPMessage::HTTP_1_1);
    request.setContentType("application/json");

    if (!config.key.empty())
        request.add("Authorization", "Bearer " + config.key);

    request.setContentLength(post_fields.length());

    session.sendRequest(request) << post_fields;

    Poco::Net::HTTPResponse response;
    std::istream & body_stream = session.receiveResponse(response);

    const auto status_class = response.getStatus() / 100;
    if (status_class != 2 && status_class != 4)
        throw Exception::createRuntime(ErrorCodes::NETWORK_ERROR, response.getReason());

    String body;
    Poco::StreamCopier::copyToString(body_stream, body);
    return body;
}
|
||||
|
||||
/// Issues a GET request to `url` with `query_params` appended as URL query
/// parameters and returns the response body.
///
/// A bearer `Authorization` header is added when a key is configured.
/// Responses with a 2xx or 4xx status are returned to the caller as-is; any other
/// status raises NETWORK_ERROR carrying the HTTP reason phrase.
String MeiliSearchConnection::execGetQuery(const String & url, const std::unordered_map<String, String> & query_params) const
{
    Poco::URI target(url);
    for (const auto & [param_name, param_value] : query_params)
        target.addQueryParameter(param_name, param_value);

    String request_path(target.getPathAndQuery());
    if (request_path.empty())
        request_path = "/";

    Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, request_path, Poco::Net::HTTPMessage::HTTP_1_1);

    if (!config.key.empty())
        request.add("Authorization", "Bearer " + config.key);

    session.sendRequest(request);

    Poco::Net::HTTPResponse response;
    std::istream & body_stream = session.receiveResponse(response);

    const auto status_class = response.getStatus() / 100;
    if (status_class != 2 && status_class != 4)
        throw Exception::createRuntime(ErrorCodes::NETWORK_ERROR, response.getReason());

    String body;
    Poco::StreamCopier::copyToString(body_stream, body);
    return body;
}
|
||||
|
||||
|
||||
/// Posts a search request to `<connection_string>search`.
///
/// The body is assembled as `{key:value,key:value,...}`; keys and values are
/// written verbatim, so callers must pass them already quoted/encoded as JSON.
String MeiliSearchConnection::searchQuery(const std::unordered_map<String, String> & query_params) const
{
    WriteBufferFromOwnString body;
    body << "{";

    bool need_separator = false;
    for (const auto & [param_name, param_value] : query_params)
    {
        if (need_separator)
            body << ",";
        body << param_name << ":" << param_value;
        need_separator = true;
    }

    body << "}";

    return execPostQuery(config.connection_string + "search", body.str());
}
|
||||
|
||||
/// Posts `data` to `<connection_string>documents` and returns the response body.
String MeiliSearchConnection::updateQuery(std::string_view data) const
{
    return execPostQuery(config.connection_string + "documents", data);
}
|
||||
|
||||
/// Sends a GET request to `<connection_string>documents` with `query_params`
/// as URL query parameters and returns the response body.
String MeiliSearchConnection::getDocumentsQuery(const std::unordered_map<String, String> & query_params) const
{
    return execGetQuery(config.connection_string + "documents", query_params);
}
|
||||
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <unordered_map>
|
||||
#include <base/types.h>
|
||||
|
||||
#include <Poco/Exception.h>
|
||||
#include <Poco/Net/HTTPClientSession.h>
|
||||
#include <Poco/Net/HTTPRequest.h>
|
||||
#include <Poco/Net/HTTPResponse.h>
|
||||
#include <Poco/Path.h>
|
||||
#include <Poco/URI.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
/// Connection settings of a single MeiliSearch index.
struct MeiliSearchConfiguration
{
    /// API key; when empty no Authorization header is sent.
    String key;
    /// Name of the index.
    String index;
    /// Base URL of the index, always of the form `<url>/indexes/<index>/`.
    String connection_string;

    /// @param url_   Server URL. Trailing slashes are ignored, so `http://host:7700`
    ///               and `http://host:7700/` produce the same connection string.
    /// @param index_ Index name.
    /// @param key_   API key (may be empty).
    MeiliSearchConfiguration(const String & url_, const String & index_, const String & key_)
        : key{key_}, index{index_}, connection_string{makeConnectionString(url_, index_)}
    {
    }

private:
    /// Joins the server URL and the index name without producing a double slash.
    static String makeConnectionString(const String & url, const String & index_name)
    {
        size_t length = url.size();
        while (length > 0 && url[length - 1] == '/')
            --length;
        return url.substr(0, length) + "/indexes/" + index_name + "/";
    }
};
|
||||
|
||||
using MeiliConfig = MeiliSearchConfiguration;
|
||||
|
||||
class MeiliSearchConnection
|
||||
{
|
||||
public:
|
||||
explicit MeiliSearchConnection(const MeiliConfig & config);
|
||||
|
||||
String searchQuery(const std::unordered_map<String, String> & query_params) const;
|
||||
|
||||
String getDocumentsQuery(const std::unordered_map<String, String> & query_params) const;
|
||||
|
||||
String updateQuery(std::string_view data) const;
|
||||
|
||||
private:
|
||||
String execPostQuery(const String & url, std::string_view post_fields) const;
|
||||
|
||||
String execGetQuery(const String & url, const std::unordered_map<String, String> & query_params) const;
|
||||
|
||||
MeiliConfig config;
|
||||
mutable Poco::Net::HTTPClientSession session;
|
||||
};
|
||||
|
||||
}
|
@ -1,65 +0,0 @@
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
|
||||
#include <Storages/MeiliSearch/SinkMeiliSearch.h>
|
||||
#include <base/JSON.h>
|
||||
#include <base/types.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int MEILISEARCH_EXCEPTION;
|
||||
}
|
||||
|
||||
SinkMeiliSearch::SinkMeiliSearch(const MeiliSearchConfiguration & config_, const Block & sample_block_, ContextPtr local_context_)
|
||||
: SinkToStorage(sample_block_), connection(config_), local_context{local_context_}, sample_block{sample_block_}
|
||||
{
|
||||
}
|
||||
|
||||
/// Narrows `view` to the array that follows the `"data":` key of a document
/// produced by the JSON output format, brackets included:
///     ... "data": [{...}, {...}, {...}] ...   ->   [{...}, {...}, {...}]
///
/// Brackets that occur inside JSON string values are ignored when looking for
/// the matching closing bracket. If the key or the opening bracket is missing,
/// or the array is never closed, `view` becomes empty instead of being read out
/// of bounds.
void extractData(std::string_view & view)
{
    static constexpr std::string_view data_key = "\"data\":";

    const size_t key_pos = view.find(data_key);
    const size_t open_pos
        = key_pos == std::string_view::npos ? std::string_view::npos : view.find('[', key_pos + data_key.size());

    if (open_pos == std::string_view::npos)
    {
        view = {};
        return;
    }

    size_t depth = 0;
    bool in_string = false;

    for (size_t pos = open_pos; pos < view.size(); ++pos)
    {
        const char c = view[pos];

        if (in_string)
        {
            if (c == '\\')
                ++pos; /// Skip the escaped character, it can not terminate the string.
            else if (c == '"')
                in_string = false;
        }
        else if (c == '"')
        {
            in_string = true;
        }
        else if (c == '[')
        {
            ++depth;
        }
        else if (c == ']' && --depth == 0)
        {
            view = view.substr(open_pos, pos - open_pos + 1);
            return;
        }
    }

    /// The array was not terminated.
    view = {};
}
|
||||
|
||||
void SinkMeiliSearch::writeBlockData(const Block & block) const
|
||||
{
|
||||
FormatSettings settings = getFormatSettings(local_context);
|
||||
settings.json.quote_64bit_integers = false;
|
||||
WriteBufferFromOwnString buf;
|
||||
auto writer = FormatFactory::instance().getOutputFormat("JSON", buf, sample_block, local_context, settings);
|
||||
writer->write(block);
|
||||
writer->flush();
|
||||
writer->finalize();
|
||||
|
||||
std::string_view vbuf(buf.str());
|
||||
extractData(vbuf);
|
||||
|
||||
auto response = connection.updateQuery(vbuf);
|
||||
auto jres = JSON(response).begin();
|
||||
if (jres.getName() == "message")
|
||||
throw Exception::createRuntime(ErrorCodes::MEILISEARCH_EXCEPTION, jres.getValue().toString());
|
||||
}
|
||||
|
||||
/// Rebuilds a block from the chunk columns using the sink header and writes it out.
void SinkMeiliSearch::consume(Chunk chunk)
{
    writeBlockData(getHeader().cloneWithColumns(chunk.detachColumns()));
}
|
||||
|
||||
}
|
@ -1,28 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/ExternalResultDescription.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Processors/Sinks/SinkToStorage.h>
|
||||
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class SinkMeiliSearch : public SinkToStorage
|
||||
{
|
||||
public:
|
||||
SinkMeiliSearch(const MeiliSearchConfiguration & config_, const Block & sample_block_, ContextPtr local_context_);
|
||||
|
||||
String getName() const override { return "SinkMeiliSearch"; }
|
||||
|
||||
void consume(Chunk chunk) override;
|
||||
|
||||
void writeBlockData(const Block & block) const;
|
||||
|
||||
private:
|
||||
MeiliSearchConnection connection;
|
||||
ContextPtr local_context;
|
||||
Block sample_block;
|
||||
};
|
||||
|
||||
}
|
@ -1,231 +0,0 @@
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnVector.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Columns/IColumn.h>
|
||||
#include <Core/ExternalResultDescription.h>
|
||||
#include <Core/Field.h>
|
||||
#include <Core/Types.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeDateTime.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/Serializations/ISerialization.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Storages/MeiliSearch/SourceMeiliSearch.h>
|
||||
#include <base/JSON.h>
|
||||
#include <base/range.h>
|
||||
#include <base/types.h>
|
||||
#include <magic_enum.hpp>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include "Interpreters/ProcessList.h"
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int MEILISEARCH_EXCEPTION;
|
||||
extern const int UNSUPPORTED_MEILISEARCH_TYPE;
|
||||
extern const int MEILISEARCH_MISSING_SOME_COLUMNS;
|
||||
}
|
||||
|
||||
/// Returns `param` wrapped by doubleQuoteString() for the search route and
/// unchanged for any other route.
String MeiliSearchSource::doubleQuoteIfNeed(const String & param) const
{
    return route == QueryRoute::search ? doubleQuoteString(param) : param;
}
|
||||
|
||||
/// Builds the comma-separated list of column names of the sample block.
/// For the search route the names are double-quoted and the list is wrapped in
/// square brackets; otherwise the bare names are joined with commas.
String MeiliSearchSource::constructAttributesToRetrieve() const
{
    const bool wrap_in_brackets = route == QueryRoute::search;

    WriteBufferFromOwnString attributes;

    if (wrap_in_brackets)
        attributes << "[";

    bool need_separator = false;
    for (const auto & column : description.sample_block)
    {
        if (need_separator)
            attributes << ",";
        attributes << doubleQuoteIfNeed(column.name);
        need_separator = true;
    }

    if (wrap_in_brackets)
        attributes << "]";

    return attributes.str();
}
|
||||
|
||||
MeiliSearchSource::MeiliSearchSource(
|
||||
const MeiliSearchConfiguration & config,
|
||||
const Block & sample_block,
|
||||
UInt64 max_block_size_,
|
||||
QueryRoute route_,
|
||||
std::unordered_map<String, String> query_params_)
|
||||
: ISource(sample_block.cloneEmpty())
|
||||
, connection(config)
|
||||
, max_block_size{max_block_size_}
|
||||
, route{route_}
|
||||
, query_params{query_params_}
|
||||
, offset{0}
|
||||
{
|
||||
description.init(sample_block);
|
||||
|
||||
auto attributes_to_retrieve = constructAttributesToRetrieve();
|
||||
|
||||
query_params[doubleQuoteIfNeed("attributesToRetrieve")] = attributes_to_retrieve;
|
||||
query_params[doubleQuoteIfNeed("limit")] = std::to_string(max_block_size);
|
||||
}
|
||||
|
||||
|
||||
MeiliSearchSource::~MeiliSearchSource() = default;
|
||||
|
||||
/// Converts a JSON value into a Field matching the ClickHouse type `type_ptr`.
///
/// Supported types: UInt8..UInt64 (JSON booleans are accepted too), Int8..Int64,
/// String (a JSON object is stored as its text), Float32/Float64, Date, Date32,
/// DateTime (negative timestamps are clamped to 0), Nullable and Array (both
/// handled recursively through the nested type).
/// Any other type raises UNSUPPORTED_MEILISEARCH_TYPE.
Field getField(JSON value, DataTypePtr type_ptr)
{
    const TypeIndex type_id = type_ptr->getTypeId();

    if (type_id == TypeIndex::UInt64 || type_id == TypeIndex::UInt32 || type_id == TypeIndex::UInt16 || type_id == TypeIndex::UInt8)
    {
        if (value.isBool())
            return value.getBool();
        return value.get<UInt64>();
    }

    if (type_id == TypeIndex::Int64 || type_id == TypeIndex::Int32 || type_id == TypeIndex::Int16 || type_id == TypeIndex::Int8)
        return value.get<Int64>();

    if (type_id == TypeIndex::String)
    {
        if (value.isObject())
            return value.toString();
        return value.get<String>();
    }

    if (type_id == TypeIndex::Float64 || type_id == TypeIndex::Float32)
        return value.get<Float64>();

    if (type_id == TypeIndex::Date)
        return UInt16{LocalDate{String(value.toString())}.getDayNum()};

    if (type_id == TypeIndex::Date32)
        return Int32{LocalDate{String(value.toString())}.getExtenedDayNum()};

    if (type_id == TypeIndex::DateTime)
    {
        /// Keep the text in a named local: the read buffer is constructed over the
        /// string's storage, so building it from a temporary would leave it
        /// referring to destroyed memory by the time it is read.
        const String text = value.toString();
        ReadBufferFromString in(text);
        time_t time = 0;
        readDateTimeText(time, in, assert_cast<const DataTypeDateTime *>(type_ptr.get())->getTimeZone());
        if (time < 0)
            time = 0;
        return time;
    }

    if (type_id == TypeIndex::Nullable)
    {
        if (value.isNull())
            return Null();

        const auto * null_type = typeid_cast<const DataTypeNullable *>(type_ptr.get());
        DataTypePtr nested = null_type->getNestedType();

        return getField(value, nested);
    }

    if (type_id == TypeIndex::Array)
    {
        const auto * array_type = typeid_cast<const DataTypeArray *>(type_ptr.get());
        DataTypePtr nested = array_type->getNestedType();

        Array array;
        for (const auto el : value)
            array.push_back(getField(el, nested));

        return array;
    }

    const std::string_view type_name = magic_enum::enum_name(type_id);
    throw Exception(ErrorCodes::UNSUPPORTED_MEILISEARCH_TYPE, "MeiliSearch storage doesn't support type: {}", type_name);
}
|
||||
|
||||
/// Converts `value` to a Field of type `type_ptr` and appends it to `column`.
void insertWithTypeId(MutableColumnPtr & column, JSON value, DataTypePtr type_ptr)
{
    const Field converted = getField(value, type_ptr);
    column->insert(converted);
}
|
||||
|
||||
/// Appends every document of `jres` to `columns` and returns the number of documents.
/// Each field of a document is routed to the column with the same name in the
/// sample block. A document whose field count differs from the number of
/// columns raises MEILISEARCH_MISSING_SOME_COLUMNS.
size_t MeiliSearchSource::parseJSON(MutableColumns & columns, const JSON & jres) const
{
    size_t documents_read = 0;

    for (const auto document : jres)
    {
        size_t fields_seen = 0;

        for (const auto field : document)
        {
            const auto & field_name = field.getName();
            const size_t column_index = description.sample_block.getPositionByName(field_name);
            DataTypePtr column_type = description.sample_block.getByPosition(column_index).type;

            insertWithTypeId(columns[column_index], field.getValue(), column_type);
            ++fields_seen;
        }

        if (fields_seen != columns.size())
            throw Exception(
                ErrorCodes::MEILISEARCH_MISSING_SOME_COLUMNS, "Some columns were not found in the table, json = {}", document.toString());

        ++documents_read;
    }

    return documents_read;
}
|
||||
|
||||
/// Produces the next chunk by requesting documents starting at the current offset.
///
/// The search route posts the parameters and parses the value of the first
/// response entry; the documents route expects the response itself to be an
/// array. A MeiliSearch error ("message") is rethrown as MEILISEARCH_EXCEPTION.
/// An empty result marks the source as finished and yields an empty chunk.
Chunk MeiliSearchSource::generate()
{
    if (all_read)
        return {};

    MutableColumns columns = description.sample_block.cloneEmptyColumns();
    query_params[doubleQuoteIfNeed("offset")] = std::to_string(offset);

    size_t rows_read = 0;

    if (route != QueryRoute::search)
    {
        auto response = connection.getDocumentsQuery(query_params);
        JSON documents(response);
        if (!documents.isArray())
        {
            auto error = documents.getWithDefault<String>("message");
            throw Exception::createRuntime(ErrorCodes::MEILISEARCH_EXCEPTION, error);
        }
        rows_read = parseJSON(columns, documents);
    }
    else
    {
        auto response = connection.searchQuery(query_params);
        JSON first_entry = JSON(response).begin();
        if (first_entry.getName() == "message")
            throw Exception::createRuntime(ErrorCodes::MEILISEARCH_EXCEPTION, first_entry.toString());

        rows_read = parseJSON(columns, first_entry.getValue());
    }

    offset += rows_read;

    if (rows_read == 0)
    {
        all_read = true;
        return {};
    }

    return Chunk(std::move(columns), rows_read);
}
|
||||
|
||||
|
||||
}
|
@ -1,53 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstddef>
|
||||
#include <unordered_map>
|
||||
#include <Core/ColumnsWithTypeAndName.h>
|
||||
#include <Core/ExternalResultDescription.h>
|
||||
#include <Processors/Chunk.h>
|
||||
#include <Processors/ISource.h>
|
||||
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
|
||||
#include <base/JSON.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class MeiliSearchSource final : public ISource
|
||||
{
|
||||
public:
|
||||
enum QueryRoute
|
||||
{
|
||||
search,
|
||||
documents
|
||||
};
|
||||
|
||||
MeiliSearchSource(
|
||||
const MeiliSearchConfiguration & config,
|
||||
const Block & sample_block,
|
||||
UInt64 max_block_size_,
|
||||
QueryRoute route,
|
||||
std::unordered_map<String, String> query_params_);
|
||||
|
||||
~MeiliSearchSource() override;
|
||||
|
||||
String getName() const override { return "MeiliSearchSource"; }
|
||||
|
||||
private:
|
||||
String doubleQuoteIfNeed(const String & param) const;
|
||||
|
||||
String constructAttributesToRetrieve() const;
|
||||
|
||||
size_t parseJSON(MutableColumns & columns, const JSON & jres) const;
|
||||
|
||||
Chunk generate() override;
|
||||
|
||||
MeiliSearchConnection connection;
|
||||
const UInt64 max_block_size;
|
||||
const QueryRoute route;
|
||||
ExternalResultDescription description;
|
||||
std::unordered_map<String, String> query_params;
|
||||
|
||||
UInt64 offset;
|
||||
bool all_read = false;
|
||||
};
|
||||
|
||||
}
|
@ -1,201 +0,0 @@
|
||||
#include <memory>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Processors/Formats/IOutputFormat.h>
|
||||
#include <QueryPipeline/Pipe.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
|
||||
#include <Storages/MeiliSearch/SinkMeiliSearch.h>
|
||||
#include <Storages/MeiliSearch/SourceMeiliSearch.h>
|
||||
#include <Storages/MeiliSearch/StorageMeiliSearch.h>
|
||||
#include <Storages/SelectQueryInfo.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Storages/StorageInMemoryMetadata.h>
|
||||
#include <Storages/checkAndGetLiteralArgument.h>
|
||||
#include <Storages/NamedCollectionsHelpers.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/parseAddress.h>
|
||||
#include <Common/NamedCollections/NamedCollections.h>
|
||||
#include <Storages/MeiliSearch/MeiliSearchColumnDescriptionFetcher.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int BAD_QUERY_PARAMETER;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
/// Creates the storage and fills its in-memory metadata.
/// When no columns are given, the structure is fetched from the MeiliSearch index.
StorageMeiliSearch::StorageMeiliSearch(
    const StorageID & table_id,
    const MeiliSearchConfiguration & config_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment)
    : IStorage(table_id)
    , config{config_}
    , log(&Poco::Logger::get("StorageMeiliSearch (" + table_id.table_name + ")"))
{
    StorageInMemoryMetadata metadata;

    if (!columns_.empty())
    {
        metadata.setColumns(columns_);
    }
    else
    {
        auto fetched_columns = getTableStructureFromData(config);
        metadata.setColumns(fetched_columns);
    }

    metadata.setConstraints(constraints_);
    metadata.setComment(comment);
    setInMemoryMetadata(metadata);
}
|
||||
|
||||
/// Fetches the column description of the index using a search request limited to one result.
ColumnsDescription StorageMeiliSearch::getTableStructureFromData(const MeiliSearchConfiguration & config_)
{
    MeiliSearchColumnDescriptionFetcher column_fetcher(config_);
    column_fetcher.addParam(doubleQuoteString("limit"), "1");
    return column_fetcher.fetchColumnsDescription();
}
|
||||
|
||||
/// Formats an AST node as a single-line string without highlighting,
/// quoting identifiers with MySQL-style backticks.
String convertASTtoStr(ASTPtr ptr)
{
    constexpr auto quoting_style = IdentifierQuotingStyle::BackticksMySQL;
    constexpr bool always_quote_identifiers = quoting_style != IdentifierQuotingStyle::None;

    WriteBufferFromOwnString formatted;
    IAST::FormatSettings format_settings(
        formatted, /*one_line*/ true, /*hilite*/ false,
        /*always_quote_identifiers*/ always_quote_identifiers,
        /*identifier_quoting_style*/ quoting_style);

    ptr->format(format_settings);
    return formatted.str();
}
|
||||
|
||||
/// Depth-first search for the first function call named `name` under `node`.
/// Returns the only child of that function node, or nullptr when the function is
/// not found or the matching node does not have exactly one child.
ASTPtr getFunctionParams(ASTPtr node, const String & name)
{
    if (!node)
        return nullptr;

    if (const auto * function = node->as<ASTFunction>(); function && function->name == name)
    {
        /// A matching node ends the search in this subtree even if it is malformed.
        if (node->children.size() != 1)
            return nullptr;
        return node->children[0];
    }

    for (const auto & child : node->children)
    {
        if (auto found = getFunctionParams(child, name); found != nullptr)
            return found;
    }

    return nullptr;
}
|
||||
|
||||
/// Creates a pipe reading the requested columns from MeiliSearch.
///
/// If the WHERE clause contains a `meiliMatch(...)` call, its arguments (each a
/// literal of the form 'key=value') become search parameters and the search
/// route is used; otherwise the documents route is used with no extra parameters.
Pipe StorageMeiliSearch::read(
    const Names & column_names,
    const StorageSnapshotPtr & storage_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr /*context*/,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    size_t /*num_streams*/)
{
    storage_snapshot->check(column_names);

    ASTPtr where_clause = query_info.query->clone()->as<ASTSelectQuery &>().where();
    ASTPtr match_arguments = getFunctionParams(where_clause, "meiliMatch");

    std::unordered_map<String, String> search_params;
    MeiliSearchSource::QueryRoute route = MeiliSearchSource::QueryRoute::documents;

    if (!match_arguments)
    {
        LOG_TRACE(log, "Query params: none");
    }
    else
    {
        route = MeiliSearchSource::QueryRoute::search;
        LOG_TRACE(log, "Query params: {}", convertASTtoStr(match_arguments));

        for (const auto & argument : match_arguments->children)
        {
            auto literal = argument->getColumnName();
            auto separator = std::find(literal.begin(), literal.end(), '=');
            if (separator == literal.end())
                throw Exception(ErrorCodes::BAD_QUERY_PARAMETER, "meiliMatch function must have parameters of the form \'key=value\'");

            /// The first and the last characters of the literal (its quotes) are dropped.
            String param_name(literal.begin() + 1, separator);
            String param_value(separator + 1, literal.end() - 1);
            search_params[param_name] = param_value;
        }
    }

    for (const auto & [param_name, param_value] : search_params)
        LOG_TRACE(log, "Parsed parameter: key = {}, value = {}", param_name, param_value);

    auto header = storage_snapshot->getSampleBlockForColumns(column_names);

    return Pipe(std::make_shared<MeiliSearchSource>(config, header, max_block_size, route, search_params));
}
|
||||
|
||||
/// Creates the sink used by INSERT queries.
/// The sink is built from the stored configuration, the sample block of the table
/// metadata and the query context; the target index name is traced first.
SinkToStoragePtr StorageMeiliSearch::write(
    const ASTPtr & /*query*/,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr local_context,
    bool /*async_insert*/)
{
    LOG_TRACE(log, "Trying update index: {}", config.index);

    return std::make_shared<SinkMeiliSearch>(
        config,
        metadata_snapshot->getSampleBlock(),
        local_context);
}
|
||||
|
||||
/// Extracts the connection settings (url, index, optional key) from the engine arguments.
///
/// Two forms are accepted:
///  - a named collection with the required keys `url` and `index` and the optional key `key`;
///  - two or three positional arguments: 'url', 'index' and, optionally, 'key'.
///
/// Throws BAD_ARGUMENTS if a named collection provides an empty url or index, and
/// NUMBER_OF_ARGUMENTS_DOESNT_MATCH if the number of positional arguments is not 2 or 3.
MeiliSearchConfiguration StorageMeiliSearch::getConfiguration(ASTs engine_args, ContextPtr context)
{
    if (auto collection = tryGetNamedCollectionWithOverrides(engine_args, context))
    {
        validateNamedCollection(*collection, {"url", "index"}, {"key"});

        String url = collection->get<String>("url");
        String index = collection->get<String>("index");
        String key = collection->getOrDefault<String>("key", "");

        const bool has_required_values = !url.empty() && !index.empty();
        if (!has_required_values)
            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                "Storage MeiliSearch requires 3 parameters: MeiliSearch('url', 'index', 'key'= \"\")");

        return MeiliSearchConfiguration(url, index, key);
    }

    /// No named collection: fall back to positional arguments.
    const size_t args_count = engine_args.size();
    if (args_count != 2 && args_count != 3)
        throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
            "Storage MeiliSearch requires 3 parameters: MeiliSearch('url', 'index', 'key'= \"\")");

    /// Every argument is evaluated in place before the literals are read from it.
    for (auto & arg : engine_args)
        arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);

    String url = checkAndGetLiteralArgument<String>(engine_args[0], "url");
    String index = checkAndGetLiteralArgument<String>(engine_args[1], "index");

    /// The key stays empty when only two arguments were given.
    String key;
    if (args_count == 3)
        key = checkAndGetLiteralArgument<String>(engine_args[2], "key");

    return MeiliSearchConfiguration(url, index, key);
}
|
||||
|
||||
/// Registers the "MeiliSearch" table engine in the storage factory.
/// The engine is registered with schema inference support and the MEILISEARCH source access type.
void registerStorageMeiliSearch(StorageFactory & factory)
{
    /// Creates a storage instance from the CREATE query arguments.
    auto create_storage = [](const StorageFactory::Arguments & args)
    {
        auto config = StorageMeiliSearch::getConfiguration(args.engine_args, args.getLocalContext());
        return std::make_shared<StorageMeiliSearch>(args.table_id, config, args.columns, args.constraints, args.comment);
    };

    factory.registerStorage(
        "MeiliSearch",
        create_storage,
        {
            .supports_schema_inference = true,
            .source_access_type = AccessType::MEILISEARCH,
        });
}
|
||||
|
||||
|
||||
}
|
@ -1,41 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class StorageMeiliSearch final : public IStorage
|
||||
{
|
||||
public:
|
||||
StorageMeiliSearch(
|
||||
const StorageID & table_id,
|
||||
const MeiliSearchConfiguration & config_,
|
||||
const ColumnsDescription & columns_,
|
||||
const ConstraintsDescription & constraints_,
|
||||
const String & comment);
|
||||
|
||||
String getName() const override { return "MeiliSearch"; }
|
||||
|
||||
Pipe read(
|
||||
const Names & column_names,
|
||||
const StorageSnapshotPtr & storage_snapshot,
|
||||
SelectQueryInfo & query_info,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
size_t max_block_size,
|
||||
size_t num_streams) override;
|
||||
|
||||
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context, bool async_insert) override;
|
||||
|
||||
static MeiliSearchConfiguration getConfiguration(ASTs engine_args, ContextPtr context);
|
||||
|
||||
static ColumnsDescription getTableStructureFromData(const MeiliSearchConfiguration & config_);
|
||||
|
||||
private:
|
||||
MeiliSearchConfiguration config;
|
||||
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
}
|
@ -26,9 +26,6 @@ void registerStorageGenerateRandom(StorageFactory & factory);
|
||||
void registerStorageExecutable(StorageFactory & factory);
|
||||
void registerStorageWindowView(StorageFactory & factory);
|
||||
|
||||
// MEILISEARCH
|
||||
void registerStorageMeiliSearch(StorageFactory& factory);
|
||||
|
||||
#if USE_AWS_S3
|
||||
void registerStorageS3(StorageFactory & factory);
|
||||
void registerStorageCOS(StorageFactory & factory);
|
||||
@ -127,9 +124,6 @@ void registerStorages()
|
||||
registerStorageExecutable(factory);
|
||||
registerStorageWindowView(factory);
|
||||
|
||||
// MEILISEARCH
|
||||
registerStorageMeiliSearch(factory);
|
||||
|
||||
#if USE_AWS_S3
|
||||
registerStorageS3(factory);
|
||||
registerStorageCOS(factory);
|
||||
|
@ -1,34 +0,0 @@
|
||||
#include <memory>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Storages/MeiliSearch/StorageMeiliSearch.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
#include <TableFunctions/TableFunctionMeiliSearch.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
/// Creates a StorageMeiliSearch for the table function call.
/// The storage gets the configuration stored by parseArguments(), and empty columns,
/// constraints and comment.
StoragePtr TableFunctionMeiliSearch::executeImpl(
    const ASTPtr & /* ast_function */,
    ContextPtr /*context*/,
    const String & table_name,
    ColumnsDescription /*cached_columns*/,
    bool /*is_insert_query*/) const
{
    return std::make_shared<StorageMeiliSearch>(
        StorageID(getDatabaseName(), table_name),
        configuration.value(),
        ColumnsDescription{},
        ConstraintsDescription{},
        String{});
}
|
||||
|
||||
/// Returns the table structure reported by StorageMeiliSearch for the stored configuration.
ColumnsDescription TableFunctionMeiliSearch::getActualTableStructure(
    ContextPtr /* context */,
    bool /*is_insert_query*/) const
{
    const auto & meili_config = configuration.value();
    return StorageMeiliSearch::getTableStructureFromData(meili_config);
}
|
||||
|
||||
|
||||
void TableFunctionMeiliSearch::parseArguments(const ASTPtr & ast_function, ContextPtr context)
|
||||
{
|
||||
const auto & func_args = ast_function->as<ASTFunction &>();
|
||||
configuration = StorageMeiliSearch::getConfiguration(func_args.arguments->children, context);
|
||||
}
|
||||
|
||||
/// Registers the `meilisearch` table function in the given factory.
void registerTableFunctionMeiliSearch(TableFunctionFactory & factory)
{
    using Function = TableFunctionMeiliSearch;
    factory.registerFunction<Function>();
}
|
||||
|
||||
}
|
@ -1,26 +0,0 @@
|
||||
#pragma once
|
||||
#include <Storages/MeiliSearch/MeiliSearchConnection.h>
|
||||
#include <TableFunctions/ITableFunction.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class TableFunctionMeiliSearch : public ITableFunction
|
||||
{
|
||||
public:
|
||||
static constexpr auto name = "meilisearch";
|
||||
String getName() const override { return name; }
|
||||
|
||||
private:
|
||||
StoragePtr executeImpl(
|
||||
const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;
|
||||
|
||||
const char * getStorageTypeName() const override { return "meilisearch"; }
|
||||
|
||||
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;
|
||||
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
|
||||
|
||||
std::optional<MeiliSearchConfiguration> configuration;
|
||||
};
|
||||
|
||||
}
|
@ -23,8 +23,6 @@ void registerTableFunctions()
|
||||
registerTableFunctionMongoDB(factory);
|
||||
registerTableFunctionRedis(factory);
|
||||
|
||||
registerTableFunctionMeiliSearch(factory);
|
||||
|
||||
#if USE_AWS_S3
|
||||
registerTableFunctionS3(factory);
|
||||
registerTableFunctionS3Cluster(factory);
|
||||
|
@ -20,8 +20,6 @@ void registerTableFunctionGenerate(TableFunctionFactory & factory);
|
||||
void registerTableFunctionMongoDB(TableFunctionFactory & factory);
|
||||
void registerTableFunctionRedis(TableFunctionFactory & factory);
|
||||
|
||||
void registerTableFunctionMeiliSearch(TableFunctionFactory & factory);
|
||||
|
||||
#if USE_AWS_S3
|
||||
void registerTableFunctionS3(TableFunctionFactory & factory);
|
||||
void registerTableFunctionS3Cluster(TableFunctionFactory & factory);
|
||||
|
@ -46,7 +46,6 @@ sudo -H pip install \
|
||||
hypothesis \
|
||||
pyhdfs \
|
||||
pika \
|
||||
meilisearch \
|
||||
nats-py
|
||||
```
|
||||
|
||||
|
@ -31,7 +31,6 @@ try:
|
||||
import pymysql
|
||||
import nats
|
||||
import ssl
|
||||
import meilisearch
|
||||
import pyspark
|
||||
from confluent_kafka.avro.cached_schema_registry_client import (
|
||||
CachedSchemaRegistryClient,
|
||||
@ -432,7 +431,6 @@ class ClickHouseCluster:
|
||||
self.with_kerberized_hdfs = False
|
||||
self.with_mongo = False
|
||||
self.with_mongo_secure = False
|
||||
self.with_meili = False
|
||||
self.with_net_trics = False
|
||||
self.with_redis = False
|
||||
self.with_cassandra = False
|
||||
@ -510,12 +508,6 @@ class ClickHouseCluster:
|
||||
self.mongo_no_cred_host = "mongo2"
|
||||
self._mongo_no_cred_port = 0
|
||||
|
||||
# available when with_meili == True
|
||||
self.meili_host = "meili1"
|
||||
self._meili_port = 0
|
||||
self.meili_secure_host = "meili_secure"
|
||||
self._meili_secure_port = 0
|
||||
|
||||
# available when with_cassandra == True
|
||||
self.cassandra_host = "cassandra1"
|
||||
self.cassandra_port = 9042
|
||||
@ -687,20 +679,6 @@ class ClickHouseCluster:
|
||||
self._mongo_no_cred_port = get_free_port()
|
||||
return self._mongo_no_cred_port
|
||||
|
||||
@property
def meili_port(self):
    """Host port of the non-secured MeiliSearch container.

    The port is taken from get_free_port() on first access and cached in
    self._meili_port for later calls.
    """
    if not self._meili_port:
        self._meili_port = get_free_port()
    return self._meili_port
|
||||
|
||||
@property
def meili_secure_port(self):
    """Host port of the key-protected MeiliSearch container.

    The port is taken from get_free_port() on first access and cached in
    self._meili_secure_port for later calls.
    """
    if not self._meili_secure_port:
        self._meili_secure_port = get_free_port()
    return self._meili_secure_port
|
||||
|
||||
@property
|
||||
def redis_port(self):
|
||||
if self._redis_port:
|
||||
@ -1363,30 +1341,6 @@ class ClickHouseCluster:
|
||||
|
||||
return self.base_coredns_cmd
|
||||
|
||||
def setup_meili_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Enable MeiliSearch for the cluster and build its docker-compose command.

    Fills env_variables with host and port settings of both MeiliSearch
    services, appends the compose file to self.base_cmd, and stores and
    returns the dedicated command as self.base_meili_cmd.
    """
    self.with_meili = True

    compose_file = p.join(docker_compose_yml_dir, "docker_compose_meili.yml")

    env_variables.update(
        {
            "MEILI_HOST": self.meili_host,
            "MEILI_EXTERNAL_PORT": str(self.meili_port),
            "MEILI_INTERNAL_PORT": "7700",
            "MEILI_SECURE_HOST": self.meili_secure_host,
            "MEILI_SECURE_EXTERNAL_PORT": str(self.meili_secure_port),
            "MEILI_SECURE_INTERNAL_PORT": "7700",
        }
    )

    self.base_cmd.extend(["--file", compose_file])

    command = ["docker-compose"]
    command += ["--env-file", instance.env_file]
    command += ["--project-name", self.project_name]
    command += ["--file", compose_file]
    self.base_meili_cmd = command
    return self.base_meili_cmd
|
||||
|
||||
def setup_minio_cmd(self, instance, env_variables, docker_compose_yml_dir):
|
||||
self.with_minio = True
|
||||
cert_d = p.join(self.minio_dir, "certs")
|
||||
@ -1524,7 +1478,6 @@ class ClickHouseCluster:
|
||||
with_kerberized_hdfs=False,
|
||||
with_mongo=False,
|
||||
with_mongo_secure=False,
|
||||
with_meili=False,
|
||||
with_nginx=False,
|
||||
with_redis=False,
|
||||
with_minio=False,
|
||||
@ -1623,7 +1576,6 @@ class ClickHouseCluster:
|
||||
or with_kerberos_kdc
|
||||
or with_kerberized_kafka,
|
||||
with_mongo=with_mongo or with_mongo_secure,
|
||||
with_meili=with_meili,
|
||||
with_redis=with_redis,
|
||||
with_minio=with_minio,
|
||||
with_azurite=with_azurite,
|
||||
@ -1815,11 +1767,6 @@ class ClickHouseCluster:
|
||||
self.setup_coredns_cmd(instance, env_variables, docker_compose_yml_dir)
|
||||
)
|
||||
|
||||
if with_meili and not self.with_meili:
|
||||
cmds.append(
|
||||
self.setup_meili_cmd(instance, env_variables, docker_compose_yml_dir)
|
||||
)
|
||||
|
||||
if self.with_net_trics:
|
||||
for cmd in cmds:
|
||||
cmd.extend(
|
||||
@ -2415,30 +2362,6 @@ class ClickHouseCluster:
|
||||
logging.debug("Can't connect to Mongo " + str(ex))
|
||||
time.sleep(1)
|
||||
|
||||
def wait_meili_to_start(self, timeout=30):
    """Poll both MeiliSearch services until they answer a stats request.

    Retries once per second for up to `timeout` seconds. Returns as soon as
    both clients respond; if the time runs out the method simply returns
    without raising.
    """
    plain_client = meilisearch.Client(f"http://localhost:{self.meili_port}")
    secure_client = meilisearch.Client(
        f"http://localhost:{self.meili_secure_port}", "password"
    )

    started_at = time.time()
    while time.time() - started_at < timeout:
        try:
            plain_client.get_all_stats()
            secure_client.get_all_stats()
            logging.debug(
                f"Connected to MeiliSearch dbs: {plain_client.get_all_stats()}\n{secure_client.get_all_stats()}"
            )
            return
        except Exception as ex:
            logging.debug(f"Can't connect to MeiliSearch {ex}")
            time.sleep(1)
|
||||
|
||||
def wait_minio_to_start(self, timeout=180, secure=False):
|
||||
self.minio_ip = self.get_instance_ip(self.minio_host)
|
||||
self.minio_redirect_ip = self.get_instance_ip(self.minio_redirect_host)
|
||||
@ -2836,12 +2759,6 @@ class ClickHouseCluster:
|
||||
self.up_called = True
|
||||
time.sleep(10)
|
||||
|
||||
if self.with_meili and self.base_meili_cmd:
|
||||
logging.debug("Setup MeiliSearch")
|
||||
run_and_check(self.base_meili_cmd + common_opts)
|
||||
self.up_called = True
|
||||
self.wait_meili_to_start()
|
||||
|
||||
if self.with_redis and self.base_redis_cmd:
|
||||
logging.debug("Setup Redis")
|
||||
subprocess_check_call(self.base_redis_cmd + common_opts)
|
||||
@ -3163,7 +3080,6 @@ class ClickHouseInstance:
|
||||
with_kerberized_hdfs,
|
||||
with_secrets,
|
||||
with_mongo,
|
||||
with_meili,
|
||||
with_redis,
|
||||
with_minio,
|
||||
with_azurite,
|
||||
@ -3250,7 +3166,6 @@ class ClickHouseInstance:
|
||||
self.with_kerberized_hdfs = with_kerberized_hdfs
|
||||
self.with_secrets = with_secrets
|
||||
self.with_mongo = with_mongo
|
||||
self.with_meili = with_meili
|
||||
self.with_redis = with_redis
|
||||
self.with_minio = with_minio
|
||||
self.with_azurite = with_azurite
|
||||
|
@ -1,19 +0,0 @@
|
||||
<clickhouse>
|
||||
<named_collections>
|
||||
<named_collection_for_meili>
|
||||
<url>http://meili1:7700</url>
|
||||
<index>new_table</index>
|
||||
</named_collection_for_meili>
|
||||
|
||||
<named_collection_for_meili_secure>
|
||||
<url>http://meili_secure:7700</url>
|
||||
<index>new_table</index>
|
||||
<key>password</key>
|
||||
</named_collection_for_meili_secure>
|
||||
|
||||
<named_collection_for_meili_secure_no_password>
|
||||
<url>http://meili_secure:7700</url>
|
||||
<index>new_table</index>
|
||||
</named_collection_for_meili_secure_no_password>
|
||||
</named_collections>
|
||||
</clickhouse>
|
@ -1,9 +0,0 @@
|
||||
<clickhouse>
|
||||
<users>
|
||||
<default>
|
||||
<password></password>
|
||||
<profile>default</profile>
|
||||
<named_collection_control>1</named_collection_control>
|
||||
</default>
|
||||
</users>
|
||||
</clickhouse>
|
File diff suppressed because it is too large
Load Diff
@ -1,896 +0,0 @@
|
||||
import json
|
||||
import os
|
||||
from time import sleep
|
||||
import meilisearch
|
||||
from pymysql import NULL
|
||||
|
||||
import pytest
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
def started_cluster(request):
    """Module-scoped fixture yielding a started cluster with one MeiliSearch-enabled node.

    The cluster object is created before the try block so that the shutdown
    in `finally` always has a cluster to act on; a failure in the
    constructor is no longer masked by a NameError.
    """
    cluster = ClickHouseCluster(__file__)
    try:
        cluster.add_instance(
            "meili",
            main_configs=["configs/named_collection.xml"],
            user_configs=["configs/users.xml"],
            with_meili=True,
        )
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
|
||||
|
||||
|
||||
def get_meili_client(started_cluster):
    """Return a MeiliSearch client for the non-secured service of the cluster."""
    url = f"http://localhost:{started_cluster.meili_port}"
    return meilisearch.Client(url)
|
||||
|
||||
|
||||
def get_meili_secure_client(started_cluster):
    """Return a MeiliSearch client for the key-protected service, using the key "password"."""
    url = f"http://localhost:{started_cluster.meili_secure_port}"
    return meilisearch.Client(url, "password")
|
||||
|
||||
|
||||
def push_data(client, table, documents):
    """Add `documents` to the index `table` and wait for the resulting task."""
    task = table.add_documents(documents)
    task_uid = task["uid"]
    client.wait_for_task(task_uid)
|
||||
|
||||
|
||||
def push_movies(client):
    """Load movies.json from the test directory into the "movies" index.

    Waits for the indexing task, passing 100000 as the second argument of
    wait_for_task. The file is opened in a `with` block so the handle is
    closed as soon as the JSON has been parsed.
    """
    movies_path = SCRIPT_DIR + "/movies.json"
    print(movies_path)
    with open(movies_path) as json_file:
        movies = json.load(json_file)
    ans = client.index("movies").add_documents(movies)
    client.wait_for_task(ans["uid"], 100000)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_simple_select(started_cluster):
    """Select from a MeiliSearch-backed table and check schema inference for engine and table function."""
    client = get_meili_client(started_cluster)
    table = client.index("new_table")
    documents = [{"id": i, "data": hex(i * i)} for i in range(0, 100)]

    push_data(client, table, documents)

    engine_args = "'http://meili1:7700', 'new_table', ''"

    node = started_cluster.instances["meili"]
    node.query("DROP TABLE IF EXISTS simple_meili_table")
    node.query(
        f"CREATE TABLE simple_meili_table(id UInt64, data String) ENGINE = MeiliSearch({engine_args})"
    )

    row_count = node.query("SELECT COUNT() FROM simple_meili_table")
    assert row_count == "100\n"

    id_sum = node.query("SELECT sum(id) FROM simple_meili_table")
    assert id_sum == str(sum(range(0, 100))) + "\n"

    row_42 = node.query("SELECT data FROM simple_meili_table WHERE id = 42")
    assert row_42 == hex(42 * 42) + "\n"

    # Tables created without an explicit column list.
    node.query(
        f"CREATE TABLE simple_meili_table_auto_schema_engine ENGINE=MeiliSearch({engine_args})"
    )
    node.query(
        f"CREATE TABLE simple_meili_table_auto_schema_function AS meilisearch({engine_args})"
    )

    expected = "id\tInt64\t\t\t\t\t\ndata\tString\t\t\t\t\t\n"
    for inferred_table in (
        "simple_meili_table_auto_schema_engine",
        "simple_meili_table_auto_schema_function",
    ):
        assert node.query(f"DESCRIBE TABLE {inferred_table}") == expected

    for table_name in (
        "simple_meili_table",
        "simple_meili_table_auto_schema_engine",
        "simple_meili_table_auto_schema_function",
    ):
        node.query(f"DROP TABLE {table_name}")

    table.delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_insert(started_cluster):
    """Insert 7 and then 40000 rows through ClickHouse and count the documents in MeiliSearch."""
    meili = get_meili_client(started_cluster)
    small_index = meili.index("new_table")
    large_index = meili.index("big_table")

    node = started_cluster.instances["meili"]
    node.query("DROP TABLE IF EXISTS new_table")
    node.query(
        "CREATE TABLE new_table(id UInt64, data String) ENGINE = MeiliSearch('http://meili1:7700', 'new_table', '')"
    )

    node.query(
        "INSERT INTO new_table (id, data) VALUES (1, '1') (2, '2') (3, '3') (4, '4') (5, '5') (6, '6') (7, '7')"
    )
    # Give MeiliSearch time to process the insert before counting.
    sleep(5)
    assert len(small_index.get_documents()) == 7

    node.query("DROP TABLE IF EXISTS big_table")
    node.query(
        "CREATE TABLE big_table(id UInt64, data String) ENGINE = MeiliSearch('http://meili1:7700', 'big_table', '')"
    )

    rows = "".join(f"({i}, '{i}') " for i in range(1, 40001))

    node.query("INSERT INTO big_table (id, data) VALUES " + rows)
    sleep(5)
    task = large_index.update_sortable_attributes(["id"])
    meili.wait_for_task(task["uid"])
    fetched = large_index.get_documents({"limit": 40010})
    assert len(fetched) == 40000

    for table_name in ("new_table", "big_table"):
        node.query(f"DROP TABLE {table_name}")
    small_index.delete()
    large_index.delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_meilimatch(started_cluster):
    """Compare meiliMatch() results from ClickHouse with direct MeiliSearch searches.

    Three searches for "abaca" are compared: plain, sorted ascending by
    release_date, and sorted descending with a release_date filter. The
    ClickHouse query of the last case used to request ascending order while
    the reference search requested descending order; both now use `desc`.
    """
    client = get_meili_client(started_cluster)
    table = client.index("movies")
    table.update_sortable_attributes(["release_date"])
    table.update_filterable_attributes(["release_date"])

    push_movies(client)

    node = started_cluster.instances["meili"]
    node.query("DROP TABLE IF EXISTS movies_table")
    node.query(
        "CREATE TABLE movies_table(id String, title String, release_date Int64) ENGINE = MeiliSearch('http://meili1:7700', 'movies', '')"
    )

    assert node.query("SELECT COUNT() FROM movies_table") == "19546\n"

    attributes = ["id", "title", "release_date"]

    def select_hits(*match_args):
        # Run SELECT ... WHERE meiliMatch(<args>) and parse the JSONEachRow
        # output into a list of dicts. Each argument is passed to meiliMatch
        # as a single-quoted string literal.
        match_clause = ", ".join("'" + arg + "'" for arg in match_args)
        rows = node.query(
            "SELECT * FROM movies_table WHERE "
            f"meiliMatch({match_clause}) "
            "format JSONEachRow settings output_format_json_quote_64bit_integers=0"
        ).split("\n")[:-1]
        return json.loads("[" + ", ".join(rows) + "]")

    real_json = table.search(
        "abaca",
        {"attributesToRetrieve": attributes, "limit": 20000},
    )["hits"]
    assert real_json == select_hits('"q"="abaca"')

    real_json = table.search(
        "abaca",
        {
            "attributesToRetrieve": attributes,
            "limit": 20000,
            "sort": ["release_date:asc"],
        },
    )["hits"]
    assert real_json == select_hits('"q"="abaca"', '"sort"=["release_date:asc"]')

    real_json = table.search(
        "abaca",
        {
            "attributesToRetrieve": attributes,
            "limit": 20000,
            "sort": ["release_date:desc"],
            "filter": "release_date < 700000000",
        },
    )["hits"]
    assert real_json == select_hits(
        '"q"="abaca"',
        '"sort"=["release_date:desc"]',
        '"filter"="release_date < 700000000"',
    )

    node.query("DROP TABLE movies_table")
    table.delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_incorrect_data_type(started_cluster):
    """Selecting a column that the documents do not contain must fail with MEILISEARCH_MISSING_SOME_COLUMNS."""
    client = get_meili_client(started_cluster)
    table = client.index("new_table")
    documents = [
        {"id": i, "data": hex(i * i), "aaaa": "Hello"} for i in range(0, 100)
    ]

    push_data(client, table, documents)

    node = started_cluster.instances["meili"]
    node.query("DROP TABLE IF EXISTS strange_meili_table")
    node.query(
        "CREATE TABLE strange_meili_table(id UInt64, data String, bbbb String) ENGINE = MeiliSearch('http://meili1:7700', 'new_table', '')"
    )

    # The documents carry "aaaa", the table declares "bbbb".
    failure = node.query_and_get_error("SELECT bbbb FROM strange_meili_table")
    assert "MEILISEARCH_MISSING_SOME_COLUMNS" in failure

    node.query("DROP TABLE strange_meili_table")
    table.delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_simple_select_secure(started_cluster):
    """Select from the key-protected service with the right key and expect errors with a wrong key."""
    client = get_meili_secure_client(started_cluster)
    table = client.index("new_table")
    documents = [{"id": i, "data": hex(i * i)} for i in range(0, 100)]

    push_data(client, table, documents)

    node = started_cluster.instances["meili"]
    node.query("DROP TABLE IF EXISTS simple_meili_table")
    node.query(
        "CREATE TABLE simple_meili_table(id UInt64, data String) ENGINE = MeiliSearch('http://meili_secure:7700', 'new_table', 'password')"
    )

    node.query("DROP TABLE IF EXISTS wrong_meili_table")
    node.query(
        "CREATE TABLE wrong_meili_table(id UInt64, data String) ENGINE = MeiliSearch('http://meili_secure:7700', 'new_table', 'wrong_password')"
    )

    row_count = node.query("SELECT COUNT() FROM simple_meili_table")
    assert row_count == "100\n"

    id_sum = node.query("SELECT sum(id) FROM simple_meili_table")
    assert id_sum == str(sum(range(0, 100))) + "\n"

    row_42 = node.query("SELECT data FROM simple_meili_table WHERE id = 42")
    assert row_42 == hex(42 * 42) + "\n"

    # Every read through the table created with the wrong key must fail.
    for failing_query in (
        "SELECT COUNT() FROM wrong_meili_table",
        "SELECT sum(id) FROM wrong_meili_table",
        "SELECT data FROM wrong_meili_table WHERE id = 42",
    ):
        failure = node.query_and_get_error(failing_query)
        assert "MEILISEARCH_EXCEPTION" in failure

    node.query("DROP TABLE simple_meili_table")
    node.query("DROP TABLE wrong_meili_table")
    table.delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_meilimatch_secure(started_cluster):
    """Compare meiliMatch() results with direct searches on the key-protected service.

    Three searches for "abaca" are compared: plain, sorted ascending by
    release_date, and sorted descending with a release_date filter. The
    ClickHouse query of the last case used to request ascending order while
    the reference search requested descending order; both now use `desc`.
    """
    client = get_meili_secure_client(started_cluster)
    table = client.index("movies")
    table.update_sortable_attributes(["release_date"])
    table.update_filterable_attributes(["release_date"])

    push_movies(client)

    node = started_cluster.instances["meili"]
    node.query("DROP TABLE IF EXISTS movies_table")
    node.query(
        "CREATE TABLE movies_table(id String, title String, release_date Int64) ENGINE = MeiliSearch('http://meili_secure:7700', 'movies', 'password')"
    )

    assert node.query("SELECT COUNT() FROM movies_table") == "19546\n"

    attributes = ["id", "title", "release_date"]

    def select_hits(*match_args):
        # Run SELECT ... WHERE meiliMatch(<args>) and parse the JSONEachRow
        # output into a list of dicts. Each argument is passed to meiliMatch
        # as a single-quoted string literal.
        match_clause = ", ".join("'" + arg + "'" for arg in match_args)
        rows = node.query(
            "SELECT * FROM movies_table WHERE "
            f"meiliMatch({match_clause}) "
            "format JSONEachRow settings output_format_json_quote_64bit_integers=0"
        ).split("\n")[:-1]
        return json.loads("[" + ", ".join(rows) + "]")

    real_json = table.search(
        "abaca",
        {"attributesToRetrieve": attributes, "limit": 20000},
    )["hits"]
    assert real_json == select_hits('"q"="abaca"')

    real_json = table.search(
        "abaca",
        {
            "attributesToRetrieve": attributes,
            "limit": 20000,
            "sort": ["release_date:asc"],
        },
    )["hits"]
    assert real_json == select_hits('"q"="abaca"', '"sort"=["release_date:asc"]')

    real_json = table.search(
        "abaca",
        {
            "attributesToRetrieve": attributes,
            "limit": 20000,
            "sort": ["release_date:desc"],
            "filter": "release_date < 700000000",
        },
    )["hits"]
    assert real_json == select_hits(
        '"q"="abaca"',
        '"sort"=["release_date:desc"]',
        '"filter"="release_date < 700000000"',
    )

    node.query("DROP TABLE movies_table")
    table.delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_incorrect_data_type_secure(started_cluster):
    """On the key-protected service, selecting a column absent from the documents must fail with MEILISEARCH_MISSING_SOME_COLUMNS."""
    client = get_meili_secure_client(started_cluster)
    table = client.index("new_table")
    documents = [
        {"id": i, "data": hex(i * i), "aaaa": "Hello"} for i in range(0, 100)
    ]

    push_data(client, table, documents)

    node = started_cluster.instances["meili"]
    node.query("DROP TABLE IF EXISTS strange_meili_table")
    node.query(
        "CREATE TABLE strange_meili_table(id UInt64, data String, bbbb String) ENGINE = MeiliSearch('http://meili_secure:7700', 'new_table', 'password')"
    )

    # The documents carry "aaaa", the table declares "bbbb".
    failure = node.query_and_get_error("SELECT bbbb FROM strange_meili_table")
    assert "MEILISEARCH_MISSING_SOME_COLUMNS" in failure

    node.query("DROP TABLE strange_meili_table")
    table.delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_insert_secure(started_cluster):
    """Insert 7 and then 40000 rows through ClickHouse into the key-protected service and count the documents."""
    meili = get_meili_secure_client(started_cluster)
    small_index = meili.index("new_table")
    large_index = meili.index("big_table")

    node = started_cluster.instances["meili"]
    node.query("DROP TABLE IF EXISTS new_table")
    node.query(
        "CREATE TABLE new_table(id UInt64, data String) ENGINE = MeiliSearch('http://meili_secure:7700', 'new_table', 'password')"
    )

    node.query(
        "INSERT INTO new_table (id, data) VALUES (1, '1') (2, '2') (3, '3') (4, '4') (5, '5') (6, '6') (7, '7')"
    )
    # Give MeiliSearch time to process the insert before counting.
    sleep(5)
    assert len(small_index.get_documents()) == 7

    node.query("DROP TABLE IF EXISTS big_table")
    node.query(
        "CREATE TABLE big_table(id UInt64, data String) ENGINE = MeiliSearch('http://meili_secure:7700', 'big_table', 'password')"
    )

    rows = "".join(f"({i}, '{i}') " for i in range(1, 40001))

    node.query("INSERT INTO big_table (id, data) VALUES " + rows)
    sleep(5)
    task = large_index.update_sortable_attributes(["id"])
    meili.wait_for_task(task["uid"])
    fetched = large_index.get_documents({"limit": 40010})
    assert len(fetched) == 40000

    for table_name in ("new_table", "big_table"):
        node.query(f"DROP TABLE {table_name}")
    small_index.delete()
    large_index.delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_security_levels(started_cluster):
    """Use the first two keys reported by MeiliSearch: the first must be rejected for INSERT, the second accepted.

    Afterwards both tables must return the same 100 rows.
    """
    client = get_meili_secure_client(started_cluster)
    new_table = client.index("new_table")
    search_key = client.get_keys()["results"][0]["key"]
    admin_key = client.get_keys()["results"][1]["key"]

    rows = "".join(f"({i}, '{i}') " for i in range(1, 101))

    node = started_cluster.instances["meili"]
    node.query("DROP TABLE IF EXISTS read_table")
    node.query(
        f"CREATE TABLE read_table(id UInt64, data String) ENGINE = MeiliSearch('http://meili_secure:7700', 'new_table', '{search_key}')"
    )
    node.query("DROP TABLE IF EXISTS write_table")
    node.query(
        f"CREATE TABLE write_table(id UInt64, data String) ENGINE = MeiliSearch('http://meili_secure:7700', 'new_table', '{admin_key}')"
    )

    # Inserting through the table that uses the first key must be rejected.
    failure = node.query_and_get_error(
        "INSERT INTO read_table (id, data) VALUES " + rows
    )
    assert "MEILISEARCH_EXCEPTION" in failure

    node.query("INSERT INTO write_table (id, data) VALUES " + rows)
    sleep(5)
    assert len(new_table.get_documents({"limit": 40010})) == 100

    read_rows = node.query(
        "SELECT * FROM read_table where meiliMatch('\"q\"=\"\"') "
        "format JSONEachRow settings output_format_json_quote_64bit_integers=0"
    ).split("\n")[:-1]
    ans1 = "[" + ", ".join(read_rows) + "]"

    write_rows = node.query(
        "SELECT * FROM write_table "
        "format JSONEachRow settings output_format_json_quote_64bit_integers=0"
    ).split("\n")[:-1]
    ans2 = "[" + ", ".join(write_rows) + "]"

    assert ans1 == ans2

    docs = json.loads(ans1)
    assert len(docs) == 100

    node.query("DROP TABLE read_table")
    node.query("DROP TABLE write_table")
    client.index("new_table").delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_types(started_cluster):
    """Round-trip one document of mixed types through a MeiliSearch table.

    A single document is pushed to index ``types_table``; a ClickHouse table
    with an explicit column type per field is created over it, and every
    column is selected individually and compared with its expected text
    output.

    The column list below drives both the CREATE TABLE statement and the
    assertions, so each declared column is checked exactly once. The
    previous version asserted ``Float32_test`` twice and never read
    ``Float64_test``.
    """
    client = get_meili_client(started_cluster)
    table = client.index("types_table")

    data = {
        "id": 1,
        "UInt8_test": 128,
        "UInt16_test": 32768,
        "UInt32_test": 2147483648,
        "UInt64_test": 9223372036854775808,
        "Int8_test": -128,
        "Int16_test": -32768,
        "Int32_test": -2147483648,
        "Int64_test": -9223372036854775808,
        "String_test": "abacaba",
        "Float32_test": 42.42,
        "Float64_test": 42.42,
        "Array_test": [["aba", "caba"], ["2d", "array"]],
        "Null_test1": "value",
        # NOTE(review): NULL is not a Python builtin; it has to be bound at
        # module level outside this view -- confirm before relying on it.
        "Null_test2": NULL,
        "Bool_test1": True,
        "Bool_test2": False,
        "Json_test": {"a": 1, "b": {"in_json": "qwerty"}},
    }

    push_data(client, table, data)

    # (column name, ClickHouse type, expected output of SELECT <column>)
    columns = [
        ("id", "UInt64", "1\n"),
        ("UInt8_test", "UInt8", "128\n"),
        ("UInt16_test", "UInt16", "32768\n"),
        ("UInt32_test", "UInt32", "2147483648\n"),
        ("UInt64_test", "UInt64", "9223372036854775808\n"),
        ("Int8_test", "Int8", "-128\n"),
        ("Int16_test", "Int16", "-32768\n"),
        ("Int32_test", "Int32", "-2147483648\n"),
        ("Int64_test", "Int64", "-9223372036854775808\n"),
        ("String_test", "String", "abacaba\n"),
        ("Float32_test", "Float32", "42.42\n"),
        ("Float64_test", "Float64", "42.42\n"),
        ("Array_test", "Array(Array(String))", "[['aba','caba'],['2d','array']]\n"),
        ("Null_test1", "Nullable(String)", "value\n"),
        ("Null_test2", "Nullable(String)", "NULL\n"),
        ("Bool_test1", "Boolean", "true\n"),
        ("Bool_test2", "Boolean", "false\n"),
        ("Json_test", "String", '{"a":1,"b":{"in_json":"qwerty"}}\n'),
    ]

    node = started_cluster.instances["meili"]
    node.query("DROP TABLE IF EXISTS types_table")
    column_ddl = ",".join(f"{name} {ch_type}" for name, ch_type, _ in columns)
    node.query(
        "CREATE TABLE types_table("
        + column_ddl
        + ") ENGINE = MeiliSearch('http://meili1:7700', 'types_table', '')"
    )

    for name, _, expected in columns:
        actual = node.query(f"SELECT {name} FROM types_table")
        assert actual == expected, f"{name}: got {actual!r}, expected {expected!r}"

    node.query("DROP TABLE types_table")
    table.delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_named_collection(started_cluster):
    """Read 100 documents through a table configured by a named collection.

    Documents ``{"id": i, "data": hex(i * i)}`` for i in 0..99 are pushed to
    index ``new_table``; the table is created with
    ``MeiliSearch( named_collection_for_meili )`` and checked for row count,
    sum of ids and a single looked-up value.
    """
    client = get_meili_client(started_cluster)
    table = client.index("new_table")
    documents = [{"id": n, "data": hex(n * n)} for n in range(0, 100)]

    push_data(client, table, documents)

    node = started_cluster.instances["meili"]
    node.query("DROP TABLE IF EXISTS simple_meili_table")
    node.query(
        "CREATE TABLE simple_meili_table(id UInt64, data String) ENGINE = MeiliSearch( named_collection_for_meili )"
    )

    row_count = node.query("SELECT COUNT() FROM simple_meili_table")
    assert row_count == "100\n"

    id_total = node.query("SELECT sum(id) FROM simple_meili_table")
    assert id_total == str(sum(range(0, 100))) + "\n"

    value_for_42 = node.query("SELECT data FROM simple_meili_table WHERE id = 42")
    assert value_for_42 == hex(42 * 42) + "\n"

    node.query("DROP TABLE simple_meili_table")
    table.delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_named_collection_secure(started_cluster):
    """Check named-collection tables with and without the key.

    Three tables are created:
      * ``simple_meili_table``  -- named_collection_for_meili_secure
      * ``wrong_meili_table``   -- named_collection_for_meili_secure_no_password
      * ``combine_meili_table`` -- the no-password collection plus an explicit
        ``key="password"`` override

    The first and third are expected to return the 100 pushed rows; every
    SELECT on the second is expected to fail with MEILISEARCH_EXCEPTION.
    """
    client_secure = get_meili_secure_client(started_cluster)
    client_free = get_meili_client(started_cluster)
    table_secure = client_secure.index("new_table")
    table_free = client_free.index("new_table")

    documents = [{"id": n, "data": hex(n * n)} for n in range(0, 100)]

    push_data(client_secure, table_secure, documents)
    push_data(client_free, table_free, documents)

    node = started_cluster.instances["meili"]

    # Table name -> engine arguments; insertion order is the creation order.
    engine_args_by_table = {
        "simple_meili_table": "named_collection_for_meili_secure",
        "wrong_meili_table": "named_collection_for_meili_secure_no_password",
        "combine_meili_table": 'named_collection_for_meili_secure_no_password, key="password"',
    }
    for table_name, engine_args in engine_args_by_table.items():
        node.query(f"DROP TABLE IF EXISTS {table_name}")
        node.query(
            f"CREATE TABLE {table_name}(id UInt64, data String) ENGINE = MeiliSearch( {engine_args} )"
        )

    # Tables that carry a valid key must expose all pushed rows.
    for table_name in ("simple_meili_table", "combine_meili_table"):
        assert node.query(f"SELECT COUNT() FROM {table_name}") == "100\n"
        assert (
            node.query(f"SELECT sum(id) FROM {table_name}")
            == str(sum(range(0, 100))) + "\n"
        )
        assert (
            node.query(f"SELECT data FROM {table_name} WHERE id = 42")
            == hex(42 * 42) + "\n"
        )

    # The table without a key must fail on every kind of read.
    failing_queries = (
        "SELECT COUNT() FROM wrong_meili_table",
        "SELECT sum(id) FROM wrong_meili_table",
        "SELECT data FROM wrong_meili_table WHERE id = 42",
    )
    for sql in failing_queries:
        error = node.query_and_get_error(sql)
        assert "MEILISEARCH_EXCEPTION" in error

    for table_name in engine_args_by_table:
        node.query(f"DROP TABLE {table_name}")
    table_secure.delete()
    table_free.delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_table_function(started_cluster):
    """Query index ``new_table`` via the ``meilisearch`` table function.

    Pushes documents ``{"id": i, "data": hex(i * i)}`` for i in 0..99 and
    checks row count, sum of ids and one looked-up value without creating a
    table.
    """
    client = get_meili_client(started_cluster)
    table = client.index("new_table")
    documents = [{"id": n, "data": hex(n * n)} for n in range(0, 100)]

    push_data(client, table, documents)

    node = started_cluster.instances["meili"]

    # Shared FROM-clause source, called with an empty key.
    source = "meilisearch('http://meili1:7700', 'new_table', '')"

    row_count = node.query(f"SELECT COUNT() FROM {source}")
    assert row_count == "100\n"

    id_total = node.query(f"SELECT sum(id) FROM {source}")
    assert id_total == str(sum(range(0, 100))) + "\n"

    value_for_42 = node.query(f"SELECT data FROM {source} WHERE id = 42")
    assert value_for_42 == hex(42 * 42) + "\n"

    table.delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_table_function_secure(started_cluster):
    """Query the secured instance via the ``meilisearch`` table function.

    With key ``'password'`` the three SELECTs must return the pushed data;
    with ``'wrong_password'`` each of them must fail with
    MEILISEARCH_EXCEPTION.
    """
    client = get_meili_secure_client(started_cluster)
    table = client.index("new_table")
    documents = [{"id": n, "data": hex(n * n)} for n in range(0, 100)]

    push_data(client, table, documents)

    node = started_cluster.instances["meili"]

    valid_source = "meilisearch('http://meili_secure:7700', 'new_table', 'password')"
    invalid_source = (
        "meilisearch('http://meili_secure:7700', 'new_table', 'wrong_password')"
    )

    # Successful reads with the correct key.
    expected_by_query = (
        (f"SELECT COUNT() FROM {valid_source}", "100\n"),
        (f"SELECT sum(id) FROM {valid_source}", str(sum(range(0, 100))) + "\n"),
        (f"SELECT data FROM {valid_source} WHERE id = 42", hex(42 * 42) + "\n"),
    )
    for sql, expected in expected_by_query:
        assert node.query(sql) == expected

    # The same reads with a wrong key must all be rejected.
    rejected_queries = (
        f"SELECT COUNT() FROM {invalid_source}",
        f"SELECT sum(id) FROM {invalid_source}",
        f"SELECT data FROM {invalid_source} WHERE id = 42",
    )
    for sql in rejected_queries:
        error = node.query_and_get_error(sql)
        assert "MEILISEARCH_EXCEPTION" in error

    table.delete()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_types_in_table_function(started_cluster):
    """Read one mixed-type document through the ``meilisearch`` table function.

    A single document is pushed to index ``types_table`` and every field is
    selected individually through
    ``meilisearch('http://meili1:7700', 'types_table', '')``; no column types
    are declared by the test.

    The checks are table-driven so each field is read exactly once. The
    previous version queried ``Float32_test`` twice and never read
    ``Float64_test``.
    """
    client = get_meili_client(started_cluster)
    table = client.index("types_table")

    data = {
        "id": 1,
        "UInt8_test": 128,
        "UInt16_test": 32768,
        "UInt32_test": 2147483648,
        "Int8_test": -128,
        "Int16_test": -32768,
        "Int32_test": -2147483648,
        "Int64_test": -9223372036854775808,
        "String_test": "abacaba",
        "Float32_test": 42.42,
        "Float64_test": 42.42,
        "Array_test": [["aba", "caba"], ["2d", "array"]],
        "Null_test1": "value",
        # NOTE(review): NULL is not a Python builtin; it has to be bound at
        # module level outside this view -- confirm before relying on it.
        "Null_test2": NULL,
        "Bool_test1": True,
        "Bool_test2": False,
        "Json_test": {"a": 1, "b": {"in_json": "qwerty"}},
    }

    push_data(client, table, data)

    node = started_cluster.instances["meili"]

    # (field name, expected output of SELECT <field>); booleans are expected
    # as 1/0 here.
    expected_by_column = [
        ("id", "1\n"),
        ("UInt8_test", "128\n"),
        ("UInt16_test", "32768\n"),
        ("UInt32_test", "2147483648\n"),
        ("Int8_test", "-128\n"),
        ("Int16_test", "-32768\n"),
        ("Int32_test", "-2147483648\n"),
        ("Int64_test", "-9223372036854775808\n"),
        ("String_test", "abacaba\n"),
        ("Float32_test", "42.42\n"),
        ("Float64_test", "42.42\n"),
        ("Array_test", "[['aba','caba'],['2d','array']]\n"),
        ("Null_test1", "value\n"),
        ("Null_test2", "NULL\n"),
        ("Bool_test1", "1\n"),
        ("Bool_test2", "0\n"),
        ("Json_test", '{"a":1,"b":{"in_json":"qwerty"}}\n'),
    ]

    for column, expected in expected_by_column:
        actual = node.query(
            f"SELECT {column} FROM meilisearch('http://meili1:7700', 'types_table', '')"
        )
        assert actual == expected, f"{column}: got {actual!r}, expected {expected!r}"

    table.delete()
|
@ -155,7 +155,6 @@ URL [] GLOBAL SOURCES
|
||||
REMOTE [] GLOBAL SOURCES
|
||||
MONGO [] GLOBAL SOURCES
|
||||
REDIS [] GLOBAL SOURCES
|
||||
MEILISEARCH [] GLOBAL SOURCES
|
||||
MYSQL [] GLOBAL SOURCES
|
||||
POSTGRES [] GLOBAL SOURCES
|
||||
SQLITE [] GLOBAL SOURCES
|
||||
|
@ -6,7 +6,6 @@ file
|
||||
generateRandom
|
||||
input
|
||||
jdbc
|
||||
meilisearch
|
||||
merge
|
||||
mongodb
|
||||
null
|
||||
|
@ -430,7 +430,6 @@ mapUpdate
|
||||
match
|
||||
materialize
|
||||
max2
|
||||
meiliMatch
|
||||
metroHash64
|
||||
min2
|
||||
minSampleSizeContinous
|
||||
|
Loading…
Reference in New Issue
Block a user