Merge pull request #49432 from CheSema/lost-blobs

All s3-blobs are removed when a merge is aborted; remove the part from a failed fetch without unlocking keeper
Sema Checherinda 2023-05-12 13:19:27 +02:00 committed by GitHub
commit a1ee7d52e1
12 changed files with 358 additions and 8 deletions


@@ -355,10 +355,10 @@ MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
MergeTreeData::DataPartPtr Service::findPart(const String & name)
{
/// It is important to include PreActive and Outdated parts here because remote replicas cannot reliably
/// It is important to include Outdated parts here because remote replicas cannot reliably
/// determine the local state of the part, so queries for the parts in these states are completely normal.
auto part = data.getPartIfExists(
name, {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
if (part)
return part;
@@ -919,6 +919,10 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
new_data_part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
new_data_part->is_temp = true;
/// In case of a replicated merge tree with zero-copy replication,
/// ClickHouse claims here that this new part can be deleted in its temporary state without unlocking the blobs.
/// The blobs have to stay intact: this temporary part does not own them and does not share them yet.
new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::PRESERVE_BLOBS;
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);
}


@@ -1684,12 +1684,6 @@ void IMergeTreeDataPart::remove()
return CanRemoveDescription{.can_remove_anything = true, .files_not_to_remove = {} };
}
if (getState() == MergeTreeDataPartState::Temporary)
{
LOG_TRACE(storage.log, "Part {} in temporary state can be removed without unlocking shared state", name);
return CanRemoveDescription{.can_remove_anything = false, .files_not_to_remove = {} };
}
auto [can_remove, files_not_to_remove] = canRemovePart();
if (!can_remove)
LOG_TRACE(storage.log, "Blobs of part {} cannot be removed", name);


@@ -218,6 +218,22 @@ public:
/// FIXME Why do we need this flag? What's difference from Temporary and DeleteOnDestroy state? Can we get rid of this?
bool is_temp = false;
/// This type and the field remove_tmp_policy are used as a hint
/// to help avoid communication with Keeper when a temporary part is being deleted.
/// The common procedure is to send Keeper an unlock request to release the references to the blobs
/// and then, following Keeper's answer, decide whether to remove or preserve the blobs of that part on S3.
/// However, in some special cases ClickHouse can make the decision without asking Keeper.
enum class BlobsRemovalPolicyForTemporaryParts
{
/// the decision about removing blobs is made by Keeper; the common case
ASK_KEEPER,
/// set when ClickHouse is sure that the blobs in the part belong only to it; other replicas have not seen them yet
REMOVE_BLOBS,
/// set when ClickHouse is sure that the blobs belong to another replica and the current replica has not locked them on S3 yet
PRESERVE_BLOBS,
};
BlobsRemovalPolicyForTemporaryParts remove_tmp_policy = BlobsRemovalPolicyForTemporaryParts::ASK_KEEPER;
/// If true it means that there are no ZooKeeper node for this part, so it should be deleted only from filesystem
bool is_duplicate = false;
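
As a rough illustration of how this hint short-circuits the Keeper round trip, here is a minimal standalone C++ sketch. The names (BlobsRemovalPolicy, tryDecideWithoutKeeper) are hypothetical and not part of ClickHouse; the sketch only mirrors the three-way decision the field encodes, under the assumption that ASK_KEEPER means "no local decision is possible".

#include <cassert>
#include <optional>

/// Standalone copy of the hint from the diff above, for illustration only.
enum class BlobsRemovalPolicy { ASK_KEEPER, REMOVE_BLOBS, PRESERVE_BLOBS };

/// Returns std::nullopt when only Keeper can decide; otherwise whether the blobs may be removed.
std::optional<bool> tryDecideWithoutKeeper(BlobsRemovalPolicy policy)
{
    switch (policy)
    {
        case BlobsRemovalPolicy::REMOVE_BLOBS:   return true;         /// the part owns its blobs (merge, insert, empty part)
        case BlobsRemovalPolicy::PRESERVE_BLOBS: return false;        /// the blobs belong to another replica (failed fetch)
        case BlobsRemovalPolicy::ASK_KEEPER:     return std::nullopt; /// possibly shared blobs, fall back to the unlock request
    }
    return std::nullopt;
}

int main()
{
    assert(tryDecideWithoutKeeper(BlobsRemovalPolicy::REMOVE_BLOBS) == true);
    assert(tryDecideWithoutKeeper(BlobsRemovalPolicy::PRESERVE_BLOBS) == false);
    assert(!tryDecideWithoutKeeper(BlobsRemovalPolicy::ASK_KEEPER).has_value());
    return 0;
}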


@@ -209,6 +209,10 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
global_ctx->new_data_part->uuid = global_ctx->future_part->uuid;
global_ctx->new_data_part->partition.assign(global_ctx->future_part->getPartition());
global_ctx->new_data_part->is_temp = global_ctx->parent_part == nullptr;
/// In case of a replicated merge tree with zero-copy replication,
/// ClickHouse claims here that this new part can be deleted in its temporary state without unlocking the blobs.
/// The blobs have to be removed along with the part: this temporary part owns them and does not share them yet.
global_ctx->new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS;
ctx->need_remove_expired_values = false;
ctx->force_ttl = false;


@@ -8285,6 +8285,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->is_temp = true;
/// In case of a replicated merge tree with zero-copy replication,
/// ClickHouse claims here that this new part can be deleted in its temporary state without unlocking the blobs.
/// The blobs have to be removed along with the part: this temporary part owns them and does not share them yet.
new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS;
auto new_data_part_storage = new_data_part->getDataPartStoragePtr();
new_data_part_storage->beginTransaction();


@@ -469,6 +469,10 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl(
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->is_temp = true;
/// In case of a replicated merge tree with zero-copy replication,
/// ClickHouse claims here that this new part can be deleted in its temporary state without unlocking the blobs.
/// The blobs have to be removed along with the part: this temporary part owns them and does not share them yet.
new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS;
SyncGuardPtr sync_guard;
if (new_data_part->isStoredOnDisk())


@@ -1838,6 +1838,11 @@ bool MutateTask::prepare()
if (!isWidePart(ctx->source_part) || !isFullPartStorage(ctx->source_part->getDataPartStorage())
|| (ctx->interpreter && ctx->interpreter->isAffectingAllColumns()))
{
/// In case of a replicated merge tree with zero-copy replication,
/// ClickHouse claims here that this new part can be deleted in its temporary state without unlocking the blobs.
/// The blobs have to be removed along with the part: this temporary part owns them and does not share them yet.
ctx->new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS;
task = std::make_unique<MutateAllPartColumnsTask>(ctx);
}
else /// TODO: check that we modify only non-key columns in this case.
@@ -1865,6 +1870,12 @@ bool MutateTask::prepare()
ctx->for_file_renames,
ctx->mrk_extension);
/// In case of a replicated merge tree with zero-copy replication,
/// ClickHouse has to follow the common procedure here when deleting a new part in its temporary state.
/// Some of the files within the blobs are shared with the source part, some belong only to this part.
/// Keeper has to be asked with an unlock request to release the references to the blobs.
ctx->new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::ASK_KEEPER;
task = std::make_unique<MutateSomePartColumnsTask>(ctx);
}
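
Why the partial-column mutation falls back to ASK_KEEPER can be illustrated with a small reference-counting sketch. This is standalone and hypothetical (a plain in-process counter standing in for the real zero-copy locks kept in Keeper): the mutated part hardlinks files from the source part, so two parts can reference the same remote object, and only a shared counter can tell whether that object may be deleted.

#include <cassert>
#include <map>
#include <string>

/// Stand-in for the zero-copy locks kept in (Zoo)Keeper: blob key -> number of parts referencing it.
std::map<std::string, int> blob_refcount;

void lockBlob(const std::string & key) { ++blob_refcount[key]; }

/// Drops one reference and reports whether the blob may now be removed from S3.
bool unlockBlobAndCheckRemovable(const std::string & key) { return --blob_refcount[key] == 0; }

int main()
{
    lockBlob("data.bin");                              /// the source part references the blob
    lockBlob("data.bin");                              /// the mutated part hardlinks the same file
    assert(!unlockBlobAndCheckRemovable("data.bin"));  /// dropping the mutated part must preserve the blob
    assert(unlockBlobAndCheckRemovable("data.bin"));   /// dropping the source part finally frees it
    return 0;
}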


@@ -8357,6 +8357,33 @@ StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, co
return std::make_pair(true, NameSet{});
}
if (part.getState() == MergeTreeDataPartState::Temporary && part.is_temp)
{
/// The part is in Temporary state and has the is_temp flag: it means that it is under construction.
/// The part hasn't been added to the active set and no commit procedure has begun.
/// The metadata files are about to be deleted now. ClickHouse has to decide whether to remove or preserve the blobs on the remote FS.
/// In general the remote data might be shared and has to be unlocked in Keeper before removal.
/// However, there are some cases when the decision is clear without asking Keeper:
/// When the part has been fetched, the remote data has to be preserved: the part doesn't own it.
/// When the part has been merged, the remote data can be removed: the part owns it.
/// In contrast, when the part has been mutated, it generally hardlinks the files from the source part.
/// Therefore the remote data could be shared and has to be unlocked in Keeper.
/// In order to track all these cases, remove_tmp_policy is used.
/// ClickHouse sets that field to REMOVE_BLOBS or PRESERVE_BLOBS when it is sure about the decision without asking Keeper.
if (part.remove_tmp_policy == IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS
|| part.remove_tmp_policy == IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::PRESERVE_BLOBS)
{
bool can_remove_blobs = part.remove_tmp_policy == IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS;
LOG_INFO(log, "Looks like CH knows the origin of that part. "
"Part {} can be deleted without unlocking shared data in zookeeper. "
"Part blobs {}.",
part.name,
can_remove_blobs ? "will be removed" : "have to be preserved");
return std::make_pair(can_remove_blobs, NameSet{});
}
}
if (has_metadata_in_zookeeper.has_value() && !has_metadata_in_zookeeper)
{
if (zookeeper->exists(zookeeper_path))


@@ -28,3 +28,12 @@ def wait_for_delete_empty_parts(node, table, database=None, **kwargs):
f"WHERE active AND rows = 0 AND table = '{table}' AND database = '{database}'"
)
assert_eq_with_retry(node, empty_parts_query, "0\n", **kwargs)
def wait_for_merges(node, table, database=None, **kwargs):
table, database = _parse_table_database(table, database)
merges_count_query = (
f"SELECT count() > 0 FROM system.merges "
f"WHERE table = '{table}' AND database = '{database}'"
)
assert_eq_with_retry(node, merges_count_query, "1\n", **kwargs)


@@ -22,6 +22,14 @@
<secret_access_key>minio123</secret_access_key>
<s3_max_single_read_retries>10</s3_max_single_read_retries>
</no_delete_objects_s3>
<broken_s3>
<type>s3</type>
<endpoint>http://resolver:8083/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<s3_retry_attempts>1</s3_retry_attempts>
<connect_timeout_ms>20000</connect_timeout_ms>
</broken_s3>
<hdd>
<type>local</type>
<path>/</path>
@@ -100,6 +108,23 @@
</main>
</volumes>
</s3_cache_r>
<external_broken_s3>
<volumes>
<main>
<disk>hdd</disk>
</main>
<external>
<disk>broken_s3</disk>
</external>
</volumes>
</external_broken_s3>
<broken_s3>
<volumes>
<main>
<disk>broken_s3</disk>
</main>
</volumes>
</broken_s3>
</policies>
</storage_configuration>


@@ -0,0 +1,116 @@
import sys
import urllib.parse
import http.server
import socketserver
UPSTREAM_HOST = "minio1"
UPSTREAM_PORT = 9001
class ServerRuntime:
def __init__(self):
self.error_at_put_when_length_bigger = None
def reset(self):
self.error_at_put_when_length_bigger = None
runtime = ServerRuntime()
class RequestHandler(http.server.BaseHTTPRequestHandler):
def _ok(self):
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"OK")
def _ping(self):
self._ok()
def _read_out(self):
content_length = int(self.headers.get("Content-Length", 0))
to_read = content_length
while to_read > 0:
# read content in order to avoid error on client
# Poco::Exception. Code: 1000, e.code() = 32, I/O error: Broken pipe
# do it piece by piece in order to avoid big allocation
size = min(to_read, 1024)
str(self.rfile.read(size))
to_read -= size
def _redirect(self):
self._read_out()
self.send_response(307)
url = f"http://{UPSTREAM_HOST}:{UPSTREAM_PORT}{self.path}"
self.send_header("Location", url)
self.end_headers()
self.wfile.write(b"Redirected")
def _error(self, data):
self._read_out()
self.send_response(500)
self.send_header("Content-Type", "text/xml")
self.end_headers()
self.wfile.write(data)
def _mock_settings(self):
parts = urllib.parse.urlsplit(self.path)
path = [x for x in parts.path.split("/") if x]
assert path[0] == "mock_settings", path
if path[1] == "error_at_put":
params = urllib.parse.parse_qs(parts.query, keep_blank_values=False)
runtime.error_at_put_when_length_bigger = int(
params.get("when_length_bigger", [1024 * 1024])[0]
)
self._ok()
elif path[1] == "reset":
runtime.reset()
self._ok()
else:
self._error("_mock_settings: wrong command")
def do_GET(self):
if self.path == "/":
self._ping()
elif self.path.startswith("/mock_settings"):
self._mock_settings()
else:
self._redirect()
def do_PUT(self):
if runtime.error_at_put_when_length_bigger is not None:
content_length = int(self.headers.get("Content-Length", 0))
if content_length > runtime.error_at_put_when_length_bigger:
self._error(
b'<?xml version="1.0" encoding="UTF-8"?>'
b"<Error>"
b"<Code>ExpectedError</Code>"
b"<Message>mock s3 injected error</Message>"
b"<RequestId>txfbd566d03042474888193-00608d7537</RequestId>"
b"</Error>"
)
else:
self._redirect()
else:
self._redirect()
def do_POST(self):
self._redirect()
def do_HEAD(self):
self._redirect()
def do_DELETE(self):
self._redirect()
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
"""Handle requests in a separate thread."""
httpd = ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
httpd.serve_forever()


@@ -8,6 +8,8 @@ from helpers.mock_servers import start_mock_servers
from helpers.utility import generate_values, replace_config, SafeThread
from helpers.wait_for_helpers import wait_for_delete_inactive_parts
from helpers.wait_for_helpers import wait_for_delete_empty_parts
from helpers.wait_for_helpers import wait_for_merges
from helpers.test_tools import assert_eq_with_retry
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -97,6 +99,7 @@ def run_s3_mocks(cluster):
[
("unstable_proxy.py", "resolver", "8081"),
("no_delete_objects.py", "resolver", "8082"),
("broken_s3.py", "resolver", "8083"),
],
)
@@ -136,6 +139,18 @@ def clear_minio(cluster):
yield
@pytest.fixture(autouse=True, scope="function")
def reset_mock_broken_s3(cluster):
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
["curl", "-s", f"http://localhost:8083/mock_settings/reset"],
nothrow=True,
)
assert response == "OK"
yield
def check_no_objects_after_drop(cluster, table_name="s3_test", node_name="node"):
node = cluster.instances[node_name]
node.query(f"DROP TABLE IF EXISTS {table_name} SYNC")
@@ -825,3 +840,124 @@ def test_cache_setting_compatibility(cluster, node_name):
assert not node.contains_in_log("No such file or directory: Cache info:")
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_merge_canceled_by_drop(cluster, node_name):
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS test_merge_canceled_by_drop NO DELAY")
node.query(
"CREATE TABLE test_merge_canceled_by_drop "
" (key UInt32, value String)"
" Engine=MergeTree() "
" ORDER BY value "
" SETTINGS storage_policy='s3'"
)
node.query("SYSTEM STOP MERGES test_merge_canceled_by_drop")
node.query(
"INSERT INTO test_merge_canceled_by_drop SELECT number, toString(number) FROM numbers(100000000)"
)
node.query("SYSTEM START MERGES test_merge_canceled_by_drop")
wait_for_merges(node, "test_merge_canceled_by_drop")
check_no_objects_after_drop(
cluster, table_name="test_merge_canceled_by_drop", node_name=node_name
)
@pytest.mark.parametrize("node_name", ["node"])
def test_merge_canceled_by_s3_errors(cluster, node_name):
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS test_merge_canceled_by_s3_errors NO DELAY")
node.query(
"CREATE TABLE test_merge_canceled_by_s3_errors "
" (key UInt32, value String)"
" Engine=MergeTree() "
" ORDER BY value "
" SETTINGS storage_policy='broken_s3'"
)
node.query("SYSTEM STOP MERGES test_merge_canceled_by_s3_errors")
node.query(
"INSERT INTO test_merge_canceled_by_s3_errors SELECT number, toString(number) FROM numbers(10000)"
)
node.query(
"INSERT INTO test_merge_canceled_by_s3_errors SELECT 2*number, toString(number) FROM numbers(10000)"
)
min_key = node.query("SELECT min(key) FROM test_merge_canceled_by_s3_errors")
assert int(min_key) == 0, min_key
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/error_at_put?when_length_bigger=50000",
],
nothrow=True,
)
assert response == "OK"
node.query("SYSTEM START MERGES test_merge_canceled_by_s3_errors")
error = node.query_and_get_error(
"OPTIMIZE TABLE test_merge_canceled_by_s3_errors FINAL",
)
assert "ExpectedError Message: mock s3 injected error" in error, error
node.wait_for_log_line("ExpectedError Message: mock s3 injected error")
check_no_objects_after_drop(
cluster, table_name="test_merge_canceled_by_s3_errors", node_name=node_name
)
@pytest.mark.parametrize("node_name", ["node"])
def test_merge_canceled_by_s3_errors_when_move(cluster, node_name):
node = cluster.instances[node_name]
settings = {
"storage_policy": "external_broken_s3",
"merge_with_ttl_timeout": 1,
}
create_table(node, "merge_canceled_by_s3_errors_when_move", **settings)
node.query("SYSTEM STOP MERGES merge_canceled_by_s3_errors_when_move")
node.query(
"INSERT INTO merge_canceled_by_s3_errors_when_move"
" VALUES {}".format(generate_values("2020-01-03", 1000))
)
node.query(
"INSERT INTO merge_canceled_by_s3_errors_when_move"
" VALUES {}".format(generate_values("2020-01-03", 1000, -1))
)
node.query(
"ALTER TABLE merge_canceled_by_s3_errors_when_move"
" MODIFY TTL"
" dt + INTERVAL 1 DAY "
" TO VOLUME 'external'",
settings={"materialize_ttl_after_modify": 0},
)
response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
[
"curl",
"-s",
f"http://localhost:8083/mock_settings/error_at_put?when_length_bigger=10000",
],
nothrow=True,
)
assert response == "OK"
node.query("SYSTEM START MERGES merge_canceled_by_s3_errors_when_move")
node.query("OPTIMIZE TABLE merge_canceled_by_s3_errors_when_move FINAL")
node.wait_for_log_line("ExpectedError Message: mock s3 injected error")
count = node.query("SELECT count() FROM merge_canceled_by_s3_errors_when_move")
assert int(count) == 2000, count
check_no_objects_after_drop(
cluster, table_name="merge_canceled_by_s3_errors_when_move", node_name=node_name
)