diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp
index c656fbf0c58..70c06efff5c 100644
--- a/src/Storages/MergeTree/DataPartsExchange.cpp
+++ b/src/Storages/MergeTree/DataPartsExchange.cpp
@@ -255,23 +255,23 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
const ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in)
{
-
size_t files;
readBinary(files, in);
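+    /// All further filesystem access goes through the IDisk interface,
+    /// so a part can be fetched directly to whatever storage backend (e.g. S3) backs the reservation.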
+ auto disk = reservation->getDisk();
+
static const String TMP_PREFIX = "tmp_fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
- String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
- String absolute_part_path = Poco::Path(data.getFullPathOnDisk(reservation->getDisk()) + relative_part_path + "/").absolute().toString();
- Poco::File part_file(absolute_part_path);
+ String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
+ String part_download_path = data.getRelativeDataPath() + part_relative_path + "/";
- if (part_file.exists())
- throw Exception("Directory " + absolute_part_path + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
+ if (disk->exists(part_download_path))
+ throw Exception("Directory " + fullPath(disk, part_download_path) + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};
- part_file.createDirectory();
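+    /// createDirectories (rather than createDirectory): the "detached/" parent may not exist on this disk yet.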
+ disk->createDirectories(part_download_path);
MergeTreeData::DataPart::Checksums checksums;
for (size_t i = 0; i < files; ++i)
@@ -284,21 +284,21 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
/// File must be inside "absolute_part_path" directory.
/// Otherwise malicious ClickHouse replica may force us to write to arbitrary path.
- String absolute_file_path = Poco::Path(absolute_part_path + file_name).absolute().toString();
- if (!startsWith(absolute_file_path, absolute_part_path))
- throw Exception("File path (" + absolute_file_path + ") doesn't appear to be inside part path (" + absolute_part_path + ")."
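+        /// part_download_path is relative, so Poco::Path::absolute() resolves both sides against
+        /// the working directory; resolving them the same way is enough to catch ".." escapes in file_name.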
+ String absolute_file_path = Poco::Path(part_download_path + file_name).absolute().toString();
+ if (!startsWith(absolute_file_path, Poco::Path(part_download_path).absolute().toString()))
+ throw Exception("File path (" + absolute_file_path + ") doesn't appear to be inside part path (" + part_download_path + ")."
" This may happen if we are trying to download part from malicious replica or logical error.",
ErrorCodes::INSECURE_PATH);
- WriteBufferFromFile file_out(absolute_file_path);
- HashingWriteBuffer hashing_out(file_out);
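+        /// Write through the disk abstraction; HashingWriteBuffer computes the checksum on the fly,
+        /// so the (possibly remote) file does not need to be re-read for verification.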
+ auto file_out = disk->writeFile(part_download_path + file_name);
+ HashingWriteBuffer hashing_out(*file_out);
copyData(in, hashing_out, file_size, blocker.getCounter());
if (blocker.isCancelled())
{
/// NOTE It would also make sense to check is_cancelled on every network read, polling with a short timeout.
/// For now we check it only between read chunks (in the `copyData` function).
- part_file.remove(true);
+ disk->removeRecursive(part_download_path);
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
}
@@ -306,7 +306,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
readPODBinary(expected_hash, in);
if (expected_hash != hashing_out.getHash())
- throw Exception("Checksum mismatch for file " + absolute_part_path + file_name + " transferred from " + replica_path,
+ throw Exception("Checksum mismatch for file " + fullPath(disk, part_download_path + file_name) + " transferred from " + replica_path,
ErrorCodes::CHECKSUM_DOESNT_MATCH);
if (file_name != "checksums.txt" &&
@@ -316,7 +316,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
assertEOF(in);
- MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, reservation->getDisk(), relative_part_path);
+ MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, reservation->getDisk(), part_relative_path);
new_data_part->is_temp = true;
new_data_part->modification_time = time(nullptr);
new_data_part->loadColumnsChecksumsIndexes(true, false);
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index d299d39726e..b50db21f5bf 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -620,6 +620,8 @@ public:
return storage_settings.get();
}
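+    /// Table data path relative to the disk root (cf. getFullPathOnDisk below).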
+ String getRelativeDataPath() const { return relative_data_path; }
+
/// Get table path on disk
String getFullPathOnDisk(const DiskPtr & disk) const;
diff --git a/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml b/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
index 5b292446c6b..d097675ca63 100644
--- a/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
+++ b/tests/integration/test_merge_tree_s3/configs/config.d/storage_conf.xml
@@ -13,7 +13,7 @@
             </hdd>
         </disks>
         <policies>
-            <default>
+            <s3>
                 <volumes>
                     <main>
                         <disk>s3</disk>
@@ -22,7 +22,7 @@
                         <disk>hdd</disk>
                     </external>
                 </volumes>
-            </default>
+            </s3>
         </policies>
     </storage_configuration>
 </yandex>
diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py
index f69c09631e8..50cf532e9a4 100644
--- a/tests/integration/test_merge_tree_s3/test.py
+++ b/tests/integration/test_merge_tree_s3/test.py
@@ -67,7 +67,9 @@ def create_table(cluster, table_name, additional_settings=None):
PARTITION BY dt
ORDER BY (dt, id)
SETTINGS
- old_parts_lifetime=0, index_granularity=512
+ storage_policy='s3',
+ old_parts_lifetime=0,
+ index_granularity=512
""".format(table_name)
if additional_settings:
@@ -84,7 +86,12 @@ def drop_table(cluster):
minio = cluster.minio_client
node.query("DROP TABLE IF EXISTS s3_test")
- assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
+ try:
+ assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
+ finally:
+        # Remove leftover objects to prevent cascading failures in later tests
+ for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
+ minio.remove_object(cluster.minio_bucket, obj.object_name)
@pytest.mark.parametrize(
@@ -210,7 +217,7 @@ def test_attach_detach_partition(cluster):
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
node.query("ALTER TABLE s3_test DETACH PARTITION '2020-01-04'")
- node.query("SET allow_drop_detached=1; ALTER TABLE s3_test DROP DETACHED PARTITION '2020-01-04'")
+ node.query("ALTER TABLE s3_test DROP DETACHED PARTITION '2020-01-04'", settings={"allow_drop_detached": 1})
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)"
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD
@@ -245,8 +252,7 @@ def test_table_manipulations(cluster):
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE*2
node.query("RENAME TABLE s3_renamed TO s3_test")
- # TODO: Doesn't work with min_max index.
- #assert node.query("SET check_query_single_value_result='false'; CHECK TABLE s3_test FORMAT Values") == "(1)"
+ assert node.query("CHECK TABLE s3_test FORMAT Values") == "(1)"
node.query("DETACH TABLE s3_test")
node.query("ATTACH TABLE s3_test")
diff --git a/tests/integration/test_replicated_merge_tree_s3/__init__.py b/tests/integration/test_replicated_merge_tree_s3/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_replicated_merge_tree_s3/configs/config.d/storage_conf.xml b/tests/integration/test_replicated_merge_tree_s3/configs/config.d/storage_conf.xml
new file mode 100644
index 00000000000..b32770095fc
--- /dev/null
+++ b/tests/integration/test_replicated_merge_tree_s3/configs/config.d/storage_conf.xml
@@ -0,0 +1,21 @@
+<yandex>
+    <storage_configuration>
+        <disks>
+            <s3>
+                <type>s3</type>
+                <endpoint>http://minio1:9001/root/data/</endpoint>
+                <access_key_id>minio</access_key_id>
+                <secret_access_key>minio123</secret_access_key>
+            </s3>
+        </disks>
+        <policies>
+            <s3>
+                <volumes>
+                    <main>
+                        <disk>s3</disk>
+                    </main>
+                </volumes>
+            </s3>
+        </policies>
+    </storage_configuration>
+</yandex>
diff --git a/tests/integration/test_replicated_merge_tree_s3/test.py b/tests/integration/test_replicated_merge_tree_s3/test.py
new file mode 100644
index 00000000000..d6b6015a388
--- /dev/null
+++ b/tests/integration/test_replicated_merge_tree_s3/test.py
@@ -0,0 +1,106 @@
+import logging
+import random
+import string
+import time
+
+import pytest
+from helpers.cluster import ClickHouseCluster
+
+logging.getLogger().setLevel(logging.INFO)
+logging.getLogger().addHandler(logging.StreamHandler())
+
+
+# Recreates the S3 bucket so every test run starts with an empty one.
+def prepare_s3_bucket(cluster):
+ minio_client = cluster.minio_client
+
+ if minio_client.bucket_exists(cluster.minio_bucket):
+ minio_client.remove_bucket(cluster.minio_bucket)
+
+ minio_client.make_bucket(cluster.minio_bucket)
+
+
+@pytest.fixture(scope="module")
+def cluster():
+ try:
+ cluster = ClickHouseCluster(__file__)
+
+ cluster.add_instance("node1", config_dir="configs", macros={'cluster': 'test1'}, with_minio=True, with_zookeeper=True)
+ cluster.add_instance("node2", config_dir="configs", macros={'cluster': 'test1'}, with_zookeeper=True)
+ cluster.add_instance("node3", config_dir="configs", macros={'cluster': 'test1'}, with_zookeeper=True)
+
+ logging.info("Starting cluster...")
+ cluster.start()
+ logging.info("Cluster started")
+
+ prepare_s3_bucket(cluster)
+ logging.info("S3 bucket created")
+
+ yield cluster
+ finally:
+ cluster.shutdown()
+
+
+FILES_OVERHEAD = 1
+FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
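+# The "+ 2 + 6" presumably covers the min_max skip index (.idx/.mrk) and per-part service files
+# (checksums.txt, columns.txt, count.txt, primary.idx, partition.dat, minmax index).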
+FILES_OVERHEAD_PER_PART = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6
+
+
+def random_string(length):
+ letters = string.ascii_letters
+    return ''.join(random.choice(letters) for _ in range(length))
+
+
+def generate_values(date_str, count, sign=1):
+ data = [[date_str, sign*(i + 1), random_string(10)] for i in range(count)]
+ data.sort(key=lambda tup: tup[1])
+ return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])
+
+
+def create_table(cluster):
+ create_table_statement = """
+ CREATE TABLE s3_test (
+ dt Date,
+ id Int64,
+ data String,
+ INDEX min_max (id) TYPE minmax GRANULARITY 3
+ ) ENGINE=ReplicatedMergeTree('/clickhouse/{cluster}/tables/test/s3', '{instance}')
+ PARTITION BY dt
+ ORDER BY (dt, id)
+ SETTINGS storage_policy='s3'
+ """
+
+ for node in cluster.instances.values():
+ node.query(create_table_statement)
+
+
+@pytest.fixture(autouse=True)
+def drop_table(cluster):
+ yield
+ for node in cluster.instances.values():
+ node.query("DROP TABLE IF EXISTS s3_test")
+
+ minio = cluster.minio_client
+    # Remove leftover objects to prevent cascading failures in later tests
+ for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
+ minio.remove_object(cluster.minio_bucket, obj.object_name)
+
+
+def test_insert_select_replicated(cluster):
+ create_table(cluster)
+
+ all_values = ""
+ for node_idx in range(1, 4):
+ node = cluster.instances["node" + str(node_idx)]
+ values = generate_values("2020-01-0" + str(node_idx), 4096)
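+        # insert_quorum=3: the INSERT is acknowledged only once all three replicas have the part.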
+ node.query("INSERT INTO s3_test VALUES {}".format(values), settings={"insert_quorum": 3})
+ if node_idx != 1:
+ all_values += ","
+ all_values += values
+
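+    # select_sequential_consistency=1 should make the SELECT consistent with quorum inserts:
+    # it fails if the replica lacks quorum-confirmed parts.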
+ for node_idx in range(1, 4):
+ node = cluster.instances["node" + str(node_idx)]
+        assert node.query("SELECT * FROM s3_test ORDER BY dt, id FORMAT Values", settings={"select_sequential_consistency": 1}) == all_values
+
+ minio = cluster.minio_client
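+    # Each of the 3 replicas stores its own copy of all 3 parts in S3 (1 inserted locally, 2 fetched),
+    # plus per-replica overhead (format_version.txt).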
+ assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 3 * (FILES_OVERHEAD + FILES_OVERHEAD_PER_PART * 3)