dbms: NULL support for MergeTree [#METR-19266]

This commit is contained in:
Alexey Arno 2016-07-24 22:32:21 +03:00
parent beeeb0ab13
commit 1fe6786b78
5 changed files with 205 additions and 53 deletions

View File

@ -349,6 +349,7 @@ namespace ErrorCodes
extern const int INVALID_FUNCTION_GENUS = 343;
extern const int SUPPORT_IS_DISABLED = 344;
extern const int TABLE_DIFFERS_TOO_MUCH = 345;
extern const int RESHARDING_NULLABLE_SHARDING_KEY = 346;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -16,6 +16,7 @@
#include <DB/DataTypes/DataTypeEnum.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNullable.h>
#include <DB/Common/localBackup.h>
#include <DB/Functions/FunctionFactory.h>
#include <Poco/DirectoryIterator.h>
@ -109,13 +110,19 @@ void MergeTreeData::initPrimaryKey()
size_t primary_key_size = primary_key_sample.columns();
/// Primary key cannot contain constants. It is meaningless.
/// A primary key cannot contain constants. It is meaningless.
/// (And also couldn't work because primary key is serialized with method of IDataType that doesn't support constants).
/// Also a primary key must not contain any nullable column.
for (size_t i = 0; i < primary_key_size; ++i)
{
const ColumnPtr & column = primary_key_sample.unsafeGetByPosition(i).column;
const auto & element = primary_key_sample.unsafeGetByPosition(i);
const ColumnPtr & column = element.column;
if (column && column->isConst())
throw Exception("Primary key cannot contain constants", ErrorCodes::ILLEGAL_COLUMN);
throw Exception{"Primary key cannot contain constants", ErrorCodes::ILLEGAL_COLUMN};
if (element.type->isNullable())
throw Exception{"Primary key cannot contain nullable columns", ErrorCodes::ILLEGAL_COLUMN};
}
primary_key_data_types.resize(primary_key_size);
@ -604,18 +611,34 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
for (const NameAndTypePair & column : old_columns)
{
bool is_nullable = column.type.get()->isNullable();
if (!new_types.count(column.name))
{
if (!part || part->hasColumnFiles(column.name))
{
/// Столбец нужно удалить.
DataTypePtr observed_type;
if (is_nullable)
{
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(*(column.type.get()));
observed_type = nullable_type.getNestedType();
}
else
observed_type = column.type;
String escaped_column = escapeForFileName(column.name);
out_rename_map[escaped_column + ".bin"] = "";
out_rename_map[escaped_column + ".mrk"] = "";
if (is_nullable)
{
out_rename_map[escaped_column + ".null"] = "";
out_rename_map[escaped_column + ".null_mrk"] = "";
}
/// Если это массив или последний столбец вложенной структуры, нужно удалить файлы с размерами.
if (typeid_cast<const DataTypeArray *>(&*column.type))
if (typeid_cast<const DataTypeArray *>(observed_type.get()))
{
String nested_table = DataTypeNested::extractNestedTableName(column.name);
/// Если это был последний столбец, относящийся к этим файлам .size0, удалим файлы.
@ -671,6 +694,12 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
const String escaped_column = escapeForFileName(column.name);
out_rename_map[escaped_expr + ".bin"] = escaped_column + ".bin";
out_rename_map[escaped_expr + ".mrk"] = escaped_column + ".mrk";
if (is_nullable)
{
out_rename_map[escaped_expr + ".null"] = escaped_column + ".null";
out_rename_map[escaped_expr + ".null_mrk"] = escaped_column + ".null_mrk";
}
}
}
}
@ -1359,6 +1388,9 @@ void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part)
const auto escaped_name = escapeForFileName(column.name);
const auto bin_file_name = escaped_name + ".bin";
const auto mrk_file_name = escaped_name + ".mrk";
/// For nullable columns.
const auto null_file_name = escaped_name + ".null";
const auto null_mrk_file_name = escaped_name + ".null_mrk";
auto & column_size = column_sizes[column.name];
@ -1367,6 +1399,12 @@ void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part)
if (files.count(mrk_file_name))
column_size += files.find(mrk_file_name)->second.file_size;
if (files.count(null_file_name))
column_size += files.at(null_file_name).file_size;
if (files.count(null_mrk_file_name))
column_size += files.at(null_mrk_file_name).file_size;
}
}
@ -1379,6 +1417,9 @@ void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part
const auto escaped_name = escapeForFileName(column.name);
const auto bin_file_name = escaped_name + ".bin";
const auto mrk_file_name = escaped_name + ".mrk";
/// For nullable columns.
const auto null_file_name = escaped_name + ".null";
const auto null_mrk_file_name = escaped_name + ".null_mrk";
auto & column_size = column_sizes[column.name];
@ -1387,6 +1428,12 @@ void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part
if (files.count(mrk_file_name))
column_size -= files.find(mrk_file_name)->second.file_size;
if (files.count(null_file_name))
column_size -= files.at(null_file_name).file_size;
if (files.count(null_mrk_file_name))
column_size -= files.at(null_mrk_file_name).file_size;
}
}

