dbms: cleanup [#METR-19266]

This commit is contained in:
Alexey Arno 2016-08-19 15:01:10 +03:00
parent 7a4683140e
commit 2a5abb17be

View File

@ -31,8 +31,9 @@ namespace
{
constexpr auto DATA_FILE_EXTENSION = ".bin";
constexpr auto NULL_MAP_EXTENSION = ".null";
constexpr auto NULL_MAP_FILE_EXTENSION = ".null";
constexpr auto MARKS_FILE_EXTENSION = ".mrk";
constexpr auto NULL_MARKS_FILE_EXTENSION = ".null_mrk";
struct Stream
{
@ -150,8 +151,44 @@ public:
HashingReadBuffer mrk_hashing_buf;
};
/// Returns the number of rows. Updates the "checksums" variable with the checksum of
/// each column's bin file.
/// Updates the checksum value for the null map information of the
/// specified column. Returns the number of read rows.
size_t checkNullMap(const String & path,
const String & name,
const MergeTreePartChecker::Settings & settings,
MergeTreeData::DataPart::Checksums & checksums,
std::atomic<bool> * is_cancelled)
{
size_t rows = 0;
DataTypePtr type = std::make_shared<DataTypeUInt8>();
Stream data_stream(path, escapeForFileName(name), type,
NULL_MAP_FILE_EXTENSION, NULL_MARKS_FILE_EXTENSION);
while (true)
{
if (is_cancelled && *is_cancelled)
return 0;
if (data_stream.marksEOF())
break;
data_stream.assertMark();
size_t cur_rows = data_stream.read(settings.index_granularity);
rows += cur_rows;
if (cur_rows < settings.index_granularity)
break;
}
data_stream.assertEnd(checksums);
return rows;
}
/// Updates the checksum value for the specified column.
/// Returns the number of read rows.
size_t checkColumn(
const String & path,
const String & name,
@ -164,13 +201,7 @@ size_t checkColumn(
try
{
if (type->isNullable())
{
const auto & nullable_type = static_cast<const DataTypeNullable &>(*type);
auto nested_type = nullable_type.getNestedType();
return checkColumn(path, name, nested_type, settings, checksums, is_cancelled);
}
else if (auto array = typeid_cast<const DataTypeArray *>(type.get()))
if (auto array = typeid_cast<const DataTypeArray *>(type.get()))
{
String sizes_name = DataTypeNested::extractNestedTableName(name);
Stream sizes_stream(path, escapeForFileName(sizes_name) + ".size0", std::make_shared<DataTypeUInt64>(),
@ -218,7 +249,6 @@ size_t checkColumn(
Stream data_stream(path, escapeForFileName(name), type,
DATA_FILE_EXTENSION, MARKS_FILE_EXTENSION);
size_t rows = 0;
while (true)
{
if (is_cancelled && *is_cancelled)
@ -327,6 +357,19 @@ void MergeTreePartChecker::checkDataPart(
size_t rows = UNKNOWN;
std::exception_ptr first_exception;
/// Verify that the number of rows is consistent between all the columns.
auto check_row_count = [&rows, &any_column_name](size_t cur_rows, const std::string & col_name)
{
if (rows == UNKNOWN)
{
rows = cur_rows;
any_column_name = col_name;
}
else if (rows != cur_rows)
throw Exception{"Different number of rows in columns " + any_column_name + " and " + col_name,
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH};
};
for (const NameAndTypePair & column : columns)
{
if (settings.verbose)
@ -345,21 +388,32 @@ void MergeTreePartChecker::checkDataPart(
continue;
}
size_t cur_rows = checkColumn(path, column.name, column.type, settings, checksums_data, is_cancelled);
const DataTypePtr * observed_type;
/// If the current column is nullable, first we process its null map and the
/// corresponding marks.
if (column.type->isNullable())
{
const auto & nullable_type = static_cast<const DataTypeNullable &>(column.type);
observed_type = &nullable_type.getNestedType();
size_t cur_rows = checkNullMap(path, column.name, settings, checksums_data, is_cancelled);
if (is_cancelled && *is_cancelled)
return;
check_row_count(cur_rows, column.name);
}
else
observed_type = &column.type;
/// Update the checksum from the data of the column.
size_t cur_rows = checkColumn(path, column.name, *observed_type, settings, checksums_data, is_cancelled);
if (is_cancelled && *is_cancelled)
return;
if (rows == UNKNOWN)
{
rows = cur_rows;
any_column_name = column.name;
}
else if (rows != cur_rows)
{
throw Exception("Different number of rows in columns " + any_column_name + " and " + column.name,
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
}
check_row_count(cur_rows, column.name);
ok = true;
}