Zero copy replication over S3: fetch instead of merge

This commit is contained in:
Anton Ivashkin 2020-10-15 18:23:20 +03:00
parent b877459cf7
commit 14a78f87b0
3 changed files with 38 additions and 13 deletions

View File

@ -2,8 +2,6 @@
Говнокод просто для теста, не production-ready ни разу.
[Коммит](https://github.com/ianton-ru/ClickHouse/commit/acf86568a7e21176ba2cca15861da231bec6932a)
[Ветка](https://github.com/ianton-ru/ClickHouse/tree/s3_zero_copy_replication)
## Как сделано
@ -14,21 +12,24 @@
Введена новая версия протокола REPLICATION_PROTOCOL_VERSION_WITH_PARTS_S3_COPY. В запросе новый параметр send_s3_metadata, если 1, то приемних просит у источника метаданные вместо данных, если это возможно.
Приемник в ответ отсылает куку send_s3_metadata=1 в случае, если идут метаданные. В остальных случаях отсылаются данные, как и прежде.
Применик перед запросом смотрит, будет ли хранить данные в S3. Провеока сейчас кривая - запрашивается резервирование на диске с наибольшим доступным местом, а потом смотрится, не на S3 ли оно.
Если на S3, то отсылает в запросе send_s3_metadata=1.
Применик перед запросом смотрит, будет ли хранить данные в S3. Проверка сейчас кривая - если в сторадже есть S3, то считаем, что будет S3.
Если да S3, то отсылает в запросе send_s3_metadata=1.
Источник при получении такого запроса смотрит, лежит ли парт на S3. Если да, то в Зукипере ставит метку по пути `<путь к данным таблицы>/zero_copy_s3/<некий ID парта>/<ID реплики>`,
Источник при получении такого запроса смотрит, лежит ли парт на S3. Если да, то в Зукипере ставит метку по пути `<путь к данным таблицы>/zero_copy_s3/shared/<некий ID парта>/<ID реплики>`,
ставит в ответ куку send_s3_metadata=1 и вместо файлов с данными отсылает только файлы метаданных.
Приемник при получении ответа с send_s3_metadata=1 создает только файлики с идентичными меаданными, которые в итоге будут ссылаться на те же ключи в S3, ставит в зукипере аналогичную метку,
только со своим ID реплики, и работает с этим.
только со своим ID реплики, и работает с этим. Для первого фалйа из списка проверяет наличие первого ы3-объекта (просто наличие), если объект с таким именем найден, то все ок, если нет, то откат на старую версию.
(Сейчас есть еще код на случай наличия более одного диска S3, тогда перебирает все и если на каком-то файл найден, то использует его, но мы внутри команды MDB смотрим на такую конфигурацию как на странную.
Планируем ограничить функционал только случаем одного S3 диска.)
При желании удалить парт нода удаляет в Зукипере ключ `<путь к данным таблицы>/zero_copy_s3/<некий ID парта>/<ID реплики>`, потом получает все подключи `<путь к данным таблицы>/zero_copy_s3/<некий ID парта>`.
При желании удалить парт нода удаляет в Зукипере ключ `<путь к данным таблицы>/zero_copy_s3/shared/<некий ID парта>/<ID реплики>`, потом получает все подключи `<путь к данным таблицы>/zero_copy_s3/shared/<некий ID парта>`.
Если список не пустой, то считает, что данные использует другая нода и удаляет только локальные метаданные, если пустой, то удаляет и данные в S3.
## Костыли и недоработки, коих много
При мерже если реузльтат будет на S3, нода ставит эфемерную метку в Zookeeper по пути `<путь к данным таблицы>/zero_copy_s3/merged/<имя нового парта>`. Если такая метка уже есть, то считает, что другая нода
уже помержила или мержит сейчас, и надо сделать fetch вместо мержа самой.
* Никакой проверки, один и тот же S3 у нод или разный сейчас нет, если будет несколько разных S3, работать не будет.
## Костыли и недоработки, коих много
* В качестве ID парта берется имя первого S3-ключа от файла checksums.txt.
@ -40,8 +41,18 @@
* В протоколе репликации обмен инфой через параметр запрос в одну сторону и куку в другую мне не нравится, хотя так сделан обмен версиями репликации.
* При ошибке должно пытаться реплицироваться по старому, но хз, всегда ли сработает
* При ошибке должно пытаться реплицироваться по старому, но не уверен, всегда ли сработает
* Не будет обратной совместимости, если образуются такие шареные парты, откатиться на старую версию кликхауса не получится, иначе нода может удалить используемые другой данные.
* И вообще
* Возможны все же дублирования партов. Пример - нода делает мерж, падает. Другая нода незавимо делает мерж, первая нода поднимается. В итоге есть две копии померженого парта.
* ... много их. Честно.
## TODO, чего еще вообще не делалось
* Флаг в конфиге для включения функционала, по умолчанию будет выключен.
* Для гибридного хранилища сделать проверку и fetch при переезде парта с локального диска в S3.
* Тесты.

View File

@ -1092,7 +1092,7 @@ void IMergeTreeDataPart::lockSharedData(const String & zookeeper_path, const Str
if (id.empty())
throw Exception("Can't lock part on S3 storage", ErrorCodes::LOGICAL_ERROR);
String zookeeper_node = zookeeper_path + "/zero_copy_s3/" + id + "/" + replica_name;
String zookeeper_node = zookeeper_path + "/zero_copy_s3/shared/" + id + "/" + replica_name;
LOG_TRACE(storage.log, "Set zookeeper lock {}", id);
@ -1112,7 +1112,7 @@ bool IMergeTreeDataPart::unlockSharedData(const String & zookeeper_path, const S
if (id.empty())
return true;
String zookeeper_part_node = zookeeper_path + "/zero_copy_s3/" + id;
String zookeeper_part_node = zookeeper_path + "/zero_copy_s3/shared/" + id;
String zookeeper_node = zookeeper_part_node + "/" + replica_name;
LOG_TRACE(storage.log, "Remove zookeeper lock for {}", id);

View File

@ -1436,6 +1436,20 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
future_merged_part.updatePath(*this, reserved_space);
future_merged_part.merge_type = entry.merge_type;
{
auto disk = reserved_space->getDisk();
if (disk->getType() == "s3")
{
auto zookeeper = getZooKeeper();
String zookeeper_node = zookeeper_path + "/zero_copy_s3/merged/" + entry.new_part_name;
zookeeper->createAncestors(zookeeper_node);
auto code = zookeeper->tryCreate(zookeeper_node, "lock", zkutil::CreateMode::Ephemeral);
/// Someone else created or started create this merge
if (code == Coordination::Error::ZNODEEXISTS)
return false;
}
}
auto table_id = getStorageID();
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_merged_part);