safeguards to protect against distinct partition values with the same partition_id [#CLICKHOUSE-3000]

This commit is contained in:
Alexey Zatelepin 2017-09-11 20:55:41 +03:00 committed by alexey-milovidov
parent 728a23f60e
commit 1be62b567e
10 changed files with 238 additions and 165 deletions

View File

@ -1293,6 +1293,15 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock))
{
if (part->partition.value != existing_part_in_partition->partition.value)
throw Exception(
"Partition value mismatch between two parts with the same partition ID. Existing part: "
+ existing_part_in_partition->name + ", newly added part: " + part->name,
ErrorCodes::CORRUPTED_DATA);
}
/** It is important that obtaining new block number and adding that block to parts set is done atomically.
* Otherwise there is race condition - merge of blocks could happen in interval that doesn't yet contain new part.
*/
@ -1407,18 +1416,6 @@ void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataParts
}
}
void MergeTreeData::attachPart(const DataPartPtr & part)
{
std::lock_guard<std::mutex> lock(data_parts_mutex);
std::lock_guard<std::mutex> lock_all(all_data_parts_mutex);
if (!all_data_parts.insert(part).second)
throw Exception("Part " + part->name + " is already attached", ErrorCodes::DUPLICATE_DATA_PART);
data_parts.insert(part);
addPartContributionToColumnSizes(part);
}
void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String & prefix, bool restore_covered, bool move_to_detached)
{
LOG_INFO(log, "Renaming " << part->relative_path << " to " << prefix << part->name << " and detaching it.");
@ -1733,7 +1730,7 @@ void MergeTreeData::freezePartition(const ASTPtr & partition_ast, const String &
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
const auto & partition = dynamic_cast<const ASTPartition &>(*partition_ast);
/// Month-partitioning specific - allow partition ID can be passed in the partition value.
/// Month-partitioning specific - partition ID can be passed in the partition value.
if (const auto * partition_lit = dynamic_cast<const ASTLiteral *>(partition.value.get()))
prefix = partition_lit->value.getType() == Field::Types::UInt64
? toString(partition_lit->value.get<UInt64>())
@ -1803,17 +1800,17 @@ size_t MergeTreeData::getPartitionSize(const std::string & partition_id) const
return size;
}
String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & partition_ast, const Context & context)
String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context & context)
{
const auto & partition = typeid_cast<const ASTPartition &>(*partition_ast);
const auto & partition_ast = typeid_cast<const ASTPartition &>(*ast);
if (!partition.value)
return partition.id;
if (!partition_ast.value)
return partition_ast.id;
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
/// Month-partitioning specific - allow partition ID can be passed in the partition value.
const auto * partition_lit = typeid_cast<const ASTLiteral *>(partition.value.get());
/// Month-partitioning specific - partition ID can be passed in the partition value.
const auto * partition_lit = typeid_cast<const ASTLiteral *>(partition_ast.value.get());
if (partition_lit && partition_lit->value.getType() == Field::Types::String)
{
String partition_id = partition_lit->value.get<String>();
@ -1828,9 +1825,9 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & partition_ast, cons
/// Re-parse partition key fields using the information about expected field types.
size_t fields_count = partition_expr_column_types.size();
if (partition.fields_count != fields_count)
if (partition_ast.fields_count != fields_count)
throw Exception(
"Wrong number of fields in the partition expression: " + toString(partition.fields_count) +
"Wrong number of fields in the partition expression: " + toString(partition_ast.fields_count) +
", must be: " + toString(fields_count),
ErrorCodes::INVALID_PARTITION_VALUE);
@ -1839,7 +1836,7 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & partition_ast, cons
if (fields_count)
{
ReadBufferFromMemory left_paren_buf("(", 1);
ReadBufferFromMemory fields_buf(partition.fields_str.data, partition.fields_str.size);
ReadBufferFromMemory fields_buf(partition_ast.fields_str.data, partition_ast.fields_str.size);
ReadBufferFromMemory right_paren_buf(")", 1);
ConcatReadBuffer buf({&left_paren_buf, &fields_buf, &right_paren_buf});
@ -1850,14 +1847,31 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & partition_ast, cons
if (!input_stream.read(block))
throw Exception(
"Could not parse partition value: `" + partition.fields_str.toString() + "`",
"Could not parse partition value: `" + partition_ast.fields_str.toString() + "`",
ErrorCodes::INVALID_PARTITION_VALUE);
for (size_t i = 0; i < fields_count; ++i)
block.getByPosition(i).column->get(0, partition_row[i]);
}
return MergeTreeDataPart::Partition(std::move(partition_row)).getID(*this);
MergeTreePartition partition(std::move(partition_row));
String partition_id = partition.getID(*this);
{
std::lock_guard<std::mutex> data_parts_lock(data_parts_mutex);
DataPartPtr existing_part_in_partition = getAnyPartInPartition(partition_id, data_parts_lock);
if (existing_part_in_partition && existing_part_in_partition->partition.value != partition.value)
{
WriteBufferFromOwnString buf;
writeCString("Parsed partition value: ", buf);
partition.serializeTextQuoted(*this, buf);
writeCString(" doesn't match partition value for an existing part with the same partition ID: ", buf);
writeString(existing_part_in_partition->name, buf);
throw Exception(buf.str(), ErrorCodes::INVALID_PARTITION_VALUE);
}
}
return partition_id;
}
void MergeTreeData::Transaction::rollback()
@ -1888,5 +1902,15 @@ void MergeTreeData::Transaction::rollback()
}
}
/// Returns some active part from the partition with ID `partition_id`,
/// or an empty pointer if the partition contains no parts.
/// The caller must already hold `data_parts_mutex`; the lock_guard reference
/// parameter exists only to prove that at compile time.
MergeTreeData::DataPartPtr MergeTreeData::getAnyPartInPartition(
const String & partition_id, std::lock_guard<std::mutex> & data_parts_lock)
{
/// data_parts is ordered by part info; a dummy info with the minimal possible
/// block numbers sorts before (or at) every real part of this partition,
/// so lower_bound lands on the partition's first part if one exists.
auto min_block = std::numeric_limits<Int64>::min();
MergeTreePartInfo dummy_part_info(partition_id, min_block, min_block, 0);
auto it = data_parts.lower_bound(dummy_part_info);
/// lower_bound may land on a part of the next partition - re-check the ID.
if (it != data_parts.end() && (*it)->info.partition_id == partition_id)
return *it;
return {};
}
}

