Added passing ttl info between replicas.

This commit is contained in:
Vladimir Chebotarev 2020-01-30 13:21:40 +03:00
parent 4363ce6976
commit 4b2eff8551
2 changed files with 61 additions and 8 deletions

View File

@ -36,6 +36,8 @@ namespace
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE = "0";
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = "1";
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS = "2";
std::string getEndpointId(const std::string & node_id)
{
@ -53,10 +55,11 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
{
String client_protocol_version = params.get("client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);
String part_name = params.get("part");
if (client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE && client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE)
if (client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS
&& client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE
&& client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE)
throw Exception("Unsupported fetch protocol version", ErrorCodes::UNKNOWN_PROTOCOL);
const auto data_settings = data.getSettings();
@ -75,7 +78,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
response.setChunkedTransferEncoding(false);
return;
}
response.addCookie({"server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE});
response.addCookie({"server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS});
++total_sends;
SCOPE_EXIT({--total_sends;});
@ -103,10 +106,16 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
MergeTreeData::DataPart::Checksums data_checksums;
if (client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
if (client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE || client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
writeBinary(checksums.getTotalSizeOnDisk(), out);
if (client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
{
WriteBufferFromOwnString ttl_infos_buffer;
part->ttl_infos.write(ttl_infos_buffer);
writeBinary(ttl_infos_buffer.str(), out);
}
writeBinary(checksums.files.size(), out);
for (const auto & it : checksums.files)
{
@ -192,7 +201,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE},
{"client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS},
{"compress", "false"}
});
@ -218,11 +227,22 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
ReservationPtr reservation;
if (server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
if (server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE || server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
{
size_t sum_files_size;
readBinary(sum_files_size, in);
reservation = data.reserveSpace(sum_files_size);
if (server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
{
MergeTreeDataPart::TTLInfos ttl_infos;
String ttl_infos_string;
readBinary(ttl_infos_string, in);
ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
assertString("ttl format version: 1\n", ttl_infos_buffer);
ttl_infos.read(ttl_infos_buffer);
reservation = data.reserveSpacePreferringTTLRules(sum_files_size, ttl_infos, std::time(nullptr));
}
else
reservation = data.reserveSpace(sum_files_size);
}
else
{

View File

@ -332,6 +332,39 @@ def test_moves_to_disk_eventually_work(started_cluster, name, engine):
node1.query("DROP TABLE IF EXISTS {}".format(name))
def test_replicated_download_ttl_info(started_cluster):
name = "test_replicated_ttl_info"
engine = "ReplicatedMergeTree('/clickhouse/test_replicated_download_ttl_info', '{replica}')"
try:
for i, node in enumerate((node1, node2), start=1):
node.query("""
CREATE TABLE {name} (
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
TTL d1 TO DISK 'external'
SETTINGS storage_policy='small_jbod_with_external'
""".format(name=name, engine=engine))
node1.query("SYSTEM STOP MOVES {}".format(name))
node2.query("INSERT INTO {} (s1, d1) VALUES ('{}', toDateTime({}))".format(name, get_random_string(1024 * 1024), time.time()-100))
assert set(get_used_disks_for_table(node2, name)) == {"external"}
time.sleep(1)
assert node1.query("SELECT count() FROM {}".format(name)).splitlines() == ["1"]
assert set(get_used_disks_for_table(node1, name)) == {"external"}
finally:
for node in (node1, node2):
try:
node.query("DROP TABLE IF EXISTS {}".format(name))
except:
continue
@pytest.mark.skip(reason="Flappy test")
@pytest.mark.parametrize("name,engine,positive", [
("mt_test_merges_to_disk_do_not_work","MergeTree()",0),