2019-10-23 11:25:51 +00:00
import json
2019-07-15 09:36:02 +00:00
import pytest
2019-08-20 18:00:48 +00:00
import random
2019-10-23 11:25:51 +00:00
import re
2019-08-20 18:00:48 +00:00
import string
2019-12-09 16:20:56 +00:00
import threading
2019-10-23 11:25:51 +00:00
import time
2019-08-21 12:32:48 +00:00
from multiprocessing . dummy import Pool
from helpers . client import QueryRuntimeException
2019-07-15 09:36:02 +00:00
from helpers . cluster import ClickHouseCluster
2019-09-09 13:50:19 +00:00
from helpers . test_tools import TSV
2019-07-15 09:36:02 +00:00
cluster = ClickHouseCluster ( __file__ )
node1 = cluster . add_instance ( ' node1 ' ,
config_dir = ' configs ' ,
2019-08-15 09:43:31 +00:00
main_configs = [ ' configs/logs_config.xml ' ] ,
2019-07-15 09:36:02 +00:00
with_zookeeper = True ,
2019-12-09 16:20:56 +00:00
stay_alive = True ,
2019-07-15 09:36:02 +00:00
tmpfs = [ ' /jbod1:size=40M ' , ' /jbod2:size=40M ' , ' /external:size=200M ' ] ,
macros = { " shard " : 0 , " replica " : 1 } )
node2 = cluster . add_instance ( ' node2 ' ,
config_dir = ' configs ' ,
2019-08-15 09:43:31 +00:00
main_configs = [ ' configs/logs_config.xml ' ] ,
2019-07-15 09:36:02 +00:00
with_zookeeper = True ,
2019-12-09 16:20:56 +00:00
stay_alive = True ,
2019-07-15 09:36:02 +00:00
tmpfs = [ ' /jbod1:size=40M ' , ' /jbod2:size=40M ' , ' /external:size=200M ' ] ,
macros = { " shard " : 0 , " replica " : 2 } )
@pytest.fixture ( scope = " module " )
2019-08-14 10:30:36 +00:00
def start_cluster ( ) :
2019-07-15 09:36:02 +00:00
try :
cluster . start ( )
yield cluster
finally :
cluster . shutdown ( )
2019-07-25 18:21:01 +00:00
2019-09-09 13:50:19 +00:00
def test_system_tables ( start_cluster ) :
expected_disks_data = [
{
" name " : " default " ,
2019-09-11 17:17:10 +00:00
" path " : " /var/lib/clickhouse/ " ,
2019-09-09 13:50:19 +00:00
" keep_free_space " : ' 1024 ' ,
} ,
{
" name " : " jbod1 " ,
" path " : " /jbod1/ " ,
" keep_free_space " : ' 0 ' ,
} ,
{
" name " : " jbod2 " ,
" path " : " /jbod2/ " ,
" keep_free_space " : ' 10485760 ' ,
} ,
{
" name " : " external " ,
" path " : " /external/ " ,
" keep_free_space " : ' 0 ' ,
}
]
click_disk_data = json . loads ( node1 . query ( " SELECT name, path, keep_free_space FROM system.disks FORMAT JSON " ) ) [ " data " ]
assert sorted ( click_disk_data , key = lambda x : x [ " name " ] ) == sorted ( expected_disks_data , key = lambda x : x [ " name " ] )
expected_policies_data = [
{
" policy_name " : " small_jbod_with_external " ,
" volume_name " : " main " ,
" volume_priority " : " 1 " ,
" disks " : [ " jbod1 " ] ,
" max_data_part_size " : " 0 " ,
" move_factor " : 0.1 ,
} ,
{
" policy_name " : " small_jbod_with_external " ,
" volume_name " : " external " ,
" volume_priority " : " 2 " ,
" disks " : [ " external " ] ,
" max_data_part_size " : " 0 " ,
" move_factor " : 0.1 ,
} ,
2020-01-09 13:52:37 +00:00
{
" policy_name " : " one_more_small_jbod_with_external " ,
" volume_name " : " m " ,
" volume_priority " : " 1 " ,
" disks " : [ " jbod1 " ] ,
" max_data_part_size " : " 0 " ,
" move_factor " : 0.1 ,
} ,
{
" policy_name " : " one_more_small_jbod_with_external " ,
" volume_name " : " e " ,
" volume_priority " : " 2 " ,
" disks " : [ " external " ] ,
" max_data_part_size " : " 0 " ,
" move_factor " : 0.1 ,
} ,
2019-09-09 13:50:19 +00:00
{
" policy_name " : " jbods_with_external " ,
" volume_name " : " main " ,
" volume_priority " : " 1 " ,
" disks " : [ " jbod1 " , " jbod2 " ] ,
" max_data_part_size " : " 10485760 " ,
" move_factor " : 0.1 ,
} ,
{
" policy_name " : " jbods_with_external " ,
" volume_name " : " external " ,
" volume_priority " : " 2 " ,
" disks " : [ " external " ] ,
" max_data_part_size " : " 0 " ,
" move_factor " : 0.1 ,
} ,
{
" policy_name " : " moving_jbod_with_external " ,
" volume_name " : " main " ,
" volume_priority " : " 1 " ,
" disks " : [ " jbod1 " ] ,
" max_data_part_size " : " 0 " ,
" move_factor " : 0.7 ,
} ,
{
" policy_name " : " moving_jbod_with_external " ,
" volume_name " : " external " ,
" volume_priority " : " 2 " ,
" disks " : [ " external " ] ,
" max_data_part_size " : " 0 " ,
" move_factor " : 0.7 ,
} ,
{
" policy_name " : " default_disk_with_external " ,
" volume_name " : " small " ,
" volume_priority " : " 1 " ,
" disks " : [ " default " ] ,
" max_data_part_size " : " 2097152 " ,
" move_factor " : 0.1 ,
} ,
{
" policy_name " : " default_disk_with_external " ,
" volume_name " : " big " ,
" volume_priority " : " 2 " ,
" disks " : [ " external " ] ,
" max_data_part_size " : " 20971520 " ,
" move_factor " : 0.1 ,
} ,
2019-10-24 05:58:06 +00:00
{
" policy_name " : " special_warning_policy " ,
" volume_name " : " special_warning_zero_volume " ,
" volume_priority " : " 1 " ,
" disks " : [ " default " ] ,
" max_data_part_size " : " 0 " ,
" move_factor " : 0.1 ,
} ,
{
" policy_name " : " special_warning_policy " ,
" volume_name " : " special_warning_default_volume " ,
" volume_priority " : " 2 " ,
" disks " : [ " external " ] ,
" max_data_part_size " : " 0 " ,
" move_factor " : 0.1 ,
} ,
{
" policy_name " : " special_warning_policy " ,
" volume_name " : " special_warning_small_volume " ,
" volume_priority " : " 3 " ,
" disks " : [ " jbod1 " ] ,
" max_data_part_size " : " 1024 " ,
" move_factor " : 0.1 ,
} ,
{
" policy_name " : " special_warning_policy " ,
" volume_name " : " special_warning_big_volume " ,
" volume_priority " : " 4 " ,
" disks " : [ " jbod2 " ] ,
" max_data_part_size " : " 1024000000 " ,
" move_factor " : 0.1 ,
} ,
2019-09-09 13:50:19 +00:00
]
clickhouse_policies_data = json . loads ( node1 . query ( " SELECT * FROM system.storage_policies WHERE policy_name != ' default ' FORMAT JSON " ) ) [ " data " ]
def key ( x ) :
return ( x [ " policy_name " ] , x [ " volume_name " ] , x [ " volume_priority " ] )
assert sorted ( clickhouse_policies_data , key = key ) == sorted ( expected_policies_data , key = key )
2019-09-09 12:41:46 +00:00
def test_query_parser ( start_cluster ) :
2019-09-09 13:50:19 +00:00
try :
with pytest . raises ( QueryRuntimeException ) :
node1 . query ( """
CREATE TABLE table_with_absent_policy (
d UInt64
) ENGINE = MergeTree ( )
ORDER BY d
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' very_exciting_policy '
2019-09-09 13:50:19 +00:00
""" )
with pytest . raises ( QueryRuntimeException ) :
node1 . query ( """
CREATE TABLE table_with_absent_policy (
d UInt64
) ENGINE = MergeTree ( )
ORDER BY d
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' jbod1 '
2019-09-09 13:50:19 +00:00
""" )
2019-09-09 12:41:46 +00:00
node1 . query ( """
2019-09-09 13:50:19 +00:00
CREATE TABLE table_with_normal_policy (
d UInt64
) ENGINE = MergeTree ( )
ORDER BY d
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' default '
2019-09-09 12:41:46 +00:00
""" )
2019-09-09 13:50:19 +00:00
node1 . query ( " INSERT INTO table_with_normal_policy VALUES (5) " )
2019-09-09 12:41:46 +00:00
2019-09-09 13:50:19 +00:00
with pytest . raises ( QueryRuntimeException ) :
node1 . query ( " ALTER TABLE table_with_normal_policy MOVE PARTITION ' all ' TO VOLUME ' some_volume ' " )
2019-09-09 12:41:46 +00:00
2019-09-09 13:50:19 +00:00
with pytest . raises ( QueryRuntimeException ) :
node1 . query ( " ALTER TABLE table_with_normal_policy MOVE PARTITION ' all ' TO DISK ' some_volume ' " )
2019-09-09 12:41:46 +00:00
2019-09-09 13:50:19 +00:00
with pytest . raises ( QueryRuntimeException ) :
node1 . query ( " ALTER TABLE table_with_normal_policy MOVE PART ' xxxxx ' TO DISK ' jbod1 ' " )
2019-09-09 12:41:46 +00:00
2019-09-09 13:50:19 +00:00
with pytest . raises ( QueryRuntimeException ) :
node1 . query ( " ALTER TABLE table_with_normal_policy MOVE PARTITION ' yyyy ' TO DISK ' jbod1 ' " )
2019-09-09 12:41:46 +00:00
2019-09-09 13:50:19 +00:00
with pytest . raises ( QueryRuntimeException ) :
2019-09-20 20:35:50 +00:00
node1 . query ( " ALTER TABLE table_with_normal_policy MODIFY SETTING storage_policy= ' moving_jbod_with_external ' " )
2019-09-09 13:50:19 +00:00
finally :
node1 . query ( " DROP TABLE IF EXISTS table_with_normal_policy " )
2019-09-09 12:41:46 +00:00
2020-01-09 13:52:37 +00:00
@pytest.mark.parametrize ( " name,engine " , [
( " test_alter_policy " , " MergeTree() " ) ,
( " replicated_test_alter_policy " , " ReplicatedMergeTree( ' /clickhouse/test_alter_policy ' , ' 1 ' ) " , ) ,
] )
def test_alter_policy ( start_cluster , name , engine ) :
try :
node1 . query ( """
CREATE TABLE { name } (
d UInt64
) ENGINE = { engine }
ORDER BY d
SETTINGS storage_policy = ' small_jbod_with_external '
""" .format(name=name, engine=engine))
assert node1 . query ( """ SELECT storage_policy FROM system.tables WHERE name = ' {name} ' """ . format ( name = name ) ) == " small_jbod_with_external \n "
with pytest . raises ( QueryRuntimeException ) :
node1 . query ( """ ALTER TABLE {name} MODIFY SETTING storage_policy= ' one_more_small_jbod_with_external ' """ . format ( name = name ) )
assert node1 . query ( """ SELECT storage_policy FROM system.tables WHERE name = ' {name} ' """ . format ( name = name ) ) == " small_jbod_with_external \n "
node1 . query ( """ ALTER TABLE {name} MODIFY SETTING storage_policy= ' jbods_with_external ' """ . format ( name = name ) )
assert node1 . query ( """ SELECT storage_policy FROM system.tables WHERE name = ' {name} ' """ . format ( name = name ) ) == " jbods_with_external \n "
with pytest . raises ( QueryRuntimeException ) :
node1 . query ( """ ALTER TABLE {name} MODIFY SETTING storage_policy= ' small_jbod_with_external ' """ . format ( name = name ) )
assert node1 . query ( """ SELECT storage_policy FROM system.tables WHERE name = ' {name} ' """ . format ( name = name ) ) == " jbods_with_external \n "
finally :
node1 . query ( " DROP TABLE IF EXISTS {name} " . format ( name = name ) )
2019-08-20 18:00:48 +00:00
def get_random_string ( length ) :
return ' ' . join ( random . choice ( string . ascii_uppercase + string . digits ) for _ in range ( length ) )
def get_used_disks_for_table ( node , table_name ) :
2019-09-03 14:50:49 +00:00
return node . query ( " select disk_name from system.parts where table == ' {} ' and active=1 order by modification_time " . format ( table_name ) ) . strip ( ) . split ( ' \n ' )
2019-08-20 18:00:48 +00:00
2019-10-23 11:25:51 +00:00
def test_no_warning_about_zero_max_data_part_size ( start_cluster ) :
def get_log ( node ) :
return node . exec_in_container ( [ " bash " , " -c " , " cat /var/log/clickhouse-server/clickhouse-server.log " ] )
for node in ( node1 , node2 ) :
node . query ( """
CREATE TABLE default . test_warning_table (
s String
) ENGINE = MergeTree
ORDER BY tuple ( )
SETTINGS storage_policy = ' small_jbod_with_external '
""" )
node . query ( """
DROP TABLE default . test_warning_table
""" )
log = get_log ( node )
assert not re . search ( " Warning.*Volume.*special_warning_zero_volume " , log )
assert not re . search ( " Warning.*Volume.*special_warning_default_volume " , log )
assert re . search ( " Warning.*Volume.*special_warning_small_volume " , log )
assert not re . search ( " Warning.*Volume.*special_warning_big_volume " , log )
2019-08-20 18:35:35 +00:00
@pytest.mark.parametrize ( " name,engine " , [
( " mt_on_jbod " , " MergeTree() " ) ,
( " replicated_mt_on_jbod " , " ReplicatedMergeTree( ' /clickhouse/replicated_mt_on_jbod ' , ' 1 ' ) " , ) ,
] )
def test_round_robin ( start_cluster , name , engine ) :
2019-08-20 18:00:48 +00:00
try :
node1 . query ( """
2019-08-20 18:35:35 +00:00
CREATE TABLE { name } (
2019-08-20 18:00:48 +00:00
d UInt64
2019-08-20 18:35:35 +00:00
) ENGINE = { engine }
2019-08-20 18:00:48 +00:00
ORDER BY d
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' jbods_with_external '
2019-08-20 18:35:35 +00:00
""" .format(name=name, engine=engine))
2019-08-20 18:00:48 +00:00
# first should go to the jbod1
2019-08-20 18:35:35 +00:00
node1 . query ( " insert into {} select * from numbers(10000) " . format ( name ) )
used_disk = get_used_disks_for_table ( node1 , name )
2019-08-20 18:00:48 +00:00
assert len ( used_disk ) == 1 , ' More than one disk used for single insert '
2019-08-20 18:35:35 +00:00
node1 . query ( " insert into {} select * from numbers(10000, 10000) " . format ( name ) )
used_disks = get_used_disks_for_table ( node1 , name )
2019-08-20 18:00:48 +00:00
assert len ( used_disks ) == 2 , ' Two disks should be used for two parts '
2019-08-20 18:35:35 +00:00
assert used_disks [ 0 ] != used_disks [ 1 ] , " Should write to different disks "
2019-08-20 18:00:48 +00:00
2019-08-20 18:35:35 +00:00
node1 . query ( " insert into {} select * from numbers(20000, 10000) " . format ( name ) )
used_disks = get_used_disks_for_table ( node1 , name )
2019-08-20 18:00:48 +00:00
2019-08-20 18:35:35 +00:00
# jbod1 -> jbod2 -> jbod1 -> jbod2 ... etc
2019-08-20 18:00:48 +00:00
assert len ( used_disks ) == 3
2019-08-20 18:35:35 +00:00
assert used_disks [ 0 ] != used_disks [ 1 ]
assert used_disks [ 2 ] == used_disks [ 0 ]
2019-08-20 18:00:48 +00:00
finally :
2019-08-20 18:35:35 +00:00
node1 . query ( " DROP TABLE IF EXISTS {} " . format ( name ) )
2019-08-20 18:00:48 +00:00
2019-08-20 18:35:35 +00:00
@pytest.mark.parametrize ( " name,engine " , [
( " mt_with_huge_part " , " MergeTree() " ) ,
( " replicated_mt_with_huge_part " , " ReplicatedMergeTree( ' /clickhouse/replicated_mt_with_huge_part ' , ' 1 ' ) " , ) ,
] )
def test_max_data_part_size ( start_cluster , name , engine ) :
2019-08-20 18:00:48 +00:00
try :
node1 . query ( """
2019-08-20 18:35:35 +00:00
CREATE TABLE { name } (
2019-08-20 18:00:48 +00:00
s1 String
2019-08-20 18:35:35 +00:00
) ENGINE = { engine }
2019-08-20 18:00:48 +00:00
ORDER BY tuple ( )
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' jbods_with_external '
2019-08-20 18:35:35 +00:00
""" .format(name=name, engine=engine))
2019-08-20 18:00:48 +00:00
data = [ ] # 10MB in total
for i in range ( 10 ) :
data . append ( get_random_string ( 1024 * 1024 ) ) # 1MB row
2019-08-20 18:35:35 +00:00
node1 . query ( " INSERT INTO {} VALUES {} " . format ( name , ' , ' . join ( [ " ( ' " + x + " ' ) " for x in data ] ) ) )
used_disks = get_used_disks_for_table ( node1 , name )
2019-08-20 18:00:48 +00:00
assert len ( used_disks ) == 1
assert used_disks [ 0 ] == ' external '
finally :
2019-08-20 18:35:35 +00:00
node1 . query ( " DROP TABLE IF EXISTS {} " . format ( name ) )
2019-08-20 18:00:48 +00:00
2019-08-20 18:35:35 +00:00
@pytest.mark.parametrize ( " name,engine " , [
( " mt_with_overflow " , " MergeTree() " ) ,
( " replicated_mt_with_overflow " , " ReplicatedMergeTree( ' /clickhouse/replicated_mt_with_overflow ' , ' 1 ' ) " , ) ,
] )
def test_jbod_overflow ( start_cluster , name , engine ) :
2019-08-20 18:00:48 +00:00
try :
node1 . query ( """
2019-08-20 18:35:35 +00:00
CREATE TABLE { name } (
2019-08-20 18:00:48 +00:00
s1 String
2019-08-20 18:35:35 +00:00
) ENGINE = { engine }
2019-08-20 18:00:48 +00:00
ORDER BY tuple ( )
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' small_jbod_with_external '
2019-08-20 18:35:35 +00:00
""" .format(name=name, engine=engine))
2019-08-20 18:00:48 +00:00
node1 . query ( " SYSTEM STOP MERGES " )
# small jbod size is 40MB, so lets insert 5MB batch 7 times
for i in range ( 7 ) :
2019-08-20 18:35:35 +00:00
data = [ ] # 5MB in total
for i in range ( 5 ) :
data . append ( get_random_string ( 1024 * 1024 ) ) # 1MB row
node1 . query ( " INSERT INTO {} VALUES {} " . format ( name , ' , ' . join ( [ " ( ' " + x + " ' ) " for x in data ] ) ) )
2019-08-20 18:00:48 +00:00
2019-08-20 18:35:35 +00:00
used_disks = get_used_disks_for_table ( node1 , name )
2019-08-20 18:00:48 +00:00
assert all ( disk == ' jbod1 ' for disk in used_disks )
# should go to the external disk (jbod is overflown)
data = [ ] # 10MB in total
for i in range ( 10 ) :
data . append ( get_random_string ( 1024 * 1024 ) ) # 1MB row
2019-08-20 18:35:35 +00:00
node1 . query ( " INSERT INTO {} VALUES {} " . format ( name , ' , ' . join ( [ " ( ' " + x + " ' ) " for x in data ] ) ) )
2019-08-20 18:00:48 +00:00
2019-08-20 18:35:35 +00:00
used_disks = get_used_disks_for_table ( node1 , name )
2019-08-20 18:00:48 +00:00
assert used_disks [ - 1 ] == ' external '
node1 . query ( " SYSTEM START MERGES " )
2019-08-20 18:35:35 +00:00
time . sleep ( 1 )
2019-08-20 18:00:48 +00:00
2019-08-20 18:35:35 +00:00
node1 . query ( " OPTIMIZE TABLE {} FINAL " . format ( name ) )
time . sleep ( 2 )
2019-09-03 17:16:01 +00:00
disks_for_merges = node1 . query ( " SELECT disk_name FROM system.parts WHERE table == ' {} ' AND level >= 1 and active = 1 ORDER BY modification_time " . format ( name ) ) . strip ( ) . split ( ' \n ' )
2019-08-20 18:00:48 +00:00
assert all ( disk == ' external ' for disk in disks_for_merges )
finally :
2019-08-20 18:35:35 +00:00
node1 . query ( " DROP TABLE IF EXISTS {} " . format ( name ) )
2019-08-20 18:00:48 +00:00
@pytest.mark.parametrize ( " name,engine " , [
( " moving_mt " , " MergeTree() " ) ,
2019-08-20 18:35:35 +00:00
( " moving_replicated_mt " , " ReplicatedMergeTree( ' /clickhouse/moving_replicated_mt ' , ' 1 ' ) " , ) ,
2019-08-20 18:00:48 +00:00
] )
def test_background_move ( start_cluster , name , engine ) :
try :
node1 . query ( """
CREATE TABLE { name } (
s1 String
) ENGINE = { engine }
ORDER BY tuple ( )
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' moving_jbod_with_external '
2019-08-20 18:00:48 +00:00
""" .format(name=name, engine=engine))
for i in range ( 5 ) :
data = [ ] # 5MB in total
for i in range ( 5 ) :
data . append ( get_random_string ( 1024 * 1024 ) ) # 1MB row
2019-09-04 17:26:53 +00:00
# small jbod size is 40MB, so lets insert 5MB batch 5 times
2019-08-20 18:00:48 +00:00
node1 . query ( " INSERT INTO {} VALUES {} " . format ( name , ' , ' . join ( [ " ( ' " + x + " ' ) " for x in data ] ) ) )
used_disks = get_used_disks_for_table ( node1 , name )
2019-08-21 10:09:29 +00:00
retry = 20
i = 0
while not sum ( 1 for x in used_disks if x == ' jbod1 ' ) < = 2 and i < retry :
time . sleep ( 0.5 )
used_disks = get_used_disks_for_table ( node1 , name )
i + = 1
2019-08-20 18:00:48 +00:00
assert sum ( 1 for x in used_disks if x == ' jbod1 ' ) < = 2
# first (oldest) part was moved to external
assert used_disks [ 0 ] == ' external '
2019-09-03 11:32:25 +00:00
path = node1 . query ( " SELECT path_on_disk FROM system.part_log WHERE table = ' {} ' AND event_type= ' MovePart ' ORDER BY event_time LIMIT 1 " . format ( name ) )
# first (oldest) part was moved to external
assert path . startswith ( " /external " )
2019-08-20 18:00:48 +00:00
finally :
node1 . query ( " DROP TABLE IF EXISTS {name} " . format ( name = name ) )
2019-09-03 14:50:49 +00:00
@pytest.mark.parametrize ( " name,engine " , [
( " stopped_moving_mt " , " MergeTree() " ) ,
( " stopped_moving_replicated_mt " , " ReplicatedMergeTree( ' /clickhouse/stopped_moving_replicated_mt ' , ' 1 ' ) " , ) ,
] )
def test_start_stop_moves ( start_cluster , name , engine ) :
try :
node1 . query ( """
CREATE TABLE { name } (
s1 String
) ENGINE = { engine }
ORDER BY tuple ( )
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' moving_jbod_with_external '
2019-09-03 14:50:49 +00:00
""" .format(name=name, engine=engine))
node1 . query ( " INSERT INTO {} VALUES ( ' HELLO ' ) " . format ( name ) )
node1 . query ( " INSERT INTO {} VALUES ( ' WORLD ' ) " . format ( name ) )
used_disks = get_used_disks_for_table ( node1 , name )
assert all ( d == " jbod1 " for d in used_disks ) , " All writes shoud go to jbods "
2019-09-03 17:16:01 +00:00
first_part = node1 . query ( " SELECT name FROM system.parts WHERE table = ' {} ' and active = 1 ORDER BY modification_time LIMIT 1 " . format ( name ) ) . strip ( )
2019-09-03 14:50:49 +00:00
node1 . query ( " SYSTEM STOP MOVES " )
with pytest . raises ( QueryRuntimeException ) :
node1 . query ( " ALTER TABLE {} MOVE PART ' {} ' TO VOLUME ' external ' " . format ( name , first_part ) )
used_disks = get_used_disks_for_table ( node1 , name )
assert all ( d == " jbod1 " for d in used_disks ) , " Blocked moves doesn ' t actually move something "
node1 . query ( " SYSTEM START MOVES " )
node1 . query ( " ALTER TABLE {} MOVE PART ' {} ' TO VOLUME ' external ' " . format ( name , first_part ) )
2019-09-03 17:16:01 +00:00
disk = node1 . query ( " SELECT disk_name FROM system.parts WHERE table = ' {} ' and name = ' {} ' and active = 1 " . format ( name , first_part ) ) . strip ( )
2019-09-03 14:50:49 +00:00
assert disk == " external "
node1 . query ( " TRUNCATE TABLE {} " . format ( name ) )
node1 . query ( " SYSTEM STOP MOVES {} " . format ( name ) )
node1 . query ( " SYSTEM STOP MERGES {} " . format ( name ) )
for i in range ( 5 ) :
data = [ ] # 5MB in total
for i in range ( 5 ) :
data . append ( get_random_string ( 1024 * 1024 ) ) # 1MB row
# jbod size is 40MB, so lets insert 5MB batch 7 times
node1 . query ( " INSERT INTO {} VALUES {} " . format ( name , ' , ' . join ( [ " ( ' " + x + " ' ) " for x in data ] ) ) )
used_disks = get_used_disks_for_table ( node1 , name )
retry = 5
i = 0
while not sum ( 1 for x in used_disks if x == ' jbod1 ' ) < = 2 and i < retry :
time . sleep ( 0.1 )
used_disks = get_used_disks_for_table ( node1 , name )
i + = 1
# first (oldest) part doesn't move anywhere
assert used_disks [ 0 ] == ' jbod1 '
node1 . query ( " SYSTEM START MOVES {} " . format ( name ) )
node1 . query ( " SYSTEM START MERGES {} " . format ( name ) )
# wait sometime until background backoff finishes
retry = 30
i = 0
while not sum ( 1 for x in used_disks if x == ' jbod1 ' ) < = 2 and i < retry :
time . sleep ( 1 )
used_disks = get_used_disks_for_table ( node1 , name )
i + = 1
assert sum ( 1 for x in used_disks if x == ' jbod1 ' ) < = 2
# first (oldest) part moved to external
assert used_disks [ 0 ] == ' external '
finally :
node1 . query ( " DROP TABLE IF EXISTS {name} " . format ( name = name ) )
2019-09-03 11:32:25 +00:00
def get_path_for_part_from_part_log ( node , table , part_name ) :
node . query ( " SYSTEM FLUSH LOGS " )
path = node . query ( " SELECT path_on_disk FROM system.part_log WHERE table = ' {} ' and part_name = ' {} ' ORDER BY event_time DESC LIMIT 1 " . format ( table , part_name ) )
return path . strip ( )
def get_paths_for_partition_from_part_log ( node , table , partition_id ) :
node . query ( " SYSTEM FLUSH LOGS " )
paths = node . query ( " SELECT path_on_disk FROM system.part_log WHERE table = ' {} ' and partition_id = ' {} ' ORDER BY event_time DESC " . format ( table , partition_id ) )
return paths . strip ( ) . split ( ' \n ' )
2019-09-05 15:53:23 +00:00
2019-08-20 19:04:58 +00:00
@pytest.mark.parametrize ( " name,engine " , [
( " altering_mt " , " MergeTree() " ) ,
2019-09-06 15:09:20 +00:00
#("altering_replicated_mt","ReplicatedMergeTree('/clickhouse/altering_replicated_mt', '1')",),
# SYSTEM STOP MERGES doesn't disable merges assignments
2019-08-20 19:04:58 +00:00
] )
def test_alter_move ( start_cluster , name , engine ) :
try :
node1 . query ( """
CREATE TABLE { name } (
EventDate Date ,
number UInt64
) ENGINE = { engine }
ORDER BY tuple ( )
PARTITION BY toYYYYMM ( EventDate )
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' jbods_with_external '
2019-08-20 19:04:58 +00:00
""" .format(name=name, engine=engine))
2019-09-05 15:53:23 +00:00
node1 . query ( " SYSTEM STOP MERGES {} " . format ( name ) ) # to avoid conflicts
2019-08-20 19:04:58 +00:00
node1 . query ( " INSERT INTO {} VALUES(toDate( ' 2019-03-15 ' ), 65) " . format ( name ) )
node1 . query ( " INSERT INTO {} VALUES(toDate( ' 2019-03-16 ' ), 66) " . format ( name ) )
node1 . query ( " INSERT INTO {} VALUES(toDate( ' 2019-04-10 ' ), 42) " . format ( name ) )
node1 . query ( " INSERT INTO {} VALUES(toDate( ' 2019-04-11 ' ), 43) " . format ( name ) )
used_disks = get_used_disks_for_table ( node1 , name )
2019-10-24 08:52:33 +00:00
assert all ( d . startswith ( " jbod " ) for d in used_disks ) , " All writes should go to jbods "
2019-08-20 19:04:58 +00:00
2019-09-03 17:16:01 +00:00
first_part = node1 . query ( " SELECT name FROM system.parts WHERE table = ' {} ' and active = 1 ORDER BY modification_time LIMIT 1 " . format ( name ) ) . strip ( )
2019-08-20 19:04:58 +00:00
2019-09-03 17:06:36 +00:00
time . sleep ( 1 )
2019-08-20 19:04:58 +00:00
node1 . query ( " ALTER TABLE {} MOVE PART ' {} ' TO VOLUME ' external ' " . format ( name , first_part ) )
2019-09-03 17:16:01 +00:00
disk = node1 . query ( " SELECT disk_name FROM system.parts WHERE table = ' {} ' and name = ' {} ' and active = 1 " . format ( name , first_part ) ) . strip ( )
2019-08-20 19:04:58 +00:00
assert disk == ' external '
2019-09-03 11:32:25 +00:00
assert get_path_for_part_from_part_log ( node1 , name , first_part ) . startswith ( " /external " )
2019-08-20 19:04:58 +00:00
2019-09-03 11:32:25 +00:00
2019-09-03 17:06:36 +00:00
time . sleep ( 1 )
2019-08-20 19:04:58 +00:00
node1 . query ( " ALTER TABLE {} MOVE PART ' {} ' TO DISK ' jbod1 ' " . format ( name , first_part ) )
2019-09-03 17:16:01 +00:00
disk = node1 . query ( " SELECT disk_name FROM system.parts WHERE table = ' {} ' and name = ' {} ' and active = 1 " . format ( name , first_part ) ) . strip ( )
2019-08-20 19:04:58 +00:00
assert disk == ' jbod1 '
2019-09-03 11:32:25 +00:00
assert get_path_for_part_from_part_log ( node1 , name , first_part ) . startswith ( " /jbod1 " )
2019-08-20 19:04:58 +00:00
2019-09-03 17:06:36 +00:00
time . sleep ( 1 )
2019-08-20 19:04:58 +00:00
node1 . query ( " ALTER TABLE {} MOVE PARTITION 201904 TO VOLUME ' external ' " . format ( name ) )
2019-09-03 17:16:01 +00:00
disks = node1 . query ( " SELECT disk_name FROM system.parts WHERE table = ' {} ' and partition = ' 201904 ' and active = 1 " . format ( name ) ) . strip ( ) . split ( ' \n ' )
2019-08-20 19:04:58 +00:00
assert len ( disks ) == 2
assert all ( d == " external " for d in disks )
2019-09-03 11:32:25 +00:00
assert all ( path . startswith ( " /external " ) for path in get_paths_for_partition_from_part_log ( node1 , name , ' 201904 ' ) [ : 2 ] )
2019-09-03 17:06:36 +00:00
time . sleep ( 1 )
2019-08-20 19:04:58 +00:00
node1 . query ( " ALTER TABLE {} MOVE PARTITION 201904 TO DISK ' jbod2 ' " . format ( name ) )
2019-09-03 17:16:01 +00:00
disks = node1 . query ( " SELECT disk_name FROM system.parts WHERE table = ' {} ' and partition = ' 201904 ' and active = 1 " . format ( name ) ) . strip ( ) . split ( ' \n ' )
2019-08-20 19:04:58 +00:00
assert len ( disks ) == 2
assert all ( d == " jbod2 " for d in disks )
2019-09-03 11:32:25 +00:00
assert all ( path . startswith ( " /jbod2 " ) for path in get_paths_for_partition_from_part_log ( node1 , name , ' 201904 ' ) [ : 2 ] )
2019-08-20 19:04:58 +00:00
2019-08-20 19:06:03 +00:00
assert node1 . query ( " SELECT COUNT() FROM {} " . format ( name ) ) == " 4 \n "
2019-08-20 19:04:58 +00:00
finally :
node1 . query ( " DROP TABLE IF EXISTS {name} " . format ( name = name ) )
2019-10-24 08:52:33 +00:00
@pytest.mark.parametrize ( " volume_or_disk " , [
" DISK " ,
" VOLUME "
] )
def test_alter_move_half_of_partition ( start_cluster , volume_or_disk ) :
name = " alter_move_half_of_partition "
engine = " MergeTree() "
try :
node1 . query ( """
CREATE TABLE { name } (
EventDate Date ,
number UInt64
) ENGINE = { engine }
ORDER BY tuple ( )
PARTITION BY toYYYYMM ( EventDate )
SETTINGS storage_policy = ' jbods_with_external '
""" .format(name=name, engine=engine))
node1 . query ( " SYSTEM STOP MERGES {} " . format ( name ) )
node1 . query ( " INSERT INTO {} VALUES(toDate( ' 2019-03-15 ' ), 65) " . format ( name ) )
node1 . query ( " INSERT INTO {} VALUES(toDate( ' 2019-03-16 ' ), 42) " . format ( name ) )
used_disks = get_used_disks_for_table ( node1 , name )
assert all ( d . startswith ( " jbod " ) for d in used_disks ) , " All writes should go to jbods "
time . sleep ( 1 )
parts = node1 . query ( " SELECT name FROM system.parts WHERE table = ' {} ' and active = 1 " . format ( name ) ) . splitlines ( )
assert len ( parts ) == 2
node1 . query ( " ALTER TABLE {} MOVE PART ' {} ' TO VOLUME ' external ' " . format ( name , parts [ 0 ] ) )
disks = node1 . query ( " SELECT disk_name FROM system.parts WHERE table = ' {} ' and name = ' {} ' and active = 1 " . format ( name , parts [ 0 ] ) ) . splitlines ( )
assert disks == [ " external " ]
time . sleep ( 1 )
node1 . query ( " ALTER TABLE {} MOVE PARTITION 201903 TO {volume_or_disk} ' external ' " . format ( name , volume_or_disk = volume_or_disk ) )
disks = node1 . query ( " SELECT disk_name FROM system.parts WHERE table = ' {} ' and partition = ' 201903 ' and active = 1 " . format ( name ) ) . splitlines ( )
assert disks == [ " external " ] * 2
assert node1 . query ( " SELECT COUNT() FROM {} " . format ( name ) ) == " 2 \n "
finally :
node1 . query ( " DROP TABLE IF EXISTS {name} " . format ( name = name ) )
@pytest.mark.parametrize ( " volume_or_disk " , [
" DISK " ,
" VOLUME "
] )
def test_alter_double_move_partition ( start_cluster , volume_or_disk ) :
name = " alter_double_move_partition "
engine = " MergeTree() "
try :
node1 . query ( """
CREATE TABLE { name } (
EventDate Date ,
number UInt64
) ENGINE = { engine }
ORDER BY tuple ( )
PARTITION BY toYYYYMM ( EventDate )
SETTINGS storage_policy = ' jbods_with_external '
""" .format(name=name, engine=engine))
node1 . query ( " SYSTEM STOP MERGES {} " . format ( name ) )
node1 . query ( " INSERT INTO {} VALUES(toDate( ' 2019-03-15 ' ), 65) " . format ( name ) )
node1 . query ( " INSERT INTO {} VALUES(toDate( ' 2019-03-16 ' ), 42) " . format ( name ) )
used_disks = get_used_disks_for_table ( node1 , name )
assert all ( d . startswith ( " jbod " ) for d in used_disks ) , " All writes should go to jbods "
time . sleep ( 1 )
node1 . query ( " ALTER TABLE {} MOVE PARTITION 201903 TO {volume_or_disk} ' external ' " . format ( name , volume_or_disk = volume_or_disk ) )
disks = node1 . query ( " SELECT disk_name FROM system.parts WHERE table = ' {} ' and partition = ' 201903 ' and active = 1 " . format ( name ) ) . splitlines ( )
assert disks == [ " external " ] * 2
assert node1 . query ( " SELECT COUNT() FROM {} " . format ( name ) ) == " 2 \n "
time . sleep ( 1 )
with pytest . raises ( QueryRuntimeException ) :
node1 . query ( " ALTER TABLE {} MOVE PARTITION 201903 TO {volume_or_disk} ' external ' " . format ( name , volume_or_disk = volume_or_disk ) )
finally :
node1 . query ( " DROP TABLE IF EXISTS {name} " . format ( name = name ) )
2019-08-21 13:06:01 +00:00
def produce_alter_move ( node , name ) :
move_type = random . choice ( [ " PART " , " PARTITION " ] )
if move_type == " PART " :
2019-09-03 09:16:35 +00:00
for _ in range ( 10 ) :
try :
2019-09-03 17:16:01 +00:00
parts = node1 . query ( " SELECT name from system.parts where table = ' {} ' and active = 1 " . format ( name ) ) . strip ( ) . split ( ' \n ' )
2019-09-03 09:16:35 +00:00
break
except QueryRuntimeException :
pass
else :
raise Exception ( " Cannot select from system.parts " )
2019-08-21 13:06:01 +00:00
move_part = random . choice ( [ " ' " + part + " ' " for part in parts ] )
else :
move_part = random . choice ( [ 201903 , 201904 ] )
move_disk = random . choice ( [ " DISK " , " VOLUME " ] )
if move_disk == " DISK " :
move_volume = random . choice ( [ " ' external ' " , " ' jbod1 ' " , " ' jbod2 ' " ] )
else :
move_volume = random . choice ( [ " ' main ' " , " ' external ' " ] )
try :
node1 . query ( " ALTER TABLE {} MOVE {mt} {mp} TO {md} {mv} " . format (
name , mt = move_type , mp = move_part , md = move_disk , mv = move_volume ) )
except QueryRuntimeException as ex :
pass
2019-08-20 19:04:58 +00:00
2019-08-21 12:32:48 +00:00
@pytest.mark.parametrize ( " name,engine " , [
( " concurrently_altering_mt " , " MergeTree() " ) ,
( " concurrently_altering_replicated_mt " , " ReplicatedMergeTree( ' /clickhouse/concurrently_altering_replicated_mt ' , ' 1 ' ) " , ) ,
] )
def test_concurrent_alter_move ( start_cluster , name , engine ) :
try :
node1 . query ( """
CREATE TABLE { name } (
EventDate Date ,
number UInt64
) ENGINE = { engine }
ORDER BY tuple ( )
PARTITION BY toYYYYMM ( EventDate )
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' jbods_with_external '
2019-08-21 12:32:48 +00:00
""" .format(name=name, engine=engine))
2020-01-14 00:50:42 +00:00
values = list ( { random . randint ( 1 , 1000000 ) for _ in range ( 0 , 1000 ) } )
2019-08-21 12:32:48 +00:00
def insert ( num ) :
for i in range ( num ) :
day = random . randint ( 11 , 30 )
2020-01-14 00:50:42 +00:00
value = values . pop ( )
2019-08-21 12:32:48 +00:00
month = ' 0 ' + str ( random . choice ( [ 3 , 4 ] ) )
node1 . query ( " INSERT INTO {} VALUES(toDate( ' 2019- {m} - {d} ' ), {v} ) " . format ( name , m = month , d = day , v = value ) )
def alter_move ( num ) :
for i in range ( num ) :
2019-08-21 13:06:01 +00:00
produce_alter_move ( node1 , name )
def alter_update ( num ) :
for i in range ( num ) :
node1 . query ( " ALTER TABLE {} UPDATE number = number + 1 WHERE 1 " . format ( name ) )
2019-08-21 12:32:48 +00:00
def optimize_table ( num ) :
for i in range ( num ) :
node1 . query ( " OPTIMIZE TABLE {} FINAL " . format ( name ) )
p = Pool ( 15 )
tasks = [ ]
for i in range ( 5 ) :
tasks . append ( p . apply_async ( insert , ( 100 , ) ) )
tasks . append ( p . apply_async ( alter_move , ( 100 , ) ) )
2019-08-21 13:06:01 +00:00
tasks . append ( p . apply_async ( alter_update , ( 100 , ) ) )
2019-08-21 12:32:48 +00:00
tasks . append ( p . apply_async ( optimize_table , ( 100 , ) ) )
for task in tasks :
task . get ( timeout = 60 )
assert node1 . query ( " SELECT 1 " ) == " 1 \n "
assert node1 . query ( " SELECT COUNT() FROM {} " . format ( name ) ) == " 500 \n "
finally :
node1 . query ( " DROP TABLE IF EXISTS {name} " . format ( name = name ) )
2019-08-21 13:06:01 +00:00
@pytest.mark.parametrize ( " name,engine " , [
( " concurrently_dropping_mt " , " MergeTree() " ) ,
( " concurrently_dropping_replicated_mt " , " ReplicatedMergeTree( ' /clickhouse/concurrently_dropping_replicated_mt ' , ' 1 ' ) " , ) ,
] )
def test_concurrent_alter_move_and_drop ( start_cluster , name , engine ) :
try :
node1 . query ( """
CREATE TABLE { name } (
EventDate Date ,
number UInt64
) ENGINE = { engine }
ORDER BY tuple ( )
PARTITION BY toYYYYMM ( EventDate )
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' jbods_with_external '
2019-08-21 13:06:01 +00:00
""" .format(name=name, engine=engine))
2020-01-15 15:25:23 +00:00
values = list ( { random . randint ( 1 , 1000000 ) for _ in range ( 0 , 1000 ) } )
2019-08-21 13:06:01 +00:00
def insert ( num ) :
for i in range ( num ) :
day = random . randint ( 11 , 30 )
2020-01-15 15:25:23 +00:00
value = values . pop ( )
2019-08-21 13:06:01 +00:00
month = ' 0 ' + str ( random . choice ( [ 3 , 4 ] ) )
node1 . query ( " INSERT INTO {} VALUES(toDate( ' 2019- {m} - {d} ' ), {v} ) " . format ( name , m = month , d = day , v = value ) )
def alter_move ( num ) :
for i in range ( num ) :
produce_alter_move ( node1 , name )
def alter_drop ( num ) :
for i in range ( num ) :
partition = random . choice ( [ 201903 , 201904 ] )
drach = random . choice ( [ " drop " , " detach " ] )
node1 . query ( " ALTER TABLE {} {} PARTITION {} " . format ( name , drach , partition ) )
insert ( 100 )
p = Pool ( 15 )
tasks = [ ]
for i in range ( 5 ) :
tasks . append ( p . apply_async ( insert , ( 100 , ) ) )
tasks . append ( p . apply_async ( alter_move , ( 100 , ) ) )
tasks . append ( p . apply_async ( alter_drop , ( 100 , ) ) )
for task in tasks :
task . get ( timeout = 60 )
assert node1 . query ( " SELECT 1 " ) == " 1 \n "
finally :
node1 . query ( " DROP TABLE IF EXISTS {name} " . format ( name = name ) )
2019-11-19 11:43:33 +00:00
@pytest.mark.parametrize ( " name,engine " , [
( " detach_attach_mt " , " MergeTree() " ) ,
( " replicated_detach_attach_mt " , " ReplicatedMergeTree( ' /clickhouse/replicated_detach_attach_mt ' , ' 1 ' ) " , ) ,
] )
def test_detach_attach ( start_cluster , name , engine ) :
try :
node1 . query ( """
CREATE TABLE { name } (
s1 String
) ENGINE = { engine }
ORDER BY tuple ( )
SETTINGS storage_policy = ' moving_jbod_with_external '
""" .format(name=name, engine=engine))
data = [ ] # 5MB in total
for i in range ( 5 ) :
data . append ( get_random_string ( 1024 * 1024 ) ) # 1MB row
node1 . query ( " INSERT INTO {} VALUES {} " . format ( name , ' , ' . join ( [ " ( ' " + x + " ' ) " for x in data ] ) ) )
node1 . query ( " ALTER TABLE {} DETACH PARTITION tuple() " . format ( name ) )
assert node1 . query ( " SELECT count() FROM {} " . format ( name ) ) . strip ( ) == " 0 "
assert node1 . query ( " SELECT disk FROM system.detached_parts WHERE table = ' {} ' " . format ( name ) ) . strip ( ) == " jbod1 "
node1 . query ( " ALTER TABLE {} ATTACH PARTITION tuple() " . format ( name ) )
assert node1 . query ( " SELECT count() FROM {} " . format ( name ) ) . strip ( ) == " 5 "
finally :
node1 . query ( " DROP TABLE IF EXISTS {name} " . format ( name = name ) )
2019-09-04 17:26:53 +00:00
@pytest.mark.parametrize ( " name,engine " , [
( " mutating_mt " , " MergeTree() " ) ,
( " replicated_mutating_mt " , " ReplicatedMergeTree( ' /clickhouse/replicated_mutating_mt ' , ' 1 ' ) " , ) ,
] )
def test_mutate_to_another_disk ( start_cluster , name , engine ) :
try :
node1 . query ( """
CREATE TABLE { name } (
s1 String
) ENGINE = { engine }
ORDER BY tuple ( )
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' moving_jbod_with_external '
2019-09-04 17:26:53 +00:00
""" .format(name=name, engine=engine))
for i in range ( 5 ) :
data = [ ] # 5MB in total
for i in range ( 5 ) :
data . append ( get_random_string ( 1024 * 1024 ) ) # 1MB row
node1 . query ( " INSERT INTO {} VALUES {} " . format ( name , ' , ' . join ( [ " ( ' " + x + " ' ) " for x in data ] ) ) )
node1 . query ( " ALTER TABLE {} UPDATE s1 = concat(s1, ' x ' ) WHERE 1 " . format ( name ) )
2019-09-05 13:12:29 +00:00
retry = 20
2019-09-04 17:26:53 +00:00
while node1 . query ( " SELECT * FROM system.mutations WHERE is_done = 0 " ) != " " and retry > 0 :
retry - = 1
time . sleep ( 0.5 )
2019-09-05 15:53:23 +00:00
if node1 . query ( " SELECT latest_fail_reason FROM system.mutations WHERE table = ' {} ' " . format ( name ) ) == " " :
assert node1 . query ( " SELECT sum(endsWith(s1, ' x ' )) FROM {} " . format ( name ) ) == " 25 \n "
else : # mutation failed, let's try on another disk
print " Mutation failed "
node1 . query ( " OPTIMIZE TABLE {} FINAL " . format ( name ) )
node1 . query ( " ALTER TABLE {} UPDATE s1 = concat(s1, ' x ' ) WHERE 1 " . format ( name ) )
retry = 20
while node1 . query ( " SELECT * FROM system.mutations WHERE is_done = 0 " ) != " " and retry > 0 :
retry - = 1
time . sleep ( 0.5 )
assert node1 . query ( " SELECT sum(endsWith(s1, ' x ' )) FROM {} " . format ( name ) ) == " 25 \n "
2019-09-04 17:26:53 +00:00
finally :
node1 . query ( " DROP TABLE IF EXISTS {name} " . format ( name = name ) )
2019-09-06 15:09:20 +00:00
@pytest.mark.parametrize ( " name,engine " , [
( " alter_modifying_mt " , " MergeTree() " ) ,
( " replicated_alter_modifying_mt " , " ReplicatedMergeTree( ' /clickhouse/replicated_alter_modifying_mt ' , ' 1 ' ) " , ) ,
] )
def test_concurrent_alter_modify ( start_cluster , name , engine ) :
try :
node1 . query ( """
CREATE TABLE { name } (
EventDate Date ,
number UInt64
) ENGINE = { engine }
ORDER BY tuple ( )
PARTITION BY toYYYYMM ( EventDate )
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' jbods_with_external '
2019-09-06 15:09:20 +00:00
""" .format(name=name, engine=engine))
2019-09-04 17:26:53 +00:00
2020-01-15 15:25:23 +00:00
values = list ( { random . randint ( 1 , 1000000 ) for _ in range ( 0 , 1000 ) } )
2019-09-06 15:09:20 +00:00
def insert ( num ) :
for i in range ( num ) :
day = random . randint ( 11 , 30 )
2020-01-15 15:25:23 +00:00
value = values . pop ( )
2019-09-06 15:09:20 +00:00
month = ' 0 ' + str ( random . choice ( [ 3 , 4 ] ) )
node1 . query ( " INSERT INTO {} VALUES(toDate( ' 2019- {m} - {d} ' ), {v} ) " . format ( name , m = month , d = day , v = value ) )
2019-09-04 17:26:53 +00:00
2019-09-06 15:09:20 +00:00
def alter_move ( num ) :
for i in range ( num ) :
produce_alter_move ( node1 , name )
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
def alter_modify ( num ) :
for i in range ( num ) :
column_type = random . choice ( [ " UInt64 " , " String " ] )
node1 . query ( " ALTER TABLE {} MODIFY COLUMN number {} " . format ( name , column_type ) )
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
insert ( 100 )
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
assert node1 . query ( " SELECT COUNT() FROM {} " . format ( name ) ) == " 100 \n "
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
p = Pool ( 50 )
tasks = [ ]
for i in range ( 5 ) :
tasks . append ( p . apply_async ( alter_move , ( 100 , ) ) )
tasks . append ( p . apply_async ( alter_modify , ( 100 , ) ) )
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
for task in tasks :
task . get ( timeout = 60 )
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
assert node1 . query ( " SELECT 1 " ) == " 1 \n "
assert node1 . query ( " SELECT COUNT() FROM {} " . format ( name ) ) == " 100 \n "
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
finally :
node1 . query ( " DROP TABLE IF EXISTS {name} " . format ( name = name ) )
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
def test_simple_replication_and_moves ( start_cluster ) :
try :
for i , node in enumerate ( [ node1 , node2 ] ) :
node . query ( """
CREATE TABLE replicated_table_for_moves (
s1 String
) ENGINE = ReplicatedMergeTree ( ' /clickhouse/replicated_table_for_moves ' , ' {} ' )
ORDER BY tuple ( )
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' moving_jbod_with_external ' , old_parts_lifetime = 1 , cleanup_delay_period = 1 , cleanup_delay_period_random_add = 2
2019-09-06 15:09:20 +00:00
""" .format(i + 1))
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
def insert ( num ) :
for i in range ( num ) :
node = random . choice ( [ node1 , node2 ] )
data = [ ] # 1MB in total
for i in range ( 2 ) :
data . append ( get_random_string ( 512 * 1024 ) ) # 500KB value
node . query ( " INSERT INTO replicated_table_for_moves VALUES {} " . format ( ' , ' . join ( [ " ( ' " + x + " ' ) " for x in data ] ) ) )
def optimize ( num ) :
for i in range ( num ) :
node = random . choice ( [ node1 , node2 ] )
node . query ( " OPTIMIZE TABLE replicated_table_for_moves FINAL " )
2019-09-10 11:21:59 +00:00
p = Pool ( 60 )
2019-09-06 15:09:20 +00:00
tasks = [ ]
tasks . append ( p . apply_async ( insert , ( 20 , ) ) )
tasks . append ( p . apply_async ( optimize , ( 20 , ) ) )
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
for task in tasks :
task . get ( timeout = 60 )
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
node1 . query ( " SYSTEM SYNC REPLICA replicated_table_for_moves " , timeout = 5 )
node2 . query ( " SYSTEM SYNC REPLICA replicated_table_for_moves " , timeout = 5 )
2019-07-15 09:36:02 +00:00
2019-09-10 11:21:59 +00:00
node1 . query ( " SELECT COUNT() FROM replicated_table_for_moves " ) == " 40 \n "
node2 . query ( " SELECT COUNT() FROM replicated_table_for_moves " ) == " 40 \n "
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
data = [ ] # 1MB in total
for i in range ( 2 ) :
data . append ( get_random_string ( 512 * 1024 ) ) # 500KB value
2019-07-15 09:36:02 +00:00
2019-09-09 11:13:36 +00:00
time . sleep ( 3 ) # wait until old parts will be deleted
2019-09-10 11:21:59 +00:00
node1 . query ( " SYSTEM STOP MERGES " )
node2 . query ( " SYSTEM STOP MERGES " )
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
node1 . query ( " INSERT INTO replicated_table_for_moves VALUES {} " . format ( ' , ' . join ( [ " ( ' " + x + " ' ) " for x in data ] ) ) )
node2 . query ( " INSERT INTO replicated_table_for_moves VALUES {} " . format ( ' , ' . join ( [ " ( ' " + x + " ' ) " for x in data ] ) ) )
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
time . sleep ( 3 ) # nothing was moved
2019-07-15 09:36:02 +00:00
2019-09-06 15:09:20 +00:00
disks1 = get_used_disks_for_table ( node1 , " replicated_table_for_moves " )
disks2 = get_used_disks_for_table ( node2 , " replicated_table_for_moves " )
2019-07-15 09:36:02 +00:00
2019-09-10 11:21:59 +00:00
node1 . query ( " SYSTEM START MERGES " )
node2 . query ( " SYSTEM START MERGES " )
set ( disks1 ) == set ( [ " jbod1 " , " external " ] )
set ( disks2 ) == set ( [ " jbod1 " , " external " ] )
2019-09-06 15:09:20 +00:00
finally :
for node in [ node1 , node2 ] :
node . query ( " DROP TABLE IF EXISTS replicated_table_for_moves " )
2019-09-09 12:28:28 +00:00
def test_download_appropriate_disk ( start_cluster ) :
try :
for i , node in enumerate ( [ node1 , node2 ] ) :
node . query ( """
CREATE TABLE replicated_table_for_download (
s1 String
) ENGINE = ReplicatedMergeTree ( ' /clickhouse/replicated_table_for_download ' , ' {} ' )
ORDER BY tuple ( )
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' moving_jbod_with_external ' , old_parts_lifetime = 1 , cleanup_delay_period = 1 , cleanup_delay_period_random_add = 2
2019-09-09 12:28:28 +00:00
""" .format(i + 1))
data = [ ]
for i in range ( 50 ) :
data . append ( get_random_string ( 1024 * 1024 ) ) # 1MB value
node1 . query ( " INSERT INTO replicated_table_for_download VALUES {} " . format ( ' , ' . join ( [ " ( ' " + x + " ' ) " for x in data ] ) ) )
for _ in range ( 10 ) :
try :
print " Syncing replica "
node2 . query ( " SYSTEM SYNC REPLICA replicated_table_for_download " )
break
except :
time . sleep ( 0.5 )
disks2 = get_used_disks_for_table ( node2 , " replicated_table_for_download " )
assert set ( disks2 ) == set ( [ " external " ] )
finally :
for node in [ node1 , node2 ] :
node . query ( " DROP TABLE IF EXISTS replicated_table_for_download " )
2019-09-11 10:57:32 +00:00
def test_rename ( start_cluster ) :
try :
node1 . query ( """
CREATE TABLE default . renaming_table (
s String
) ENGINE = MergeTree
ORDER BY tuple ( )
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' small_jbod_with_external '
2019-09-11 10:57:32 +00:00
""" )
for _ in range ( 5 ) :
data = [ ]
for i in range ( 10 ) :
data . append ( get_random_string ( 1024 * 1024 ) ) # 1MB value
node1 . query ( " INSERT INTO renaming_table VALUES {} " . format ( ' , ' . join ( [ " ( ' " + x + " ' ) " for x in data ] ) ) )
disks = get_used_disks_for_table ( node1 , " renaming_table " )
assert len ( disks ) > 1
assert node1 . query ( " SELECT COUNT() FROM default.renaming_table " ) == " 50 \n "
node1 . query ( " RENAME TABLE default.renaming_table TO default.renaming_table1 " )
assert node1 . query ( " SELECT COUNT() FROM default.renaming_table1 " ) == " 50 \n "
with pytest . raises ( QueryRuntimeException ) :
node1 . query ( " SELECT COUNT() FROM default.renaming_table " )
node1 . query ( " CREATE DATABASE IF NOT EXISTS test " )
node1 . query ( " RENAME TABLE default.renaming_table1 TO test.renaming_table2 " )
assert node1 . query ( " SELECT COUNT() FROM test.renaming_table2 " ) == " 50 \n "
with pytest . raises ( QueryRuntimeException ) :
node1 . query ( " SELECT COUNT() FROM default.renaming_table1 " )
finally :
node1 . query ( " DROP TABLE IF EXISTS default.renaming_table " )
node1 . query ( " DROP TABLE IF EXISTS default.renaming_table1 " )
node1 . query ( " DROP TABLE IF EXISTS test.renaming_table2 " )
2019-12-09 16:20:56 +00:00
2019-09-11 10:57:32 +00:00
def test_freeze ( start_cluster ) :
try :
node1 . query ( """
CREATE TABLE default . freezing_table (
d Date ,
s String
) ENGINE = MergeTree
ORDER BY tuple ( )
PARTITION BY toYYYYMM ( d )
2019-09-20 20:35:50 +00:00
SETTINGS storage_policy = ' small_jbod_with_external '
2019-09-11 10:57:32 +00:00
""" )
for _ in range ( 5 ) :
data = [ ]
dates = [ ]
for i in range ( 10 ) :
data . append ( get_random_string ( 1024 * 1024 ) ) # 1MB value
dates . append ( " toDate( ' 2019-03-05 ' ) " )
node1 . query ( " INSERT INTO freezing_table VALUES {} " . format ( ' , ' . join ( [ " ( " + d + " , ' " + s + " ' ) " for d , s in zip ( dates , data ) ] ) ) )
disks = get_used_disks_for_table ( node1 , " freezing_table " )
assert len ( disks ) > 1
assert node1 . query ( " SELECT COUNT() FROM default.freezing_table " ) == " 50 \n "
node1 . query ( " ALTER TABLE freezing_table FREEZE PARTITION 201903 " )
# check shadow files (backups) exists
2019-09-11 17:17:10 +00:00
node1 . exec_in_container ( [ " bash " , " -c " , " find /jbod1/shadow -name ' *.mrk2 ' | grep ' .* ' " ] )
node1 . exec_in_container ( [ " bash " , " -c " , " find /external/shadow -name ' *.mrk2 ' | grep ' .* ' " ] )
2019-09-11 10:57:32 +00:00
finally :
node1 . query ( " DROP TABLE IF EXISTS default.freezing_table " )
2019-12-11 05:54:58 +00:00
node1 . exec_in_container ( [ " rm " , " -rf " , " /jbod1/shadow " , " /external/shadow " ] )
2019-12-09 16:20:56 +00:00
def test_kill_while_insert ( start_cluster ) :
try :
2019-12-09 17:53:52 +00:00
name = " test_kill_while_insert "
2019-12-09 16:20:56 +00:00
node1 . query ( """
CREATE TABLE { name } (
s String
) ENGINE = MergeTree
ORDER BY tuple ( )
SETTINGS storage_policy = ' small_jbod_with_external '
""" .format(name=name))
data = [ ]
dates = [ ]
for i in range ( 10 ) :
data . append ( get_random_string ( 1024 * 1024 ) ) # 1MB value
node1 . query ( " INSERT INTO {name} VALUES {} " . format ( ' , ' . join ( [ " ( ' " + s + " ' ) " for s in data ] ) , name = name ) )
disks = get_used_disks_for_table ( node1 , name )
assert set ( disks ) == { " jbod1 " }
start_time = time . time ( )
long_select = threading . Thread ( target = node1 . query , args = ( " SELECT sleep(3) FROM {name} " . format ( name = name ) , ) )
long_select . start ( )
time . sleep ( 0.5 )
node1 . query ( " ALTER TABLE {name} MOVE PARTITION tuple() TO DISK ' external ' " . format ( name = name ) )
assert time . time ( ) - start_time < 2
node1 . restart_clickhouse ( kill = True )
try :
long_select . join ( )
except :
""" """
time . sleep ( 0.5 )
assert node1 . query ( " SELECT count() FROM {name} " . format ( name = name ) ) . splitlines ( ) == [ " 10 " ]
finally :
2019-12-11 04:59:37 +00:00
try :
node1 . query ( " DROP TABLE IF EXISTS {name} " . format ( name = name ) )
2019-12-11 08:58:53 +00:00
except :
""" ClickHouse may be inactive at this moment and we don ' t want to mask a meaningful exception. """
2019-12-09 17:57:53 +00:00
def test_move_while_merge ( start_cluster ) :
try :
name = " test_move_while_merge "
node1 . query ( """
CREATE TABLE { name } (
n Int64
) ENGINE = MergeTree
ORDER BY sleep ( 2 )
SETTINGS storage_policy = ' small_jbod_with_external '
""" .format(name=name))
node1 . query ( " INSERT INTO {name} VALUES (1) " . format ( name = name ) )
node1 . query ( " INSERT INTO {name} VALUES (2) " . format ( name = name ) )
parts = node1 . query ( " SELECT name FROM system.parts WHERE table = ' {name} ' AND active = 1 " . format ( name = name ) ) . splitlines ( )
assert len ( parts ) == 2
def optimize ( ) :
node1 . query ( " OPTIMIZE TABLE {name} " . format ( name = name ) )
optimize = threading . Thread ( target = optimize )
optimize . start ( )
time . sleep ( 0.5 )
with pytest . raises ( QueryRuntimeException ) :
node1 . query ( " ALTER TABLE {name} MOVE PART ' {part} ' TO DISK ' external ' " . format ( name = name , part = parts [ 0 ] ) )
exiting = False
no_exception = { }
def alter ( ) :
while not exiting :
try :
node1 . query ( " ALTER TABLE {name} MOVE PART ' {part} ' TO DISK ' external ' " . format ( name = name , part = parts [ 0 ] ) )
no_exception [ ' missing ' ] = ' exception '
break
except QueryRuntimeException :
""" """
alter_thread = threading . Thread ( target = alter )
alter_thread . start ( )
optimize . join ( )
time . sleep ( 0.5 )
exiting = True
alter_thread . join ( )
assert len ( no_exception ) == 0
assert node1 . query ( " SELECT count() FROM {name} " . format ( name = name ) ) . splitlines ( ) == [ " 2 " ]
finally :
node1 . query ( " DROP TABLE IF EXISTS {name} " . format ( name = name ) )