"""Integration tests for reading MongoDB collections through ClickHouse's MongoDB table engine."""

import datetime
import json

import bson
import pymongo
import pytest

from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster


@pytest.fixture(scope="module")
def started_cluster(request):
    """Start a cluster with one ClickHouse instance (``node``) and MongoDB, and shut it down afterwards."""
    # Constructed outside the ``try`` so a constructor failure is reported as-is
    # instead of being masked by ``cluster`` being unbound in the ``finally``.
    cluster = ClickHouseCluster(__file__)
    try:
        cluster.add_instance(
            "node",
            main_configs=[
                "configs/named_collections.xml",
                "configs/feature_flag.xml",
            ],
            user_configs=["configs/users.xml"],
            with_mongo=True,
        )
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def get_mongo_connection(started_cluster, secure=False, with_credentials=True):
    """Return a ``pymongo.MongoClient`` connected to one of the cluster's MongoDB ports on localhost.

    ``secure=True`` uses ``mongo_secure_port`` with TLS (certificate and hostname
    validation disabled) and takes precedence over ``with_credentials``.
    Otherwise ``with_credentials`` selects between ``mongo_port`` (root/clickhouse)
    and ``mongo_no_cred_port`` (no credentials in the URI).
    """
    if secure:
        return pymongo.MongoClient(
            f"mongodb://root:clickhouse@localhost:{started_cluster.mongo_secure_port}"
            "/?tls=true&tlsAllowInvalidCertificates=true&tlsAllowInvalidHostnames=true"
        )
    if with_credentials:
        return pymongo.MongoClient(
            f"mongodb://root:clickhouse@localhost:{started_cluster.mongo_port}"
        )
    return pymongo.MongoClient(
        f"mongodb://localhost:{started_cluster.mongo_no_cred_port}"
    )


def _recreate_root_user(db):
    """Drop every user of *db* and create ``root``/``clickhouse`` with the ``readWrite`` role."""
    db.command("dropAllUsersFromDatabase")
    db.command("createUser", "root", pwd="clickhouse", roles=["readWrite"])


def _key_data_rows(count=100):
    """Build *count* documents ``{"key": i, "data": hex(i * i)}`` for ``i`` in ``range(count)``."""
    return [{"key": i, "data": hex(i * i)} for i in range(count)]


def _assert_queries(node, cases):
    """Run each ``(query, expected)`` pair in order and compare the raw query output."""
    for query, expected in cases:
        assert node.query(query) == expected, query


def _assert_queries_fail(node, queries):
    """Check that every query in *queries* raises ``QueryRuntimeException``."""
    for query in queries:
        with pytest.raises(QueryRuntimeException):
            node.query(query)


def _assert_key_data_table(node, table_name):
    """Check that *table_name* exposes exactly the 100 documents from ``_key_data_rows()``."""
    _assert_queries(
        node,
        [
            (f"SELECT COUNT() FROM {table_name}", "100\n"),
            (f"SELECT sum(key) FROM {table_name}", str(sum(range(0, 100))) + "\n"),
            (f"SELECT data from {table_name} where key = 42", hex(42 * 42) + "\n"),
        ],
    )


def test_simple_select(started_cluster):
    """Read a plain collection using host/database/collection/user/password arguments."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    simple_mongo_table = db["simple_table"]
    simple_mongo_table.insert_many(_key_data_rows())

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'root', 'clickhouse')"
    )
    _assert_key_data_table(node, "simple_mongo_table")

    node.query("DROP TABLE simple_mongo_table")
    simple_mongo_table.drop()


def test_simple_select_uri(started_cluster):
    """Read a plain collection using a ``mongodb://`` URI."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    simple_mongo_table = db["simple_table_uri"]
    simple_mongo_table.insert_many(_key_data_rows())

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE simple_table_uri(key UInt64, data String) ENGINE = MongoDB('mongodb://root:clickhouse@mongo1:27017/test', 'simple_table_uri')"
    )
    _assert_key_data_table(node, "simple_table_uri")

    node.query("DROP TABLE simple_table_uri")
    simple_mongo_table.drop()


