More integration tests

kssenii 2021-05-22 20:33:15 +00:00
parent 6931573f69
commit 0f98731c1f
10 changed files with 406 additions and 116 deletions

View File

@@ -80,7 +80,8 @@ RUN python3 -m pip install \
     redis \
     tzlocal \
     urllib3 \
-    requests-kerberos
+    requests-kerberos \
+    pyhdfs
 COPY modprobe.sh /usr/local/bin/modprobe
 COPY dockerd-entrypoint.sh /usr/local/bin/
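
Note: pyhdfs is the client library the new integration test below uses to inspect HDFS state directly. A minimal connectivity sketch, assuming a namenode with WebHDFS enabled; the host and port here are illustrative, not taken from this commit:

    from pyhdfs import HdfsClient

    fs = HdfsClient(hosts='localhost:50070')
    fs.mkdirs('/clickhouse')           # the directory an HDFS disk would write into
    print(fs.listdir('/clickhouse'))   # expect an empty listing on a fresh namenode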

View File

@@ -47,7 +47,7 @@ public:
     std::unique_ptr<WriteBufferFromFileBase> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
-    void removeFromRemoteFS(const RemoteFSPathKeeper & remote_paths_keeper) override;
+    void removeFromRemoteFS(const RemoteFSPathKeeper & fs_paths_keeper) override;
 private:
     String getRandomName() { return toString(UUIDHelpers::generateV4()); }

View File

@@ -19,7 +19,8 @@
 namespace DB
 {
-/// Helper class to collect keys into chunks of maximum size (to prepare batch requests to AWS API)
+/// Helper class to collect keys into chunks of maximum size (to prepare batch requests to AWS API).
+/// Used for both S3 and HDFS.
 class RemoteFSPathKeeper : public std::list<Aws::Vector<Aws::S3::Model::ObjectIdentifier>>
 {
 public:
@@ -134,9 +135,9 @@ protected:
     const String metadata_path;
 private:
-    void removeMeta(const String & path, RemoteFSPathKeeper & keys);
+    void removeMeta(const String & path, RemoteFSPathKeeper & fs_paths_keeper);
-    void removeMetaRecursive(const String & path, RemoteFSPathKeeper & keys);
+    void removeMetaRecursive(const String & path, RemoteFSPathKeeper & fs_paths_keeper);
     bool tryReserve(UInt64 bytes);
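
Note: RemoteFSPathKeeper exists because remote object stores are cleaned up with batch delete requests, so keys are first accumulated into chunks of bounded size (S3's batch-delete API accepts at most 1000 keys per request). A sketch of the chunking idea, in Python for brevity since the real class is C++:

    def chunk_keys(keys, chunk_size=1000):
        # Append each key to the last chunk, opening a new one once it fills up.
        chunks = []
        for key in keys:
            if not chunks or len(chunks[-1]) >= chunk_size:
                chunks.append([])
            chunks[-1].append(key)
        return chunks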

View File

@@ -1,11 +0,0 @@
<?xml version="1.0"?>
<yandex>
    <storage_configuration>
        <disks>
            <hdfs>
                <type>hdfs</type>
                <endpoint>hdfs://hdfs1:9000/</endpoint>
            </hdfs>
        </disks>
    </storage_configuration>
</yandex>

View File

@@ -1,27 +0,0 @@
<?xml version="1.0"?>
<yandex>
    <storage_configuration>
        <disks>
            <default>
                <type>hdfs</type>
                <endpoint>hdfs://hdfs1:9000/</endpoint>
            </default>
        </disks>
        <policies>
            <hdfs_main>
                <volumes>
                    <main>
                        <disk>default</disk>
                    </main>
                </volumes>
            </hdfs_main>
            <hdfs_cold>
                <volumes>
                    <main>
                        <disk>default</disk>
                    </main>
                </volumes>
            </hdfs_cold>
        </policies>
    </storage_configuration>
</yandex>

View File

@@ -1,73 +0,0 @@
import os

import pytest
from helpers.cluster import ClickHouseCluster
from helpers.hdfs_api import HDFSApi

cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=[
    'configs/storage.xml',
    'configs/log_conf.xml'], with_hdfs=True)
node2 = cluster.add_instance('node2', main_configs=[
    'configs/storage_hdfs_as_default.xml',
    'configs/log_conf.xml'], with_hdfs=True)


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def test_read_write(started_cluster):
    node1.query("DROP TABLE IF EXISTS simple_test")
    node1.query("CREATE TABLE simple_test (id UInt64) Engine=TinyLog SETTINGS disk = 'hdfs'")
    node1.query("INSERT INTO simple_test SELECT number FROM numbers(3)")
    node1.query("INSERT INTO simple_test SELECT number FROM numbers(3, 3)")
    assert node1.query("SELECT * FROM simple_test") == "0\n1\n2\n3\n4\n5\n"


def test_hdfs_disk_as_default(started_cluster):
    assert int(node2.query("SELECT count() FROM system.disks")) == 1
    assert node2.query("SELECT name, type FROM system.disks") == "default\thdfs\n"

    node2.query("CREATE DATABASE test_database")
    assert 'test_database' in node2.query('SHOW DATABASES')

    node2.query("DROP TABLE IF EXISTS test_database.test_table")
    assert 'test_table' not in node2.query('SHOW TABLES FROM test_database')

    node2.query("CREATE TABLE test_database.test_table (id UInt32) Engine=Memory()")
    assert 'test_table' in node2.query('SHOW TABLES FROM test_database')

    for i in range(5):
        node2.query("INSERT INTO test_database.test_table SELECT number FROM numbers(100000)")
    assert int(node2.query("SELECT count() FROM test_database.test_table").rstrip()) == 5 * 100000

    node2.query("RENAME TABLE test_database.test_table to test_database.test")
    assert int(node2.query("SELECT count() FROM test_database.test").rstrip()) == 5 * 100000
    node2.query("RENAME TABLE test_database.test to test_database.test_table")

    node2.query("TRUNCATE TABLE test_database.test_table")
    assert int(node2.query("SELECT count() FROM test_database.test_table").rstrip()) == 0

    node2.query("INSERT INTO test_database.test_table SELECT number FROM numbers(100000)")
    assert int(node2.query("SELECT count() FROM test_database.test_table").rstrip()) == 100000

    node2.query("DETACH TABLE test_database.test_table")
    assert 'test_table' not in node2.query('SHOW TABLES FROM test_database')
    node2.query("ATTACH TABLE test_database.test_table")
    assert 'test_table' in node2.query('SHOW TABLES FROM test_database')

    node2.query("DROP TABLE test_database.test_table")
    assert 'test_table' not in node2.query('SHOW TABLES FROM test_database')
    node2.query("DROP DATABASE test_database")
    assert 'test_database' not in node2.query('SHOW DATABASES')

View File

@@ -1,4 +1,5 @@
 <yandex>
+    <shutdown_wait_unfinished>3</shutdown_wait_unfinished>
     <logger>
         <level>trace</level>
         <log>/var/log/clickhouse-server/log.log</log>

View File

@@ -0,0 +1,30 @@
<yandex>
    <storage_configuration>
        <disks>
            <hdfs>
                <type>hdfs</type>
                <endpoint>hdfs://hdfs1:9000/clickhouse/</endpoint>
            </hdfs>
            <hdd>
                <type>local</type>
                <path>/</path>
            </hdd>
        </disks>
        <policies>
            <hdfs>
                <volumes>
                    <main>
                        <disk>hdfs</disk>
                    </main>
                    <external>
                        <disk>hdd</disk>
                    </external>
                </volumes>
            </hdfs>
        </policies>
    </storage_configuration>

    <merge_tree>
        <min_bytes_for_wide_part>0</min_bytes_for_wide_part>
    </merge_tree>
</yandex>
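
Note: this policy puts the HDFS disk on the main volume and the local disk on an external one, and min_bytes_for_wide_part=0 forces every part into the wide format, which the file-count arithmetic in the tests below relies on. Inside those tests (where node is a started instance), the wiring could be sanity-checked with something like the following; system.storage_policies and its columns come from ClickHouse's standard system tables, not from this commit:

    print(node.query("SELECT policy_name, volume_name, disks FROM system.storage_policies"))
    print(node.query("SELECT name, type FROM system.disks"))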

View File

@@ -0,0 +1,368 @@
import logging
import os
import random
import string
import threading
import time

import pytest
from helpers.cluster import ClickHouseCluster
from pyhdfs import HdfsClient

logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(SCRIPT_DIR, './_instances/node/configs/config.d/storage_conf.xml')


def create_table(cluster, table_name, additional_settings=None):
    node = cluster.instances["node"]

    create_table_statement = """
        CREATE TABLE {} (
            dt Date, id Int64, data String,
            INDEX min_max (id) TYPE minmax GRANULARITY 3
        ) ENGINE=MergeTree()
        PARTITION BY dt
        ORDER BY (dt, id)
        SETTINGS
            storage_policy='hdfs',
            old_parts_lifetime=0,
            index_granularity=512
        """.format(table_name)

    if additional_settings:
        create_table_statement += ","
        create_table_statement += additional_settings

    node.query(create_table_statement)
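
# `additional_settings` is spliced in after a comma, so it must itself be a
# valid SETTINGS fragment, e.g. (mirroring the calls in the tests below):
#   create_table(cluster, "hdfs_test", additional_settings="min_rows_for_wide_part=0")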

FILES_OVERHEAD = 1  # format_version.txt at the table root
FILES_OVERHEAD_PER_COLUMN = 2  # Data and mark files
# Wide part: per-column files for the 3 columns, plus the part's service files
# (checksums, columns list, count, partition/minmax files, primary index, ...).
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
# Compact part: all columns share one data file, so the overhead is fixed.
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1


def random_string(length):
    letters = string.ascii_letters
    return ''.join(random.choice(letters) for i in range(length))


def generate_values(date_str, count, sign=1):
    data = [[date_str, sign * (i + 1), random_string(10)] for i in range(count)]
    data.sort(key=lambda tup: tup[1])
    return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])


@pytest.fixture(scope="module")
def cluster():
    try:
        cluster = ClickHouseCluster(__file__)
        cluster.add_instance("node", main_configs=["configs/config.d/storage_conf.xml",
                                                   "configs/config.d/log_conf.xml"], with_hdfs=True)
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        fs = HdfsClient(hosts='localhost')
        fs.mkdirs('/clickhouse')
        logging.info("Created HDFS directory")

        yield cluster
    finally:
        cluster.shutdown()


def wait_for_delete_hdfs_objects(cluster, expected, num_tries=30):
    fs = HdfsClient(hosts='localhost')
    while num_tries > 0:
        num_hdfs_objects = len(fs.listdir('/clickhouse'))
        if num_hdfs_objects == expected:
            break
        num_tries -= 1
        time.sleep(1)
    assert len(fs.listdir('/clickhouse')) == expected


@pytest.fixture(autouse=True)
def drop_table(cluster):
    node = cluster.instances["node"]

    fs = HdfsClient(hosts='localhost')
    hdfs_objects = fs.listdir('/clickhouse')
    print('Number of hdfs objects to delete:', len(hdfs_objects), sep=' ')

    node.query("DROP TABLE IF EXISTS hdfs_test SYNC")

    try:
        wait_for_delete_hdfs_objects(cluster, 0)
    finally:
        hdfs_objects = fs.listdir('/clickhouse')
        if len(hdfs_objects) == 0:
            return
        print("Manually removing extra objects to prevent tests cascade failing: ", hdfs_objects)
        for path in hdfs_objects:
            fs.delete(path)


@pytest.mark.parametrize("min_rows_for_wide_part,files_per_part", [(0, FILES_OVERHEAD_PER_PART_WIDE), (8192, FILES_OVERHEAD_PER_PART_COMPACT)])
def test_simple_insert_select(cluster, min_rows_for_wide_part, files_per_part):
    create_table(cluster, "hdfs_test", additional_settings="min_rows_for_wide_part={}".format(min_rows_for_wide_part))

    node = cluster.instances["node"]

    values1 = generate_values('2020-01-03', 4096)
    node.query("INSERT INTO hdfs_test VALUES {}".format(values1))
    assert node.query("SELECT * FROM hdfs_test ORDER BY dt, id FORMAT Values") == values1

    fs = HdfsClient(hosts='localhost')
    hdfs_objects = fs.listdir('/clickhouse')
    print(hdfs_objects)
    assert len(hdfs_objects) == FILES_OVERHEAD + files_per_part

    values2 = generate_values('2020-01-04', 4096)
    node.query("INSERT INTO hdfs_test VALUES {}".format(values2))
    assert node.query("SELECT * FROM hdfs_test ORDER BY dt, id FORMAT Values") == values1 + "," + values2

    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD + files_per_part * 2

    assert node.query("SELECT count(*) FROM hdfs_test WHERE id = 1 FORMAT Values") == "(2)"


def test_alter_table_columns(cluster):
    create_table(cluster, "hdfs_test")

    node = cluster.instances["node"]
    fs = HdfsClient(hosts='localhost')

    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096)))
    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096, -1)))

    node.query("ALTER TABLE hdfs_test ADD COLUMN col1 UInt64 DEFAULT 1")
    # To ensure parts have merged
    node.query("OPTIMIZE TABLE hdfs_test")

    assert node.query("SELECT sum(col1) FROM hdfs_test FORMAT Values") == "(8192)"
    assert node.query("SELECT sum(col1) FROM hdfs_test WHERE id > 0 FORMAT Values") == "(4096)"
    wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN)

    node.query("ALTER TABLE hdfs_test MODIFY COLUMN col1 String", settings={"mutations_sync": 2})

    assert node.query("SELECT distinct(col1) FROM hdfs_test FORMAT Values") == "('1')"
    # ... and one more file with the mutation
    wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD_PER_COLUMN + 1)

    node.query("ALTER TABLE hdfs_test DROP COLUMN col1", settings={"mutations_sync": 2})

    # ... and 2 files with mutations
    wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + 2)


def test_attach_detach_partition(cluster):
    create_table(cluster, "hdfs_test")

    node = cluster.instances["node"]
    fs = HdfsClient(hosts='localhost')

    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096)))
    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-04', 4096)))
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"

    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2

    node.query("ALTER TABLE hdfs_test DETACH PARTITION '2020-01-03'")
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(4096)"

    # A detached part keeps its files on the remote disk.
    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2

    node.query("ALTER TABLE hdfs_test ATTACH PARTITION '2020-01-03'")
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"

    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2

    node.query("ALTER TABLE hdfs_test DROP PARTITION '2020-01-03'")
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(4096)"

    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE

    node.query("ALTER TABLE hdfs_test DETACH PARTITION '2020-01-04'")
    node.query("ALTER TABLE hdfs_test DROP DETACHED PARTITION '2020-01-04'", settings={"allow_drop_detached": 1})
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(0)"

    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD


def test_move_partition_to_another_disk(cluster):
    create_table(cluster, "hdfs_test")

    node = cluster.instances["node"]
    fs = HdfsClient(hosts='localhost')

    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096)))
    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-04', 4096)))
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"

    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2

    node.query("ALTER TABLE hdfs_test MOVE PARTITION '2020-01-04' TO DISK 'hdd'")
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"

    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE

    node.query("ALTER TABLE hdfs_test MOVE PARTITION '2020-01-04' TO DISK 'hdfs'")
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"

    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2


def test_table_manipulations(cluster):
    create_table(cluster, "hdfs_test")

    node = cluster.instances["node"]
    fs = HdfsClient(hosts='localhost')

    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096)))
    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-04', 4096)))

    node.query("RENAME TABLE hdfs_test TO hdfs_renamed")
    assert node.query("SELECT count(*) FROM hdfs_renamed FORMAT Values") == "(8192)"

    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2

    node.query("RENAME TABLE hdfs_renamed TO hdfs_test")
    assert node.query("CHECK TABLE hdfs_test FORMAT Values") == "(1)"

    node.query("DETACH TABLE hdfs_test")
    node.query("ATTACH TABLE hdfs_test")
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"

    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2

    node.query("TRUNCATE TABLE hdfs_test")
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(0)"

    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD


def test_move_replace_partition_to_another_table(cluster):
    create_table(cluster, "hdfs_test")

    node = cluster.instances["node"]
    fs = HdfsClient(hosts='localhost')

    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096)))
    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-04', 4096)))
    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-05', 4096, -1)))
    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-06', 4096, -1)))
    assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)"
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(16384)"

    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4

    create_table(cluster, "hdfs_clone")

    node.query("ALTER TABLE hdfs_test MOVE PARTITION '2020-01-03' TO TABLE hdfs_clone")
    node.query("ALTER TABLE hdfs_test MOVE PARTITION '2020-01-05' TO TABLE hdfs_clone")
    assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)"
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(8192)"
    assert node.query("SELECT sum(id) FROM hdfs_clone FORMAT Values") == "(0)"
    assert node.query("SELECT count(*) FROM hdfs_clone FORMAT Values") == "(8192)"

    # Number of objects in HDFS should be unchanged.
    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4

    # Add new partitions to the source table, but with different values, and replace them from the copied table.
    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096, -1)))
    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-05', 4096)))
    assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)"
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(16384)"

    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 6

    node.query("ALTER TABLE hdfs_test REPLACE PARTITION '2020-01-03' FROM hdfs_clone")
    node.query("ALTER TABLE hdfs_test REPLACE PARTITION '2020-01-05' FROM hdfs_clone")
    assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)"
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(16384)"
    assert node.query("SELECT sum(id) FROM hdfs_clone FORMAT Values") == "(0)"
    assert node.query("SELECT count(*) FROM hdfs_clone FORMAT Values") == "(8192)"

    # Wait for outdated partitions deletion.
    wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4)

    node.query("DROP TABLE hdfs_clone NO DELAY")
    assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)"
    assert node.query("SELECT count(*) FROM hdfs_test FORMAT Values") == "(16384)"

    # Data should remain in HDFS.
    hdfs_objects = fs.listdir('/clickhouse')
    assert len(hdfs_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4

    node.query("DROP TABLE hdfs_test NO DELAY")
    # Backup data should remain in HDFS.
    # TODO: it gets deleted, but shouldn't.
    #wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD_PER_PART_WIDE * 4)


#@pytest.mark.parametrize("merge_vertical", [False, True])
#def test_insert_same_partition_and_merge(cluster, merge_vertical):
#    settings = None
#    if merge_vertical:
#        settings = """
#            vertical_merge_algorithm_min_rows_to_activate=0,
#            vertical_merge_algorithm_min_columns_to_activate=0
#        """
#    create_table(cluster, "hdfs_test", additional_settings=settings)
#
#    node = cluster.instances["node"]
#    fs = HdfsClient(hosts='localhost')
#
#    node.query("SYSTEM STOP MERGES hdfs_test")
#    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 1024)))
#    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 2048)))
#    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096)))
#    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 1024, -1)))
#    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 2048, -1)))
#    node.query("INSERT INTO hdfs_test VALUES {}".format(generate_values('2020-01-03', 4096, -1)))
#    assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)"
#    assert node.query("SELECT count(distinct(id)) FROM hdfs_test FORMAT Values") == "(8192)"
#
#    hdfs_objects = fs.listdir('/clickhouse')
#    assert len(hdfs_objects) == FILES_OVERHEAD_PER_PART_WIDE * 6 + FILES_OVERHEAD
#
#    node.query("SYSTEM START MERGES hdfs_test")
#
#    # Wait for merges and old parts deletion
#    for attempt in range(0, 10):
#        parts_count = node.query("SELECT COUNT(*) FROM system.parts WHERE table = 'hdfs_test' FORMAT Values")
#        if parts_count == "(1)":
#            break
#        if attempt == 9:
#            assert parts_count == "(1)"
#        time.sleep(1)
#
#    assert node.query("SELECT sum(id) FROM hdfs_test FORMAT Values") == "(0)"
#    assert node.query("SELECT count(distinct(id)) FROM hdfs_test FORMAT Values") == "(8192)"
#    wait_for_delete_hdfs_objects(cluster, FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD)