diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp index 778505d93ef..f36275bfd35 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp @@ -5,7 +5,10 @@ #include #include #include -#include +#include +#include +#include +#include #include namespace DB @@ -250,6 +253,29 @@ MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(const String & filename) return std::make_pair(min_block, max_block); } +String MergeTreeWriteAheadLog::ActionMetadata::toJSON() const +{ + Poco::JSON::Object json; + + if (part_uuid != UUIDHelpers::Nil) + json.set(JSON_KEY_PART_UUID, toString(part_uuid)); + + std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM + oss.exceptions(std::ios::failbit); + json.stringify(oss); + + return oss.str(); +} + +void MergeTreeWriteAheadLog::ActionMetadata::fromJSON(const String & buf) +{ + Poco::JSON::Parser parser; + auto json = parser.parse(buf).extract(); + + if (json->has(JSON_KEY_PART_UUID)) + part_uuid = parseFromString(json->getValue(JSON_KEY_PART_UUID)); +} + void MergeTreeWriteAheadLog::ActionMetadata::read(ReadBuffer & meta_in) { readIntBinary(min_compatible_version, meta_in); @@ -260,29 +286,23 @@ void MergeTreeWriteAheadLog::ActionMetadata::read(ReadBuffer & meta_in) size_t metadata_size; readVarUInt(metadata_size, meta_in); - UInt32 metadata_start = meta_in.offset(); + if (metadata_size == 0) + return; - if (meta_in.hasPendingData()) - readUUIDText(part_uuid, meta_in); + String buf(metadata_size, ' '); + meta_in.readStrict(buf.data(), metadata_size); - /// Skip extra fields if any. If min_compatible_version is lower than WAL_VERSION it means - /// that the fields are not critical for the correctness. - meta_in.ignore(metadata_size - (meta_in.offset() - metadata_start)); + fromJSON(buf); } void MergeTreeWriteAheadLog::ActionMetadata::write(WriteBuffer & meta_out) const { writeIntBinary(min_compatible_version, meta_out); - /// Write metadata to a temporary buffer first to compute the size. - MemoryWriteBuffer buf{}; + String ser_meta = toJSON(); - writeUUIDText(part_uuid, buf); - - buf.finalize(); - auto read_buf = buf.tryGetReadBuffer(); - - writeVarUInt(static_cast(read_buf->available()), meta_out); - copyData(*read_buf, meta_out); + writeVarUInt(static_cast(ser_meta.length()), meta_out); + writeString(ser_meta, meta_out); } + } diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h index 92330e4dcb4..3b8dc5befc0 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.h +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.h @@ -42,6 +42,12 @@ public: void write(WriteBuffer & meta_out) const; void read(ReadBuffer & meta_in); + + private: + static constexpr auto JSON_KEY_PART_UUID = "part_uuid"; + + String toJSON() const; + void fromJSON(const String & buf); }; constexpr static UInt8 WAL_VERSION = 1; diff --git a/tests/integration/test_part_uuid/configs/merge_tree_in_memory.xml b/tests/integration/test_part_uuid/configs/merge_tree_in_memory.xml new file mode 100644 index 00000000000..83d36f33795 --- /dev/null +++ b/tests/integration/test_part_uuid/configs/merge_tree_in_memory.xml @@ -0,0 +1,6 @@ + + + 1000 + 1000 + + diff --git a/tests/integration/test_part_uuid/configs/merge_tree.xml b/tests/integration/test_part_uuid/configs/merge_tree_uuids.xml similarity index 66% rename from tests/integration/test_part_uuid/configs/merge_tree.xml rename to tests/integration/test_part_uuid/configs/merge_tree_uuids.xml index c45c2822108..8369c916848 100644 --- a/tests/integration/test_part_uuid/configs/merge_tree.xml +++ b/tests/integration/test_part_uuid/configs/merge_tree_uuids.xml @@ -1,6 +1,5 @@ 1 - 1 diff --git a/tests/integration/test_part_uuid/test.py b/tests/integration/test_part_uuid/test.py index 3229ee7fba2..1dfeb17b9b8 100644 --- a/tests/integration/test_part_uuid/test.py +++ b/tests/integration/test_part_uuid/test.py @@ -8,12 +8,12 @@ cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance( 'node1', - main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'], + main_configs=['configs/remote_servers.xml', 'configs/merge_tree_uuids.xml'], with_zookeeper=True) node2 = cluster.add_instance( 'node2', - main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'], + main_configs=['configs/remote_servers.xml', 'configs/merge_tree_uuids.xml', 'configs/merge_tree_in_memory.xml'], with_zookeeper=True) @@ -77,3 +77,32 @@ def test_part_uuid(started_cluster): uuids.add(part_merge_uuid) assert part_mutate_uuid not in [uuid_zero, part_merge_uuid] assert len(uuids) == 1, "expect the same uuid on all the replicas" + + +def test_part_uuid_wal(started_cluster): + uuid_zero = uuid.UUID(bytes=b"\x00" * 16) + + for ix, n in enumerate([node1, node2]): + n.query(""" + CREATE TABLE t_wal(key UInt64, value UInt64) + ENGINE ReplicatedMergeTree('/clickhouse/tables/t_wal', '{}') + ORDER BY tuple() + """.format(ix)) + + node2.query("INSERT INTO t_wal VALUES (1, 1)") + + uuids = set() + for node in [node1, node2]: + node.query("SYSTEM SYNC REPLICA t") + part_initial_uuid = uuid.UUID(node.query("SELECT uuid FROM system.parts WHERE table = 't_wal' AND active ORDER BY name").strip()) + assert "InMemory" == node.query("SELECT part_type FROM system.parts WHERE table = 't_wal' AND active ORDER BY name").strip() + uuids.add(part_initial_uuid) + assert uuid_zero != part_initial_uuid + assert len(uuids) == 1, "expect the same uuid on all the replicas" + + # Test detach / attach table to trigger WAL processing. + for node in [node1, node2]: + node.query("DETACH TABLE t_wal; ATTACH TABLE t_wal") + part_reattach_uuid = uuid.UUID(node.query( + "SELECT uuid FROM system.parts WHERE table = 't_wal' AND active ORDER BY name").strip()) + assert part_initial_uuid == part_reattach_uuid