WHERE over QueryTree, JSON fix, code clean-up

This commit is contained in:
Kirill Nikiforov 2024-07-27 15:20:24 +03:00
parent edb3d0ab42
commit 610ac0adf4
No known key found for this signature in database
21 changed files with 766 additions and 809 deletions

View File

@ -83,6 +83,7 @@ set(MONGOC_ENABLE_SSL_SECURE_CHANNEL 0)
set(MONGOC_ENABLE_SSL_SECURE_TRANSPORT 0)
set(MONGOC_ENABLE_SSL_LIBRESSL 0)
set(MONGOC_ENABLE_CRYPTO_LIBCRYPTO 0)
set(MONGOC_ENABLE_CLIENT_SIDE_ENCRYPTION 0)
set(MONGOC_HAVE_ASN1_STRING_GET0_DATA 0)
if (ENABLE_SSL)
set(MONGOC_ENABLE_SSL 1)
@ -90,6 +91,8 @@ if (ENABLE_SSL)
set(MONGOC_ENABLE_SSL_OPENSSL 1)
set(MONGOC_ENABLE_CRYPTO_LIBCRYPTO 1)
set(MONGOC_HAVE_ASN1_STRING_GET0_DATA 1)
else ()
message(WARNING "Building mongoc without SSL")
endif()
set(CMAKE_EXTRA_INCLUDE_FILES "sys/socket.h")
@ -102,7 +105,6 @@ set(MONGOC_NO_AUTOMATIC_GLOBALS 1)
set(MONGOC_ENABLE_STATIC_INSTALL 0)
set(MONGOC_ENABLE_SHM_COUNTERS 0)
set(MONGOC_HAVE_SCHED_GETCPU 0)
set(MONGOC_ENABLE_CLIENT_SIDE_ENCRYPTION 0)
set(MONGOC_HAVE_SS_FAMILY 0)
configure_file(

View File

@ -32,6 +32,7 @@ If key not found in MongoDB document, default value or null(if the column is nul
## Supported clauses
**You can disable all these restrictions, see [mongodb_fail_on_query_build_error](../../../operations/settings/settings.md#mongodb_fail_on_query_build_error).**
*If `allow_experimental_analyzer=0`, ClickHouse will not try to build MongoDB query, sort and limit.*
*Hint: you can use MongoDB table in CTE to perform any clauses, but be aware, that in some cases, performance will be significantly degraded.*
@ -116,11 +117,11 @@ CREATE TABLE mongo_table
(
key UInt64,
data String
) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'testuser', 'clickhouse');
) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'testuser', 'password');
```
or
``` sql
ENGINE = MongoDB('mongodb://testuser:clickhouse@mongo1:27017/test', 'simple_table');
ENGINE = MongoDB('mongodb://testuser:password@mongo1:27017/test', 'simple_table');
```
To read from an SSL secured MongoDB server:
@ -130,7 +131,7 @@ CREATE TABLE mongo_table_ssl
(
key UInt64,
data String
) ENGINE = MongoDB('mongo2:27017', 'test', 'simple_table', 'testuser', 'clickhouse', 'ssl=true');
) ENGINE = MongoDB('mongo2:27017', 'test', 'simple_table', 'testuser', 'password', 'ssl=true');
```
Query:
@ -152,7 +153,7 @@ CREATE TABLE mongo_table
(
key UInt64,
data String
) ENGINE = MongoDB('mongo2:27017', 'test', 'simple_table', 'testuser', 'clickhouse', 'connectTimeoutMS=100000');
) ENGINE = MongoDB('mongo2:27017', 'test', 'simple_table', 'testuser', 'password', 'connectTimeoutMS=100000');
```
## Troubleshooting

View File

@ -5593,6 +5593,6 @@ Default value: `1GiB`.
If enabled, MongoDB tables will return an error when a MongoDB query can't be built.
Not applied for the legacy implementation, or if the new analyzer is used.
Not applied for the legacy implementation, or when `allow_experimental_analyzer=0`.
Default value: `true`.

377
src/Common/BSONCXXHelper.h Normal file
View File

@ -0,0 +1,377 @@
#pragma once
#include "config.h"
#if USE_MONGODB
#include <Common/Base64.h>
#include <DataTypes/FieldToDataType.h>
#include <bsoncxx/builder/basic/array.hpp>
#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/json.hpp>
using bsoncxx::builder::basic::array;
using bsoncxx::builder::basic::document;
using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_document;
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
extern const int NOT_IMPLEMENTED;
}
namespace BSONCXXHelper
{
/// Converts a ClickHouse `Field` holding a value of data type `type` into an owning BSON value.
///
/// Scalars are mapped by the type id of `type`; Tuple and Array fields are converted element by
/// element into a BSON array, with each element's data type deduced via `FieldToDataType`.
/// Throws NOT_IMPLEMENTED for any data type that has no case below.
static bsoncxx::types::bson_value::value fieldAsBSONValue(const Field & field, const DataTypePtr & type)
{
    switch (type->getTypeId())
    {
        case TypeIndex::String:
            return field.safeGet<String>();
        case TypeIndex::UInt8: {
            /// Bool shares the UInt8 type id, so it has to be told apart explicitly.
            if (isBool(type))
                return field.safeGet<UInt8>() != 0;
            return static_cast<Int32>(field.safeGet<UInt8>());
        }
        case TypeIndex::UInt16:
            return static_cast<Int32>(field.safeGet<UInt16>());
        case TypeIndex::UInt32:
            /// The full UInt32 range does not fit into Int32: values above INT32_MAX would turn negative,
            /// so the value is widened to Int64 instead.
            return static_cast<Int64>(field.safeGet<UInt32>());
        case TypeIndex::UInt64:
            /// NOTE(review): values above INT64_MAX wrap around to negative numbers here - confirm this is acceptable.
            return static_cast<Int64>(field.safeGet<UInt64>());
        case TypeIndex::Int8:
            return field.safeGet<Int8 &>();
        case TypeIndex::Int16:
            return field.safeGet<Int16>();
        case TypeIndex::Int32:
            return field.safeGet<Int32>();
        case TypeIndex::Int64:
            return field.safeGet<Int64>();
        case TypeIndex::Float32:
            return field.safeGet<Float32>();
        case TypeIndex::Float64:
            return field.safeGet<Float64>();
        /// NOTE(review): Date/Date32 fields presumably hold day numbers rather than seconds; if so,
        /// multiplying by 1000 does not produce milliseconds since epoch - verify against callers.
        case TypeIndex::Date:
            return std::chrono::milliseconds(field.safeGet<UInt16>() * 1000);
        case TypeIndex::Date32:
            return std::chrono::milliseconds(field.safeGet<Int32>() * 1000);
        case TypeIndex::DateTime:
            return std::chrono::milliseconds(field.safeGet<UInt32>() * 1000);
        case TypeIndex::DateTime64:
            return std::chrono::milliseconds(field.safeGet<Decimal64>().getValue());
        case TypeIndex::UUID:
            return static_cast<String>(formatUUID(field.safeGet<UUID>()));
        case TypeIndex::Tuple: {
            auto arr = array();
            for (const auto & elem : field.safeGet<Tuple &>())
                arr.append(fieldAsBSONValue(elem, applyVisitor(FieldToDataType(), elem)));
            return arr.view();
        }
        case TypeIndex::Array: {
            auto arr = array();
            for (const auto & elem : field.safeGet<Array &>())
                arr.append(fieldAsBSONValue(elem, applyVisitor(FieldToDataType(), elem)));
            return arr.view();
        }
        default:
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Fields with type '{}' is not supported.", type->getPrettyName());
    }
}
/// Builds a JSONBuilder item tree from a BSON element (document or array element).
///
/// Strings, symbols, object ids, dates and timestamps become JSON strings; binary data becomes a
/// base64-encoded JSON string (the same encoding BSONElementAsString uses); bool/int32/int64/double
/// become JSON scalars; documents and arrays are converted recursively.
/// A regex is rendered as a map {<regex>: <options>} and a dbpointer as {<oid>: <collection>}.
/// Throws NOT_IMPLEMENTED for every other BSON type.
template <typename T>
static JSONBuilder::ItemPtr BSONElementAsJSON(const T & value)
{
    switch (value.type())
    {
        case bsoncxx::type::k_string:
            return std::make_unique<JSONBuilder::JSONString>(std::string(value.get_string().value));
        case bsoncxx::type::k_symbol:
            /// A symbol element must be read through get_symbol(): get_string() is the accessor for string elements only.
            return std::make_unique<JSONBuilder::JSONString>(std::string(value.get_symbol().symbol));
        case bsoncxx::type::k_oid:
            return std::make_unique<JSONBuilder::JSONString>(value.get_oid().value.to_string());
        case bsoncxx::type::k_binary:
            /// Arbitrary bytes are base64-encoded so the resulting JSON string is always printable.
            return std::make_unique<JSONBuilder::JSONString>(
                base64Encode(std::string(reinterpret_cast<const char *>(value.get_binary().bytes), value.get_binary().size)));
        case bsoncxx::type::k_bool:
            return std::make_unique<JSONBuilder::JSONBool>(value.get_bool());
        case bsoncxx::type::k_int32:
            return std::make_unique<JSONBuilder::JSONNumber<Int32>>(value.get_int32());
        case bsoncxx::type::k_int64:
            return std::make_unique<JSONBuilder::JSONNumber<Int64>>(value.get_int64());
        case bsoncxx::type::k_double:
            return std::make_unique<JSONBuilder::JSONNumber<Float64>>(value.get_double());
        case bsoncxx::type::k_date:
            /// BSON dates are in milliseconds; timeToString is given seconds.
            return std::make_unique<JSONBuilder::JSONString>(DateLUT::instance().timeToString(value.get_date().to_int64() / 1000));
        case bsoncxx::type::k_timestamp:
            return std::make_unique<JSONBuilder::JSONString>(DateLUT::instance().timeToString(value.get_timestamp().timestamp));
        case bsoncxx::type::k_document: {
            auto doc = std::make_unique<JSONBuilder::JSONMap>();
            for (const auto & elem : value.get_document().value)
                doc->add(std::string(elem.key()), BSONElementAsJSON(elem));
            return doc;
        }
        case bsoncxx::type::k_array: {
            auto arr = std::make_unique<JSONBuilder::JSONArray>();
            for (const auto & elem : value.get_array().value)
                arr->add(BSONElementAsJSON(elem));
            return arr;
        }
        case bsoncxx::type::k_regex: {
            auto doc = std::make_unique<JSONBuilder::JSONMap>();
            doc->add(std::string(value.get_regex().regex), std::string(value.get_regex().options));
            return doc;
        }
        case bsoncxx::type::k_dbpointer: {
            auto doc = std::make_unique<JSONBuilder::JSONMap>();
            doc->add(value.get_dbpointer().value.to_string(), std::string(value.get_dbpointer().collection));
            return doc;
        }
        case bsoncxx::type::k_null:
            return std::make_unique<JSONBuilder::JSONNull>();
        default:
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "BSON type {} is unserializable.", bsoncxx::to_string(value.type()));
    }
}
/// Renders a BSON element (document or array element) as a string.
///
/// Scalar types are printed directly (binary data is base64-encoded, dates and timestamps go through
/// DateLUT::timeToString). Documents, arrays, regexes, dbpointers and symbols are first converted with
/// BSONElementAsJSON and then formatted using `json_format_settings`.
/// Throws NOT_IMPLEMENTED for BSON types without a case below.
template <typename T>
static std::string BSONElementAsString(const T & value, const JSONBuilder::FormatSettings & json_format_settings)
{
    switch (value.type())
    {
        case bsoncxx::type::k_string:
            return std::string(value.get_string().value);
        case bsoncxx::type::k_oid:
            return value.get_oid().value.to_string();
        case bsoncxx::type::k_binary:
            return base64Encode(std::string(reinterpret_cast<const char *>(value.get_binary().bytes), value.get_binary().size));
        case bsoncxx::type::k_bool:
            return value.get_bool().value ? "true" : "false";
        case bsoncxx::type::k_int32:
            return std::to_string(static_cast<Int64>(value.get_int32().value));
        case bsoncxx::type::k_int64:
            return std::to_string(value.get_int64().value);
        case bsoncxx::type::k_double:
            return std::to_string(value.get_double().value);
        case bsoncxx::type::k_decimal128:
            return value.get_decimal128().value.to_string();
        case bsoncxx::type::k_date:
            /// BSON dates are in milliseconds; timeToString is given seconds.
            return DateLUT::instance().timeToString(value.get_date().to_int64() / 1000);
        case bsoncxx::type::k_timestamp:
            return DateLUT::instance().timeToString(value.get_timestamp().timestamp);
        // MongoDB's documents and arrays may not have strict types or be nested, so the most optimal solution is store their JSON representations.
        // bsoncxx::to_json function will return something like "'number': {'$numberInt': '321'}", this why we have to use own implementation.
        case bsoncxx::type::k_document:
        case bsoncxx::type::k_array:
        case bsoncxx::type::k_regex:
        case bsoncxx::type::k_dbpointer:
        case bsoncxx::type::k_symbol: {
            WriteBufferFromOwnString buf;
            auto format_context = JSONBuilder::FormatContext{.out = buf};
            BSONElementAsJSON(value)->format(json_format_settings, format_context);
            /// No diagnostic printing here: this runs for every nested value being read.
            return buf.str();
        }
        case bsoncxx::type::k_undefined:
            return "undefined";
        case bsoncxx::type::k_null:
            return "null";
        default:
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "BSON type {} is unserializable.", bsoncxx::to_string(value.type()));
    }
}
/// Reads a BSON bool, int32, int64 or double element and casts it to the numeric type `T`.
/// Any other BSON type results in a TYPE_MISMATCH exception that mentions the column `name`.
template <typename T, typename T2>
static T BSONElementAsNumber(const T2 & value, const std::string & name)
{
    const auto bson_type = value.type();

    if (bson_type == bsoncxx::type::k_bool)
        return static_cast<T>(value.get_bool());
    if (bson_type == bsoncxx::type::k_int32)
        return static_cast<T>(value.get_int32());
    if (bson_type == bsoncxx::type::k_int64)
        return static_cast<T>(value.get_int64());
    if (bson_type == bsoncxx::type::k_double)
        return static_cast<T>(value.get_double());

    throw Exception(
        ErrorCodes::TYPE_MISMATCH,
        "Type mismatch, {} cannot be converted to number for column {}.",
        bsoncxx::to_string(bson_type),
        name);
}
/// Converts a (possibly nested) BSON array into a ClickHouse Array.
///
/// `dimensions` is the number of nesting levels still expected below this one: while it is
/// non-zero every element must itself be a BSON array and is converted recursively. At the
/// innermost level each element is converted according to `type`; BSON nulls are replaced with
/// `default_value`. `name` is only used in error messages, `json_format_settings` is passed on
/// to BSONElementAsString for String elements.
/// Throws TYPE_MISMATCH when an element does not fit and NOT_IMPLEMENTED for unsupported nested types.
static Array BSONArrayAsArray(
    size_t dimensions,
    const bsoncxx::types::b_array & array,
    const DataTypePtr & type,
    const Field & default_value,
    const std::string & name,
    const JSONBuilder::FormatSettings & json_format_settings)
{
    Array result;

    /// Outer levels: only nested arrays are allowed.
    if (dimensions > 0)
    {
        const size_t inner_dimensions = dimensions - 1;
        for (auto const & nested : array.value)
        {
            if (nested.type() != bsoncxx::type::k_array)
                throw Exception(ErrorCodes::TYPE_MISMATCH, "Array {} have less dimensions then defined in the schema.", name);
            result.emplace_back(BSONArrayAsArray(inner_dimensions, nested.get_array(), type, default_value, name, json_format_settings));
        }
        return result;
    }

    /// Innermost level: convert every element to the requested nested type.
    for (auto const & item : array.value)
    {
        if (item.type() == bsoncxx::type::k_null)
        {
            result.emplace_back(default_value);
            continue;
        }

        /// Shared check for all date-like target types.
        const auto require_date = [&]
        {
            if (item.type() != bsoncxx::type::k_date)
                throw Exception(
                    ErrorCodes::TYPE_MISMATCH,
                    "Type mismatch, expected date, got {} for column {}.",
                    bsoncxx::to_string(item.type()),
                    name);
        };

        switch (type->getTypeId())
        {
            case TypeIndex::Int8:
                result.emplace_back(BSONElementAsNumber<Int8, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::UInt8:
                result.emplace_back(BSONElementAsNumber<UInt8, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::Int16:
                result.emplace_back(BSONElementAsNumber<Int16, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::UInt16:
                result.emplace_back(BSONElementAsNumber<UInt16, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::Int32:
                result.emplace_back(BSONElementAsNumber<Int32, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::UInt32:
                result.emplace_back(BSONElementAsNumber<UInt32, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::Int64:
                result.emplace_back(BSONElementAsNumber<Int64, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::UInt64:
                result.emplace_back(BSONElementAsNumber<UInt64, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::Int128:
                result.emplace_back(BSONElementAsNumber<Int128, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::UInt128:
                result.emplace_back(BSONElementAsNumber<UInt128, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::Int256:
                result.emplace_back(BSONElementAsNumber<Int256, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::UInt256:
                result.emplace_back(BSONElementAsNumber<UInt256, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::Float32:
                result.emplace_back(BSONElementAsNumber<Float32, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::Float64:
                result.emplace_back(BSONElementAsNumber<Float64, bsoncxx::array::element>(item, name));
                break;
            case TypeIndex::Date:
            {
                require_date();
                /// BSON dates are in milliseconds, DateLUT is given seconds.
                result.emplace_back(DateLUT::instance().toDayNum(item.get_date().to_int64() / 1000).toUnderType());
                break;
            }
            case TypeIndex::Date32:
            {
                require_date();
                result.emplace_back(DateLUT::instance().toDayNum(item.get_date().to_int64() / 1000).toUnderType());
                break;
            }
            case TypeIndex::DateTime:
            {
                require_date();
                result.emplace_back(static_cast<UInt32>(item.get_date().to_int64() / 1000));
                break;
            }
            case TypeIndex::DateTime64:
            {
                require_date();
                result.emplace_back(static_cast<Decimal64>(item.get_date().to_int64()));
                break;
            }
            case TypeIndex::UUID:
            {
                if (item.type() != bsoncxx::type::k_string)
                    throw Exception(
                        ErrorCodes::TYPE_MISMATCH,
                        "Type mismatch, expected string (UUID), got {} for column {}.",
                        bsoncxx::to_string(item.type()),
                        name);
                result.emplace_back(parse<UUID>(item.get_string().value.data()));
                break;
            }
            case TypeIndex::String:
                result.emplace_back(BSONElementAsString(item, json_format_settings));
                break;
            default:
                throw Exception(
                    ErrorCodes::NOT_IMPLEMENTED,
                    "Array {} has unsupported nested type {}.",
                    name,
                    type->getName());
        }
    }

    return result;
}
/// Converts a Field into a BSON ObjectId value.
///
/// A String field is turned into a single `bsoncxx::oid`; an Array or Tuple field is turned into
/// a BSON array whose elements are converted recursively. Any other field type throws TYPE_MISMATCH.
static bsoncxx::types::bson_value::value fieldAsOID(const Field & field)
{
    /// Array and Tuple are handled the same way: convert each item and collect the results.
    const auto collect_oids = [](const auto & items) -> bsoncxx::types::bson_value::value
    {
        auto oids = array();
        for (const auto & item : items)
            oids.append(fieldAsOID(item));
        return oids.view();
    };

    switch (field.getType())
    {
        case Field::Types::String:
            return bsoncxx::oid(field.safeGet<String &>());
        case Field::Types::Array:
            return collect_oids(field.safeGet<Array &>());
        case Field::Types::Tuple:
            return collect_oids(field.safeGet<Tuple &>());
        default:
            throw Exception(ErrorCodes::TYPE_MISMATCH, "{} can't be converted to oid.", field.getType());
    }
}
}
}
#endif

View File

@ -604,6 +604,7 @@
M(723, PARQUET_EXCEPTION) \
M(724, TOO_MANY_TABLES) \
M(725, TOO_MANY_DATABASES) \
M(726, FAILED_TO_BUILD_MONGODB_QUERY) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@ -35,7 +35,7 @@ void JSONArray::format(const FormatSettings & settings, FormatContext & context)
context.offset += settings.indent;
bool single_row = settings.print_simple_arrays_in_single_row && isSimpleArray(values);
bool single_row = settings.solid || (settings.print_simple_arrays_in_single_row && isSimpleArray(values));
bool first = true;
for (const auto & value : values)
@ -48,7 +48,7 @@ void JSONArray::format(const FormatSettings & settings, FormatContext & context)
writeChar('\n', context.out);
writeChar(' ', context.offset, context.out);
}
else if (!first)
else if (!first && !settings.solid)
writeChar(' ', context.out);
first = false;
@ -80,20 +80,33 @@ void JSONMap::format(const FormatSettings & settings, FormatContext & context)
writeChar(',', context.out);
first = false;
writeChar('\n', context.out);
writeChar(' ', context.offset, context.out);
if (!settings.solid)
{
writeChar('\n', context.out);
writeChar(' ', context.offset, context.out);
}
writeJSONString(value.key, context.out, settings.settings);
writeChar(':', context.out);
writeChar(' ', context.out);
if (!settings.solid)
writeChar(' ', context.out);
value.value->format(settings, context);
}
context.offset -= settings.indent;
writeChar('\n', context.out);
writeChar(' ', context.offset, context.out);
if (!settings.solid)
{
writeChar('\n', context.out);
writeChar(' ', context.offset, context.out);
}
writeChar('}', context.out);
}
/// Writes the JSON literal `null` to the output buffer of `context`.
/// The format settings are not used.
void JSONNull::format(const FormatSettings &, FormatContext & context)
{
writeString("null", context.out);
}
}

View File

@ -13,6 +13,7 @@ struct FormatSettings
const DB::FormatSettings & settings;
size_t indent = 2;
bool print_simple_arrays_in_single_row = true;
bool solid = false;
};
struct FormatContext
@ -111,4 +112,10 @@ private:
std::vector<Pair> values;
};
/// JSON item whose format() writes the literal `null`.
class JSONNull : public IItem
{
public:
/// Formats this item into `context`; see IItem for the general contract.
void format(const FormatSettings & settings, FormatContext & context) override;
};
}

View File

@ -0,0 +1,14 @@
#pragma once
#include <re2/re2.h>
namespace DB
{
/// Replaces the password part of a URI like `scheme://user:password@host...` with `[HIDDEN]`, in place.
///
/// The text between the ':' that follows `scheme://user` and the next '@' is treated as the password.
/// Returns the result of RE2::Replace for the string pointed to by `uri`.
inline bool maskURIPassword(std::string * uri)
{
    /// Group 1 is `scheme://user`, group 2 the password, group 3 everything after '@'.
    static constexpr auto uri_with_password = R"(([^:]+://[^:]*):([^@]*)@(.*))";
    static constexpr auto masked_form = "\\1:[HIDDEN]@\\3";

    return RE2::Replace(uri, uri_with_password, masked_form);
}
}

