style + enable trivial test

This commit is contained in:
Nikita Mikhaylov 2021-04-27 15:34:56 +03:00
parent b4895b1e72
commit ac072243d6
2 changed files with 87 additions and 68 deletions

View File

@ -22,6 +22,12 @@ namespace ErrorCodes
}
std::string wrapWithColor(const std::string & value)
{
return "\u001b[36;1m" + value + "\u001b[0m";
}
void ClusterCopier::init()
{
auto zookeeper = getContext()->getZooKeeper();
@ -194,6 +200,7 @@ void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts,
LOG_INFO(log, "Waiting for {} setup jobs", thread_pool.active());
thread_pool.wait();
}
std::cout << "discoverTablePartitions finished" << std::endl;
}
void ClusterCopier::uploadTaskDescription(const std::string & task_path, const std::string & task_file, const bool force)
@ -587,22 +594,6 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
zookeeper->create(current_partition_attach_is_done, start_state, zkutil::CreateMode::Persistent);
}
/// Try to drop destination partition in original table
DatabaseAndTableName original_table = task_table.table_push;
if (task_table.allow_to_drop_target_partitions)
{
WriteBufferFromOwnString ss;
ss << "ALTER TABLE " << getQuotedTable(original_table) << ((partition_name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ") << partition_name;
UInt64 num_shards_drop_partition = executeQueryOnCluster(task_table.cluster_push, ss.str(), task_cluster->settings_push, ClusterExecutionMode::ON_EACH_SHARD);
LOG_INFO(log, "Drop partiton {} in original table {} have been executed successfully on {} shards of {}",
partition_name, getQuotedTable(original_table), num_shards_drop_partition, task_table.cluster_push->getShardCount());
}
/// Move partition to original destination table.
for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
{
@ -611,15 +602,19 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
ASTPtr query_alter_ast;
String query_alter_ast_string;
DatabaseAndTableName original_table = task_table.table_push;
DatabaseAndTableName helping_table = DatabaseAndTableName(original_table.first,
original_table.second + "_piece_" +
toString(current_piece_number));
Settings settings_push = task_cluster->settings_push;
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_NODE;
UInt64 max_successful_executions_per_shard = 0;
if (settings_push.replication_alter_partitions_sync == 1)
{
execution_mode = ClusterExecutionMode::ON_EACH_SHARD;
max_successful_executions_per_shard = 1;
}
query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
@ -638,11 +633,17 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
if (settings_push.replication_alter_partitions_sync == 1)
{
LOG_INFO(log, "Destination tables {} have been executed alter query successfully on {} shards of {}",
getQuotedTable(original_table), num_nodes, task_table.cluster_push->getShardCount());
LOG_INFO(
log,
"Destination tables {} have been executed alter query successfully on {} shards of {}",
getQuotedTable(task_table.table_push),
num_nodes,
task_table.cluster_push->getShardCount());
if (num_nodes != task_table.cluster_push->getShardCount())
{
return TaskStatus::Error;
}
}
else
{
@ -658,6 +659,30 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
if (inject_fault)
throw Exception("Copy fault injection is activated", ErrorCodes::UNFINISHED);
try
{
String query_deduplicate_ast_string;
if (!task_table.isReplicatedTable())
{
query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) +
((partition_name == "'all'") ? " PARTITION ID " : " PARTITION ") + partition_name + " DEDUPLICATE;";
LOG_INFO(log, "Executing OPTIMIZE DEDUPLICATE query: {}", query_deduplicate_ast_string);
UInt64 num_nodes = executeQueryOnCluster(
task_table.cluster_push,
query_deduplicate_ast_string,
task_cluster->settings_push,
ClusterExecutionMode::ON_EACH_SHARD);
LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : {}", toString(num_nodes));
}
}
catch (...)
{
LOG_INFO(log, "Error while executing OPTIMIZE DEDUPLICATE partition {}in the original table", partition_name);
throw;
}
}
/// Create node to signal that we finished moving
@ -827,6 +852,9 @@ bool ClusterCopier::tryDropPartitionPiece(
String query = "ALTER TABLE " + getQuotedTable(helping_table);
query += ((task_partition.name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ") + task_partition.name + "";
/// TODO: use this statement after servers will be updated up to 1.1.54310
// query += " DROP PARTITION ID '" + task_partition.name + "'";
ClusterPtr & cluster_push = task_table.cluster_push;
Settings settings_push = task_cluster->settings_push;
@ -1090,7 +1118,7 @@ TaskStatus ClusterCopier::tryCreateDestinationTable(const ConnectionTimeouts & t
InterpreterCreateQuery::prepareOnClusterQuery(create, getContext(), task_table.cluster_push_name);
String query = queryToString(create_query_push_ast);
LOG_INFO(log, "Create destination tables. Query: \n {}", query);
LOG_INFO(log, "Create destination tables. Query: \n {}", wrapWithColor(query));
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(
log,
@ -1365,7 +1393,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
auto create_query_push_ast = rewriteCreateQueryStorage(create_query_ast, database_and_table_for_current_piece, new_engine_push_ast);
String query = queryToString(create_query_push_ast);
LOG_INFO(log, "Create destination tables. Query: \n {}", query);
LOG_INFO(log, "Create destination tables. Query: \n {}", wrapWithColor(query));
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(
log,
@ -1509,6 +1537,11 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
auto actions = std::make_shared<ExpressionActions>(actions_dag, ExpressionActionsSettings::fromContext(getContext()));
input = std::make_shared<ExpressionBlockInputStream>(pure_input, actions);
std::cout << "Input:" << std::endl;
std::cout << input->getHeader().dumpStructure() << std::endl;
std::cout << "Output:" << std::endl;
std::cout << output->getHeader().dumpStructure() << std::endl;
}
/// Fail-fast optimization to abort copying when the current clean state expires
@ -1802,7 +1835,7 @@ std::set<String> ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti
const auto & settings = getContext()->getSettingsRef();
ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
LOG_INFO(log, "Computing destination partition set, executing query: \n {}", query);
LOG_INFO(log, "Computing destination partition set, executing query: \n {}", wrapWithColor(query));
auto local_context = Context::createCopy(context);
local_context->setSettings(task_cluster->settings_pull);
@ -1958,7 +1991,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
}
catch (...)
{
LOG_WARNING(log, "An error occured while processing query : \n {}", query);
LOG_WARNING(log, "An error occured while processing query : \n {}", wrapWithColor(query));
tryLogCurrentException(log);
}
}

View File

@ -1,15 +1,18 @@
import os
import sys
import time
from contextlib import contextmanager
import docker
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
import kazoo
import pytest
import docker
CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(CURRENT_TEST_DIR))
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
COPYING_FAIL_PROBABILITY = 0.33
MOVING_FAIL_PROBABILITY = 0.1
@ -40,7 +43,6 @@ def started_cluster():
yield cluster
finally:
pass
cluster.shutdown()
@ -85,9 +87,14 @@ def execute_task(task, cmd_options):
zk = cluster.get_kazoo_client('zoo1')
print("Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]))
try:
zk.delete("/clickhouse-copier", recursive=True)
except kazoo.exceptions.NoNodeError:
print("No node /clickhouse-copier. It is Ok in first test.")
zk_task_path = task.zk_task_path
zk.ensure_path(zk_task_path)
zk.create(zk_task_path + "/description", task.copier_task_config)
zk.create(zk_task_path + "/description", task.copier_task_config.encode())
# Run cluster-copier processes on each node
docker_api = docker.from_env().api
@ -99,23 +106,28 @@ def execute_task(task, cmd_options):
'--base-dir', '/var/log/clickhouse-server/copier']
cmd += cmd_options
print(cmd)
copiers = list(cluster.instances.keys())
for instance_name, instance in cluster.instances.items():
for instance_name in copiers:
instance = cluster.instances[instance_name]
container = instance.get_docker_handle()
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, "configs/config-copier.xml"),
"/etc/clickhouse-server/config-copier.xml")
print("Copied copier config to {}".format(instance.name))
exec_id = docker_api.exec_create(container.id, cmd, stderr=True)
docker_api.exec_start(exec_id, detach=True)
output = docker_api.exec_start(exec_id).decode('utf8')
print(output)
copiers_exec_ids.append(exec_id)
print("Copier for {} ({}) has started".format(instance.name, instance.ip_address))
# Wait for copiers stopping and check their return codes
for exec_id, instance in zip(copiers_exec_ids, iter(cluster.instances.values())):
for exec_id, instance_name in zip(copiers_exec_ids, copiers):
instance = cluster.instances[instance_name]
while True:
res = docker_api.exec_inspect(exec_id)
if not res['Running']:
break
time.sleep(1)
time.sleep(0.5)
assert res['ExitCode'] == 0, "Instance: {} ({}). Info: {}".format(instance.name, instance.ip_address, repr(res))
@ -128,53 +140,27 @@ def execute_task(task, cmd_options):
# Tests
@pytest.mark.parametrize(
('use_sample_offset'),
[
False,
True
]
)
@pytest.mark.parametrize(('use_sample_offset'),[False,True])
def test_trivial_copy(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--experimental-use-sample-offset', '1'])
else:
print("AAAAA")
execute_task(TaskTrivial(started_cluster, use_sample_offset), [])
@pytest.mark.parametrize(
('use_sample_offset'),
[
False,
True
]
)
@pytest.mark.parametrize(('use_sample_offset'),[False,True])
def test_trivial_copy_with_copy_fault(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(TaskTrivial(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY),
execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY),
'--experimental-use-sample-offset', '1'])
else:
execute_task(TaskTrivial(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
@pytest.mark.parametrize(
('use_sample_offset'),
[
False,
True
]
)
@pytest.mark.parametrize(('use_sample_offset'),[False,True])
def test_trivial_copy_with_move_fault(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(TaskTrivial(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY),
execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY),
'--experimental-use-sample-offset', '1'])
else:
execute_task(TaskTrivial(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in list(cluster.instances.items()):
print(name, instance.ip_address)
input("Cluster created, press any key to destroy...")
execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])