2021-05-17 14:29:32 +00:00
import pytest
from helpers . cluster import ClickHouseCluster
2021-05-30 13:57:30 +00:00
from helpers . test_tools import assert_eq_with_retry , exec_query_with_retry
2022-03-22 16:39:58 +00:00
2021-05-17 14:29:32 +00:00
cluster = ClickHouseCluster(__file__)

# Docker image used by every node that starts on an old ClickHouse release.
_OLD_SERVER_IMAGE = "yandex/clickhouse-server"


def _add_old_instance(name, tag, **kwargs):
    """Add an instance to ``cluster`` running the installed binary of release ``tag``.

    Every old-version node in this module uses the same image together with
    ``with_installed_binary=True`` and ``stay_alive=True`` (presumably what the
    in-place ``restart_with_latest_version()`` calls in the tests rely on --
    confirm against helpers.cluster). Extra keyword arguments are forwarded to
    ``cluster.add_instance`` unchanged.
    """
    return cluster.add_instance(
        name,
        image=_OLD_SERVER_IMAGE,
        tag=tag,
        with_installed_binary=True,
        stay_alive=True,
        **kwargs,
    )


# Node running the server build under test (no image/tag override); the tests
# treat it as the "new" server.
node1 = cluster.add_instance("node1", stay_alive=True)

# Old 21.2 server with ZooKeeper, used by the replicated-table upgrade test.
node2 = _add_old_instance("node2", "21.2", with_zookeeper=True)

# Use a different node for each test: once a test calls
# node.restart_with_latest_version(), that node keeps running the latest
# version in later tests, which it shouldn't, and the order of tests in CI is
# shuffled.
node3 = _add_old_instance("node3", "21.5")
node4 = _add_old_instance("node4", "21.5")
node5 = _add_old_instance("node5", "21.5")
node6 = _add_old_instance("node6", "21.5")


def insert_data(node, table_name="test_table", n=1, col2=1):
    """Insert ``n`` rows into ``table_name`` on ``node``.

    Each row stores the current time in col1, ``col2`` in col2, and in col3 a
    sumMap state built from 300 (key 1, value 1) pairs; on the new server
    finalizeAggregation(col3) of such a row is ([1],[300]).
    """
    statement = f""" INSERT INTO {table_name}
    SELECT toDateTime(NOW()), {col2},
    sumMapState(arrayMap(i -> 1, range(300)), arrayMap(i -> 1, range(300)))
    FROM numbers({n});"""
    node.query(statement)


def create_table(node, name="test_table", version=None):
    """Drop and recreate the AggregatingMergeTree table ``name`` on ``node``.

    col3 holds sumMap(Array(UInt8), Array(UInt8)) states. When ``version`` is
    not None it is written as the explicit first argument of the
    AggregateFunction type (the tests pass version=0 to force the older state
    version); otherwise the type is declared without a version and the server
    applies its own default (the tests expect version 1 on the new server).
    """
    node.query("DROP TABLE IF EXISTS {};".format(name))
    # Only the AggregateFunction argument list differs between the versioned
    # and unversioned schema. Compare with ``is None`` so version=0 is honoured.
    function_args = "sumMap, Array(UInt8), Array(UInt8)"
    if version is not None:
        function_args = "{}, {}".format(version, function_args)
    node.query(
        """
        CREATE TABLE {}
        (
            `col1` DateTime,
            `col2` Int64,
            `col3` AggregateFunction({})
        )
        ENGINE = AggregatingMergeTree() ORDER BY (col1, col2) """.format(
            name, function_args
        )
    )


@pytest.fixture(scope="module")
def start_cluster():
    """Module-scoped fixture: start the shared cluster and hand it to the tests.

    Shutdown sits in ``finally`` so it also runs if ``cluster.start()`` raises.
    """
    managed = cluster
    try:
        managed.start()
        yield managed
    finally:
        managed.shutdown()