def test_simple_select_from_view(started_cluster):
    """Read through a MongoDB view defined on top of a plain collection."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    simple_mongo_table = db["simple_table"]
    simple_mongo_table.insert_many(_key_data_rows())
    simple_mongo_table_view = db.create_collection(
        "simple_table_view", viewOn="simple_table"
    )

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table_view', 'root', 'clickhouse')"
    )
    _assert_key_data_table(node, "simple_mongo_table")

    node.query("DROP TABLE simple_mongo_table")
    simple_mongo_table_view.drop()
    simple_mongo_table.drop()


def test_arrays(started_cluster):
    """Read array fields of many element types, including nested, empty, null and missing arrays."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    arrays_mongo_table = db["arrays_table"]
    # NOTE: "arr_arr_null" is deliberately absent from the documents although the
    # table declares it; the checks below expect "[]" for it.
    data = [
        {
            "key": i,
            "arr_int64": [-(i + 1), -(i + 2), -(i + 3)],
            "arr_int32": [-(i + 1), -(i + 2), -(i + 3)],
            "arr_int16": [-(i + 1), -(i + 2), -(i + 3)],
            "arr_int8": [-(i + 1), -(i + 2), -(i + 3)],
            "arr_uint64": [i + 1, i + 2, i + 3],
            "arr_uint32": [i + 1, i + 2, i + 3],
            "arr_uint16": [i + 1, i + 2, i + 3],
            "arr_uint8": [i + 1, i + 2, i + 3],
            "arr_float32": [i + 1.125, i + 2.5, i + 3.750],
            "arr_float64": [i + 1.125, i + 2.5, i + 3.750],
            "arr_date": [
                datetime.datetime(2002, 10, 27),
                datetime.datetime(2024, 1, 8, 23, 59, 59),
            ],
            "arr_date32": [
                datetime.datetime(2002, 10, 27),
                datetime.datetime(2024, 1, 8, 23, 59, 59),
            ],
            "arr_datetime": [
                datetime.datetime(2023, 3, 31, 6, 3, 12),
                datetime.datetime(1999, 2, 28, 12, 46, 34),
            ],
            "arr_datetime64": [
                datetime.datetime(2023, 3, 31, 6, 3, 12),
                datetime.datetime(1999, 2, 28, 12, 46, 34),
            ],
            "arr_string": [str(i + 1), str(i + 2), str(i + 3)],
            "arr_uuid": [
                "f0e77736-91d1-48ce-8f01-15123ca1c7ed",
                "93376a07-c044-4281-a76e-ad27cf6973c5",
            ],
            "arr_arr_bool": [
                [True, False, True],
                [True],
                [None],
                [False],
            ],
            "arr_arr_bool_nullable": [
                [True, False, True],
                [True, None],
                [None],
                [False],
            ],
            "arr_empty": [],
            "arr_null": None,
            "arr_nullable": None,
        }
        for i in range(100)
    ]
    arrays_mongo_table.insert_many(data)

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE arrays_mongo_table("
        "key UInt64,"
        "arr_int64 Array(Int64),"
        "arr_int32 Array(Int32),"
        "arr_int16 Array(Int16),"
        "arr_int8 Array(Int8),"
        "arr_uint64 Array(UInt64),"
        "arr_uint32 Array(UInt32),"
        "arr_uint16 Array(UInt16),"
        "arr_uint8 Array(UInt8),"
        "arr_float32 Array(Float32),"
        "arr_float64 Array(Float64),"
        "arr_date Array(Date),"
        "arr_date32 Array(Date32),"
        "arr_datetime Array(DateTime),"
        "arr_datetime64 Array(DateTime64),"
        "arr_string Array(String),"
        "arr_uuid Array(UUID),"
        "arr_arr_bool Array(Array(Bool)),"
        "arr_arr_bool_nullable Array(Array(Nullable(Bool))),"
        "arr_empty Array(UInt64),"
        "arr_null Array(UInt64),"
        "arr_arr_null Array(Array(UInt64)),"
        "arr_nullable Array(Nullable(UInt64))"
        ") ENGINE = MongoDB('mongo1:27017', 'test', 'arrays_table', 'root', 'clickhouse')"
    )

    assert node.query("SELECT COUNT() FROM arrays_mongo_table") == "100\n"

    # Expected text output of each column for the document with key = 42.
    expected_by_column = {
        "arr_int64": "[-43,-44,-45]\n",
        "arr_int32": "[-43,-44,-45]\n",
        "arr_int16": "[-43,-44,-45]\n",
        "arr_int8": "[-43,-44,-45]\n",
        "arr_uint64": "[43,44,45]\n",
        "arr_uint32": "[43,44,45]\n",
        "arr_uint16": "[43,44,45]\n",
        "arr_uint8": "[43,44,45]\n",
        "arr_float32": "[43.125,44.5,45.75]\n",
        "arr_float64": "[43.125,44.5,45.75]\n",
        "arr_date": "['2002-10-27','2024-01-08']\n",
        "arr_date32": "['2002-10-27','2024-01-08']\n",
        "arr_datetime": "['2023-03-31 06:03:12','1999-02-28 12:46:34']\n",
        "arr_datetime64": "['2023-03-31 06:03:12.000','1999-02-28 12:46:34.000']\n",
        "arr_string": "['43','44','45']\n",
        "arr_uuid": "['f0e77736-91d1-48ce-8f01-15123ca1c7ed','93376a07-c044-4281-a76e-ad27cf6973c5']\n",
        "arr_arr_bool": "[[true,false,true],[true],[false],[false]]\n",
        "arr_arr_bool_nullable": "[[true,false,true],[true,NULL],[NULL],[false]]\n",
        "arr_empty": "[]\n",
        "arr_null": "[]\n",
        "arr_arr_null": "[]\n",
        "arr_nullable": "[]\n",
    }
    _assert_queries(
        node,
        [
            (f"SELECT {column_name} FROM arrays_mongo_table WHERE key = 42", expected)
            for column_name, expected in expected_by_column.items()
        ],
    )

    node.query("DROP TABLE arrays_mongo_table")
    arrays_mongo_table.drop()


