Merge pull request #17248 from kaka11chen/15235

Fix #15235: clickhouse-copier throws a segfault error when it handles a non-partitioned table.
This commit is contained in:
Nikita Mikhaylov 2020-11-25 14:31:59 +03:00 committed by GitHub
commit 27acf6462f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 77 additions and 6 deletions

View File

@ -605,7 +605,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
settings_push.replication_alter_partitions_sync = 2; settings_push.replication_alter_partitions_sync = 2;
query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) + query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
" ATTACH PARTITION " + partition_name + ((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
" FROM " + getQuotedTable(helping_table); " FROM " + getQuotedTable(helping_table);
LOG_DEBUG(log, "Executing ALTER query: {}", query_alter_ast_string); LOG_DEBUG(log, "Executing ALTER query: {}", query_alter_ast_string);
@ -636,7 +636,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
if (!task_table.isReplicatedTable()) if (!task_table.isReplicatedTable())
{ {
query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) + query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) +
" PARTITION " + partition_name + " DEDUPLICATE;"; ((partition_name == "'all'") ? " PARTITION ID " : " PARTITION ") + partition_name + " DEDUPLICATE;";
LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: {}", query_alter_ast_string); LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: {}", query_alter_ast_string);
@ -807,7 +807,7 @@ bool ClusterCopier::tryDropPartitionPiece(
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number)); DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
String query = "ALTER TABLE " + getQuotedTable(helping_table); String query = "ALTER TABLE " + getQuotedTable(helping_table);
query += " DROP PARTITION " + task_partition.name + ""; query += ((task_partition.name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ") + task_partition.name + "";
/// TODO: use this statement after servers will be updated up to 1.1.54310 /// TODO: use this statement after servers will be updated up to 1.1.54310
// query += " DROP PARTITION ID '" + task_partition.name + "'"; // query += " DROP PARTITION ID '" + task_partition.name + "'";
@ -1567,7 +1567,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
DatabaseAndTableName original_table = task_table.table_push; DatabaseAndTableName original_table = task_table.table_push;
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number)); DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
String query = "ALTER TABLE " + getQuotedTable(helping_table) + " DROP PARTITION " + partition_name; String query = "ALTER TABLE " + getQuotedTable(helping_table) + ((partition_name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ") + partition_name;
const ClusterPtr & cluster_push = task_table.cluster_push; const ClusterPtr & cluster_push = task_table.cluster_push;
Settings settings_push = task_cluster->settings_push; Settings settings_push = task_cluster->settings_push;
@ -1670,14 +1670,24 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard) std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{ {
std::set<String> res;
createShardInternalTables(timeouts, task_shard, false); createShardInternalTables(timeouts, task_shard, false);
TaskTable & task_table = task_shard.task_table; TaskTable & task_table = task_shard.task_table;
const String & partition_name = queryToString(task_table.engine_push_partition_key_ast);
if (partition_name == "'all'")
{
res.emplace("'all'");
return res;
}
String query; String query;
{ {
WriteBufferFromOwnString wb; WriteBufferFromOwnString wb;
wb << "SELECT DISTINCT " << queryToString(task_table.engine_push_partition_key_ast) << " AS partition FROM" wb << "SELECT DISTINCT " << partition_name << " AS partition FROM"
<< " " << getQuotedTable(task_shard.table_read_shard) << " ORDER BY partition DESC"; << " " << getQuotedTable(task_shard.table_read_shard) << " ORDER BY partition DESC";
query = wb.str(); query = wb.str();
} }
@ -1692,7 +1702,6 @@ std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti
local_context.setSettings(task_cluster->settings_pull); local_context.setSettings(task_cluster->settings_pull);
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()); Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().getInputStream());
std::set<String> res;
if (block) if (block)
{ {
ColumnWithTypeAndName & column = block.getByPosition(0); ColumnWithTypeAndName & column = block.getByPosition(0);

View File

@ -0,0 +1,39 @@
<yandex>
<!-- clickhouse-copier task description: copy default.copier_test1 from source_cluster
     into default.copier_test1_1 on default_cluster. The destination engine below has no
     PARTITION BY clause, i.e. the table is non-partitioned. -->
<remote_servers>
<!-- Cluster the data is pulled from: a single shard with one replica (s0_0_0:9000). -->
<source_cluster>
<shard>
<weight>1</weight>
<replica>
<host>s0_0_0</host>
<port>9000</port>
</replica>
</shard>
</source_cluster>
<!-- Cluster the data is pushed to: a single shard with one replica (s1_1_0:9000). -->
<default_cluster>
<shard>
<weight>1</weight>
<replica>
<host>s1_1_0</host>
<port>9000</port>
</replica>
</shard>
</default_cluster>
</remote_servers>
<!-- Only one copier worker is allowed for this task. -->
<max_workers>1</max_workers>
<tables>
<table_copier_test1>
<!-- Where to read from. -->
<cluster_pull>source_cluster</cluster_pull>
<database_pull>default</database_pull>
<table_pull>copier_test1</table_pull>
<!-- Where to write to. -->
<cluster_push>default_cluster</cluster_push>
<database_push>default</database_push>
<table_push>copier_test1_1</table_push>
<!-- Engine clause for the destination table; note the absence of PARTITION BY. -->
<engine>ENGINE = MergeTree ORDER BY date SETTINGS index_granularity = 8192</engine>
<sharding_key>rand()</sharding_key>
</table_copier_test1>
</tables>
</yandex>

View File

@ -230,6 +230,27 @@ class Task_no_arg:
instance = cluster.instances['s1_1_0'] instance = cluster.instances['s1_1_0']
instance.query("DROP TABLE copier_test1_1") instance.query("DROP TABLE copier_test1_1")
class Task_non_partitioned_table:
    """Copier test task for a MergeTree table declared without PARTITION BY.

    Attributes:
        cluster: cluster object passed to the constructor; its ``instances``
            mapping is used to reach the 's0_0_0' and 's1_1_0' nodes.
        zk_task_path: ZooKeeper path string associated with this task.
        copier_task_config: text of 'task_non_partitioned_table.xml'.
        rows: row-count attribute kept for parity with the original class
            (not read inside this class).
    """

    def __init__(self, cluster):
        self.cluster = cluster
        self.zk_task_path = "/clickhouse-copier/task_non_partitoned_table"
        # Read the task description through a context manager so the file
        # handle is closed deterministically instead of being left to the GC.
        config_path = os.path.join(CURRENT_TEST_DIR, 'task_non_partitioned_table.xml')
        with open(config_path, 'r') as config_file:
            self.copier_task_config = config_file.read()
        self.rows = 1000000

    def start(self):
        """Create the non-partitioned source table on 's0_0_0' and insert one row."""
        # Use the cluster this task was constructed with rather than a
        # module-level global, matching what check() asserts against.
        instance = self.cluster.instances['s0_0_0']
        instance.query(
            "create table copier_test1 (date Date, id UInt32) engine = MergeTree ORDER BY date SETTINGS index_granularity = 8192")
        instance.query("insert into copier_test1 values ('2016-01-01', 10);")

    def check(self):
        """Assert the row is present in the destination table, then drop both tables."""
        assert TSV(self.cluster.instances['s1_1_0'].query("SELECT date FROM copier_test1_1")) == TSV("2016-01-01\n")
        # Clean up so the table names can be reused.
        instance = self.cluster.instances['s0_0_0']
        instance.query("DROP TABLE copier_test1")
        instance = self.cluster.instances['s1_1_0']
        instance.query("DROP TABLE copier_test1_1")
def execute_task(task, cmd_options): def execute_task(task, cmd_options):
task.start() task.start()
@ -359,6 +380,8 @@ def test_no_index(started_cluster):
def test_no_arg(started_cluster): def test_no_arg(started_cluster):
execute_task(Task_no_arg(started_cluster), []) execute_task(Task_no_arg(started_cluster), [])
def test_non_partitioned_table(started_cluster):
    """Run the non-partitioned-table copier task with an empty cmd_options list."""
    task = Task_non_partitioned_table(started_cluster)
    no_extra_options = []
    execute_task(task, no_extra_options)
if __name__ == '__main__': if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster: with contextmanager(started_cluster)() as cluster: