2024-09-27 10:19:39 +00:00
import datetime
2020-05-07 19:00:17 +00:00
import random
2020-09-16 04:26:10 +00:00
import time
2020-05-07 13:50:42 +00:00
from multiprocessing . dummy import Pool
2020-09-16 04:26:10 +00:00
import pytest
2024-09-27 10:19:39 +00:00
2020-05-07 13:50:42 +00:00
from helpers . client import QueryRuntimeException
2020-09-16 04:26:10 +00:00
from helpers . cluster import ClickHouseCluster
2020-05-07 13:50:42 +00:00
2020-05-07 19:00:17 +00:00
node_options = dict (
with_zookeeper = True ,
2022-03-22 16:39:58 +00:00
main_configs = [
" configs/remote_servers.xml " ,
" configs/config.d/instant_moves.xml " ,
" configs/config.d/part_log.xml " ,
" configs/config.d/zookeeper_session_timeout.xml " ,
" configs/config.d/storage_configuration.xml " ,
] ,
tmpfs = [ " /external:size=200M " , " /internal:size=1M " ] ,
)
2020-05-07 13:50:42 +00:00
cluster = ClickHouseCluster ( __file__ )
2022-03-22 16:39:58 +00:00
node1 = cluster . add_instance ( " node1 " , macros = { " shard " : 0 , " replica " : 1 } , * * node_options )
node2 = cluster . add_instance ( " node2 " , macros = { " shard " : 0 , " replica " : 2 } , * * node_options )
node3 = cluster . add_instance ( " node3 " , macros = { " shard " : 1 , " replica " : 1 } , * * node_options )
node4 = cluster . add_instance ( " node4 " , macros = { " shard " : 1 , " replica " : 2 } , * * node_options )
2020-05-07 13:50:42 +00:00
nodes = [ node1 , node2 , node3 , node4 ]
2020-05-13 12:56:32 +00:00
2020-05-07 13:50:42 +00:00
@pytest.fixture ( scope = " module " )
def started_cluster ( ) :
try :
cluster . start ( )
yield cluster
except Exception as ex :
print ( ex )
finally :
cluster . shutdown ( )
2020-05-13 12:56:32 +00:00
2020-05-07 13:50:42 +00:00
def drop_table ( nodes , table_name ) :
for node in nodes :
2023-05-03 18:06:46 +00:00
node . query ( " DROP TABLE IF EXISTS {} SYNC " . format ( table_name ) )
2020-05-07 13:50:42 +00:00
2020-05-13 12:56:32 +00:00
2022-03-22 16:39:58 +00:00
def create_table (
nodes ,
table_name ,
with_storage_policy = False ,
with_time_column = False ,
with_ttl_move = False ,
with_ttl_delete = False ,
) :
2020-05-07 19:00:17 +00:00
extra_columns = " "
settings = [ ]
2020-05-07 13:50:42 +00:00
for node in nodes :
sql = """
CREATE TABLE { table_name }
2020-05-07 19:00:17 +00:00
(
num UInt32 ,
num2 UInt32 DEFAULT num + 1 { extra_columns }
)
ENGINE = ReplicatedMergeTree ( ' /clickhouse/tables/test/ {table_name} ' , ' {replica} ' )
ORDER BY num PARTITION BY num % 100
"""
if with_ttl_move :
sql + = """
2020-05-08 11:35:50 +00:00
TTL time + INTERVAL ( num2 % 1 ) SECOND TO DISK ' external '
2020-05-07 19:00:17 +00:00
"""
if with_ttl_delete :
sql + = """
2020-05-08 11:35:50 +00:00
TTL time + INTERVAL ( num2 % 1 ) SECOND DELETE
2020-05-07 19:00:17 +00:00
"""
settings . append ( " merge_with_ttl_timeout = 1 " )
if with_storage_policy :
settings . append ( " storage_policy= ' default_with_external ' " )
if settings :
sql + = """
SETTINGS { }
2022-03-22 16:39:58 +00:00
""" .format(
" , " . join ( settings )
)
2020-05-07 19:00:17 +00:00
if with_time_column :
extra_columns = """ ,
time DateTime
"""
2022-03-22 16:39:58 +00:00
node . query (
sql . format (
table_name = table_name , replica = node . name , extra_columns = extra_columns
)
)
2020-05-07 13:50:42 +00:00
2020-05-13 12:56:32 +00:00
2020-05-07 13:50:42 +00:00
def create_distributed_table ( node , table_name ) :
2020-09-16 04:26:10 +00:00
sql = """
2020-05-07 13:50:42 +00:00
CREATE TABLE % ( table_name ) s_replicated ON CLUSTER test_cluster
(
num UInt32 ,
num2 UInt32 DEFAULT num + 1
)
ENGINE = ReplicatedMergeTree ( ' /clickhouse/tables/test/ {shard} / %(table_name)s _replicated ' , ' {replica} ' )
ORDER BY num PARTITION BY num % % 100 ;
2022-03-22 16:39:58 +00:00
""" % d ict(
table_name = table_name
)
2020-09-16 04:26:10 +00:00
node . query ( sql )
sql = """
2020-05-07 13:50:42 +00:00
CREATE TABLE % ( table_name ) s ON CLUSTER test_cluster AS % ( table_name ) s_replicated
ENGINE = Distributed ( test_cluster , default , % ( table_name ) s_replicated , rand ( ) )
2022-03-22 16:39:58 +00:00
""" % d ict(
table_name = table_name
)
2020-09-16 04:26:10 +00:00
node . query ( sql )
2020-05-07 13:50:42 +00:00
2020-05-13 12:56:32 +00:00
2020-05-07 13:50:42 +00:00
def drop_distributed_table ( node , table_name ) :
2022-03-22 16:39:58 +00:00
node . query (
" DROP TABLE IF EXISTS {} ON CLUSTER test_cluster SYNC " . format ( table_name )
)
node . query (
" DROP TABLE IF EXISTS {} _replicated ON CLUSTER test_cluster SYNC " . format (
table_name
)
)
2020-05-07 13:50:42 +00:00
time . sleep ( 1 )
2020-05-13 12:56:32 +00:00
2022-03-22 16:39:58 +00:00
def insert (
node ,
table_name ,
chunk = 1000 ,
col_names = None ,
iterations = 1 ,
ignore_exception = False ,
slow = False ,
with_many_parts = False ,
offset = 0 ,
with_time_column = False ,
) :
2020-05-07 13:50:42 +00:00
if col_names is None :
2022-03-22 16:39:58 +00:00
col_names = [ " num " , " num2 " ]
2020-05-07 13:50:42 +00:00
for i in range ( iterations ) :
try :
query = [ " SET max_partitions_per_insert_block = 10000000 " ]
if with_many_parts :
2021-05-19 10:47:24 +00:00
query . append ( " SET max_insert_block_size = 256 " )
2020-05-07 19:00:17 +00:00
if with_time_column :
query . append (
2022-03-22 16:39:58 +00:00
" INSERT INTO {table_name} ( {col0} , {col1} , time) SELECT number AS {col0} , number + 1 AS {col1} , now() + 10 AS time FROM numbers_mt( {chunk} ) " . format (
table_name = table_name ,
chunk = chunk ,
col0 = col_names [ 0 ] ,
col1 = col_names [ 1 ] ,
)
)
2020-05-07 19:00:17 +00:00
elif slow :
2020-05-07 13:50:42 +00:00
query . append (
2023-07-04 19:19:30 +00:00
" INSERT INTO {table_name} ( {col0} , {col1} ) SELECT number + sleepEachRow(0.001) AS {col0} , number + 1 AS {col1} FROM numbers_mt( {chunk} ) SETTINGS function_sleep_max_microseconds_per_block = 0 " . format (
2022-03-22 16:39:58 +00:00
table_name = table_name ,
chunk = chunk ,
col0 = col_names [ 0 ] ,
col1 = col_names [ 1 ] ,
)
)
2020-05-07 13:50:42 +00:00
else :
query . append (
2022-03-22 16:39:58 +00:00
" INSERT INTO {table_name} ( {col0} , {col1} ) SELECT number + {offset} AS {col0} , number + 1 + {offset} AS {col1} FROM numbers_mt( {chunk} ) " . format (
table_name = table_name ,
chunk = chunk ,
col0 = col_names [ 0 ] ,
col1 = col_names [ 1 ] ,
offset = str ( offset ) ,
)
)
2020-05-07 13:50:42 +00:00
node . query ( " ; \n " . join ( query ) )
except QueryRuntimeException as ex :
if not ignore_exception :
raise
2020-05-13 12:56:32 +00:00
2022-03-22 16:39:58 +00:00
def select (
node ,
table_name ,
col_name = " num " ,
expected_result = None ,
iterations = 1 ,
ignore_exception = False ,
slow = False ,
poll = None ,
) :
2020-05-07 13:50:42 +00:00
for i in range ( iterations ) :
start_time = time . time ( )
while True :
try :
if slow :
2020-09-16 04:26:10 +00:00
r = node . query (
2023-07-04 19:19:30 +00:00
" SELECT count() FROM (SELECT num2, sleepEachRow(0.5) FROM {} WHERE {} % 1000 > 0) SETTINGS function_sleep_max_microseconds_per_block = 0 " . format (
2022-03-22 16:39:58 +00:00
table_name , col_name
)
)
2020-05-07 13:50:42 +00:00
else :
2022-03-22 16:39:58 +00:00
r = node . query (
" SELECT count() FROM {} WHERE {} % 1000 > 0 " . format (
table_name , col_name
)
)
2020-05-07 13:50:42 +00:00
if expected_result :
2022-03-22 16:39:58 +00:00
if (
r != expected_result
and poll
and time . time ( ) - start_time < poll
) :
2020-05-07 13:50:42 +00:00
continue
assert r == expected_result
except QueryRuntimeException as ex :
if not ignore_exception :
raise
break
2020-05-13 12:56:32 +00:00
2022-03-22 16:39:58 +00:00
def rename_column (
node , table_name , name , new_name , iterations = 1 , ignore_exception = False
) :
2023-05-16 11:05:36 +00:00
i = 0
while True :
i + = 1
2020-05-07 13:50:42 +00:00
try :
2022-03-22 16:39:58 +00:00
node . query (
" ALTER TABLE {table_name} RENAME COLUMN {name} to {new_name} " . format (
table_name = table_name , name = name , new_name = new_name
)
)
2020-05-07 13:50:42 +00:00
except QueryRuntimeException as ex :
2023-05-16 11:15:15 +00:00
if " Coordination::Exception " in str ( ex ) :
2023-05-16 11:05:36 +00:00
continue
2020-05-07 13:50:42 +00:00
if not ignore_exception :
raise
2023-05-16 11:05:36 +00:00
if i > = iterations :
break
2020-05-13 12:56:32 +00:00
2022-03-22 16:39:58 +00:00
def rename_column_on_cluster (
node , table_name , name , new_name , iterations = 1 , ignore_exception = False
) :
2023-05-16 11:05:36 +00:00
i = 0
while True :
i + = 1
2020-05-07 13:50:42 +00:00
try :
2022-03-22 16:39:58 +00:00
node . query (
" ALTER TABLE {table_name} ON CLUSTER test_cluster RENAME COLUMN {name} to {new_name} " . format (
table_name = table_name , name = name , new_name = new_name
)
)
2020-05-07 13:50:42 +00:00
except QueryRuntimeException as ex :
2023-05-16 11:15:15 +00:00
if " Coordination::Exception " in str ( ex ) :
2023-05-16 11:05:36 +00:00
continue
2020-05-07 13:50:42 +00:00
if not ignore_exception :
raise
2023-05-16 11:05:36 +00:00
if i > = iterations :
break
2020-05-07 19:00:17 +00:00
def alter_move ( node , table_name , iterations = 1 , ignore_exception = False ) :
2023-05-16 11:05:36 +00:00
i = 0
while True :
i + = 1
2020-05-07 19:00:17 +00:00
move_part = random . randint ( 0 , 99 )
2022-03-22 16:39:58 +00:00
move_volume = " external "
2020-05-07 19:00:17 +00:00
try :
2022-03-22 16:39:58 +00:00
node . query (
" ALTER TABLE {table_name} MOVE PARTITION ' {move_part} ' TO VOLUME ' {move_volume} ' " . format (
table_name = table_name , move_part = move_part , move_volume = move_volume
)
)
2020-05-07 19:00:17 +00:00
except QueryRuntimeException as ex :
2023-05-16 11:15:15 +00:00
if " Coordination::Exception " in str ( ex ) :
2023-05-16 11:05:36 +00:00
continue
2020-05-07 19:00:17 +00:00
if not ignore_exception :
raise
2023-05-16 11:05:36 +00:00
if i > = iterations :
break
2020-05-07 13:50:42 +00:00
def test_rename_parallel_same_node ( started_cluster ) :
table_name = " test_rename_parallel_same_node "
drop_table ( nodes , table_name )
try :
create_table ( nodes , table_name )
insert ( node1 , table_name , 1000 )
p = Pool ( 15 )
tasks = [ ]
for i in range ( 1 ) :
2022-03-22 16:39:58 +00:00
tasks . append (
p . apply_async (
rename_column , ( node1 , table_name , " num2 " , " foo2 " , 5 , True )
)
)
tasks . append (
p . apply_async (
rename_column , ( node1 , table_name , " foo2 " , " foo3 " , 5 , True )
)
)
tasks . append (
p . apply_async (
rename_column , ( node1 , table_name , " foo3 " , " num2 " , 5 , True )
)
)
2020-05-07 13:50:42 +00:00
for task in tasks :
task . get ( timeout = 240 )
# rename column back to original
rename_column ( node1 , table_name , " foo3 " , " num2 " , 1 , True )
rename_column ( node1 , table_name , " foo2 " , " num2 " , 1 , True )
# check that select still works
select ( node1 , table_name , " num2 " , " 999 \n " )
finally :
drop_table ( nodes , table_name )
2020-05-13 12:56:32 +00:00
2020-05-07 13:50:42 +00:00
def test_rename_parallel ( started_cluster ) :
table_name = " test_rename_parallel "
drop_table ( nodes , table_name )
try :
create_table ( nodes , table_name )
insert ( node1 , table_name , 1000 )
p = Pool ( 15 )
tasks = [ ]
for i in range ( 1 ) :
2022-03-22 16:39:58 +00:00
tasks . append (
p . apply_async (
rename_column , ( node1 , table_name , " num2 " , " foo2 " , 5 , True )
)
)
tasks . append (
p . apply_async (
rename_column , ( node2 , table_name , " foo2 " , " foo3 " , 5 , True )
)
)
tasks . append (
p . apply_async (
rename_column , ( node3 , table_name , " foo3 " , " num2 " , 5 , True )
)
)
2020-05-07 13:50:42 +00:00
for task in tasks :
task . get ( timeout = 240 )
# rename column back to original
rename_column ( node1 , table_name , " foo3 " , " num2 " , 1 , True )
rename_column ( node1 , table_name , " foo2 " , " num2 " , 1 , True )
# check that select still works
select ( node1 , table_name , " num2 " , " 999 \n " )
finally :
drop_table ( nodes , table_name )
2020-05-13 12:56:32 +00:00
2020-05-07 13:50:42 +00:00
def test_rename_with_parallel_select ( started_cluster ) :
table_name = " test_rename_with_parallel_select "
drop_table ( nodes , table_name )
try :
create_table ( nodes , table_name )
insert ( node1 , table_name , 1000 )
2020-05-11 18:09:45 +00:00
select ( node1 , table_name , " num2 " , " 999 \n " , poll = 30 )
select ( node2 , table_name , " num2 " , " 999 \n " , poll = 30 )
select ( node3 , table_name , " num2 " , " 999 \n " , poll = 30 )
2020-05-07 13:50:42 +00:00
p = Pool ( 15 )
tasks = [ ]
for i in range ( 1 ) :
2022-03-22 16:39:58 +00:00
tasks . append (
p . apply_async (
rename_column , ( node1 , table_name , " num2 " , " foo2 " , 5 , True )
)
)
tasks . append (
p . apply_async (
rename_column , ( node2 , table_name , " foo2 " , " foo3 " , 5 , True )
)
)
tasks . append (
p . apply_async (
rename_column , ( node3 , table_name , " foo3 " , " num2 " , 5 , True )
)
)
tasks . append (
p . apply_async ( select , ( node1 , table_name , " foo3 " , " 999 \n " , 5 , True ) )
)
tasks . append (
p . apply_async ( select , ( node2 , table_name , " num2 " , " 999 \n " , 5 , True ) )
)
tasks . append (
p . apply_async ( select , ( node3 , table_name , " foo2 " , " 999 \n " , 5 , True ) )
)
2020-05-07 13:50:42 +00:00
for task in tasks :
task . get ( timeout = 240 )
# rename column back to original name
rename_column ( node1 , table_name , " foo3 " , " num2 " , 1 , True )
rename_column ( node1 , table_name , " foo2 " , " num2 " , 1 , True )
# check that select still works
select ( node1 , table_name , " num2 " , " 999 \n " )
finally :
drop_table ( nodes , table_name )
2020-05-13 12:56:32 +00:00
2020-05-07 13:50:42 +00:00
def test_rename_with_parallel_insert ( started_cluster ) :
table_name = " test_rename_with_parallel_insert "
drop_table ( nodes , table_name )
try :
create_table ( nodes , table_name )
insert ( node1 , table_name , 1000 )
p = Pool ( 15 )
tasks = [ ]
for i in range ( 1 ) :
2022-03-22 16:39:58 +00:00
tasks . append (
p . apply_async (
rename_column , ( node1 , table_name , " num2 " , " foo2 " , 5 , True )
)
)
tasks . append (
p . apply_async (
rename_column , ( node2 , table_name , " foo2 " , " foo3 " , 5 , True )
)
)
tasks . append (
p . apply_async (
rename_column , ( node3 , table_name , " foo3 " , " num2 " , 5 , True )
)
)
tasks . append (
p . apply_async (
insert , ( node1 , table_name , 100 , [ " num " , " foo3 " ] , 5 , True )
)
)
tasks . append (
p . apply_async (
insert , ( node2 , table_name , 100 , [ " num " , " num2 " ] , 5 , True )
)
)
tasks . append (
p . apply_async (
insert , ( node3 , table_name , 100 , [ " num " , " foo2 " ] , 5 , True )
)
)
2020-05-07 13:50:42 +00:00
for task in tasks :
task . get ( timeout = 240 )
# rename column back to original
rename_column ( node1 , table_name , " foo3 " , " num2 " , 1 , True )
rename_column ( node1 , table_name , " foo2 " , " num2 " , 1 , True )
# check that select still works
select ( node1 , table_name , " num2 " )
finally :
drop_table ( nodes , table_name )
2020-05-13 12:56:32 +00:00
2020-05-07 13:50:42 +00:00
def test_rename_with_parallel_merges ( started_cluster ) :
table_name = " test_rename_with_parallel_merges "
drop_table ( nodes , table_name )
try :
2021-05-19 10:47:24 +00:00
print ( " Creating tables " , datetime . datetime . now ( ) )
2020-05-07 13:50:42 +00:00
create_table ( nodes , table_name )
2021-05-19 10:47:24 +00:00
for i in range ( 5 ) :
2022-03-22 16:39:58 +00:00
insert (
node1 ,
table_name ,
100 ,
[ " num " , " num2 " ] ,
1 ,
False ,
False ,
True ,
offset = i * 100 ,
)
2020-05-07 13:50:42 +00:00
2021-05-19 10:47:24 +00:00
print ( " Data inserted " , datetime . datetime . now ( ) )
2020-05-07 13:50:42 +00:00
def merge_parts ( node , table_name , iterations = 1 ) :
2021-05-17 08:43:15 +00:00
for _ in range ( iterations ) :
try :
node . query ( " OPTIMIZE TABLE %s FINAL " % table_name )
except Exception as ex :
print ( " Got an exception while optimizing table " , ex )
2020-05-07 13:50:42 +00:00
2021-05-19 10:47:24 +00:00
print ( " Creating pool " )
p = Pool ( 15 )
2020-05-07 13:50:42 +00:00
tasks = [ ]
2022-03-22 16:39:58 +00:00
tasks . append (
p . apply_async ( rename_column , ( node1 , table_name , " num2 " , " foo2 " , 2 , True ) )
)
tasks . append (
p . apply_async ( rename_column , ( node2 , table_name , " foo2 " , " foo3 " , 2 , True ) )
)
tasks . append (
p . apply_async ( rename_column , ( node3 , table_name , " foo3 " , " num2 " , 2 , True ) )
)
2021-05-19 10:47:24 +00:00
tasks . append ( p . apply_async ( merge_parts , ( node1 , table_name , 2 ) ) )
tasks . append ( p . apply_async ( merge_parts , ( node2 , table_name , 2 ) ) )
tasks . append ( p . apply_async ( merge_parts , ( node3 , table_name , 2 ) ) )
print ( " Waiting for tasks " , datetime . datetime . now ( ) )
2020-05-07 13:50:42 +00:00
for task in tasks :
task . get ( timeout = 240 )
2021-05-19 10:47:24 +00:00
print ( " Finished waiting " , datetime . datetime . now ( ) )
2020-05-07 13:50:42 +00:00
2021-05-19 10:47:24 +00:00
print ( " Renaming columns " , datetime . datetime . now ( ) )
2020-05-07 13:50:42 +00:00
# rename column back to the original name
rename_column ( node1 , table_name , " foo3 " , " num2 " , 1 , True )
rename_column ( node1 , table_name , " foo2 " , " num2 " , 1 , True )
2021-05-19 10:47:24 +00:00
print ( " Finished renaming " , datetime . datetime . now ( ) )
2020-05-07 13:50:42 +00:00
# check that select still works
2021-05-19 10:47:24 +00:00
select ( node1 , table_name , " num2 " , " 500 \n " )
select ( node2 , table_name , " num2 " , " 500 \n " )
select ( node3 , table_name , " num2 " , " 500 \n " )
2020-05-07 13:50:42 +00:00
finally :
drop_table ( nodes , table_name )
2020-05-13 12:56:32 +00:00
2020-05-07 13:50:42 +00:00
def test_rename_with_parallel_slow_insert ( started_cluster ) :
table_name = " test_rename_with_parallel_slow_insert "
drop_table ( nodes , table_name )
try :
create_table ( nodes , table_name )
insert ( node1 , table_name , 1000 )
p = Pool ( 15 )
tasks = [ ]
2022-03-22 16:39:58 +00:00
tasks . append (
p . apply_async (
insert , ( node1 , table_name , 10000 , [ " num " , " num2 " ] , 1 , False , True )
)
)
tasks . append (
p . apply_async (
insert , ( node1 , table_name , 10000 , [ " num " , " num2 " ] , 1 , True , True )
)
) # deduplicated
2020-05-07 13:50:42 +00:00
time . sleep ( 0.5 )
tasks . append ( p . apply_async ( rename_column , ( node1 , table_name , " num2 " , " foo2 " ) ) )
for task in tasks :
task . get ( timeout = 240 )
insert ( node1 , table_name , 100 , [ " num " , " foo2 " ] )
# rename column back to original
rename_column ( node1 , table_name , " foo2 " , " num2 " )
# check that select still works
select ( node1 , table_name , " num2 " , " 11089 \n " )
select ( node2 , table_name , " num2 " , " 11089 \n " , poll = 30 )
select ( node3 , table_name , " num2 " , " 11089 \n " , poll = 30 )
finally :
drop_table ( nodes , table_name )
2020-05-13 12:56:32 +00:00
2020-05-07 19:00:17 +00:00
def test_rename_with_parallel_ttl_move ( started_cluster ) :
2022-03-22 16:39:58 +00:00
table_name = " test_rename_with_parallel_ttl_move "
2020-05-07 19:00:17 +00:00
try :
2022-03-22 16:39:58 +00:00
create_table (
nodes ,
table_name ,
with_storage_policy = True ,
with_time_column = True ,
with_ttl_move = True ,
)
2020-05-07 19:00:17 +00:00
rename_column ( node1 , table_name , " time " , " time2 " , 1 , False )
rename_column ( node1 , table_name , " time2 " , " time " , 1 , False )
p = Pool ( 15 )
tasks = [ ]
2022-03-22 16:39:58 +00:00
tasks . append (
p . apply_async (
insert ,
(
node1 ,
table_name ,
10000 ,
[ " num " , " num2 " ] ,
1 ,
False ,
False ,
True ,
0 ,
True ,
) ,
)
)
2020-05-07 19:00:17 +00:00
time . sleep ( 5 )
2020-05-08 11:35:50 +00:00
rename_column ( node1 , table_name , " time " , " time2 " , 1 , False )
time . sleep ( 4 )
2022-03-22 16:39:58 +00:00
tasks . append (
p . apply_async ( rename_column , ( node1 , table_name , " num2 " , " foo2 " , 5 , True ) )
)
tasks . append (
p . apply_async ( rename_column , ( node2 , table_name , " foo2 " , " foo3 " , 5 , True ) )
)
tasks . append (
p . apply_async ( rename_column , ( node3 , table_name , " num3 " , " num2 " , 5 , True ) )
)
2020-05-07 19:00:17 +00:00
for task in tasks :
task . get ( timeout = 240 )
2020-05-08 11:35:50 +00:00
# check some parts got moved
2022-03-22 16:39:58 +00:00
assert " external " in set (
node1 . query (
" SELECT disk_name FROM system.parts WHERE table == ' {} ' AND active=1 ORDER BY modification_time " . format (
table_name
)
)
. strip ( )
. splitlines ( )
)
2020-05-08 11:35:50 +00:00
2020-05-07 19:00:17 +00:00
# rename column back to original
rename_column ( node1 , table_name , " foo2 " , " num2 " , 1 , True )
rename_column ( node1 , table_name , " foo3 " , " num2 " , 1 , True )
# check that select still works
select ( node1 , table_name , " num2 " , " 9990 \n " )
finally :
drop_table ( nodes , table_name )
2020-05-13 12:56:32 +00:00
2020-05-07 19:00:17 +00:00
def test_rename_with_parallel_ttl_delete ( started_cluster ) :
2022-03-22 16:39:58 +00:00
table_name = " test_rename_with_parallel_ttl_delete "
2020-05-07 19:00:17 +00:00
try :
create_table ( nodes , table_name , with_time_column = True , with_ttl_delete = True )
rename_column ( node1 , table_name , " time " , " time2 " , 1 , False )
rename_column ( node1 , table_name , " time2 " , " time " , 1 , False )
def merge_parts ( node , table_name , iterations = 1 ) :
for i in range ( iterations ) :
node . query ( " OPTIMIZE TABLE {} " . format ( table_name ) )
p = Pool ( 15 )
tasks = [ ]
2022-03-22 16:39:58 +00:00
tasks . append (
p . apply_async (
insert ,
(
node1 ,
table_name ,
10000 ,
[ " num " , " num2 " ] ,
1 ,
False ,
False ,
True ,
0 ,
True ,
) ,
)
)
2020-05-07 19:00:17 +00:00
time . sleep ( 15 )
2022-03-22 16:39:58 +00:00
tasks . append (
p . apply_async ( rename_column , ( node1 , table_name , " num2 " , " foo2 " , 5 , True ) )
)
tasks . append (
p . apply_async ( rename_column , ( node2 , table_name , " foo2 " , " foo3 " , 5 , True ) )
)
tasks . append (
p . apply_async ( rename_column , ( node3 , table_name , " num3 " , " num2 " , 5 , True ) )
)
2021-09-01 07:31:45 +00:00
tasks . append ( p . apply_async ( merge_parts , ( node1 , table_name , 3 ) ) )
tasks . append ( p . apply_async ( merge_parts , ( node2 , table_name , 3 ) ) )
tasks . append ( p . apply_async ( merge_parts , ( node3 , table_name , 3 ) ) )
2020-05-07 19:00:17 +00:00
for task in tasks :
task . get ( timeout = 240 )
# rename column back to original
rename_column ( node1 , table_name , " foo2 " , " num2 " , 1 , True )
rename_column ( node1 , table_name , " foo3 " , " num2 " , 1 , True )
2022-03-22 16:39:58 +00:00
assert (
int ( node1 . query ( " SELECT count() FROM {} " . format ( table_name ) ) . strip ( ) )
< 10000
)
2020-05-07 19:00:17 +00:00
finally :
drop_table ( nodes , table_name )
2020-05-13 12:56:32 +00:00
2020-05-07 13:50:42 +00:00
def test_rename_distributed ( started_cluster ) :
2022-03-22 16:39:58 +00:00
table_name = " test_rename_distributed "
2020-05-07 13:50:42 +00:00
try :
create_distributed_table ( node1 , table_name )
insert ( node1 , table_name , 1000 )
2022-03-22 16:39:58 +00:00
rename_column_on_cluster ( node1 , table_name , " num2 " , " foo2 " )
rename_column_on_cluster ( node1 , " %s _replicated " % table_name , " num2 " , " foo2 " )
2020-05-07 13:50:42 +00:00
2022-03-22 16:39:58 +00:00
insert ( node1 , table_name , 1000 , col_names = [ " num " , " foo2 " ] )
2020-05-07 13:50:42 +00:00
2022-03-22 16:39:58 +00:00
select ( node1 , table_name , " foo2 " , " 1998 \n " , poll = 30 )
2020-05-07 13:50:42 +00:00
finally :
drop_distributed_table ( node1 , table_name )
2020-05-13 12:56:32 +00:00
2020-05-07 13:50:42 +00:00
def test_rename_distributed_parallel_insert_and_select ( started_cluster ) :
2022-03-22 16:39:58 +00:00
table_name = " test_rename_distributed_parallel_insert_and_select "
2020-05-07 13:50:42 +00:00
try :
create_distributed_table ( node1 , table_name )
insert ( node1 , table_name , 1000 )
p = Pool ( 15 )
tasks = [ ]
for i in range ( 1 ) :
2020-09-16 04:26:10 +00:00
tasks . append (
2022-03-22 16:39:58 +00:00
p . apply_async (
rename_column_on_cluster ,
( node1 , table_name , " num2 " , " foo2 " , 3 , True ) ,
)
)
tasks . append (
p . apply_async (
rename_column_on_cluster ,
( node1 , " %s _replicated " % table_name , " num2 " , " foo2 " , 3 , True ) ,
)
)
2020-09-16 04:26:10 +00:00
tasks . append (
2022-03-22 16:39:58 +00:00
p . apply_async (
rename_column_on_cluster ,
( node1 , table_name , " foo2 " , " foo3 " , 3 , True ) ,
)
)
2020-09-16 04:26:10 +00:00
tasks . append (
2022-03-22 16:39:58 +00:00
p . apply_async (
rename_column_on_cluster ,
( node1 , " %s _replicated " % table_name , " foo2 " , " foo3 " , 3 , True ) ,
)
)
tasks . append (
p . apply_async (
rename_column_on_cluster ,
( node1 , table_name , " foo3 " , " num2 " , 3 , True ) ,
)
)
tasks . append (
p . apply_async (
rename_column_on_cluster ,
( node1 , " %s _replicated " % table_name , " foo3 " , " num2 " , 3 , True ) ,
)
)
tasks . append (
p . apply_async ( insert , ( node1 , table_name , 10 , [ " num " , " foo3 " ] , 5 , True ) )
)
tasks . append (
p . apply_async ( insert , ( node2 , table_name , 10 , [ " num " , " num2 " ] , 5 , True ) )
)
tasks . append (
p . apply_async ( insert , ( node3 , table_name , 10 , [ " num " , " foo2 " ] , 5 , True ) )
)
tasks . append (
p . apply_async ( select , ( node1 , table_name , " foo2 " , None , 5 , True ) )
)
tasks . append (
p . apply_async ( select , ( node2 , table_name , " foo3 " , None , 5 , True ) )
)
tasks . append (
p . apply_async ( select , ( node3 , table_name , " num2 " , None , 5 , True ) )
)
2020-05-07 13:50:42 +00:00
for task in tasks :
task . get ( timeout = 240 )
2022-03-22 16:39:58 +00:00
rename_column_on_cluster ( node1 , table_name , " foo2 " , " num2 " , 1 , True )
rename_column_on_cluster (
node1 , " %s _replicated " % table_name , " foo2 " , " num2 " , 1 , True
)
rename_column_on_cluster ( node1 , table_name , " foo3 " , " num2 " , 1 , True )
rename_column_on_cluster (
node1 , " %s _replicated " % table_name , " foo3 " , " num2 " , 1 , True
)
2020-05-07 13:50:42 +00:00
2022-03-22 16:39:58 +00:00
insert ( node1 , table_name , 1000 , col_names = [ " num " , " num2 " ] )
2020-05-07 13:50:42 +00:00
select ( node1 , table_name , " num2 " )
select ( node2 , table_name , " num2 " )
select ( node3 , table_name , " num2 " )
select ( node4 , table_name , " num2 " )
finally :
drop_distributed_table ( node1 , table_name )