View File

@ -221,16 +221,24 @@ void MergeTreeDataPartChecksums::addFile(const String & file_name, size_t file_s
files[file_name] = Checksum(file_size, file_hash);
}
/// Checksum computed from the set of checksums of the column data files.
/// Both .bin files and, for nullable columns, .null files are taken into account.
void MergeTreeDataPartChecksums::summaryDataChecksum(SipHash & hash) const
{
    static constexpr auto bin_len = strlen_constexpr(".bin");
    static constexpr auto null_len = strlen_constexpr(".null");

    /// We rely on the iteration being in deterministic (lexicographical) order.
    for (const auto & it : files)
    {
        const String & name = it.first;
        const Checksum & sum = it.second;

        /// Skip every file that is neither a .bin nor a .null file.
        /// The two suffixes must be tested together: filtering on each of them with its own
        /// `continue` would reject every name, because no name ends with both suffixes.
        const bool is_bin_file = name.size() >= bin_len
            && 0 == name.compare(name.size() - bin_len, bin_len, ".bin");
        const bool is_null_file = name.size() >= null_len
            && 0 == name.compare(name.size() - null_len, null_len, ".null");
        if (!is_bin_file && !is_null_file)
            continue;

        /// The name is hashed together with its length.
        size_t len = name.size();
        hash.update(reinterpret_cast<const char *>(&len), sizeof(len));
        hash.update(name.data(), len);
@ -260,7 +268,8 @@ MergeTreeDataPartChecksums MergeTreeDataPartChecksums::parse(const String & s)
}
/// Returns the size of .bin file for column `name` if found, zero otherwise
/// Returns the size of .bin file for column `name` if found, zero otherwise.
/// If this column is nullable, take into account the size of the .null file as well.
std::size_t MergeTreeDataPart::getColumnSize(const String & name) const
{
if (checksums.empty())
@ -273,7 +282,14 @@ std::size_t MergeTreeDataPart::getColumnSize(const String & name) const
if (0 == files.count(bin_file_name))
return {};
return files.find(bin_file_name)->second.file_size;
const auto null_file_name = escapeForFileName(name) + ".null";
size_t null_size;
if (0 == files.count(null_file_name))
null_size = 0;
else
null_size = files.at(null_file_name).file_size;
return files.at(bin_file_name).file_size + null_size;
}
/** Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
@ -501,8 +517,13 @@ void MergeTreeDataPart::loadColumns(bool require)
/// Если нет файла со списком столбцов, запишем его.
for (const NameAndTypePair & column : *storage.columns)
{
if (Poco::File(storage.full_path + name + "/" + escapeForFileName(column.name) + ".bin").exists())
columns.push_back(column);
const auto prefix = storage.full_path + name + "/" + escapeForFileName(column.name);
if (Poco::File(prefix + ".bin").exists())
{
if (!column.type.get()->isNullable()
|| Poco::File(prefix + ".null").exists())
columns.push_back(column);
}
}
if (columns.empty())
@ -538,6 +559,12 @@ void MergeTreeDataPart::checkNotBroken(bool require_part_metadata)
/// Every column must have checksums for both its .mrk and its .bin file.
if (!checksums.files.count(name + ".mrk") ||
    !checksums.files.count(name + ".bin"))
    throw Exception("No .mrk or .bin file checksum for column " + name, ErrorCodes::NO_FILE_IN_DATA_PART);

/// A nullable column must additionally have checksums for its .null_mrk and .null files.
if (it.type.get()->isNullable())
{
    if (!checksums.files.count(name + ".null_mrk") ||
        !checksums.files.count(name + ".null"))
        throw Exception("No .null_mrk or .null file checksum for column " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
}
}
}
@ -556,30 +583,36 @@ void MergeTreeDataPart::checkNotBroken(bool require_part_metadata)
/// Check that all marks files are non-empty and have the same size.
ssize_t marks_size = -1;
for (const NameAndTypePair & it : columns)
/// Checks the marks files that have the given extension: for every column whose
/// file `path + "/" + <escaped column name> + extension` exists, the first such file
/// must be non-empty and every following one must have the same size as the first.
/// Throws an Exception with code BAD_SIZE_OF_FILE_IN_DATA_PART otherwise.
auto check_marks = [](const std::string & path, const NamesAndTypesList & columns, const std::string & extension)
{
Poco::File marks_file(path + "/" + escapeForFileName(it.name) + ".mrk");
/// When a new column is added to the table, .mrk files are not created. We will not delete anything.
if (!marks_file.exists())
continue;
if (marks_size == -1)
ssize_t marks_size = -1;
for (const NameAndTypePair & it : columns)
{
marks_size = marks_file.getSize();
Poco::File marks_file(path + "/" + escapeForFileName(it.name) + extension);
if (0 == marks_size)
throw Exception("Part " + path + " is broken: " + marks_file.path() + " is empty.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
/// When a new column is added to the table, .mrk files are not created. We will not delete anything.
if (!marks_file.exists())
continue;
if (marks_size == -1)
{
marks_size = marks_file.getSize();
if (0 == marks_size)
throw Exception("Part " + path + " is broken: " + marks_file.path() + " is empty.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
else
{
if (static_cast<ssize_t>(marks_file.getSize()) != marks_size)
throw Exception("Part " + path + " is broken: marks have different sizes.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
}
else
{
if (static_cast<ssize_t>(marks_file.getSize()) != marks_size)
throw Exception("Part " + path + " is broken: marks have different sizes.",
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}
}
};
/// The two kinds of marks files are checked independently of each other,
/// so sizes are only compared between files that have the same extension.
check_marks(path, columns, ".mrk");
check_marks(path, columns, ".null_mrk");
}
}

View File

@ -29,23 +29,26 @@ namespace ErrorCodes
namespace
{
constexpr auto DATA_FILE_EXTENSION = ".bin";
constexpr auto NULL_MAP_EXTENSION = ".null";
constexpr auto MARKS_FILE_EXTENSION = ".mrk";
constexpr auto NULL_MARKS_FILE_EXTENSION = ".null_mrk";
/// bin / mrk
/// null / null_mrk
struct Stream
{
String path;
String name;
DataTypePtr type;
ReadBufferFromFile file_buf;
HashingReadBuffer compressed_hashing_buf;
CompressedReadBuffer uncompressing_buf;
HashingReadBuffer uncompressed_hashing_buf;
ReadBufferFromFile mrk_file_buf;
HashingReadBuffer mrk_hashing_buf;
Stream(const String & path, const String & name, const DataTypePtr & type) : path(path), name(name), type(type),
file_buf(path + name + ".bin"), compressed_hashing_buf(file_buf), uncompressing_buf(compressed_hashing_buf),
uncompressed_hashing_buf(uncompressing_buf), mrk_file_buf(path + name + ".mrk"), mrk_hashing_buf(mrk_file_buf)
public:
Stream(const String & path, const String & name, const DataTypePtr & type,
const std::string & extension_, const std::string & mrk_extension_)
: path(path), name(name), type(type),
extension{extension_}, mrk_extension{mrk_extension_},
file_buf(path + name + extension), compressed_hashing_buf(file_buf),
uncompressing_buf(compressed_hashing_buf),
uncompressed_hashing_buf(uncompressing_buf),
mrk_file_buf(path + name + mrk_extension),
mrk_hashing_buf(mrk_file_buf)
{
/// Stream создаётся для типа - внутренностей массива. Случай, когда внутренность массива - массив - не поддерживается.
if (typeid_cast<const DataTypeArray *>(type.get()))
@ -117,7 +120,7 @@ struct Stream
if (mrk_mark != data_mark)
throw Exception("Incorrect mark: " + data_mark.toString() +
(has_alternative_mark ? " or " + alternative_data_mark.toString() : "") + " in data, " +
mrk_mark.toString() + " in .mrk file", ErrorCodes::INCORRECT_MARK);
mrk_mark.toString() + " in " + mrk_extension + " file", ErrorCodes::INCORRECT_MARK);
}
void assertEnd(MergeTreeData::DataPart::Checksums & checksums)
@ -127,17 +130,68 @@ struct Stream
/// The marks file must have been read to its end.
/// The message names the actual marks file of this stream (mrk_extension),
/// so that it is also correct when the stream does not read a ".mrk" file.
if (!mrk_hashing_buf.eof())
    throw Exception("EOF expected in " + mrk_extension + " file", ErrorCodes::CORRUPTED_DATA);

/// Store the checksum of the data file: count and hash of both the compressed
/// and the uncompressed hashing buffers.
checksums.files[name + extension] = MergeTreeData::DataPart::Checksums::Checksum(
    compressed_hashing_buf.count(), compressed_hashing_buf.getHash(),
    uncompressed_hashing_buf.count(), uncompressed_hashing_buf.getHash());

/// Store the checksum of the marks file.
checksums.files[name + mrk_extension] = MergeTreeData::DataPart::Checksums::Checksum(
    mrk_hashing_buf.count(), mrk_hashing_buf.getHash());
}
public:
String path;
String name;
DataTypePtr type;
std::string extension;
std::string mrk_extension;
ReadBufferFromFile file_buf;
HashingReadBuffer compressed_hashing_buf;
CompressedReadBuffer uncompressing_buf;
HashingReadBuffer uncompressed_hashing_buf;
ReadBufferFromFile mrk_file_buf;
HashingReadBuffer mrk_hashing_buf;
};
/// Checks the null byte map of the column `name` located in `path`.
/// Returns the number of rows that were read from it, or 0 if the check was cancelled
/// through `is_cancelled`. Adds the checksums of the null map file and of its marks file
/// to `checksums`.
size_t checkNullableColumn(const String & path,
    const String & name,
    const MergeTreePartChecker::Settings & settings,
    MergeTreeData::DataPart::Checksums & checksums,
    volatile bool * is_cancelled)
{
    /// The null map is read as a stream of UInt8 values.
    DataTypePtr null_map_type = std::make_shared<DataTypeUInt8>();
    Stream null_map_stream(path, escapeForFileName(name), null_map_type,
        NULL_MAP_EXTENSION, NULL_MARKS_FILE_EXTENSION);

    size_t total_rows = 0;

    for (;;)
    {
        if (is_cancelled && *is_cancelled)
            return 0;

        if (null_map_stream.marksEOF())
            break;

        null_map_stream.assertMark();

        const size_t rows_read = null_map_stream.read(settings.index_granularity);
        total_rows += rows_read;

        /// Fewer rows than requested: nothing more to read.
        if (rows_read < settings.index_granularity)
            break;
    }

    null_map_stream.assertEnd(checksums);

    return total_rows;
}
/// Returns the number of rows. Updates the "checksums" variable with the checksum of
/// each column's bin file.
size_t checkColumn(
const String & path,
const String & name,
DataTypePtr type,
@ -152,8 +206,10 @@ static size_t checkColumn(
if (auto array = typeid_cast<const DataTypeArray *>(type.get()))
{
String sizes_name = DataTypeNested::extractNestedTableName(name);
Stream sizes_stream(path, escapeForFileName(sizes_name) + ".size0", std::make_shared<DataTypeUInt64>());
Stream data_stream(path, escapeForFileName(name), array->getNestedType());
Stream sizes_stream(path, escapeForFileName(sizes_name) + ".size0", std::make_shared<DataTypeUInt64>(),
DATA_FILE_EXTENSION, MARKS_FILE_EXTENSION);
Stream data_stream(path, escapeForFileName(name), array->getNestedType(),
DATA_FILE_EXTENSION, MARKS_FILE_EXTENSION);
ColumnUInt64::Container_t sizes;
while (true)
@ -192,7 +248,8 @@ static size_t checkColumn(
}
else
{
Stream data_stream(path, escapeForFileName(name), type);
Stream data_stream(path, escapeForFileName(name), type,
DATA_FILE_EXTENSION, MARKS_FILE_EXTENSION);
size_t rows = 0;
while (true)
@ -337,6 +394,15 @@ void MergeTreePartChecker::checkDataPart(
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
}
if (column.type.get()->isNullable())
{
size_t row_count_from_null_map = checkNullableColumn(path,
column.name, settings, checksums_data, is_cancelled);
if (row_count_from_null_map != rows)
throw Exception{"Inconsistent number of rows in null byte map for column " + column.name,
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH};
}
ok = true;
}
catch (...)

View File

@ -88,6 +88,7 @@ namespace ErrorCodes
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int UNFINISHED;
extern const int METADATA_MISMATCH;
extern const int RESHARDING_NULLABLE_SHARDING_KEY;
}
@ -3318,6 +3319,10 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
if (has_coordinator)
block_number = resharding_worker.subscribe(coordinator_id, queryToString(query));
NameAndTypePair column_desc = ITableDeclaration::getColumn(sharding_key_expr->getColumnName());
if (column_desc.type.get()->isNullable())
throw Exception{"Sharding key must not be nullable", ErrorCodes::RESHARDING_NULLABLE_SHARDING_KEY};
for (const auto & weighted_path : weighted_zookeeper_paths)
{
UInt64 weight = weighted_path.second;