Minimal working example. [#CLICKHOUSE-3346]

This commit is contained in:
Vitaliy Lyudvichenko 2017-11-09 21:06:36 +03:00
parent c818beedcb
commit e1fafb1f2f
7 changed files with 521 additions and 174 deletions

View File

@ -131,11 +131,12 @@ private:
struct OpResult : public zoo_op_result_t
{
/// Pointers in this structure point to fields of the Op class.
/// Therefore a destructor is not needed.
/// Pointers in this class point to fields of class Op.
/// Op instances have the same (or a longer) lifetime, therefore a destructor is not required.
};
using Ops = std::vector<std::unique_ptr<Op>>;
using OpPtr = std::unique_ptr<Op>;
using Ops = std::vector<OpPtr>;
using OpResults = std::vector<OpResult>;
using OpResultsPtr = std::shared_ptr<OpResults>;
using Strings = std::vector<std::string>;

View File

@ -28,6 +28,15 @@ namespace CurrentMetrics
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
namespace zkutil
{
@ -1001,4 +1010,19 @@ ZooKeeper::MultiFuture ZooKeeper::asyncMulti(const zkutil::Ops & ops)
return asyncMultiImpl(ops, true);
}
size_t getFailedOpIndex(const OpResultsPtr & op_results)
{
if (!op_results)
throw DB::Exception("OpResults is nullptr", DB::ErrorCodes::LOGICAL_ERROR);
for (size_t index = 0; index < op_results->size(); ++index)
{
if ((*op_results)[index].err != ZOK)
return index;
}
throw DB::Exception("There is no failed OpResult", DB::ErrorCodes::LOGICAL_ERROR);
}
}
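
For context, a minimal sketch (not part of this diff) of how the new getFailedOpIndex helper is typically consumed after a failed multi-request; `zookeeper` and `ops` are assumed to be in scope:

zkutil::OpResultsPtr results;
int32_t code = zookeeper->tryMulti(ops, &results);
if (code != ZOK)
{
    /// Index of the first operation whose result is not ZOK; throws LOGICAL_ERROR if none failed.
    size_t failed_index = zkutil::getFailedOpIndex(results);
    /// ops.at(failed_index) is the operation that caused the multi-request to fail.
}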

View File

@ -420,6 +420,11 @@ private:
bool is_dirty = false;
};
/// Returns the index of the first op whose result is not ZOK, or throws an exception if there is none
size_t getFailedOpIndex(const OpResultsPtr & op_results);
using ZooKeeperPtr = ZooKeeper::Ptr;

View File

@ -4,6 +4,7 @@
#include <vector>
#include <memory>
#include <boost/noncopyable.hpp>
#include "IBlockInputStream.h"
namespace DB

View File

@ -16,20 +16,21 @@ bool isAtomicSet(std::atomic<bool> * val)
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
template <typename Pred>
void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, Pred && is_cancelled)
{
from.readPrefix();
to.writePrefix();
while (Block block = from.read())
{
if (isAtomicSet(is_cancelled))
if (is_cancelled())
break;
to.write(block);
}
if (isAtomicSet(is_cancelled))
if (is_cancelled())
return;
/// For outputting additional information in some formats.
@ -42,11 +43,28 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<boo
to.setExtremes(input->getExtremes());
}
if (isAtomicSet(is_cancelled))
if (is_cancelled())
return;
from.readSuffix();
to.writeSuffix();
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
{
return isAtomicSet(is_cancelled);
};
copyDataImpl(from, to, is_cancelled_pred);
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled)
{
copyDataImpl(from, to, is_cancelled);
}
}
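
A minimal usage sketch (not part of this diff) of the new predicate-based copyData overload, assuming `from` and `to` are existing IBlockInputStream / IBlockOutputStream objects and the stream/copyData headers from this commit are included; the deadline logic is purely illustrative:

#include <chrono>
#include <functional>

void copyWithDeadline(DB::IBlockInputStream & from, DB::IBlockOutputStream & to, std::chrono::seconds budget)
{
    const auto deadline = std::chrono::steady_clock::now() + budget;
    /// The predicate is polled between blocks; returning true stops the copy early.
    std::function<bool()> is_cancelled = [deadline] { return std::chrono::steady_clock::now() > deadline; };
    DB::copyData(from, to, is_cancelled);
}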

View File

@ -1,6 +1,7 @@
#pragma once
#include <atomic>
#include <functional>
namespace DB
@ -14,4 +15,6 @@ class IBlockOutputStream;
*/
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);
}

View File

@ -52,6 +52,10 @@
#include <Parsers/ASTLiteral.h>
#include <Interpreters/executeQuery.h>
#include <IO/WriteBufferNull.h>
#include <Parsers/ParserQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterFactory.h>
namespace DB
@ -62,6 +66,7 @@ namespace ErrorCodes
extern const int NO_ZOOKEEPER;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_TABLE;
extern const int UNFINISHED;
}
@ -86,12 +91,6 @@ namespace
using DatabaseAndTableName = std::pair<String, String>;
struct TaskCluster;
struct TaskTable;
struct TaskShard;
struct TaskPartition;
enum class TaskState
{
Started = 0,
@ -108,6 +107,11 @@ struct TaskStateWithOwner
TaskState state{TaskState::Unknown};
String owner;
static String getData(TaskState state, const String & owner)
{
return TaskStateWithOwner(state, owner).toString();
}
String toString()
{
WriteBufferFromOwnString wb;
@ -132,26 +136,35 @@ struct TaskStateWithOwner
};
/// Hierarchical task description
struct TaskPartition;
struct TaskShard;
struct TaskTable;
struct TaskCluster;
using TasksPartition = std::vector<TaskPartition>;
using ShardInfo = Cluster::ShardInfo;
using TaskShardPtr = TaskShard *;
using TasksShard = std::vector<TaskShard>;
using TasksShardPtrs = std::vector<TaskShard *>;
using TasksTable = std::list<TaskTable>;
using PartitionToShards = std::map<String, TasksShardPtrs>;
struct TaskPartition
{
TaskPartition(TaskShard & parent, const String & name_) : task_shard(parent), name(name_) {}
String getCommonPartitionZooKeeperPath() const;
String getZooKeeperPath() const;
String getCommonPartitionPath() const;
String getCommonPartitionIsDirtyPath() const;
String getCommonPartitionActiveWorkersPath() const;
String getActiveWorkerPath() const;
String getCommonPartitionShardsPath() const;
String getShardStatusPath() const;
TaskShard & task_shard;
String name;
String create_query_pull;
ASTPtr create_query_pull_ast;
ASTPtr create_query_push;
};
using TasksPartition = std::vector<TaskPartition>;
using ShardInfo = Cluster::ShardInfo;
struct TaskShard
{
TaskShard(TaskTable & parent, const ShardInfo & info_) : task_table(parent), info(info_) {}
@ -164,11 +177,6 @@ struct TaskShard
TasksPartition partitions;
};
using TaskShardPtr = TaskShard *;
using TasksShard = std::vector<TaskShard>;
using TasksShardPtrs = std::vector<TaskShard *>;
struct TaskTable
{
TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix,
@ -182,12 +190,10 @@ struct TaskTable
/// Source cluster and table
String cluster_pull_name;
DatabaseAndTableName table_pull;
String db_table_pull;
/// Destination cluster and table
String cluster_push_name;
DatabaseAndTableName table_push;
String db_table_push;
/// Storage of destination table
String engine_push_str;
@ -197,8 +203,8 @@ struct TaskTable
DatabaseAndTableName table_split;
String sharding_key_str;
ASTPtr sharding_key_ast;
String engine_proxy_str;
ASTPtr engine_proxy_ast;
ASTPtr engine_split_ast;
String engine_split_str;
/// Additional WHERE expression to filter input data
String where_condition_str;
@ -220,13 +226,12 @@ struct TaskTable
/// Prioritized list of shards
Shards shards_pull;
PartitionToShards partition_to_shards;
template <class URNG>
void initShards(URNG && urng);
};
using TasksTable = std::list<TaskTable>;
struct TaskCluster
{
TaskCluster(const String & task_zookeeper_path_, const Poco::Util::AbstractConfiguration & config, const String & base_key, const String & default_local_database_);
@ -269,6 +274,43 @@ String getDatabaseDotTable(const DatabaseAndTableName & db_and_table)
return getDatabaseDotTable(db_and_table.first, db_and_table.second);
}
/// Detailed status of ZooKeeper multi operation
struct MultiOpStatus
{
int32_t code = ZOK;
int failed_op_index = 0;
zkutil::OpPtr failed_op;
};
/// Atomically checks that the is_dirty node does not exist and, if so, executes the remaining op
/// Returns the relative index of the failed operation in failed_op_index (the passed op has index 0)
static MultiOpStatus checkNoNodeAndCommit(
const zkutil::ZooKeeperPtr & zookeeper,
const String & checking_node_path,
zkutil::OpPtr && op)
{
zkutil::Ops ops;
ops.emplace_back(std::make_unique<zkutil::Op::Create>(checking_node_path, "", zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(checking_node_path, -1));
ops.emplace_back(std::move(op));
MultiOpStatus res;
zkutil::OpResultsPtr results;
res.code = zookeeper->tryMulti(ops, &results);
if (res.code != ZOK)
{
auto index = zkutil::getFailedOpIndex(results);
res.failed_op_index = static_cast<int>(index) - 2;
res.failed_op = ops.at(index)->clone();
}
return res;
}
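
A hedged sketch of interpreting MultiOpStatus at a call site (the real call sites appear further below in this file; `status_path`, `is_dirty_flag_path`, `acl`, and `zookeeper` are placeholders assumed to be in scope):

auto op = std::make_unique<zkutil::Op::Create>(status_path, "started", acl, zkutil::CreateMode::Persistent);
MultiOpStatus status = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op));

if (status.code != ZOK)
{
    if (status.failed_op_index < 0)
    {
        /// One of the two auxiliary ops failed: the checking node already exists (the partition is dirty).
    }
    else
    {
        /// The passed op itself failed; status.failed_op holds a clone of it.
    }
}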
// Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key])'
std::shared_ptr<ASTStorage> createASTStorageDistributed(
const String & cluster_name, const String & database, const String & table, const ASTPtr & sharding_key_ast = nullptr)
{
@ -305,19 +347,38 @@ Block getBlockWithAllStreamData(const BlockInputStreamPtr & stream)
}
String TaskPartition::getCommonPartitionZooKeeperPath() const
String TaskPartition::getCommonPartitionPath() const
{
return task_shard.task_table.task_cluster.task_zookeeper_path // root
+ "/table_" + escapeForFileName(task_shard.task_table.name_in_config) // table_test.hits
+ "/" + name; // 201701
}
String TaskPartition::getZooKeeperPath() const
String TaskPartition::getShardStatusPath() const
{
return getCommonPartitionZooKeeperPath() // /root/table_test.hits/201701
+ "/" + toString(task_shard.info.shard_num); // 1 (the first shard)
return getCommonPartitionPath() // /root/table_test.hits/201701
+ "/shards/" + toString(task_shard.info.shard_num); // 1 (the first shard)
}
String TaskPartition::getCommonPartitionShardsPath() const
{
return getCommonPartitionPath() + "/shards";
}
String TaskPartition::getCommonPartitionActiveWorkersPath() const
{
return getCommonPartitionPath() + "/active_workers";
}
String TaskPartition::getActiveWorkerPath() const
{
return getCommonPartitionActiveWorkersPath() + "/partition_workers/" + toString(task_shard.info.shard_num);
}
String TaskPartition::getCommonPartitionIsDirtyPath() const
{
return getCommonPartitionPath() + "/is_dirty";
}
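
Taken together, the path helpers above describe a per-partition ZooKeeper subtree roughly like the following (a reconstruction for the hypothetical table test.hits and partition 201701 used in the comments; indentation denotes nesting):

<task_zookeeper_path>/table_test.hits/201701      <- getCommonPartitionPath()
    is_dirty                                      <- getCommonPartitionIsDirtyPath()
        cleaner                                   (ephemeral node held by the cleaning worker)
    active_workers                                <- getCommonPartitionActiveWorkersPath()
        partition_workers/<shard_num>             <- getActiveWorkerPath()
    shards                                        <- getCommonPartitionShardsPath()
        <shard_num>                               <- getShardStatusPath()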
TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config, const String & prefix_,
const String & table_key)
@ -332,11 +393,9 @@ TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfigurati
table_pull.first = config.getString(table_prefix + "database_pull");
table_pull.second = config.getString(table_prefix + "table_pull");
db_table_pull = getDatabaseDotTable(table_pull);
table_push.first = config.getString(table_prefix + "database_push");
table_push.second = config.getString(table_prefix + "table_push");
db_table_push = getDatabaseDotTable(table_push);
engine_push_str = config.getString(table_prefix + "engine");
{
@ -348,8 +407,8 @@ TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfigurati
{
ParserExpressionWithOptionalAlias parser_expression(false);
sharding_key_ast = parseQuery(parser_expression, sharding_key_str);
engine_proxy_ast = createASTStorageDistributed(cluster_pull_name, table_pull.first, table_pull.second, sharding_key_ast);
engine_proxy_str = queryToString(engine_proxy_ast);
engine_split_ast = createASTStorageDistributed(cluster_push_name, table_push.first, table_push.second, sharding_key_ast);
engine_split_str = queryToString(engine_split_ast);
table_split = DatabaseAndTableName(task_cluster.default_local_database, ".split." + name_in_config);
}
@ -481,6 +540,10 @@ public:
settings_pull.max_block_size = std::min(8192UL, settings_pull.max_block_size.value);
settings_pull.preferred_block_size_bytes = 0;
Settings & settings_push = task_cluster->settings_push;
settings_push.insert_distributed_timeout = 0;
settings_push.insert_distributed_sync = 1;
/// Set up clusters
context.setClustersConfig(task_cluster_config, task_cluster->clusters_prefix);
@ -511,7 +574,7 @@ public:
LOG_DEBUG(log, "Set up table task " << task_table.name_in_config << " ("
<< "cluster " << task_table.cluster_pull_name
<< ", table " << task_table.db_table_pull
<< ", table " << getDatabaseDotTable(task_table.table_pull)
<< ", shard " << task_shard->info.shard_num << ")");
LOG_DEBUG(log, "There are "
@ -523,9 +586,12 @@ public:
Strings partitions = getRemotePartitions(task_table.table_pull, *task_shard->connection_entry, &task_cluster->settings_pull);
for (const String & partition_name : partitions)
{
task_shard->partitions.emplace_back(*task_shard, partition_name);
task_table.partition_to_shards[partition_name].emplace_back(task_shard);
}
LOG_DEBUG(log, "Will fetch " << task_shard->partitions.size() << " parts");
LOG_DEBUG(log, "Will fetch " << task_shard->partitions.size() << " partitions");
}
}
@ -537,7 +603,18 @@ public:
{
for (TaskTable & task_table : task_cluster->table_tasks)
{
for (TaskShardPtr task_shard : task_table.shards_pull.all_shards_prioritized)
auto & shards = task_table.shards_pull.all_shards_prioritized;
if (shards.empty())
continue;
/// Try to process all partitions of the first shard
for (TaskPartition & task_partition : shards[0]->partitions)
{
processPartitionTask(task_partition);
}
for (TaskShardPtr task_shard : shards)
{
for (TaskPartition & task_partition : task_shard->partitions)
{
@ -563,8 +640,6 @@ protected:
{
while (true)
{
auto zookeeper = getZooKeeper();
zkutil::Stat stat;
zookeeper->get(getWorkersPath(), &stat);
@ -572,9 +647,7 @@ protected:
{
LOG_DEBUG(log, "Too many workers (" << stat.numChildren << ", maximum " << task_cluster->max_workers << ")"
<< ". Postpone processing " << task_description);
using namespace std::literals::chrono_literals;
std::this_thread::sleep_for(1s);
std::this_thread::sleep_for(default_sleep_time);
}
else
{
@ -602,145 +675,333 @@ protected:
return res;
}
bool processPartitionTask(TaskPartition & task_partition)
bool tryDropPartition(TaskPartition & task_partition, const zkutil::ZooKeeperPtr & zookeeper)
{
auto zookeeper = getZooKeeper();
TaskTable & task_table = task_partition.task_shard.task_table;
String partition_task_node = task_partition.getZooKeeperPath();
String partition_task_active_node = partition_task_node + "/active_worker";
String partition_task_status_node = partition_task_node + "/state";
String current_shards_path = task_partition.getCommonPartitionShardsPath();
String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
String dirt_cleaner_path = is_dirty_flag_path + "/cleaner";
/// Load balancing
auto worker_node_holder = createWorkerNodeAndWaitIfNeed(zookeeper, partition_task_node);
/// Create ephemeral node to mark that we are active
zookeeper->createAncestors(partition_task_active_node);
zkutil::EphemeralNodeHolderPtr partition_task_node_holder;
zkutil::EphemeralNodeHolder::Ptr cleaner_holder;
try
{
partition_task_node_holder = zkutil::EphemeralNodeHolder::create(partition_task_active_node, *zookeeper, host_id);
cleaner_holder = zkutil::EphemeralNodeHolder::create(dirt_cleaner_path, *zookeeper, host_id);
}
catch (const zkutil::KeeperException & e)
catch (zkutil::KeeperException & e)
{
if (e.code == ZNODEEXISTS)
{
LOG_DEBUG(log, "Someone is already processing " << partition_task_node);
LOG_DEBUG(log, "Partition " << task_partition.name << " is cleaning now by somebody, sleep");
std::this_thread::sleep_for(default_sleep_time);
return false;
}
throw;
}
TaskShard & task_shard = task_partition.task_shard;
TaskTable & task_table = task_shard.task_table;
/// We need to update table definitions for each part, it could be changed after ALTER
String create_query_pull_str = getRemoteCreateTable(task_table.table_pull, *task_shard.connection_entry,
&task_cluster->settings_pull);
ParserCreateQuery parser_create_query;
ASTPtr create_query_pull_ast = parseQuery(parser_create_query, create_query_pull_str);
DatabaseAndTableName table_pull(working_database_name, ".pull." + task_table.name_in_config);
DatabaseAndTableName table_split(working_database_name, ".split." + task_table.name_in_config);
String table_pull_cluster_name = ".pull." + task_table.cluster_pull_name;
size_t current_shard_index = task_shard.info.shard_num - 1;
ClusterPtr cluster_pull_current_shard = task_table.cluster_pull->getClusterWithSingleShard(current_shard_index);
context.setCluster(table_pull_cluster_name, cluster_pull_current_shard);
auto storage_pull_ast = createASTStorageDistributed(table_pull_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
const auto & storage_split_ast = task_table.engine_proxy_ast;
auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_pull, storage_pull_ast);
auto create_table_split_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_split, storage_split_ast);
LOG_DEBUG(log, "Create current pull table. Query: " << queryToString(create_table_pull_ast));
dropAndCreateLocalTable(create_table_pull_ast);
LOG_DEBUG(log, "Create split table. Query: " << queryToString(create_table_split_ast));
dropAndCreateLocalTable(create_table_split_ast);
auto create_query_push_ast = rewriteCreateQueryStorage(create_query_pull_ast, task_table.table_push, task_table.engine_push_ast);
LOG_DEBUG(log, "Push table create query: " << queryToString(create_query_push_ast));
TaskStateWithOwner start_state(TaskState::Started, host_id);
auto code = zookeeper->tryCreate(partition_task_status_node, start_state.toString(), zkutil::CreateMode::Persistent);
if (code == ZNODEEXISTS)
zkutil::Stat stat;
if (zookeeper->exists(current_shards_path, &stat))
{
auto status = TaskStateWithOwner::fromString(zookeeper->get(partition_task_status_node));
if (status.state == TaskState::Finished)
if (stat.numChildren != 0)
{
LOG_DEBUG(log, "Task " << partition_task_node << " has been executed by " << status.owner);
return true;
}
else
{
LOG_DEBUG(log, "Found abandoned task " << partition_task_node);
/// Restart shard
LOG_DEBUG(log, "Partition " << task_partition.name << " contains " << stat.numChildren << " active workers, sleep");
std::this_thread::sleep_for(default_sleep_time);
return false;
}
}
else if (code != ZOK)
throw zkutil::KeeperException(code, partition_task_status_node);
/// Try create table (if not exists) on each shard
/// Remove all status nodes
zookeeper->tryRemoveRecursive(current_shards_path);
String query = "ALTER TABLE " + getDatabaseDotTable(task_table.table_push) +
" DROP PARTITION ID " + backQuoteIfNeed(task_partition.name);
ClusterPtr & cluster_push = task_table.cluster_push;
Settings & settings_push = task_cluster->settings_push;
LOG_DEBUG(log, "Execute distributed DROP PARTITION: " << query);
/// Limit number of max executing replicas to 1
size_t num_shards = executeQueryOnCluster(cluster_push, query, nullptr, &settings_push, PoolMode::GET_ALL, 1);
if (num_shards < cluster_push->getShardCount())
{
auto query_ast = create_query_push_ast->clone();
typeid_cast<ASTCreateQuery &>(*query_ast).if_not_exists = true;
String query = queryToString(query_ast);
LOG_DEBUG(log, "Create remote tables " << query);
executeQueryOnOneReplicaAtLeast(task_table.cluster_push, query_ast, query, &task_cluster->settings_push);
LOG_INFO(log, "DROP PARTITION wasn't successfully executed on " << cluster_push->getShardCount() - num_shards << " shards");
return false;
}
/// Do the main action
/// Remove the locking node
zookeeper->remove(is_dirty_flag_path);
return true;
}
bool processPartitionTask(TaskPartition & task_partition)
{
try
{
String query;
{
std::stringstream ss;
ss << "INSERT INTO " << getDatabaseDotTable(table_split)
<< " SELECT * FROM " << getDatabaseDotTable(table_pull)
<< " WHERE (_part LIKE '" << task_partition.name << "%')";
return processPartitionTaskImpl(task_partition);
}
catch (...)
{
tryLogCurrentException(log, "An error occurred while processing partition " + task_partition.name);
return false;
}
}
if (!task_table.where_condition_str.empty())
ss << " AND (" + task_table.where_condition_str + ")";
bool processPartitionTaskImpl(TaskPartition & task_partition)
{
TaskShard & task_shard = task_partition.task_shard;
TaskTable & task_table = task_shard.task_table;
query = ss.str();
}
auto zookeeper = getZooKeeper();
auto acl = zookeeper->getDefaultACL();
LOG_DEBUG(log, "Executing query: " << query);
String is_dirty_flag_path = task_partition.getCommonPartitionIsDirtyPath();
String current_task_is_active_path = task_partition.getActiveWorkerPath();
String current_task_status_path = task_partition.getShardStatusPath();
/// Load balancing
auto worker_node_holder = createWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path);
LOG_DEBUG(log, "Processing " << current_task_status_path);
/// Do not start if partition is dirty, try to clean it
if (zookeeper->exists(is_dirty_flag_path))
{
LOG_DEBUG(log, "Partition " << task_partition.name << " is dirty, try to drop it");
try
{
Context local_context = context;
local_context.getSettingsRef() = task_cluster->settings_push;
ReadBufferFromString istr(query);
WriteBufferNull ostr;
executeQuery(istr, ostr, false, local_context, {});
tryDropPartition(task_partition, zookeeper);
}
catch (...)
{
tryLogCurrentException(log);
tryLogCurrentException(log, "An error occurred while clean partition");
}
return false;
}
/// Create ephemeral node to mark that we are active
zookeeper->createAncestors(current_task_is_active_path);
zkutil::EphemeralNodeHolderPtr partition_task_node_holder;
try
{
partition_task_node_holder = zkutil::EphemeralNodeHolder::create(current_task_is_active_path, *zookeeper, host_id);
}
catch (const zkutil::KeeperException & e)
{
if (e.code == ZNODEEXISTS)
{
LOG_DEBUG(log, "Someone is already processing " << current_task_is_active_path);
return false;
}
throw;
}
/// Exit if the task has already been processed; create a blocking node if it was abandoned
{
String status_data;
if (zookeeper->tryGet(current_task_status_path, status_data))
{
TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
if (status.state == TaskState::Finished)
{
LOG_DEBUG(log, "Task " << current_task_status_path << " has been successfully executed by " << status.owner);
return true;
}
// The task is abandoned, initiate DROP PARTITION
LOG_DEBUG(log, "Task " << current_task_status_path << " has not been successfully finished by " << status.owner);
zookeeper->create(is_dirty_flag_path, current_task_status_path, zkutil::CreateMode::Persistent);
return false;
}
}
TaskStateWithOwner state_finish(TaskState::Started, host_id);
zkutil::Stat stat;
code = zookeeper->trySet(partition_task_status_node, state_finish.toString(), 0, &stat);
zookeeper->createAncestors(current_task_status_path);
if (code == ZBADVERSION)
/// We need to update the table definition for each partition, it could have been changed by an ALTER
ASTPtr create_query_pull_ast;
{
LOG_DEBUG(log, "Someone made the node abandoned. Will refill partition");
return false;
/// Fetch and parse (possibly) new definition
String create_query_pull_str = getRemoteCreateTable(task_table.table_pull, *task_shard.connection_entry,
&task_cluster->settings_pull);
ParserCreateQuery parser_create_query;
create_query_pull_ast = parseQuery(parser_create_query, create_query_pull_str);
}
else if (code != ZOK)
throw zkutil::KeeperException(code, partition_task_status_node);
/// Create local Distributed tables:
/// a table fetching data from current shard and a table inserting data to the whole cluster
DatabaseAndTableName table_shard(working_database_name, ".read_shard." + task_table.name_in_config);
DatabaseAndTableName table_split(working_database_name, ".split." + task_table.name_in_config);
{
/// Create special cluster with single shard
String shard_read_cluster_name = ".read_shard." + task_table.cluster_pull_name;
size_t current_shard_index = task_shard.info.shard_num - 1;
ClusterPtr cluster_pull_current_shard = task_table.cluster_pull->getClusterWithSingleShard(current_shard_index);
context.setCluster(shard_read_cluster_name, cluster_pull_current_shard);
auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
const auto & storage_split_ast = task_table.engine_split_ast;
auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_shard, storage_shard_ast);
auto create_table_split_ast = rewriteCreateQueryStorage(create_query_pull_ast, table_split, storage_split_ast);
LOG_DEBUG(log, "Create shard reading table. Query: " << queryToString(create_table_pull_ast));
dropAndCreateLocalTable(create_table_pull_ast);
LOG_DEBUG(log, "Create split table. Query: " << queryToString(create_table_split_ast));
dropAndCreateLocalTable(create_table_split_ast);
}
/// Try to start processing, create a node about it
{
String start_state = TaskStateWithOwner::getData(TaskState::Started, host_id);
auto op_create = std::make_unique<zkutil::Op::Create>(current_task_status_path, start_state, acl, zkutil::CreateMode::Persistent);
auto multi_status = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_create));
if (multi_status.code != ZOK)
{
if (multi_status.failed_op_index < 0)
{
LOG_INFO(log, "Partition " << task_partition.name << " is dirty and will be dropped and refilled");
return false;
}
throw zkutil::KeeperException(multi_status.code, current_task_status_path);
}
}
/// Try to create the table (if not exists) on each shard
{
auto create_query_push_ast = rewriteCreateQueryStorage(create_query_pull_ast, task_table.table_push, task_table.engine_push_ast);
typeid_cast<ASTCreateQuery &>(*create_query_push_ast).if_not_exists = true;
String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create remote push tables. Query: " << query);
executeQueryOnCluster(task_table.cluster_push, query, create_query_push_ast, &task_cluster->settings_push);
}
/// Do the copying
{
ASTPtr query_select_ast;
{
String query;
query += "SELECT * FROM " + getDatabaseDotTable(table_shard);
query += " WHERE (_part LIKE '" + task_partition.name + "%')";
if (!task_table.where_condition_str.empty())
query += " AND (" + task_table.where_condition_str + ")";
ParserQuery p_query(query.data() + query.size());
query_select_ast = parseQuery(p_query, query);
LOG_DEBUG(log, "Executing SELECT query: " << query);
}
ASTPtr query_insert_ast;
{
String query;
query += "INSERT INTO " + getDatabaseDotTable(table_split) + " VALUES ";
ParserQuery p_query(query.data() + query.size());
query_insert_ast = parseQuery(p_query, query);
LOG_DEBUG(log, "Executing INSERT query: " << query);
}
try
{
/// Custom INSERT SELECT implementation
Context context_select = context;
context_select.getSettingsRef() = task_cluster->settings_pull;
Context context_insert = context;
context_insert.getSettingsRef() = task_cluster->settings_push;
InterpreterSelectQuery interpreter_select(query_select_ast, context_select);
BlockIO io_select = interpreter_select.execute();
InterpreterInsertQuery interpreter_insert(query_insert_ast, context_insert);
BlockIO io_insert = interpreter_insert.execute();
using ExistsFuture = zkutil::ZooKeeper::ExistsFuture;
auto future_is_dirty = std::make_unique<ExistsFuture>(zookeeper->asyncExists(is_dirty_flag_path));
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
constexpr size_t check_period_milliseconds = 500;
/// Asynchronously checks the ZooKeeper connection and whether the is_dirty flag has appeared while the data is being copied
auto cancel_check = [&] ()
{
if (zookeeper->expired())
throw Exception("ZooKeeper session is expired, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
if (future_is_dirty != nullptr)
{
zkutil::ZooKeeper::StatAndExists status;
try
{
status = future_is_dirty->get();
future_is_dirty.reset();
}
catch (zkutil::KeeperException & e)
{
future_is_dirty.reset();
if (e.isTemporaryError())
LOG_INFO(log, "ZooKeeper is lagging: " << e.displayText());
else
throw;
}
if (status.exists)
throw Exception("Partition is dirty, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
}
if (watch.elapsedMilliseconds() >= check_period_milliseconds)
{
watch.restart();
future_is_dirty = std::make_unique<ExistsFuture>(zookeeper->asyncExists(is_dirty_flag_path));
}
return false;
};
/// Main work is here
copyData(*io_select.in, *io_insert.out, cancel_check);
// Just in case
if (future_is_dirty != nullptr)
future_is_dirty->get();
}
catch (...)
{
tryLogCurrentException(log, "An error occurred during copying, partition will be marked as dirty");
return false;
}
}
/// Finalize the processing, change state of current partition task (and also check is_dirty flag)
{
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
auto op_set = std::make_unique<zkutil::Op::SetData>(current_task_status_path, state_finished, 0);
auto multi_status = checkNoNodeAndCommit(zookeeper, is_dirty_flag_path, std::move(op_set));
if (multi_status.code != ZOK)
{
if (multi_status.failed_op_index < 0)
LOG_INFO(log, "Partition " << task_partition.name << " became dirty and will be dropped and refilled");
else
LOG_INFO(log, "Someone made the node abandoned. Will refill partition. " << ::zerror(multi_status.code));
return false;
}
}
LOG_DEBUG(log, "Data partition copied");
return true;
}
@ -810,64 +1071,94 @@ protected:
return res;
}
size_t executeQueryOnOneReplicaAtLeast(const ClusterPtr & cluster, const ASTPtr & query_ast, const String & query,
const Settings * settings = nullptr) const
/** Executes a simple query (without output streams, for example DDL queries) on each shard of the cluster
* Returns the number of shards on which at least one replica executed the query successfully
*/
size_t executeQueryOnCluster(
const ClusterPtr & cluster,
const String & query,
const ASTPtr & query_ast_ = nullptr,
const Settings * settings = nullptr,
PoolMode pool_mode = PoolMode::GET_ALL,
size_t max_successful_executions_per_shard = 0) const
{
auto num_shards = cluster->getShardsInfo().size();
std::vector<size_t> per_shard_num_sucessful_replicas(num_shards, 0);
std::vector<size_t> per_shard_num_successful_replicas(num_shards, 0);
ASTPtr query_ast;
if (query_ast_ == nullptr)
{
ParserQuery p_query(query.data() + query.size());
query_ast = parseQuery(p_query, query);
}
else
query_ast = query_ast_;
/// We need to execute the query on at least one replica
auto do_for_shard = [&] (size_t shard_index)
{
const Cluster::ShardInfo & shard = cluster->getShardsInfo().at(shard_index);
size_t num_sucessful_replicas = 0;
size_t & num_successful_executions = per_shard_num_successful_replicas.at(shard_index);
num_successful_executions = 0;
auto increment_and_check_exit = [&] ()
{
++num_successful_executions;
return max_successful_executions_per_shard && num_successful_executions >= max_successful_executions_per_shard;
};
/// In our case there are no local replicas, but handle them just in case
for (size_t i = 0; i < shard.getLocalNodeCount(); ++i)
{
InterpreterCreateQuery interpreter(query_ast, context);
interpreter.execute();
++num_sucessful_replicas;
auto interpreter = InterpreterFactory::get(query_ast, context);
interpreter->execute();
if (increment_and_check_exit())
return;
}
/// Try to execute the query on as many replicas as possible
if (shard.hasRemoteConnections())
{
std::vector<IConnectionPool::Entry> connections = shard.pool->getMany(settings, PoolMode::GET_ALL);
std::vector<IConnectionPool::Entry> connections = shard.pool->getMany(settings, pool_mode);
for (auto & connection : connections)
{
if (!connection.isNull())
{
RemoteBlockInputStream stream(*connection, query, context, settings);
NullBlockOutputStream output;
try
{
RemoteBlockInputStream stream(*connection, query, context, settings);
NullBlockOutputStream output;
copyData(stream, output);
++num_sucessful_replicas;
if (increment_and_check_exit())
return;
}
catch (const Exception & e)
{
tryLogCurrentException(log);
LOG_INFO(log, getCurrentExceptionMessage(false, true));
}
}
}
}
per_shard_num_sucessful_replicas[shard_index] = num_sucessful_replicas;
};
ThreadPool thread_pool(getNumberOfPhysicalCPUCores());
{
ThreadPool thread_pool(std::min(num_shards, getNumberOfPhysicalCPUCores()));
for (size_t shard_index = 0; shard_index < num_shards; ++shard_index)
thread_pool.schedule([=] { do_for_shard(shard_index); });
thread_pool.wait();
for (size_t shard_index = 0; shard_index < num_shards; ++shard_index)
thread_pool.schedule([=] { do_for_shard(shard_index); });
size_t sucessful_shards = 0;
for (size_t num_replicas : per_shard_num_sucessful_replicas)
sucessful_shards += (num_replicas > 0);
thread_pool.wait();
}
return sucessful_shards;
size_t successful_shards = 0;
for (size_t num_replicas : per_shard_num_successful_replicas)
successful_shards += (num_replicas > 0);
return successful_shards;
}
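
To illustrate the new max_successful_executions_per_shard parameter, here are the two call patterns already used earlier in this file (a sketch; the query strings/ASTs and the cluster/settings objects are placeholders):

/// At most one replica per shard should execute the query (e.g. a replicated DROP PARTITION):
size_t successful_shards = executeQueryOnCluster(cluster_push, drop_query, nullptr, &settings_push, PoolMode::GET_ALL, 1);

/// Every available replica should execute the query (e.g. CREATE TABLE IF NOT EXISTS); the default limit 0 means unlimited:
executeQueryOnCluster(cluster_push, create_query, create_query_ast, &settings_push);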
String getTableStructureAndCheckConsistency(TaskTable & table_task)
@ -896,7 +1187,7 @@ protected:
for (size_t i = 0; i < block.rows(); ++i)
{
if (structure_class_col.getElement(i) != 0)
throw Exception("Structures of table " + table_task.db_table_pull + " are different on cluster " +
throw Exception("Structures of table " + getDatabaseDotTable(table_task.table_pull) + " are different on cluster " +
table_task.cluster_pull_name, ErrorCodes::BAD_ARGUMENTS);
if (rows == 0)
@ -935,6 +1226,8 @@ private:
Context & context;
Poco::Logger * log;
std::chrono::milliseconds default_sleep_time{1000};
};
@ -965,15 +1258,17 @@ public:
po::variables_map options;
po::store(po::command_line_parser(argc, argv).options(options_desc).positional(positional_desc).run(), options);
po::notify(options);
if (options.count("help"))
{
std::cerr << "Copies tables from one cluster to another" << std::endl;
std::cerr << "Usage: clickhouse copier <config-file> <task-path>" << std::endl;
std::cerr << options_desc << std::endl;
return;
}
po::notify(options);
if (config_xml.empty() || !Poco::File(config_xml).exists())
throw Exception("ZooKeeper configuration file " + config_xml + " doesn't exist", ErrorCodes::BAD_ARGUMENTS);