def test_complex_data_type(started_cluster):
    """Read scalar columns from documents that also carry a nested ``dict`` field not declared in the table."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    incomplete_mongo_table = db["complex_table"]
    data = [
        {"key": i, "data": hex(i * i), "dict": {"a": i, "b": str(i)}}
        for i in range(100)
    ]
    incomplete_mongo_table.insert_many(data)

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE incomplete_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo1:27017', 'test', 'complex_table', 'root', 'clickhouse')"
    )
    _assert_key_data_table(node, "incomplete_mongo_table")

    node.query("DROP TABLE incomplete_mongo_table")
    incomplete_mongo_table.drop()


def test_secure_connection(started_cluster):
    """Read over TLS with certificate and hostname validation disabled via the options argument."""
    mongo_connection = get_mongo_connection(started_cluster, secure=True)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    simple_mongo_table = db["simple_table"]
    simple_mongo_table.insert_many(_key_data_rows())

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo_secure:27017', 'test', 'simple_table', 'root', 'clickhouse', 'tls=true&tlsAllowInvalidCertificates=true&tlsAllowInvalidHostnames=true')"
    )
    _assert_key_data_table(node, "simple_mongo_table")

    node.query("DROP TABLE simple_mongo_table")
    simple_mongo_table.drop()


def test_secure_connection_with_validation(started_cluster):
    """With only ``tls=true`` in the options, selecting from the table must fail."""
    mongo_connection = get_mongo_connection(started_cluster, secure=True)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    simple_mongo_table = db["simple_table"]
    simple_mongo_table.insert_many(_key_data_rows())

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB('mongo_secure:27017', 'test', 'simple_table', 'root', 'clickhouse', 'tls=true')"
    )
    _assert_queries_fail(node, ["SELECT COUNT() FROM simple_mongo_table"])

    node.query("DROP TABLE simple_mongo_table")
    simple_mongo_table.drop()


def test_secure_connection_uri(started_cluster):
    """Read over TLS using a URI that disables certificate and hostname validation."""
    mongo_connection = get_mongo_connection(started_cluster, secure=True)
    db = mongo_connection["test"]
    simple_mongo_table = db["test_secure_connection_uri"]
    simple_mongo_table.insert_many(_key_data_rows())

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE test_secure_connection_uri(key UInt64, data String) ENGINE = MongoDB('mongodb://root:clickhouse@mongo_secure:27017/test?tls=true&tlsAllowInvalidCertificates=true&tlsAllowInvalidHostnames=true', 'test_secure_connection_uri')"
    )
    _assert_key_data_table(node, "test_secure_connection_uri")

    node.query("DROP TABLE test_secure_connection_uri")
    simple_mongo_table.drop()


def test_secure_connection_uri_with_validation(started_cluster):
    """With only ``tls=true`` in the URI, selecting from the table must fail."""
    mongo_connection = get_mongo_connection(started_cluster, secure=True)
    db = mongo_connection["test"]
    simple_mongo_table = db["test_secure_connection_uri"]
    simple_mongo_table.insert_many(_key_data_rows())

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE test_secure_connection_uri(key UInt64, data String) ENGINE = MongoDB('mongodb://root:clickhouse@mongo_secure:27017/test?tls=true', 'test_secure_connection_uri')"
    )
    _assert_queries_fail(node, ["SELECT COUNT() FROM test_secure_connection_uri"])

    node.query("DROP TABLE test_secure_connection_uri")
    simple_mongo_table.drop()


def test_predefined_connection_configuration(started_cluster):
    """Create the table from the ``mongo1`` named collection."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    simple_mongo_table = db["simple_table"]
    simple_mongo_table.insert_many(_key_data_rows())

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE simple_mongo_table(key UInt64, data String) ENGINE = MongoDB(mongo1)"
    )
    assert node.query("SELECT count() FROM simple_mongo_table") == "100\n"

    node.query("DROP TABLE simple_mongo_table")
    simple_mongo_table.drop()


def test_predefined_connection_configuration_uri(started_cluster):
    """Create the table from the ``mongo1_uri`` named collection."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    simple_mongo_table = db["simple_table_uri"]
    simple_mongo_table.insert_many(_key_data_rows())

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE simple_table_uri(key UInt64, data String) ENGINE = MongoDB(mongo1_uri)"
    )
    assert node.query("SELECT count() FROM simple_table_uri") == "100\n"

    node.query("DROP TABLE simple_table_uri")
    simple_mongo_table.drop()


def test_no_credentials(started_cluster):
    """Read from the credential-less MongoDB instance with empty user and password arguments."""
    mongo_connection = get_mongo_connection(started_cluster, with_credentials=False)
    db = mongo_connection["test"]
    simple_mongo_table = db["simple_table"]
    simple_mongo_table.insert_many(_key_data_rows())

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE simple_table(key UInt64, data String) ENGINE = MongoDB('mongo_no_cred:27017', 'test', 'simple_table', '', '')"
    )
    assert node.query("SELECT count() FROM simple_table") == "100\n"

    node.query("DROP TABLE simple_table")
    simple_mongo_table.drop()


def test_no_credentials_uri(started_cluster):
    """Read from the credential-less MongoDB instance using a URI without user info."""
    mongo_connection = get_mongo_connection(started_cluster, with_credentials=False)
    db = mongo_connection["test"]
    simple_mongo_table = db["simple_table_uri"]
    simple_mongo_table.insert_many(_key_data_rows())

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE simple_table_uri(key UInt64, data String) ENGINE = MongoDB('mongodb://mongo_no_cred:27017/test', 'simple_table_uri')"
    )
    assert node.query("SELECT count() FROM simple_table_uri") == "100\n"

    node.query("DROP TABLE simple_table_uri")
    simple_mongo_table.drop()


def test_auth_source(started_cluster):
    """A user created in ``admin`` can read ``test`` only when ``authSource=admin`` is passed."""
    mongo_connection = get_mongo_connection(started_cluster, with_credentials=False)
    admin_db = mongo_connection["admin"]
    admin_db.command("dropAllUsersFromDatabase")
    admin_db.command(
        "createUser",
        "root",
        pwd="clickhouse",
        roles=[{"role": "userAdminAnyDatabase", "db": "admin"}, "readWriteAnyDatabase"],
    )
    # Same collection name in both databases, with different row counts (50 vs 100).
    admin_mongo_table = admin_db["simple_table"]
    admin_mongo_table.insert_many(_key_data_rows(50))
    db = mongo_connection["test"]
    simple_mongo_table = db["simple_table"]
    simple_mongo_table.insert_many(_key_data_rows())

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE simple_mongo_table_fail(key UInt64, data String) ENGINE = MongoDB('mongo_no_cred:27017', 'test', 'simple_table', 'root', 'clickhouse')"
    )
    _assert_queries_fail(node, ["SELECT count() FROM simple_mongo_table_fail"])
    node.query(
        "CREATE OR REPLACE TABLE simple_mongo_table_ok(key UInt64, data String) ENGINE = MongoDB('mongo_no_cred:27017', 'test', 'simple_table', 'root', 'clickhouse', 'authSource=admin')"
    )
    assert node.query("SELECT count() FROM simple_mongo_table_ok") == "100\n"

    node.query("DROP TABLE simple_mongo_table_fail")
    node.query("DROP TABLE simple_mongo_table_ok")
    simple_mongo_table.drop()
    # Previously leaked: the admin-side collection was never dropped.
    admin_mongo_table.drop()


def test_missing_columns(started_cluster):
    """Fields absent from documents are NULL for Nullable columns and non-NULL otherwise."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    simple_mongo_table = db["simple_table"]
    # Ten documents with "data" followed by ten documents without it.
    data = _key_data_rows(10) + [{"key": i} for i in range(10)]
    simple_mongo_table.insert_many(data)

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE simple_mongo_table("
        " key UInt64,"
        " data Nullable(String),"
        " not_exists Int64,"
        " not_exists_nullable Nullable(Int64)"
        " ) ENGINE = MongoDB(mongo1)"
    )
    _assert_queries(
        node,
        [
            ("SELECT count() FROM simple_mongo_table WHERE isNull(data)", "10\n"),
            ("SELECT count() FROM simple_mongo_table WHERE isNull(not_exists)", "0\n"),
        ],
    )

    node.query("DROP TABLE IF EXISTS simple_mongo_table")
    simple_mongo_table.drop()


