Nikita Mikhaylov 2021-04-22 21:04:32 +03:00
parent 2017d2f918
commit 90ab394769
14 changed files with 558 additions and 235 deletions

View File

@ -6,7 +6,9 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/setThreadName.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <DataStreams/ExpressionBlockInputStream.h>
namespace DB
{
@ -20,6 +22,12 @@ namespace ErrorCodes
}
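/// Wraps a string in ANSI escape codes for bright cyan so it stands out in terminal logs; "\u001b[0m" resets the color afterwards.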
std::string wrapWithColor(const std::string & value)
{
return "\u001b[36;1m" + value + "\u001b[0m";
}
void ClusterCopier::init()
{
auto zookeeper = getContext()->getZooKeeper();
@ -29,7 +37,7 @@ void ClusterCopier::init()
if (response.error != Coordination::Error::ZOK)
return;
UInt64 version = ++task_description_version;
LOG_DEBUG(log, "Task description should be updated, local version {}", version);
LOG_INFO(log, "Task description should be updated, local version {}", version);
};
task_description_path = task_zookeeper_path + "/description";
@ -50,7 +58,7 @@ void ClusterCopier::init()
task_table.initShards(task_cluster->random_engine);
}
LOG_DEBUG(log, "Will process {} table tasks", task_cluster->table_tasks.size());
LOG_INFO(log, "Will process {} table tasks", task_cluster->table_tasks.size());
/// Do not initialize tables, will make deferred initialization in process()
@ -138,7 +146,7 @@ void ClusterCopier::discoverShardPartitions(const ConnectionTimeouts & timeouts,
{
if (!task_table.enabled_partitions_set.count(partition_name))
{
LOG_DEBUG(log, "Partition {} will not be processed, since it is not in enabled_partitions of {}", partition_name, task_table.table_id);
LOG_INFO(log, "Partition {} will not be processed, since it is not in enabled_partitions of {}", partition_name, task_table.table_id);
}
}
}
@ -173,7 +181,7 @@ void ClusterCopier::discoverShardPartitions(const ConnectionTimeouts & timeouts,
LOG_WARNING(log, "There are no {} partitions from enabled_partitions in shard {} :{}", missing_partitions.size(), task_shard->getDescription(), ss.str());
}
LOG_DEBUG(log, "Will copy {} partitions from shard {}", task_shard->partition_tasks.size(), task_shard->getDescription());
LOG_INFO(log, "Will copy {} partitions from shard {}", task_shard->partition_tasks.size(), task_shard->getDescription());
}
void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts, TaskTable & task_table, UInt64 num_threads)
@ -189,9 +197,10 @@ void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts,
discoverShardPartitions(timeouts, task_shard);
});
LOG_DEBUG(log, "Waiting for {} setup jobs", thread_pool.active());
LOG_INFO(log, "Waiting for {} setup jobs", thread_pool.active());
thread_pool.wait();
}
std::cout << "discoverTablePartitions finished" << std::endl;
}
void ClusterCopier::uploadTaskDescription(const std::string & task_path, const std::string & task_file, const bool force)
@ -213,7 +222,7 @@ void ClusterCopier::uploadTaskDescription(const std::string & task_path, const s
if (code != Coordination::Error::ZOK && force)
zookeeper->createOrUpdate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
LOG_DEBUG(log, "Task description {} uploaded to {} with result {} ({})",
LOG_INFO(log, "Task description {} uploaded to {} with result {} ({})",
((code != Coordination::Error::ZOK && !force) ? "not " : ""), local_task_description_path, code, Coordination::errorMessage(code));
}
@ -230,7 +239,7 @@ void ClusterCopier::reloadTaskDescription()
if (code != Coordination::Error::ZOK)
throw Exception("Can't get description node " + task_description_path, ErrorCodes::BAD_ARGUMENTS);
LOG_DEBUG(log, "Loading description, zxid={}", task_description_current_stat.czxid);
LOG_INFO(log, "Loading description, zxid={}", task_description_current_stat.czxid);
auto config = getConfigurationFromXMLString(task_config_str);
/// Setup settings
@ -250,7 +259,7 @@ void ClusterCopier::updateConfigIfNeeded()
if (!is_outdated_version && !is_expired_session)
return;
LOG_DEBUG(log, "Updating task description");
LOG_INFO(log, "Updating task description");
reloadTaskDescription();
task_description_current_version = version_to_update;
@ -361,7 +370,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
if (static_cast<UInt64>(stat.numChildren) >= task_cluster->max_workers)
{
LOG_DEBUG(log, "Too many workers ({}, maximum {}). Postpone processing {}", stat.numChildren, task_cluster->max_workers, description);
LOG_INFO(log, "Too many workers ({}, maximum {}). Postpone processing {}", stat.numChildren, task_cluster->max_workers, description);
if (unprioritized)
current_sleep_time = std::min(max_sleep_time, current_sleep_time + default_sleep_time);
@ -387,7 +396,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
/// Try to make fast retries
if (num_bad_version_errors > 3)
{
LOG_DEBUG(log, "A concurrent worker has just been added, will check free worker slots again");
LOG_INFO(log, "A concurrent worker has just been added, will check free worker slots again");
std::chrono::milliseconds random_sleep_time(std::uniform_int_distribution<int>(1, 1000)(task_cluster->random_engine));
std::this_thread::sleep_for(random_sleep_time);
num_bad_version_errors = 0;
@ -422,7 +431,7 @@ bool ClusterCopier::checkAllPiecesInPartitionAreDone(const TaskTable & task_tabl
{
bool piece_is_done = checkPartitionPieceIsDone(task_table, partition_name, piece_number, shards_with_partition);
if (!piece_is_done)
LOG_DEBUG(log, "Partition {} piece {} is not already done.", partition_name, piece_number);
LOG_INFO(log, "Partition {} piece {} is not already done.", partition_name, piece_number);
answer &= piece_is_done;
}
@ -438,7 +447,7 @@ bool ClusterCopier::checkAllPiecesInPartitionAreDone(const TaskTable & task_tabl
bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, const String & partition_name,
size_t piece_number, const TasksShard & shards_with_partition)
{
LOG_DEBUG(log, "Check that all shards processed partition {} piece {} successfully", partition_name, piece_number);
LOG_INFO(log, "Check that all shards processed partition {} piece {} successfully", partition_name, piece_number);
auto zookeeper = getContext()->getZooKeeper();
@ -530,7 +539,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
inject_fault = value < move_fault_probability;
}
LOG_DEBUG(log, "Try to move {} to destination table", partition_name);
LOG_INFO(log, "Try to move {} to destination table", partition_name);
auto zookeeper = getContext()->getZooKeeper();
@ -548,7 +557,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
{
if (e.code == Coordination::Error::ZNODEEXISTS)
{
LOG_DEBUG(log, "Someone is already moving pieces {}", current_partition_attach_is_active);
LOG_INFO(log, "Someone is already moving pieces {}", current_partition_attach_is_active);
return TaskStatus::Active;
}
@ -565,13 +574,13 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
if (status.state == TaskState::Finished)
{
LOG_DEBUG(log, "All pieces for partition from this task {} has been successfully moved to destination table by {}", current_partition_attach_is_active, status.owner);
LOG_INFO(log, "All pieces for partition from this task {} has been successfully moved to destination table by {}", current_partition_attach_is_active, status.owner);
return TaskStatus::Finished;
}
/// Task is abandoned, because the ephemeral node was created previously, possibly by another copier process.
/// Initialize DROP PARTITION
LOG_DEBUG(log, "Moving piece for partition {} has not been successfully finished by {}. Will try to move by myself.", current_partition_attach_is_active, status.owner);
LOG_INFO(log, "Moving piece for partition {} has not been successfully finished by {}. Will try to move by myself.", current_partition_attach_is_active, status.owner);
/// Remove is_done marker.
zookeeper->remove(current_partition_attach_is_done);
@ -588,7 +597,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
/// Move partition to original destination table.
for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
{
LOG_DEBUG(log, "Trying to move partition {} piece {} to original table", partition_name, toString(current_piece_number));
LOG_INFO(log, "Trying to move partition {} piece {} to original table", partition_name, toString(current_piece_number));
ASTPtr query_alter_ast;
String query_alter_ast_string;
@ -611,7 +620,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
" FROM " + getQuotedTable(helping_table);
LOG_DEBUG(log, "Executing ALTER query: {}", query_alter_ast_string);
LOG_INFO(log, "Executing ALTER query: {}", query_alter_ast_string);
try
{
@ -620,9 +629,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
task_table.cluster_push,
query_alter_ast_string,
task_cluster->settings_push,
PoolMode::GET_MANY,
execution_mode,
max_successful_executions_per_shard);
execution_mode);
if (settings_push.replication_alter_partitions_sync == 1)
{
@ -645,7 +652,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
}
catch (...)
{
LOG_DEBUG(log, "Error while moving partition {} piece {} to original table", partition_name, toString(current_piece_number));
LOG_INFO(log, "Error while moving partition {} piece {} to original table", partition_name, toString(current_piece_number));
throw;
}
@ -660,20 +667,20 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) +
((partition_name == "'all'") ? " PARTITION ID " : " PARTITION ") + partition_name + " DEDUPLICATE;";
LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: {}", query_deduplicate_ast_string);
LOG_INFO(log, "Executing OPTIMIZE DEDUPLICATE query: {}", query_deduplicate_ast_string);
UInt64 num_nodes = executeQueryOnCluster(
task_table.cluster_push,
query_deduplicate_ast_string,
task_cluster->settings_push,
PoolMode::GET_MANY);
ClusterExecutionMode::ON_EACH_SHARD);
LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : {}", toString(num_nodes));
}
}
catch (...)
{
LOG_DEBUG(log, "Error while executing OPTIMIZE DEDUPLICATE partition {}in the original table", partition_name);
LOG_INFO(log, "Error while executing OPTIMIZE DEDUPLICATE partition {}in the original table", partition_name);
throw;
}
}
@ -739,6 +746,8 @@ std::shared_ptr<ASTCreateQuery> rewriteCreateQueryStorage(const ASTPtr & create_
res->children.clear();
res->set(res->columns_list, create.columns_list->clone());
res->set(res->storage, new_storage_ast->clone());
/// Just to avoid storing an additional flag like `is_table_created` somewhere else
res->if_not_exists = true;
return res;
}
@ -771,7 +780,7 @@ bool ClusterCopier::tryDropPartitionPiece(
{
if (e.code == Coordination::Error::ZNODEEXISTS)
{
LOG_DEBUG(log, "Partition {} piece {} is cleaning now by somebody, sleep", task_partition.name, toString(current_piece_number));
LOG_INFO(log, "Partition {} piece {} is cleaning now by somebody, sleep", task_partition.name, toString(current_piece_number));
std::this_thread::sleep_for(default_sleep_time);
return false;
}
@ -784,7 +793,7 @@ bool ClusterCopier::tryDropPartitionPiece(
{
if (stat.numChildren != 0)
{
LOG_DEBUG(log, "Partition {} contains {} active workers while trying to drop it. Going to sleep.", task_partition.name, stat.numChildren);
LOG_INFO(log, "Partition {} contains {} active workers while trying to drop it. Going to sleep.", task_partition.name, stat.numChildren);
std::this_thread::sleep_for(default_sleep_time);
return false;
}
@ -804,7 +813,7 @@ bool ClusterCopier::tryDropPartitionPiece(
{
if (e.code == Coordination::Error::ZNODEEXISTS)
{
LOG_DEBUG(log, "Partition {} is being filled now by somebody, sleep", task_partition.name);
LOG_INFO(log, "Partition {} is being filled now by somebody, sleep", task_partition.name);
return false;
}
@ -842,12 +851,11 @@ bool ClusterCopier::tryDropPartitionPiece(
/// It is important, DROP PARTITION must be done synchronously
settings_push.replication_alter_partitions_sync = 2;
LOG_DEBUG(log, "Execute distributed DROP PARTITION: {}", query);
LOG_INFO(log, "Execute distributed DROP PARTITION: {}", query);
/// We have to drop partition_piece on each replica
size_t num_shards = executeQueryOnCluster(
cluster_push, query,
settings_push,
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(log, "DROP PARTITION was successfully executed on {} nodes of a cluster.", num_shards);
@ -863,7 +871,7 @@ bool ClusterCopier::tryDropPartitionPiece(
}
else
{
LOG_DEBUG(log, "Clean state is altered when dropping the partition, cowardly bailing");
LOG_INFO(log, "Clean state is altered when dropping the partition, cowardly bailing");
/// clean state is stale
return false;
}
@ -907,7 +915,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
++cluster_partition.total_tries;
LOG_DEBUG(log, "Processing partition {} for the whole cluster", partition_name);
LOG_INFO(log, "Processing partition {} for the whole cluster", partition_name);
/// Process each source shard having current partition and copy current partition
/// NOTE: shards are sorted by "distance" to current host
@ -929,7 +937,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
{
const size_t number_of_splits = task_table.number_of_splits;
shard->partition_tasks.emplace(partition_name, ShardPartition(*shard, partition_name, number_of_splits));
LOG_DEBUG(log, "Discovered partition {} in shard {}", partition_name, shard->getDescription());
LOG_INFO(log, "Discovered partition {} in shard {}", partition_name, shard->getDescription());
/// To save references in the future.
auto shard_partition_it = shard->partition_tasks.find(partition_name);
PartitionPieces & shard_partition_pieces = shard_partition_it->second.pieces;
@ -942,7 +950,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
}
else
{
LOG_DEBUG(log, "Found that shard {} does not contain current partition {}", shard->getDescription(), partition_name);
LOG_INFO(log, "Found that shard {} does not contain current partition {}", shard->getDescription(), partition_name);
continue;
}
}
@ -1100,18 +1108,18 @@ TaskStatus ClusterCopier::tryCreateDestinationTable(const ConnectionTimeouts & t
InterpreterCreateQuery::prepareOnClusterQuery(create, getContext(), task_table.cluster_push_name);
String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create destination tables. Query: {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
LOG_INFO(log, "Create destination tables. Query: \n {}", wrapWithColor(query));
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(
log,
"Destination tables {} have been created on {} shards of {}",
getQuotedTable(task_table.table_push),
shards,
task_table.cluster_push->getShardCount());
if (shards != task_table.cluster_push->getShardCount())
{
return TaskStatus::Error;
}
// if (shards != task_table.cluster_push->getShardCount())
// {
// return TaskStatus::Error;
// }
}
catch (...)
{
@ -1226,17 +1234,17 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
auto create_is_dirty_node = [&] (const CleanStateClock & clock)
{
if (clock.is_stale())
LOG_DEBUG(log, "Clean state clock is stale while setting dirty flag, cowardly bailing");
LOG_INFO(log, "Clean state clock is stale while setting dirty flag, cowardly bailing");
else if (!clock.is_clean())
LOG_DEBUG(log, "Thank you, Captain Obvious");
LOG_INFO(log, "Thank you, Captain Obvious");
else if (clock.discovery_version)
{
LOG_DEBUG(log, "Updating clean state clock");
LOG_INFO(log, "Updating clean state clock");
zookeeper->set(piece_is_dirty_flag_path, host_id, clock.discovery_version.value());
}
else
{
LOG_DEBUG(log, "Creating clean state clock");
LOG_INFO(log, "Creating clean state clock");
zookeeper->create(piece_is_dirty_flag_path, host_id, zkutil::CreateMode::Persistent);
}
};
@ -1271,7 +1279,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
/// Load balancing
auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_piece_status_path, is_unprioritized_task);
LOG_DEBUG(log, "Processing {}", current_task_piece_status_path);
LOG_INFO(log, "Processing {}", current_task_piece_status_path);
const String piece_status_path = partition_piece.getPartitionPieceShardsPath();
@ -1282,12 +1290,12 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
/// Do not start if partition piece is dirty, try to clean it
if (is_clean)
{
LOG_DEBUG(log, "Partition {} piece {} appears to be clean", task_partition.name, current_piece_number);
LOG_INFO(log, "Partition {} piece {} appears to be clean", task_partition.name, current_piece_number);
zookeeper->createAncestors(current_task_piece_status_path);
}
else
{
LOG_DEBUG(log, "Partition {} piece {} is dirty, try to drop it", task_partition.name, current_piece_number);
LOG_INFO(log, "Partition {} piece {} is dirty, try to drop it", task_partition.name, current_piece_number);
try
{
@ -1312,7 +1320,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
{
if (e.code == Coordination::Error::ZNODEEXISTS)
{
LOG_DEBUG(log, "Someone is already processing {}", current_task_piece_is_active_path);
LOG_INFO(log, "Someone is already processing {}", current_task_piece_is_active_path);
return TaskStatus::Active;
}
@ -1328,13 +1336,13 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
if (status.state == TaskState::Finished)
{
LOG_DEBUG(log, "Task {} has been successfully executed by {}", current_task_piece_status_path, status.owner);
LOG_INFO(log, "Task {} has been successfully executed by {}", current_task_piece_status_path, status.owner);
return TaskStatus::Finished;
}
/// Task is abandoned, because the ephemeral node was created previously, possibly by another copier process.
/// Initialize DROP PARTITION
LOG_DEBUG(log, "Task {} has not been successfully finished by {}. Partition will be dropped and refilled.", current_task_piece_status_path, status.owner);
LOG_INFO(log, "Task {} has not been successfully finished by {}. Partition will be dropped and refilled.", current_task_piece_status_path, status.owner);
create_is_dirty_node(clean_state_clock);
return TaskStatus::Error;
@ -1342,6 +1350,53 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
}
/// Try to create the table (if not exists) on each shard
/// We have to create this table even if the partition piece is empty
/// This is significant, because we will have simpler code
{
/// 1) Get columns description from any replica of destination cluster
/// 2) Change ENGINE, database and table name
/// 3) Create helping table on the whole destination cluster
auto & settings_push = task_cluster->settings_push;
/// Get a connection to any shard to fetch `CREATE` query
auto connection = task_table.cluster_push->getAnyShardInfo().pool->get(timeouts, &settings_push, true);
/// Execute a request and get `CREATE` query as a string.
String create_query = getRemoteCreateTable(task_shard.task_table.table_push, *connection, settings_push);
/// Parse it to ASTPtr
ParserCreateQuery parser_create_query;
auto create_query_ast = parseQuery(parser_create_query, create_query, settings_push.max_query_size, settings_push.max_parser_depth);
/// Define helping table database and name for current partition piece
DatabaseAndTableName database_and_table_for_current_piece{
task_table.table_push.first,
task_table.table_push.second + "_piece_" + toString(current_piece_number)};
/// This is a bit of legacy, because we could now parse an engine AST from the whole create query.
/// But this is needed to make the helping table non-replicated. We simply don't need replication for helping tables.
auto new_engine_push_ast = task_table.engine_push_ast;
if (task_table.isReplicatedTable())
new_engine_push_ast = task_table.rewriteReplicatedCreateQueryToPlain();
/// Take columns definition from destination table, new database and table name, and new engine (non replicated variant of MergeTree)
auto create_query_push_ast = rewriteCreateQueryStorage(create_query_ast, database_and_table_for_current_piece, new_engine_push_ast);
String query = queryToString(create_query_push_ast);
LOG_INFO(log, "Create destination tables. Query: \n {}", wrapWithColor(query));
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(
log,
"Destination tables {} have been created on {} shards of {}",
getQuotedTable(task_table.table_push),
shards,
task_table.cluster_push->getShardCount());
// if (shards != task_table.cluster_push->getShardCount())
// {
// return TaskStatus::Error;
// }
}
/// Exit if current piece is absent on this shard. Also mark it as finished, because we will check
/// whether each shard has processed each partition (and its pieces).
if (partition_piece.is_absent_piece)
@ -1349,9 +1404,9 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
auto res = zookeeper->tryCreate(current_task_piece_status_path, state_finished, zkutil::CreateMode::Persistent);
if (res == Coordination::Error::ZNODEEXISTS)
LOG_DEBUG(log, "Partition {} piece {} is absent on current replica of a shard. But other replicas have already marked it as done.", task_partition.name, current_piece_number);
LOG_INFO(log, "Partition {} piece {} is absent on current replica of a shard. But other replicas have already marked it as done.", task_partition.name, current_piece_number);
if (res == Coordination::Error::ZOK)
LOG_DEBUG(log, "Partition {} piece {} is absent on current replica of a shard. Will mark it as done. Other replicas will do the same.", task_partition.name, current_piece_number);
LOG_INFO(log, "Partition {} piece {} is absent on current replica of a shard. Will mark it as done. Other replicas will do the same.", task_partition.name, current_piece_number);
return TaskStatus::Finished;
}
@ -1415,40 +1470,6 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
zookeeper->create(current_task_piece_status_path, start_state, zkutil::CreateMode::Persistent);
}
/// Try create table (if not exists) on each shard
{
/// Define push table for current partition piece
auto database_and_table_for_current_piece= std::pair<String, String>(
task_table.table_push.first,
task_table.table_push.second + "_piece_" + toString(current_piece_number));
auto new_engine_push_ast = task_table.engine_push_ast;
if (task_table.isReplicatedTable())
{
new_engine_push_ast = task_table.rewriteReplicatedCreateQueryToPlain();
}
auto create_query_push_ast = rewriteCreateQueryStorage(
task_shard.current_pull_table_create_query,
database_and_table_for_current_piece, new_engine_push_ast);
create_query_push_ast->as<ASTCreateQuery &>().if_not_exists = true;
String query = queryToString(create_query_push_ast);
LOG_DEBUG(log, "Create destination tables. Query: {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
LOG_INFO(
log,
"Destination tables {} have been created on {} shards of {}",
getQuotedTable(task_table.table_push),
shards,
task_table.cluster_push->getShardCount());
if (shards != task_table.cluster_push->getShardCount())
{
return TaskStatus::Error;
}
}
/// Do the copying
{
@ -1462,7 +1483,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
// Select all fields
ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", /*enable_splitting*/ true, inject_fault ? "1" : "");
LOG_DEBUG(log, "Executing SELECT query and pull from {} : {}", task_shard.getDescription(), queryToString(query_select_ast));
LOG_INFO(log, "Executing SELECT query and pull from {} : {}", task_shard.getDescription(), queryToString(query_select_ast));
ASTPtr query_insert_ast;
{
@ -1473,7 +1494,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
const auto & settings = getContext()->getSettingsRef();
query_insert_ast = parseQuery(p_query, query, settings.max_query_size, settings.max_parser_depth);
LOG_DEBUG(log, "Executing INSERT query: {}", query);
LOG_INFO(log, "Executing INSERT query: {}", query);
}
try
@ -1491,8 +1512,24 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute();
BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute();
input = io_select.getInputStream();
auto pure_input = io_select.getInputStream();
output = io_insert.out;
/// Add converting actions to make it possible to copy blocks with slightly different schema
const auto & select_block = pure_input->getHeader();
const auto & insert_block = output->getHeader();
auto actions_dag = ActionsDAG::makeConvertingActions(
select_block.getColumnsWithTypeAndName(),
insert_block.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Position);
auto actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(getContext()));
input = std::make_shared<ExpressionBlockInputStream>(pure_input, actions);
std::cout << "Input:" << std::endl;
std::cout << input->getHeader().dumpStructure() << std::endl;
std::cout << "Output:" << std::endl;
std::cout << output->getHeader().dumpStructure() << std::endl;
}
/// Fail-fast optimization to abort copying when the current clean state expires
@ -1600,7 +1637,7 @@ void ClusterCopier::dropLocalTableIfExists(const DatabaseAndTableName & table_na
void ClusterCopier::dropHelpingTablesByPieceNumber(const TaskTable & task_table, size_t current_piece_number)
{
LOG_DEBUG(log, "Removing helping tables piece {}", current_piece_number);
LOG_INFO(log, "Removing helping tables piece {}", current_piece_number);
DatabaseAndTableName original_table = task_table.table_push;
DatabaseAndTableName helping_table
@ -1611,17 +1648,17 @@ void ClusterCopier::dropHelpingTablesByPieceNumber(const TaskTable & task_table,
const ClusterPtr & cluster_push = task_table.cluster_push;
Settings settings_push = task_cluster->settings_push;
LOG_DEBUG(log, "Execute distributed DROP TABLE: {}", query);
LOG_INFO(log, "Execute distributed DROP TABLE: {}", query);
/// We have to drop partition_piece on each replica
UInt64 num_nodes = executeQueryOnCluster(cluster_push, query, settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE);
UInt64 num_nodes = executeQueryOnCluster(cluster_push, query, settings_push, ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(log, "DROP TABLE query was successfully executed on {} nodes.", toString(num_nodes));
}
void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
{
LOG_DEBUG(log, "Removing helping tables");
LOG_INFO(log, "Removing helping tables");
for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
{
dropHelpingTablesByPieceNumber(task_table, current_piece_number);
@ -1630,7 +1667,7 @@ void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskTable & task_table, const String & partition_name)
{
LOG_DEBUG(log, "Try drop partition partition from all helping tables.");
LOG_INFO(log, "Try drop partition partition from all helping tables.");
for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
{
DatabaseAndTableName original_table = task_table.table_push;
@ -1641,17 +1678,16 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
const ClusterPtr & cluster_push = task_table.cluster_push;
Settings settings_push = task_cluster->settings_push;
LOG_DEBUG(log, "Execute distributed DROP PARTITION: {}", query);
LOG_INFO(log, "Execute distributed DROP PARTITION: {}", query);
/// We have to drop partition_piece on each replica
UInt64 num_nodes = executeQueryOnCluster(
cluster_push, query,
settings_push,
PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(log, "DROP PARTITION query was successfully executed on {} nodes.", toString(num_nodes));
}
LOG_DEBUG(log, "All helping tables dropped partition {}", partition_name);
LOG_INFO(log, "All helping tables dropped partition {}", partition_name);
}
String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
@ -1666,6 +1702,8 @@ String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, C
return typeid_cast<const ColumnString &>(*block.safeGetByPosition(0).column).getDataAt(0).toString();
}
ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{
/// Fetch and parse (possibly) new definition
@ -1680,6 +1718,21 @@ ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & time
return parseQuery(parser_create_query, create_query_pull_str, settings.max_query_size, settings.max_parser_depth);
}
ASTPtr ClusterCopier::getCreateTableForPushShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{
/// Fetch and parse (possibly) new definition
auto connection_entry = task_shard.info.pool->get(timeouts, &task_cluster->settings_push, true);
String create_query_push_str = getRemoteCreateTable(
task_shard.task_table.table_push,
*connection_entry,
task_cluster->settings_push);
ParserCreateQuery parser_create_query;
const auto & settings = getContext()->getSettingsRef();
return parseQuery(parser_create_query, create_query_push_str, settings.max_query_size, settings.max_parser_depth);
}
/// If we are implicitly asked to create a split Distributed table for a certain piece on the current shard, we will do it.
void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeouts,
TaskShard & task_shard, bool create_split)
@ -1688,6 +1741,8 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
/// We need to update table definitions for each part, it could be changed after ALTER
task_shard.current_pull_table_create_query = getCreateTableForPullShard(timeouts, task_shard);
// task_shard.current_push_table_create_query = getCreateTableForPushShard(timeouts, task_shard);
/// Create local Distributed tables:
/// a table fetching data from current shard and a table inserting data to the whole destination cluster
@ -1709,7 +1764,8 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
auto create_query_ast = removeAliasColumnsFromCreateQuery(task_shard.current_pull_table_create_query);
// auto create_query_ast = removeAliasColumnsFromCreateQuery(task_shard.current_pull_table_create_query);
auto create_query_ast = task_shard.current_pull_table_create_query;
auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_read_shard, storage_shard_ast);
dropAndCreateLocalTable(create_table_pull_ast);
@ -1768,7 +1824,7 @@ std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti
const auto & settings = getContext()->getSettingsRef();
ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
LOG_DEBUG(log, "Computing destination partition set, executing query: {}", query);
LOG_INFO(log, "Computing destination partition set, executing query: \n {}", wrapWithColor(query));
auto local_context = Context::createCopy(context);
local_context->setSettings(task_cluster->settings_pull);
@ -1787,7 +1843,7 @@ std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti
}
}
LOG_DEBUG(log, "There are {} destination partitions in shard {}", res.size(), task_shard.getDescription());
LOG_INFO(log, "There are {} destination partitions in shard {}", res.size(), task_shard.getDescription());
return res;
}
@ -1799,21 +1855,22 @@ bool ClusterCopier::checkShardHasPartition(const ConnectionTimeouts & timeouts,
TaskTable & task_table = task_shard.task_table;
std::string query = "SELECT 1 FROM " + getQuotedTable(task_shard.table_read_shard)
+ " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) +
" = (" + partition_quoted_name + " AS partition_key))";
WriteBufferFromOwnString ss;
ss << "SELECT 1 FROM " << getQuotedTable(task_shard.table_read_shard);
ss << " WHERE (" << queryToString(task_table.engine_push_partition_key_ast);
ss << " = (" + partition_quoted_name << " AS partition_key))";
if (!task_table.where_condition_str.empty())
query += " AND (" + task_table.where_condition_str + ")";
query += " LIMIT 1";
LOG_DEBUG(log, "Checking shard {} for partition {} existence, executing query: {}", task_shard.getDescription(), partition_quoted_name, query);
ss << " AND (" << task_table.where_condition_str << ")";
ss << " LIMIT 1";
auto query = ss.str();
ParserQuery parser_query(query.data() + query.size());
const auto & settings = getContext()->getSettingsRef();
ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
LOG_INFO(log, "Checking shard {} for partition {} existence, executing query: \n {}",
task_shard.getDescription(), partition_quoted_name, query_ast->formatForErrorMessage());
auto local_context = Context::createCopy(context);
local_context->setSettings(task_cluster->settings_pull);
return InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows() != 0;
@ -1847,7 +1904,7 @@ bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTi
query += " LIMIT 1";
LOG_DEBUG(log, "Checking shard {} for partition {} piece {} existence, executing query: {}", task_shard.getDescription(), partition_quoted_name, std::to_string(current_piece_number), query);
LOG_INFO(log, "Checking shard {} for partition {} piece {} existence, executing query: \n \u001b[36m {}", task_shard.getDescription(), partition_quoted_name, std::to_string(current_piece_number), query);
ParserQuery parser_query(query.data() + query.size());
const auto & settings = getContext()->getSettingsRef();
@ -1857,12 +1914,13 @@ bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTi
local_context->setSettings(task_cluster->settings_pull);
auto result = InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows();
if (result != 0)
LOG_DEBUG(log, "Partition {} piece number {} is PRESENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
LOG_INFO(log, "Partition {} piece number {} is PRESENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
else
LOG_DEBUG(log, "Partition {} piece number {} is ABSENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
LOG_INFO(log, "Partition {} piece number {} is ABSENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
return result != 0;
}
/** Executes a simple query (without output streams, for example DDL queries) on each shard of the cluster.
 * Returns the number of shards (or nodes, in ON_EACH_NODE mode) on which the query was executed successfully.
 */
@ -1870,112 +1928,64 @@ UInt64 ClusterCopier::executeQueryOnCluster(
const ClusterPtr & cluster,
const String & query,
const Settings & current_settings,
PoolMode pool_mode,
ClusterExecutionMode execution_mode,
UInt64 max_successful_executions_per_shard) const
ClusterExecutionMode execution_mode) const
{
auto num_shards = cluster->getShardsInfo().size();
std::vector<UInt64> per_shard_num_successful_replicas(num_shards, 0);
ParserQuery p_query(query.data() + query.size());
ASTPtr query_ast = parseQuery(p_query, query, current_settings.max_query_size, current_settings.max_parser_depth);
/// We will have to execute query on each replica of a shard.
ClusterPtr cluster_for_query = cluster;
if (execution_mode == ClusterExecutionMode::ON_EACH_NODE)
max_successful_executions_per_shard = 0;
cluster_for_query = cluster->getClusterWithReplicasAsShards(current_settings);
std::atomic<size_t> origin_replicas_number = 0;
std::vector<std::shared_ptr<Connection>> connections;
connections.reserve(cluster->getShardCount());
/// We need to execute query on one replica at least
auto do_for_shard = [&] (UInt64 shard_index, Settings shard_settings)
std::atomic<UInt64> successfully_executed = 0;
for (const auto & replicas : cluster_for_query->getShardsAddresses())
{
setThreadName("QueryForShard");
const Cluster::ShardInfo & shard = cluster->getShardsInfo().at(shard_index);
UInt64 & num_successful_executions = per_shard_num_successful_replicas.at(shard_index);
num_successful_executions = 0;
auto increment_and_check_exit = [&] () -> bool
const auto & node = replicas[0];
try
{
++num_successful_executions;
return max_successful_executions_per_shard && num_successful_executions >= max_successful_executions_per_shard;
};
connections.emplace_back(std::make_shared<Connection>(
node.host_name, node.port, node.default_database,
node.user, node.password, node.cluster, node.cluster_secret,
"ClusterCopier", node.compression, node.secure
));
UInt64 num_replicas = cluster->getShardsAddresses().at(shard_index).size();
/// We execute only Alter, Create and Drop queries.
const auto header = Block{};
origin_replicas_number += num_replicas;
UInt64 num_local_replicas = shard.getLocalNodeCount();
UInt64 num_remote_replicas = num_replicas - num_local_replicas;
/// For unknown reason global context is passed to IStorage::read() method
/// So, task_identifier is passed as constructor argument. It is more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
*connections.back(), query, header, getContext(),
/*throttler=*/nullptr, Scalars(), Tables(), QueryProcessingStage::Complete);
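/// QueryProcessingStage::Complete asks the remote server to execute the query fully; for DDL the returned result is empty.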
/// In that case we don't have local replicas, but do it just in case
for (UInt64 i = 0; i < num_local_replicas; ++i)
{
auto interpreter = InterpreterFactory::get(query_ast, getContext());
interpreter->execute();
if (increment_and_check_exit())
return;
}
/// Will try to make as many as possible queries
if (shard.hasRemoteConnections())
{
shard_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(shard_settings).getSaturated(shard_settings.max_execution_time);
auto connections = shard.pool->getMany(timeouts, &shard_settings, pool_mode);
auto shard_context = Context::createCopy(context);
shard_context->setSettings(shard_settings);
for (auto & connection : connections)
try
{
if (connection.isNull())
continue;
try
{
/// CREATE TABLE and DROP PARTITION queries return empty block
RemoteBlockInputStream stream{*connection, query, Block{}, shard_context};
NullBlockOutputStream output{Block{}};
copyData(stream, output);
if (increment_and_check_exit())
return;
}
catch (const Exception &)
{
LOG_INFO(log, getCurrentExceptionMessage(false, true));
}
remote_query_executor->sendQuery();
}
catch (...)
{
LOG_WARNING(log, "Seemns like node with address {} is unreachable {}", node.host_name);
}
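/// Drain the result stream: DDL queries return no data, so read() is expected to yield an empty block almost immediately.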
while (true)
{
auto block = remote_query_executor->read();
if (!block)
break;
}
remote_query_executor->finish();
++successfully_executed;
}
catch (...)
{
LOG_WARNING(log, "An error occured while processing query : \n {}", wrapWithColor(query));
tryLogCurrentException(log);
}
};
{
ThreadPool thread_pool(std::min<UInt64>(num_shards, getNumberOfPhysicalCPUCores()));
for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
thread_pool.scheduleOrThrowOnError([=, shard_settings = current_settings] { do_for_shard(shard_index, std::move(shard_settings)); });
thread_pool.wait();
}
UInt64 successful_nodes = 0;
for (UInt64 num_replicas : per_shard_num_successful_replicas)
{
if (execution_mode == ClusterExecutionMode::ON_EACH_NODE)
successful_nodes += num_replicas;
else
/// Count only successful shards
successful_nodes += (num_replicas > 0);
}
if (execution_mode == ClusterExecutionMode::ON_EACH_NODE && successful_nodes != origin_replicas_number)
{
LOG_INFO(log, "There was an error while executing ALTER on each node. Query was executed on {} nodes. But had to be executed on {}", toString(successful_nodes), toString(origin_replicas_number.load()));
}
return successful_nodes;
return successfully_executed.load();
}
}

View File

@ -18,12 +18,15 @@ public:
ClusterCopier(const String & task_path_,
const String & host_id_,
const String & proxy_database_name_,
ContextPtr context_)
ContextPtr context_,
Poco::Logger * log_)
: WithContext(context_),
task_zookeeper_path(task_path_),
host_id(host_id_),
working_database_name(proxy_database_name_),
log(&Poco::Logger::get("ClusterCopier")) {}
log(log_) {
std::cout << "Level from constructor" << log->getLevel() << std::endl;
}
void init();
@ -159,6 +162,7 @@ protected:
String getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings);
ASTPtr getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard);
ASTPtr getCreateTableForPushShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard);
/// If we are implicitly asked to create a split Distributed table for a certain piece on the current shard, we will do it.
void createShardInternalTables(const ConnectionTimeouts & timeouts, TaskShard & task_shard, bool create_split = true);
@ -189,9 +193,7 @@ protected:
const ClusterPtr & cluster,
const String & query,
const Settings & current_settings,
PoolMode pool_mode = PoolMode::GET_ALL,
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD,
UInt64 max_successful_executions_per_shard = 0) const;
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD) const;
private:
String task_zookeeper_path;

View File

@ -21,7 +21,7 @@ void ClusterCopierApp::initialize(Poco::Util::Application & self)
config_xml_path = config().getString("config-file");
task_path = config().getString("task-path");
log_level = config().getString("log-level", "trace");
log_level = config().getString("log-level", "info");
is_safe_mode = config().has("safe-mode");
if (config().has("copy-fault-probability"))
copy_fault_probability = std::max(std::min(config().getDouble("copy-fault-probability"), 1.0), 0.0);
@ -110,6 +110,8 @@ void ClusterCopierApp::mainImpl()
ThreadStatus thread_status;
auto * log = &logger();
log->setLevel(6); /// Information
std::cout << log->getLevel() << std::endl;
LOG_INFO(log, "Starting clickhouse-copier (id {}, host_id {}, path {}, revision {})", process_id, host_id, process_path, ClickHouseRevision::getVersionRevision());
SharedContextHolder shared_context = Context::createShared();
@ -137,7 +139,7 @@ void ClusterCopierApp::mainImpl()
CurrentThread::QueryScope query_scope(context);
std::cout << "Will construct copier" << std::endl;
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, context);
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, context, log);
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);
copier->setMoveFaultProbability(move_fault_probability);

View File

@ -50,7 +50,6 @@ struct TaskTable
String getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const;
bool isReplicatedTable() const { return is_replicated_table; }
/// Partitions will be split into number-of-splits pieces.
@ -181,6 +180,7 @@ struct TaskShard
/// Last CREATE TABLE query of the table of the shard
ASTPtr current_pull_table_create_query;
ASTPtr current_push_table_create_query;
/// Internal distributed tables
DatabaseAndTableName table_read_shard;

View File

@ -497,14 +497,14 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
ClusterPtr optimized_cluster = getOptimizedCluster(local_context, metadata_snapshot, query_info.query);
if (optimized_cluster)
{
LOG_DEBUG(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}",
LOG_TRACE(log, "Skipping irrelevant shards - the query will be sent to the following shards of the cluster (shard numbers): {}",
makeFormattedListOfShards(optimized_cluster));
cluster = optimized_cluster;
query_info.optimized_cluster = cluster;
}
else
{
LOG_DEBUG(log, "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster{}",
LOG_TRACE(log, "Unable to figure out irrelevant shards from WHERE/PREWHERE clauses - the query will be sent to all shards of the cluster{}",
has_sharding_key ? "" : " (no sharding key)");
}
}
@ -536,7 +536,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
auto stage = getOptimizedQueryProcessingStage(query_info, settings.extremes, sharding_key_block);
if (stage)
{
LOG_DEBUG(log, "Force processing stage to {}", QueryProcessingStage::toString(*stage));
LOG_TRACE(log, "Force processing stage to {}", QueryProcessingStage::toString(*stage));
return *stage;
}
}
@ -789,7 +789,7 @@ void StorageDistributed::startup()
if (inc > file_names_increment.value)
file_names_increment.value.store(inc);
}
LOG_DEBUG(log, "Auto-increment is {}", file_names_increment.value);
LOG_TRACE(log, "Auto-increment is {}", file_names_increment.value);
}
@ -799,9 +799,9 @@ void StorageDistributed::shutdown()
std::lock_guard lock(cluster_nodes_mutex);
LOG_DEBUG(log, "Joining background threads for async INSERT");
LOG_TRACE(log, "Joining background threads for async INSERT");
cluster_nodes_data.clear();
LOG_DEBUG(log, "Background threads for async INSERT joined");
LOG_TRACE(log, "Background threads for async INSERT joined");
}
void StorageDistributed::drop()
{
@ -819,13 +819,13 @@ void StorageDistributed::drop()
if (relative_data_path.empty())
return;
LOG_DEBUG(log, "Removing pending blocks for async INSERT from filesystem on DROP TABLE");
LOG_TRACE(log, "Removing pending blocks for async INSERT from filesystem on DROP TABLE");
auto disks = data_volume->getDisks();
for (const auto & disk : disks)
disk->removeRecursive(relative_data_path);
LOG_DEBUG(log, "Removed");
LOG_TRACE(log, "Removed");
}
Strings StorageDistributed::getDataPaths() const
@ -845,7 +845,7 @@ void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, Co
{
std::lock_guard lock(cluster_nodes_mutex);
LOG_DEBUG(log, "Removing pending blocks for async INSERT from filesystem on TRUNCATE TABLE");
LOG_TRACE(log, "Removing pending blocks for async INSERT from filesystem on TRUNCATE TABLE");
for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end();)
{
@ -853,7 +853,7 @@ void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, Co
it = cluster_nodes_data.erase(it);
}
LOG_DEBUG(log, "Removed");
LOG_TRACE(log, "Removed");
}
StoragePolicyPtr StorageDistributed::getStoragePolicy() const
@ -881,7 +881,7 @@ void StorageDistributed::createDirectoryMonitors(const DiskPtr & disk)
if (std::filesystem::is_empty(dir_path))
{
LOG_DEBUG(log, "Removing {} (used for async INSERT into Distributed)", dir_path.string());
LOG_TRACE(log, "Removing {} (used for async INSERT into Distributed)", dir_path.string());
/// Will be created by DistributedBlockOutputStream on demand.
std::filesystem::remove(dir_path);
}
@ -1138,7 +1138,7 @@ void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
disk->moveDirectory(relative_data_path, new_path_to_table_data);
auto new_path = disk->getPath() + new_path_to_table_data;
LOG_DEBUG(log, "Updating path to {}", new_path);
LOG_TRACE(log, "Updating path to {}", new_path);
std::lock_guard lock(cluster_nodes_mutex);
for (auto & node : cluster_nodes_data)

View File

@ -1,7 +1,7 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<level>fatal</level>
<log>/var/log/clickhouse-server/copier/log.log</log>
<errorlog>/var/log/clickhouse-server/copier/log.err.log</errorlog>
<size>1000M</size>

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<yandex>
<remote_servers>
<source>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>first</host>
<port>9000</port>
</replica>
</shard>
</source>
<destination>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>second</host>
<port>9000</port>
</replica>
</shard>
</destination>
</remote_servers>
</yandex>

View File

@ -0,0 +1,6 @@
<?xml version="1.0"?>
<yandex>
<distributed_ddl>
<path>/clickhouse/task_queue/ddl</path>
</distributed_ddl>
</yandex>

View File

@ -0,0 +1,20 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>fatal</level>
<log>/var/log/clickhouse-server/copier/log.log</log>
<errorlog>/var/log/clickhouse-server/copier/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/copier/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/copier/stdout.log</stdout>
</logger>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<session_timeout_ms>2000</session_timeout_ms>
</zookeeper>
</yandex>

View File

@ -0,0 +1,32 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<log_queries>1</log_queries>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
<dbuser>
<password>12345678</password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</dbuser>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -32,12 +32,12 @@
<table_events>
<cluster_pull>events</cluster_pull>
<database_pull>dailyhistory</database_pull>
<table_pull>yellow_tripdata</table_pull>
<table_pull>yellow_tripdata_staging</table_pull>
<cluster_push>events</cluster_push>
<database_push>monthlyhistory</database_push>
<table_push>yellow_tripdata</table_push>
<table_push>yellow_tripdata_staging</table_push>
<engine>Engine=ReplacingMergeTree() PRIMARY KEY (tpep_pickup_datetime, id) ORDER BY (tpep_pickup_datetime, id) PARTITION BY (pickup_location_id, toYYYYMM(tpep_pickup_datetime))</engine>
<sharding_key>rand()</sharding_key>
<sharding_key>sipHash64(id) % 3</sharding_key>
</table_events>
</tables>
</yandex>

View File

@ -0,0 +1,40 @@
<?xml version="1.0"?>
<yandex>
<remote_servers>
<source>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>first</host>
<port>9000</port>
</replica>
</shard>
</source>
<destination>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>second</host>
<port>9000</port>
</replica>
</shard>
</destination>
</remote_servers>
<max_workers>2</max_workers>
<tables>
<table_events>
<cluster_pull>source</cluster_pull>
<database_pull>db_different_schema</database_pull>
<table_pull>source</table_pull>
<cluster_push>destination</cluster_push>
<database_push>db_different_schema</database_push>
<table_push>destination</table_push>
<engine>ENGINE = MergeTree() PARTITION BY toYYYYMMDD(Column3) ORDER BY (Column9, Column1, Column2, Column3, Column4)</engine>
<sharding_key>rand()</sharding_key>
</table_events>
</tables>
</yandex>

View File

@ -0,0 +1,175 @@
import os
import sys
import time
import logging
import subprocess
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
import docker
CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(CURRENT_TEST_DIR))
COPYING_FAIL_PROBABILITY = 0.33
MOVING_FAIL_PROBABILITY = 0.1
cluster = None
@pytest.fixture(scope="module")
def started_cluster():
global cluster
try:
cluster = ClickHouseCluster(__file__)
for name in ["first", "second", "third"]:
cluster.add_instance(name,
main_configs=["configs_two_nodes/conf.d/clusters.xml", "configs_two_nodes/conf.d/ddl.xml"], user_configs=["configs_two_nodes/users.xml"],
with_zookeeper=True, external_data_path=os.path.join(CURRENT_TEST_DIR, "./data"))
cluster.start()
yield cluster
finally:
pass
cluster.shutdown()
# Will copy table from `first` node to `second`
class TaskWithDifferentSchema:
def __init__(self, cluster):
self.cluster = cluster
self.zk_task_path = '/clickhouse-copier/task_with_different_schema'
self.container_task_file = "/task_with_different_schema.xml"
for instance_name, _ in cluster.instances.items():
instance = cluster.instances[instance_name]
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, './task_with_different_schema.xml'), self.container_task_file)
print("Copied task file to container of '{}' instance. Path {}".format(instance_name, self.container_task_file))
def start(self):
first = cluster.instances["first"]
first.query("CREATE DATABASE db_different_schema;")
first.query("""CREATE TABLE db_different_schema.source
(
Column1 String,
Column2 UInt32,
Column3 Date,
Column4 DateTime,
Column5 UInt16,
Column6 String,
Column7 String,
Column8 String,
Column9 String,
Column10 String,
Column11 String,
Column12 Decimal(3, 1),
Column13 DateTime,
Column14 UInt16
)
ENGINE = MergeTree()
PARTITION BY (toYYYYMMDD(Column3), Column3)
PRIMARY KEY (Column1, Column2, Column3, Column4, Column6, Column7, Column8, Column9)
ORDER BY (Column1, Column2, Column3, Column4, Column6, Column7, Column8, Column9)
SETTINGS index_granularity = 8192""")
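# Fill the source table with 100 random rows; generateRandom's trailing arguments are the random seed, max string length and max array length.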
first.query("""INSERT INTO db_different_schema.source SELECT * FROM generateRandom(
'Column1 String, Column2 UInt32, Column3 Date, Column4 DateTime, Column5 UInt16,
Column6 String, Column7 String, Column8 String, Column9 String, Column10 String,
Column11 String, Column12 Decimal(3, 1), Column13 DateTime, Column14 UInt16', 1, 10, 2) LIMIT 100;""")
second = cluster.instances["second"]
second.query("CREATE DATABASE db_different_schema;")
second.query("""CREATE TABLE db_different_schema.destination
(
Column1 LowCardinality(String) CODEC(LZ4),
Column2 UInt32 CODEC(LZ4),
Column3 Date CODEC(DoubleDelta, LZ4),
Column4 DateTime CODEC(DoubleDelta, LZ4),
Column5 UInt16 CODEC(LZ4),
Column6 LowCardinality(String) CODEC(ZSTD),
Column7 LowCardinality(String) CODEC(ZSTD),
Column8 LowCardinality(String) CODEC(ZSTD),
Column9 LowCardinality(String) CODEC(ZSTD),
Column10 String CODEC(ZSTD(6)),
Column11 LowCardinality(String) CODEC(LZ4),
Column12 Decimal(3,1) CODEC(LZ4),
Column13 DateTime CODEC(DoubleDelta, LZ4),
Column14 UInt16 CODEC(LZ4)
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(Column3)
ORDER BY (Column9, Column1, Column2, Column3, Column4);""")
print("Preparation completed")
def check(self):
first = cluster.instances["first"]
second = cluster.instances["second"]
a = first.query("SELECT count() from db_different_schema.source")
b = second.query("SELECT count() from db_different_schema.destination")
assert a == b, "Count"
a = TSV(first.query("""SELECT sipHash64(*) from db_different_schema.source
ORDER BY (Column1, Column2, Column3, Column4, Column5, Column6, Column7, Column8, Column9, Column10, Column11, Column12, Column13, Column14)"""))
b = TSV(second.query("""SELECT sipHash64(*) from db_different_schema.destination
ORDER BY (Column1, Column2, Column3, Column4, Column5, Column6, Column7, Column8, Column9, Column10, Column11, Column12, Column13, Column14)"""))
assert a == b, "Data"
def execute_task(task, cmd_options):
task.start()
zk = cluster.get_kazoo_client('zoo1')
print("Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]))
# Run cluster-copier processes on each node
docker_api = docker.from_env().api
copiers_exec_ids = []
cmd = ['/usr/bin/clickhouse', 'copier',
'--config', '/etc/clickhouse-server/config-copier.xml',
'--task-path', task.zk_task_path,
'--task-file', task.container_task_file,
'--task-upload-force', 'true',
'--base-dir', '/var/log/clickhouse-server/copier']
cmd += cmd_options
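# --task-upload-force makes the copier re-upload the task description to ZooKeeper even if the node already exists.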
print(cmd)
for instance_name, instance in cluster.instances.items():
instance = cluster.instances[instance_name]
container = instance.get_docker_handle()
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, "configs_taxi/config-copier.xml"), "/etc/clickhouse-server/config-copier.xml")
logging.info("Copied copier config to {}".format(instance.name))
exec_id = docker_api.exec_create(container.id, cmd, stderr=True)
output = docker_api.exec_start(exec_id).decode('utf8')
logging.info(output)
copiers_exec_ids.append(exec_id)
logging.info("Copier for {} ({}) has started".format(instance.name, instance.ip_address))
# time.sleep(1000)
# Wait for copiers stopping and check their return codes
for exec_id, instance in zip(copiers_exec_ids, iter(cluster.instances.values())):
while True:
res = docker_api.exec_inspect(exec_id)
if not res['Running']:
break
time.sleep(1)
assert res['ExitCode'] == 0, "Instance: {} ({}). Info: {}".format(instance.name, instance.ip_address, repr(res))
try:
task.check()
finally:
zk.delete(task.zk_task_path, recursive=True)
# Tests
@pytest.mark.timeout(1200)
def test1(started_cluster):
execute_task(TaskWithDifferentSchema(started_cluster), [])

View File

@ -114,7 +114,7 @@ class Task:
junk1 String, junk2 String
)
Engine = ReplacingMergeTree() PRIMARY KEY (tpep_pickup_datetime, id) ORDER BY (tpep_pickup_datetime, id) PARTITION BY (toYYYYMMDD(tpep_pickup_datetime))""")
instance.query("CREATE TABLE dailyhistory.yellow_tripdata ON CLUSTER events AS dailyhistory.yellow_tripdata_staging ENGINE = Distributed('events', 'dailyhistory', yellow_tripdata_staging, rand());")
instance.query("CREATE TABLE dailyhistory.yellow_tripdata ON CLUSTER events AS dailyhistory.yellow_tripdata_staging ENGINE = Distributed('events', 'dailyhistory', yellow_tripdata_staging, sipHash64(id) % 3);")
# monthly partition database
instance.query("create database monthlyhistory on cluster events;")
@ -127,25 +127,38 @@ class Task:
pickup_location_id String, dropoff_location_id String, congestion_surcharge String, junk1 String, junk2 String
)
Engine = ReplacingMergeTree() PRIMARY KEY (tpep_pickup_datetime, id) ORDER BY (tpep_pickup_datetime, id) PARTITION BY (pickup_location_id, toYYYYMM(tpep_pickup_datetime))""")
instance.query("CREATE TABLE monthlyhistory.yellow_tripdata ON CLUSTER events AS monthlyhistory.yellow_tripdata_staging ENGINE = Distributed('events', 'monthlyhistory', yellow_tripdata_staging, rand());")
instance.query("CREATE TABLE monthlyhistory.yellow_tripdata ON CLUSTER events AS monthlyhistory.yellow_tripdata_staging ENGINE = Distributed('events', 'monthlyhistory', yellow_tripdata_staging, sipHash64(id) % 3);")
logging.info("Inserting in container")
print("Inserting in container")
first_query = """INSERT INTO dailyhistory.yellow_tripdata(
vendor_id,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,
rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type,
fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge) FORMAT CSV"""
instance.exec_in_container(['bash', '-c', 'cat /usr/share/clickhouse-external-data/first.csv | /usr/bin/clickhouse client --query="{}"'.format(first_query)], privileged=True)
logging.info("Insert completed")
print("Insert completed")
def check(self):
instance = cluster.instances["first"]
a = TSV(instance.query("SELECT count() from dailyhistory.yellow_tripdata"))
b = TSV(instance.query("SELECT count() from monthlyhistory.yellow_tripdata"))
assert a == b, "Distributed tables"
for instance_name, instance in cluster.instances.items():
instance = cluster.instances[instance_name]
a = instance.query("SELECT count() from dailyhistory.yellow_tripdata_staging")
b = instance.query("SELECT count() from monthlyhistory.yellow_tripdata_staging")
print(a, b)
assert a == b
assert a == b, "MergeTree tables on each shard"
a = TSV(instance.query("SELECT sipHash64(*) from dailyhistory.yellow_tripdata_staging ORDER BY id"))
b = TSV(instance.query("SELECT sipHash64(*) from monthlyhistory.yellow_tripdata_staging ORDER BY id"))
assert a == b, "Data on each shard"
def execute_task(task, cmd_options):