Yatsishin Ilya 2021-04-08 17:43:57 +03:00
parent 61fe636c36
commit 2cc9d314be
3 changed files with 52 additions and 37 deletions

View File

@@ -344,12 +344,8 @@ class ClickHouseCluster:
def setup_postgres_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_postgres.yml')])
env_variables['POSTGRES_HOST'] = self.postgres_host
env_variables['POSTGRES_PORT'] = str(self.postgres_port)
env_variables['POSTGRES_DIR'] = self.postgres_logs_dir
-env_variables['POSTGRES2_DIR'] = self.postgres2_logs_dir
-env_variables['POSTGRES3_DIR'] = self.postgres3_logs_dir
-env_variables['POSTGRES4_DIR'] = self.postgres4_logs_dir
env_variables['POSTGRES_LOGS_FS'] = "bind"
self.with_postgres = True
@@ -357,6 +353,17 @@ class ClickHouseCluster:
'--file', p.join(docker_compose_yml_dir, 'docker_compose_postgres.yml')]
return self.base_postgres_cmd
+def setup_postgres_cluster_cmd(self, instance, env_variables, docker_compose_yml_dir):
+self.with_postgres_cluster = True
+env_variables['POSTGRES_PORT'] = str(self.postgres_port)
+env_variables['POSTGRES2_DIR'] = self.postgres2_logs_dir
+env_variables['POSTGRES3_DIR'] = self.postgres3_logs_dir
+env_variables['POSTGRES4_DIR'] = self.postgres4_logs_dir
+env_variables['POSTGRES_LOGS_FS'] = "bind"
+self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_postgres.yml')])
+self.base_postgres_cluster_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
+'--file', p.join(docker_compose_yml_dir, 'docker_compose_postgres_cluster.yml')]
def setup_hdfs_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_hdfs = True
env_variables['HDFS_HOST'] = self.hdfs_host
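The new setup_postgres_cluster_cmd follows the same shape as the other setup_*_cmd helpers: set the with_* flag, export the variables the compose file expects, extend the shared base_cmd, and keep a standalone docker-compose command for this service group. Below is a minimal standalone sketch of that shape; the environment-variable names and compose file name come from the diff, while the port value, paths, and the function itself are illustrative.

# Illustrative sketch only: mirrors the setup_*_cmd pattern above, outside the
# ClickHouseCluster class. Variable names and the compose file name come from
# the diff; the concrete values are assumptions.
import os.path as p

def build_postgres_cluster_cmd(env_variables, docker_compose_yml_dir, env_file, project_name,
                               logs_dirs=('/tmp/postgres2', '/tmp/postgres3', '/tmp/postgres4')):
    # Exported for docker_compose_postgres_cluster.yml to pick up via --env-file.
    env_variables['POSTGRES_PORT'] = str(5432)
    env_variables['POSTGRES2_DIR'], env_variables['POSTGRES3_DIR'], env_variables['POSTGRES4_DIR'] = logs_dirs
    env_variables['POSTGRES_LOGS_FS'] = "bind"
    # Standalone command that can start/stop just this service group.
    return ['docker-compose', '--env-file', env_file, '--project-name', project_name,
            '--file', p.join(docker_compose_yml_dir, 'docker_compose_postgres_cluster.yml')]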
@@ -559,11 +566,7 @@ class ClickHouseCluster:
cmds.append(self.setup_postgres_cmd(instance, env_variables, docker_compose_yml_dir))
if with_postgres_cluster and not self.with_postgres_cluster:
self.with_postgres_cluster = True
-self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_postgres.yml')])
-self.base_postgres_cluster_cmd = ['docker-compose', '--project-name', self.project_name,
-'--file', p.join(docker_compose_yml_dir, 'docker_compose_postgres_cluster.yml')]
-cmds.append(self.base_postgres_cluster_cmd)
+cmds.append(self.setup_postgres_cluster_cmd(instance, env_variables, docker_compose_yml_dir))
if with_odbc_drivers and not self.with_odbc_drivers:
self.with_odbc_drivers = True
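The dispatcher above wires a service up only the first time it is requested: the setup_*_cmd call both fills env_variables and returns the standalone command that gets appended to cmds. docker-compose's --env-file option expects a file of KEY=VALUE lines, so the dict filled here has to be written out before any of these commands run; the helper below is only a hedged sketch of that step, since the real mechanism in cluster.py is not part of this diff.

# Hedged sketch: how the env_variables dict could be materialized into the
# KEY=VALUE file that docker-compose reads via --env-file. The function name
# and path are illustrative; the actual helper is not shown in this commit.
def write_env_file(path, env_variables):
    with open(path, 'w') as f:
        for name, value in sorted(env_variables.items()):
            f.write('{}={}\n'.format(name, value))
    return path

# write_env_file('/tmp/cluster.env', {'POSTGRES_PORT': '5432', 'POSTGRES_LOGS_FS': 'bind'})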
@@ -760,16 +763,30 @@ class ClickHouseCluster:
def wait_postgres_to_start(self, timeout=60):
self.postgres_ip = self.get_instance_ip(self.postgres_host)
start = time.time()
while time.time() - start < timeout:
try:
conn = psycopg2.connect(host=self.postgres_ip, port=self.postgres_port, user='postgres', password='mysecretpassword')
conn.close()
logging.debug("Postgres Started")
return
except Exception as ex:
logging.debug("Can't connect to Postgres " + str(ex))
time.sleep(0.5)
raise Exception("Cannot wait Postgres container")
def wait_postgres_cluster_to_start(self, timeout=60):
self.postgres2_ip = self.get_instance_ip(self.postgres2_host)
self.postgres3_ip = self.get_instance_ip(self.postgres3_host)
self.postgres4_ip = self.get_instance_ip(self.postgres4_host)
start = time.time()
-for ip in [self.postgres_ip, self.postgres2_ip, self.postgres3_ip, self.postgres4_ip]:
+for ip in [self.postgres2_ip, self.postgres3_ip, self.postgres4_ip]:
while time.time() - start < timeout:
try:
conn = psycopg2.connect(host=ip, port=self.postgres_port, user='postgres', password='mysecretpassword')
conn.close()
logging.debug("Postgres Started")
logging.debug("Postgres Cluster Started")
return
except Exception as ex:
logging.debug("Can't connect to Postgres " + str(ex))
@@ -1013,22 +1030,20 @@ class ClickHouseCluster:
shutil.rmtree(self.postgres_dir)
os.makedirs(self.postgres_logs_dir)
os.chmod(self.postgres_logs_dir, stat.S_IRWXO)
-os.makedirs(self.postgres2_logs_dir)
-os.chmod(self.postgres2_logs_dir, stat.S_IRWXO)
-os.makedirs(self.postgres3_logs_dir)
-os.chmod(self.postgres3_logs_dir, stat.S_IRWXO)
-os.makedirs(self.postgres4_logs_dir)
-os.chmod(self.postgres4_logs_dir, stat.S_IRWXO)
subprocess_check_call(self.base_postgres_cmd + common_opts)
self.wait_postgres_to_start(120)
if self.with_postgres_cluster and self.base_postgres_cluster_cmd:
print('Setup Postgres')
+os.makedirs(self.postgres2_logs_dir)
+os.chmod(self.postgres2_logs_dir, stat.S_IRWXO)
+os.makedirs(self.postgres3_logs_dir)
+os.chmod(self.postgres3_logs_dir, stat.S_IRWXO)
+os.makedirs(self.postgres4_logs_dir)
+os.chmod(self.postgres4_logs_dir, stat.S_IRWXO)
subprocess_check_call(self.base_postgres_cluster_cmd + common_opts)
-self.wait_postgres_to_start(120, port=5421)
-self.wait_postgres_to_start(120, port=5441)
-self.wait_postgres_to_start(120, port=5461)
+self.wait_postgres_cluster_to_start(120)
if self.with_kafka and self.base_kafka_cmd:
logging.debug('Setup Kafka')
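Before each docker-compose group is brought up, the start path recreates the bind-mounted log directories and relaxes their permissions so the PostgreSQL containers can write into them, then runs the group's standalone command and waits for the servers to accept connections. A minimal sketch of that sequence follows; the directory paths and the compose options are assumptions, while the chmod mode mirrors the diff.

# Minimal sketch of the "prepare dirs, compose up, wait" sequence above.
# The directory paths and ['up', '-d'] options are assumptions; stat.S_IRWXO
# is the mode the diff itself applies to the bind-mounted log directories.
import os
import stat
import subprocess

def start_postgres_cluster(base_postgres_cluster_cmd, logs_dirs, wait):
    for d in logs_dirs:
        os.makedirs(d, exist_ok=True)
        os.chmod(d, stat.S_IRWXO)
    subprocess.check_call(base_postgres_cluster_cmd + ['up', '-d'])  # assumed common_opts
    wait(timeout=120)

# start_postgres_cluster(cmd, ['/tmp/pg2/logs', '/tmp/pg3/logs', '/tmp/pg4/logs'],
#                        lambda timeout: wait_for_postgres(['127.0.0.1'], 5432, timeout))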

View File

@@ -57,7 +57,7 @@ def started_cluster():
def test_postgres_select_insert(started_cluster):
-conn = get_postgres_conn(started_cluster, True)
+conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
cursor = conn.cursor()
table_name = 'test_many'
table = f'''postgresql('{started_cluster.postgres_ip}:{started_cluster.postgres_port}', 'clickhouse', '{table_name}', 'postgres', 'mysecretpassword')'''
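The test changes pass the node's IP to get_postgres_conn explicitly instead of letting the helper derive it. The helper's definition is not part of this diff; the sketch below is only a plausible shape inferred from the call sites, and the connection-string details are assumptions.

# Plausible sketch of the updated helper, inferred from the call sites above;
# not taken from the diff. The dbname and autocommit choices are assumptions.
import psycopg2

def get_postgres_conn(cluster, ip, database=False):
    dsn = "host={} port={} user='postgres' password='mysecretpassword'".format(ip, cluster.postgres_port)
    if database:
        dsn += " dbname='clickhouse'"
    conn = psycopg2.connect(dsn)
    conn.autocommit = True
    return conn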
@@ -77,7 +77,7 @@ def test_postgres_select_insert(started_cluster):
def test_postgres_conversions(started_cluster):
-conn = get_postgres_conn(started_cluster, True)
+conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
cursor = conn.cursor()
cursor.execute(
'''CREATE TABLE IF NOT EXISTS test_types (
@@ -158,7 +158,7 @@ def test_postgres_conversions(started_cluster):
def test_non_default_scema(started_cluster):
-conn = get_postgres_conn(started_cluster, True)
+conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
cursor = conn.cursor()
cursor.execute('CREATE SCHEMA test_schema')
cursor.execute('CREATE TABLE test_schema.test_table (a integer)')
@@ -190,7 +190,7 @@ def test_non_default_scema(started_cluster):
def test_concurrent_queries(started_cluster):
-conn = get_postgres_conn(started_cluster, True)
+conn = get_postgres_conn(started_cluster, started_cluster.postgres_ip, True)
cursor = conn.cursor()
node1.query('''
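The last hunk is truncated in this view; test_concurrent_queries drives the postgresql() table function from several ClickHouse queries at once. Below is a hedged sketch of that pattern, reusing the table expression shown earlier in the file; the thread count and query text are illustrative.

# Illustrative sketch of concurrent reads through the postgresql() table
# function; `node1` and the table expression mirror the test file, while the
# thread count and query are assumptions.
from concurrent.futures import ThreadPoolExecutor

def query_once(node, cluster, table_name):
    table = "postgresql('{}:{}', 'clickhouse', '{}', 'postgres', 'mysecretpassword')".format(
        cluster.postgres_ip, cluster.postgres_port, table_name)
    return node.query("SELECT count() FROM {}".format(table))

def query_concurrently(node, cluster, table_name, workers=5):
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(query_once, node, cluster, table_name) for _ in range(workers)]
        return [f.result() for f in futures]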