def test_string_casting(started_cluster):
    """Every BSON value type in the document is readable through a ``String`` column."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    string_mongo_table = db["strings_table"]
    data = {
        "k_boolT": True,
        "k_boolF": False,
        "k_int32P": 100,
        "k_int32N": -100,
        "k_int64P": bson.int64.Int64(100),
        "k_int64N": bson.int64.Int64(-100),
        "k_doubleP": 6.66,
        "k_doubleN": -6.66,
        "k_date": datetime.datetime(1999, 2, 28, 0, 0, 0),
        "k_timestamp": datetime.datetime(1999, 2, 28, 12, 46, 34),
        "k_string": "ClickHouse",
        "k_document": {
            "Hello": "world!",
            "meow123": True,
            "number": 321,
            "doc": {"Hello": "world!"},
            "arr": [{"Hello": "world!"}, 1, "c"],
        },
        "k_array": [
            "Hello",
            "world!",
            {"cat": "meow!"},
            [1, 2, 3],
        ],
    }
    string_mongo_table.insert_one(data)

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE strings_table ("
        " _id String,"
        " k_boolT String,"
        " k_boolF String,"
        " k_int32P String,"
        " k_int32N String,"
        " k_int64P String,"
        " k_int64N String,"
        " k_doubleP String,"
        " k_doubleN String,"
        " k_date String,"
        " k_timestamp String,"
        " k_string String,"
        " k_document String,"
        " k_array String"
        " ) ENGINE = MongoDB('mongo1:27017', 'test', 'strings_table', 'root', 'clickhouse')"
    )

    assert node.query("SELECT COUNT() FROM strings_table") == "1\n"
    assert node.query("SELECT _id FROM strings_table") != ""
    expected_by_column = {
        "k_boolT": "true\n",
        "k_boolF": "false\n",
        "k_int32P": "100\n",
        "k_int32N": "-100\n",
        "k_int64P": "100\n",
        "k_int64N": "-100\n",
        "k_doubleP": "6.660000\n",
        "k_doubleN": "-6.660000\n",
        "k_date": "1999-02-28 00:00:00\n",
        "k_timestamp": "1999-02-28 12:46:34\n",
        "k_string": "ClickHouse\n",
    }
    _assert_queries(
        node,
        [
            (f"SELECT {column_name} FROM strings_table", expected)
            for column_name, expected in expected_by_column.items()
        ],
    )
    # Documents and arrays come back as JSON text; compare after a parse/dump
    # round trip so formatting differences do not matter.
    for column_name in ("k_document", "k_array"):
        returned = json.loads(node.query(f"SELECT {column_name} FROM strings_table"))
        assert json.dumps(returned) == json.dumps(data[column_name])

    node.query("DROP TABLE strings_table")
    string_mongo_table.drop()


def test_dates_casting(started_cluster):
    """One MongoDB datetime value is readable as DateTime, DateTime64, Date and Date32."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    dates_mongo_table = db["dates_table"]
    data = {
        "k_dateTime": datetime.datetime(1999, 2, 28, 11, 23, 16),
        "k_dateTime64": datetime.datetime(1999, 2, 28, 11, 23, 16),
        "k_date": datetime.datetime(1999, 2, 28, 11, 23, 16),
        "k_date32": datetime.datetime(1999, 2, 28, 11, 23, 16),
    }
    dates_mongo_table.insert_one(data)

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE dates_table ("
        " k_dateTime DateTime,"
        " k_dateTime64 DateTime64,"
        " k_date Date,"
        " k_date32 Date32"
        " ) ENGINE = MongoDB('mongo1:27017', 'test', 'dates_table', 'root', 'clickhouse')"
    )
    _assert_queries(
        node,
        [
            ("SELECT COUNT() FROM dates_table", "1\n"),
            (
                "SELECT * FROM dates_table",
                "1999-02-28 11:23:16\t1999-02-28 11:23:16.000\t1999-02-28\t1999-02-28\n",
            ),
        ],
    )

    node.query("DROP TABLE dates_table")
    dates_mongo_table.drop()