View File

@ -337,9 +337,6 @@ public:
/// clearOldParts (ignoring old_parts_lifetime).
void replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout);
/// Adds new part to the list of known parts and to the working set.
void attachPart(const DataPartPtr & part);
/// Renames the part to detached/<prefix>_<part> and forgets about it. The data won't be deleted in
/// clearOldParts.
/// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part.
@ -568,6 +565,9 @@ private:
/// Adds or subtracts the contribution of the part to compressed column sizes.
void addPartContributionToColumnSizes(const DataPartPtr & part);
void removePartContributionToColumnSizes(const DataPartPtr & part);
/// If there is no part in the partition with ID `partition_id`, returns empty ptr.
DataPartPtr getAnyPartInPartition(const String & partition_id, std::lock_guard<std::mutex> & data_parts_lock);
};
}

View File

@ -73,6 +73,14 @@ void MergeTreeDataMerger::FuturePart::assign(MergeTreeData::DataPartsVector part
if (parts_.empty())
return;
for (size_t i = 0; i < parts_.size(); ++i)
{
if (parts_[i]->partition.value != parts_[0]->partition.value)
throw Exception(
"Attempting to merge parts " + parts_[i]->name + " and " + parts_[0]->name + " that are in different partitions",
ErrorCodes::LOGICAL_ERROR);
}
parts = std::move(parts_);
UInt32 max_level = 0;

View File

@ -27,7 +27,7 @@ public:
MergeTreePartInfo part_info;
MergeTreeData::DataPartsVector parts;
const MergeTreeDataPart::Partition & getPartition() const { return parts.front()->partition; }
const MergeTreePartition & getPartition() const { return parts.front()->partition; }
FuturePart() = default;
explicit FuturePart(MergeTreeData::DataPartsVector parts_)

View File

@ -8,16 +8,12 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/HashingWriteBuffer.h>
#include <Core/Defines.h>
#include <Core/FieldVisitors.h>
#include <Common/SipHash.h>
#include <Common/escapeForFileName.h>
#include <Common/StringUtils.h>
#include <Common/hex.h>
#include <Common/typeid_cast.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Columns/ColumnNullable.h>
#include <DataTypes/DataTypeDate.h>
#include <Poco/File.h>
#include <Poco/Path.h>
@ -38,6 +34,7 @@ namespace ErrorCodes
extern const int NO_FILE_IN_DATA_PART;
extern const int EXPECTED_END_OF_FILE;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int CORRUPTED_DATA;
extern const int FORMAT_VERSION_TOO_OLD;
extern const int UNKNOWN_FORMAT;
extern const int UNEXPECTED_FILE_IN_DATA_PART;
@ -295,113 +292,6 @@ static ReadBufferFromFile openForReading(const String & path)
return ReadBufferFromFile(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
}
String MergeTreeDataPart::Partition::getID(const MergeTreeData & storage) const
{
if (value.size() != storage.partition_expr_columns.size())
throw Exception("Invalid partition key size: " + toString(value.size()), ErrorCodes::LOGICAL_ERROR);
if (value.empty())
return "all";
/// In case all partition fields are represented by integral types, try to produce a human-readable partition id.
/// Otherwise use a hex-encoded hash.
bool are_all_integral = true;
for (const Field & field : value)
{
if (field.getType() != Field::Types::UInt64 && field.getType() != Field::Types::Int64)
{
are_all_integral = false;
break;
}
}
String result;
if (are_all_integral)
{
FieldVisitorToString to_string_visitor;
for (size_t i = 0; i < value.size(); ++i)
{
if (i > 0)
result += '-';
if (typeid_cast<const DataTypeDate *>(storage.partition_expr_column_types[i].get()))
result += toString(DateLUT::instance().toNumYYYYMMDD(DayNum_t(value[i].get<UInt64>())));
else
result += applyVisitor(to_string_visitor, value[i]);
}
return result;
}
SipHash hash;
FieldVisitorHash hashing_visitor(hash);
for (const Field & field : value)
applyVisitor(hashing_visitor, field);
char hash_data[16];
hash.get128(hash_data);
result.resize(32);
for (size_t i = 0; i < 16; ++i)
writeHexByteLowercase(hash_data[i], &result[2 * i]);
return result;
}
void MergeTreeDataPart::Partition::serializeTextQuoted(const MergeTreeData & storage, WriteBuffer & out) const
{
size_t key_size = storage.partition_expr_column_types.size();
if (key_size == 0)
{
writeCString("tuple()", out);
return;
}
if (key_size > 1)
writeChar('(', out);
for (size_t i = 0; i < key_size; ++i)
{
if (i > 0)
writeCString(", ", out);
const DataTypePtr & type = storage.partition_expr_column_types[i];
ColumnPtr column = type->createColumn();
column->insert(value[i]);
type->serializeTextQuoted(*column, 0, out);
}
if (key_size > 1)
writeChar(')', out);
}
void MergeTreeDataPart::Partition::load(const MergeTreeData & storage, const String & part_path)
{
if (!storage.partition_expr)
return;
ReadBufferFromFile file = openForReading(part_path + "partition.dat");
value.resize(storage.partition_expr_column_types.size());
for (size_t i = 0; i < storage.partition_expr_column_types.size(); ++i)
storage.partition_expr_column_types[i]->deserializeBinary(value[i], file);
}
void MergeTreeDataPart::Partition::store(const MergeTreeData & storage, const String & part_path, Checksums & checksums) const
{
if (!storage.partition_expr)
return;
WriteBufferFromFile out(part_path + "partition.dat");
HashingWriteBuffer out_hashing(out);
for (size_t i = 0; i < value.size(); ++i)
storage.partition_expr_column_types[i]->serializeBinary(value[i], out_hashing);
checksums.files["partition.dat"].file_size = out_hashing.count();
checksums.files["partition.dat"].file_hash = out_hashing.getHash();
}
void MergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & storage, const String & part_path)
{
size_t minmax_idx_size = storage.minmax_idx_column_types.size();
@ -817,7 +707,7 @@ void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
MergeTreePartInfo::parseMinMaxDatesFromPartName(name, min_date, max_date);
const auto & date_lut = DateLUT::instance();
partition = Partition(date_lut.toNumYYYYMM(min_date));
partition = MergeTreePartition(date_lut.toNumYYYYMM(min_date));
minmax_idx = MinMaxIndex(min_date, max_date);
}
else
@ -826,6 +716,13 @@ void MergeTreeDataPart::loadPartitionAndMinMaxIndex()
partition.load(storage, full_path);
minmax_idx.load(storage, full_path);
}
String calculated_partition_id = partition.getID(storage);
if (calculated_partition_id != info.partition_id)
throw Exception(
"While loading part " + getFullPath() + ": calculated partition ID: " + calculated_partition_id
+ " differs from partition ID in part name: " + info.partition_id,
ErrorCodes::CORRUPTED_DATA);
}
void MergeTreeDataPart::loadChecksums(bool require)

