All new parts must have uuids

This commit is contained in:
Nicolae Vartolomei 2020-10-29 16:18:25 +00:00
parent 425dc4b11b
commit 746f8e45f5
15 changed files with 119 additions and 31 deletions

View File

@ -139,6 +139,9 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE)
writeStringBinary(part->getType().toString(), out);
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
writeUUIDText(part->uuid, out);
if (isInMemoryPart(part))
sendPartFromMemory(part, out);
else
@ -322,6 +325,10 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE)
readStringBinary(part_type, in);
UUID part_uuid;
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
readUUIDText(part_uuid, in);
auto storage_id = data.getStorageID();
String new_part_path = part_type == "InMemory" ? "memory" : data.getFullPathOnDisk(reservation->getDisk()) + part_name + "/";
auto entry = data.global_context.getReplicatedFetchList().insert(
@ -331,12 +338,14 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
in.setNextCallback(ReplicatedFetchReadCallback(*entry));
return part_type == "InMemory" ? downloadPartToMemory(part_name, metadata_snapshot, std::move(reservation), in)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, std::move(reservation), in);
return part_type == "InMemory" ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, std::move(reservation), in)
: downloadPartToDisk(part_name, part_uuid, replica_path, to_detached, tmp_prefix_, sync, std::move(reservation), in);
}
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
const String & part_name,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in)
@ -352,6 +361,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
MergeTreeData::MutableDataPartPtr new_data_part =
std::make_shared<MergeTreeDataPartInMemory>(data, part_name, volume);
new_data_part->uuid = part_uuid;
new_data_part->is_temp = true;
new_data_part->setColumns(block.getNamesAndTypesList());
new_data_part->minmax_idx.update(block, data.minmax_idx_columns);
@ -368,6 +378,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
const String & part_name,
const UUID & part_uuid,
const String & replica_path,
bool to_detached,
const String & tmp_prefix_,
@ -446,6 +457,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path);
new_data_part->uuid = part_uuid;
new_data_part->is_temp = true;
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);

View File

