Unification of serde of data types: development [#CLICKHOUSE-2838].

This commit is contained in:
Alexey Milovidov 2017-12-03 07:19:46 +03:00
parent 294c0ba5cf
commit 3786786aeb
2 changed files with 25 additions and 11 deletions

View File

@@ -49,7 +49,7 @@ if (CMAKE_LIBRARY_ARCHITECTURE MATCHES "i386")
endif () endif ()
if ( ( ARCH_ARM AND NOT ARCH_AARCH64 ) OR ARCH_I386) if ( ( ARCH_ARM AND NOT ARCH_AARCH64 ) OR ARCH_I386)
set (ARCH_32 1) set (ARCH_32 1)
message (WARNING "Support 32bit platforms is highly experimental") message (WARNING "Support for 32bit platforms is highly experimental")
endif () endif ()
set (COMMON_WARNING_FLAGS "${COMMON_WARNING_FLAGS} -Wall") # -Werror is also added inside directories with our own code. set (COMMON_WARNING_FLAGS "${COMMON_WARNING_FLAGS} -Wall") # -Werror is also added inside directories with our own code.

View File

@@ -34,8 +34,8 @@ namespace
struct Stream struct Stream
{ {
String base_name; String base_name;
String bin_file_name; String bin_file_path;
String mrk_file_name; String mrk_file_path;
ReadBufferFromFile file_buf; ReadBufferFromFile file_buf;
HashingReadBuffer compressed_hashing_buf; HashingReadBuffer compressed_hashing_buf;
@@ -48,13 +48,13 @@ struct Stream
Stream(const String & path, const String & base_name) Stream(const String & path, const String & base_name)
: :
base_name(base_name), base_name(base_name),
bin_file_name(path + base_name + ".bin"), bin_file_path(path + base_name + ".bin"),
mrk_file_name(path + base_name + ".mrk"), mrk_file_path(path + base_name + ".mrk"),
file_buf(bin_file_name), file_buf(bin_file_path),
compressed_hashing_buf(file_buf), compressed_hashing_buf(file_buf),
uncompressing_buf(compressed_hashing_buf), uncompressing_buf(compressed_hashing_buf),
uncompressed_hashing_buf(uncompressing_buf), uncompressed_hashing_buf(uncompressing_buf),
mrk_file_buf(mrk_file_name), mrk_file_buf(mrk_file_path),
mrk_hashing_buf(mrk_file_buf) mrk_hashing_buf(mrk_file_buf)
{} {}
@@ -94,15 +94,15 @@ struct Stream
if (mrk_mark != data_mark) if (mrk_mark != data_mark)
throw Exception("Incorrect mark: " + data_mark.toString() + throw Exception("Incorrect mark: " + data_mark.toString() +
(has_alternative_mark ? " or " + alternative_data_mark.toString() : "") + " in data, " + (has_alternative_mark ? " or " + alternative_data_mark.toString() : "") + " in data, " +
mrk_mark.toString() + " in " + mrk_file_name + " file", ErrorCodes::INCORRECT_MARK); mrk_mark.toString() + " in " + mrk_file_path + " file", ErrorCodes::INCORRECT_MARK);
} }
void assertEnd() void assertEnd()
{ {
if (!uncompressed_hashing_buf.eof()) if (!uncompressed_hashing_buf.eof())
throw Exception("EOF expected in " + bin_file_name + " file", ErrorCodes::CORRUPTED_DATA); throw Exception("EOF expected in " + bin_file_path + " file", ErrorCodes::CORRUPTED_DATA);
if (!mrk_hashing_buf.eof()) if (!mrk_hashing_buf.eof())
throw Exception("EOF expected in " + mrk_file_name + " file", ErrorCodes::CORRUPTED_DATA); throw Exception("EOF expected in " + mrk_file_path + " file", ErrorCodes::CORRUPTED_DATA);
} }
void saveChecksums(MergeTreeData::DataPart::Checksums & checksums) void saveChecksums(MergeTreeData::DataPart::Checksums & checksums)
@@ -235,6 +235,7 @@ MergeTreeData::DataPart::Checksums checkDataPart(
std::map<String, Stream> streams; std::map<String, Stream> streams;
size_t column_size = 0; size_t column_size = 0;
size_t mark_num = 0;
while (true) while (true)
{ {
@@ -243,9 +243,22 @@ MergeTreeData::DataPart::Checksums checkDataPart(
{ {
String file_name = IDataType::getFileNameForStream(name_type.name, substream_path); String file_name = IDataType::getFileNameForStream(name_type.name, substream_path);
auto & stream = streams.try_emplace(file_name, path, file_name).first->second; auto & stream = streams.try_emplace(file_name, path, file_name).first->second;
stream.assertMark();
try
{
stream.assertMark();
}
catch (Exception & e)
{
e.addMessage("Cannot read mark " + toString(mark_num) + " at row " + toString(column_size)
+ " in file " + stream.mrk_file_path
+ ", mrk file offset: " + toString(stream.mrk_hashing_buf.count()));
throw;
}
}, {}); }, {});
++mark_num;
/// Read index_granularity rows from column. /// Read index_granularity rows from column.
ColumnPtr tmp_column = name_type.type->createColumn(); ColumnPtr tmp_column = name_type.type->createColumn();
name_type.type->deserializeBinaryBulkWithMultipleStreams( name_type.type->deserializeBinaryBulkWithMultipleStreams(