clickhouse-copier: Allow empty database name

This is needed to make clickhouse-copier work with cross-replication
setups (also known as [circular replication][1]).

When database name is not specified we rely on `<default_database>`
element from replica stanza.

Example:

```xml
<source_cluster>
    <shard>
        <replica>
            <host>localhost</host>
            <port>9000</port>
            <default_database>r0</default_database>
        </replica>
        <replica>
            <host>localhost</host>
            <port>666</port>
            <default_database>r1</default_database>
        </replica>
    </shard>
    <shard>
        <replica>
            <host>localhost</host>
            <port>666</port>
            <default_database>r0</default_database>
        </replica>
        <replica>
            <host>localhost</host>
            <port>9000</port>
            <default_database>r1</default_database>
        </replica>
    </shard>
</source_cluster>
```

[1]: https://www.altinity.com/blog/2018/5/10/circular-replication-cluster-topology-in-clickhouse
This commit is contained in:
Nicolae Vartolomei 2019-06-25 21:55:52 +00:00
parent 71872fc3c3
commit 59d66bb1d5

View File

@ -96,14 +96,19 @@ namespace
using DatabaseAndTableName = std::pair<String, String>;
String getDatabaseDotTable(const String & database, const String & table)
String getQuotedTable(const String & database, const String & table)
{
if (database.empty())
{
return backQuoteIfNeed(table);
}
return backQuoteIfNeed(database) + "." + backQuoteIfNeed(table);
}
String getDatabaseDotTable(const DatabaseAndTableName & db_and_table)
String getQuotedTable(const DatabaseAndTableName & db_and_table)
{
return getDatabaseDotTable(db_and_table.first, db_and_table.second);
return getQuotedTable(db_and_table.first, db_and_table.second);
}
@ -467,7 +472,7 @@ String DB::TaskShard::getDescription() const
std::stringstream ss;
ss << "N" << numberInCluster()
<< " (having a replica " << getHostNameExample()
<< ", pull table " + getDatabaseDotTable(task_table.table_pull)
<< ", pull table " + getQuotedTable(task_table.table_pull)
<< " of cluster " + task_table.cluster_pull_name << ")";
return ss.str();
}
@ -1296,7 +1301,7 @@ protected:
/// Remove all status nodes
zookeeper->tryRemoveRecursive(current_shards_path);
String query = "ALTER TABLE " + getDatabaseDotTable(task_table.table_push);
String query = "ALTER TABLE " + getQuotedTable(task_table.table_push);
query += " DROP PARTITION " + task_partition.name + "";
/// TODO: use this statement after servers will be updated up to 1.1.54310
@ -1539,7 +1544,7 @@ protected:
auto get_select_query = [&] (const DatabaseAndTableName & from_table, const String & fields, String limit = "")
{
String query;
query += "SELECT " + fields + " FROM " + getDatabaseDotTable(from_table);
query += "SELECT " + fields + " FROM " + getQuotedTable(from_table);
/// TODO: Bad, it is better to rewrite with ASTLiteral(partition_key_field)
query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = (" + task_partition.name + " AS partition_key))";
if (!task_table.where_condition_str.empty())
@ -1677,7 +1682,7 @@ protected:
LOG_DEBUG(log, "Create destination tables. Query: " << query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push,
PoolMode::GET_MANY);
LOG_DEBUG(log, "Destination tables " << getDatabaseDotTable(task_table.table_push) << " have been created on " << shards
LOG_DEBUG(log, "Destination tables " << getQuotedTable(task_table.table_push) << " have been created on " << shards
<< " shards of " << task_table.cluster_push->getShardCount());
}
@ -1699,7 +1704,7 @@ protected:
ASTPtr query_insert_ast;
{
String query;
query += "INSERT INTO " + getDatabaseDotTable(task_shard.table_split_shard) + " VALUES ";
query += "INSERT INTO " + getQuotedTable(task_shard.table_split_shard) + " VALUES ";
ParserQuery p_query(query.data() + query.size());
query_insert_ast = parseQuery(p_query, query, 0);
@ -1824,7 +1829,7 @@ protected:
String getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings * settings = nullptr)
{
String query = "SHOW CREATE TABLE " + getDatabaseDotTable(table);
String query = "SHOW CREATE TABLE " + getQuotedTable(table);
Block block = getBlockWithAllStreamData(std::make_shared<RemoteBlockInputStream>(
connection, query, InterpreterShowCreateQuery::getSampleBlock(), context, settings));
@ -1887,7 +1892,7 @@ protected:
{
WriteBufferFromOwnString wb;
wb << "SELECT DISTINCT " << queryToString(task_table.engine_push_partition_key_ast) << " AS partition FROM"
<< " " << getDatabaseDotTable(task_shard.table_read_shard) << " ORDER BY partition DESC";
<< " " << getQuotedTable(task_shard.table_read_shard) << " ORDER BY partition DESC";
query = wb.str();
}
@ -1929,7 +1934,7 @@ protected:
{
WriteBufferFromOwnString wb;
wb << "SELECT 1"
<< " FROM "<< getDatabaseDotTable(task_shard.table_read_shard)
<< " FROM "<< getQuotedTable(task_shard.table_read_shard)
<< " WHERE " << queryToString(task_table.engine_push_partition_key_ast) << " = " << partition_quoted_name
<< " LIMIT 1";
query = wb.str();