View File

@ -4,6 +4,7 @@
#include <Core/Block.h>
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreePartition.h>
#include <Columns/IColumn.h>
#include <shared_mutex>
@ -149,29 +150,7 @@ struct MergeTreeDataPart
using Index = Columns;
Index index;
struct Partition
{
Row value;
public:
Partition() = default;
explicit Partition(Row value_) : value(std::move(value_)) {}
/// For month-based partitioning.
explicit Partition(UInt32 yyyymm) : value(1, static_cast<UInt64>(yyyymm)) {}
String getID(const MergeTreeData & storage) const;
void serializeTextQuoted(const MergeTreeData & storage, WriteBuffer & out) const;
void load(const MergeTreeData & storage, const String & part_path);
void store(const MergeTreeData & storage, const String & part_path, Checksums & checksums) const;
void assign(const Partition & other) { value.assign(other.value); }
};
Partition partition;
MergeTreePartition partition;
/// Index that for each part stores min and max values of a set of columns. This allows quickly excluding
/// parts based on conditions on these columns imposed by a query.

View File

@ -132,7 +132,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
MergeTreeDataPart::MinMaxIndex minmax_idx;
minmax_idx.update(block, data.minmax_idx_columns);
MergeTreeDataPart::Partition partition(std::move(block_with_partition.partition));
MergeTreePartition partition(std::move(block_with_partition.partition));
MergeTreePartInfo new_part_info(partition.getID(data), temp_index, temp_index, 0);
String part_name;

