From 80d511093b5c126e9534cfc20b1faeda07a165f1 Mon Sep 17 00:00:00 2001 From: kothiga Date: Tue, 19 Sep 2023 12:16:11 -0700 Subject: [PATCH] Provide support for BSON on BE --- .../Impl/BSONEachRowRowInputFormat.cpp | 56 ++++++++++--------- .../Impl/BSONEachRowRowOutputFormat.cpp | 9 ++- 2 files changed, 35 insertions(+), 30 deletions(-) diff --git a/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.cpp b/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.cpp index 2972f9da743..ea8ed960595 100644 --- a/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/BSONEachRowRowInputFormat.cpp @@ -118,7 +118,7 @@ static UInt8 readBSONType(ReadBuffer & in) static size_t readBSONSize(ReadBuffer & in) { BSONSizeT size; - readBinary(size, in); + readBinaryLittleEndian(size, in); return size; } @@ -131,19 +131,19 @@ static void readAndInsertInteger(ReadBuffer & in, IColumn & column, const DataTy if (bson_type == BSONType::INT32) { UInt32 value; - readBinary(value, in); + readBinaryLittleEndian(value, in); assert_cast &>(column).insertValue(static_cast(value)); } else if (bson_type == BSONType::INT64) { UInt64 value; - readBinary(value, in); + readBinaryLittleEndian(value, in); assert_cast &>(column).insertValue(static_cast(value)); } else if (bson_type == BSONType::BOOL) { UInt8 value; - readBinary(value, in); + readBinaryLittleEndian(value, in); assert_cast &>(column).insertValue(static_cast(value)); } else @@ -160,7 +160,7 @@ static void readAndInsertIPv4(ReadBuffer & in, IColumn & column, BSONType bson_t throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert BSON Int32 into column with type IPv4"); UInt32 value; - readBinary(value, in); + readBinaryLittleEndian(value, in); assert_cast(column).insertValue(IPv4(value)); } @@ -172,7 +172,7 @@ static void readAndInsertDouble(ReadBuffer & in, IColumn & column, const DataTyp getBSONTypeName(bson_type), data_type->getName()); Float64 value; - readBinary(value, in); + readBinaryLittleEndian(value, in); assert_cast &>(column).insertValue(static_cast(value)); } @@ -184,7 +184,7 @@ static void readAndInsertSmallDecimal(ReadBuffer & in, IColumn & column, const D getBSONTypeName(bson_type), data_type->getName()); DecimalType value; - readBinary(value, in); + readBinaryLittleEndian(value, in); assert_cast &>(column).insertValue(value); } @@ -194,7 +194,7 @@ static void readAndInsertDateTime64(ReadBuffer & in, IColumn & column, BSONType throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert BSON {} into DateTime64 column", getBSONTypeName(bson_type)); DateTime64 value; - readBinary(value, in); + readBinaryLittleEndian(value, in); assert_cast(column).insertValue(value); } @@ -222,7 +222,7 @@ static void readAndInsertBigInteger(ReadBuffer & in, IColumn & column, const Dat sizeof(ValueType)); ValueType value; - readBinary(value, in); + readBinaryLittleEndian(value, in); assert_cast(column).insertValue(value); } @@ -355,7 +355,7 @@ static void readAndInsertUUID(ReadBuffer & in, IColumn & column, BSONType bson_t sizeof(UUID)); UUID value; - readBinary(value, in); + readBinaryLittleEndian(value, in); assert_cast(column).insertValue(value); } @@ -371,7 +371,7 @@ void BSONEachRowRowInputFormat::readArray(IColumn & column, const DataTypePtr & size_t document_start = in->count(); BSONSizeT document_size; - readBinary(document_size, *in); + readBinaryLittleEndian(document_size, *in); if (document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END)) throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", document_size); @@ -401,7 +401,7 @@ void BSONEachRowRowInputFormat::readTuple(IColumn & column, const DataTypePtr & size_t document_start = in->count(); BSONSizeT document_size; - readBinary(document_size, *in); + readBinaryLittleEndian(document_size, *in); if (document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END)) throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", document_size); @@ -462,7 +462,7 @@ void BSONEachRowRowInputFormat::readMap(IColumn & column, const DataTypePtr & da size_t document_start = in->count(); BSONSizeT document_size; - readBinary(document_size, *in); + readBinaryLittleEndian(document_size, *in); if (document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END)) throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", document_size); @@ -696,7 +696,7 @@ static void skipBSONField(ReadBuffer & in, BSONType type) case BSONType::STRING: { BSONSizeT size; - readBinary(size, in); + readBinaryLittleEndian(size, in); in.ignore(size); break; } @@ -704,7 +704,7 @@ static void skipBSONField(ReadBuffer & in, BSONType type) case BSONType::ARRAY: { BSONSizeT size; - readBinary(size, in); + readBinaryLittleEndian(size, in); if (size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END)) throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", size); in.ignore(size - sizeof(size)); @@ -713,7 +713,7 @@ static void skipBSONField(ReadBuffer & in, BSONType type) case BSONType::BINARY: { BSONSizeT size; - readBinary(size, in); + readBinaryLittleEndian(size, in); in.ignore(size + 1); break; } @@ -738,14 +738,14 @@ static void skipBSONField(ReadBuffer & in, BSONType type) case BSONType::DB_POINTER: { BSONSizeT size; - readBinary(size, in); + readBinaryLittleEndian(size, in); in.ignore(size + BSON_DB_POINTER_SIZE); break; } case BSONType::JAVA_SCRIPT_CODE_W_SCOPE: { BSONSizeT size; - readBinary(size, in); + readBinaryLittleEndian(size, in); if (size < sizeof(BSONSizeT)) throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid java code_w_scope size: {}", size); in.ignore(size - sizeof(size)); @@ -787,7 +787,7 @@ bool BSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi size_t key_index = 0; current_document_start = in->count(); - readBinary(current_document_size, *in); + readBinaryLittleEndian(current_document_size, *in); if (current_document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END)) throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", current_document_size); @@ -844,7 +844,7 @@ size_t BSONEachRowRowInputFormat::countRows(size_t max_block_size) BSONSizeT document_size; while (!in->eof() && num_rows < max_block_size) { - readBinary(document_size, *in); + readBinaryLittleEndian(document_size, *in); if (document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END)) throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", document_size); in->ignore(document_size - sizeof(BSONSizeT)); @@ -893,7 +893,7 @@ DataTypePtr BSONEachRowSchemaReader::getDataTypeFromBSONField(BSONType type, boo case BSONType::STRING: { BSONSizeT size; - readBinary(size, in); + readBinaryLittleEndian(size, in); in.ignore(size); return std::make_shared(); } @@ -947,7 +947,7 @@ DataTypePtr BSONEachRowSchemaReader::getDataTypeFromBSONField(BSONType type, boo case BSONType::BINARY: { BSONSizeT size; - readBinary(size, in); + readBinaryLittleEndian(size, in); auto subtype = getBSONBinarySubtype(readBSONType(in)); in.ignore(size); switch (subtype) @@ -982,7 +982,7 @@ NamesAndTypesList BSONEachRowSchemaReader::getDataTypesFromBSONDocument(bool all { size_t document_start = in.count(); BSONSizeT document_size; - readBinary(document_size, in); + readBinaryLittleEndian(document_size, in); NamesAndTypesList names_and_types; while (in.count() - document_start + sizeof(BSON_DOCUMENT_END) != document_size) { @@ -1028,7 +1028,7 @@ fileSegmentationEngineBSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t while (!in.eof() && memory.size() < min_bytes && number_of_rows < max_rows) { BSONSizeT document_size; - readBinary(document_size, in); + readBinaryLittleEndian(document_size, in); if (document_size < sizeof(document_size)) throw ParsingException(ErrorCodes::INCORRECT_DATA, "Size of BSON document is invalid"); @@ -1045,7 +1045,13 @@ fileSegmentationEngineBSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t size_t old_size = memory.size(); memory.resize(old_size + document_size); - unalignedStore(memory.data() + old_size, document_size); + + // Ensure the document size we write to the memory is byte arranged for LE. + BSONSizeT size_out = document_size; + if constexpr(std::endian::native == std::endian::big) + size_out = std::byteswap(size_out); + unalignedStore(memory.data() + old_size, size_out); + in.readStrict(memory.data() + old_size + sizeof(document_size), document_size - sizeof(document_size)); ++number_of_rows; } diff --git a/src/Processors/Formats/Impl/BSONEachRowRowOutputFormat.cpp b/src/Processors/Formats/Impl/BSONEachRowRowOutputFormat.cpp index 2bb5410781c..b7f415ff449 100644 --- a/src/Processors/Formats/Impl/BSONEachRowRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/BSONEachRowRowOutputFormat.cpp @@ -58,7 +58,7 @@ static void writeBSONSize(size_t size, WriteBuffer & buf) if (size > MAX_BSON_SIZE) throw Exception(ErrorCodes::INCORRECT_DATA, "Too large document/value size: {}. Maximum allowed size: {}.", size, MAX_BSON_SIZE); - writePODBinary(BSONSizeT(size), buf); + writeBinaryLittleEndian(BSONSizeT(size), buf); } template @@ -79,7 +79,7 @@ template static void writeBSONNumber(BSONType type, const IColumn & column, size_t row_num, const String & name, WriteBuffer & buf) { writeBSONTypeAndKeyName(type, name, buf); - writePODBinary(assert_cast(column).getElement(row_num), buf); + writeBinaryLittleEndian(ValueType(assert_cast(column).getElement(row_num)), buf); } template @@ -109,8 +109,7 @@ static void writeBSONBigInteger(const IColumn & column, size_t row_num, const St writeBSONTypeAndKeyName(BSONType::BINARY, name, buf); writeBSONSize(sizeof(typename ColumnType::ValueType), buf); writeBSONType(BSONBinarySubtype::BINARY, buf); - auto data = assert_cast(column).getDataAt(row_num); - buf.write(data.data, data.size); + writeBinaryLittleEndian(assert_cast(column).getElement(row_num), buf); } size_t BSONEachRowRowOutputFormat::countBSONFieldSize(const IColumn & column, const DataTypePtr & data_type, size_t row_num, const String & name, const String & path, std::unordered_map & nested_document_sizes) @@ -407,7 +406,7 @@ void BSONEachRowRowOutputFormat::serializeField(const IColumn & column, const Da writeBSONTypeAndKeyName(BSONType::BINARY, name, out); writeBSONSize(sizeof(UUID), out); writeBSONType(BSONBinarySubtype::UUID, out); - writeBinary(assert_cast(column).getElement(row_num), out); + writeBinaryLittleEndian(assert_cast(column).getElement(row_num), out); break; } case TypeIndex::LowCardinality: