Use JSON metadata in WAL

This commit is contained in:
Nicolae Vartolomei 2020-11-02 22:36:32 +00:00
parent 040aba9f85
commit 7c8bc1c04e
5 changed files with 79 additions and 19 deletions

View File

@ -5,7 +5,10 @@
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/copyData.h>
#include <Poco/File.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
#include <Poco/JSON/Parser.h>
#include <sys/time.h>
namespace DB
@ -250,6 +253,29 @@ MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(const String & filename)
return std::make_pair(min_block, max_block);
}
String MergeTreeWriteAheadLog::ActionMetadata::toJSON() const
{
Poco::JSON::Object json;
if (part_uuid != UUIDHelpers::Nil)
json.set(JSON_KEY_PART_UUID, toString(part_uuid));
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
json.stringify(oss);
return oss.str();
}
void MergeTreeWriteAheadLog::ActionMetadata::fromJSON(const String & buf)
{
Poco::JSON::Parser parser;
auto json = parser.parse(buf).extract<Poco::JSON::Object::Ptr>();
if (json->has(JSON_KEY_PART_UUID))
part_uuid = parseFromString<UUID>(json->getValue<std::string>(JSON_KEY_PART_UUID));
}
void MergeTreeWriteAheadLog::ActionMetadata::read(ReadBuffer & meta_in)
{
readIntBinary(min_compatible_version, meta_in);
@ -260,29 +286,23 @@ void MergeTreeWriteAheadLog::ActionMetadata::read(ReadBuffer & meta_in)
size_t metadata_size;
readVarUInt(metadata_size, meta_in);
UInt32 metadata_start = meta_in.offset();
if (metadata_size == 0)
return;
if (meta_in.hasPendingData())
readUUIDText(part_uuid, meta_in);
String buf(metadata_size, ' ');
meta_in.readStrict(buf.data(), metadata_size);
/// Skip extra fields if any. If min_compatible_version is lower than WAL_VERSION it means
/// that the fields are not critical for the correctness.
meta_in.ignore(metadata_size - (meta_in.offset() - metadata_start));
fromJSON(buf);
}
void MergeTreeWriteAheadLog::ActionMetadata::write(WriteBuffer & meta_out) const
{
writeIntBinary(min_compatible_version, meta_out);
/// Write metadata to a temporary buffer first to compute the size.
MemoryWriteBuffer buf{};
String ser_meta = toJSON();
writeUUIDText(part_uuid, buf);
buf.finalize();
auto read_buf = buf.tryGetReadBuffer();
writeVarUInt(static_cast<UInt32>(read_buf->available()), meta_out);
copyData(*read_buf, meta_out);
writeVarUInt(static_cast<UInt32>(ser_meta.length()), meta_out);
writeString(ser_meta, meta_out);
}
}

View File

@ -42,6 +42,12 @@ public:
void write(WriteBuffer & meta_out) const;
void read(ReadBuffer & meta_in);
private:
static constexpr auto JSON_KEY_PART_UUID = "part_uuid";
String toJSON() const;
void fromJSON(const String & buf);
};
constexpr static UInt8 WAL_VERSION = 1;

View File

@ -0,0 +1,6 @@
<yandex>
<merge_tree>
<min_rows_for_compact_part>1000</min_rows_for_compact_part>
<min_rows_for_wide_part>1000</min_rows_for_wide_part>
</merge_tree>
</yandex>

View File

@ -1,6 +1,5 @@
<yandex>
<merge_tree>
<assign_part_uuids>1</assign_part_uuids>
<randomize_part_type>1</randomize_part_type>
</merge_tree>
</yandex>

View File

@ -8,12 +8,12 @@ cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
'node1',
main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
main_configs=['configs/remote_servers.xml', 'configs/merge_tree_uuids.xml'],
with_zookeeper=True)
node2 = cluster.add_instance(
'node2',
main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
main_configs=['configs/remote_servers.xml', 'configs/merge_tree_uuids.xml', 'configs/merge_tree_in_memory.xml'],
with_zookeeper=True)
@ -77,3 +77,32 @@ def test_part_uuid(started_cluster):
uuids.add(part_merge_uuid)
assert part_mutate_uuid not in [uuid_zero, part_merge_uuid]
assert len(uuids) == 1, "expect the same uuid on all the replicas"
def test_part_uuid_wal(started_cluster):
uuid_zero = uuid.UUID(bytes=b"\x00" * 16)
for ix, n in enumerate([node1, node2]):
n.query("""
CREATE TABLE t_wal(key UInt64, value UInt64)
ENGINE ReplicatedMergeTree('/clickhouse/tables/t_wal', '{}')
ORDER BY tuple()
""".format(ix))
node2.query("INSERT INTO t_wal VALUES (1, 1)")
uuids = set()
for node in [node1, node2]:
node.query("SYSTEM SYNC REPLICA t")
part_initial_uuid = uuid.UUID(node.query("SELECT uuid FROM system.parts WHERE table = 't_wal' AND active ORDER BY name").strip())
assert "InMemory" == node.query("SELECT part_type FROM system.parts WHERE table = 't_wal' AND active ORDER BY name").strip()
uuids.add(part_initial_uuid)
assert uuid_zero != part_initial_uuid
assert len(uuids) == 1, "expect the same uuid on all the replicas"
# Test detach / attach table to trigger WAL processing.
for node in [node1, node2]:
node.query("DETACH TABLE t_wal; ATTACH TABLE t_wal")
part_reattach_uuid = uuid.UUID(node.query(
"SELECT uuid FROM system.parts WHERE table = 't_wal' AND active ORDER BY name").strip())
assert part_initial_uuid == part_reattach_uuid