View File

@ -935,7 +935,7 @@ class IColumn;
M(Bool, iceberg_engine_ignore_schema_evolution, false, "Ignore schema evolution in Iceberg table engine and read all data using latest schema saved on table creation. Note that it can lead to incorrect result", 0) \
M(Bool, allow_deprecated_error_prone_window_functions, false, "Allow usage of deprecated error prone window functions (neighbor, runningAccumulate, runningDifferenceStartingWithFirstValue, runningDifference)", 0) \
M(Bool, uniform_snowflake_conversion_functions, true, "Enables functions snowflakeIDToDateTime[64] and dateTime[64]ToSnowflakeID while disabling functions snowflakeToDateTime[64] and dateTime[64]ToSnowflake.", 0) \
M(Bool, mongodb_fail_on_query_build_error, true, "If enabled, MongoDB tables will return an error when a MongoDB query can't be built. Not applied for the legacy implementation, or if the new analyzer is used.", 0) \
M(Bool, mongodb_fail_on_query_build_error, true, "If enabled, MongoDB tables will return an error when a MongoDB query can't be built. Not applied for the legacy implementation, or when 'allow_experimental_analyzer=0`.", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS, move obsolete settings to OBSOLETE_SETTINGS and obsolete format settings to OBSOLETE_FORMAT_SETTINGS.

View File

@ -8,6 +8,7 @@
#include <Common/KnownObjectNames.h>
#include <Common/SipHash.h>
#include <Common/typeid_cast.h>
#include <Common/maskURIPassword.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
@ -715,8 +716,19 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
assert_cast<const ASTFunction *>(argument.get())->arguments->children[0]->formatImpl(settings, state, nested_dont_need_parens);
settings.ostr << (settings.hilite ? hilite_operator : "") << " = " << (settings.hilite ? hilite_none : "");
}
settings.ostr << "'[HIDDEN]'";
if (size <= secret_arguments.start + secret_arguments.count && !secret_arguments.are_named)
if (secret_arguments.is_uri)
{
WriteBufferFromOwnString temp_buf;
FormatSettings tmp_settings(temp_buf, settings.one_line);
FormatState tmp_state;
argument->formatImpl(tmp_settings, tmp_state, nested_dont_need_parens);
maskURIPassword(&temp_buf.str());
settings.ostr << temp_buf.str();
}
else
settings.ostr << "'[HIDDEN]'";
if (size <= secret_arguments.start + secret_arguments.count && !secret_arguments.are_named && !secret_arguments.is_uri)
break; /// All other arguments should also be hidden.
continue;
}

