Speed up initialization, fix settings passing. [#CLICKHOUSE-3346]

Vitaliy Lyudvichenko 2018-03-01 21:33:27 +03:00
parent d6635c3bf5
commit 270b6c968e
5 changed files with 173 additions and 98 deletions

View File

@@ -27,7 +27,6 @@
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterExistsQuery.h>
#include <Interpreters/InterpreterShowCreateQuery.h>
#include <Interpreters/InterpreterFactory.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
@@ -62,6 +61,7 @@
#include <Server/StatusFile.h>
#include <Storages/registerStorages.h>
#include <Common/formatReadable.h>
#include <daemon/OwnPatternFormatter.h>
namespace DB
@@ -157,12 +157,12 @@ struct TaskTable;
struct TaskCluster;
struct ClusterPartition;
using TasksPartition = std::map<String, ShardPartition>;
using TasksPartition = std::map<String, ShardPartition, std::greater<String>>;
using ShardInfo = Cluster::ShardInfo;
using TaskShardPtr = std::shared_ptr<TaskShard>;
using TasksShard = std::vector<TaskShardPtr>;
using TasksTable = std::list<TaskTable>;
using ClusterPartitions = std::map<String, ClusterPartition>;
using ClusterPartitions = std::map<String, ClusterPartition, std::greater<String>>;
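/// Note: std::greater<String> makes iteration go in descending partition-name order,
/// presumably so that the newest partitions (which usually sort last) are processed first.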
/// Just destination partition of a shard
@@ -207,6 +207,7 @@ struct TaskShard
UInt32 indexInCluster() const { return info.shard_num - 1; }
String getDescription() const;
String getHostNameExample() const;
/// Used to sort shards by their proximity
ShardPriority priority;
@@ -452,11 +453,19 @@ String TaskTable::getPartitionIsDirtyPath(const String & partition_name) const
String DB::TaskShard::getDescription() const
{
return "" + toString(numberInCluster())
+ " of pull table " + getDatabaseDotTable(task_table.table_pull)
+ " of cluster " + task_table.cluster_pull_name;
std::stringstream ss;
ss << "N" << numberInCluster()
<< " (with a replica " << getHostNameExample() << ","
<< " of pull table " + getDatabaseDotTable(task_table.table_pull)
<< " of cluster " + task_table.cluster_pull_name << ")";
return ss.str();
}
String DB::TaskShard::getHostNameExample() const
{
auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
return replicas.at(0).readableString();
}
static bool isExtedndedDefinitionStorage(const ASTPtr & storage_ast)
@@ -743,93 +752,148 @@ public:
task_table.initShards(task_cluster->random_engine);
}
LOG_DEBUG(log, "Loaded " << task_cluster->table_tasks.size() << " table tasks");
LOG_DEBUG(log, "Will process " << task_cluster->table_tasks.size() << " table tasks");
/// Compute the set of partitions; assume the set doesn't change during processing
for (auto & task_table : task_cluster->table_tasks)
{
LOG_DEBUG(log, "Set up table task " << task_table.table_id);
for (const TaskShardPtr & task_shard : task_table.all_shards)
{
if (task_shard->info.pool == nullptr)
{
throw Exception("It is impossible to have only local shards, at least port number must be different",
ErrorCodes::LOGICAL_ERROR);
}
LOG_DEBUG(log, "Set up shard " << task_shard->getDescription());
LOG_DEBUG(log, "There are " << task_table.all_shards.size() << " shards, " << task_table.local_shards.size() << " of them are local ones");
auto existing_partitions_names = getShardPartitions(*task_shard);
Strings filtered_partitions_names;
/// Check that user specified correct partition names
auto check_partition_format = [&] (const String & partition_text_quoted)
{
const DataTypePtr & type = task_shard->partition_key_column.type;
MutableColumnPtr column_dummy = type->createColumn();
ReadBufferFromString rb(partition_text_quoted);
try
{
type->deserializeTextQuoted(*column_dummy, rb);
}
catch (Exception & e)
{
throw Exception("Partition " + partition_text_quoted + " has incorrect format. " + e.displayText(), ErrorCodes::BAD_ARGUMENTS);
}
};
if (task_table.has_enabled_partitions)
{
/// Process partition in order specified by <enabled_partitions/>
for (const String & partition_name : task_table.enabled_partitions)
{
check_partition_format(partition_name);
auto it = existing_partitions_names.find(partition_name);
/// Do not process partition if it is not in enabled_partitions list
if (it == existing_partitions_names.end())
{
LOG_WARNING(log, "There is no enabled " << partition_name << " specified in enabled_partitions in shard "
<< task_shard->getDescription());
continue;
}
filtered_partitions_names.emplace_back(*it);
}
for (const String & partition_name : existing_partitions_names)
{
if (!task_table.enabled_partitions_set.count(partition_name))
{
LOG_DEBUG(log, "Partition " << partition_name << " will not be processed, since it is not in "
<< "enabled_partitions of " << task_table.table_id);
}
}
}
else
{
for (const String & partition_name : existing_partitions_names)
filtered_partitions_names.emplace_back(partition_name);
}
for (const String & partition_name : filtered_partitions_names)
{
task_shard->partition_tasks.emplace(partition_name, ShardPartition(*task_shard, partition_name));
ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name];
cluster_partition.shards.emplace_back(task_shard);
}
LOG_DEBUG(log, "Will copy " << task_shard->partition_tasks.size() << " partitions from shard " << task_shard->getDescription());
}
}
/// Do not initialize tables here; initialization is deferred to process()
getZooKeeper()->createAncestors(getWorkersPath() + "/");
}
/// Compute the set of partitions; assume the set doesn't change during processing
void initTaskTable(TaskTable & task_table)
{
LOG_INFO(log, "Set up table task " << task_table.table_id);
LOG_DEBUG(log, "There are " << task_table.all_shards.size() << " shards, " << task_table.local_shards.size() << " of them are local ones");
/// Check that user specified correct partition names
auto check_partition_format = [] (const DataTypePtr & type, const String & partition_text_quoted)
{
MutableColumnPtr column_dummy = type->createColumn();
ReadBufferFromString rb(partition_text_quoted);
try
{
type->deserializeTextQuoted(*column_dummy, rb);
}
catch (Exception & e)
{
throw Exception("Partition " + partition_text_quoted + " has incorrect format. " + e.displayText(), ErrorCodes::BAD_ARGUMENTS);
}
};
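/// Retry helper: invoke func up to max_tries times, logging each failure and sleeping
/// default_sleep_time between attempts; if all tries fail, rethrow the last exception.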
auto retry = [this] (auto && func, size_t max_tries)
{
std::exception_ptr exception;
for (size_t try_number = 1; try_number <= max_tries; ++try_number)
{
try
{
return func();
}
catch (...)
{
exception = std::current_exception();
if (try_number < max_tries)
{
tryLogCurrentException(log, "Will retry");
std::this_thread::sleep_for(default_sleep_time);
}
}
}
std::rethrow_exception(exception);
};
/// Fetch partitions list from a shard
auto process_shard = [this, check_partition_format, retry] (const TaskShardPtr & task_shard)
{
TaskTable & task_table = task_shard->task_table;
LOG_INFO(log, "Set up shard " << task_shard->getDescription());
auto get_partitions = [&] () { return getShardPartitions(*task_shard); };
auto existing_partitions_names = retry(get_partitions, 60);
Strings filtered_partitions_names;
Strings missing_partitions;
if (task_table.has_enabled_partitions)
{
/// Process partition in order specified by <enabled_partitions/>
for (const String & partition_name : task_table.enabled_partitions)
{
/// Check that user specified correct partition names
check_partition_format(task_shard->partition_key_column.type, partition_name);
auto it = existing_partitions_names.find(partition_name);
/// Do not process partition if it is not in enabled_partitions list
if (it == existing_partitions_names.end())
{
missing_partitions.emplace_back(partition_name);
continue;
}
filtered_partitions_names.emplace_back(*it);
}
for (const String & partition_name : existing_partitions_names)
{
if (!task_table.enabled_partitions_set.count(partition_name))
{
LOG_DEBUG(log, "Partition " << partition_name << " will not be processed, since it is not in "
<< "enabled_partitions of " << task_table.table_id);
}
}
}
else
{
for (const String & partition_name : existing_partitions_names)
filtered_partitions_names.emplace_back(partition_name);
}
for (const String & partition_name : filtered_partitions_names)
{
task_shard->partition_tasks.emplace(partition_name, ShardPartition(*task_shard, partition_name));
}
if (!missing_partitions.empty())
{
std::stringstream ss;
for (const String & missing_partition : missing_partitions)
ss << " " << missing_partition;
LOG_WARNING(log, "There are no " << missing_partitions.size() << " partitions from enabled_partitions in shard "
<< task_shard->getDescription() << " :" << ss.str());
}
LOG_DEBUG(log, "Will copy " << task_shard->partition_tasks.size() << " partitions from shard " << task_shard->getDescription());
};
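/// Fetch partition lists from all shards in parallel. This replaces the sequential
/// per-shard loop removed above and is the main initialization speed-up of this commit.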
{
ThreadPool thread_pool(2 * getNumberOfPhysicalCPUCores());
for (const TaskShardPtr & task_shard : task_table.all_shards)
thread_pool.schedule([task_shard, process_shard]() { process_shard(task_shard); });
LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs");
thread_pool.wait();
}
/// After partitions of each shard are initialized, initialize cluster partitions
for (const TaskShardPtr & task_shard : task_table.all_shards)
{
for (const auto & partition : task_shard->partition_tasks)
{
const String & partition_name = partition.first;
ClusterPartition & cluster_partition = task_table.cluster_partitions[partition_name];
cluster_partition.shards.emplace_back(task_shard);
}
}
}
void reloadTaskDescription()
{
String task_config_str;
@@ -973,6 +1037,9 @@ public:
if (task_table.all_shards.empty())
continue;
/// Deferred initialization
initTaskTable(task_table);
task_table.watch.restart();
bool table_is_done = false;
@@ -1441,7 +1508,8 @@ protected:
// Select all fields
ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", inject_fault ? "1" : "");
LOG_DEBUG(log, "Executing SELECT query: " << queryToString(query_select_ast));
LOG_DEBUG(log, "Executing SELECT query and pull from " << task_shard.getDescription()
<< " : " << queryToString(query_select_ast));
ASTPtr query_insert_ast;
{
@@ -1609,7 +1677,7 @@ protected:
return parseQuery(parser_create_query, create_query_pull_str);
}
void createShardInternalTables(TaskShard & task_shard)
void createShardInternalTables(TaskShard & task_shard, bool create_split = true)
{
TaskTable & task_table = task_shard.task_table;
@@ -1635,17 +1703,16 @@ protected:
auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_read_shard, storage_shard_ast);
auto create_table_split_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_split_shard, storage_split_ast);
//LOG_DEBUG(log, "Create shard reading table. Query: " << queryToString(create_table_pull_ast));
dropAndCreateLocalTable(create_table_pull_ast);
//LOG_DEBUG(log, "Create split table. Query: " << queryToString(create_table_split_ast));
dropAndCreateLocalTable(create_table_split_ast);
if (create_split)
dropAndCreateLocalTable(create_table_split_ast);
}
std::set<String> getShardPartitions(TaskShard & task_shard)
{
createShardInternalTables(task_shard);
createShardInternalTables(task_shard, false);
TaskTable & task_table = task_shard.task_table;
@@ -1663,6 +1730,7 @@ protected:
ASTPtr query_ast = parseQuery(parser_query, query);
Context local_context = context;
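/// Run the partition-listing query under <settings_pull>; this is the "fixed settings
/// passing" part of the commit (see max_rows_in_distinct in the test configs below).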
local_context.setSettings(task_cluster->settings_pull);
Block block = getBlockWithAllStreamData(InterpreterFactory::get(query_ast, local_context)->execute().in);
std::set<String> res;
@@ -1911,8 +1979,8 @@ void ClusterCopierApp::setupLogging()
console_channel->open();
}
Poco::AutoPtr<Poco::PatternFormatter> formatter(new Poco::PatternFormatter);
formatter->setProperty("pattern", "%L%Y-%m-%d %H:%M:%S.%i <%p> %s: %t");
Poco::AutoPtr<OwnPatternFormatter> formatter = new OwnPatternFormatter(nullptr);
formatter->setProperty("times", "local");
Poco::AutoPtr<Poco::FormattingChannel> formatting_channel(new Poco::FormattingChannel(formatter));
formatting_channel->setChannel(split_channel);
split_channel->open();

View File

@@ -24,7 +24,7 @@
* /server_fqdn#PID_timestamp - cluster-copier worker ID
* ...
* /tables - directory with table tasks
* /cluster.db.table - directory of table_hits task
* /cluster.db.table1 - directory of table_hits task
* /partition1 - directory for partition1
* /shards - directory for source cluster shards
* /1 - worker job for the first shard of partition1 of table test.hits
@@ -45,7 +45,7 @@
* During this procedure a single 'cleaner' worker is selected. The worker waits for all partition
* workers to stop, removes the /shards node, executes DROP PARTITION on each destination node, and removes the /is_dirty node.
* /cleaner - An ephemeral node used to select the 'cleaner' worker. Contains the ID of the worker.
* /test_visits
* /cluster.db.table2
* ...
*/
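The registration step of this layout is easy to sketch in code. The following fragment is illustrative only and not part of this commit: registerWorker is a hypothetical helper, while createAncestors and ephemeral-node creation mirror calls that appear elsewhere in this diff.

#include <Common/ZooKeeper/ZooKeeper.h>

/// Hypothetical helper: register a copier worker under /workers as an ephemeral node,
/// so a crashed process leaves no stale "/server_fqdn#PID_timestamp" entry behind.
void registerWorker(zkutil::ZooKeeper & zookeeper, const std::string & workers_path, const std::string & worker_id)
{
    /// Ensure the whole parent chain exists, as the commit does with
    /// getZooKeeper()->createAncestors(getWorkersPath() + "/").
    zookeeper.createAncestors(workers_path + "/");

    /// Ephemeral nodes disappear together with the ZooKeeper session.
    zookeeper.create(workers_path + "/" + worker_id, "", zkutil::CreateMode::Ephemeral);
}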

View File

@@ -3,6 +3,8 @@
<profiles>
<default>
<log_queries>1</log_queries>
<!-- Just to test settings_pull -->
<max_rows_in_distinct>5</max_rows_in_distinct>
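<!-- settings_pull in the task description must reset this to 0, otherwise the
     DISTINCT query that lists partitions would hit this limit -->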
</default>
</profiles>

View File

@@ -10,6 +10,7 @@
<!-- Setting used to fetch data -->
<settings_pull>
<max_rows_in_distinct>0</max_rows_in_distinct>
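<!-- 0 = unlimited; overrides max_rows_in_distinct = 5 from the test profile
     for the queries the copier issues while pulling data -->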
</settings_pull>
<!-- Setting used to insert data -->

View File

@@ -8,6 +8,10 @@
<connect_timeout>1</connect_timeout>
</settings>
<settings_pull>
<max_rows_in_distinct>0</max_rows_in_distinct>
</settings_pull>
<!-- Tasks -->
<tables>
<AB>