Add unique identifiers to the IMergeTreeDataPart structure

For now uuids are not generated at all; they are present only if the
part is updated manually (as you can see in the integration test).

The only place where they can be seen today by an end user is in the
`system.parts` table. I looked for a way to hide this column behind an
option but couldn't find an easy one.

This is likely also required for the WAL, but we need to think about how
to avoid breaking compatibility.

Relates to #13574, https://github.com/ClickHouse/ClickHouse/issues/13574

Next 1: In the upcoming PR the plan is to integrate de-duplication based on
these fingerprints in the query pipeline.

Next 2: We'll enable automatic generation of uuids and come up with a
way to send uuids when processing distributed queries only while part
movement is in progress.
This commit is contained in:
Nicolae Vartolomei 2020-10-15 17:17:16 +01:00
parent 0d78002a22
commit 425dc4b11b
10 changed files with 134 additions and 8 deletions

View File

@ -45,6 +45,7 @@ constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = 1;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS = 2;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE = 3;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION = 4;
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID = 5;
std::string getEndpointId(const std::string & node_id)
@ -109,7 +110,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
}
/// We pretend to work as older server version, to be sure that client will correctly process our version
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION))});
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID))});
++total_sends;
SCOPE_EXIT({--total_sends;});
@ -142,8 +143,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
sendPartFromMemory(part, out);
else
{
bool send_default_compression_file = client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION;
sendPartFromDisk(part, out, send_default_compression_file);
sendPartFromDisk(part, out, client_protocol_version);
}
}
catch (const NetException &)
@ -176,7 +176,7 @@ void Service::sendPartFromMemory(const MergeTreeData::DataPartPtr & part, WriteB
block_out.write(part_in_memory->block);
}
void Service::sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, bool send_default_compression_file)
void Service::sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, int client_protocol_version)
{
/// We'll take a list of files from the list of checksums.
MergeTreeData::DataPart::Checksums checksums = part->checksums;
@ -184,8 +184,12 @@ void Service::sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuf
auto file_names_without_checksums = part->getFileNamesWithoutChecksums();
for (const auto & file_name : file_names_without_checksums)
{
if (!send_default_compression_file && file_name == IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
if (client_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION && file_name == IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
continue;
if (client_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID && file_name == IMergeTreeDataPart::UUID_FILE_NAME)
continue;
checksums.files[file_name] = {};
}
@ -263,7 +267,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION)},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)},
{"compress", "false"}
});
@ -430,7 +434,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
if (file_name != "checksums.txt" &&
file_name != "columns.txt" &&
file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME &&
file_name != IMergeTreeDataPart::UUID_FILE_NAME)
checksums.addFile(file_name, file_size, expected_hash);
if (sync)

View File

@ -32,7 +32,7 @@ public:
private:
MergeTreeData::DataPartPtr findPart(const String & name);
void sendPartFromMemory(const MergeTreeData::DataPartPtr & part, WriteBuffer & out);
void sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, bool send_default_compression_file);
void sendPartFromDisk(const MergeTreeData::DataPartPtr & part, WriteBuffer & out, int client_protocol_version);
private:
/// StorageReplicatedMergeTree::shutdown() waits for all parts exchange handlers to finish,

View File

