Provide support for BSON on BE

kothiga 2023-09-19 12:16:11 -07:00
parent 6a4e4cc361
commit 80d511093b
2 changed files with 35 additions and 30 deletions

View File

@@ -118,7 +118,7 @@ static UInt8 readBSONType(ReadBuffer & in)
static size_t readBSONSize(ReadBuffer & in)
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
return size;
}
@@ -131,19 +131,19 @@ static void readAndInsertInteger(ReadBuffer & in, IColumn & column, const DataTy
if (bson_type == BSONType::INT32)
{
UInt32 value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnVector<T> &>(column).insertValue(static_cast<T>(value));
}
else if (bson_type == BSONType::INT64)
{
UInt64 value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnVector<T> &>(column).insertValue(static_cast<T>(value));
}
else if (bson_type == BSONType::BOOL)
{
UInt8 value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnVector<T> &>(column).insertValue(static_cast<T>(value));
}
else
@@ -160,7 +160,7 @@ static void readAndInsertIPv4(ReadBuffer & in, IColumn & column, BSONType bson_t
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert BSON Int32 into column with type IPv4");
UInt32 value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnIPv4 &>(column).insertValue(IPv4(value));
}
@@ -172,7 +172,7 @@ static void readAndInsertDouble(ReadBuffer & in, IColumn & column, const DataTyp
getBSONTypeName(bson_type), data_type->getName());
Float64 value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnVector<T> &>(column).insertValue(static_cast<T>(value));
}
@@ -184,7 +184,7 @@ static void readAndInsertSmallDecimal(ReadBuffer & in, IColumn & column, const D
getBSONTypeName(bson_type), data_type->getName());
DecimalType value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnDecimal<DecimalType> &>(column).insertValue(value);
}
@@ -194,7 +194,7 @@ static void readAndInsertDateTime64(ReadBuffer & in, IColumn & column, BSONType
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert BSON {} into DateTime64 column", getBSONTypeName(bson_type));
DateTime64 value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<DataTypeDateTime64::ColumnType &>(column).insertValue(value);
}
@@ -222,7 +222,7 @@ static void readAndInsertBigInteger(ReadBuffer & in, IColumn & column, const Dat
sizeof(ValueType));
ValueType value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnType &>(column).insertValue(value);
}
@@ -355,7 +355,7 @@ static void readAndInsertUUID(ReadBuffer & in, IColumn & column, BSONType bson_t
sizeof(UUID));
UUID value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnUUID &>(column).insertValue(value);
}
@@ -371,7 +371,7 @@ void BSONEachRowRowInputFormat::readArray(IColumn & column, const DataTypePtr &
size_t document_start = in->count();
BSONSizeT document_size;
- readBinary(document_size, *in);
+ readBinaryLittleEndian(document_size, *in);
if (document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", document_size);
@@ -401,7 +401,7 @@ void BSONEachRowRowInputFormat::readTuple(IColumn & column, const DataTypePtr &
size_t document_start = in->count();
BSONSizeT document_size;
- readBinary(document_size, *in);
+ readBinaryLittleEndian(document_size, *in);
if (document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", document_size);
@@ -462,7 +462,7 @@ void BSONEachRowRowInputFormat::readMap(IColumn & column, const DataTypePtr & da
size_t document_start = in->count();
BSONSizeT document_size;
- readBinary(document_size, *in);
+ readBinaryLittleEndian(document_size, *in);
if (document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", document_size);
@@ -696,7 +696,7 @@ static void skipBSONField(ReadBuffer & in, BSONType type)
case BSONType::STRING:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
in.ignore(size);
break;
}
@@ -704,7 +704,7 @@ static void skipBSONField(ReadBuffer & in, BSONType type)
case BSONType::ARRAY:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
if (size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", size);
in.ignore(size - sizeof(size));
@@ -713,7 +713,7 @@ static void skipBSONField(ReadBuffer & in, BSONType type)
case BSONType::BINARY:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
in.ignore(size + 1);
break;
}
@@ -738,14 +738,14 @@ static void skipBSONField(ReadBuffer & in, BSONType type)
case BSONType::DB_POINTER:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
in.ignore(size + BSON_DB_POINTER_SIZE);
break;
}
case BSONType::JAVA_SCRIPT_CODE_W_SCOPE:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
if (size < sizeof(BSONSizeT))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid java code_w_scope size: {}", size);
in.ignore(size - sizeof(size));
@@ -787,7 +787,7 @@ bool BSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi
size_t key_index = 0;
current_document_start = in->count();
- readBinary(current_document_size, *in);
+ readBinaryLittleEndian(current_document_size, *in);
if (current_document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", current_document_size);
@@ -844,7 +844,7 @@ size_t BSONEachRowRowInputFormat::countRows(size_t max_block_size)
BSONSizeT document_size;
while (!in->eof() && num_rows < max_block_size)
{
- readBinary(document_size, *in);
+ readBinaryLittleEndian(document_size, *in);
if (document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", document_size);
in->ignore(document_size - sizeof(BSONSizeT));
@@ -893,7 +893,7 @@ DataTypePtr BSONEachRowSchemaReader::getDataTypeFromBSONField(BSONType type, boo
case BSONType::STRING:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
in.ignore(size);
return std::make_shared<DataTypeString>();
}
@@ -947,7 +947,7 @@ DataTypePtr BSONEachRowSchemaReader::getDataTypeFromBSONField(BSONType type, boo
case BSONType::BINARY:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
auto subtype = getBSONBinarySubtype(readBSONType(in));
in.ignore(size);
switch (subtype)
@@ -982,7 +982,7 @@ NamesAndTypesList BSONEachRowSchemaReader::getDataTypesFromBSONDocument(bool all
{
size_t document_start = in.count();
BSONSizeT document_size;
- readBinary(document_size, in);
+ readBinaryLittleEndian(document_size, in);
NamesAndTypesList names_and_types;
while (in.count() - document_start + sizeof(BSON_DOCUMENT_END) != document_size)
{
@@ -1028,7 +1028,7 @@ fileSegmentationEngineBSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t
while (!in.eof() && memory.size() < min_bytes && number_of_rows < max_rows)
{
BSONSizeT document_size;
- readBinary(document_size, in);
+ readBinaryLittleEndian(document_size, in);
if (document_size < sizeof(document_size))
throw ParsingException(ErrorCodes::INCORRECT_DATA, "Size of BSON document is invalid");
@@ -1045,7 +1045,13 @@ fileSegmentationEngineBSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t
size_t old_size = memory.size();
memory.resize(old_size + document_size);
- unalignedStore<BSONSizeT>(memory.data() + old_size, document_size);
+ // Ensure the document size we write to the memory is byte arranged for LE.
+ BSONSizeT size_out = document_size;
+ if constexpr(std::endian::native == std::endian::big)
+ size_out = std::byteswap(size_out);
+ unalignedStore<BSONSizeT>(memory.data() + old_size, size_out);
in.readStrict(memory.data() + old_size + sizeof(document_size), document_size - sizeof(document_size));
++number_of_rows;
}

View File

@@ -58,7 +58,7 @@ static void writeBSONSize(size_t size, WriteBuffer & buf)
if (size > MAX_BSON_SIZE)
throw Exception(ErrorCodes::INCORRECT_DATA, "Too large document/value size: {}. Maximum allowed size: {}.", size, MAX_BSON_SIZE);
- writePODBinary<BSONSizeT>(BSONSizeT(size), buf);
+ writeBinaryLittleEndian(BSONSizeT(size), buf);
}
template <typename Type>
@@ -79,7 +79,7 @@ template <typename ColumnType, typename ValueType>
static void writeBSONNumber(BSONType type, const IColumn & column, size_t row_num, const String & name, WriteBuffer & buf)
{
writeBSONTypeAndKeyName(type, name, buf);
- writePODBinary<ValueType>(assert_cast<const ColumnType &>(column).getElement(row_num), buf);
+ writeBinaryLittleEndian(ValueType(assert_cast<const ColumnType &>(column).getElement(row_num)), buf);
}
template <typename StringColumnType>
@@ -109,8 +109,7 @@ static void writeBSONBigInteger(const IColumn & column, size_t row_num, const St
writeBSONTypeAndKeyName(BSONType::BINARY, name, buf);
writeBSONSize(sizeof(typename ColumnType::ValueType), buf);
writeBSONType(BSONBinarySubtype::BINARY, buf);
- auto data = assert_cast<const ColumnType &>(column).getDataAt(row_num);
- buf.write(data.data, data.size);
+ writeBinaryLittleEndian(assert_cast<const ColumnType &>(column).getElement(row_num), buf);
}
size_t BSONEachRowRowOutputFormat::countBSONFieldSize(const IColumn & column, const DataTypePtr & data_type, size_t row_num, const String & name, const String & path, std::unordered_map<String, size_t> & nested_document_sizes)
@@ -407,7 +406,7 @@ void BSONEachRowRowOutputFormat::serializeField(const IColumn & column, const Da
writeBSONTypeAndKeyName(BSONType::BINARY, name, out);
writeBSONSize(sizeof(UUID), out);
writeBSONType(BSONBinarySubtype::UUID, out);
- writeBinary(assert_cast<const ColumnUUID &>(column).getElement(row_num), out);
+ writeBinaryLittleEndian(assert_cast<const ColumnUUID &>(column).getElement(row_num), out);
break;
}
case TypeIndex::LowCardinality:
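
The output-format changes are the mirror image of the read side: values held in host byte order have to be emitted in little-endian order, which writeBinaryLittleEndian achieves by swapping before the write on big-endian machines. A hedged sketch of that direction (writeLittleEndian is a hypothetical stand-in for illustration, not the ClickHouse API):

```cpp
#include <bit>
#include <concepts>
#include <cstring>

// Mirror of the read side: take a value in host byte order and emit its
// bytes in little-endian order, as BSON requires.
template <std::integral T>
void writeLittleEndian(T value, char * dst)
{
    if constexpr (std::endian::native == std::endian::big)
        value = std::byteswap(value);            // host (BE) -> little-endian
    std::memcpy(dst, &value, sizeof(value));     // emit the wire bytes
}
```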