Yatsishin Ilya 2021-04-29 14:57:48 +03:00
parent a87fe5e1bc
commit cb101e46bb
7 changed files with 72 additions and 172 deletions

View File

@@ -15,6 +15,7 @@ import time
 import traceback
 import urllib.parse
 import shlex
+import urllib3
 from cassandra.policies import RoundRobinPolicy
 import cassandra.cluster
@@ -224,6 +225,7 @@ class ClickHouseCluster:
         self.minio_port = 9001
         self.minio_client = None  # type: Minio
         self.minio_redirect_host = "proxy1"
+        self.minio_redirect_ip = None
         self.minio_redirect_port = 8080

         # available when with_hdfs == True
@ -351,7 +353,7 @@ class ClickHouseCluster:
try:
logging.debug("Trying to prune unused images...")
subprocess_call(['docker', 'images', 'prune', '-f'])
subprocess_call(['docker', 'image', 'prune', '-f'])
logging.debug("Images pruned")
except:
pass
@@ -985,12 +987,15 @@ class ClickHouseCluster:
     def wait_minio_to_start(self, timeout=180, secure=False):
         self.minio_ip = self.get_instance_ip(self.minio_host)
+        self.minio_redirect_ip = self.get_instance_ip(self.minio_redirect_host)
+
         os.environ['SSL_CERT_FILE'] = p.join(self.base_dir, self.minio_dir, 'certs', 'public.crt')
         minio_client = Minio(f'{self.minio_ip}:{self.minio_port}',
                              access_key='minio',
                              secret_key='minio123',
-                             secure=secure)
+                             secure=secure,
+                             http_client=urllib3.PoolManager(cert_reqs='CERT_NONE'))  # disable SSL check as we test ClickHouse and not Python library
         start = time.time()
         while time.time() - start < timeout:
             try:

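Side note: the Minio client above disables TLS certificate verification by handing the constructor a custom urllib3 pool manager, while SSL_CERT_FILE still points at the test certificate. A minimal standalone sketch of the same pattern, assuming a local MinIO endpoint and the test credentials that appear in this diff:

import urllib3
from minio import Minio

# Silence the InsecureRequestWarning that urllib3 emits once verification is off.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

client = Minio('127.0.0.1:9001',  # assumed endpoint; the tests use the cluster IP
               access_key='minio',
               secret_key='minio123',
               secure=True,  # still HTTPS on the wire
               http_client=urllib3.PoolManager(cert_reqs='CERT_NONE'))  # but no cert check

print([bucket.name for bucket in client.list_buckets()])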
View File

@@ -57,10 +57,6 @@ class HDFSApi(object):
         if kerberized:
             self._run_kinit()
-            self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override="kerberizedhdfs1", principal=self.principal)
-            #principal=self.principal,
-            #hostname_override=self.host, principal=self.principal)
-            # , mutual_authentication=reqkerb.REQUIRED, force_preemptive=True)
             self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override=self.host, principal=self.principal)
             if self.kerberos_auth is None:
                 print("failed to obtain kerberos_auth")
         else:
@@ -98,23 +94,29 @@ class HDFSApi(object):
             raise Exception("Kinit running failure")

     def req_wrapper(self, func, expected_code, cnt=2, **kwargs):
-        with dns_hook(self):
-            for i in range(0, cnt):
-                response_data = func(**kwargs)
-                if response_data.status_code == expected_code:
-                    return response_data
-                else:
-                    print("unexpected response_data.status_code {}", response_data.status_code)
+        for i in range(0, cnt):
+            response_data = func(**kwargs)
+            if response_data.status_code == expected_code:
+                return response_data
+            else:
+                print("unexpected response_data.status_code {}", response_data.status_code)
+        response_data.raise_for_status()

     def read_data(self, path, universal_newlines=True):
         logging.debug("read_data protocol:{} host:{} port:{} path: {}".format(self.protocol, self.host, self.proxy_port, path))
         response = self.req_wrapper(requests.get, 307, url="{protocol}://{host}:{port}/webhdfs/v1{path}?op=OPEN".format(protocol=self.protocol, host=self.host, port=self.proxy_port, path=path), headers={'host': 'localhost'}, allow_redirects=False, verify=False, auth=self.kerberos_auth)
-        # additional_params = '&'.join(response.headers['Location'].split('&')[1:2])
-        url = "{location}".format(location=response.headers['Location'])
-        # print("redirected to ", url)
-        response_data = self.req_wrapper(requests.get, 200, url=url,
-                                         headers={'host': 'localhost'},
-                                         verify=False, auth=self.kerberos_auth)
+        location = None
+        if self.kerberized:
+            location = response.headers['Location'].replace("kerberizedhdfs1:1006", "{}:{}".format(self.host, self.data_port))
+        else:
+            location = response.headers['Location'].replace("hdfs1:50075", "{}:{}".format(self.host, self.data_port))
+        logging.debug("redirected to {}".format(location))
+        response_data = self.req_wrapper(requests.get, 200, url=location, headers={'host': 'localhost'},
+                                         verify=False, auth=self.kerberos_auth)

         if universal_newlines:
             return response_data.text
         else:
@@ -130,37 +132,36 @@ class HDFSApi(object):
         named_file.write(content)
         named_file.flush()

         if self.kerberized:
             self._run_kinit()
             self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override=self.host, principal=self.principal)
-            # print(self.kerberos_auth)

         response = self.req_wrapper(requests.put, 307,
-            url="{protocol}://{host}:{port}/webhdfs/v1{path}?op=CREATE".format(
-                protocol=self.protocol, host=self.host,
-                port=self.proxy_port,
-                path=path, user=self.user),
-            allow_redirects=False,
-            headers={'host': 'localhost'},
-            params={'overwrite' : 'true'},
-            verify=False, auth=self.kerberos_auth
-        )
-        additional_params = '&'.join(
-            response.headers['Location'].split('&')[1:2] + ["user.name={}".format(self.user), "overwrite=true"])
+                                    url="{protocol}://{host}:{port}/webhdfs/v1{path}?op=CREATE".format(protocol=self.protocol, host='localhost',
+                                                                                                       port=self.proxy_port,
+                                                                                                       path=path, user=self.user),
+                                    allow_redirects=False,
+                                    headers={'host': 'localhost'},
+                                    params={'overwrite' : 'true'},
+                                    verify=False, auth=self.kerberos_auth
+                                    )
+        logging.debug("HDFS api response:{}".format(response.headers))
+        # additional_params = '&'.join(
+        #     response.headers['Location'].split('&')[1:2] + ["user.name={}".format(self.user), "overwrite=true"])
+        if self.kerberized:
+            location = response.headers['Location'].replace("kerberizedhdfs1:1006", "{}:{}".format(self.host, self.data_port))
+        else:
+            location = response.headers['Location'].replace("hdfs1:50075", "{}:{}".format(self.host, self.data_port))

         with open(fpath, mode="rb") as fh:
             file_data = fh.read()
             protocol = "http"  # self.protocol
             response = self.req_wrapper(requests.put, 201,
-                url="{location}".format(
-                    location=response.headers['Location']),
-                data=file_data,
-                headers={'content-type':'text/plain', 'host': 'localhost'},
-                params={'file': path, 'user.name' : self.user},
-                allow_redirects=False, verify=False, auth=self.kerberos_auth
+                                        url="{location}".format(location=location),
+                                        data=file_data,
+                                        headers={'content-type':'text/plain', 'host': 'localhost'},
+                                        params={'file': path, 'user.name' : self.user},
+                                        allow_redirects=False, verify=False, auth=self.kerberos_auth
             )
-            # print(response)
+            logging.debug(response)

     def write_gzip_data(self, path, content):

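The read_data/write_data changes above share one idea: WebHDFS answers the first request with a 307 redirect whose Location names a datanode host (hdfs1 or kerberizedhdfs1) that is not resolvable from the test runner, so the patch rewrites that host before following the redirect instead of patching DNS via dns_hook. A self-contained sketch of the flow, with host and port values assumed for illustration:

import requests

def webhdfs_read(host, http_port, data_port, path):
    # Step 1: ask the namenode for the file; expect a 307 redirect, not data.
    response = requests.get(
        "http://{}:{}/webhdfs/v1{}?op=OPEN".format(host, http_port, path),
        allow_redirects=False)
    assert response.status_code == 307, response.status_code

    # Step 2: the Location header names an internal datanode host; rewrite it
    # to an address reachable from outside the docker network, then fetch.
    location = response.headers['Location'].replace(
        "hdfs1:50075", "{}:{}".format(host, data_port))
    return requests.get(location).text

# Hypothetical usage: print(webhdfs_read("localhost", 50070, 50075, "/file.txt"))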
View File

@@ -540,7 +540,7 @@ def test_concurrent_queries(started_cluster):

 def test_odbc_long_column_names(started_cluster):
-    conn = get_postgres_conn();
+    conn = get_postgres_conn(started_cluster);
     cursor = conn.cursor()

     column_name = "column" * 8
@@ -572,7 +572,7 @@ def test_odbc_long_column_names(started_cluster):

 def test_odbc_long_text(started_cluster):
-    conn = get_postgres_conn()
+    conn = get_postgres_conn(started_cluster)
     cursor = conn.cursor()
     cursor.execute("drop table if exists clickhouse.test_long_text")
     cursor.execute("create table clickhouse.test_long_text(flen int, field1 text)");

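Both fixes above thread the started_cluster fixture into get_postgres_conn so the helper can derive the PostgreSQL address from the running cluster rather than from a hard-coded default. The repository's helper is not shown in this diff; a hypothetical psycopg2-based shape, where the attribute names are assumptions:

import psycopg2

def get_postgres_conn(cluster):
    # cluster.postgres_ip / cluster.postgres_port are assumed attribute names.
    conn = psycopg2.connect("host={} port={} user=postgres password=mysecretpassword".format(
        cluster.postgres_ip, cluster.postgres_port))
    conn.autocommit = True
    return conn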
View File

@@ -464,6 +464,21 @@ def test_custom_auth_headers_exclusion(started_cluster):
         assert ei.value.returncode == 243
         assert '403 Forbidden' in ei.value.stderr

+def test_infinite_redirect(started_cluster):
+    bucket = "redirected"
+    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+    filename = "test.csv"
+    get_query = f"select * from s3('http://resolver:{started_cluster.minio_redirect_port}/{bucket}/{filename}', 'CSV', '{table_format}')"
+    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
+    exception_raised = False
+    try:
+        run_query(instance, get_query)
+    except Exception as e:
+        assert str(e).find("Too many redirects while trying to access") != -1
+        exception_raised = True
+    finally:
+        assert exception_raised
+

 @pytest.mark.parametrize("extension,method", [
     pytest.param("bin", "gzip", id="bin"),
     pytest.param("gz", "auto", id="gz"),

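The new test asserts failure through an exception_raised flag. An equivalent and arguably more idiomatic formulation would let pytest do the bookkeeping; a sketch, not a required change (run_query and the fixture come from the same test module):

import pytest

def test_infinite_redirect(started_cluster):
    instance = started_cluster.instances["dummy"]
    get_query = f"select * from s3('http://resolver:{started_cluster.minio_redirect_port}/redirected/test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 UInt32')"
    # pytest.raises both asserts that the query fails and checks the message.
    with pytest.raises(Exception, match="Too many redirects while trying to access"):
        run_query(instance, get_query)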
View File

@@ -1,122 +0,0 @@
-import gzip
-import json
-import logging
-import os
-import io
-import random
-import threading
-import time
-
-import helpers.client
-import pytest
-from helpers.cluster import ClickHouseCluster, ClickHouseInstance
-
-
-# Creates S3 bucket for tests and allows anonymous read-write access to it.
-def prepare_s3_bucket(cluster):
-    # Allows read-write access for bucket without authorization.
-    bucket_read_write_policy = {"Version": "2012-10-17",
-                                "Statement": [
-                                    {
-                                        "Sid": "",
-                                        "Effect": "Allow",
-                                        "Principal": {"AWS": "*"},
-                                        "Action": "s3:GetBucketLocation",
-                                        "Resource": "arn:aws:s3:::root"
-                                    },
-                                    {
-                                        "Sid": "",
-                                        "Effect": "Allow",
-                                        "Principal": {"AWS": "*"},
-                                        "Action": "s3:ListBucket",
-                                        "Resource": "arn:aws:s3:::root"
-                                    },
-                                    {
-                                        "Sid": "",
-                                        "Effect": "Allow",
-                                        "Principal": {"AWS": "*"},
-                                        "Action": "s3:GetObject",
-                                        "Resource": "arn:aws:s3:::root/*"
-                                    },
-                                    {
-                                        "Sid": "",
-                                        "Effect": "Allow",
-                                        "Principal": {"AWS": "*"},
-                                        "Action": "s3:PutObject",
-                                        "Resource": "arn:aws:s3:::root/*"
-                                    }
-                                ]}
-
-    minio_client = cluster.minio_client
-    minio_client.set_bucket_policy(cluster.minio_bucket, json.dumps(bucket_read_write_policy))
-
-    cluster.minio_restricted_bucket = "{}-with-auth".format(cluster.minio_bucket)
-    if minio_client.bucket_exists(cluster.minio_restricted_bucket):
-        minio_client.remove_bucket(cluster.minio_restricted_bucket)
-
-    minio_client.make_bucket(cluster.minio_restricted_bucket)
-
-
-@pytest.fixture(scope="module")
-def started_cluster():
-    try:
-        cluster = ClickHouseCluster(__file__, name="redirect")
-        cluster.add_instance("dummy", with_minio=True, main_configs=["configs/defaultS3.xml"])
-        logging.info("Starting cluster...")
-        cluster.start()
-        logging.info("Cluster started")
-
-        prepare_s3_bucket(cluster)
-        logging.info("S3 bucket created")
-
-        run_s3_mock(cluster)
-
-        yield cluster
-    finally:
-        cluster.shutdown()
-
-
-def run_query(instance, query, stdin=None, settings=None):
-    # type: (ClickHouseInstance, str, object, dict) -> str
-
-    logging.info("Running query '{}'...".format(query))
-    result = instance.query(query, stdin=stdin, settings=settings)
-    logging.info("Query finished")
-
-    return result
-
-
-def run_s3_mock(cluster):
-    logging.info("Starting s3 mock")
-    container_id = cluster.get_container_id('resolver')
-    current_dir = os.path.dirname(__file__)
-    cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mock", "mock_s3.py"), "mock_s3.py")
-    cluster.exec_in_container(container_id, ["python", "mock_s3.py"], detach=True)
-
-    # Wait for S3 mock start
-    for attempt in range(10):
-        ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'),
-                                                  ["curl", "-s", "http://resolver:8080/"], nothrow=True)
-        if ping_response != 'OK':
-            if attempt == 9:
-                assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
-            else:
-                time.sleep(1)
-        else:
-            break
-
-    logging.info("S3 mock started")
-
-
-def test_infinite_redirect(started_cluster):
-    bucket = "redirected"
-    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
-    filename = "test.csv"
-    get_query = "select * from s3('http://resolver:8080/{bucket}/{file}', 'CSV', '{table_format}')".format(
-        bucket=bucket,
-        file=filename,
-        table_format=table_format)
-    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
-    exception_raised = False
-    try:
-        run_query(instance, get_query)
-    except Exception as e:
-        assert str(e).find("Too many redirects while trying to access") != -1
-        exception_raised = True
-    finally:
-        assert exception_raised

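One piece of the deleted module worth keeping in mind is the readiness poll in run_s3_mock, a pattern the shared test file relies on as well. A generic sketch, where ping stands in for the curl-in-container check:

import time

def wait_until_ok(ping, attempts=10, delay=1):
    # Poll until ping() answers 'OK'; fail loudly after the last attempt.
    last = None
    for _ in range(attempts):
        last = ping()
        if last == 'OK':
            return
        time.sleep(delay)
    raise AssertionError('Expected "OK", but got "{}"'.format(last))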
View File

@@ -17,11 +17,12 @@ node_1 = cluster.add_instance('node_1', with_zookeeper=True)
 def started_cluster():
     try:
         cluster.start()

+        node_1.query_with_retry('DROP TABLE IF EXISTS replicated')
-        node_1.query('''CREATE TABLE replicated (id UInt32, date Date) ENGINE =
+        node_1.query_with_retry('''CREATE TABLE replicated (id UInt32, date Date) ENGINE =
             ReplicatedMergeTree('/clickhouse/tables/replicated', 'node_1') ORDER BY id PARTITION BY toYYYYMM(date)''')

-        node.query("CREATE TABLE distributed (id UInt32, date Date) ENGINE = Distributed('test_cluster', 'default', 'replicated')")
+        node.query_with_retry("CREATE TABLE distributed (id UInt32, date Date) ENGINE = Distributed('test_cluster', 'default', 'replicated')")

         yield cluster

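query_with_retry is the test framework's retrying variant of query, used here to ride out transient failures (for example ZooKeeper sessions that are not quite ready right after cluster.start()). A generic sketch of the pattern, assuming only a query callable; the retry count and sleep are illustrative defaults:

import time

def query_with_retry(query, sql, retry_count=20, sleep_time=0.5):
    # Re-run the statement until it succeeds or attempts are exhausted.
    for attempt in range(retry_count):
        try:
            return query(sql)
        except Exception:
            if attempt == retry_count - 1:
                raise
            time.sleep(sleep_time)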
View File

@@ -32,28 +32,28 @@ def test_mutate_and_upgrade(start_cluster):
     node1.query("INSERT INTO mt VALUES ('2020-02-13', 1), ('2020-02-13', 2);")

     node1.query("ALTER TABLE mt DELETE WHERE id = 2", settings={"mutations_sync": "2"})
-    node2.query("SYSTEM SYNC REPLICA mt", timeout=5)
+    node2.query("SYSTEM SYNC REPLICA mt", timeout=15)

     node1.restart_with_latest_version()
     node2.restart_with_latest_version()

     node2.query("INSERT INTO mt VALUES ('2020-02-13', 3);")
-    node1.query("SYSTEM SYNC REPLICA mt", timeout=5)
+    node1.query("SYSTEM SYNC REPLICA mt", timeout=15)

     assert node1.query("SELECT COUNT() FROM mt") == "2\n"
     assert node2.query("SELECT COUNT() FROM mt") == "2\n"

     node1.query("INSERT INTO mt VALUES ('2020-02-13', 4);")
-    node2.query("SYSTEM SYNC REPLICA mt", timeout=5)
+    node2.query("SYSTEM SYNC REPLICA mt", timeout=15)

     assert node1.query("SELECT COUNT() FROM mt") == "3\n"
     assert node2.query("SELECT COUNT() FROM mt") == "3\n"

     node2.query("ALTER TABLE mt DELETE WHERE id = 3", settings={"mutations_sync": "2"})
-    node1.query("SYSTEM SYNC REPLICA mt", timeout=5)
+    node1.query("SYSTEM SYNC REPLICA mt", timeout=15)

     assert node1.query("SELECT COUNT() FROM mt") == "2\n"
     assert node2.query("SELECT COUNT() FROM mt") == "2\n"
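On the timeout bumps in this last file: SYSTEM SYNC REPLICA blocks until the replica has worked through its replication queue, and timeout= is the client-side wait enforced by the test helper rather than a server setting, so raising it from 5 to 15 seconds simply gives slow CI machines more headroom:

# Assumed context: node2 is a ClickHouseInstance from helpers.cluster.
node2.query("SYSTEM SYNC REPLICA mt", timeout=15)  # wait up to 15s for the replica to catch up
assert node2.query("SELECT COUNT() FROM mt") == "2\n"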