View File

@ -0,0 +1,128 @@
#include <Storages/MergeTree/MergeTreePartition.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/HashingWriteBuffer.h>
#include <Core/FieldVisitors.h>
#include <DataTypes/DataTypeDate.h>
#include <Common/SipHash.h>
#include <Common/typeid_cast.h>
#include <Common/hex.h>
#include <Poco/File.h>
namespace DB
{
/// Opens `path` for reading with a buffer no larger than the file itself
/// (avoids allocating the full default buffer for small files like partition.dat).
static ReadBufferFromFile openForReading(const String & path)
{
return ReadBufferFromFile(path, std::min(static_cast<Poco::File::FileSize>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path).getSize()));
}
/// Computes the partition ID string used in part names for this partition value.
/// - empty partition key -> "all"
/// - all-integral key -> human-readable, dash-separated values
///   (Date columns are rendered as YYYYMMDD)
/// - otherwise -> 32-char lowercase hex of a 128-bit SipHash of the fields.
/// Throws LOGICAL_ERROR if the value size does not match the partition key size.
String MergeTreePartition::getID(const MergeTreeData & storage) const
{
if (value.size() != storage.partition_expr_columns.size())
throw Exception("Invalid partition key size: " + toString(value.size()), ErrorCodes::LOGICAL_ERROR);
if (value.empty())
return "all";
/// In case all partition fields are represented by integral types, try to produce a human-readable partition id.
/// Otherwise use a hex-encoded hash.
bool are_all_integral = true;
for (const Field & field : value)
{
if (field.getType() != Field::Types::UInt64 && field.getType() != Field::Types::Int64)
{
are_all_integral = false;
break;
}
}
String result;
if (are_all_integral)
{
FieldVisitorToString to_string_visitor;
for (size_t i = 0; i < value.size(); ++i)
{
if (i > 0)
result += '-';
/// Date fields are stored as day numbers; render them as YYYYMMDD for readability.
if (typeid_cast<const DataTypeDate *>(storage.partition_expr_column_types[i].get()))
result += toString(DateLUT::instance().toNumYYYYMMDD(DayNum_t(value[i].get<UInt64>())));
else
result += applyVisitor(to_string_visitor, value[i]);
}
return result;
}
/// Fallback: hash all fields into a 128-bit value and hex-encode it (16 bytes -> 32 hex chars).
SipHash hash;
FieldVisitorHash hashing_visitor(hash);
for (const Field & field : value)
applyVisitor(hashing_visitor, field);
char hash_data[16];
hash.get128(hash_data);
result.resize(32);
for (size_t i = 0; i < 16; ++i)
writeHexByteLowercase(hash_data[i], &result[2 * i]);
return result;
}
void MergeTreePartition::serializeTextQuoted(const MergeTreeData & storage, WriteBuffer & out) const
{
size_t key_size = storage.partition_expr_column_types.size();
if (key_size == 0)
{
writeCString("tuple()", out);
return;
}
if (key_size > 1)
writeChar('(', out);
for (size_t i = 0; i < key_size; ++i)
{
if (i > 0)
writeCString(", ", out);
const DataTypePtr & type = storage.partition_expr_column_types[i];
ColumnPtr column = type->createColumn();
column->insert(value[i]);
type->serializeTextQuoted(*column, 0, out);
}
if (key_size > 1)
writeChar(')', out);
}
/// Loads the partition value from the part's `partition.dat` file,
/// deserializing one field per partition key column in key order.
/// No-op for tables without a partition expression (old month-based format
/// stores no partition.dat).
void MergeTreePartition::load(const MergeTreeData & storage, const String & part_path)
{
if (!storage.partition_expr)
return;
ReadBufferFromFile file = openForReading(part_path + "partition.dat");
value.resize(storage.partition_expr_column_types.size());
for (size_t i = 0; i < storage.partition_expr_column_types.size(); ++i)
storage.partition_expr_column_types[i]->deserializeBinary(value[i], file);
}
/// Writes the partition value to the part's `partition.dat` file (one
/// binary-serialized field per partition key column, in key order) and
/// records the file's size and hash in `checksums`.
/// No-op for tables without a partition expression.
void MergeTreePartition::store(const MergeTreeData & storage, const String & part_path, MergeTreeDataPartChecksums & checksums) const
{
if (!storage.partition_expr)
return;
WriteBufferFromFile out(part_path + "partition.dat");
HashingWriteBuffer out_hashing(out);
for (size_t i = 0; i < value.size(); ++i)
storage.partition_expr_column_types[i]->serializeBinary(value[i], out_hashing);
/// Flush explicitly before reading the byte count and hash, so both are
/// guaranteed to cover all serialized data rather than depending on the
/// hashing buffer's internal flushing behavior.
out_hashing.next();
checksums.files["partition.dat"].file_size = out_hashing.count();
checksums.files["partition.dat"].file_hash = out_hashing.getHash();
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <Core/Types.h>
#include <Core/Row.h>
#include <IO/WriteBuffer.h>
namespace DB
{
class MergeTreeData;
struct MergeTreeDataPartChecksums;
/// The partition value of a data part: a row of partition key expression
/// values. Determines the part's partition ID (see getID) and is persisted
/// in the part directory as `partition.dat` (see load/store).
struct MergeTreePartition
{
// Partition key field values, one per partition key column, in key order.
Row value;
public:
MergeTreePartition() = default;
explicit MergeTreePartition(Row value_) : value(std::move(value_)) {}
/// For month-based partitioning.
explicit MergeTreePartition(UInt32 yyyymm) : value(1, static_cast<UInt64>(yyyymm)) {}
String getID(const MergeTreeData & storage) const;
void serializeTextQuoted(const MergeTreeData & storage, WriteBuffer & out) const;
void load(const MergeTreeData & storage, const String & part_path);
void store(const MergeTreeData & storage, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
void assign(const MergeTreePartition & other) { value.assign(other.value); }
};
}

View File

@ -322,7 +322,8 @@ void StorageDistributed::reshardPartitions(
ASTAlterQuery::Parameters & parameters = alter_query.parameters.back();
parameters.type = ASTAlterQuery::RESHARD_PARTITION;
parameters.partition = partition->clone();
if (partition)
parameters.partition = partition->clone();
ASTPtr expr_list = std::make_shared<ASTExpressionList>();
for (const auto & entry : weighted_zookeeper_paths)