ClickHouse/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp


#include <DB/Storages/MergeTree/MergeTreePartChecker.h>
#include <DB/DataStreams/MarkInCompressedFile.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeDate.h>
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/DataTypes/DataTypeAggregateFunction.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/HashingReadBuffer.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/Columns/ColumnsNumber.h>
#include <DB/Common/CurrentMetrics.h>
#include <DB/Common/escapeForFileName.h>

#include <Poco/File.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int CORRUPTED_DATA;
    extern const int INCORRECT_MARK;
    extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
    extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
    extern const int UNKNOWN_TYPE;
}

namespace
{
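
/// Reads one column's .bin data file and .mrk marks file through hashing and decompressing
/// buffers, so that the data can be validated while its checksums are computed in one pass.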
struct Stream
{
    static const size_t UNKNOWN = std::numeric_limits<size_t>::max();

    bool data_type_is_string = false;
    size_t data_type_fixed_length = 0;

    String path;
    String name;

    ReadBufferFromFile file_buf;
    HashingReadBuffer compressed_hashing_buf;
    CompressedReadBuffer uncompressing_buf;
    HashingReadBuffer uncompressed_hashing_buf;

    ReadBufferFromFile mrk_file_buf;
    HashingReadBuffer mrk_hashing_buf;

    Stream(const String & path_, const String & name_, DataTypePtr type) : path(path_), name(name_),
        file_buf(path + name + ".bin"), compressed_hashing_buf(file_buf), uncompressing_buf(compressed_hashing_buf),
        uncompressed_hashing_buf(uncompressing_buf), mrk_file_buf(path + name + ".mrk"), mrk_hashing_buf(mrk_file_buf)
    {
        data_type_is_string = typeid_cast<const DataTypeString *>(type.get()) != nullptr;

        if (!data_type_is_string)
        {
            if (typeid_cast<const DataTypeUInt8 *>(type.get())
                || typeid_cast<const DataTypeInt8 *>(type.get()))
                data_type_fixed_length = sizeof(UInt8);
            else if (typeid_cast<const DataTypeUInt16 *>(type.get())
                || typeid_cast<const DataTypeInt16 *>(type.get())
                || typeid_cast<const DataTypeDate *>(type.get()))
                data_type_fixed_length = sizeof(UInt16);
            else if (typeid_cast<const DataTypeUInt32 *>(type.get())
                || typeid_cast<const DataTypeInt32 *>(type.get())
                || typeid_cast<const DataTypeFloat32 *>(type.get())
                || typeid_cast<const DataTypeDateTime *>(type.get()))
                data_type_fixed_length = sizeof(UInt32);
            else if (typeid_cast<const DataTypeUInt64 *>(type.get())
                || typeid_cast<const DataTypeInt64 *>(type.get())
                || typeid_cast<const DataTypeFloat64 *>(type.get()))
                data_type_fixed_length = sizeof(UInt64);
            else if (auto string = typeid_cast<const DataTypeFixedString *>(type.get()))
                data_type_fixed_length = string->getN();
            else
                throw Exception("Unexpected data type: " + type->getName() + " of column " + name, ErrorCodes::UNKNOWN_TYPE);
        }
    }
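
    /// Whether the .mrk file has been read to the end.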
    bool marksEOF()
    {
        return mrk_hashing_buf.eof();
    }
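
    /// Skip all remaining data and marks. The bytes still pass through the hashing
    /// buffers, so the checksums cover the whole files.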
    void ignore()
    {
        uncompressed_hashing_buf.ignore(std::numeric_limits<size_t>::max());
        mrk_hashing_buf.ignore(std::numeric_limits<size_t>::max());
    }
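
    /// Skip over up to `rows` values, validating string lengths along the way.
    /// Returns the number of rows actually read (less than `rows` at end of file).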
    size_t read(size_t rows)
    {
        if (data_type_is_string)
        {
            for (size_t i = 0; i < rows; ++i)
            {
                if (uncompressed_hashing_buf.eof())
                    return i;

                UInt64 size;
                readVarUInt(size, uncompressed_hashing_buf);

                if (size > (1ul << 31))
                    throw Exception("A string of length " + toString(size) + " is too long.", ErrorCodes::CORRUPTED_DATA);

                uncompressed_hashing_buf.ignore(size);
            }
            return rows;
        }
        else
        {
            size_t size = uncompressed_hashing_buf.tryIgnore(data_type_fixed_length * rows);
            if (size % data_type_fixed_length)
                throw Exception("Read " + toString(size) + " bytes, which is not divisible by " + toString(data_type_fixed_length),
                    ErrorCodes::CORRUPTED_DATA);
            return size / data_type_fixed_length;
        }
    }
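
    /// Read up to `rows` UInt64 values into `data`. Returns the number of values actually read.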
    size_t readUInt64(size_t rows, ColumnUInt64::Container_t & data)
    {
        if (data.size() < rows)
            data.resize(rows);
        size_t size = uncompressed_hashing_buf.readBig(reinterpret_cast<char *>(&data[0]), sizeof(UInt64) * rows);
        if (size % sizeof(UInt64))
            throw Exception("Read " + toString(size) + " bytes, which is not divisible by " + toString(sizeof(UInt64)),
                ErrorCodes::CORRUPTED_DATA);
        return size / sizeof(UInt64);
    }
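
    /// Read the next mark from the .mrk file and check that it matches the current position
    /// in the data: the offset of the current compressed block in the file and the offset
    /// within the decompressed block.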
    void assertMark()
    {
        MarkInCompressedFile mrk_mark;
        readIntBinary(mrk_mark.offset_in_compressed_file, mrk_hashing_buf);
        readIntBinary(mrk_mark.offset_in_decompressed_block, mrk_hashing_buf);

        bool has_alternative_mark = false;
        MarkInCompressedFile alternative_data_mark;
        MarkInCompressedFile data_mark;

        /// If the mark is expected to fall exactly on a block boundary, a mark pointing either
        /// to the end of the previous block or to the beginning of the next one is acceptable.
        if (!uncompressed_hashing_buf.hasPendingData())
        {
            /// Build the mark pointing to the end of the previous block.
            has_alternative_mark = true;
            alternative_data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed();
            alternative_data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();

            if (mrk_mark == alternative_data_mark)
                return;

            uncompressed_hashing_buf.next();

            /// At the end of the file, compressed_hashing_buf.count() points to the end of the file
            /// even before next() is called, and the check below then misbehaves.
            /// For simplicity, don't check the last mark.
            if (uncompressed_hashing_buf.eof())
                return;
        }

        data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed();
        data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset();

        if (mrk_mark != data_mark)
            throw Exception("Incorrect mark: " + data_mark.toString() +
                (has_alternative_mark ? " or " + alternative_data_mark.toString() : "") + " in data, " +
                mrk_mark.toString() + " in .mrk file", ErrorCodes::INCORRECT_MARK);
    }
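
    /// Check that both the data and the marks have been read to the end,
    /// then record the computed checksums for the .bin and .mrk files.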
    void assertEnd(MergeTreeData::DataPart::Checksums & checksums)
    {
        if (!uncompressed_hashing_buf.eof())
            throw Exception("EOF expected in column data", ErrorCodes::CORRUPTED_DATA);
        if (!mrk_hashing_buf.eof())
            throw Exception("EOF expected in .mrk file", ErrorCodes::CORRUPTED_DATA);

        checksums.files[name + ".bin"] = MergeTreeData::DataPart::Checksums::Checksum(
            compressed_hashing_buf.count(), compressed_hashing_buf.getHash(),
            uncompressed_hashing_buf.count(), uncompressed_hashing_buf.getHash());
        checksums.files[name + ".mrk"] = MergeTreeData::DataPart::Checksums::Checksum(
            mrk_hashing_buf.count(), mrk_hashing_buf.getHash());
    }
};

/// Returns the number of rows. Adds the checksums of all the column's files to `checksums`.
static size_t checkColumn(
    const String & path,
    const String & name,
    DataTypePtr type,
    const MergeTreePartChecker::Settings & settings,
    MergeTreeData::DataPart::Checksums & checksums,
    volatile bool * is_cancelled)
{
    size_t rows = 0;

    try
    {
        if (auto array = typeid_cast<const DataTypeArray *>(type.get()))
        {
            /// Check the column of array sizes first, then the nested data.
            String sizes_name = DataTypeNested::extractNestedTableName(name);
            Stream sizes_stream(path, escapeForFileName(sizes_name) + ".size0", new DataTypeUInt64);
            Stream data_stream(path, escapeForFileName(name), array->getNestedType());

            ColumnUInt64::Container_t sizes;
            while (true)
            {
                if (is_cancelled && *is_cancelled)
                    return 0;

                if (sizes_stream.marksEOF())
                    break;

                sizes_stream.assertMark();
                data_stream.assertMark();

                size_t cur_rows = sizes_stream.readUInt64(settings.index_granularity, sizes);

                size_t sum = 0;
                for (size_t i = 0; i < cur_rows; ++i)
                {
                    size_t new_sum = sum + sizes[i];
                    if (sizes[i] > (1ul << 31) || new_sum < sum)
                        throw Exception("Array size " + toString(sizes[i]) + " is too large.", ErrorCodes::CORRUPTED_DATA);
                    sum = new_sum;
                }

                data_stream.read(sum);

                rows += cur_rows;
                if (cur_rows < settings.index_granularity)
                    break;
            }

            sizes_stream.assertEnd(checksums);
            data_stream.assertEnd(checksums);

            return rows;
        }
        else if (typeid_cast<const DataTypeAggregateFunction *>(type.get()))
        {
            /// Aggregate function states cannot be validated row by row; just hash the files.
            Stream data_stream(path, escapeForFileName(name), type);
            data_stream.ignore();
            return Stream::UNKNOWN;
        }
        else
        {
            Stream data_stream(path, escapeForFileName(name), type);

            while (true)
            {
                if (is_cancelled && *is_cancelled)
                    return 0;

                if (data_stream.marksEOF())
                    break;

                data_stream.assertMark();

                size_t cur_rows = data_stream.read(settings.index_granularity);

                if (cur_rows == Stream::UNKNOWN)
                    rows = Stream::UNKNOWN;
                else
                    rows += cur_rows;

                if (cur_rows < settings.index_granularity)
                    break;
            }

            data_stream.assertEnd(checksums);

            return rows;
        }
    }
    catch (DB::Exception & e)
    {
        e.addMessage(" (column: " + path + name + ", last mark at " + toString(rows) + " rows)");
        throw;
    }
}

}
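
/// Checks one data part: reads the column list from columns.txt, validates primary.idx,
/// verifies each column's data and marks, makes sure all columns agree on the row count,
/// and compares the computed checksums with checksums.txt when it is present or required.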
void MergeTreePartChecker::checkDataPart(
    String path,
    const Settings & settings,
    const DataTypes & primary_key_data_types,
    MergeTreeData::DataPart::Checksums * out_checksums,
    volatile bool * is_cancelled)
{
    CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedChecks};

    if (!path.empty() && path.back() != '/')
        path += "/";

    NamesAndTypesList columns;

    /// Checksums from the checksums.txt file. May be absent. If present, they are later
    /// compared with the actual data checksums.
    MergeTreeData::DataPart::Checksums checksums_txt;

    {
        ReadBufferFromFile buf(path + "columns.txt");
        columns.readText(buf);
        assertEOF(buf);
    }

    if (settings.require_checksums || Poco::File(path + "checksums.txt").exists())
    {
        ReadBufferFromFile buf(path + "checksums.txt");
        checksums_txt.read(buf);
        assertEOF(buf);
    }

    /// Actual checksums computed from the data. A mismatch with checksums_txt indicates corrupted data.
    MergeTreeData::DataPart::Checksums checksums_data;

    size_t marks_in_primary_key = 0;
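
    /// Read the primary index, counting its marks. If the primary key types are known,
    /// deserialize each value to make sure the file parses; otherwise just hash the bytes.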
    {
        ReadBufferFromFile file_buf(path + "primary.idx");
        HashingReadBuffer hashing_buf(file_buf);

        if (!primary_key_data_types.empty())
        {
            size_t key_size = primary_key_data_types.size();
            Columns tmp_columns(key_size);

            for (size_t j = 0; j < key_size; ++j)
                tmp_columns[j] = primary_key_data_types[j]->createColumn();

            while (!hashing_buf.eof())
            {
                if (is_cancelled && *is_cancelled)
                    return;

                ++marks_in_primary_key;
                for (size_t j = 0; j < key_size; ++j)
                    primary_key_data_types[j]->deserializeBinary(*tmp_columns[j], hashing_buf);
            }
        }
        else
        {
            hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
        }

        size_t primary_idx_size = hashing_buf.count();

        checksums_data.files["primary.idx"] = MergeTreeData::DataPart::Checksums::Checksum(primary_idx_size, hashing_buf.getHash());
    }

    if (is_cancelled && *is_cancelled)
        return;

    String any_column_name;
    size_t rows = Stream::UNKNOWN;
    std::exception_ptr first_exception;

    for (const NameAndTypePair & column : columns)
    {
        if (settings.verbose)
        {
            std::cerr << column.name << ":";
            std::cerr.flush();
        }

        bool ok = false;
        try
        {
            if (!settings.require_column_files && !Poco::File(path + escapeForFileName(column.name) + ".bin").exists())
            {
                if (settings.verbose)
                    std::cerr << " no files" << std::endl;
                continue;
            }

            size_t cur_rows = checkColumn(path, column.name, column.type, settings, checksums_data, is_cancelled);
            if (is_cancelled && *is_cancelled)
                return;

            if (cur_rows != Stream::UNKNOWN)
            {
                if (rows == Stream::UNKNOWN)
                {
                    rows = cur_rows;
                    any_column_name = column.name;
                }
                else if (rows != cur_rows)
                {
                    throw Exception("Different number of rows in columns " + any_column_name + " and " + column.name,
                        ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
                }
            }

            ok = true;
        }
        catch (...)
        {
            if (!settings.verbose)
                throw;

            std::exception_ptr e = std::current_exception();
            if (!first_exception)
                first_exception = e;

            std::cerr << getCurrentExceptionMessage(true) << std::endl;
        }

        if (settings.verbose && ok)
            std::cerr << " ok" << std::endl;
    }

    if (rows == Stream::UNKNOWN)
        throw Exception("No columns", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);

    if (!primary_key_data_types.empty())
    {
        const size_t expected_marks = (rows - 1) / settings.index_granularity + 1;
        if (expected_marks != marks_in_primary_key)
            throw Exception("Size of primary key doesn't match expected number of marks."
                " Number of rows in columns: " + toString(rows)
                + ", index_granularity: " + toString(settings.index_granularity)
                + ", expected number of marks: " + toString(expected_marks)
                + ", size of primary key: " + toString(marks_in_primary_key),
                ErrorCodes::CORRUPTED_DATA);
    }

    if (settings.require_checksums || !checksums_txt.files.empty())
        checksums_txt.checkEqual(checksums_data, true);

    if (first_exception)
        std::rethrow_exception(first_exception);

    if (out_checksums)
        *out_checksums = checksums_data;
}

}