@ -410,6 +410,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
/// Motivation: memory for index is shared between queries - not belong to the query itself.
MemoryTracker::BlockerInThread temporarily_disable_memory_tracker;
loadUUID();
loadColumns(require_columns_checksums);
loadChecksums(require_columns_checksums);
loadIndexGranularity();
@ -482,9 +483,14 @@ NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
NameSet result = {"checksums.txt", "columns.txt"};
String default_codec_path = getFullRelativePath() + DEFAULT_COMPRESSION_CODEC_FILE_NAME;
if (volume->getDisk()->exists(default_codec_path))
result.emplace(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
String uuid_path = getFullRelativePath() + UUID_FILE_NAME;
if (volume->getDisk()->exists(uuid_path))
result.emplace(UUID_FILE_NAME);
return result;
}
@ -726,6 +732,19 @@ void IMergeTreeDataPart::loadTTLInfos()
}
}
void IMergeTreeDataPart::loadUUID()
{
String path = getFullRelativePath() + UUID_FILE_NAME;
if (volume->getDisk()->exists(path))
{
auto in = openForReading(volume->getDisk(), path);
readText(uuid, *in);
if (uuid == UUIDHelpers::Nil)
throw Exception("Unexpected empty " + String(UUID_FILE_NAME) + " in part: " + name, ErrorCodes::LOGICAL_ERROR);
}
}
void IMergeTreeDataPart::loadColumns(bool require)
{
String path = getFullRelativePath() + "columns.txt";
@ -894,6 +913,7 @@ void IMergeTreeDataPart::remove() const
for (const auto & file : {"checksums.txt", "columns.txt"})
volume->getDisk()->remove(to + "/" + file);
volume->getDisk()->removeIfExists(to + "/" + UUID_FILE_NAME);
volume->getDisk()->removeIfExists(to + "/" + DEFAULT_COMPRESSION_CODEC_FILE_NAME);
volume->getDisk()->removeIfExists(to + "/" + DELETE_ON_DESTROY_MARKER_FILE_NAME);

View File

@ -164,6 +164,11 @@ public:
String name;
MergeTreePartInfo info;
/// Part unique identifier.
/// The intention is to use it for identifying cases where the same part is
/// processed by multiple shards.
UUID uuid;
VolumePtr volume;
/// A directory path (relative to storage's path) where part data is actually stored
@ -348,6 +353,8 @@ public:
static inline constexpr auto DELETE_ON_DESTROY_MARKER_FILE_NAME = "delete-on-destroy.txt";
static inline constexpr auto UUID_FILE_NAME = "uuid.txt";
/// Checks that all TTLs (table min/max, column ttls, so on) for part
/// calculated. Part without calculated TTL may exist if TTL was added after
/// part creation (using alter query with materialize_ttl setting).
@ -384,6 +391,9 @@ private:
/// In compact parts order of columns is necessary
NameToPosition column_name_to_position;
/// Reads part unique identifier (if exists) from uuid.txt
void loadUUID();
/// Reads columns names and types from columns.txt
void loadColumns(bool require);

View File

@ -1138,6 +1138,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
future_part.name, future_part.type, future_part.part_info, single_disk_volume, "tmp_mut_" + future_part.name);
new_data_part->is_temp = true;
new_data_part->uuid = source_part->uuid;
new_data_part->ttl_infos = source_part->ttl_infos;
/// It shouldn't be changed by mutation.
@ -1818,6 +1819,13 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart(
const CompressionCodecPtr & codec)
{
auto disk = new_data_part->volume->getDisk();
if (new_data_part->uuid != UUIDHelpers::Nil)
{
auto out = disk->writeFile(new_data_part->getFullRelativePath() + IMergeTreeDataPart::UUID_FILE_NAME, 4096);
writeUUIDText(new_data_part->uuid, *out);
}
if (need_remove_expired_values)
{
/// Write a file with ttl infos in json format.

View File

@ -133,6 +133,15 @@ void MergedBlockOutputStream::finalizePartOnDisk(
MergeTreeData::DataPart::Checksums & checksums,
bool sync)
{
if (new_part->uuid != UUIDHelpers::Nil)
{
auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::UUID_FILE_NAME, 4096);
writeUUIDText(new_part->uuid, *out);
out->finalize();
if (sync)
out->sync();
}
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))
{
new_part->partition.store(storage, volume->getDisk(), part_path, checksums);

View File

@ -7,6 +7,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeUUID.h>
#include <Storages/VirtualColumnUtils.h>
#include <Databases/IDatabase.h>
#include <Parsers/queryToString.h>
@ -20,6 +21,7 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
{
{"partition", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeString>()},
{"uuid", std::make_shared<DataTypeUUID>()},
{"part_type", std::make_shared<DataTypeString>()},
{"active", std::make_shared<DataTypeUInt8>()},
{"marks", std::make_shared<DataTypeUInt64>()},
@ -93,6 +95,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns_, const Sto
columns_[i++]->insert(out.str());
}
columns_[i++]->insert(part->name);
columns_[i++]->insert(part->uuid);
columns_[i++]->insert(part->getTypeName());
columns_[i++]->insert(part_state == State::Committed);
columns_[i++]->insert(part->getMarksCount());

View File

@ -0,0 +1,18 @@
<yandex>
<!-- Defines the cluster "test_cluster" with two shards, each holding a single replica:
     node1 and node2, both reachable on port 9000. -->
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,53 @@
import time
import uuid
import pytest
from helpers.cluster import ClickHouseCluster
# Two instances that load the same remote_servers config and are both started with ZooKeeper.
cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once per test module and shut it down afterwards, even on failure."""
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def test_part_uuid(started_cluster):
    """Check that a part UUID placed in uuid.txt is loaded on attach, kept by a mutation and replicated."""
    test_uuid = str(uuid.uuid4())

    # Both nodes use the same ZooKeeper path; the enumeration index ('0', '1') is the replica name.
    for ix, n in enumerate([node1, node2]):
        n.query("""
        CREATE TABLE t(key UInt64, value UInt64)
        ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{}')
        ORDER BY tuple()
        """.format(ix))

    node1.query("INSERT INTO t VALUES (1, 1)")

    # UUIDs are not generated automatically, so plant uuid.txt by hand into the detached part.
    node1.query("ALTER TABLE t DETACH PARTITION tuple()")
    node1.exec_in_container([
        "bash", "-c",
        "echo '{}' > /var/lib/clickhouse/data/default/t/detached/{}/uuid.txt".format(test_uuid, "all_0_0_0")
    ])
    node1.query("ALTER TABLE t ATTACH PARTITION tuple()")

    # Mutations run asynchronously by default; without waiting, the assertion below could
    # observe the pre-mutation part and never exercise UUID preservation in the mutation path.
    # NOTE(review): mutations_sync=2 is expected to wait for the mutation on all replicas — confirm.
    node1.query("ALTER TABLE t UPDATE value = 1 WHERE key = 1", settings={"mutations_sync": "2"})

    assert test_uuid == node1.query("SELECT uuid FROM system.parts WHERE table = 't' AND active ORDER BY name").strip()

    # The replica must end up with the same UUID once it has caught up.
    node2.query("SYSTEM SYNC REPLICA t")
    assert test_uuid == node2.query("SELECT uuid FROM system.parts WHERE table = 't' AND active ORDER BY name").strip()