// Implementation of MongoDBBlockInputStream. Everything between `#if USE_POCO_MONGODB`
// and the closing `#endif` is compiled only when that macro is enabled.
//
// NOTE(review): in this copy of the file the include targets, the template parameter and
// argument lists, and the cast target types are missing (for example `#include` with no
// header name, `template void insertNumber`, `static_cast &>(column)`), and the whole file
// is collapsed onto three lines. It looks as if everything written inside angle brackets
// was lost -- TODO restore from version control. The notes below describe only what the
// remaining tokens show.
//
// MongoDBBlockInputStream(connection_, cursor_, sample_block, max_block_size)
//   Keeps `connection_`, moves `cursor_` into the `cursor` member, stores `max_block_size`
//   and initialises `description` from `sample_block`. The destructor is defaulted.
//
// insertNumber(column, value, name)   [anonymous namespace]
//   Switches on `value.type()`. Four cases append the element's `value()` to the column's
//   data, one case appends an entry constructed with no arguments (`emplace_back()`), and
//   one case appends `parse(...)` of the element's `value()`. Which element type each case
//   matches cannot be read from this copy. Any other type id throws an Exception with
//   ErrorCodes::TYPE_MISMATCH whose message names the type id and the column.
//
// insertValue(column, type, value, name)   [anonymous namespace]
//   Dispatches on the ExternalResultDescription value type:
//     * UInt8..UInt64, Int8..Int64, Float32, Float64: forwarded to insertNumber.
//     * String: two element types are accepted. One is converted with `value.toString()`
//       (the local is named `string_id`, so presumably an ObjectId -- confirm); the other
//       is read through the element's `value()`. Both are stored with
//       insertDataWithTerminatingZero, passing `size() + 1` as the length. Any other
//       element type throws TYPE_MISMATCH ("expected String").
//     * Date: the element must have one specific type (the message calls it Timestamp);
//       its `epochTime()` is converted with DateLUT::instance().toDayNum and stored as
//       UInt16.
//     * DateTime: same type check; `epochTime()` is appended directly.
//     * UUID: the element must have one specific type (the message calls it String); its
//       value is passed through `parse(...)` and appended, otherwise TYPE_MISMATCH is thrown.
//   The switch has no default branch, so a value type not listed above inserts nothing.
//
// insertDefaultValue(column, sample_column)   [anonymous namespace]
//   Appends row 0 of `sample_column` to `column`.
//
// MongoDBBlockInputStream::readImpl()
//   Returns an empty Block once `all_read` is set. Otherwise it creates an empty clone of
//   every sample-block column and repeatedly calls `cursor->next(*connection)`. For every
//   document of a response and every column, the element is looked up by column name:
//     * element missing (`value.isNull()`) or of one particular type (presumably Null --
//       the type argument is missing here, confirm): insertDefaultValue is used;
//     * `description.types[idx].second` is true: the column is cast to ColumnNullable, the
//       value goes into its nested column and 0 is appended to the null map;
//     * otherwise the value is inserted into the column directly.
//   A response whose `cursorID()` is 0 sets `all_read` and ends the loop.
//   `max_block_size` is compared only between responses, so all documents of a response
//   are consumed and the returned block can hold more than `max_block_size` rows.
//   Returns an empty Block when no rows were read, otherwise the sample block cloned with
//   the filled columns.
#include #if USE_POCO_MONGODB #include #include #include #include #include #include #include #include "DictionaryStructure.h" #include "MongoDBBlockInputStream.h" #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int TYPE_MISMATCH; } MongoDBBlockInputStream::MongoDBBlockInputStream( std::shared_ptr & connection_, std::unique_ptr cursor_, const Block & sample_block, const size_t max_block_size) : connection(connection_), cursor{std::move(cursor_)}, max_block_size{max_block_size} { description.init(sample_block); } MongoDBBlockInputStream::~MongoDBBlockInputStream() = default; namespace { using ValueType = ExternalResultDescription::ValueType; using ObjectId = Poco::MongoDB::ObjectId; template void insertNumber(IColumn & column, const Poco::MongoDB::Element & value, const std::string & name) { switch (value.type()) { case Poco::MongoDB::ElementTraits::TypeId: static_cast &>(column).getData().push_back(static_cast &>(value).value()); break; case Poco::MongoDB::ElementTraits::TypeId: static_cast &>(column).getData().push_back(static_cast &>(value).value()); break; case Poco::MongoDB::ElementTraits::TypeId: static_cast &>(column).getData().push_back(static_cast &>(value).value()); break; case Poco::MongoDB::ElementTraits::TypeId: static_cast &>(column).getData().push_back(static_cast &>(value).value()); break; case Poco::MongoDB::ElementTraits::TypeId: static_cast &>(column).getData().emplace_back(); break; case Poco::MongoDB::ElementTraits::TypeId: static_cast &>(column).getData().push_back( parse(static_cast &>(value).value())); break; default: throw Exception("Type mismatch, expected a number, got type id = " + toString(value.type()) + " for column " + name, ErrorCodes::TYPE_MISMATCH); } } void insertValue( IColumn & column, const ValueType type, const Poco::MongoDB::Element & value, const std::string & name) { switch (type) { case ValueType::UInt8: insertNumber(column, value, name); break; case 
ValueType::UInt16: insertNumber(column, value, name); break; case ValueType::UInt32: insertNumber(column, value, name); break; case ValueType::UInt64: insertNumber(column, value, name); break; case ValueType::Int8: insertNumber(column, value, name); break; case ValueType::Int16: insertNumber(column, value, name); break; case ValueType::Int32: insertNumber(column, value, name); break; case ValueType::Int64: insertNumber(column, value, name); break; case ValueType::Float32: insertNumber(column, value, name); break; case ValueType::Float64: insertNumber(column, value, name); break; case ValueType::String: { if (value.type() == Poco::MongoDB::ElementTraits::TypeId) { std::string string_id = value.toString(); static_cast(column).insertDataWithTerminatingZero(string_id.data(), string_id.size() + 1); break; } else if (value.type() == Poco::MongoDB::ElementTraits::TypeId) { String string = static_cast &>(value).value(); static_cast(column).insertDataWithTerminatingZero(string.data(), string.size() + 1); break; } throw Exception{"Type mismatch, expected String, got type id = " + toString(value.type()) + " for column " + name, ErrorCodes::TYPE_MISMATCH}; } case ValueType::Date: { if (value.type() != Poco::MongoDB::ElementTraits::TypeId) throw Exception{"Type mismatch, expected Timestamp, got type id = " + toString(value.type()) + " for column " + name, ErrorCodes::TYPE_MISMATCH}; static_cast(column).getData().push_back( UInt16{DateLUT::instance().toDayNum( static_cast &>(value).value().epochTime())}); break; } case ValueType::DateTime: { if (value.type() != Poco::MongoDB::ElementTraits::TypeId) throw Exception{"Type mismatch, expected Timestamp, got type id = " + toString(value.type()) + " for column " + name, ErrorCodes::TYPE_MISMATCH}; static_cast(column).getData().push_back( static_cast &>(value).value().epochTime()); break; } case ValueType::UUID: { if (value.type() == Poco::MongoDB::ElementTraits::TypeId) { String string = static_cast &>(value).value(); 
static_cast(column).getData().push_back(parse(string)); } else throw Exception{"Type mismatch, expected String (UUID), got type id = " + toString(value.type()) + " for column " + name, ErrorCodes::TYPE_MISMATCH}; break; } } } void insertDefaultValue(IColumn & column, const IColumn & sample_column) { column.insertFrom(sample_column, 0); } } Block MongoDBBlockInputStream::readImpl() { if (all_read) return {}; MutableColumns columns(description.sample_block.columns()); const size_t size = columns.size(); for (const auto i : ext::range(0, size)) columns[i] = description.sample_block.getByPosition(i).column->cloneEmpty(); size_t num_rows = 0; while (num_rows < max_block_size) { Poco::MongoDB::ResponseMessage & response = cursor->next(*connection); for (const auto & document : response.documents()) { ++num_rows; for (const auto idx : ext::range(0, size)) { const auto & name = description.sample_block.getByPosition(idx).name; const Poco::MongoDB::Element::Ptr value = document->get(name); if (value.isNull() || value->type() == Poco::MongoDB::ElementTraits::TypeId) insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column); else { if (description.types[idx].second) { ColumnNullable & column_nullable = static_cast(*columns[idx]); insertValue(column_nullable.getNestedColumn(), description.types[idx].first, *value, name); column_nullable.getNullMapData().emplace_back(0); } else insertValue(*columns[idx], description.types[idx].first, *value, name); } } } if (response.cursorID() == 0) { all_read = true; break; } } if (num_rows == 0) return {}; return description.sample_block.cloneWithColumns(std::move(columns)); } } #endif