Merge pull request #15657 from Jokser/disk-s3-write-error-handling

Proper error handling during insert into MergeTree with S3
commit e00961f882 by alexey-milovidov, 2020-10-07 11:17:32 +03:00 (committed by GitHub)
15 changed files with 250 additions and 1 deletion


@@ -194,6 +194,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
            auto src_buffer = cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, 0);
            auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode, estimated_size, aio_threshold);
            copyData(*src_buffer, *dst_buffer);
            dst_buffer->finalize();
        },
        buf_size);
}
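The hunks above and below all follow the same pattern: every write buffer that may end up writing to S3 is finalized explicitly while the INSERT is still executing, so a failed upload is reported as a query error instead of being detected only in the buffer's destructor, where it could not be rethrown. A minimal sketch of that idea (illustrative only, with hypothetical types rather than the ClickHouse classes):

#include <iostream>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a buffer that uploads its contents to S3.
struct RemoteWriteBuffer
{
    std::string staged;
    bool upload_should_fail = false;

    void write(const std::string & data) { staged += data; }

    /// Completes the upload. Throwing here is fine: the caller is still on the
    /// query thread and can turn the exception into an INSERT error.
    void finalize()
    {
        if (upload_should_fail)
            throw std::runtime_error("S3 upload failed");
    }

    /// Destructors must not throw, so an error first noticed here would be lost.
    ~RemoteWriteBuffer() = default;
};

int main()
{
    RemoteWriteBuffer out;
    out.upload_should_fail = true;
    out.write("part data");
    try
    {
        out.finalize();   // the failure surfaces here, where it can be handled
    }
    catch (const std::exception & e)
    {
        std::cout << "insert failed: " << e.what() << '\n';
    }
}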


@@ -27,6 +27,7 @@ void copyFile(IDisk & from_disk, const String & from_path, IDisk & to_disk, cons
    auto in = from_disk.readFile(from_path);
    auto out = to_disk.writeFile(to_path);
    copyData(*in, *out);
    out->finalize();
}


@@ -1,3 +1,4 @@
#include <aws/core/client/DefaultRetryStrategy.h>
#include <IO/ReadHelpers.h>
#include <IO/S3Common.h>
#include <IO/WriteHelpers.h>

@@ -123,6 +124,9 @@ void registerDiskS3(DiskFactory & factory)
        if (proxy_config)
            cfg.perRequestConfiguration = [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); };

        cfg.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(
            config.getUInt(config_prefix + ".retry_attempts", 10));

        auto client = S3::ClientFactory::instance().create(
            cfg,
            uri.is_virtual_hosted_style,
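For context, the new <retry_attempts> disk setting (default 10) feeds the AWS SDK retry strategy; the integration test below sets it to 0 so an injected failure is reported immediately instead of being retried away. A rough sketch of just that wiring, assuming the AWS SDK for C++ ClientConfiguration API and omitting the rest of the DiskS3 registration:

#include <memory>

#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/client/DefaultRetryStrategy.h>

// Build a client configuration whose retry budget comes from the disk config.
Aws::Client::ClientConfiguration makeS3ClientConfiguration(long retry_attempts)
{
    Aws::Client::ClientConfiguration cfg;
    // DefaultRetryStrategy retries retryable errors up to `retry_attempts` times
    // with exponential backoff; 0 disables retries entirely.
    cfg.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(retry_attempts);
    return cfg;
}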


@@ -88,6 +88,7 @@ void IMergeTreeDataPart::MinMaxIndex::store(
        out_hashing.next();
        out_checksums.files[file_name].file_size = out_hashing.count();
        out_checksums.files[file_name].file_hash = out_hashing.getHash();
        out->finalize();
    }
}


@@ -229,6 +229,8 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart:
    marks.next();
    addToChecksums(checksums);

    plain_file->finalize();
    marks_file->finalize();
    if (sync)
    {
        plain_file->sync();


@@ -17,8 +17,12 @@ namespace
void MergeTreeDataPartWriterOnDisk::Stream::finalize()
{
    compressed.next();
    plain_file->next();
    /// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually.
    plain_hashing.next();
    marks.next();

    plain_file->finalize();
    marks_file->finalize();
}

void MergeTreeDataPartWriterOnDisk::Stream::sync() const

@@ -331,6 +335,7 @@ void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(
        index_stream->next();
        checksums.files["primary.idx"].file_size = index_stream->count();
        checksums.files["primary.idx"].file_hash = index_stream->getHash();
        index_file_stream->finalize();
        if (sync)
            index_file_stream->sync();
        index_stream = nullptr;
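The 'compressed_buf' comment refers to the buffer chain inside Stream (compressed -> compressed_buf -> plain_hashing -> plain_file): the compressing layer only pushes its own data one level down, so each layer's next() has to be called explicitly before the file buffer is finalized. A small self-contained model of that behaviour, using made-up classes rather than the real ones:

#include <iostream>
#include <string>

// Innermost layer: stands in for the file/S3 write buffer.
struct PlainFile
{
    std::string pending;
    std::string written;

    void write(const std::string & s) { pending += s; }
    void next() { written += pending; pending.clear(); }   // flush the working buffer
    void finalize() { next(); /* complete the upload; may throw */ }
};

// Outer layer: stands in for a compressing buffer. Its next() hands data to the
// layer below but deliberately does not flush that layer.
struct Compressing
{
    PlainFile & out;
    std::string pending;

    explicit Compressing(PlainFile & out_) : out(out_) {}
    void write(const std::string & s) { pending += s; }
    void next() { out.write("[compressed]" + pending); pending.clear(); }
};

int main()
{
    PlainFile plain_file;
    Compressing compressed(plain_file);

    compressed.write("marks and data");
    compressed.next();        // data is now pending in plain_file, not yet flushed
    plain_file.next();        // the underlying buffer must be flushed explicitly
    plain_file.finalize();    // any write/upload error surfaces here, on the query thread

    std::cout << plain_file.written << '\n';   // prints the flushed payload
}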


@@ -156,6 +156,7 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr
    out_hashing.next();
    checksums.files["partition.dat"].file_size = out_hashing.count();
    checksums.files["partition.dat"].file_hash = out_hashing.getHash();
    out->finalize();
}

void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row)


@@ -148,6 +148,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
        count_out_hashing.next();
        checksums.files["count.txt"].file_size = count_out_hashing.count();
        checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
        count_out->finalize();
        if (sync)
            count_out->sync();
    }

@@ -160,6 +161,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
        new_part->ttl_infos.write(out_hashing);
        checksums.files["ttl.txt"].file_size = out_hashing.count();
        checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
        out->finalize();
        if (sync)
            out->sync();
    }

@@ -170,6 +172,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
        /// Write a file with a description of columns.
        auto out = volume->getDisk()->writeFile(part_path + "columns.txt", 4096);
        part_columns.writeText(*out);
        out->finalize();
        if (sync)
            out->sync();
    }

@@ -178,6 +181,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
    {
        auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
        DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out);
        out->finalize();
    }
    else
    {

@@ -189,6 +193,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
        /// Write file with checksums.
        auto out = volume->getDisk()->writeFile(part_path + "checksums.txt", 4096);
        checksums.write(*out);
        out->finalize();
        if (sync)
            out->sync();
    }


@@ -0,0 +1,12 @@
<yandex>
    <shutdown_wait_unfinished>3</shutdown_wait_unfinished>
    <logger>
        <level>trace</level>
        <log>/var/log/clickhouse-server/log.log</log>
        <errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
        <size>1000M</size>
        <count>10</count>
        <stderr>/var/log/clickhouse-server/stderr.log</stderr>
        <stdout>/var/log/clickhouse-server/stdout.log</stdout>
    </logger>
</yandex>


@@ -0,0 +1,26 @@
<yandex>
    <storage_configuration>
        <disks>
            <s3>
                <type>s3</type>
                <!-- Use custom S3 endpoint -->
                <endpoint>http://resolver:8080/root/data/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
                <!-- ClickHouse starts before the custom S3 endpoint does. Skip the access check to avoid a start-up failure -->
                <skip_access_check>true</skip_access_check>
                <!-- Avoid extra retries to speed up tests -->
                <retry_attempts>0</retry_attempts>
            </s3>
        </disks>
        <policies>
            <s3>
                <volumes>
                    <main>
                        <disk>s3</disk>
                    </main>
                </volumes>
            </s3>
        </policies>
    </storage_configuration>
</yandex>


@@ -0,0 +1,5 @@
<yandex>
    <profiles>
        <default/>
    </profiles>
</yandex>


@@ -0,0 +1,20 @@
<?xml version="1.0"?>
<yandex>
    <tcp_port>9000</tcp_port>
    <listen_host>127.0.0.1</listen_host>

    <openSSL>
        <client>
            <cacheSessions>true</cacheSessions>
            <verificationMode>none</verificationMode>
            <invalidCertificateHandler>
                <name>AcceptCertificateHandler</name>
            </invalidCertificateHandler>
        </client>
    </openSSL>

    <max_concurrent_queries>500</max_concurrent_queries>
    <mark_cache_size>5368709120</mark_cache_size>
    <path>./clickhouse/</path>
    <users_config>users.xml</users_config>
</yandex>


@@ -0,0 +1,49 @@
from bottle import request, route, run, response


# The endpoint can be configured to return a 500 error on the N-th request attempt.
# Otherwise it simply redirects to the original Minio server.

# Stores the ordinal of the request that should fail.
cache = {}


@route('/fail_request/<_request_number>')
def fail_request(_request_number):
    request_number = int(_request_number)
    if request_number > 0:
        cache['request_number'] = request_number
    else:
        cache.pop('request_number', None)
    return 'OK'


# Handler for MultipleObjectsDelete.
@route('/<_bucket>', ['POST'])
def delete(_bucket):
    response.set_header("Location", "http://minio1:9001/" + _bucket + "?" + request.query_string)
    response.status = 307
    return 'Redirected'


@route('/<_bucket>/<_path:path>', ['GET', 'POST', 'PUT', 'DELETE'])
def server(_bucket, _path):
    if cache.get('request_number', None):
        request_number = cache.pop('request_number') - 1
        if request_number > 0:
            cache['request_number'] = request_number
        else:
            response.status = 500
            return 'Expected Error'

    response.set_header("Location", "http://minio1:9001/" + _bucket + '/' + _path)
    response.status = 307
    return 'Redirected'


@route('/')
def ping():
    return 'OK'


run(host='0.0.0.0', port=8080)


@@ -0,0 +1,117 @@
import logging
import os
import time

import pytest

from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster

logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())


# Runs the custom Python-based S3 endpoint.
def run_endpoint(cluster):
    logging.info("Starting custom S3 endpoint")
    container_id = cluster.get_container_id('resolver')
    current_dir = os.path.dirname(__file__)
    cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_endpoint", "endpoint.py"), "endpoint.py")
    cluster.exec_in_container(container_id, ["python", "endpoint.py"], detach=True)

    # Wait for the S3 endpoint to start.
    for attempt in range(10):
        ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'),
                                                  ["curl", "-s", "http://resolver:8080/"], nothrow=True)
        if ping_response != 'OK':
            if attempt == 9:
                assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
            else:
                time.sleep(1)
        else:
            break

    logging.info("S3 endpoint started")


def fail_request(cluster, request):
    response = cluster.exec_in_container(cluster.get_container_id('resolver'),
                                         ["curl", "-s", "http://resolver:8080/fail_request/{}".format(request)])
    assert response == 'OK', 'Expected "OK", but got "{}"'.format(response)


@pytest.fixture(scope="module")
def cluster():
    try:
        cluster = ClickHouseCluster(__file__)
        cluster.add_instance("node",
                             main_configs=["configs/config.d/log_conf.xml", "configs/config.d/storage_conf.xml"],
                             with_minio=True)
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        run_endpoint(cluster)

        yield cluster
    finally:
        cluster.shutdown()


@pytest.fixture(autouse=True)
def drop_table(cluster):
    yield
    node = cluster.instances["node"]
    node.query("DROP TABLE IF EXISTS s3_failover_test NO DELAY")


# Each iteration makes one S3 request fail while the corresponding part file is written.
FILES_PER_PART_BASE = 5  # partition.dat, default_compression_codec.txt, count.txt, columns.txt, checksums.txt
FILES_PER_PART_WIDE = FILES_PER_PART_BASE + 1 + 1 + 3 * 2  # primary index, minmax index, plus a mark and a data file per column
FILES_PER_PART_COMPACT = FILES_PER_PART_BASE + 1 + 1 + 2


@pytest.mark.parametrize(
    "min_bytes_for_wide_part,request_count",
    [
        (0, FILES_PER_PART_WIDE),
        (1024 * 1024, FILES_PER_PART_COMPACT)
    ]
)
def test_write_failover(cluster, min_bytes_for_wide_part, request_count):
    node = cluster.instances["node"]

    node.query(
        """
        CREATE TABLE s3_failover_test (
            dt Date,
            id Int64,
            data String
        ) ENGINE=MergeTree()
        ORDER BY id
        PARTITION BY dt
        SETTINGS storage_policy='s3', min_bytes_for_wide_part={}
        """
        .format(min_bytes_for_wide_part)
    )

    for request in range(request_count + 1):
        # Fail the (request + 1)-th S3 request.
        fail_request(cluster, request + 1)

        data = "('2020-03-01',0,'data'),('2020-03-01',1,'data')"
        positive = request == request_count
        try:
            node.query("INSERT INTO s3_failover_test VALUES {}".format(data))

            assert positive, "Insert query should have failed, request {}".format(request)
        except QueryRuntimeException as e:
            assert not positive, "Insert query should not have failed, request {}".format(request)
            assert str(e).find("Expected Error") != -1, "Unexpected error {}".format(str(e))

        if positive:
            # Disable request failing.
            fail_request(cluster, 0)

            assert node.query("CHECK TABLE s3_failover_test") == '1\n'
            assert node.query("SELECT * FROM s3_failover_test FORMAT Values") == data