Merge pull request #10126 from Jokser/replicated-merge-tree-s3

Support ReplicatedMergeTree over S3
alexey-milovidov 2020-04-18 16:13:04 +03:00 committed by GitHub
commit aac7cc7330
7 changed files with 157 additions and 22 deletions
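
With this change, parts of a ReplicatedMergeTree table are written and fetched through the disk abstraction, so a replicated table can be placed on an S3-backed storage policy. A minimal usage sketch, assuming a storage policy named 's3' like the one added in this PR's test configs (the table name, ZooKeeper path, and macros below are illustrative, not taken from the PR):

    CREATE TABLE replicated_s3_example
    (
        dt Date,
        id Int64,
        data String
    )
    ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/replicated_s3_example', '{replica}')
    PARTITION BY dt
    ORDER BY (dt, id)
    SETTINGS storage_policy = 's3';  -- must name a policy defined in <storage_configuration>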

View File

@@ -255,23 +255,23 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
const ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in)
{
size_t files;
readBinary(files, in);
auto disk = reservation->getDisk();
static const String TMP_PREFIX = "tmp_fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
String absolute_part_path = Poco::Path(data.getFullPathOnDisk(reservation->getDisk()) + relative_part_path + "/").absolute().toString();
Poco::File part_file(absolute_part_path);
String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
String part_download_path = data.getRelativeDataPath() + part_relative_path + "/";
if (part_file.exists())
throw Exception("Directory " + absolute_part_path + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
if (disk->exists(part_download_path))
throw Exception("Directory " + fullPath(disk, part_download_path) + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};
part_file.createDirectory();
disk->createDirectories(part_download_path);
MergeTreeData::DataPart::Checksums checksums;
for (size_t i = 0; i < files; ++i)
@@ -284,21 +284,21 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
/// File must be inside "absolute_part_path" directory.
/// Otherwise a malicious ClickHouse replica may force us to write to an arbitrary path.
String absolute_file_path = Poco::Path(absolute_part_path + file_name).absolute().toString();
if (!startsWith(absolute_file_path, absolute_part_path))
throw Exception("File path (" + absolute_file_path + ") doesn't appear to be inside part path (" + absolute_part_path + ")."
String absolute_file_path = Poco::Path(part_download_path + file_name).absolute().toString();
if (!startsWith(absolute_file_path, Poco::Path(part_download_path).absolute().toString()))
throw Exception("File path (" + absolute_file_path + ") doesn't appear to be inside part path (" + part_download_path + ")."
" This may happen if we are trying to download part from malicious replica or logical error.",
ErrorCodes::INSECURE_PATH);
WriteBufferFromFile file_out(absolute_file_path);
HashingWriteBuffer hashing_out(file_out);
auto file_out = disk->writeFile(part_download_path + file_name);
HashingWriteBuffer hashing_out(*file_out);
copyData(in, hashing_out, file_size, blocker.getCounter());
if (blocker.isCancelled())
{
/// NOTE It would also make sense to check the is_cancelled flag on every read over the network, polling with a fairly small timeout.
/// Currently we check it only between read chunks (in the `copyData` function).
part_file.remove(true);
disk->removeRecursive(part_download_path);
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
}
@@ -306,7 +306,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
readPODBinary(expected_hash, in);
if (expected_hash != hashing_out.getHash())
throw Exception("Checksum mismatch for file " + absolute_part_path + file_name + " transferred from " + replica_path,
throw Exception("Checksum mismatch for file " + fullPath(disk, part_download_path + file_name) + " transferred from " + replica_path,
ErrorCodes::CHECKSUM_DOESNT_MATCH);
if (file_name != "checksums.txt" &&
@@ -316,7 +316,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
assertEOF(in);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, reservation->getDisk(), relative_part_path);
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, reservation->getDisk(), part_relative_path);
new_data_part->is_temp = true;
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);

View File

@@ -620,6 +620,8 @@ public:
return storage_settings.get();
}
String getRelativeDataPath() const { return relative_data_path; }
/// Get table path on disk
String getFullPathOnDisk(const DiskPtr & disk) const;

View File

@@ -13,7 +13,7 @@
</hdd>
</disks>
<policies>
<default>
<s3>
<volumes>
<main>
<disk>s3</disk>
@@ -22,7 +22,7 @@
<disk>hdd</disk>
</external>
</volumes>
</default>
</s3>
</policies>
</storage_configuration>
</yandex>

View File

@@ -67,7 +67,9 @@ def create_table(cluster, table_name, additional_settings=None):
PARTITION BY dt
ORDER BY (dt, id)
SETTINGS
old_parts_lifetime=0, index_granularity=512
storage_policy='s3',
old_parts_lifetime=0,
index_granularity=512
""".format(table_name)
if additional_settings:
@@ -84,7 +86,12 @@ def drop_table(cluster):
minio = cluster.minio_client
node.query("DROP TABLE IF EXISTS s3_test")
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
try:
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
finally:
# Remove extra objects to prevent cascading test failures
for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@pytest.mark.parametrize(
@@ -210,7 +217,7 @@ def test_attach_detach_partition(cluster):
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
node.query("ALTER TABLE s3_test DETACH PARTITION '2020-01-04'")
node.query("SET allow_drop_detached=1; ALTER TABLE s3_test DROP DETACHED PARTITION '2020-01-04'")
node.query("ALTER TABLE s3_test DROP DETACHED PARTITION '2020-01-04'", settings={"allow_drop_detached": 1})
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD
@@ -245,8 +252,7 @@ def test_table_manipulations(cluster):
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE*2
node.query("RENAME TABLE s3_renamed TO s3_test")
# TODO: Doesn't work with min_max index.
#assert node.query("SET check_query_single_value_result='false'; CHECK TABLE s3_test FORMAT Values") == "(1)"
assert node.query("CHECK TABLE s3_test FORMAT Values") == "(1)"
node.query("DETACH TABLE s3_test")
node.query("ATTACH TABLE s3_test")

View File

@@ -0,0 +1,21 @@
<yandex>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
</yandex>
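
Once a configuration like this is loaded, the new disk and policy should show up in the server's system tables. A quick sanity check, suggested here rather than taken from the PR (column sets can vary slightly between ClickHouse versions):

    SELECT policy_name, volume_name, disks FROM system.storage_policies WHERE policy_name = 's3';
    SELECT name, path FROM system.disks WHERE name = 's3';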

View File

@@ -0,0 +1,106 @@
import logging
import random
import string
import time
import pytest
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
# Recreates the S3 bucket used by the tests (removes it first if it already exists).
def prepare_s3_bucket(cluster):
minio_client = cluster.minio_client
if minio_client.bucket_exists(cluster.minio_bucket):
minio_client.remove_bucket(cluster.minio_bucket)
minio_client.make_bucket(cluster.minio_bucket)
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node1", config_dir="configs", macros={'cluster': 'test1'}, with_minio=True, with_zookeeper=True)
cluster.add_instance("node2", config_dir="configs", macros={'cluster': 'test1'}, with_zookeeper=True)
cluster.add_instance("node3", config_dir="configs", macros={'cluster': 'test1'}, with_zookeeper=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
prepare_s3_bucket(cluster)
logging.info("S3 bucket created")
yield cluster
finally:
cluster.shutdown()
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
FILES_OVERHEAD_PER_PART = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6
def random_string(length):
letters = string.ascii_letters
return ''.join(random.choice(letters) for i in range(length))
def generate_values(date_str, count, sign=1):
data = [[date_str, sign*(i + 1), random_string(10)] for i in range(count)]
data.sort(key=lambda tup: tup[1])
return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])
def create_table(cluster):
create_table_statement = """
CREATE TABLE s3_test (
dt Date,
id Int64,
data String,
INDEX min_max (id) TYPE minmax GRANULARITY 3
) ENGINE=ReplicatedMergeTree('/clickhouse/{cluster}/tables/test/s3', '{instance}')
PARTITION BY dt
ORDER BY (dt, id)
SETTINGS storage_policy='s3'
"""
for node in cluster.instances.values():
node.query(create_table_statement)
@pytest.fixture(autouse=True)
def drop_table(cluster):
yield
for node in cluster.instances.values():
node.query("DROP TABLE IF EXISTS s3_test")
minio = cluster.minio_client
# Remove extra objects to prevent cascading test failures
for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
minio.remove_object(cluster.minio_bucket, obj.object_name)
def test_insert_select_replicated(cluster):
create_table(cluster)
all_values = ""
for node_idx in range(1, 4):
node = cluster.instances["node" + str(node_idx)]
values = generate_values("2020-01-0" + str(node_idx), 4096)
node.query("INSERT INTO s3_test VALUES {}".format(values), settings={"insert_quorum": 3})
if node_idx != 1:
all_values += ","
all_values += values
for node_idx in range(1, 4):
node = cluster.instances["node" + str(node_idx)]
assert node.query("SELECT * FROM s3_test order by dt, id FORMAT Values", settings={"select_sequential_consistency": 1}) == all_values
minio = cluster.minio_client
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 3 * (FILES_OVERHEAD + FILES_OVERHEAD_PER_PART * 3)
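
Beyond the MinIO object-count assertions above, a handy manual check (not part of this PR) is to ask ClickHouse itself which disk the replicated parts landed on via system.parts:

    SELECT name, disk_name, active
    FROM system.parts
    WHERE table = 's3_test';
    -- disk_name is expected to be 's3' for parts placed through the S3-backed policy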