def test_order_by(started_cluster):
    """ORDER BY on int, float, DateTime and Date columns; ``WITH FILL`` variants must fail."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    sort_mongo_table = db["sort_table"]
    data = [
        {
            "keyInt": i,
            "keyFloat": i + (d * 0.001),
            "keyDateTime": datetime.datetime(1999, 12, i, 11, 23, 16),
            "keyDate": datetime.datetime(1999, 12, i, 11, 23, 16),
        }
        for i in range(1, 31)
        for d in range(1, 31)
    ]
    sort_mongo_table.insert_many(data)

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE sort_table ("
        " keyInt Int,"
        " keyFloat Float64,"
        " keyDateTime DateTime,"
        " keyDate Date"
        " ) ENGINE = MongoDB('mongo1:27017', 'test', 'sort_table', 'root', 'clickhouse')"
    )

    _assert_queries(
        node,
        [
            ("SELECT COUNT() FROM sort_table", "900\n"),
            ("SELECT keyInt FROM sort_table ORDER BY keyInt LIMIT 1", "1\n"),
            ("SELECT keyInt FROM sort_table ORDER BY keyInt DESC LIMIT 1", "30\n"),
            (
                "SELECT keyInt, keyFloat FROM sort_table ORDER BY keyInt, keyFloat DESC LIMIT 1",
                "1\t1.03\n",
            ),
            (
                "SELECT keyDateTime FROM sort_table ORDER BY keyDateTime DESC LIMIT 1",
                "1999-12-30 11:23:16\n",
            ),
            (
                "SELECT keyDate FROM sort_table ORDER BY keyDate DESC LIMIT 1",
                "1999-12-30\n",
            ),
        ],
    )
    _assert_queries_fail(
        node,
        [
            "SELECT * FROM sort_table ORDER BY keyInt WITH FILL",
            "SELECT * FROM sort_table ORDER BY keyInt WITH FILL TO sort_table",
            "SELECT * FROM sort_table ORDER BY keyInt WITH FILL FROM sort_table",
            "SELECT * FROM sort_table ORDER BY keyInt WITH FILL STEP 1",
        ],
    )

    node.query("DROP TABLE sort_table")
    sort_mongo_table.drop()


def test_where(started_cluster):
    """WHERE clauses: comparisons, IN / NOT IN, NULL checks and column-to-column comparisons that must fail."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    where_mongo_table = db["where_table"]
    # Four documents with ids "11", "12", "21", "22".
    data = [
        {
            "id": str(i) + str(d),
            "keyInt": i,
            "keyFloat": i + (d * 0.001),
            "keyString": str(d) + "string",
            "keyDateTime": datetime.datetime(1999, d, i, 11, 23, 16),
            "keyDate": datetime.datetime(1999, d, i, 11, 23, 16),
            "keyNull": None,
        }
        for i in range(1, 3)
        for d in range(1, 3)
    ]
    where_mongo_table.insert_many(data)

    node = started_cluster.instances["node"]
    node.query(
        "CREATE OR REPLACE TABLE where_table ("
        " id String,"
        " keyInt Int,"
        " keyFloat Float64,"
        " keyString String,"
        " keyDateTime DateTime,"
        " keyDate Date,"
        " keyNull Nullable(UInt8),"
        " keyNotExists Nullable(Int)"
        " ) ENGINE = MongoDB('mongo1:27017', 'test', 'where_table', 'root', 'clickhouse')"
    )

    _assert_queries(
        node,
        [
            ("SELECT COUNT() FROM where_table", "4\n"),
            ("SELECT keyString FROM where_table WHERE id = '11'", "1string\n"),
            (
                "SELECT keyString FROM where_table WHERE id != '11' ORDER BY keyFloat",
                "2string\n1string\n2string\n",
            ),
            (
                "SELECT keyString FROM where_table WHERE id = '11' AND keyString = '1string'",
                "1string\n",
            ),
            (
                "SELECT id FROM where_table WHERE keyInt = 1 AND keyFloat = 1.001",
                "11\n",
            ),
            (
                "SELECT id FROM where_table WHERE keyInt = 0 OR keyFloat = 1.001",
                "11\n",
            ),
            (
                "SELECT id FROM where_table WHERE keyInt BETWEEN 1 AND 2",
                "11\n12\n21\n22\n",
            ),
            ("SELECT id FROM where_table WHERE keyInt > 10", ""),
            (
                "SELECT id FROM where_table WHERE keyInt < 10.1 ORDER BY keyFloat",
                "11\n12\n21\n22\n",
            ),
            ("SELECT id FROM where_table WHERE id IN ('11')", "11\n"),
            ("SELECT id FROM where_table WHERE id IN ['11']", "11\n"),
            ("SELECT id FROM where_table WHERE id IN ('11', 100)", "11\n"),
            (
                "SELECT id FROM where_table WHERE id IN ('11', '22') ORDER BY keyFloat",
                "11\n22\n",
            ),
            (
                "SELECT id FROM where_table WHERE id IN ['11', '22'] ORDER BY keyFloat",
                "11\n22\n",
            ),
            (
                "SELECT id FROM where_table WHERE id NOT IN ('11') ORDER BY keyFloat",
                "12\n21\n22\n",
            ),
            (
                "SELECT id FROM where_table WHERE id NOT IN ['11'] ORDER BY keyFloat",
                "12\n21\n22\n",
            ),
            (
                "SELECT id FROM where_table WHERE id NOT IN ('11', 100) ORDER BY keyFloat",
                "12\n21\n22\n",
            ),
            (
                "SELECT id FROM where_table WHERE id NOT IN ('11') AND id IN ('12')",
                "12\n",
            ),
            (
                "SELECT id FROM where_table WHERE id NOT IN ['11'] AND id IN ('12')",
                "12\n",
            ),
        ],
    )

    # This query is expected to raise, so no result comparison is attached to it
    # (a comparison inside ``pytest.raises`` could never execute).
    _assert_queries_fail(
        node,
        ["SELECT id FROM where_table WHERE id NOT IN ['11', 100] ORDER BY keyFloat"],
    )

    _assert_queries(
        node,
        [
            ("SELECT id FROM where_table WHERE keyDateTime > now()", ""),
            (
                "SELECT keyInt FROM where_table WHERE keyDateTime < now() AND keyString = '1string' AND keyInt IS NOT NULL ORDER BY keyInt",
                "1\n2\n",
            ),
            ("SELECT count() FROM where_table WHERE isNotNull(id)", "4\n"),
            ("SELECT count() FROM where_table WHERE isNotNull(keyNull)", "0\n"),
            ("SELECT count() FROM where_table WHERE isNull(keyNull)", "4\n"),
            ("SELECT count() FROM where_table WHERE isNotNull(keyNotExists)", "0\n"),
            ("SELECT count() FROM where_table WHERE isNull(keyNotExists)", "4\n"),
            ("SELECT count() FROM where_table WHERE keyNotExists = 0", "0\n"),
            ("SELECT count() FROM where_table WHERE keyNotExists != 0", "0\n"),
        ],
    )

    _assert_queries_fail(
        node,
        [
            "SELECT * FROM where_table WHERE keyInt = keyFloat",
            "SELECT * FROM where_table WHERE equals(keyInt, keyFloat)",
        ],
    )

    node.query("DROP TABLE where_table")
    where_mongo_table.drop()


