mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Fix #15235. When clickhouse-copier handle non-partitioned table, throw segfault error.
This commit is contained in:
parent
2623d35f68
commit
302cd55f45
@ -605,7 +605,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
|
||||
settings_push.replication_alter_partitions_sync = 2;
|
||||
|
||||
query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
|
||||
" ATTACH PARTITION " + partition_name +
|
||||
((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
|
||||
" FROM " + getQuotedTable(helping_table);
|
||||
|
||||
LOG_DEBUG(log, "Executing ALTER query: {}", query_alter_ast_string);
|
||||
@ -636,7 +636,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
|
||||
if (!task_table.isReplicatedTable())
|
||||
{
|
||||
query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) +
|
||||
" PARTITION " + partition_name + " DEDUPLICATE;";
|
||||
((partition_name == "'all'") ? " PARTITION ID " : " PARTITION ") + partition_name + " DEDUPLICATE;";
|
||||
|
||||
LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: {}", query_alter_ast_string);
|
||||
|
||||
@ -807,7 +807,7 @@ bool ClusterCopier::tryDropPartitionPiece(
|
||||
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
|
||||
|
||||
String query = "ALTER TABLE " + getQuotedTable(helping_table);
|
||||
query += " DROP PARTITION " + task_partition.name + "";
|
||||
query += ((task_partition.name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ") + task_partition.name + "";
|
||||
|
||||
/// TODO: use this statement after servers will be updated up to 1.1.54310
|
||||
// query += " DROP PARTITION ID '" + task_partition.name + "'";
|
||||
@ -1567,7 +1567,7 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
|
||||
DatabaseAndTableName original_table = task_table.table_push;
|
||||
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first, original_table.second + "_piece_" + toString(current_piece_number));
|
||||
|
||||
String query = "ALTER TABLE " + getQuotedTable(helping_table) + " DROP PARTITION " + partition_name;
|
||||
String query = "ALTER TABLE " + getQuotedTable(helping_table) + ((partition_name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ") + partition_name;
|
||||
|
||||
const ClusterPtr & cluster_push = task_table.cluster_push;
|
||||
Settings settings_push = task_cluster->settings_push;
|
||||
@ -1670,14 +1670,24 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
|
||||
|
||||
std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
|
||||
{
|
||||
std::set<String> res;
|
||||
|
||||
createShardInternalTables(timeouts, task_shard, false);
|
||||
|
||||
TaskTable & task_table = task_shard.task_table;
|
||||
|
||||
const String & partition_name = queryToString(task_table.engine_push_partition_key_ast);
|
||||
|
||||
if (partition_name == "'all'")
|
||||
{
|
||||
res.emplace("'all'");
|
||||
return res;
|
||||
}
|
||||
|
||||
String query;
|
||||
{
|
||||
WriteBufferFromOwnString wb;
|
||||
wb << "SELECT DISTINCT " << queryToString(task_table.engine_push_partition_key_ast) << " AS partition FROM"
|
||||
wb << "SELECT DISTINCT " << partition_name << " AS partition FROM"
|
||||
<< " " << getQuotedTable(task_shard.table_read_shard) << " ORDER BY partition DESC";
|
||||
query = wb.str();
|
||||
}
|
||||
@ -1692,7 +1702,6 @@ std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti
|
||||
local_context.setSettings(task_cluster->settings_pull);
|
||||
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().getInputStream());
|
||||
|
||||
std::set<String> res;
|
||||
if (block)
|
||||
{
|
||||
ColumnWithTypeAndName & column = block.getByPosition(0);
|
||||
|
@ -0,0 +1,39 @@
|
||||
<yandex>
|
||||
<remote_servers>
|
||||
<source_cluster>
|
||||
<shard>
|
||||
<weight>1</weight>
|
||||
<replica>
|
||||
<host>s0_0_0</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
</source_cluster>
|
||||
<default_cluster>
|
||||
|
||||
<shard>
|
||||
<weight>1</weight>
|
||||
<replica>
|
||||
<host>s1_1_0</host>
|
||||
<port>9000</port>
|
||||
</replica>
|
||||
</shard>
|
||||
|
||||
</default_cluster>
|
||||
</remote_servers>
|
||||
<max_workers>1</max_workers>
|
||||
|
||||
<tables>
|
||||
<table_copier_test1>
|
||||
<cluster_pull>source_cluster</cluster_pull>
|
||||
<database_pull>default</database_pull>
|
||||
<table_pull>copier_test1</table_pull>
|
||||
|
||||
<cluster_push>default_cluster</cluster_push>
|
||||
<database_push>default</database_push>
|
||||
<table_push>copier_test1_1</table_push>
|
||||
<engine>ENGINE = MergeTree ORDER BY date SETTINGS index_granularity = 8192</engine>
|
||||
<sharding_key>rand()</sharding_key>
|
||||
</table_copier_test1>
|
||||
</tables>
|
||||
</yandex>
|
@ -230,6 +230,27 @@ class Task_no_arg:
|
||||
instance = cluster.instances['s1_1_0']
|
||||
instance.query("DROP TABLE copier_test1_1")
|
||||
|
||||
class Task_non_partitioned_table:
|
||||
|
||||
def __init__(self, cluster):
|
||||
self.cluster = cluster
|
||||
self.zk_task_path = "/clickhouse-copier/task_non_partitoned_table"
|
||||
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_non_partitioned_table.xml'), 'r').read()
|
||||
self.rows = 1000000
|
||||
|
||||
def start(self):
|
||||
instance = cluster.instances['s0_0_0']
|
||||
instance.query(
|
||||
"create table copier_test1 (date Date, id UInt32) engine = MergeTree ORDER BY date SETTINGS index_granularity = 8192")
|
||||
instance.query("insert into copier_test1 values ('2016-01-01', 10);")
|
||||
|
||||
def check(self):
|
||||
assert TSV(self.cluster.instances['s1_1_0'].query("SELECT date FROM copier_test1_1")) == TSV("2016-01-01\n")
|
||||
instance = cluster.instances['s0_0_0']
|
||||
instance.query("DROP TABLE copier_test1")
|
||||
instance = cluster.instances['s1_1_0']
|
||||
instance.query("DROP TABLE copier_test1_1")
|
||||
|
||||
|
||||
def execute_task(task, cmd_options):
|
||||
task.start()
|
||||
@ -359,6 +380,8 @@ def test_no_index(started_cluster):
|
||||
def test_no_arg(started_cluster):
|
||||
execute_task(Task_no_arg(started_cluster), [])
|
||||
|
||||
def test_non_partitioned_table(started_cluster):
|
||||
execute_task(Task_non_partitioned_table(started_cluster), [])
|
||||
|
||||
if __name__ == '__main__':
|
||||
with contextmanager(started_cluster)() as cluster:
|
||||
|
Loading…
Reference in New Issue
Block a user