CLICKHOUSE-4421 Fix segfault in copier (#4835)
* Fix segfault in copier
* add test
* better runner
Parent: 736e3c0f46
Commit: edaec2353c
@@ -1201,7 +1201,8 @@ protected:
         auto new_columns_list = std::make_shared<ASTColumns>();
         new_columns_list->set(new_columns_list->columns, new_columns);
-        new_columns_list->set(new_columns_list->indices, query_ast->as<ASTCreateQuery>()->columns_list->indices->clone());
+        if (auto indices = query_ast->as<ASTCreateQuery>()->columns_list->indices)
+            new_columns_list->set(new_columns_list->indices, indices->clone());
 
         new_query.replace(new_query.columns_list, new_columns_list);
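This hunk is the segfault fix. `query_ast->as<ASTCreateQuery>()->columns_list->indices` is null whenever the source table's CREATE query declares no skip indices, and the old code called `->clone()` on that pointer unconditionally. Below is a minimal sketch of the failure mode and the guard; the types are simplified, hypothetical stand-ins for illustration, not ClickHouse's real AST classes:

#include <iostream>
#include <memory>

// Hypothetical, simplified stand-ins for the AST nodes involved.
struct ASTNode
{
    std::shared_ptr<ASTNode> clone() const { return std::make_shared<ASTNode>(*this); }
};

struct ColumnsList
{
    ASTNode * indices = nullptr;  // stays null for tables created without any INDEX clause
};

int main()
{
    ColumnsList columns_list;  // e.g. a plain Memory-engine table: no indices section

    // Before the fix, the equivalent of columns_list.indices->clone() ran unconditionally,
    // dereferencing a null pointer. After the fix, clone only when indices actually exist.
    if (auto * indices = columns_list.indices)
        std::cout << "cloned indices at " << indices->clone().get() << '\n';
    else
        std::cout << "no indices section, nothing to clone\n";
}

Binding the pointer inside the `if` keeps the null check and the use adjacent, so the cloned indices can never be touched on the null path.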
@@ -1,10 +1,24 @@
-if (CLICKHOUSE_SPLIT_BINARY)
+if(CLICKHOUSE_SPLIT_BINARY)
     set (TEST_USE_BINARIES CLICKHOUSE_TESTS_SERVER_BIN_PATH=${ClickHouse_BINARY_DIR}/dbms/programs/clickhouse-server CLICKHOUSE_TESTS_CLIENT_BIN_PATH=${ClickHouse_BINARY_DIR}/dbms/programs/clickhouse-client)
 else()
     set (TEST_USE_BINARIES CLICKHOUSE_TESTS_SERVER_BIN_PATH=${ClickHouse_BINARY_DIR}/dbms/programs/clickhouse CLICKHOUSE_TESTS_CLIENT_BIN_PATH=${ClickHouse_BINARY_DIR}/dbms/programs/clickhouse)
 endif()
 
+find_program(DOCKER_CMD docker)
+find_program(DOCKER_COMPOSE_CMD docker-compose)
+find_program(PYTEST_CMD pytest)
+find_program(SUDO_CMD sudo)
+
 # will mount only one binary to docker container - a build with .so can't work
-if (MAKE_STATIC_LIBRARIES)
-    add_test (NAME integration WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMAND env ${TEST_USE_BINARIES} "CLICKHOUSE_TESTS_BASE_CONFIG_DIR=${ClickHouse_SOURCE_DIR}/dbms/programs/server/" ${PYTEST_STARTER} pytest ${PYTEST_OPT})
+if(MAKE_STATIC_LIBRARIES AND DOCKER_CMD)
+    if(INTEGRATION_USE_RUNNER AND SUDO_CMD)
+        add_test(NAME integration-runner WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMAND ${SUDO_CMD} ${CMAKE_CURRENT_SOURCE_DIR}/runner --binary ${ClickHouse_BINARY_DIR}/dbms/programs/clickhouse --configs-dir ${ClickHouse_SOURCE_DIR}/dbms/programs/server/)
+        message(STATUS "Using tests in docker with runner SUDO=${SUDO_CMD}; DOCKER=${DOCKER_CMD};")
+    endif()
+    if(NOT INTEGRATION_USE_RUNNER AND DOCKER_COMPOSE_CMD AND PYTEST_CMD)
+        # To run one test with debug:
+        # cmake . -DPYTEST_OPT="-ss;test_cluster_copier"
+        add_test(NAME integration-pytest WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMAND env ${TEST_USE_BINARIES} "CLICKHOUSE_TESTS_BASE_CONFIG_DIR=${ClickHouse_SOURCE_DIR}/dbms/programs/server/" ${PYTEST_STARTER} ${PYTEST_CMD} ${PYTEST_OPT})
+        message(STATUS "Using tests in docker DOCKER=${DOCKER_CMD}; DOCKER_COMPOSE=${DOCKER_COMPOSE_CMD}; PYTEST=${PYTEST_STARTER} ${PYTEST_CMD} ${PYTEST_OPT}")
+    endif()
 endif()
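The "better runner" part of the commit splits integration testing into two mutually exclusive CTest targets: integration-runner, which wraps the dockerized tests in the runner script and therefore needs sudo and docker, and integration-pytest, which calls pytest directly and needs docker-compose. Assuming a configured build tree, the mode is picked with the INTEGRATION_USE_RUNNER option and run through CTest in the usual way, e.g. `cmake . -DINTEGRATION_USE_RUNNER=ON` followed by `ctest -R integration-runner` (an illustrative invocation; only the option and test names come from the diff, the cmake/ctest flags are the standard ones).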
dbms/tests/integration/test_cluster_copier/task_no_index.xml (new file, 109 lines)
@@ -0,0 +1,109 @@
<yandex>
    <remote_servers>
        <source_cluster>
            <shard>
                <internal_replication>false</internal_replication>
                <replica>
                    <host>s0_0_0</host>
                    <port>9000</port>
                </replica>
            </shard>
        </source_cluster>

        <destination_cluster>
            <shard>
                <internal_replication>false</internal_replication>
                <replica>
                    <host>s1_1_0</host>
                    <port>9000</port>
                </replica>
            </shard>
        </destination_cluster>
    </remote_servers>

    <!-- How many simultaneously active workers are possible. If you run more, the superfluous workers will sleep. -->
    <max_workers>2</max_workers>

    <!-- Settings used to fetch (pull) data from source cluster tables -->
    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <!-- Settings used to insert (push) data to destination cluster tables -->
    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <!-- Common settings for fetch (pull) and insert (push) operations. The copier process context also uses them.
         They are overridden by <settings_pull/> and <settings_push/> respectively. -->
    <settings>
        <connect_timeout>3</connect_timeout>
        <!-- Sync insert is set forcibly; leave it here just in case. -->
        <insert_distributed_sync>1</insert_distributed_sync>
    </settings>

    <!-- Copying task descriptions.
         You can specify several table tasks in the same task description (in the same ZooKeeper node); they will be
         performed sequentially.
    -->
    <tables>
        <!-- A table task: copies one table. -->
        <table_hits>
            <!-- Source cluster name (from the <remote_servers/> section) and the tables in it that should be copied -->
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>ontime</table_pull>

            <!-- Destination cluster name and the tables into which the data should be inserted -->
            <cluster_push>destination_cluster</cluster_push>
            <database_push>default</database_push>
            <table_push>ontime22</table_push>

            <!-- Engine of destination tables.
                 If the destination tables have not been created, workers create them using the column definitions from
                 the source tables and the engine definition from here.

                 NOTE: If the first worker starts inserting data and detects that the destination partition is not empty,
                 the partition will be dropped and refilled; take this into account if you already have some data in the
                 destination tables. You can directly specify the partitions that should be copied in <enabled_partitions/>;
                 they should be in the quoted format used by the partition column of the system.parts table.
            -->
            <engine>
                ENGINE = MergeTree() PARTITION BY Year ORDER BY (Year, FlightDate) SETTINGS index_granularity=8192
            </engine>

            <!-- Sharding key used to insert data into the destination cluster -->
            <sharding_key>jumpConsistentHash(intHash64(Year), 2)</sharding_key>

            <!-- Optional expression that filters data while pulling it from the source servers -->
            <!-- <where_condition>CounterID != 0</where_condition> -->

            <!-- This section specifies the partitions that should be copied; other partitions will be ignored.
                 Partition names should have the same format as the partition column of the system.parts table
                 (i.e. quoted text). Since the partition keys of the source and destination clusters could differ,
                 these partition names specify destination partitions.

                 NOTE: Although this section is optional (if it is not specified, all partitions will be copied),
                 it is strongly recommended to specify the partitions explicitly.
                 If you already have some ready partitions on the destination cluster, they
                 will be removed at the start of the copying, since they will be interpreted
                 as unfinished data from a previous copying run!
            -->
            <enabled_partitions>
                <partition>2017</partition>
            </enabled_partitions>
        </table_hits>

        <!-- Next table to copy. It is not copied until the previous table finishes. -->
        <!-- <table_visits>
        </table_visits>
        -->
    </tables>
</yandex>
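For context: clickhouse-copier reads a task description like this from ZooKeeper rather than from disk, so the XML is uploaded to the task node and the copier is pointed at it, along the lines of `clickhouse-copier --config zookeeper.xml --task-path /clickhouse-copier/task_no_index --base-dir /tmp/copier` (the zookeeper.xml and base-dir paths are placeholders; --config, --task-path and --base-dir are the documented copier options). The task path matches the zk_task_path used by the new test below.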
@@ -167,6 +167,28 @@ class Task_test_block_size:
         ddl_check_query(instance, "DROP TABLE test_block_size ON CLUSTER shard_0_0", 2)
         ddl_check_query(instance, "DROP TABLE test_block_size ON CLUSTER cluster1")
 
+
+class Task_no_index:
+
+    def __init__(self, cluster):
+        self.cluster = cluster
+        self.zk_task_path = "/clickhouse-copier/task_no_index"
+        self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_no_index.xml'), 'r').read()
+        self.rows = 1000000
+
+    def start(self):
+        instance = cluster.instances['s0_0_0']
+        instance.query("create table ontime (Year UInt16, FlightDate String) ENGINE = Memory")
+        instance.query("insert into ontime values (2016, 'test6'), (2017, 'test7'), (2018, 'test8')")
+
+    def check(self):
+        assert TSV(self.cluster.instances['s1_1_0'].query("SELECT Year FROM ontime22")) == TSV("2017\n")
+        instance = cluster.instances['s0_0_0']
+        instance.query("DROP TABLE ontime")
+        instance = cluster.instances['s1_1_0']
+        instance.query("DROP TABLE ontime22")
+
+
 def execute_task(task, cmd_options):
     task.start()

@@ -229,6 +251,9 @@ def test_copy_month_to_week_partition_with_recovering(started_cluster):
 def test_block_size(started_cluster):
     execute_task(Task_test_block_size(started_cluster), [])
 
+def test_no_index(started_cluster):
+    execute_task(Task_no_index(started_cluster), [])
+
 
 if __name__ == '__main__':
     with contextmanager(started_cluster)() as cluster:
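The new test exercises exactly the crashing case: ontime is a plain Memory-engine table created without skip indices, so the CREATE query the copier derives from it has a null indices list, which is the case the guarded clone() now handles. check() then confirms that only the enabled 2017 partition reaches ontime22 on the destination cluster before dropping both tables.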