def test_defaults(started_cluster):
    """Non-Nullable columns missing from the document read as the type's default value."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    defaults_mongo_table = db["defaults_table"]
    defaults_mongo_table.insert_one({"key": "key"})

    node = started_cluster.instances["node"]
    node.query(
        " CREATE OR REPLACE TABLE defaults_table("
        " _id String,"
        " k_int64 Int64,"
        " k_int32 Int32,"
        " k_int16 Int16,"
        " k_int8 Int8,"
        " k_uint64 UInt64,"
        " k_uint32 UInt32,"
        " k_uint16 UInt16,"
        " k_uint8 UInt8,"
        " k_float32 Float32,"
        " k_float64 Float64,"
        " k_date Date,"
        " k_date32 Date32,"
        " k_datetime DateTime,"
        " k_datetime64 DateTime64,"
        " k_string String,"
        " k_uuid UUID,"
        " k_arr Array(Bool)"
        " ) ENGINE = MongoDB('mongo1:27017', 'test', 'defaults_table', 'root', 'clickhouse') "
    )

    _assert_queries(
        node,
        [
            ("SELECT COUNT() FROM defaults_table", "1\n"),
            (
                "SELECT k_int64, k_int32, k_int16, k_int8, k_uint64, k_uint32, k_uint16, k_uint8, k_float32, k_float64 FROM defaults_table",
                "0\t0\t0\t0\t0\t0\t0\t0\t0\t0\n",
            ),
            (
                "SELECT k_date, k_date32, k_datetime, k_datetime64, k_string, k_uuid, k_arr FROM defaults_table",
                "1970-01-01\t1900-01-01\t1970-01-01 00:00:00\t1970-01-01 00:00:00.000\t\t00000000-0000-0000-0000-000000000000\t[]\n",
            ),
        ],
    )

    node.query("DROP TABLE defaults_table")
    defaults_mongo_table.drop()


def test_nulls(started_cluster):
    """Nullable columns missing from the document read as NULL."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    nulls_mongo_table = db["nulls_table"]
    nulls_mongo_table.insert_one({"key": "key"})

    node = started_cluster.instances["node"]
    node.query(
        " CREATE OR REPLACE TABLE nulls_table("
        " _id String,"
        " k_int64 Nullable(Int64),"
        " k_int32 Nullable(Int32),"
        " k_int16 Nullable(Int16),"
        " k_int8 Nullable(Int8),"
        " k_uint64 Nullable(UInt64),"
        " k_uint32 Nullable(UInt32),"
        " k_uint16 Nullable(UInt16),"
        " k_uint8 Nullable(UInt8),"
        " k_float32 Nullable(Float32),"
        " k_float64 Nullable(Float64),"
        " k_date Nullable(Date),"
        " k_date32 Nullable(Date32),"
        " k_datetime Nullable(DateTime),"
        " k_datetime64 Nullable(DateTime64),"
        " k_string Nullable(String),"
        " k_uuid Nullable(UUID)"
        " ) ENGINE = MongoDB('mongo1:27017', 'test', 'nulls_table', 'root', 'clickhouse') "
    )

    _assert_queries(
        node,
        [
            ("SELECT COUNT() FROM nulls_table", "1\n"),
            (
                "SELECT k_int64, k_int32, k_int16, k_int8, k_uint64, k_uint32, k_uint16, k_uint8, k_float32, k_float64 FROM nulls_table",
                "\\N\t\\N\t\\N\t\\N\t\\N\t\\N\t\\N\t\\N\t\\N\t\\N\n",
            ),
            (
                "SELECT k_date, k_date32, k_datetime, k_datetime64, k_string, k_uuid FROM nulls_table",
                "\\N\t\\N\t\\N\t\\N\t\\N\t\\N\n",
            ),
        ],
    )

    node.query("DROP TABLE nulls_table")
    nulls_mongo_table.drop()


