Merge pull request #51876 from evillique/fix-mongodb-inserts

Fix inserts into MongoDB tables
This commit is contained in:
robot-ch-test-poll 2023-07-16 05:28:43 +02:00 committed by GitHub
commit 3b0a4040cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 3 deletions

View File

@ -19,6 +19,8 @@
#include <Processors/Sinks/SinkToStorage.h>
#include <unordered_set>
#include <DataTypes/DataTypeArray.h>
namespace DB
{
@ -127,9 +129,7 @@ public:
for (const auto j : collections::range(0, num_cols))
{
WriteBufferFromOwnString ostr;
data_types[j]->getDefaultSerialization()->serializeText(*columns[j], i, ostr, FormatSettings{});
document->add(data_names[j], ostr.str());
insertValueIntoMongoDB(*document, data_names[j], *data_types[j], *columns[j], i);
}
documents.push_back(std::move(document));
@ -151,6 +151,60 @@ public:
}
private:
void insertValueIntoMongoDB(
Poco::MongoDB::Document & document,
const std::string & name,
const IDataType & data_type,
const IColumn & column,
size_t idx)
{
WhichDataType which(data_type);
if (which.isArray())
{
const ColumnArray & column_array = assert_cast<const ColumnArray &>(column);
const ColumnArray::Offsets & offsets = column_array.getOffsets();
size_t offset = offsets[idx - 1];
size_t next_offset = offsets[idx];
const IColumn & nested_column = column_array.getData();
const auto * array_type = assert_cast<const DataTypeArray *>(&data_type);
const DataTypePtr & nested_type = array_type->getNestedType();
Poco::MongoDB::Array::Ptr array = new Poco::MongoDB::Array();
for (size_t i = 0; i + offset < next_offset; ++i)
{
insertValueIntoMongoDB(*array, Poco::NumberFormatter::format(i), *nested_type, nested_column, i + offset);
}
document.add(name, array);
return;
}
/// MongoDB does not support UInt64 type, so just cast it to Int64
if (which.isNativeUInt())
document.add(name, static_cast<Poco::Int64>(column.getUInt(idx)));
else if (which.isNativeInt())
document.add(name, static_cast<Poco::Int64>(column.getInt(idx)));
else if (which.isFloat32())
document.add(name, static_cast<Float64>(column.getFloat32(idx)));
else if (which.isFloat64())
document.add(name, static_cast<Float64>(column.getFloat64(idx)));
else if (which.isDate())
document.add(name, Poco::Timestamp(DateLUT::instance().fromDayNum(DayNum(column.getUInt(idx))) * 1000000));
else if (which.isDateTime())
document.add(name, Poco::Timestamp(column.getUInt(idx) * 1000000));
else
{
WriteBufferFromOwnString ostr;
data_type.getDefaultSerialization()->serializeText(column, idx, ostr, FormatSettings{});
document.add(name, ostr.str());
}
}
String collection_name;
String db_name;
StorageMetadataPtr metadata_snapshot;

View File

@ -245,6 +245,12 @@ def test_arrays(started_cluster):
== "[]\n"
)
# Test INSERT SELECT
node.query("INSERT INTO arrays_mongo_table SELECT * FROM arrays_mongo_table")
assert node.query("SELECT COUNT() FROM arrays_mongo_table") == "200\n"
assert node.query("SELECT COUNT(DISTINCT *) FROM arrays_mongo_table") == "100\n"
node.query("DROP TABLE arrays_mongo_table")
arrays_mongo_table.drop()