Allow missing columns for mongo storage

This commit is contained in:
kssenii 2022-04-14 19:31:07 +02:00
parent 4bad2cd364
commit d8e2d693e5
2 changed files with 29 additions and 3 deletions

View File

@ -162,7 +162,8 @@ MongoDBSource::MongoDBSource(
, connection(connection_)
, cursor{std::move(cursor_)}
, max_block_size{max_block_size_}
, strict_check_names{strict_check_names_}
, strict_check_names(strict_check_names_)
{
description.init(sample_block);
}
@ -341,17 +342,20 @@ Chunk MongoDBSource::generate()
for (const auto idx : collections::range(0, size))
{
const auto & name = description.sample_block.getByPosition(idx).name;
bool is_nullable = description.types[idx].second;
if (strict_check_names && !document->exists(name))
if (!is_nullable && strict_check_names && !document->exists(name))
throw Exception(fmt::format("Column {} is absent in MongoDB collection", backQuote(name)), ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
const Poco::MongoDB::Element::Ptr value = document->get(name);
if (value.isNull() || value->type() == Poco::MongoDB::ElementTraits<Poco::MongoDB::NullValue>::TypeId)
{
insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column);
}
else
{
if (description.types[idx].second)
if (is_nullable)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, *value, name);

View File

@ -231,3 +231,25 @@ def test_auth_source(started_cluster):
)
assert node.query("SELECT count() FROM simple_mongo_table_ok") == "100\n"
simple_mongo_table.drop()
@pytest.mark.parametrize("started_cluster", [False], indirect=["started_cluster"])
def test_missing_columns(started_cluster):
mongo_connection = get_mongo_connection(started_cluster)
db = mongo_connection["test"]
db.add_user("root", "clickhouse")
simple_mongo_table = db["simple_table"]
data = []
for i in range(0, 10):
data.append({"key": i, "data": hex(i * i)})
for i in range(0, 10):
data.append({"key": i})
simple_mongo_table.insert_many(data)
node = started_cluster.instances["node"]
node.query(
"create table simple_mongo_table(key UInt64, data Nullable(String)) engine = MongoDB(mongo1)"
)
result = node.query("SELECT count() FROM simple_mongo_table WHERE isNull(data)")
assert result == "10\n"
simple_mongo_table.drop()