def test_oid(started_cluster):
    """Filter on ``_id`` with inserted ObjectId values; non-ObjectId filter values must fail."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    oid_mongo_table = db["oid_table"]
    inserted_result = oid_mongo_table.insert_many(
        [
            {"key": "a"},
            {"key": "b"},
            {"key": "c"},
            {"key": "d"},
            {"key": "e"},
        ]
    )
    oid = inserted_result.inserted_ids

    node = started_cluster.instances["node"]
    node.query(
        " CREATE OR REPLACE TABLE oid_table("
        " _id String,"
        " key String"
        " ) ENGINE = MongoDB('mongo1:27017', 'test', 'oid_table', 'root', 'clickhouse') "
    )

    _assert_queries(
        node,
        [
            ("SELECT COUNT() FROM oid_table", "5\n"),
            (f"SELECT key FROM oid_table WHERE _id = '{oid[0]}'", "a\n"),
            (f"SELECT * FROM oid_table WHERE _id = '{oid[2]}'", f"{oid[2]}\tc\n"),
            (f"SELECT COUNT() FROM oid_table WHERE _id != '{oid[0]}'", "4\n"),
            (
                f"SELECT key FROM oid_table WHERE _id in ('{oid[0]}', '{oid[1]}') ORDER BY key",
                "a\nb\n",
            ),
            (
                f"SELECT key FROM oid_table WHERE _id in ['{oid[0]}', '{oid[1]}'] ORDER BY key",
                "a\nb\n",
            ),
            (
                f"SELECT key FROM oid_table WHERE _id in ('{oid[0]}') ORDER BY key",
                "a\n",
            ),
            (
                f"SELECT key FROM oid_table WHERE _id in ['{oid[1]}'] ORDER BY key",
                "b\n",
            ),
        ],
    )
    _assert_queries_fail(
        node,
        [
            "SELECT * FROM oid_table WHERE _id = 'invalidOID'",
            "SELECT * FROM oid_table WHERE _id = 123123",
            "SELECT * FROM oid_table WHERE _id in (123123, 123)",
        ],
    )

    node.query("DROP TABLE oid_table")
    oid_mongo_table.drop()


def test_uuid(started_cluster):
    """A valid UUID string is readable as ``UUID``; selecting a row with an invalid one must fail."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    uuid_mongo_table = db["uuid_table"]
    uuid_mongo_table.insert_many(
        [
            {"isValid": 0, "kUUID": "bad_uuid"},
            {"isValid": 1, "kUUID": "f0e77736-91d1-48ce-8f01-15123ca1c7ed"},
        ]
    )

    node = started_cluster.instances["node"]
    node.query(
        " CREATE OR REPLACE TABLE uuid_table("
        " isValid UInt8,"
        " kUUID UUID"
        " ) ENGINE = MongoDB('mongo1:27017', 'test', 'uuid_table', 'root', 'clickhouse') "
    )

    assert (
        node.query("SELECT kUUID FROM uuid_table WHERE isValid = 1")
        == "f0e77736-91d1-48ce-8f01-15123ca1c7ed\n"
    )
    _assert_queries_fail(
        node,
        [
            "SELECT * FROM uuid_table WHERE isValid = 0",
            "SELECT * FROM uuid_table",
        ],
    )

    node.query("DROP TABLE uuid_table")
    uuid_mongo_table.drop()


def test_no_fail_on_unsupported_clauses(started_cluster):
    """With ``mongodb_throw_on_unsupported_query = 0`` these queries must run without raising."""
    mongo_connection = get_mongo_connection(started_cluster)
    db = mongo_connection["test"]
    _recreate_root_user(db)
    # Nothing is inserted: the test only checks that the queries do not raise.
    unsupported_clauses_table = db["unsupported_clauses"]

    node = started_cluster.instances["node"]
    node.query(
        " CREATE OR REPLACE TABLE unsupported_clauses("
        " a UInt64,"
        " b UInt64"
        " ) ENGINE = MongoDB('mongo1:27017', 'test', 'unsupported_clauses', 'root', 'clickhouse') "
    )

    for query in (
        "SELECT * FROM unsupported_clauses WHERE a > rand() SETTINGS mongodb_throw_on_unsupported_query = 0",
        "SELECT * FROM unsupported_clauses WHERE a / 1000 > 0 SETTINGS mongodb_throw_on_unsupported_query = 0",
        "SELECT * FROM unsupported_clauses WHERE toFloat64(a) < 6.66 > rand() SETTINGS mongodb_throw_on_unsupported_query = 0",
        "SELECT * FROM unsupported_clauses ORDER BY a, b LIMIT 2 BY a SETTINGS mongodb_throw_on_unsupported_query = 0",
    ):
        node.query(query)

    node.query("DROP TABLE unsupported_clauses")
    unsupported_clauses_table.drop()


