Nikita Mikhaylov 2021-04-30 01:56:41 +03:00
parent 01511d0cbe
commit 33dcebbb24
4 changed files with 9 additions and 62 deletions

View File

@@ -1405,11 +1405,9 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
/// 3) Create helping table on the whole destination cluster
auto & settings_push = task_cluster->settings_push;
/// Get a connection to any shard to fetch `CREATE` query
auto connection = task_table.cluster_push->getAnyShardInfo().pool->get(timeouts, &settings_push, true);
/// Execute a request and get `CREATE` query as a string.
String create_query = getRemoteCreateTable(task_shard.task_table.table_push, *connection, settings_push);
/// Parse it to ASTPtr
ParserCreateQuery parser_create_query;
auto create_query_ast = parseQuery(parser_create_query, create_query, settings_push.max_query_size, settings_push.max_parser_depth);
/// Define helping table database and name for current partition piece
@@ -1417,8 +1415,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
task_table.table_push.first,
task_table.table_push.second + "_piece_" + toString(current_piece_number)};
/// This is a bit of legacy, because we could now parse the engine AST from the whole create query.
/// But this is needed to make the helping table non-replicated. We simply don't need replication for helping tables.
auto new_engine_push_ast = task_table.engine_push_ast;
if (task_table.isReplicatedTable())
new_engine_push_ast = task_table.rewriteReplicatedCreateQueryToPlain();
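
[Editor's note: the two hunks above are the heart of the helping-table setup. The copier fetches the destination table's CREATE query from any push shard, parses it, renames the table to a per-piece variant (suffix "_piece_N"), and downgrades a Replicated engine to its plain counterpart so helping tables are not replicated. A rough Python sketch of that rewrite at the string level; the real code performs it on the parsed AST via rewriteReplicatedCreateQueryToPlain and rewriteCreateQueryStorage, and the table/engine names below are illustrative:

import re

def rewrite_create_for_piece(create_query: str, piece_number: int) -> str:
    # Rename `db.table` to the helping table `db.table_piece_N`.
    query = re.sub(r"(CREATE TABLE \S+?)(\s*\()",
                   lambda m: m.group(1) + f"_piece_{piece_number}" + m.group(2),
                   create_query, count=1)
    # ReplicatedXxxMergeTree('/zk/path', 'replica'[, ...]) -> XxxMergeTree([...]),
    # i.e. drop the Replicated prefix together with its ZooKeeper arguments.
    query = re.sub(r"Replicated(\w*MergeTree)\((?:'[^']*'\s*,\s*'[^']*'\s*,?\s*)?",
                   r"\1(", query)
    return query

print(rewrite_create_for_piece(
    "CREATE TABLE db.hits (d Date) ENGINE = ReplicatedMergeTree('/zk/hits', '{replica}') ORDER BY d", 3))
# -> CREATE TABLE db.hits_piece_3 (d Date) ENGINE = MergeTree() ORDER BY d
]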
@@ -1427,17 +1424,11 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
auto create_query_push_ast = rewriteCreateQueryStorage(create_query_ast, database_and_table_for_current_piece, new_engine_push_ast);
String query = queryToString(create_query_push_ast);
LOG_INFO(log, "Create destination tables. Query: \n {}", query);
UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(log, "Destination tables {} have been created on {} shards of {}",
getQuotedTable(task_table.table_push),
shards,
task_table.cluster_push->getShardCount());
@@ -1750,7 +1741,6 @@ String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
}
ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{
/// Fetch and parse (possibly) new definition
@@ -1766,20 +1756,6 @@ ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
}
ASTPtr ClusterCopier::getCreateTableForPushShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{
/// Fetch and parse (possibly) new definition
auto connection_entry = task_shard.info.pool->get(timeouts, &task_cluster->settings_push, true);
String create_query_pull_str = getRemoteCreateTable(
task_shard.task_table.table_push,
*connection_entry,
task_cluster->settings_push);
ParserCreateQuery parser_create_query;
const auto & settings = getContext()->getSettingsRef();
return parseQuery(parser_create_query, create_query_pull_str, settings.max_query_size, settings.max_parser_depth);
}
/// If it is implicitly asked to create split Distributed table for certain piece on current shard, we will do it.
void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeouts,
TaskShard & task_shard, bool create_split)
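
[Editor's note: the removed getCreateTableForPushShard was a near-copy of getCreateTableForPullShard, which is why this commit can delete it: both grab a pooled connection, fetch the table definition remotely, and parse it with ParserCreateQuery. A hedged Python sketch of the shared fetch step using the clickhouse-driver package; illustrative only, since the copier does this over its own connection pool rather than a fresh client:

from clickhouse_driver import Client  # pip install clickhouse-driver

def get_remote_create_table(host: str, database: str, table: str) -> str:
    # Ask the remote server for the table definition, as getRemoteCreateTable() does.
    client = Client(host=host)
    rows = client.execute(f"SHOW CREATE TABLE {database}.{table}")
    return rows[0][0]  # single row, single column holding the CREATE statement
]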
@@ -2033,17 +2009,6 @@ UInt64 ClusterCopier::executeQueryOnCluster(
tryLogCurrentException(log);
continue;
}
}
}
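
[Editor's note: the hunk above drops a duplicated success-counting block left over from the merge. For context, executeQueryOnCluster tolerates per-node failures: it logs the exception and moves on to the next node, then returns how many executions succeeded so the caller can compare against the cluster's shard count, as the LOG_INFO in the first hunk does. A toy model of that contract; the names and signature are illustrative, not the real API:

def execute_query_on_each_node(hosts, query, run_on_node):
    # Mirrors the log-and-continue loop: a failing node is logged and
    # skipped rather than aborting the whole DDL broadcast.
    successfully_executed = 0
    for host in hosts:
        try:
            run_on_node(host, query)
            successfully_executed += 1
        except Exception as e:
            print(f"An error occurred while processing query on {host}: {e}")
            continue
    return successfully_executed
]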

View File

@@ -24,9 +24,7 @@ public:
task_zookeeper_path(task_path_),
host_id(host_id_),
working_database_name(proxy_database_name_),
log(log_) {
std::cout << "Level from constructor" << log->getLevel() << std::endl;
}
log(log_) {}
void init();
@@ -162,7 +160,6 @@ protected:
String getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings);
ASTPtr getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard);
ASTPtr getCreateTableForPushShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard);
/// If it is implicitly asked to create split Distributed table for certain piece on current shard, we will do it.
void createShardInternalTables(const ConnectionTimeouts & timeouts, TaskShard & task_shard, bool create_split = true);

View File

@@ -77,6 +77,8 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
if (config.has(prefix + "settings"))
settings_common.loadSettingsFromConfig(prefix + "settings", config);
settings_common.prefer_localhost_replica = 0;
settings_pull = settings_common;
if (config.has(prefix + "settings_pull"))
settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config);
@@ -92,17 +94,17 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
/// Override important settings
settings_pull.readonly = 1;
settings_pull.prefer_localhost_replica = false;
settings_push.insert_distributed_sync = true;
settings_push.prefer_localhost_replica = false;
set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
set_default_value(settings_pull.max_threads, 1);
set_default_value(settings_pull.max_block_size, 8192UL);
set_default_value(settings_pull.preferred_block_size_bytes, 0);
set_default_value(settings_pull.prefer_localhost_replica, 0);
set_default_value(settings_push.insert_distributed_timeout, 0);
set_default_value(settings_push.replication_alter_partitions_sync, 2);
set_default_value(settings_push.prefer_localhost_replica, 0);
}
}
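
[Editor's note: the settings logic in this file has two tiers, both visible in the hunks above: hard overrides (readonly, prefer_localhost_replica, insert_distributed_sync) are always forced, while set_default_value only fills in settings the user did not set explicitly in the task config. A small Python model of that distinction, with a dict standing in for the Settings object:

def set_default_value(settings: dict, name: str, default) -> None:
    # Only applies when the user left the setting untouched.
    settings.setdefault(name, default)

settings_pull = {"max_threads": 4}   # explicitly set by the user in <settings_pull>
settings_pull["readonly"] = 1        # hard override: the pull side must never write
set_default_value(settings_pull, "max_threads", 1)        # kept at 4
set_default_value(settings_pull, "max_block_size", 8192)  # defaulted
print(settings_pull)  # {'max_threads': 4, 'readonly': 1, 'max_block_size': 8192}
]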

View File

@@ -14,7 +14,7 @@ CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(CURRENT_TEST_DIR))
COPYING_FAIL_PROBABILITY = 0.33
COPYING_FAIL_PROBABILITY = 0.1
MOVING_FAIL_PROBABILITY = 0.1
cluster = None
@@ -151,20 +151,3 @@ def test_trivial_copy(started_cluster, use_sample_offset):
else:
execute_task(TaskTrivial(started_cluster, use_sample_offset), [])
@pytest.mark.parametrize(('use_sample_offset'),[False,True])
def test_trivial_copy_with_copy_fault(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY),
'--experimental-use-sample-offset', '1'])
else:
execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
@pytest.mark.parametrize(('use_sample_offset'),[False,True])
def test_trivial_copy_with_move_fault(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY),
'--experimental-use-sample-offset', '1'])
else:
execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])
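
[Editor's note: the removed tests drove the copier with --copy-fault-probability / --move-fault-probability so that roughly every tenth copy or move attempt fails, checking that retries still converge to a correct result. A toy reproduction of that retry-until-success idea; illustrative only, the real test restarts the copier process:

import random

COPYING_FAIL_PROBABILITY = 0.1  # the value the tests passed on the command line

def copy_partition_with_faults(partition: str, fail_probability: float) -> int:
    # Each attempt fails with the injected probability; because copying a
    # partition piece is idempotent, the caller can simply retry.
    attempts = 0
    while True:
        attempts += 1
        if random.random() >= fail_probability:
            return attempts  # number of attempts the copy took

random.seed(0)
print(copy_partition_with_faults("201403", COPYING_FAIL_PROBABILITY))
]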