View File

@ -2,6 +2,7 @@
#include <Poco/String.h>
#include <Common/SipHash.h>
#include <Common/maskURIPassword.h>
#include <IO/Operators.h>
namespace DB
@ -35,6 +36,17 @@ void ASTPair::formatImpl(const FormatSettings & settings, FormatState & state, F
/// SOURCE(CLICKHOUSE(host 'example01-01-1' port 9000 user 'default' password '[HIDDEN]' db 'default' table 'ids'))
settings.ostr << "'[HIDDEN]'";
}
else if (!settings.show_secrets && (first == "uri"))
{
// Hide the password in the URI in the definition of a dictionary
WriteBufferFromOwnString temp_buf;
FormatSettings tmp_settings(temp_buf, settings.one_line);
FormatState tmp_state;
second->formatImpl(tmp_settings, tmp_state, frame);
maskURIPassword(&temp_buf.str());
settings.ostr << temp_buf.str();
}
else
{
second->formatImpl(settings, state, frame);

View File

@ -15,6 +15,7 @@ public:
size_t count = 0; /// Mostly it's either 0 or 1. There are only a few cases where `count` can be greater than 1 (e.g. see `encrypt`).
/// In all known cases secret arguments are consecutive
bool are_named = false; /// Arguments like `password = 'password'` are considered as named arguments.
bool is_uri = false; /// Arguments like 'mongodb://username:password@127.0.0.1:27017'.
/// E.g. "headers" in `url('..', headers('foo' = '[HIDDEN]'))`
std::vector<std::string> nested_maps;

View File

@ -47,7 +47,7 @@ private:
const ASTs * arguments = nullptr;
FunctionSecretArgumentsFinder::Result result;
void markSecretArgument(size_t index, bool argument_is_named = false)
void markSecretArgument(size_t index, bool argument_is_named = false, bool is_uri = false)
{
if (index >= arguments->size())
return;
@ -55,6 +55,7 @@ private:
{
result.start = index;
result.are_named = argument_is_named;
result.is_uri = is_uri;
}
chassert(index >= result.start); /// We always check arguments consecutively
result.count = index + 1 - result.start;
@ -64,13 +65,17 @@ private:
void findOrdinaryFunctionSecretArguments()
{
if ((function.name == "mysql") || (function.name == "postgresql") || (function.name == "mongodb"))
if ((function.name == "mysql") || (function.name == "postgresql"))
{
/// mysql('host:port', 'database', 'table', 'user', 'password', ...)
/// postgresql('host:port', 'database', 'table', 'user', 'password', ...)
/// mongodb('host:port', 'database', 'collection', 'user', 'password', ...)
findMySQLFunctionSecretArguments();
}
else if (function.name == "mongodb")
{
findMongoDBSecretArguments();
}
else if ((function.name == "s3") || (function.name == "cosn") || (function.name == "oss") ||
(function.name == "deltaLake") || (function.name == "hudi") || (function.name == "iceberg"))
{
@ -330,8 +335,7 @@ private:
/// ExternalDistributed('engine', 'host:port', 'database', 'table', 'user', 'password')
findExternalDistributedTableEngineSecretArguments();
}
else if ((engine_name == "MySQL") || (engine_name == "PostgreSQL") ||
(engine_name == "MaterializedPostgreSQL") || (engine_name == "MongoDB"))
else if ((engine_name == "MySQL") || (engine_name == "PostgreSQL") || (engine_name == "MaterializedPostgreSQL"))
{
/// MySQL('host:port', 'database', 'table', 'user', 'password', ...)
/// PostgreSQL('host:port', 'database', 'table', 'user', 'password', ...)
@ -339,6 +343,10 @@ private:
/// MongoDB('host:port', 'database', 'collection', 'user', 'password', ...)
findMySQLFunctionSecretArguments();
}
else if (engine_name == "MongoDB")
{
findMongoDBSecretArguments();
}
else if ((engine_name == "S3") || (engine_name == "COSN") || (engine_name == "OSS") ||
(engine_name == "DeltaLake") || (engine_name == "Hudi") || (engine_name == "Iceberg") || (engine_name == "S3Queue"))
{
@ -469,7 +477,7 @@ private:
}
/// Looks for a secret argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
bool findSecretNamedArgument(const std::string_view & key, size_t start = 0, bool is_uri = false)
{
for (size_t i = start; i < arguments->size(); ++i)
{
@ -491,8 +499,30 @@ private:
continue;
if (found_key == key)
markSecretArgument(i, /* argument_is_named= */ true);
{
markSecretArgument(i, /* argument_is_named= */ true, is_uri);
return true;
}
}
return false;
}
/// Marks the secret argument of the MongoDB table function / table engine.
/// Three call shapes are recognised: a named collection with overrides, a two-argument
/// URI form, and the positional host/database/collection/user/password form.
void findMongoDBSecretArguments()
{
    if (!isNamedCollectionName(0))
    {
        if (arguments->size() == 2)
        {
            /// MongoDB('mongodb://username:password@127.0.0.1:27017', 'collection')
            markSecretArgument(0, /* argument_is_named= */ false, /* is_uri= */ true);
        }
        else
        {
            /// MongoDB('127.0.0.1:27017', 'database', 'collection', 'user', 'password', ...)
            markSecretArgument(4, /* argument_is_named= */ false, /* is_uri= */ false);
        }
        return;
    }

    /// MongoDB(named_collection, ..., password = 'password', ...)
    const bool password_found = findSecretNamedArgument("password", 1, false);

    /// MongoDB(named_collection, ..., uri = 'mongodb://username:password@127.0.0.1:27017', ...)
    if (!password_found)
        findSecretNamedArgument("uri", 1, true);
}
};

View File

@ -3,7 +3,6 @@
#if USE_MONGODB
#include "MongoDBSource.h"
#include <string>
#include <vector>
#include <Columns/ColumnArray.h>
@ -13,14 +12,13 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <IO/ReadHelpers.h>
#include <Formats/FormatFactory.h>
#include <Common/assert_cast.h>
#include <Common/quoteString.h>
#include <Common/Exception.h>
#include <Common/Base64.h>
#include <Common/BSONCXXHelper.h>
#include <base/range.h>
#include <bsoncxx/document/element.hpp>
#include <bsoncxx/json.hpp>
namespace DB
{
@ -31,304 +29,9 @@ extern const int TYPE_MISMATCH;
extern const int NOT_IMPLEMENTED;
}
/// Returns a copy of `input` where the characters `"`, `/`, `\` and the control characters
/// \f, \n, \r, \t, \b are replaced with their backslash escape sequences.
/// All other characters are copied unchanged.
std::string escapeJSONString(const std::string & input)
{
    std::string result;
    result.reserve(input.size());

    for (const char ch : input)
    {
        /// Two-character escape sequence for `ch`, or nullptr when it is copied as is.
        const char * replacement = nullptr;
        switch (ch)
        {
            case '"':  replacement = "\\\""; break;
            case '/':  replacement = "\\/";  break;
            case '\\': replacement = "\\\\"; break;
            case '\f': replacement = "\\f";  break;
            case '\n': replacement = "\\n";  break;
            case '\r': replacement = "\\r";  break;
            case '\t': replacement = "\\t";  break;
            case '\b': replacement = "\\b";  break;
            default:
                break;
        }

        if (replacement != nullptr)
            result += replacement;
        else
            result += ch;
    }

    return result;
}
template <typename T>
std::string BSONElementAsString(const T & value)
{
switch (value.type())
{
case bsoncxx::type::k_double:
return std::to_string(value.get_double().value);
case bsoncxx::type::k_string:
return static_cast<std::string>(value.get_string().value);
// MongoDB's documents and arrays may not have strict types or be nested, so the most optimal solution is store their JSON representations.
// bsoncxx::to_json function will return something like "'number': {'$numberInt': '321'}", this why we need own realization.
case bsoncxx::type::k_document:
{
String json = "{";
auto first = true;
for (const auto & elem : value.get_document().view())
{
if (!first)
json += ',';
json += '"' + escapeJSONString(std::string(elem.key())) + "\":";
switch (elem.type())
{
// types which need to be quoted
case bsoncxx::type::k_binary:
case bsoncxx::type::k_date:
case bsoncxx::type::k_timestamp:
case bsoncxx::type::k_oid:
case bsoncxx::type::k_symbol:
case bsoncxx::type::k_maxkey:
case bsoncxx::type::k_minkey:
case bsoncxx::type::k_undefined:
json += "\"" + BSONElementAsString<bsoncxx::document::element>(elem) + "\"";
break;
// types which need to be escaped
case bsoncxx::type::k_string:
case bsoncxx::type::k_code:
json += "\"" + escapeJSONString(BSONElementAsString<bsoncxx::document::element>(elem)) + "\"";
break;
default:
json += BSONElementAsString<bsoncxx::document::element>(elem);
}
first = false;
}
json += '}';
return json;
}
case bsoncxx::type::k_array:
{
String json = "[";
auto first = true;
for (const auto & elem : value.get_array().value)
{
if (!first)
json += ',';
switch (elem.type())
{
// types which need to be quoted
case bsoncxx::type::k_binary:
case bsoncxx::type::k_date:
case bsoncxx::type::k_timestamp:
case bsoncxx::type::k_oid:
case bsoncxx::type::k_symbol:
case bsoncxx::type::k_maxkey:
case bsoncxx::type::k_minkey:
case bsoncxx::type::k_undefined:
json += "\"" + BSONElementAsString<bsoncxx::array::element>(elem) + "\"";
break;
// types which need to be escaped
case bsoncxx::type::k_string:
case bsoncxx::type::k_code:
json += "\"" + escapeJSONString(BSONElementAsString<bsoncxx::array::element>(elem)) + "\"";
break;
default:
json += BSONElementAsString<bsoncxx::array::element>(elem);
}
first = false;
}
json += ']';
return json;
}
case bsoncxx::type::k_binary:
return base64Encode(std::string(reinterpret_cast<const char*>(value.get_binary().bytes), value.get_binary().size));
case bsoncxx::type::k_undefined:
return "undefined";
case bsoncxx::type::k_oid:
return value.get_oid().value.to_string();
case bsoncxx::type::k_bool:
return value.get_bool().value ? "true" : "false";
case bsoncxx::type::k_date:
return DateLUT::instance().timeToString(value.get_date().to_int64() / 1000);
case bsoncxx::type::k_null:
return "null";
case bsoncxx::type::k_regex:
return R"({"regex": ")" + escapeJSONString(std::string(value.get_regex().regex)) + R"(","options":")" + escapeJSONString(std::string(value.get_regex().regex)) + "\"}";
case bsoncxx::type::k_dbpointer:
return "{\"" + escapeJSONString(value.get_dbpointer().value.to_string()) + "\":\"" + escapeJSONString(std::string(value.get_dbpointer().collection)) + "\"}";
case bsoncxx::type::k_symbol:
return {1, value.get_symbol().symbol.at(0)};
case bsoncxx::type::k_int32:
return std::to_string(static_cast<Int64>(value.get_int32().value));
case bsoncxx::type::k_timestamp:
return DateLUT::instance().timeToString(value.get_timestamp().timestamp);
case bsoncxx::type::k_int64:
return std::to_string(value.get_int64().value);
case bsoncxx::type::k_decimal128:
return value.get_decimal128().value.to_string();
default:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "BSON type {} is unserializable", to_string(value.type()));
}
}
/// Reads a BSON int64, int32, double or bool element and casts it to the numeric type `T`.
/// Any other BSON type results in a TYPE_MISMATCH exception that mentions the column `name`.
template <typename T, typename T2>
T BSONElementAsNumber(const T2 & value, const std::string & name)
{
    const auto bson_type = value.type();

    if (bson_type == bsoncxx::type::k_int64)
        return static_cast<T>(value.get_int64());
    if (bson_type == bsoncxx::type::k_int32)
        return static_cast<T>(value.get_int32());
    if (bson_type == bsoncxx::type::k_double)
        return static_cast<T>(value.get_double());
    if (bson_type == bsoncxx::type::k_bool)
        return static_cast<T>(value.get_bool());

    throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, {} cannot be converted to number for column {}",
                    bsoncxx::to_string(bson_type), name);
}
/// Converts a BSON array into an Array field.
///
/// `dimensions` is the number of nesting levels that still have to be unwrapped before the
/// scalar level is reached: while it is greater than zero every element must itself be a BSON
/// array and is converted recursively. At the scalar level each element is converted according
/// to `type`; BSON nulls are replaced by `default_value`. `name` is only used in error messages.
///
/// Throws TYPE_MISMATCH when an element does not have the expected BSON type and
/// NOT_IMPLEMENTED when `type` is not one of the handled nested types.
Array BSONArrayAsArray(size_t dimensions, const bsoncxx::types::b_array & array, const DataTypePtr & type, const Field & default_value, const std::string & name)
{
    Array result;

    /// Not at the scalar level yet: every element has to be an array one level shallower.
    if (dimensions > 0)
    {
        const size_t remaining_dimensions = dimensions - 1;
        for (const auto & nested : array.value)
        {
            if (nested.type() != bsoncxx::type::k_array)
                throw Exception(ErrorCodes::TYPE_MISMATCH, "Array {} have less dimensions then defined in the schema", name);

            result.emplace_back(BSONArrayAsArray(remaining_dimensions, nested.get_array(), type, default_value, name));
        }
        return result;
    }

    /// All date-like target types accept only BSON dates.
    const auto require_date = [&name](const auto & item)
    {
        if (item.type() != bsoncxx::type::k_date)
            throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected date, got {} for column {}",
                            bsoncxx::to_string(item.type()), name);
    };

    for (const auto & value : array.value)
    {
        if (value.type() == bsoncxx::type::k_null)
        {
            result.emplace_back(default_value);
            continue;
        }

        /// The tag argument only carries the target numeric type; its value is never used.
        const auto append_number = [&](auto tag)
        {
            using Number = decltype(tag);
            result.emplace_back(BSONElementAsNumber<Number, bsoncxx::array::element>(value, name));
        };

        switch (type->getTypeId())
        {
            case TypeIndex::Int8:    append_number(Int8{});    break;
            case TypeIndex::UInt8:   append_number(UInt8{});   break;
            case TypeIndex::Int16:   append_number(Int16{});   break;
            case TypeIndex::UInt16:  append_number(UInt16{});  break;
            case TypeIndex::Int32:   append_number(Int32{});   break;
            case TypeIndex::UInt32:  append_number(UInt32{});  break;
            case TypeIndex::Int64:   append_number(Int64{});   break;
            case TypeIndex::UInt64:  append_number(UInt64{});  break;
            case TypeIndex::Int128:  append_number(Int128{});  break;
            case TypeIndex::UInt128: append_number(UInt128{}); break;
            case TypeIndex::Int256:  append_number(Int256{});  break;
            case TypeIndex::UInt256: append_number(UInt256{}); break;
            case TypeIndex::Float32: append_number(Float32{}); break;
            case TypeIndex::Float64: append_number(Float64{}); break;
            case TypeIndex::Date:
            {
                require_date(value);
                /// to_int64() is divided by 1000 before the day lookup.
                const auto seconds = value.get_date().to_int64() / 1000;
                result.emplace_back(DateLUT::instance().toDayNum(seconds).toUnderType());
                break;
            }
            case TypeIndex::Date32:
            {
                require_date(value);
                const auto seconds = value.get_date().to_int64() / 1000;
                result.emplace_back(static_cast<Int32>(DateLUT::instance().toDayNum(seconds).toUnderType()));
                break;
            }
            case TypeIndex::DateTime:
            {
                require_date(value);
                result.emplace_back(static_cast<UInt32>(value.get_date().to_int64() / 1000));
                break;
            }
            case TypeIndex::DateTime64:
            {
                require_date(value);
                /// Unlike the other date-like cases, the raw to_int64() value is stored without dividing by 1000.
                result.emplace_back(static_cast<Decimal64>(value.get_date().to_int64()));
                break;
            }
            case TypeIndex::UUID:
            {
                /// UUIDs are expected to arrive in their textual form.
                if (value.type() != bsoncxx::type::k_string)
                    throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected string (UUID), got {} for column {}",
                                    bsoncxx::to_string(value.type()), name);
                result.emplace_back(parse<UUID>(value.get_string().value.data()));
                break;
            }
            case TypeIndex::String:
                result.emplace_back(BSONElementAsString(value));
                break;
            default:
                throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Array {} has unsupported nested type {}", name, type->getName());
        }
    }

    return result;
}
using BSONCXXHelper::BSONElementAsNumber;
using BSONCXXHelper::BSONArrayAsArray;
using BSONCXXHelper::BSONElementAsString;
/// Appends a default value to `column` by copying the row at index 0 of `sample_column`.
void MongoDBSource::insertDefaultValue(IColumn & column, const IColumn & sample_column)
{
    column.insertFrom(sample_column, 0);
}
@ -393,7 +96,7 @@ void MongoDBSource::insertValue(IColumn & column, const size_t & idx, const Data
throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected date, got {} for column {}",
bsoncxx::to_string(value.type()), name);
assert_cast<ColumnInt32 &>(column).insertValue(static_cast<Int32>(DateLUT::instance().toDayNum(value.get_date().to_int64() / 1000).toUnderType()));
assert_cast<ColumnInt32 &>(column).insertValue(DateLUT::instance().toDayNum(value.get_date().to_int64() / 1000).toUnderType());
break;
}
case TypeIndex::DateTime:
@ -411,7 +114,7 @@ void MongoDBSource::insertValue(IColumn & column, const size_t & idx, const Data
throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected date, got {} for column {}",
bsoncxx::to_string(value.type()), name);
assert_cast<DB::ColumnDecimal<DB::DateTime64> &>(column).insertValue(static_cast<Decimal64>(value.get_date().to_int64()));
assert_cast<ColumnDecimal<DateTime64> &>(column).insertValue(value.get_date().to_int64());
break;
}
case TypeIndex::UUID:
@ -425,7 +128,7 @@ void MongoDBSource::insertValue(IColumn & column, const size_t & idx, const Data
}
case TypeIndex::String:
{
assert_cast<ColumnString &>(column).insert(BSONElementAsString(value));
assert_cast<ColumnString &>(column).insert(BSONElementAsString(value, json_format_settings));
break;
}
case TypeIndex::Array:
@ -434,7 +137,7 @@ void MongoDBSource::insertValue(IColumn & column, const size_t & idx, const Data
throw Exception(ErrorCodes::TYPE_MISMATCH, "Type mismatch, expected array, got {} for column {}",
bsoncxx::to_string(value.type()), name);
assert_cast<ColumnArray &>(column).insert(BSONArrayAsArray(arrays_info[idx].first, value.get_array(), arrays_info[idx].second.first, arrays_info[idx].second.second, name));
assert_cast<ColumnArray &>(column).insert(BSONArrayAsArray(arrays_info[idx].first, value.get_array(), arrays_info[idx].second.first, arrays_info[idx].second.second, name, json_format_settings));
break;
}
default:

View File

@ -4,6 +4,8 @@
#if USE_MONGODB
#include <Processors/ISource.h>
#include <Interpreters/Context.h>
#include <Common/JSONBuilder.h>
#include <mongocxx/client.hpp>
#include <mongocxx/collection.hpp>
@ -44,6 +46,7 @@ private:
std::unordered_map<size_t, std::pair<size_t, std::pair<DataTypePtr, Field>>> arrays_info;
const UInt64 max_block_size;
JSONBuilder::FormatSettings json_format_settings = {{}, 0, true, true};
bool all_read = false;
};

View File

@ -3,14 +3,13 @@
#if USE_MONGODB
#include <memory>
#include <Analyzer/QueryNode.h>
#include <Analyzer/ColumnNode.h>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/QueryNode.h>
#include <Analyzer/SortNode.h>
#include <Formats/BSONTypes.h>
#include <DataTypes/FieldToDataType.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Processors/Sources/MongoDBSource.h>
#include <QueryPipeline/Pipe.h>
@ -19,14 +18,15 @@
#include <Storages/StorageMongoDB.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Common/parseAddress.h>
#include <Common/ErrorCodes.h>
#include <Common/BSONCXXHelper.h>
#include <bsoncxx/json.hpp>
#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/basic/array.hpp>
#include <bsoncxx/json.hpp>
using bsoncxx::builder::basic::document;
using bsoncxx::builder::basic::array;
using bsoncxx::builder::basic::make_document;
using bsoncxx::builder::basic::make_array;
using bsoncxx::builder::basic::kvp;
namespace DB
@ -35,10 +35,12 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int NOT_IMPLEMENTED;
extern const int TYPE_MISMATCH;
extern const int FAILED_TO_BUILD_MONGODB_QUERY;
}
using BSONCXXHelper::fieldAsBSONValue;
using BSONCXXHelper::fieldAsOID;
StorageMongoDB::StorageMongoDB(
const StorageID & table_id_,
MongoDBConfiguration configuration_,
@ -74,7 +76,7 @@ Pipe StorageMongoDB::read(
sample_block.insert({ column_data.type, column_data.name });
}
auto options = mongocxx::options::find();
auto options = mongocxx::options::find{};
return Pipe(std::make_shared<MongoDBSource>(*configuration.uri, configuration.collection, buildMongoDBQuery(context, options, query_info, sample_block),
std::move(options), sample_block, max_block_size));
@ -149,9 +151,9 @@ MongoDBConfiguration StorageMongoDB::getConfiguration(ASTs engine_args, ContextP
std::string mongoFuncName(const std::string & func)
{
if (func == "equals" || func == "isNull")
if (func == "equals")
return "$eq";
if (func == "notEquals" || func == "isNotNull")
if (func == "notEquals")
return "$ne";
if (func == "greaterThan" || func == "greater")
return "$gt";
@ -172,167 +174,113 @@ std::string mongoFuncName(const std::string & func)
if (func == "or")
return "$or";
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Function '{}' is not supported", func);
throw Exception(ErrorCodes::FAILED_TO_BUILD_MONGODB_QUERY, "Function '{}' is not supported. You can disable this error with 'SET mongodb_fail_on_query_build_error=0', but this may cause poor performance, and is highly not recommended.", func);
}
bsoncxx::types::bson_value::value fieldAsBSONValue(const Field & field, const DataTypePtr & type)
bsoncxx::document::value StorageMongoDB::visitWhereFunction(const ContextPtr & context, const FunctionNode * func)
{
switch (type->getTypeId())
if (!func->getArguments().getNodes().empty())
{
case TypeIndex::String:
return field.get<String>();
case TypeIndex::UInt8:
const auto & column = func->getArguments().getNodes().at(0)->as<ColumnNode>();
if (!column)
{
if (isBool(type))
return field.get<UInt8>() != 0;
return static_cast<Int32>(field.get<UInt8>());
}
case TypeIndex::UInt16:
return static_cast<Int32>(field.get<UInt16>());
case TypeIndex::UInt32:
return static_cast<Int32>(field.get<UInt32>());
case TypeIndex::UInt64:
return static_cast<Int64>(field.get<UInt64>());
case TypeIndex::Int8:
return field.get<Int8 &>();
case TypeIndex::Int16:
return field.get<Int16>();
case TypeIndex::Int32:
return field.get<Int32>();
case TypeIndex::Int64:
return field.get<Int64>();
case TypeIndex::Float32:
return field.get<Float32>();
case TypeIndex::Float64:
return field.get<Float64>();
case TypeIndex::Date:
return std::chrono::milliseconds(field.get<UInt16>() * 1000);
case TypeIndex::Date32:
return std::chrono::milliseconds(field.get<Int32>() * 1000);
case TypeIndex::DateTime:
return std::chrono::milliseconds(field.get<UInt32>() * 1000);
case TypeIndex::DateTime64:
return std::chrono::milliseconds(field.get<Decimal64>().getValue());
case TypeIndex::UUID:
return static_cast<String>(formatUUID(field.get<UUID>()));
case TypeIndex::Tuple:
{
auto arr = array();
for (const auto & elem : field.get<Tuple &>())
arr.append(fieldAsBSONValue(elem, applyVisitor(FieldToDataType(), elem)));
return arr.view();
}
case TypeIndex::Array:
{
auto arr = array();
for (const auto & elem : field.get<Array &>())
arr.append(fieldAsBSONValue(elem, applyVisitor(FieldToDataType(), elem)));
return arr.view();
}
default:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Fields with type '{}' is not supported", type->getPrettyName());
}
}
void checkLiteral(const Field & field, std::string * func)
{
if (*func == "$in" && field.getType() != Field::Types::Array && field.getType() != Field::Types::Tuple)
*func = "$eq";
if (*func == "$nin" && field.getType() != Field::Types::Array && field.getType() != Field::Types::Tuple)
*func = "$ne";
}
bsoncxx::document::value StorageMongoDB::visitWhereFunction(ContextPtr context, const ASTFunction * func)
{
auto func_name = mongoFuncName(func->name);
if (const auto & explist = func->children.at(0)->as<ASTExpressionList>())
{
if (const auto & identifier = explist->children.at(0)->as<ASTIdentifier>())
{
if (explist->children.size() < 2)
auto arr = bsoncxx::builder::basic::array{};
for (const auto & elem : func->getArguments().getNodes())
{
if (identifier->shortName() == "_id")
throw Exception(ErrorCodes::TYPE_MISMATCH, "oid can't be null");
return make_document(kvp(identifier->shortName(), make_document(kvp(func_name, bsoncxx::types::b_null{}))));
if (const auto & elem_func = elem->as<FunctionNode>())
arr.append(visitWhereFunction(context, elem_func));
}
if (!arr.view().empty())
return make_document(kvp(mongoFuncName(func->getFunctionName()), arr));
}
else
{
if (func->getFunctionName() == "isNull")
return make_document(kvp(
column->getColumnName(),
make_document(kvp(
"$eq",
bsoncxx::types::b_null{}))));
if (func->getFunctionName() == "isNotNull")
return make_document(kvp(
column->getColumnName(),
make_document(kvp(
"$ne",
bsoncxx::types::b_null{}))));
if (func->getFunctionName() == "empty")
return make_document(kvp(
column->getColumnName(),
make_document(kvp(
"$in",
make_array(bsoncxx::types::b_null{}, "")))));
if (func->getFunctionName() == "notEmpty")
return make_document(kvp(
column->getColumnName(),
make_document(kvp(
"$nin",
make_array(bsoncxx::types::b_null{}, "")))));
if (auto result = tryEvaluateConstantExpression(explist->children.at(1), context))
if (func->getArguments().getNodes().size() == 2)
{
checkLiteral(result->first, &func_name);
if (identifier->shortName() == "_id")
auto func_name = mongoFuncName(func->getFunctionName());
const auto & value = func->getArguments().getNodes().at(1);
if (const auto & const_value = value->as<ConstantNode>())
{
switch (result->second->getColumnType())
{
case TypeIndex::String:
return make_document(kvp(identifier->shortName(), make_document(kvp(func_name, bsoncxx::oid(result->first.get<String>())))));
case TypeIndex::Tuple:
{
auto oid_arr = array();
for (const auto & elem : result->first.get<Tuple &>())
{
if (elem.getType() != Field::Types::String)
throw Exception(ErrorCodes::TYPE_MISMATCH, "{} can't be converted to oid", elem.getTypeName());
oid_arr.append(bsoncxx::oid(elem.get<String>()));
}
return make_document(kvp(identifier->shortName(), make_document(kvp(func_name, oid_arr))));
}
case TypeIndex::Array:
{
auto oid_arr = array();
for (const auto & elem : result->first.get<Array &>())
{
if (elem.getType() != Field::Types::String)
throw Exception(ErrorCodes::TYPE_MISMATCH, "{} can't be converted to oid", elem.getTypeName());
oid_arr.append(bsoncxx::oid(elem.get<String>()));
}
return make_document(kvp(identifier->shortName(), make_document(kvp(func_name, oid_arr))));
}
default:
throw Exception(ErrorCodes::TYPE_MISMATCH, "{} can't be converted to oid", result->second->getPrettyName());
}
std::optional<bsoncxx::types::bson_value::value> func_value{};
if (column->getColumnName() == "_id")
func_value = fieldAsOID(const_value->getValue());
else
func_value = fieldAsBSONValue(const_value->getValue(), const_value->getResultType());
if (func_name == "$in" && func_value->view().type() != bsoncxx::v_noabi::type::k_array)
func_name = "$eq";
if (func_name == "$nin" && func_value->view().type() != bsoncxx::v_noabi::type::k_array)
func_name = "$ne";
return make_document(kvp(
column->getColumnName(),
make_document(kvp(
func_name,
std::move(*func_value)))));
}
return make_document(kvp(identifier->shortName(), make_document(kvp(func_name, fieldAsBSONValue(result->first, result->second)))));
if (const auto & func_value = value->as<FunctionNode>())
return make_document(kvp(column->getColumnName(), make_document(kvp(func_name, visitWhereFunction(context, func_value)))));
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Only constant expressions are supported in WHERE section");
}
auto arr = array();
for (const auto & child : explist->children)
{
if (const auto & child_func = child->as<ASTFunction>())
arr.append(visitWhereFunction(context, child_func));
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Only constant expressions are supported in WHERE section");
}
return make_document(kvp(func_name, std::move(arr)));
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Only constant expressions are supported in WHERE section");
throw Exception(ErrorCodes::FAILED_TO_BUILD_MONGODB_QUERY, "Only constant expressions are supported in WHERE section. You can disable this error with 'SET mongodb_fail_on_query_build_error=0', but this may cause poor performance, and is highly not recommended.");
}
bsoncxx::document::value StorageMongoDB::buildMongoDBQuery(ContextPtr context, mongocxx::options::find & options, const SelectQueryInfo & query, const Block & sample_block)
bsoncxx::document::value StorageMongoDB::buildMongoDBQuery(const ContextPtr & context, mongocxx::options::find & options, const SelectQueryInfo & query, const Block & sample_block)
{
document projection{};
for (const auto & column : sample_block)
projection.append(kvp(column.name, 1));
LOG_DEBUG(log, "MongoDB projection has built: '{}'.", bsoncxx::to_json(projection));
options.projection(projection.extract());
if (!context->getSettingsRef().allow_experimental_analyzer)
return make_document();
auto & query_tree = query.query_tree->as<QueryNode &>();
std::cout << query_tree.dumpTree() << std::endl << std::endl;
if (context->getSettingsRef().mongodb_fail_on_query_build_error)
{
if (query_tree.hasHaving())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "HAVING section is not supported");
throw Exception(ErrorCodes::FAILED_TO_BUILD_MONGODB_QUERY, "HAVING section is not supported. You can disable this error with 'SET mongodb_fail_on_query_build_error=0', but this may cause poor performance, and is highly not recommended.");
if (query_tree.hasGroupBy())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "GROUP BY section is not supported");
throw Exception(ErrorCodes::FAILED_TO_BUILD_MONGODB_QUERY, "GROUP BY section is not supported. You can disable this error with 'SET mongodb_fail_on_query_build_error=0', but this may cause poor performance, and is highly not recommended.");
if (query_tree.hasWindow())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WINDOW section is not supported");
throw Exception(ErrorCodes::FAILED_TO_BUILD_MONGODB_QUERY, "WINDOW section is not supported. You can disable this error with 'SET mongodb_fail_on_query_build_error=0', but this may cause poor performance, and is highly not recommended.");
if (query_tree.hasPrewhere())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "PREWHERE section is not supported");
throw Exception(ErrorCodes::FAILED_TO_BUILD_MONGODB_QUERY, "PREWHERE section is not supported. You can disable this error with 'SET mongodb_fail_on_query_build_error=0', but this may cause poor performance, and is highly not recommended.");
if (query_tree.hasLimitBy())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "LIMIT BY section is not supported");
throw Exception(ErrorCodes::FAILED_TO_BUILD_MONGODB_QUERY, "LIMIT BY section is not supported. You can disable this error with 'SET mongodb_fail_on_query_build_error=0', but this may cause poor performance, and is highly not recommended.");
if (query_tree.hasOffset())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "OFFSET section is not supported");
throw Exception(ErrorCodes::FAILED_TO_BUILD_MONGODB_QUERY, "OFFSET section is not supported. You can disable this error with 'SET mongodb_fail_on_query_build_error=0', but this may cause poor performance, and is highly not recommended.");
}
if (query_tree.hasLimit())
@ -342,13 +290,13 @@ bsoncxx::document::value StorageMongoDB::buildMongoDBQuery(ContextPtr context, m
if (const auto & limit = query_tree.getLimit()->as<ConstantNode>())
options.limit(limit->getValue().safeGet<UInt64>());
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unknown LIMIT AST");
throw Exception(ErrorCodes::FAILED_TO_BUILD_MONGODB_QUERY, "Only simple limit is supported. You can disable this error with 'SET mongodb_fail_on_query_build_error=0', but this may cause poor performance, and is highly not recommended.");
}
catch (...)
catch (Exception & e)
{
if (context->getSettingsRef().mongodb_fail_on_query_build_error)
throw;
tryLogCurrentException(log);
LOG_WARNING(log, "Failed to build MongoDB limit: '{}'.", e.message());
}
}
@ -362,49 +310,50 @@ bsoncxx::document::value StorageMongoDB::buildMongoDBQuery(ContextPtr context, m
if (const auto & sort_node = child->as<SortNode>())
{
if (sort_node->withFill() || sort_node->hasFillTo() || sort_node->hasFillFrom() || sort_node->hasFillStep())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Only simple sort is supported");
throw Exception(ErrorCodes::FAILED_TO_BUILD_MONGODB_QUERY, "ORDER BY WITH FILL is not supported. You can disable this error with 'SET mongodb_fail_on_query_build_error=0', but this may cause poor performance, and is highly not recommended.");
if (const auto & column = sort_node->getExpression()->as<ColumnNode>())
sort.append(kvp(column->getColumnName(), sort_node->getSortDirection() == SortDirection::ASCENDING ? 1 : -1));
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Only simple sort is supported");
throw Exception(ErrorCodes::FAILED_TO_BUILD_MONGODB_QUERY, "Only simple sort is supported. You can disable this error with 'SET mongodb_fail_on_query_build_error=0', but this may cause poor performance, and is highly not recommended.");
}
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Only simple sort is supported");
throw Exception(ErrorCodes::FAILED_TO_BUILD_MONGODB_QUERY, "Only simple sort is supported. You can disable this error with 'SET mongodb_fail_on_query_build_error=0', but this may cause poor performance, and is highly not recommended.");
}
LOG_DEBUG(log, "MongoDB sort has built: '{}'", bsoncxx::to_json(sort));
LOG_DEBUG(log, "MongoDB sort has built: '{}'.", bsoncxx::to_json(sort));
options.sort(sort.extract());
}
catch (...)
catch (Exception & e)
{
if (context->getSettingsRef().mongodb_fail_on_query_build_error)
throw;
tryLogCurrentException(log);
LOG_WARNING(log, "Failed to build MongoDB sort: '{}'.", e.message());
}
}
document projection{};
for (const auto & column : sample_block)
projection.append(kvp(column.name, 1));
LOG_DEBUG(log, "MongoDB projection has built: '{}'", bsoncxx::to_json(projection));
options.projection(projection.extract());
if (query_tree.hasWhere())
{
try
{
auto ast = query_tree.getWhere()->toAST();
if (const auto & func = ast->as<ASTFunction>())
std::optional<bsoncxx::document::value> filter{};
if (const auto & func = query_tree.getWhere()->as<FunctionNode>())
filter = visitWhereFunction(context, func);
else if (const auto & const_expr = query_tree.getWhere()->as<ConstantNode>(); const_expr->hasSourceExpression())
{
auto filter = visitWhereFunction(context, func);
LOG_DEBUG(log, "MongoDB query has built: '{}'", bsoncxx::to_json(filter));
return filter;
if (const auto & func_expr = const_expr->getSourceExpression()->as<FunctionNode>())
filter = visitWhereFunction(context, func_expr);
}
if (!filter.has_value())
throw Exception(ErrorCodes::FAILED_TO_BUILD_MONGODB_QUERY, "Only constant expressions are supported in WHERE section. You can disable this error with 'SET mongodb_fail_on_query_build_error=0', but this may cause poor performance, and is highly not recommended.");
LOG_DEBUG(log, "MongoDB query has built: '{}'.", bsoncxx::to_json(*filter));
return std::move(*filter);
}
catch (...)
catch (Exception & e)
{
if (context->getSettingsRef().mongodb_fail_on_query_build_error)
throw;
tryLogCurrentException(log);
LOG_WARNING(log, "Failed to build MongoDB query: '{}'.", e.message());
}
}

