support copying of tables with data skipping indices

Nikita Mikhaylov 2021-04-23 02:54:57 +03:00
parent 86c9373fb7
commit 9022646633
3 changed files with 120 additions and 7 deletions

View File

@@ -694,9 +694,10 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
return TaskStatus::Finished;
}
/// This is needed to create internal Distributed table
/// Removes column's TTL expression from `CREATE` query
/// Removes MATERIALIZED or ALIAS columns not to copy additional and useless data over the network.
/// Removes data skipping indices.
ASTPtr ClusterCopier::removeAliasMaterializedAndTTLColumnsFromCreateQuery(const ASTPtr & query_ast, bool allow_to_copy_alias_and_materialized_columns)
{
const ASTs & column_asts = query_ast->as<ASTCreateQuery &>().columns_list->columns->children;
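The function above walks the column list of the parsed CREATE query and drops everything that must not reach the copier's internal Distributed tables: per-column TTL expressions, MATERIALIZED/ALIAS columns, and now data skipping indices. A rough text-level sketch of the idea (illustrative only — the real code rewrites the ClickHouse AST, and the helper name below is made up):

import re

def strip_for_distributed(create_sql):
    """Toy text-level version of removeAliasMaterializedAndTTLColumnsFromCreateQuery:
    drop INDEX declarations and MATERIALIZED/ALIAS columns, strip per-column TTL.
    Ignores trailing-comma fixups that a real AST rewrite handles for free."""
    kept = []
    for line in create_sql.splitlines():
        s = line.strip()
        if s.upper().startswith("INDEX "):
            continue  # Distributed tables cannot carry data skipping indices
        if re.search(r"\b(MATERIALIZED|ALIAS)\b", s, re.IGNORECASE):
            continue  # derived columns would only resend derivable data
        kept.append(re.sub(r"\s+TTL\s+[^,]+", "", line))  # keep the column, drop its TTL
    return "\n".join(kept)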
@@ -1279,6 +1280,8 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
if (!limit.empty())
query += " LIMIT " + limit;
query += "FORMAT Native";
ParserQuery p_query(query.data() + query.size());
const auto & settings = getContext()->getSettingsRef();
@@ -1497,7 +1500,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
ASTPtr query_insert_ast;
{
String query;
query += "INSERT INTO " + getQuotedTable(split_table_for_current_piece) + " VALUES ";
query += "INSERT INTO " + getQuotedTable(split_table_for_current_piece) + " FORMAT Native ";
ParserQuery p_query(query.data() + query.size());
const auto & settings = getContext()->getSettingsRef();
@@ -1772,7 +1775,7 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
auto create_query_ast = removeAliasMaterializedAndTTLColumnsFromCreateQuery(
task_shard.current_pull_table_create_query,
task_table.allow_to_copy_alias_and_materialized_columns);
auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_read_shard, storage_shard_ast);
@@ -1876,7 +1879,7 @@ bool ClusterCopier::checkShardHasPartition(const ConnectionTimeouts & timeouts,
const auto & settings = getContext()->getSettingsRef();
ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
LOG_INFO(log, "Checking shard {} for partition {} existence, executing query: \n {}",
task_shard.getDescription(), partition_quoted_name, query_ast->formatForErrorMessage());
auto local_context = Context::createCopy(context);

View File

@@ -0,0 +1,40 @@
<?xml version="1.0"?>
<yandex>
<remote_servers>
<source>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>first</host>
<port>9000</port>
</replica>
</shard>
</source>
<destination>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>second</host>
<port>9000</port>
</replica>
</shard>
</destination>
</remote_servers>
<max_workers>2</max_workers>
<tables>
<table_events>
<cluster_pull>source</cluster_pull>
<database_pull>db_skip_index</database_pull>
<table_pull>source</table_pull>
<cluster_push>destination</cluster_push>
<database_push>db_skip_index</database_push>
<table_push>destination</table_push>
<engine>ENGINE = MergeTree() PARTITION BY toYYYYMMDD(Column3) ORDER BY (Column3, Column2, Column1)</engine>
<sharding_key>rand()</sharding_key>
</table_events>
</tables>
</yandex>

View File

@@ -184,6 +184,71 @@ class TaskTTL:
        assert a == b, "Data"
class TaskSkipIndex:
    def __init__(self, cluster):
        self.cluster = cluster
        self.zk_task_path = '/clickhouse-copier/task_skip_index'
        self.container_task_file = "/task_skip_index.xml"

        for instance_name, _ in cluster.instances.items():
            instance = cluster.instances[instance_name]
            instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, './task_skip_index.xml'), self.container_task_file)
            print("Copied task file to container of '{}' instance. Path {}".format(instance_name, self.container_task_file))
    def start(self):
        first = cluster.instances["first"]
        first.query("CREATE DATABASE db_skip_index;")
        first.query("""CREATE TABLE db_skip_index.source
        (
            Column1 UInt64,
            Column2 Int32,
            Column3 Date,
            Column4 DateTime,
            Column5 String,
            INDEX a (Column1 * Column2, Column5) TYPE minmax GRANULARITY 3,
            INDEX b (Column1 * length(Column5)) TYPE set(1000) GRANULARITY 4
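            -- minmax stores the min/max of each index expression per block of
            -- GRANULARITY index granules; set(1000) stores up to 1000 distinct
            -- values per block. Either lets reads skip blocks that cannot match.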
        )
        ENGINE = MergeTree()
        PARTITION BY (toYYYYMMDD(Column3), Column3)
        PRIMARY KEY (Column1, Column2, Column3)
        ORDER BY (Column1, Column2, Column3)
        SETTINGS index_granularity = 8192""")

        first.query("""INSERT INTO db_skip_index.source SELECT * FROM generateRandom(
            'Column1 UInt64, Column2 Int32, Column3 Date, Column4 DateTime, Column5 String', 1, 10, 2) LIMIT 100;""")

        second = cluster.instances["second"]
        second.query("CREATE DATABASE db_skip_index;")
        second.query("""CREATE TABLE db_skip_index.destination
        (
            Column1 UInt64,
            Column2 Int32,
            Column3 Date,
            Column4 DateTime,
            Column5 String,
            INDEX a (Column1 * Column2, Column5) TYPE minmax GRANULARITY 3,
            INDEX b (Column1 * length(Column5)) TYPE set(1000) GRANULARITY 4
        ) ENGINE = MergeTree()
        PARTITION BY toYYYYMMDD(Column3)
        ORDER BY (Column3, Column2, Column1);""")

        print("Preparation completed")

    def check(self):
        first = cluster.instances["first"]
        second = cluster.instances["second"]

        a = first.query("SELECT count() from db_skip_index.source")
        b = second.query("SELECT count() from db_skip_index.destination")
        assert a == b, "Count"

        a = TSV(first.query("""SELECT sipHash64(*) from db_skip_index.source
            ORDER BY (Column1, Column2, Column3, Column4, Column5)"""))
        b = TSV(second.query("""SELECT sipHash64(*) from db_skip_index.destination
            ORDER BY (Column1, Column2, Column3, Column4, Column5)"""))
        assert a == b, "Data"
def execute_task(task, cmd_options):
    task.start()
@@ -241,4 +306,9 @@ def test_different_schema(started_cluster):
@pytest.mark.timeout(600)
def test_ttl_columns(started_cluster):
    execute_task(TaskTTL(started_cluster), [])


@pytest.mark.timeout(600)
def test_skip_index(started_cluster):
    execute_task(TaskSkipIndex(started_cluster), [])