def test_modulo_partition_key_issue_23508(start_cluster):
    """Issue 23508: upgrading node2 must not change a table partitioned by ``id % 20``.

    Both the table rows and the (partition, part name) list from system.parts
    are compared before and after ``restart_with_latest_version()``.
    """
    table_query = "SELECT * FROM test ORDER BY id"
    parts_query = "SELECT partition, name FROM system.parts WHERE table='test' ORDER BY partition"

    node2.query(
        "CREATE TABLE test (id Int64, v UInt64, value String) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/table1', '1', v) PARTITION BY id % 20 ORDER BY (id, v)"
    )
    node2.query(
        "INSERT INTO test SELECT number, number, toString(number) FROM numbers(10)"
    )

    expected = node2.query("SELECT number, number, toString(number) FROM numbers(10)")
    parts_before_upgrade = node2.query(parts_query)
    assert node2.query(table_query) == expected

    node2.restart_with_latest_version()

    # Same rows and same partitions/parts on the upgraded server.
    assert node2.query(table_query) == expected
    assert node2.query(parts_query) == parts_before_upgrade


# Test from issue 16587
def test_aggregate_function_versioning_issue_16587(start_cluster):
    """Issue 16587: nested sumMap over a two-shard remote() query.

    Both nodes get a MergeTree table with 300 identical rows, and the query
    reads it twice through remote('127.0.0.{1,2}', ...), so the expected
    answer is ([1],[600]). The old server (node3) must NOT return it and the
    new server (node1) must.
    """
    for node in (node1, node3):
        node.query("DROP TABLE IF EXISTS test_table;")
        node.query(
            """
        CREATE TABLE test_table (`col1` DateTime, `col2` Int64)
        ENGINE = MergeTree() ORDER BY col1"""
        )
        node.query(
            "insert into test_table select '2020-10-26 00:00:00', 1929292 from numbers(300)"
        )

    nested_sum_map = "select sumMap(sm) from (select sumMap([1],[1]) as sm from remote('127.0.0.{1,2}', default.test_table) group by col1, col2);"
    expected = "([1],[600])"

    # Old server: wrong result.
    assert node3.query(nested_sum_map).strip() != expected
    # New server: correct result.
    assert node1.query(nested_sum_map).strip() == expected


def test_aggregate_function_versioning_fetch_data_from_old_to_new_server(start_cluster):
    """Read sumMap states written by the old server (node4) from the new one (node1).

    node1 finalizes its own rows to ([1],[300]) while node4 does not; node4's
    rows fetched via remote('node4', ...) must finalize on node1 exactly as
    they do on node4 itself.
    """

    def finalized(node, source):
        # finalizeAggregation(col3) for every row read from ``source``.
        return node.query(
            "select finalizeAggregation(col3) from {};".format(source)
        ).strip()

    for node in (node1, node4):
        create_table(node)
        insert_data(node)

    expected = "([1],[300])"
    local_table = "default.test_table"

    assert finalized(node1, local_table) == expected

    old_server_data = finalized(node4, local_table)
    assert old_server_data != expected

    fetched_by_new_server = finalized(node1, "remote('node4', default.test_table)")
    assert fetched_by_new_server == old_server_data