View File

@ -21,7 +21,7 @@ struct MongoDBConfiguration
std::unique_ptr<mongocxx::uri> uri;
String collection;
void checkHosts(ContextPtr context) const
void checkHosts(const ContextPtr & context) const
{
// Because domain records will be resolved inside the driver, we can't check IPs for our restrictions.
for (const auto & host : uri->hosts())
@ -60,8 +60,8 @@ public:
size_t num_streams) override;
private:
static bsoncxx::document::value visitWhereFunction(ContextPtr context, const ASTFunction * func);
bsoncxx::document::value buildMongoDBQuery(ContextPtr context, mongocxx::options::find & options, const SelectQueryInfo & query, const Block & sample_block);
static bsoncxx::document::value visitWhereFunction(const ContextPtr & context, const FunctionNode * func);
bsoncxx::document::value buildMongoDBQuery(const ContextPtr & context, mongocxx::options::find & options, const SelectQueryInfo & query, const Block & sample_block);
const MongoDBConfiguration configuration;
LoggerPtr log;

View File

@ -13,7 +13,7 @@ from helpers.cluster import ClickHouseCluster
def started_cluster(request):
try:
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
cluster.add_instance(
"node",
main_configs=[
"configs/named_collections.xml",
@ -35,12 +35,9 @@ def get_mongo_connection(started_cluster, secure=False, with_credentials=True):
)
)
if with_credentials:
return pymongo.MongoClient(
"mongodb://root:clickhouse@localhost:{}".format(started_cluster.mongo_port)
)
return pymongo.MongoClient(
"mongodb://localhost:{}".format(started_cluster.mongo_no_cred_port)
)
return pymongo.MongoClient("mongodb://root:clickhouse@localhost:{}".format(started_cluster.mongo_port))
return pymongo.MongoClient("mongodb://localhost:{}".format(started_cluster.mongo_no_cred_port))
def test_simple_select(started_cluster):
@ -59,15 +56,9 @@ def test_simple_select(started_cluster):
)
assert node.query("SELECT COUNT() FROM simple_mongo_table") == "100\n"
assert (
node.query("SELECT sum(key) FROM simple_mongo_table")
== str(sum(range(0, 100))) + "\n"
)
assert node.query("SELECT sum(key) FROM simple_mongo_table") == str(sum(range(0, 100))) + "\n"
assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + "\n"
assert (
node.query("SELECT data from simple_mongo_table where key = 42")
== hex(42 * 42) + "\n"
)
node.query("DROP TABLE simple_mongo_table")
simple_mongo_table.drop()
@ -88,15 +79,9 @@ def test_simple_select_uri(started_cluster):
)
assert node.query("SELECT COUNT() FROM simple_table_uri") == "100\n"
assert (
node.query("SELECT sum(key) FROM simple_table_uri")
== str(sum(range(0, 100))) + "\n"
)
assert node.query("SELECT sum(key) FROM simple_table_uri") == str(sum(range(0, 100))) + "\n"
assert node.query("SELECT data from simple_table_uri where key = 42") == hex(42 * 42) + "\n"
assert (
node.query("SELECT data from simple_table_uri where key = 42")
== hex(42 * 42) + "\n"
)
node.query("DROP TABLE simple_table_uri")
simple_mongo_table.drop()
@ -120,15 +105,9 @@ def test_simple_select_from_view(started_cluster):
)
assert node.query("SELECT COUNT() FROM simple_mongo_table") == "100\n"
assert (
node.query("SELECT sum(key) FROM simple_mongo_table")
== str(sum(range(0, 100))) + "\n"
)
assert node.query("SELECT sum(key) FROM simple_mongo_table") == str(sum(range(0, 100))) + "\n"
assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + "\n"
assert (
node.query("SELECT data from simple_mongo_table where key = 42")
== hex(42 * 42) + "\n"
)
node.query("DROP TABLE simple_mongo_table")
simple_mongo_table_view.drop()
simple_mongo_table.drop()
@ -227,82 +206,26 @@ def test_arrays(started_cluster):
assert node.query("SELECT COUNT() FROM arrays_mongo_table") == "100\n"
for column_name in ["arr_int64", "arr_int32", "arr_int16", "arr_int8"]:
assert (
node.query(f"SELECT {column_name} FROM arrays_mongo_table WHERE key = 42")
== "[-43,-44,-45]\n"
)
assert node.query(f"SELECT {column_name} FROM arrays_mongo_table WHERE key = 42") == "[-43,-44,-45]\n"
for column_name in ["arr_uint64", "arr_uint32", "arr_uint16", "arr_uint8"]:
assert (
node.query(f"SELECT {column_name} FROM arrays_mongo_table WHERE key = 42")
== "[43,44,45]\n"
)
assert node.query(f"SELECT {column_name} FROM arrays_mongo_table WHERE key = 42") == "[43,44,45]\n"
for column_name in ["arr_float32", "arr_float64"]:
assert (
node.query(f"SELECT {column_name} FROM arrays_mongo_table WHERE key = 42")
== "[43.125,44.5,45.75]\n"
)
assert node.query(f"SELECT {column_name} FROM arrays_mongo_table WHERE key = 42") == "[43.125,44.5,45.75]\n"
assert (
node.query(f"SELECT arr_date FROM arrays_mongo_table WHERE key = 42")
== "['2002-10-27','2024-01-08']\n"
)
assert (
node.query(f"SELECT arr_date32 FROM arrays_mongo_table WHERE key = 42")
== "['2002-10-27','2024-01-08']\n"
)
assert (
node.query(f"SELECT arr_datetime FROM arrays_mongo_table WHERE key = 42")
== "['2023-03-31 06:03:12','1999-02-28 12:46:34']\n"
)
assert (
node.query(f"SELECT arr_datetime64 FROM arrays_mongo_table WHERE key = 42")
== "['2023-03-31 06:03:12.000','1999-02-28 12:46:34.000']\n"
)
assert (
node.query(f"SELECT arr_string FROM arrays_mongo_table WHERE key = 42")
== "['43','44','45']\n"
)
assert (
node.query(f"SELECT arr_uuid FROM arrays_mongo_table WHERE key = 42")
== "['f0e77736-91d1-48ce-8f01-15123ca1c7ed','93376a07-c044-4281-a76e-ad27cf6973c5']\n"
)
assert (
node.query(f"SELECT arr_arr_bool FROM arrays_mongo_table WHERE key = 42")
== "[[true,false,true],[true],[false],[false]]\n"
)
assert (
node.query(
f"SELECT arr_arr_bool_nullable FROM arrays_mongo_table WHERE key = 42"
)
== "[[true,false,true],[true,NULL],[NULL],[false]]\n"
)
assert (
node.query(f"SELECT arr_empty FROM arrays_mongo_table WHERE key = 42") == "[]\n"
)
assert (
node.query(f"SELECT arr_null FROM arrays_mongo_table WHERE key = 42") == "[]\n"
)
assert (
node.query(f"SELECT arr_arr_null FROM arrays_mongo_table WHERE key = 42")
== "[]\n"
)
assert (
node.query(f"SELECT arr_nullable FROM arrays_mongo_table WHERE key = 42")
== "[]\n"
)
assert node.query(f"SELECT arr_date FROM arrays_mongo_table WHERE key = 42") == "['2002-10-27','2024-01-08']\n"
assert node.query(f"SELECT arr_date32 FROM arrays_mongo_table WHERE key = 42") == "['2002-10-27','2024-01-08']\n"
assert node.query(f"SELECT arr_datetime FROM arrays_mongo_table WHERE key = 42") == "['2023-03-31 06:03:12','1999-02-28 12:46:34']\n"
assert node.query(f"SELECT arr_datetime64 FROM arrays_mongo_table WHERE key = 42") == "['2023-03-31 06:03:12.000','1999-02-28 12:46:34.000']\n"
assert node.query(f"SELECT arr_string FROM arrays_mongo_table WHERE key = 42") == "['43','44','45']\n"
assert node.query(f"SELECT arr_uuid FROM arrays_mongo_table WHERE key = 42") == "['f0e77736-91d1-48ce-8f01-15123ca1c7ed','93376a07-c044-4281-a76e-ad27cf6973c5']\n"
assert node.query(f"SELECT arr_arr_bool FROM arrays_mongo_table WHERE key = 42") == "[[true,false,true],[true],[false],[false]]\n"
assert node.query(f"SELECT arr_arr_bool_nullable FROM arrays_mongo_table WHERE key = 42") == "[[true,false,true],[true,NULL],[NULL],[false]]\n"
assert node.query(f"SELECT arr_empty FROM arrays_mongo_table WHERE key = 42") == "[]\n"
assert node.query(f"SELECT arr_null FROM arrays_mongo_table WHERE key = 42") == "[]\n"
assert node.query(f"SELECT arr_arr_null FROM arrays_mongo_table WHERE key = 42") == "[]\n"
assert node.query(f"SELECT arr_nullable FROM arrays_mongo_table WHERE key = 42") == "[]\n"
node.query("DROP TABLE arrays_mongo_table")
arrays_mongo_table.drop()
@ -324,15 +247,9 @@ def test_complex_data_type(started_cluster):
)
assert node.query("SELECT COUNT() FROM incomplete_mongo_table") == "100\n"
assert (
node.query("SELECT sum(key) FROM incomplete_mongo_table")
== str(sum(range(0, 100))) + "\n"
)
assert node.query("SELECT sum(key) FROM incomplete_mongo_table") == str(sum(range(0, 100))) + "\n"
assert node.query("SELECT data from incomplete_mongo_table where key = 42") == hex(42 * 42) + "\n"
assert (
node.query("SELECT data from incomplete_mongo_table where key = 42")
== hex(42 * 42) + "\n"
)
node.query("DROP TABLE incomplete_mongo_table")
incomplete_mongo_table.drop()
@ -350,15 +267,11 @@ def test_secure_connection(started_cluster):
node.query(
"CREATE OR REPLACE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo_secure:27017', 'test', 'simple_table', 'root', 'clickhouse', 'tls=true&tlsAllowInvalidCertificates=true&tlsAllowInvalidHostnames=true')"
)
assert node.query("SELECT COUNT() FROM simple_mongo_table") == "100\n"
assert (
node.query("SELECT sum(key) FROM simple_mongo_table")
== str(sum(range(0, 100))) + "\n"
)
assert (
node.query("SELECT data from simple_mongo_table where key = 42")
== hex(42 * 42) + "\n"
)
assert node.query("SELECT sum(key) FROM simple_mongo_table") == str(sum(range(0, 100))) + "\n"
assert node.query("SELECT data from simple_mongo_table where key = 42") == hex(42 * 42) + "\n"
node.query("DROP TABLE simple_mongo_table")
simple_mongo_table.drop()
@ -377,6 +290,7 @@ def test_secure_connection_with_validation(started_cluster):
node.query(
"CREATE OR REPLACE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo_secure:27017', 'test', 'simple_table', 'root', 'clickhouse', 'tls=true')"
)
with pytest.raises(QueryRuntimeException):
node.query("SELECT COUNT() FROM simple_mongo_table")
@ -396,15 +310,11 @@ def test_secure_connection_uri(started_cluster):
node.query(
"CREATE OR REPLACE TABLE test_secure_connection_uri(key UInt64, data String) ENGINE = MongoDB('mongodb://root:clickhouse@mongo_secure:27017/test?tls=true&tlsAllowInvalidCertificates=true&tlsAllowInvalidHostnames=true', 'test_secure_connection_uri')"
)
assert node.query("SELECT COUNT() FROM test_secure_connection_uri") == "100\n"
assert (
node.query("SELECT sum(key) FROM test_secure_connection_uri")
== str(sum(range(0, 100))) + "\n"
)
assert (
node.query("SELECT data from test_secure_connection_uri where key = 42")
== hex(42 * 42) + "\n"
)
assert node.query("SELECT sum(key) FROM test_secure_connection_uri") == str(sum(range(0, 100))) + "\n"
assert node.query("SELECT data from test_secure_connection_uri where key = 42") == hex(42 * 42) + "\n"
node.query("DROP TABLE test_secure_connection_uri")
simple_mongo_table.drop()
@ -422,10 +332,12 @@ def test_secure_connection_uri_with_validation(started_cluster):
node.query(
"CREATE OR REPLACE TABLE test_secure_connection_uri(key UInt64, data String) ENGINE = MongoDB('mongodb://root:clickhouse@mongo_secure:27017/test?tls=true', 'test_secure_connection_uri')"
)
with pytest.raises(QueryRuntimeException):
node.query("SELECT COUNT() FROM test_secure_connection_uri")
node.query("DROP TABLE test_secure_connection_uri")
simple_mongo_table.drop()
@ -443,7 +355,9 @@ def test_predefined_connection_configuration(started_cluster):
node.query(
"CREATE OR REPLACE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB(mongo1)"
)
assert node.query("SELECT count() FROM simple_mongo_table") == "100\n"
node.query("DROP TABLE simple_mongo_table")
simple_mongo_table.drop()
@ -462,7 +376,9 @@ def test_predefined_connection_configuration_uri(started_cluster):
node.query(
"CREATE OR REPLACE TABLE simple_table_uri(key UInt64, data String) ENGINE = MongoDB(mongo1_uri)"
)
assert node.query("SELECT count() FROM simple_table_uri") == "100\n"
node.query("DROP TABLE simple_table_uri")
simple_mongo_table.drop()
@ -480,7 +396,9 @@ def test_no_credentials(started_cluster):
node.query(
"CREATE OR REPLACE TABLE simple_table(key UInt64, data String) ENGINE = MongoDB('mongo_no_cred:27017', 'test', 'simple_table', '', '')"
)
assert node.query("SELECT count() FROM simple_table") == "100\n"
node.query("DROP TABLE simple_table")
simple_mongo_table.drop()
@ -498,7 +416,9 @@ def test_no_credentials_uri(started_cluster):
node.query(
"CREATE OR REPLACE TABLE simple_table_uri(key UInt64, data String) ENGINE = MongoDB('mongodb://mongo_no_cred:27017/test', 'simple_table_uri')"
)
assert node.query("SELECT count() FROM simple_table_uri") == "100\n"
node.query("DROP TABLE simple_table_uri")
simple_mongo_table.drop()
@ -561,14 +481,9 @@ def test_missing_columns(started_cluster):
not_exists_nullable Nullable(Int64)
) ENGINE = MongoDB(mongo1)"""
)
assert (
node.query("SELECT count() FROM simple_mongo_table WHERE isNull(data)")
== "10\n"
)
assert (
node.query("SELECT count() FROM simple_mongo_table WHERE isNull(not_exists)")
== "0\n"
)
assert node.query("SELECT count() FROM simple_mongo_table WHERE isNull(data)") == "10\n"
assert node.query("SELECT count() FROM simple_mongo_table WHERE isNull(not_exists)") == "0\n"
node.query("DROP TABLE IF EXISTS simple_mongo_table")
simple_mongo_table.drop()
@ -638,17 +553,10 @@ def test_string_casting(started_cluster):
assert node.query("SELECT k_doubleP FROM strings_table") == "6.660000\n"
assert node.query("SELECT k_doubleN FROM strings_table") == "-6.660000\n"
assert node.query("SELECT k_date FROM strings_table") == "1999-02-28 00:00:00\n"
assert (
node.query("SELECT k_timestamp FROM strings_table") == "1999-02-28 12:46:34\n"
)
assert node.query("SELECT k_timestamp FROM strings_table") == "1999-02-28 12:46:34\n"
assert node.query("SELECT k_string FROM strings_table") == "ClickHouse\n"
assert (
json.loads(node.query("SELECT k_document FROM strings_table"))
== data["k_document"]
)
assert (
json.loads(node.query("SELECT k_array FROM strings_table")) == data["k_array"]
)
assert json.dumps(json.loads(node.query("SELECT k_document FROM strings_table"))) == json.dumps(data["k_document"])
assert json.dumps(json.loads(node.query("SELECT k_array FROM strings_table"))) == json.dumps(data["k_array"])
node.query("DROP TABLE strings_table")
string_mongo_table.drop()
@ -678,10 +586,7 @@ def test_dates_casting(started_cluster):
)
assert node.query("SELECT COUNT() FROM dates_table") == "1\n"
assert (
node.query("SELECT * FROM dates_table")
== "1999-02-28 11:23:16\t1999-02-28 11:23:16.000\t1999-02-28\t1999-02-28\n"
)
assert node.query("SELECT * FROM dates_table") == "1999-02-28 11:23:16\t1999-02-28 11:23:16.000\t1999-02-28\t1999-02-28\n"
node.query("DROP TABLE dates_table")
dates_mongo_table.drop()
@ -717,26 +622,10 @@ def test_order_by(started_cluster):
assert node.query("SELECT COUNT() FROM sort_table") == "900\n"
assert node.query("SELECT keyInt FROM sort_table ORDER BY keyInt LIMIT 1") == "1\n"
assert (
node.query("SELECT keyInt FROM sort_table ORDER BY keyInt DESC LIMIT 1")
== "30\n"
)
assert (
node.query(
"SELECT keyInt, keyFloat FROM sort_table ORDER BY keyInt, keyFloat DESC LIMIT 1"
)
== "1\t1.03\n"
)
assert (
node.query(
"SELECT keyDateTime FROM sort_table ORDER BY keyDateTime DESC LIMIT 1"
)
== "1999-12-30 11:23:16\n"
)
assert (
node.query("SELECT keyDate FROM sort_table ORDER BY keyDate DESC LIMIT 1")
== "1999-12-30\n"
)
assert node.query("SELECT keyInt FROM sort_table ORDER BY keyInt DESC LIMIT 1") == "30\n"
assert node.query("SELECT keyInt, keyFloat FROM sort_table ORDER BY keyInt, keyFloat DESC LIMIT 1") == "1\t1.03\n"
assert node.query("SELECT keyDateTime FROM sort_table ORDER BY keyDateTime DESC LIMIT 1") == "1999-12-30 11:23:16\n"
assert node.query("SELECT keyDate FROM sort_table ORDER BY keyDate DESC LIMIT 1") == "1999-12-30\n"
with pytest.raises(QueryRuntimeException):
node.query("SELECT * FROM sort_table ORDER BY keyInt WITH FILL")
@ -782,122 +671,47 @@ def test_where(started_cluster):
keyDateTime DateTime,
keyDate Date,
keyNull Nullable(UInt8),
keyNotExists Int
keyNotExists Nullable(Int)
) ENGINE = MongoDB('mongo1:27017', 'test', 'where_table', 'root', 'clickhouse')"""
)
assert node.query("SELECT COUNT() FROM where_table") == "4\n"
assert (
node.query("SELECT keyString FROM where_table WHERE id = '11'") == "1string\n"
)
assert (
node.query(
"SELECT keyString FROM where_table WHERE id != '11' ORDER BY keyFloat"
)
== "2string\n1string\n2string\n"
)
assert (
node.query(
"SELECT keyString FROM where_table WHERE id = '11' AND keyString = '1string'"
)
== "1string\n"
)
assert (
node.query("SELECT id FROM where_table WHERE keyInt = 1 AND keyFloat = 1.001")
== "11\n"
)
assert (
node.query("SELECT id FROM where_table WHERE keyInt = 0 OR keyFloat = 1.001")
== "11\n"
)
assert node.query("SELECT keyString FROM where_table WHERE id = '11'") == "1string\n"
assert node.query("SELECT keyString FROM where_table WHERE id != '11' ORDER BY keyFloat") == "2string\n1string\n2string\n"
assert node.query("SELECT keyString FROM where_table WHERE id = '11' AND keyString = '1string'") == "1string\n"
assert node.query("SELECT id FROM where_table WHERE keyInt = 1 AND keyFloat = 1.001") == "11\n"
assert node.query("SELECT id FROM where_table WHERE keyInt = 0 OR keyFloat = 1.001") == "11\n"
assert (
node.query("SELECT id FROM where_table WHERE keyInt BETWEEN 1 AND 2")
== "11\n12\n21\n22\n"
)
assert node.query("SELECT id FROM where_table WHERE keyInt BETWEEN 1 AND 2") == "11\n12\n21\n22\n"
assert node.query("SELECT id FROM where_table WHERE keyInt > 10") == ""
assert (
node.query("SELECT id FROM where_table WHERE keyInt < 10.1 ORDER BY keyFloat")
== "11\n12\n21\n22\n"
)
assert node.query("SELECT id FROM where_table WHERE keyInt < 10.1 ORDER BY keyFloat") == "11\n12\n21\n22\n"
assert node.query("SELECT id FROM where_table WHERE id IN ('11')") == "11\n"
assert node.query("SELECT id FROM where_table WHERE id IN ['11']") == "11\n"
assert node.query("SELECT id FROM where_table WHERE id IN ('11', 100)") == "11\n"
assert (
node.query(
"SELECT id FROM where_table WHERE id IN ('11', '22') ORDER BY keyFloat"
)
== "11\n22\n"
)
assert (
node.query(
"SELECT id FROM where_table WHERE id IN ['11', '22'] ORDER BY keyFloat"
)
== "11\n22\n"
)
assert node.query("SELECT id FROM where_table WHERE id IN ('11', '22') ORDER BY keyFloat") == "11\n22\n"
assert node.query("SELECT id FROM where_table WHERE id IN ['11', '22'] ORDER BY keyFloat") == "11\n22\n"
assert (
node.query(
"SELECT id FROM where_table WHERE id NOT IN ('11') ORDER BY keyFloat"
)
== "12\n21\n22\n"
)
assert (
node.query(
"SELECT id FROM where_table WHERE id NOT IN ['11'] ORDER BY keyFloat"
)
== "12\n21\n22\n"
)
assert (
node.query(
"SELECT id FROM where_table WHERE id NOT IN ('11', 100) ORDER BY keyFloat"
)
== "12\n21\n22\n"
)
assert (
node.query("SELECT id FROM where_table WHERE id NOT IN ('11') AND id IN ('12')")
== "12\n"
)
assert (
node.query("SELECT id FROM where_table WHERE id NOT IN ['11'] AND id IN ('12')")
== "12\n"
)
assert node.query("SELECT id FROM where_table WHERE id NOT IN ('11') ORDER BY keyFloat") == "12\n21\n22\n"
assert node.query("SELECT id FROM where_table WHERE id NOT IN ['11'] ORDER BY keyFloat") == "12\n21\n22\n"
assert node.query("SELECT id FROM where_table WHERE id NOT IN ('11', 100) ORDER BY keyFloat") == "12\n21\n22\n"
assert node.query("SELECT id FROM where_table WHERE id NOT IN ('11') AND id IN ('12')") == "12\n"
assert node.query("SELECT id FROM where_table WHERE id NOT IN ['11'] AND id IN ('12')") == "12\n"
with pytest.raises(QueryRuntimeException):
assert (
node.query(
"SELECT id FROM where_table WHERE id NOT IN ['11', 100] ORDER BY keyFloat"
)
== "12\n21\n22\n"
)
assert node.query("SELECT id FROM where_table WHERE id NOT IN ['11', 100] ORDER BY keyFloat") == "12\n21\n22\n"
assert node.query("SELECT id FROM where_table WHERE keyDateTime > now()") == ""
assert (
node.query(
"SELECT keyInt FROM where_table WHERE keyDateTime < now() AND keyString = '1string' AND keyInt IS NOT NULL ORDER BY keyInt"
)
== "1\n2\n"
)
assert node.query("SELECT keyInt FROM where_table WHERE keyDateTime < now() AND keyString = '1string' AND keyInt IS NOT NULL ORDER BY keyInt") == "1\n2\n"
assert node.query("SELECT count() FROM where_table WHERE isNotNull(id)") == "4\n"
assert (
node.query("SELECT count() FROM where_table WHERE isNotNull(keyNull)") == "0\n"
)
assert node.query("SELECT count() FROM where_table WHERE isNotNull(keyNull)") == "0\n"
assert node.query("SELECT count() FROM where_table WHERE isNull(keyNull)") == "4\n"
assert (
node.query("SELECT count() FROM where_table WHERE isNotNull(keyNotExists)")
== "0\n"
)
assert (
node.query("SELECT count() FROM where_table WHERE isNull(keyNotExists)")
== "0\n"
)
assert node.query("SELECT count() FROM where_table WHERE isNotNull(keyNotExists)") == "0\n"
assert node.query("SELECT count() FROM where_table WHERE isNull(keyNotExists)") == "4\n"
assert node.query("SELECT count() FROM where_table WHERE keyNotExists = 0") == "0\n"
assert (
node.query("SELECT count() FROM where_table WHERE keyNotExists != 0") == "0\n"
)
assert node.query("SELECT count() FROM where_table WHERE keyNotExists != 0") == "0\n"
with pytest.raises(QueryRuntimeException):
node.query("SELECT * FROM where_table WHERE keyInt = keyFloat")
@ -940,6 +754,7 @@ def test_defaults(started_cluster):
) ENGINE = MongoDB('mongo1:27017', 'test', 'defaults_table', 'root', 'clickhouse')
"""
)
assert node.query("SELECT COUNT() FROM defaults_table") == "1\n"
assert (
@ -990,6 +805,7 @@ def test_nulls(started_cluster):
) ENGINE = MongoDB('mongo1:27017', 'test', 'nulls_table', 'root', 'clickhouse')
"""
)
assert node.query("SELECT COUNT() FROM nulls_table") == "1\n"
assert (
@ -1034,36 +850,17 @@ def test_oid(started_cluster):
) ENGINE = MongoDB('mongo1:27017', 'test', 'oid_table', 'root', 'clickhouse')
"""
)
assert node.query("SELECT COUNT() FROM oid_table") == "5\n"
assert node.query(f"SELECT key FROM oid_table WHERE _id = '{oid[0]}'") == "a\n"
assert (
node.query(f"SELECT * FROM oid_table WHERE _id = '{oid[2]}'")
== f"{oid[2]}\tc\n"
)
assert node.query(f"SELECT * FROM oid_table WHERE _id = '{oid[2]}'") == f"{oid[2]}\tc\n"
assert node.query(f"SELECT COUNT() FROM oid_table WHERE _id != '{oid[0]}'") == "4\n"
assert (
node.query(
f"SELECT key FROM oid_table WHERE _id in ('{oid[0]}', '{oid[1]}') ORDER BY key"
)
== "a\nb\n"
)
assert (
node.query(
f"SELECT key FROM oid_table WHERE _id in ['{oid[0]}', '{oid[1]}'] ORDER BY key"
)
== "a\nb\n"
)
assert (
node.query(f"SELECT key FROM oid_table WHERE _id in ('{oid[0]}') ORDER BY key")
== "a\n"
)
assert (
node.query(f"SELECT key FROM oid_table WHERE _id in ['{oid[1]}'] ORDER BY key")
== "b\n"
)
assert node.query(f"SELECT key FROM oid_table WHERE _id in ('{oid[0]}', '{oid[1]}') ORDER BY key") == "a\nb\n"
assert node.query(f"SELECT key FROM oid_table WHERE _id in ['{oid[0]}', '{oid[1]}'] ORDER BY key") == "a\nb\n"
assert node.query(f"SELECT key FROM oid_table WHERE _id in ('{oid[0]}') ORDER BY key") == "a\n"
assert node.query(f"SELECT key FROM oid_table WHERE _id in ['{oid[1]}'] ORDER BY key") == "b\n"
with pytest.raises(QueryRuntimeException):
node.query("SELECT * FROM oid_table WHERE _id = 'invalidOID'")
@ -1097,10 +894,8 @@ def test_uuid(started_cluster):
) ENGINE = MongoDB('mongo1:27017', 'test', 'uuid_table', 'root', 'clickhouse')
"""
)
assert (
node.query(f"SELECT kUUID FROM uuid_table WHERE isValid = 1")
== "f0e77736-91d1-48ce-8f01-15123ca1c7ed\n"
)
assert node.query(f"SELECT kUUID FROM uuid_table WHERE isValid = 1") == "f0e77736-91d1-48ce-8f01-15123ca1c7ed\n"
with pytest.raises(QueryRuntimeException):
node.query("SELECT * FROM uuid_table WHERE isValid = 0")
@ -1126,19 +921,11 @@ def test_no_fail_on_unsupported_clauses(started_cluster):
) ENGINE = MongoDB('mongo1:27017', 'test', 'unsupported_clauses', 'root', 'clickhouse')
"""
)
node.query(
f"SELECT * FROM unsupported_clauses WHERE a > rand() SETTINGS mongodb_fail_on_query_build_error = false"
)
node.query(
f"SELECT * FROM unsupported_clauses WHERE a / 1000 > 0 SETTINGS mongodb_fail_on_query_build_error = false"
)
node.query(
f"SELECT * FROM unsupported_clauses WHERE toFloat64(a) < 6.66 > rand() SETTINGS mongodb_fail_on_query_build_error = false"
)
node.query(
f"SELECT * FROM unsupported_clauses ORDER BY a, b LIMIT 2 BY a SETTINGS mongodb_fail_on_query_build_error = false"
)
node.query(f"SELECT * FROM unsupported_clauses WHERE a > rand() SETTINGS mongodb_fail_on_query_build_error = 0")
node.query(f"SELECT * FROM unsupported_clauses WHERE a / 1000 > 0 SETTINGS mongodb_fail_on_query_build_error = 0")
node.query(f"SELECT * FROM unsupported_clauses WHERE toFloat64(a) < 6.66 > rand() SETTINGS mongodb_fail_on_query_build_error = 0")
node.query(f"SELECT * FROM unsupported_clauses ORDER BY a, b LIMIT 2 BY a SETTINGS mongodb_fail_on_query_build_error = 0")
node.query("DROP TABLE unsupported_clauses")
unsupported_clauses_table.drop()

View File

@ -10,7 +10,7 @@ from helpers.cluster import ClickHouseCluster
def started_cluster(request):
try:
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
cluster.add_instance(
"node",
with_mongo=True,
main_configs=[
@ -32,12 +32,9 @@ def get_mongo_connection(started_cluster, secure=False, with_credentials=True):
)
)
if with_credentials:
return pymongo.MongoClient(
"mongodb://root:clickhouse@localhost:{}".format(started_cluster.mongo_port)
)
return pymongo.MongoClient(
"mongodb://localhost:{}".format(started_cluster.mongo_no_cred_port)
)
return pymongo.MongoClient("mongodb://root:clickhouse@localhost:{}".format(started_cluster.mongo_port))
return pymongo.MongoClient("mongodb://localhost:{}".format(started_cluster.mongo_no_cred_port))
def test_simple_select(started_cluster):

View File

@ -0,0 +1,6 @@
CREATE TABLE default.mongodb_uri_password_masking (`_id` String) ENGINE = MongoDB(\'mongodb://testuser:[HIDDEN]@127.0.0.1:27017/example\', \'test_clickhouse\')
MongoDB(\'mongodb://testuser:[HIDDEN]@127.0.0.1:27017/example\', \'test_clickhouse\')
CREATE DICTIONARY default.mongodb_dictionary_uri_password_masking (`_id` String) PRIMARY KEY _id SOURCE(MONGODB(URI \'mongodb://testuser:[HIDDEN]@127.0.0.1:27017/example\' COLLECTION \'test_clickhouse\')) LIFETIME(MIN 0 MAX 0) LAYOUT(FLAT())
CREATE TABLE default.mongodb_password_masking (`_id` String) ENGINE = MongoDB(\'127.0.0.1:27017\', \'example\', \'test_clickhouse\', \'testuser\', \'[HIDDEN]\')
MongoDB(\'127.0.0.1:27017\', \'example\', \'test_clickhouse\', \'testuser\', \'[HIDDEN]\')
CREATE DICTIONARY default.mongodb_dictionary_password_masking (`_id` String) PRIMARY KEY _id SOURCE(MONGODB(HOST \'127.0.0.1\' PORT 27017 USER \'testuser\' PASSWORD \'[HIDDEN]\' DB \'example\' COLLECTION \'test_clickhouse\' OPTIONS \'ssl=true\')) LIFETIME(MIN 0 MAX 0) LAYOUT(FLAT())

View File

@ -0,0 +1,42 @@
-- Checks how MongoDB credentials are displayed in system.tables for tables and
-- dictionaries declared with the literal password 'mypassword'.
-- The expected output for this test shows '[HIDDEN]' in place of the password.
-- Every select passes the stored text through replaceAll(..., currentDatabase(), 'default'),
-- so the printed result does not contain the name of the database the test runs in.

-- Case 1: table engine, connection given as a single URI containing user:password.
create or replace table mongodb_uri_password_masking (_id String)
engine = MongoDB('mongodb://testuser:mypassword@127.0.0.1:27017/example', 'test_clickhouse');
-- Both the full CREATE statement and the engine_full column are printed for tables.
select replaceAll(create_table_query, currentDatabase(), 'default') from system.tables
where table = 'mongodb_uri_password_masking' and database = currentDatabase();
select replaceAll(engine_full, currentDatabase(), 'default') from system.tables
where table = 'mongodb_uri_password_masking' and database = currentDatabase();
drop table if exists mongodb_uri_password_masking;
-- Case 2: dictionary source, connection given as a URI. Only create_table_query is printed.
create or replace dictionary mongodb_dictionary_uri_password_masking (_id String)
primary key _id
source(MONGODB(uri 'mongodb://testuser:mypassword@127.0.0.1:27017/example' collection 'test_clickhouse'))
layout(FLAT())
lifetime(0);
select replaceAll(create_table_query, currentDatabase(), 'default') from system.tables
where table = 'mongodb_dictionary_uri_password_masking' and database = currentDatabase();
drop dictionary if exists mongodb_dictionary_uri_password_masking;
-- Case 3: table engine, connection given as separate host:port, database, collection,
-- user and password arguments.
-- NOTE(review): unlike the other three cases this uses plain `create table`
-- (no `or replace`) - confirm whether that is intentional.
create table mongodb_password_masking (_id String)
engine = MongoDB('127.0.0.1:27017', 'example', 'test_clickhouse', 'testuser', 'mypassword');
select replaceAll(create_table_query, currentDatabase(), 'default') from system.tables
where table = 'mongodb_password_masking' and database = currentDatabase();
select replaceAll(engine_full, currentDatabase(), 'default') from system.tables
where table = 'mongodb_password_masking' and database = currentDatabase();
drop table if exists mongodb_password_masking;
-- Case 4: dictionary source, connection given as separate key/value parameters
-- (host, port, user, password, db, collection, options). Only create_table_query is printed.
create or replace dictionary mongodb_dictionary_password_masking (_id String)
primary key _id
source(MONGODB(
host '127.0.0.1'
port 27017
user 'testuser'
password 'mypassword'
db 'example'
collection 'test_clickhouse'
options 'ssl=true'
))
layout(FLAT())
lifetime(0);
select replaceAll(create_table_query, currentDatabase(), 'default') from system.tables
where table = 'mongodb_dictionary_password_masking' and database = currentDatabase();
drop dictionary if exists mongodb_dictionary_password_masking;