@ -71,6 +71,7 @@ public:
private:
MergeTreeData::MutableDataPartPtr downloadPartToDisk(
const String & part_name,
const UUID & part_uuid,
const String & replica_path,
bool to_detached,
const String & tmp_prefix_,
@ -80,6 +81,7 @@ private:
MergeTreeData::MutableDataPartPtr downloadPartToMemory(
const String & part_name,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in);

View File

@ -2003,7 +2003,7 @@ bool MergeTreeData::renameTempPartAndReplace(
if (part_in_memory && getSettings()->in_memory_parts_enable_wal)
{
auto wal = getWriteAheadLog();
wal->addPart(part_in_memory->block, part_in_memory->name);
wal->addPart(part_in_memory);
}
if (out_covered_parts)

View File

@ -681,6 +681,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
single_disk_volume,
TMP_PREFIX + future_part.name);
new_data_part->uuid = future_part.uuid;
new_data_part->setColumns(storage_columns);
new_data_part->partition.assign(future_part.getPartition());
new_data_part->is_temp = true;
@ -1137,8 +1138,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
auto new_data_part = data.createPart(
future_part.name, future_part.type, future_part.part_info, single_disk_volume, "tmp_mut_" + future_part.name);
new_data_part->uuid = future_part.uuid;
new_data_part->is_temp = true;
new_data_part->uuid = source_part->uuid;
new_data_part->ttl_infos = source_part->ttl_infos;
/// It shouldn't be changed by mutation.
@ -1820,8 +1821,10 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart(
{
auto disk = new_data_part->volume->getDisk();
if (new_data_part->uuid != UUIDHelpers::Nil)
{
if (new_data_part->uuid == UUIDHelpers::Nil)
throw Exception("Empty IMergeTreeDataPart#uuid in finalize for part: " + new_data_part->name, ErrorCodes::LOGICAL_ERROR);
auto out = disk->writeFile(new_data_part->getFullRelativePath() + IMergeTreeDataPart::UUID_FILE_NAME, 4096);
writeUUIDText(new_data_part->uuid, *out);
}

View File

@ -19,6 +19,7 @@ class MergeProgressCallback;
struct FutureMergedMutatedPart
{
String name;
UUID uuid;
String path;
MergeTreeDataPartType type;
MergeTreePartInfo part_info;

View File

@ -74,6 +74,7 @@ void MergeTreeDataPartInMemory::flushToDisk(const String & base_path, const Stri
auto new_type = storage.choosePartTypeOnDisk(block.bytes(), rows_count);
auto new_data_part = storage.createPart(name, new_type, info, volume, new_relative_path);
new_data_part->uuid = uuid;
new_data_part->setColumns(columns);
new_data_part->partition.value.assign(partition.value);
new_data_part->minmax_idx = minmax_idx;

View File

@ -247,6 +247,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
createVolumeFromReservation(reservation, volume),
TMP_PREFIX + part_name);
new_data_part->uuid = UUIDHelpers::generateV4();
new_data_part->setColumns(columns);
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);

View File

@ -2,7 +2,9 @@
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/copyData.h>
#include <Poco/File.h>
#include <sys/time.h>
@ -56,18 +58,23 @@ void MergeTreeWriteAheadLog::init()
bytes_at_last_sync = 0;
}
void MergeTreeWriteAheadLog::addPart(const Block & block, const String & part_name)
void MergeTreeWriteAheadLog::addPart(DataPartInMemoryPtr & part)
{
std::unique_lock lock(write_mutex);
auto part_info = MergeTreePartInfo::fromPartName(part_name, storage.format_version);
auto part_info = MergeTreePartInfo::fromPartName(part->name, storage.format_version);
min_block_number = std::min(min_block_number, part_info.min_block);
max_block_number = std::max(max_block_number, part_info.max_block);
writeIntBinary(WAL_VERSION, *out);
ActionMetadata metadata{};
metadata.part_uuid = part->uuid;
metadata.write(*out);
writeIntBinary(static_cast<UInt8>(ActionType::ADD_PART), *out);
writeStringBinary(part_name, *out);
block_out->write(block);
writeStringBinary(part->name, *out);
block_out->write(part->block);
block_out->flush();
sync(lock);
@ -81,6 +88,10 @@ void MergeTreeWriteAheadLog::dropPart(const String & part_name)
std::unique_lock lock(write_mutex);
writeIntBinary(WAL_VERSION, *out);
ActionMetadata metadata{};
metadata.write(*out);
writeIntBinary(static_cast<UInt8>(ActionType::DROP_PART), *out);
writeStringBinary(part_name, *out);
out->next();
@ -136,6 +147,17 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
auto part_disk = storage.reserveSpace(0)->getDisk();
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
/// Likely part written by older ClickHouse version which didn't support UUIDs.
if (metadata.part_uuid == UUIDHelpers::Nil)
{
/// Defensive check. Since WAL version 1 we expect all parts to have UUID.
if (version > 0)
throw Exception("Unexpected empty part_uuid in entry version: " + toString(version), ErrorCodes::CORRUPTED_DATA);
metadata.part_uuid = UUIDHelpers::generateV4();
}
/// TODO(nv) Create part should check for empty UUIDs and crash.
part = storage.createPart(
part_name,
MergeTreeDataPartType::IN_MEMORY,
@ -143,6 +165,8 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
single_disk_volume,
part_name);
part->uuid = metadata.part_uuid;
block = block_in.read();
}
else
@ -249,8 +273,8 @@ void MergeTreeWriteAheadLog::ActionMetadata::read(ReadBuffer & meta_in)
UInt32 metadata_start = meta_in.offset();
/// For the future: read metadata here.
if (meta_in.hasPendingData())
readUUIDText(part_uuid, meta_in);
/// Skip extra fields if any. If min_compatible_version is lower than WAL_VERSION it means
/// that the fields are not critical for the correctness.
@ -260,6 +284,16 @@ void MergeTreeWriteAheadLog::ActionMetadata::read(ReadBuffer & meta_in)
void MergeTreeWriteAheadLog::ActionMetadata::write(WriteBuffer & meta_out) const
{
writeIntBinary(min_compatible_version, meta_out);
writeVarUInt(static_cast<UInt32>(0), meta_out);
/// Write metadata to a temporary buffer first to compute the size.
MemoryWriteBuffer buf{};
writeUUIDText(part_uuid, buf);
buf.finalize();
auto read_buf = buf.tryGetReadBuffer();
writeVarUInt(static_cast<UInt32>(read_buf->available()), meta_out);
copyData(*read_buf, meta_out);
}
}

View File

@ -2,9 +2,10 @@
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/BackgroundSchedulePool.h>
#include <Disks/IDisk.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
namespace DB
{
@ -36,11 +37,14 @@ public:
/// The same approach can be used recursively inside metadata.
UInt8 min_compatible_version = 0;
/// Actual metadata.
UUID part_uuid = UUIDHelpers::Nil;
void write(WriteBuffer & meta_out) const;
void read(ReadBuffer & meta_in);
};
constexpr static UInt8 WAL_VERSION = 0;
constexpr static UInt8 WAL_VERSION = 1;
constexpr static auto WAL_FILE_NAME = "wal";
constexpr static auto WAL_FILE_EXTENSION = ".bin";
constexpr static auto DEFAULT_WAL_FILE_NAME = "wal.bin";
@ -50,7 +54,7 @@ public:
~MergeTreeWriteAheadLog();
void addPart(const Block & block, const String & part_name);
void addPart(DataPartInMemoryPtr & part);
void dropPart(const String & part_name);
std::vector<MergeTreeMutableDataPartPtr> restore(const StorageMetadataPtr & metadata_snapshot);

View File

@ -133,8 +133,10 @@ void MergedBlockOutputStream::finalizePartOnDisk(
MergeTreeData::DataPart::Checksums & checksums,
bool sync)
{
if (new_part->uuid != UUIDHelpers::Nil)
{
if (new_part->uuid == UUIDHelpers::Nil)
throw Exception("Empty IMergeTreeDataPart#uuid in finalize for part: " + new_part->name, ErrorCodes::LOGICAL_ERROR);
auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::UUID_FILE_NAME, 4096);
writeUUIDText(new_part->uuid, *out);
out->finalize();

View File

@ -264,6 +264,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
log_entry.create_time = time(nullptr);
log_entry.source_replica = storage.replica_name;
log_entry.new_part_name = part->name;
/// TODO maybe add UUID here as well?
log_entry.quorum = quorum;
log_entry.block_id = block_id;
log_entry.new_part_type = part->getType();

View File

@ -31,13 +31,18 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
break;
case MERGE_PARTS:
assert(new_part_uuid != UUIDHelpers::Nil);
out << "merge\n";
for (const String & s : source_parts)
out << s << '\n';
out << "into\n" << new_part_name;
out << "\ndeduplicate: " << deduplicate;
if (merge_type != MergeType::REGULAR)
out <<"\nmerge_type: " << static_cast<UInt64>(merge_type);
out << "\ninto_uuid: " << new_part_uuid;
break;
case DROP_RANGE:
@ -70,10 +75,14 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
break;
case MUTATE_PART:
assert(new_part_uuid != UUIDHelpers::Nil);
out << "mutate\n"
<< source_parts.at(0) << "\n"
<< "to\n"
<< new_part_name;
<< new_part_name
<< "\nto_uuid\n"
<< new_part_uuid;
if (isAlterMutation())
out << "\nalter_version\n" << alter_version;
@ -156,15 +165,21 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
in >> "\ndeduplicate: " >> deduplicate;
/// Trying to be more backward compatible
in >> "\n";
if (checkString("merge_type: ", in))
while (!trailing_newline_found)
{
UInt64 value;
in >> value;
merge_type = checkAndGetMergeType(value);
in >> "\n";
if (checkString("merge_type: ", in))
{
UInt64 value;
in >> value;
merge_type = checkAndGetMergeType(value);
}
else if (checkString("into_uuid: ", in))
in >> new_part_uuid;
else
trailing_newline_found = true;
}
else
trailing_newline_found = true;
}
}
else if (type_str == "drop" || type_str == "detach")
@ -198,12 +213,17 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
>> new_part_name;
source_parts.push_back(source_part);
in >> "\n";
while (!trailing_newline_found)
{
in >> "\n";
if (in.eof())
trailing_newline_found = true;
else if (checkString("alter_version\n", in))
in >> alter_version;
if (checkString("alter_version\n", in))
in >> alter_version;
else if (checkString("to_uuid\n", in))
in >> new_part_uuid;
else
trailing_newline_found = true;
}
}
else if (type_str == "alter")
{

View File

@ -77,6 +77,7 @@ struct ReplicatedMergeTreeLogEntryData
MergeTreeDataPartType new_part_type;
String block_id; /// For parts of level zero, the block identifier for deduplication (node name in /blocks/).
mutable String actual_new_part_name; /// GET_PART could actually fetch a part covering 'new_part_name'.
UUID new_part_uuid;
Strings source_parts;
bool deduplicate = false; /// Do deduplicate on merge

View File

@ -641,6 +641,7 @@ std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::se
auto data_settings = getSettings();
FutureMergedMutatedPart future_part;
future_part.uuid = UUIDHelpers::generateV4();
/// You must call destructor with unlocked `currently_processing_in_background_mutex`.
CurrentlyMergingPartsTaggerPtr merging_tagger;
@ -862,7 +863,8 @@ std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::se
future_part.parts.push_back(part);
future_part.part_info = new_part_info;
future_part.name = part->getNewName(new_part_info);
future_part.type = part->getType();
future_part.uuid = UUIDHelpers::generateV4();
future_part.type = part->getType();
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
return std::make_shared<MergeMutateSelectedEntry>(future_part, std::move(tagger), commands);

View File

@ -1443,6 +1443,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
throw Exception("Future merged part name " + backQuote(future_merged_part.name) + " differs from part name in log entry: "
+ backQuote(entry.new_part_name), ErrorCodes::BAD_DATA_PART_NAME);
}
future_merged_part.uuid = entry.new_part_uuid;
future_merged_part.updatePath(*this, reserved_space);
future_merged_part.merge_type = entry.merge_type;
@ -1567,9 +1568,10 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
Transaction transaction(*this);
FutureMergedMutatedPart future_mutated_part;
future_mutated_part.name = entry.new_part_name;
future_mutated_part.uuid = entry.new_part_uuid;
future_mutated_part.parts.push_back(source_part);
future_mutated_part.part_info = new_part_info;
future_mutated_part.name = entry.new_part_name;
future_mutated_part.updatePath(*this, reserved_space);
future_mutated_part.type = source_part->getType();
@ -2824,6 +2826,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
entry.type = LogEntry::MERGE_PARTS;
entry.source_replica = replica_name;
entry.new_part_name = merged_name;
entry.new_part_uuid = UUIDHelpers::generateV4();
entry.new_part_type = merged_part_type;
entry.merge_type = merge_type;
entry.deduplicate = deduplicate;
@ -2899,6 +2902,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
entry.source_replica = replica_name;
entry.source_parts.push_back(part.name);
entry.new_part_name = new_part_name;
entry.new_part_uuid = UUIDHelpers::generateV4();
entry.create_time = time(nullptr);
entry.alter_version = alter_version;