This commit is contained in:
kssenii 2022-04-19 10:59:47 +02:00
parent d8e2d693e5
commit f8f66dd23d
3 changed files with 10 additions and 14 deletions

View File

@ -156,14 +156,11 @@ MongoDBSource::MongoDBSource(
std::shared_ptr<Poco::MongoDB::Connection> & connection_,
std::unique_ptr<Poco::MongoDB::Cursor> cursor_,
const Block & sample_block,
UInt64 max_block_size_,
bool strict_check_names_)
UInt64 max_block_size_)
: SourceWithProgress(sample_block.cloneEmpty())
, connection(connection_)
, cursor{std::move(cursor_)}
, max_block_size{max_block_size_}
, strict_check_names(strict_check_names_)
{
description.init(sample_block);
}
@ -342,10 +339,13 @@ Chunk MongoDBSource::generate()
for (const auto idx : collections::range(0, size))
{
const auto & name = description.sample_block.getByPosition(idx).name;
bool is_nullable = description.types[idx].second;
if (!is_nullable && strict_check_names && !document->exists(name))
throw Exception(fmt::format("Column {} is absent in MongoDB collection", backQuote(name)), ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
bool exists_in_current_document = document->exists(name);
if (!exists_in_current_document)
{
insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column);
continue;
}
const Poco::MongoDB::Element::Ptr value = document->get(name);
@ -355,6 +355,7 @@ Chunk MongoDBSource::generate()
}
else
{
bool is_nullable = description.types[idx].second;
if (is_nullable)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);

View File

@ -29,8 +29,7 @@ public:
std::shared_ptr<Poco::MongoDB::Connection> & connection_,
std::unique_ptr<Poco::MongoDB::Cursor> cursor_,
const Block & sample_block,
UInt64 max_block_size_,
bool strict_check_names_ = false);
UInt64 max_block_size_);
~MongoDBSource() override;
@ -44,10 +43,6 @@ private:
const UInt64 max_block_size;
ExternalResultDescription description;
bool all_read = false;
/// if true stream will check, that all required fields present in MongoDB
/// collection, otherwise throw exception.
bool strict_check_names;
};
}

View File

@ -108,7 +108,7 @@ Pipe StorageMongoDB::read(
sample_block.insert({ column_data.type, column_data.name });
}
return Pipe(std::make_shared<MongoDBSource>(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size, true));
return Pipe(std::make_shared<MongoDBSource>(connection, createCursor(database_name, collection_name, sample_block), sample_block, max_block_size));
}