def test_password_masking(started_cluster):
    """Passwords are shown as ``[HIDDEN]`` in ``system.tables`` for MongoDB tables and dictionaries."""
    node = started_cluster.instances["node"]

    # Table engine, URI form.
    node.query(
        " CREATE OR REPLACE TABLE mongodb_uri_password_masking (_id String)"
        " ENGINE = MongoDB('mongodb://testuser:mypassword@127.0.0.1:27017/example', 'test_clickhouse'); "
    )
    assert (
        node.query(
            " SELECT replaceAll(create_table_query, currentDatabase(), 'default')"
            " FROM system.tables"
            " WHERE table = 'mongodb_uri_password_masking' AND database = currentDatabase(); "
        )
        == "CREATE TABLE default.mongodb_uri_password_masking (`_id` String)"
        " ENGINE = MongoDB(\\'mongodb://testuser:[HIDDEN]@127.0.0.1:27017/example\\', \\'test_clickhouse\\')\n"
    )
    assert (
        node.query(
            " SELECT replaceAll(engine_full, currentDatabase(), 'default')"
            " FROM system.tables"
            " WHERE table = 'mongodb_uri_password_masking' AND database = currentDatabase(); "
        )
        == "MongoDB(\\'mongodb://testuser:[HIDDEN]@127.0.0.1:27017/example\\', \\'test_clickhouse\\')\n"
    )
    node.query("DROP TABLE IF EXISTS mongodb_uri_password_masking;")

    # Dictionary source, URI form.
    node.query(
        " CREATE OR REPLACE DICTIONARY mongodb_dictionary_uri_password_masking (_id String)"
        " PRIMARY KEY _id"
        " SOURCE(MONGODB(uri 'mongodb://testuser:mypassword@127.0.0.1:27017/example' collection 'test_clickhouse'))"
        " LAYOUT(FLAT())"
        " LIFETIME(0); "
    )
    assert (
        node.query(
            " SELECT replaceAll(create_table_query, currentDatabase(), 'default')"
            " FROM system.tables"
            " WHERE table = 'mongodb_dictionary_uri_password_masking' AND database = currentDatabase();"
        )
        == "CREATE DICTIONARY default.mongodb_dictionary_uri_password_masking (`_id` String)"
        " PRIMARY KEY _id"
        " SOURCE(MONGODB(URI \\'mongodb://testuser:[HIDDEN]@127.0.0.1:27017/example\\' COLLECTION \\'test_clickhouse\\'))"
        " LIFETIME(MIN 0 MAX 0) LAYOUT(FLAT())\n"
    )
    node.query("DROP DICTIONARY IF EXISTS mongodb_dictionary_uri_password_masking;")

    # Table engine, positional-argument form.
    node.query(
        " CREATE TABLE mongodb_password_masking (_id String)"
        " ENGINE = MongoDB('127.0.0.1:27017', 'example', 'test_clickhouse', 'testuser', 'mypassword'); "
    )
    assert (
        node.query(
            " SELECT replaceAll(create_table_query, currentDatabase(), 'default')"
            " FROM system.tables"
            " WHERE table = 'mongodb_password_masking' AND database = currentDatabase(); "
        )
        == "CREATE TABLE default.mongodb_password_masking (`_id` String)"
        " ENGINE = MongoDB(\\'127.0.0.1:27017\\', \\'example\\', \\'test_clickhouse\\', \\'testuser\\', \\'[HIDDEN]\\')\n"
    )
    assert (
        node.query(
            " SELECT replaceAll(engine_full, currentDatabase(), 'default')"
            " FROM system.tables"
            " WHERE table = 'mongodb_password_masking' AND database = currentDatabase(); "
        )
        == "MongoDB(\\'127.0.0.1:27017\\', \\'example\\', \\'test_clickhouse\\', \\'testuser\\', \\'[HIDDEN]\\')\n"
    )
    node.query("DROP TABLE IF EXISTS mongodb_password_masking;")

    # Dictionary source, key/value form.
    node.query(
        " CREATE OR REPLACE DICTIONARY mongodb_dictionary_password_masking (_id String)"
        " PRIMARY KEY _id"
        " SOURCE(MONGODB("
        " host '127.0.0.1'"
        " port 27017"
        " user 'testuser'"
        " password 'mypassword'"
        " db 'example'"
        " collection 'test_clickhouse'"
        " options 'ssl=true'"
        " ))"
        " LAYOUT(FLAT())"
        " LIFETIME(0); "
    )
    assert (
        node.query(
            " SELECT replaceAll(create_table_query, currentDatabase(), 'default')"
            " FROM system.tables"
            " WHERE table = 'mongodb_dictionary_password_masking' AND database = currentDatabase(); "
        )
        == "CREATE DICTIONARY default.mongodb_dictionary_password_masking (`_id` String)"
        " PRIMARY KEY _id"
        " SOURCE(MONGODB(HOST \\'127.0.0.1\\' PORT 27017 USER \\'testuser\\' PASSWORD \\'[HIDDEN]\\' DB \\'example\\' COLLECTION \\'test_clickhouse\\' OPTIONS \\'ssl=true\\'))"
        " LIFETIME(MIN 0 MAX 0) LAYOUT(FLAT())\n"
    )
    node.query("DROP DICTIONARY IF EXISTS mongodb_dictionary_password_masking;")