def test_aggregate_function_versioning_server_upgrade(start_cluster):
    """Upgrade node5 (21.5) in place and check how its sumMap states are read and written.

    The table created on the old server keeps its unversioned column type and
    keeps finalizing to ([1],[44]) after the upgrade, including rows inserted
    later or copied over from node1. On node1 (new server) a table created
    without an explicit version gets version 1 and finalizes to ([1],[300]),
    unless it is created with version=0. Each step ends with a numbered print
    so a failure can be located in the test log.
    """

    def finalized(node, source="default.test_table"):
        # finalizeAggregation(col3) for every row read from ``source``
        # (``source`` may carry a trailing "order by" clause).
        return node.query(
            "select finalizeAggregation(col3) from {};".format(source)
        ).strip()

    def describe(node):
        # DESCRIBE output of default.test_table, surrounding whitespace removed.
        return node.query("describe table default.test_table;").strip()

    old_col3_type = "col3\tAggregateFunction(sumMap, Array(UInt8), Array(UInt8))"
    new_col3_type = "col3\tAggregateFunction(1, sumMap, Array(UInt8), Array(UInt8))"

    for node in [node1, node5]:
        create_table(node)
    insert_data(node1, col2=5)
    insert_data(node5, col2=1)

    # Serialization with version 0: the old server does not support versioning
    # of aggregate function states.
    old_server_data = finalized(node5)
    assert old_server_data == "([1],[44])"
    assert describe(node5).endswith(old_col3_type)
    print("Ok 1")

    # Upgrade server.
    node5.restart_with_latest_version()

    # Deserialized with version 0. The server now supports versioning, but the
    # column type created by the old server stays unversioned.
    upgraded_server_data = finalized(node5)
    assert upgraded_server_data == "([1],[44])"
    assert describe(node5).endswith(old_col3_type)
    print("Ok 2")

    # A table created on the new server has version 1 in its column type.
    create = describe(node1)
    print(create)
    assert create.endswith(new_col3_type)

    # Data from the upgraded server to the new server. Deserialize with version 0.
    data_from_upgraded_to_new_server = finalized(
        node1, "remote('node5', default.test_table)"
    )
    assert data_from_upgraded_to_new_server == upgraded_server_data == "([1],[44])"
    print("Ok 3")

    # Data is serialized according to version 0 (though one of the states is
    # version 1, the result is version 0).
    upgraded_server_data = finalized(
        node5, "remote('127.0.0.{1,2}', default.test_table)"
    )
    assert upgraded_server_data == "([1],[44])\n([1],[44])"
    print("Ok 4")

    # Check insertion after the server upgrade: newly inserted data is still
    # serialized with version 0.
    insert_data(node5, col2=2)
    upgraded_server_data = finalized(node5, "default.test_table order by col2")
    assert upgraded_server_data == "([1],[44])\n([1],[44])"
    print("Ok 5")

    # The table on the new server has the latest version.
    new_server_data = finalized(node1)
    assert new_server_data == "([1],[300])"
    print("Ok 6")

    # Insert from the new server (version 1) into the upgraded server (where
    # the version is 0): the result is version 0.
    node1.query(
        "insert into table function remote('node5', default.test_table) select * from default.test_table;"
    )
    upgraded_server_data = finalized(node5, "default.test_table order by col2")
    assert upgraded_server_data == "([1],[44])\n([1],[44])\n([1],[44])"
    print("Ok 7")

    # But the table on the new server gets data with the latest version.
    insert_data(node1)
    new_server_data = finalized(node1)
    assert new_server_data == "([1],[300])\n([1],[300])"
    print("Ok 8")

    # Create a table on the new server whose column explicitly uses the older
    # version (version 0).
    create_table(node1, name="test_table_0", version=0)
    insert_data(node1, table_name="test_table_0", col2=3)
    assert finalized(node1, "default.test_table_0") == "([1],[44])"
    print("Ok 9")

    # Insert from that version-0 table on the new server into the upgraded
    # server: the data stays version 0.
    node1.query(
        "insert into table function remote('node5', default.test_table) select * from default.test_table_0;"
    )
    upgraded_server_data = finalized(node5, "default.test_table order by col2")
    assert upgraded_server_data == "([1],[44])\n([1],[44])\n([1],[44])\n([1],[44])"
    print("Ok 10")


def test_aggregate_function_versioning_persisting_metadata(start_cluster):
    """Results must stay stable across node6's upgrade, DETACH/ATTACH and restarts.

    node1 (new server) rows always finalize to ([1],[300]) and node6 (21.5,
    upgraded mid-test) rows to ([1],[44]), including rows inserted after each
    step. Judging by the test name, this checks that the column type's state
    version is persisted in table metadata -- confirm against the server code.
    """
    new_row = "([1],[300])"
    old_row = "([1],[44])"
    local_table = "default.test_table"
    both_shards = "remote('127.0.0.{1,2}', default.test_table)"
    nodes = (node1, node6)

    def finalized(node, source):
        # finalizeAggregation(col3) for every row read from ``source``.
        return node.query(
            "select finalizeAggregation(col3) from {};".format(source)
        ).strip()

    def rows(value, count):
        # Expected output for ``count`` identical rows, newline separated.
        return "\n".join([value] * count)

    for node in nodes:
        create_table(node)
        insert_data(node)
    assert finalized(node1, local_table) == new_row
    assert finalized(node6, local_table) == old_row

    # Upgrade node6, then reload the table on both servers.
    node6.restart_with_latest_version()
    for node in nodes:
        node.query("DETACH TABLE test_table")
        node.query("ATTACH TABLE test_table")

    for node in nodes:
        insert_data(node)
    assert finalized(node1, local_table) == rows(new_row, 2)
    assert finalized(node6, local_table) == rows(old_row, 2)

    # Full server restarts, each followed by one more insert.
    for node in nodes:
        node.restart_clickhouse()
        insert_data(node)
    # Three local rows, read once per address of remote('127.0.0.{1,2}', ...).
    assert finalized(node1, both_shards) == rows(new_row, 6)
    assert finalized(node6, both_shards) == rows(old_row, 6)