2017-10-13 19:13:41 +00:00
# include "ClusterCopier.h"
2018-01-11 20:51:30 +00:00
2020-02-19 15:01:08 +00:00
# include "Internals.h"
2021-04-29 19:16:51 +00:00
# include "StatusAccumulator.h"
2020-02-19 15:01:08 +00:00
2017-10-13 19:13:41 +00:00
# include <Common/ZooKeeper/ZooKeeper.h>
2018-04-03 17:37:30 +00:00
# include <Common/ZooKeeper/KeeperException.h>
2020-05-30 17:53:55 +00:00
# include <Common/setThreadName.h>
2020-12-10 22:05:02 +00:00
# include <IO/ConnectionTimeoutsContext.h>
2021-04-22 18:04:32 +00:00
# include <Interpreters/InterpreterInsertQuery.h>
# include <Processors/Transforms/ExpressionTransform.h>
2021-09-14 17:48:18 +00:00
# include <Processors/QueryPipelineBuilder.h>
2021-09-03 17:29:36 +00:00
# include <Processors/Chain.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <Processors/Executors/PushingPipelineExecutor.h>
2021-09-16 17:40:42 +00:00
# include <Processors/Sources/RemoteSource.h>
2021-04-22 18:04:32 +00:00
# include <DataStreams/ExpressionBlockInputStream.h>
2017-10-13 19:13:41 +00:00
namespace DB
2020-02-25 18:58:28 +00:00
{
2017-10-13 19:13:41 +00:00
2020-02-25 18:10:48 +00:00
/// Error codes referenced by this file; they are only declared here (extern), not defined.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
    extern const int UNFINISHED;
}
2020-02-25 18:58:28 +00:00
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
void ClusterCopier : : init ( )
2017-10-13 19:13:41 +00:00
{
2021-04-10 23:33:54 +00:00
auto zookeeper = getContext ( ) - > getZooKeeper ( ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
task_description_watch_callback = [ this ] ( const Coordination : : WatchResponse & response )
{
2020-06-12 15:09:12 +00:00
if ( response . error ! = Coordination : : Error : : ZOK )
2020-02-19 15:01:08 +00:00
return ;
2020-02-20 17:26:20 +00:00
UInt64 version = + + task_description_version ;
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Task description should be updated, local version {} " , version ) ;
2020-02-19 15:01:08 +00:00
} ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
task_description_path = task_zookeeper_path + " /description " ;
task_cluster = std : : make_unique < TaskCluster > ( task_zookeeper_path , working_database_name ) ;
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
reloadTaskDescription ( ) ;
2017-10-13 19:13:41 +00:00
2021-04-29 19:16:51 +00:00
task_cluster - > loadTasks ( * task_cluster_current_config ) ;
getContext ( ) - > setClustersConfig ( task_cluster_current_config , task_cluster - > clusters_prefix ) ;
2020-02-19 15:01:08 +00:00
/// Set up shards and their priority
task_cluster - > random_engine . seed ( task_cluster - > random_device ( ) ) ;
for ( auto & task_table : task_cluster - > table_tasks )
2019-06-25 21:55:52 +00:00
{
2021-04-10 23:33:54 +00:00
task_table . cluster_pull = getContext ( ) - > getCluster ( task_table . cluster_pull_name ) ;
task_table . cluster_push = getContext ( ) - > getCluster ( task_table . cluster_push_name ) ;
2020-02-19 15:01:08 +00:00
task_table . initShards ( task_cluster - > random_engine ) ;
2019-06-25 21:55:52 +00:00
}
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Will process {} table tasks " , task_cluster - > table_tasks . size ( ) ) ;
2020-02-19 15:01:08 +00:00
/// Do not initialize tables, will make deferred initialization in process()
zookeeper - > createAncestors ( getWorkersPathVersion ( ) + " / " ) ;
zookeeper - > createAncestors ( getWorkersPath ( ) + " / " ) ;
2021-04-29 19:16:51 +00:00
/// Init status node
zookeeper - > createIfNotExists ( task_zookeeper_path + " /status " , " {} " ) ;
2018-02-20 21:03:38 +00:00
}
2020-02-19 15:01:08 +00:00
template < typename T >
decltype ( auto ) ClusterCopier : : retry ( T & & func , UInt64 max_tries )
2018-02-20 21:03:38 +00:00
{
2020-02-19 15:01:08 +00:00
std : : exception_ptr exception ;
2020-11-24 18:22:50 +00:00
if ( max_tries = = 0 )
throw Exception ( " Cannot perform zero retries " , ErrorCodes : : LOGICAL_ERROR ) ;
2020-02-19 15:01:08 +00:00
for ( UInt64 try_number = 1 ; try_number < = max_tries ; + + try_number )
{
try
{
return func ( ) ;
}
catch ( . . . )
{
exception = std : : current_exception ( ) ;
if ( try_number < max_tries )
{
tryLogCurrentException ( log , " Will retry " ) ;
std : : this_thread : : sleep_for ( default_sleep_time ) ;
}
}
}
std : : rethrow_exception ( exception ) ;
2018-02-20 21:03:38 +00:00
}
2020-02-19 15:01:08 +00:00
void ClusterCopier : : discoverShardPartitions ( const ConnectionTimeouts & timeouts , const TaskShardPtr & task_shard )
2017-10-13 19:13:41 +00:00
{
2020-02-19 15:01:08 +00:00
TaskTable & task_table = task_shard - > task_table ;
2017-10-13 19:13:41 +00:00
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Discover partitions of shard {} " , task_shard - > getDescription ( ) ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
auto get_partitions = [ & ] ( ) { return getShardPartitions ( timeouts , * task_shard ) ; } ;
auto existing_partitions_names = retry ( get_partitions , 60 ) ;
Strings filtered_partitions_names ;
Strings missing_partitions ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// Check that user specified correct partition names
auto check_partition_format = [ ] ( const DataTypePtr & type , const String & partition_text_quoted )
2017-11-09 18:06:36 +00:00
{
2020-02-19 15:01:08 +00:00
MutableColumnPtr column_dummy = type - > createColumn ( ) ;
ReadBufferFromString rb ( partition_text_quoted ) ;
2017-11-09 18:06:36 +00:00
2020-02-19 15:01:08 +00:00
try
{
2021-03-09 14:46:52 +00:00
type - > getDefaultSerialization ( ) - > deserializeTextQuoted ( * column_dummy , rb , FormatSettings ( ) ) ;
2020-02-19 15:01:08 +00:00
}
catch ( Exception & e )
{
throw Exception ( " Partition " + partition_text_quoted + " has incorrect format. " + e . displayText ( ) , ErrorCodes : : BAD_ARGUMENTS ) ;
}
} ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
if ( task_table . has_enabled_partitions )
2017-10-13 19:13:41 +00:00
{
2020-02-19 15:01:08 +00:00
/// Process partition in order specified by <enabled_partitions/>
for ( const String & partition_name : task_table . enabled_partitions )
{
/// Check that user specified correct partition names
check_partition_format ( task_shard - > partition_key_column . type , partition_name ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
auto it = existing_partitions_names . find ( partition_name ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// Do not process partition if it is not in enabled_partitions list
if ( it = = existing_partitions_names . end ( ) )
{
missing_partitions . emplace_back ( partition_name ) ;
continue ;
}
filtered_partitions_names . emplace_back ( * it ) ;
}
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
for ( const String & partition_name : existing_partitions_names )
{
if ( ! task_table . enabled_partitions_set . count ( partition_name ) )
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Partition {} will not be processed, since it is not in enabled_partitions of {} " , partition_name , task_table . table_id ) ;
2020-02-19 15:01:08 +00:00
}
}
}
else
{
for ( const String & partition_name : existing_partitions_names )
filtered_partitions_names . emplace_back ( partition_name ) ;
2017-10-13 19:13:41 +00:00
}
2020-02-19 15:01:08 +00:00
for ( const String & partition_name : filtered_partitions_names )
{
2020-03-12 19:46:48 +00:00
const size_t number_of_splits = task_table . number_of_splits ;
task_shard - > partition_tasks . emplace ( partition_name , ShardPartition ( * task_shard , partition_name , number_of_splits ) ) ;
2020-02-19 15:01:08 +00:00
task_shard - > checked_partitions . emplace ( partition_name , true ) ;
2020-02-18 13:39:22 +00:00
auto shard_partition_it = task_shard - > partition_tasks . find ( partition_name ) ;
PartitionPieces & shard_partition_pieces = shard_partition_it - > second . pieces ;
for ( size_t piece_number = 0 ; piece_number < number_of_splits ; + + piece_number )
{
bool res = checkPresentPartitionPiecesOnCurrentShard ( timeouts , * task_shard , partition_name , piece_number ) ;
shard_partition_pieces . emplace_back ( shard_partition_it - > second , piece_number , res ) ;
}
2020-02-19 15:01:08 +00:00
}
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
if ( ! missing_partitions . empty ( ) )
{
2020-11-10 18:22:26 +00:00
WriteBufferFromOwnString ss ;
2020-02-19 15:01:08 +00:00
for ( const String & missing_partition : missing_partitions )
ss < < " " < < missing_partition ;
2017-11-09 18:06:36 +00:00
2020-05-23 22:24:01 +00:00
LOG_WARNING ( log , " There are no {} partitions from enabled_partitions in shard {} :{} " , missing_partitions . size ( ) , task_shard - > getDescription ( ) , ss . str ( ) ) ;
2020-02-19 15:01:08 +00:00
}
2018-02-07 13:02:47 +00:00
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Will copy {} partitions from shard {} " , task_shard - > partition_tasks . size ( ) , task_shard - > getDescription ( ) ) ;
2020-02-19 15:01:08 +00:00
}
2018-02-08 11:07:58 +00:00
2020-02-19 15:01:08 +00:00
void ClusterCopier : : discoverTablePartitions ( const ConnectionTimeouts & timeouts , TaskTable & task_table , UInt64 num_threads )
2017-10-13 19:13:41 +00:00
{
2020-02-19 15:01:08 +00:00
/// Fetch partitions list from a shard
{
ThreadPool thread_pool ( num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores ( ) ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
for ( const TaskShardPtr & task_shard : task_table . all_shards )
2020-05-30 17:53:55 +00:00
thread_pool . scheduleOrThrowOnError ( [ this , timeouts , task_shard ] ( )
{
setThreadName ( " DiscoverPartns " ) ;
discoverShardPartitions ( timeouts , task_shard ) ;
} ) ;
2017-10-13 19:13:41 +00:00
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Waiting for {} setup jobs " , thread_pool . active ( ) ) ;
2020-02-19 15:01:08 +00:00
thread_pool . wait ( ) ;
}
}
2017-11-14 13:13:24 +00:00
2020-02-19 15:01:08 +00:00
void ClusterCopier : : uploadTaskDescription ( const std : : string & task_path , const std : : string & task_file , const bool force )
2017-11-14 13:13:24 +00:00
{
2020-02-19 15:01:08 +00:00
auto local_task_description_path = task_path + " /description " ;
2017-11-14 13:13:24 +00:00
2020-02-19 15:01:08 +00:00
String task_config_str ;
2017-11-14 13:13:24 +00:00
{
2020-02-19 15:01:08 +00:00
ReadBufferFromFile in ( task_file ) ;
readStringUntilEOF ( task_config_str , in ) ;
2017-11-14 13:13:24 +00:00
}
2020-02-19 15:01:08 +00:00
if ( task_config_str . empty ( ) )
return ;
2018-02-20 21:03:38 +00:00
2021-04-10 23:33:54 +00:00
auto zookeeper = getContext ( ) - > getZooKeeper ( ) ;
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
zookeeper - > createAncestors ( local_task_description_path ) ;
auto code = zookeeper - > tryCreate ( local_task_description_path , task_config_str , zkutil : : CreateMode : : Persistent ) ;
2020-06-12 15:09:12 +00:00
if ( code ! = Coordination : : Error : : ZOK & & force )
2020-02-19 15:01:08 +00:00
zookeeper - > createOrUpdate ( local_task_description_path , task_config_str , zkutil : : CreateMode : : Persistent ) ;
2018-02-20 21:03:38 +00:00
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Task description {} uploaded to {} with result {} ({}) " ,
2020-06-12 15:09:12 +00:00
( ( code ! = Coordination : : Error : : ZOK & & ! force ) ? " not " : " " ) , local_task_description_path , code , Coordination : : errorMessage ( code ) ) ;
2020-02-19 15:01:08 +00:00
}
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
void ClusterCopier : : reloadTaskDescription ( )
{
2021-04-10 23:33:54 +00:00
auto zookeeper = getContext ( ) - > getZooKeeper ( ) ;
2020-02-19 15:01:08 +00:00
task_description_watch_zookeeper = zookeeper ;
2018-02-20 21:03:38 +00:00
2020-02-18 13:39:22 +00:00
Coordination : : Stat stat { } ;
2018-02-20 21:03:38 +00:00
2021-04-29 19:16:51 +00:00
/// It will throw exception if such a node doesn't exist.
auto task_config_str = zookeeper - > get ( task_description_path , & stat ) ;
2018-02-20 21:03:38 +00:00
2021-04-29 19:16:51 +00:00
LOG_INFO ( log , " Loading task description " ) ;
task_cluster_current_config = getConfigurationFromXMLString ( task_config_str ) ;
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
/// Setup settings
2021-04-29 19:16:51 +00:00
task_cluster - > reloadSettings ( * task_cluster_current_config ) ;
2021-04-10 23:33:54 +00:00
getContext ( ) - > setSettings ( task_cluster - > settings_common ) ;
2020-02-19 15:01:08 +00:00
}
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
void ClusterCopier : : updateConfigIfNeeded ( )
2017-10-13 19:13:41 +00:00
{
2020-02-20 17:26:20 +00:00
UInt64 version_to_update = task_description_version ;
bool is_outdated_version = task_description_current_version ! = version_to_update ;
bool is_expired_session = ! task_description_watch_zookeeper | | task_description_watch_zookeeper - > expired ( ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
if ( ! is_outdated_version & & ! is_expired_session )
return ;
2017-11-14 13:13:24 +00:00
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Updating task description " ) ;
2020-02-19 15:01:08 +00:00
reloadTaskDescription ( ) ;
2017-10-13 19:13:41 +00:00
2020-02-20 17:26:20 +00:00
task_description_current_version = version_to_update ;
2020-02-19 15:01:08 +00:00
}
2018-02-07 13:02:47 +00:00
2020-02-19 15:01:08 +00:00
void ClusterCopier : : process ( const ConnectionTimeouts & timeouts )
{
for ( TaskTable & task_table : task_cluster - > table_tasks )
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Process table task {} with {} shards, {} of them are local ones " , task_table . table_id , task_table . all_shards . size ( ) , task_table . local_shards . size ( ) ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
if ( task_table . all_shards . empty ( ) )
continue ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// Discover partitions of each shard and total set of partitions
if ( ! task_table . has_enabled_partitions )
{
/// If there are no specified enabled_partitions, we must discover them manually
discoverTablePartitions ( timeouts , task_table ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// After partitions of each shard are initialized, initialize cluster partitions
for ( const TaskShardPtr & task_shard : task_table . all_shards )
{
for ( const auto & partition_elem : task_shard - > partition_tasks )
{
const String & partition_name = partition_elem . first ;
task_table . cluster_partitions . emplace ( partition_name , ClusterPartition { } ) ;
}
}
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
for ( auto & partition_elem : task_table . cluster_partitions )
{
const String & partition_name = partition_elem . first ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
for ( const TaskShardPtr & task_shard : task_table . all_shards )
task_shard - > checked_partitions . emplace ( partition_name ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
task_table . ordered_partition_names . emplace_back ( partition_name ) ;
}
}
else
{
/// If enabled_partitions are specified, assume that each shard has all partitions
/// We will refine partition set of each shard in future
2018-01-11 12:23:59 +00:00
2020-02-19 15:01:08 +00:00
for ( const String & partition_name : task_table . enabled_partitions )
{
task_table . cluster_partitions . emplace ( partition_name , ClusterPartition { } ) ;
task_table . ordered_partition_names . emplace_back ( partition_name ) ;
}
}
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
task_table . watch . restart ( ) ;
2018-02-08 11:07:58 +00:00
2020-02-19 15:01:08 +00:00
/// Retry table processing
bool table_is_done = false ;
for ( UInt64 num_table_tries = 0 ; num_table_tries < max_table_tries ; + + num_table_tries )
{
if ( tryProcessTable ( timeouts , task_table ) )
{
table_is_done = true ;
break ;
}
}
2018-03-02 12:28:00 +00:00
2020-02-19 15:01:08 +00:00
if ( ! table_is_done )
{
throw Exception ( " Too many tries to process table " + task_table . table_id + " . Abort remaining execution " ,
ErrorCodes : : UNFINISHED ) ;
}
2018-02-07 13:02:47 +00:00
}
2020-02-19 15:01:08 +00:00
}
2018-02-07 13:02:47 +00:00
2020-02-19 15:01:08 +00:00
/// Protected section
2018-02-20 21:03:38 +00:00
2020-02-20 17:26:20 +00:00
/*
* Creates task worker node and checks maximum number of workers not to exceed the limit .
2020-08-08 01:21:04 +00:00
* To achieve this we have to check version of workers_version_path node and create current_worker_path
2020-02-20 17:26:20 +00:00
* node atomically .
* */
2020-02-19 15:01:08 +00:00
zkutil : : EphemeralNodeHolder : : Ptr ClusterCopier : : createTaskWorkerNodeAndWaitIfNeed (
const zkutil : : ZooKeeperPtr & zookeeper ,
const String & description ,
bool unprioritized )
2017-10-13 19:13:41 +00:00
{
2020-02-19 15:01:08 +00:00
std : : chrono : : milliseconds current_sleep_time = default_sleep_time ;
static constexpr std : : chrono : : milliseconds max_sleep_time ( 30000 ) ; // 30 sec
2018-02-13 18:42:59 +00:00
2020-02-19 15:01:08 +00:00
if ( unprioritized )
std : : this_thread : : sleep_for ( current_sleep_time ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
String workers_version_path = getWorkersPathVersion ( ) ;
2020-02-20 17:26:20 +00:00
String workers_path = getWorkersPath ( ) ;
String current_worker_path = getCurrentWorkerNodePath ( ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
UInt64 num_bad_version_errors = 0 ;
2018-02-13 18:42:59 +00:00
2020-02-19 15:01:08 +00:00
while ( true )
{
updateConfigIfNeeded ( ) ;
2018-02-13 18:42:59 +00:00
2020-02-19 15:01:08 +00:00
Coordination : : Stat stat ;
zookeeper - > get ( workers_version_path , & stat ) ;
auto version = stat . version ;
zookeeper - > get ( workers_path , & stat ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
if ( static_cast < UInt64 > ( stat . numChildren ) > = task_cluster - > max_workers )
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Too many workers ({}, maximum {}). Postpone processing {} " , stat . numChildren , task_cluster - > max_workers , description ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
if ( unprioritized )
current_sleep_time = std : : min ( max_sleep_time , current_sleep_time + default_sleep_time ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
std : : this_thread : : sleep_for ( current_sleep_time ) ;
num_bad_version_errors = 0 ;
}
else
{
Coordination : : Requests ops ;
ops . emplace_back ( zkutil : : makeSetRequest ( workers_version_path , description , version ) ) ;
ops . emplace_back ( zkutil : : makeCreateRequest ( current_worker_path , description , zkutil : : CreateMode : : Ephemeral ) ) ;
Coordination : : Responses responses ;
auto code = zookeeper - > tryMulti ( ops , responses ) ;
2017-10-13 19:13:41 +00:00
2020-06-12 15:09:12 +00:00
if ( code = = Coordination : : Error : : ZOK | | code = = Coordination : : Error : : ZNODEEXISTS )
2020-02-19 15:01:08 +00:00
return std : : make_shared < zkutil : : EphemeralNodeHolder > ( current_worker_path , * zookeeper , false , false , description ) ;
2017-10-13 19:13:41 +00:00
2020-06-12 15:09:12 +00:00
if ( code = = Coordination : : Error : : ZBADVERSION )
2020-02-19 15:01:08 +00:00
{
+ + num_bad_version_errors ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// Try to make fast retries
if ( num_bad_version_errors > 3 )
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " A concurrent worker has just been added, will check free worker slots again " ) ;
2020-02-19 15:01:08 +00:00
std : : chrono : : milliseconds random_sleep_time ( std : : uniform_int_distribution < int > ( 1 , 1000 ) ( task_cluster - > random_engine ) ) ;
std : : this_thread : : sleep_for ( random_sleep_time ) ;
num_bad_version_errors = 0 ;
}
}
else
throw Coordination : : Exception ( code ) ;
}
}
2017-10-13 19:13:41 +00:00
}
2020-02-18 13:39:22 +00:00
2020-02-21 16:00:50 +00:00
bool ClusterCopier : : checkPartitionPieceIsClean (
const zkutil : : ZooKeeperPtr & zookeeper ,
const CleanStateClock & clean_state_clock ,
2020-03-27 22:44:13 +00:00
const String & task_status_path )
2017-10-13 19:13:41 +00:00
{
2020-02-21 16:00:50 +00:00
LogicalClock task_start_clock ;
2020-02-18 13:39:22 +00:00
2020-02-21 16:00:50 +00:00
Coordination : : Stat stat { } ;
if ( zookeeper - > exists ( task_status_path , & stat ) )
task_start_clock = LogicalClock ( stat . mzxid ) ;
2020-02-18 13:39:22 +00:00
2020-03-17 16:50:22 +00:00
return clean_state_clock . is_clean ( ) & & ( ! task_start_clock . hasHappened ( ) | | clean_state_clock . discovery_zxid < = task_start_clock ) ;
2020-02-18 13:39:22 +00:00
}
2020-02-21 16:00:50 +00:00
bool ClusterCopier : : checkAllPiecesInPartitionAreDone ( const TaskTable & task_table , const String & partition_name , const TasksShard & shards_with_partition )
2020-02-18 13:39:22 +00:00
{
bool answer = true ;
2020-03-13 14:19:20 +00:00
for ( size_t piece_number = 0 ; piece_number < task_table . number_of_splits ; + + piece_number )
{
bool piece_is_done = checkPartitionPieceIsDone ( task_table , partition_name , piece_number , shards_with_partition ) ;
if ( ! piece_is_done )
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Partition {} piece {} is not already done. " , partition_name , piece_number ) ;
2020-03-13 14:19:20 +00:00
answer & = piece_is_done ;
}
2020-02-18 13:39:22 +00:00
return answer ;
}
/* The same as function above
* Assume that we don ' t know on which shards do we have partition certain piece .
* We ' ll check them all ( I mean shards that contain the whole partition )
* And shards that don ' t have certain piece MUST mark that piece is_done true .
* */
bool ClusterCopier : : checkPartitionPieceIsDone ( const TaskTable & task_table , const String & partition_name ,
size_t piece_number , const TasksShard & shards_with_partition )
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Check that all shards processed partition {} piece {} successfully " , partition_name , piece_number ) ;
2017-10-13 19:13:41 +00:00
2021-04-10 23:33:54 +00:00
auto zookeeper = getContext ( ) - > getZooKeeper ( ) ;
2017-10-13 19:13:41 +00:00
2020-02-18 13:39:22 +00:00
/// Collect all shards that contain partition piece number piece_number.
Strings piece_status_paths ;
2020-05-18 08:08:55 +00:00
for ( const auto & shard : shards_with_partition )
2020-02-19 15:01:08 +00:00
{
ShardPartition & task_shard_partition = shard - > partition_tasks . find ( partition_name ) - > second ;
2020-02-18 13:39:22 +00:00
ShardPartitionPiece & shard_partition_piece = task_shard_partition . pieces [ piece_number ] ;
piece_status_paths . emplace_back ( shard_partition_piece . getShardStatusPath ( ) ) ;
2020-02-19 15:01:08 +00:00
}
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
std : : vector < int64_t > zxid1 , zxid2 ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
try
{
std : : vector < zkutil : : ZooKeeper : : FutureGet > get_futures ;
2020-02-18 13:39:22 +00:00
for ( const String & path : piece_status_paths )
2020-02-19 15:01:08 +00:00
get_futures . emplace_back ( zookeeper - > asyncGet ( path ) ) ;
2017-11-14 13:13:24 +00:00
2020-02-19 15:01:08 +00:00
// Check that state is Finished and remember zxid
for ( auto & future : get_futures )
{
auto res = future . get ( ) ;
2019-11-11 06:53:21 +00:00
2020-02-19 15:01:08 +00:00
TaskStateWithOwner status = TaskStateWithOwner : : fromString ( res . data ) ;
if ( status . state ! = TaskState : : Finished )
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " The task {} is being rewritten by {}. Partition piece will be rechecked " , res . data , status . owner ) ;
2020-02-19 15:01:08 +00:00
return false ;
}
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
zxid1 . push_back ( res . stat . pzxid ) ;
}
2017-11-09 18:06:36 +00:00
2020-02-21 16:00:50 +00:00
const String piece_is_dirty_flag_path = task_table . getCertainPartitionPieceIsDirtyPath ( partition_name , piece_number ) ;
const String piece_is_dirty_cleaned_path = task_table . getCertainPartitionPieceIsCleanedPath ( partition_name , piece_number ) ;
const String piece_task_status_path = task_table . getCertainPartitionPieceTaskStatusPath ( partition_name , piece_number ) ;
CleanStateClock clean_state_clock ( zookeeper , piece_is_dirty_flag_path , piece_is_dirty_cleaned_path ) ;
const bool is_clean = checkPartitionPieceIsClean ( zookeeper , clean_state_clock , piece_task_status_path ) ;
2020-02-18 13:39:22 +00:00
2020-02-21 16:00:50 +00:00
if ( ! is_clean )
2020-02-19 15:01:08 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Partition {} become dirty " , partition_name ) ;
2020-02-21 16:00:50 +00:00
return false ;
2020-02-19 15:01:08 +00:00
}
2017-11-09 18:06:36 +00:00
2020-02-19 15:01:08 +00:00
get_futures . clear ( ) ;
2020-02-18 13:39:22 +00:00
for ( const String & path : piece_status_paths )
2020-02-19 15:01:08 +00:00
get_futures . emplace_back ( zookeeper - > asyncGet ( path ) ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
// Remember zxid of states again
for ( auto & future : get_futures )
{
auto res = future . get ( ) ;
zxid2 . push_back ( res . stat . pzxid ) ;
}
}
catch ( const Coordination : : Exception & e )
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " A ZooKeeper error occurred while checking partition {} piece number {}. Will recheck the partition. Error: {} " , partition_name , toString ( piece_number ) , e . displayText ( ) ) ;
2020-02-19 15:01:08 +00:00
return false ;
}
2017-11-09 18:06:36 +00:00
2020-02-19 15:01:08 +00:00
// If all task is finished and zxid is not changed then partition could not become dirty again
2020-02-18 13:39:22 +00:00
for ( UInt64 shard_num = 0 ; shard_num < piece_status_paths . size ( ) ; + + shard_num )
2020-02-19 15:01:08 +00:00
{
if ( zxid1 [ shard_num ] ! = zxid2 [ shard_num ] )
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " The task {} is being modified now. Partition piece will be rechecked " , piece_status_paths [ shard_num ] ) ;
2020-02-19 15:01:08 +00:00
return false ;
}
}
2017-11-14 13:13:24 +00:00
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Partition {} piece number {} is copied successfully " , partition_name , toString ( piece_number ) ) ;
2020-02-19 15:01:08 +00:00
return true ;
2019-11-11 06:53:21 +00:00
}
2020-03-13 14:19:20 +00:00
2020-03-18 13:25:49 +00:00
TaskStatus ClusterCopier : : tryMoveAllPiecesToDestinationTable ( const TaskTable & task_table , const String & partition_name )
2020-03-13 14:19:20 +00:00
{
2020-03-16 21:05:38 +00:00
bool inject_fault = false ;
if ( move_fault_probability > 0 )
{
double value = std : : uniform_real_distribution < > ( 0 , 1 ) ( task_table . task_cluster . random_engine ) ;
inject_fault = value < move_fault_probability ;
}
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Try to move {} to destination table " , partition_name ) ;
2020-03-13 14:19:20 +00:00
2021-04-10 23:33:54 +00:00
auto zookeeper = getContext ( ) - > getZooKeeper ( ) ;
2020-03-13 14:19:20 +00:00
const auto current_partition_attach_is_active = task_table . getPartitionAttachIsActivePath ( partition_name ) ;
const auto current_partition_attach_is_done = task_table . getPartitionAttachIsDonePath ( partition_name ) ;
/// Create ephemeral node to mark that we are active and process the partition
zookeeper - > createAncestors ( current_partition_attach_is_active ) ;
zkutil : : EphemeralNodeHolderPtr partition_attach_node_holder ;
try
{
partition_attach_node_holder = zkutil : : EphemeralNodeHolder : : create ( current_partition_attach_is_active , * zookeeper , host_id ) ;
}
catch ( const Coordination : : Exception & e )
{
2020-06-12 15:09:12 +00:00
if ( e . code = = Coordination : : Error : : ZNODEEXISTS )
2020-03-13 14:19:20 +00:00
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Someone is already moving pieces {} " , current_partition_attach_is_active ) ;
2020-03-18 13:25:49 +00:00
return TaskStatus : : Active ;
2020-03-13 14:19:20 +00:00
}
throw ;
}
/// Exit if task has been already processed;
/// create blocking node to signal cleaning up if it is abandoned
{
String status_data ;
if ( zookeeper - > tryGet ( current_partition_attach_is_done , status_data ) )
{
TaskStateWithOwner status = TaskStateWithOwner : : fromString ( status_data ) ;
if ( status . state = = TaskState : : Finished )
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " All pieces for partition from this task {} has been successfully moved to destination table by {} " , current_partition_attach_is_active , status . owner ) ;
2020-03-18 13:25:49 +00:00
return TaskStatus : : Finished ;
2020-03-13 14:19:20 +00:00
}
/// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process.
/// Initialize DROP PARTITION
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Moving piece for partition {} has not been successfully finished by {}. Will try to move by myself. " , current_partition_attach_is_active , status . owner ) ;
2020-03-17 16:23:47 +00:00
/// Remove is_done marker.
zookeeper - > remove ( current_partition_attach_is_done ) ;
2020-03-13 14:19:20 +00:00
}
}
/// Try start processing, create node about it
{
String start_state = TaskStateWithOwner : : getData ( TaskState : : Started , host_id ) ;
zookeeper - > create ( current_partition_attach_is_done , start_state , zkutil : : CreateMode : : Persistent ) ;
}
2021-06-04 14:21:47 +00:00
/// Try to drop destination partition in original table
if ( task_table . allow_to_drop_target_partitions )
{
DatabaseAndTableName original_table = task_table . table_push ;
WriteBufferFromOwnString ss ;
ss < < " ALTER TABLE " < < getQuotedTable ( original_table ) < < ( ( partition_name = = " 'all' " ) ? " DROP PARTITION ID " : " DROP PARTITION " ) < < partition_name ;
UInt64 num_shards_drop_partition = executeQueryOnCluster ( task_table . cluster_push , ss . str ( ) , task_cluster - > settings_push , ClusterExecutionMode : : ON_EACH_SHARD ) ;
LOG_INFO ( log , " Drop partition {} in original table {} have been executed successfully on {} shards of {} " ,
partition_name , getQuotedTable ( original_table ) , num_shards_drop_partition , task_table . cluster_push - > getShardCount ( ) ) ;
}
2020-03-16 21:05:38 +00:00
/// Move partition to original destination table.
2020-03-13 14:19:20 +00:00
for ( size_t current_piece_number = 0 ; current_piece_number < task_table . number_of_splits ; + + current_piece_number )
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Trying to move partition {} piece {} to original table " , partition_name , toString ( current_piece_number ) ) ;
2020-03-13 14:19:20 +00:00
2020-03-16 21:05:38 +00:00
ASTPtr query_alter_ast ;
String query_alter_ast_string ;
2020-03-13 14:19:20 +00:00
2021-04-27 12:34:56 +00:00
DatabaseAndTableName original_table = task_table . table_push ;
2020-03-16 21:05:38 +00:00
DatabaseAndTableName helping_table = DatabaseAndTableName ( original_table . first ,
original_table . second + " _piece_ " +
toString ( current_piece_number ) ) ;
2020-03-13 14:19:20 +00:00
2020-03-20 12:18:26 +00:00
Settings settings_push = task_cluster - > settings_push ;
2021-03-19 12:51:29 +00:00
ClusterExecutionMode execution_mode = ClusterExecutionMode : : ON_EACH_NODE ;
2021-06-02 12:46:37 +00:00
2021-03-19 12:51:29 +00:00
if ( settings_push . replication_alter_partitions_sync = = 1 )
execution_mode = ClusterExecutionMode : : ON_EACH_SHARD ;
2020-03-20 12:18:26 +00:00
2020-03-16 21:05:38 +00:00
query_alter_ast_string + = " ALTER TABLE " + getQuotedTable ( original_table ) +
2020-11-21 04:32:29 +00:00
( ( partition_name = = " 'all' " ) ? " ATTACH PARTITION ID " : " ATTACH PARTITION " ) + partition_name +
2020-03-20 12:18:26 +00:00
" FROM " + getQuotedTable ( helping_table ) ;
2020-03-13 14:19:20 +00:00
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Executing ALTER query: {} " , query_alter_ast_string ) ;
2020-03-13 14:19:20 +00:00
2020-03-16 21:05:38 +00:00
try
{
2021-03-19 12:51:29 +00:00
/// Try attach partition on each shard
UInt64 num_nodes = executeQueryOnCluster (
task_table . cluster_push ,
query_alter_ast_string ,
task_cluster - > settings_push ,
2021-04-22 18:04:32 +00:00
execution_mode ) ;
2021-03-19 12:51:29 +00:00
if ( settings_push . replication_alter_partitions_sync = = 1 )
{
2021-04-27 12:34:56 +00:00
LOG_INFO (
log ,
" Destination tables {} have been executed alter query successfully on {} shards of {} " ,
getQuotedTable ( task_table . table_push ) ,
num_nodes ,
task_table . cluster_push - > getShardCount ( ) ) ;
2021-03-19 12:51:29 +00:00
if ( num_nodes ! = task_table . cluster_push - > getShardCount ( ) )
return TaskStatus : : Error ;
}
else
{
LOG_INFO ( log , " Number of nodes that executed ALTER query successfully : {} " , toString ( num_nodes ) ) ;
}
2020-03-16 21:05:38 +00:00
}
catch ( . . . )
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Error while moving partition {} piece {} to original table " , partition_name , toString ( current_piece_number ) ) ;
2021-06-03 23:58:47 +00:00
LOG_WARNING ( log , " In case of non-replicated tables it can cause duplicates. " ) ;
2020-03-16 21:05:38 +00:00
throw ;
}
2020-03-13 14:19:20 +00:00
2020-03-16 21:05:38 +00:00
if ( inject_fault )
throw Exception ( " Copy fault injection is activated " , ErrorCodes : : UNFINISHED ) ;
2020-03-13 14:19:20 +00:00
}
/// Create node to signal that we finished moving
{
String state_finished = TaskStateWithOwner : : getData ( TaskState : : Finished , host_id ) ;
zookeeper - > set ( current_partition_attach_is_done , state_finished , 0 ) ;
2021-04-29 19:16:51 +00:00
/// Also increment a counter of processed partitions
while ( true )
{
Coordination : : Stat stat ;
auto status_json = zookeeper - > get ( task_zookeeper_path + " /status " , & stat ) ;
auto statuses = StatusAccumulator : : fromJSON ( status_json ) ;
/// Increment status for table.
auto status_for_table = ( * statuses ) [ task_table . name_in_config ] ;
status_for_table . processed_partitions_count + = 1 ;
( * statuses ) [ task_table . name_in_config ] = status_for_table ;
auto statuses_to_commit = StatusAccumulator : : serializeToJSON ( statuses ) ;
auto error = zookeeper - > trySet ( task_zookeeper_path + " /status " , statuses_to_commit , stat . version , & stat ) ;
if ( error = = Coordination : : Error : : ZOK )
break ;
}
2020-03-13 14:19:20 +00:00
}
2020-03-18 13:25:49 +00:00
return TaskStatus : : Finished ;
2020-03-13 14:19:20 +00:00
}
2021-04-22 23:54:57 +00:00
/// This is needed to create internal Distributed table
/// Removes column's TTL expression from `CREATE` query
/// Removes MATEREALIZED or ALIAS columns not to copy additional and useless data over the network.
/// Removes data skipping indices.
2021-04-22 22:32:16 +00:00
ASTPtr ClusterCopier : : removeAliasMaterializedAndTTLColumnsFromCreateQuery ( const ASTPtr & query_ast , bool allow_to_copy_alias_and_materialized_columns )
2017-11-14 13:13:24 +00:00
{
2020-02-19 15:01:08 +00:00
const ASTs & column_asts = query_ast - > as < ASTCreateQuery & > ( ) . columns_list - > columns - > children ;
auto new_columns = std : : make_shared < ASTExpressionList > ( ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
for ( const ASTPtr & column_ast : column_asts )
{
const auto & column = column_ast - > as < ASTColumnDeclaration & > ( ) ;
2019-11-11 06:53:21 +00:00
2021-04-22 20:37:22 +00:00
/// Skip this columns
2021-04-22 22:32:16 +00:00
if ( ! column . default_specifier . empty ( ) & & ! allow_to_copy_alias_and_materialized_columns )
2020-02-19 15:01:08 +00:00
{
ColumnDefaultKind kind = columnDefaultKindFromString ( column . default_specifier ) ;
if ( kind = = ColumnDefaultKind : : Materialized | | kind = = ColumnDefaultKind : : Alias )
continue ;
}
2019-11-11 06:53:21 +00:00
2021-04-22 20:37:22 +00:00
/// Remove TTL on columns definition.
2021-04-22 22:32:16 +00:00
auto new_column_ast = column_ast - > clone ( ) ;
auto & new_column = new_column_ast - > as < ASTColumnDeclaration & > ( ) ;
if ( new_column . ttl )
new_column . ttl . reset ( ) ;
2021-04-22 20:37:22 +00:00
2021-04-22 22:32:16 +00:00
new_columns - > children . emplace_back ( new_column_ast ) ;
2020-02-19 15:01:08 +00:00
}
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
ASTPtr new_query_ast = query_ast - > clone ( ) ;
auto & new_query = new_query_ast - > as < ASTCreateQuery & > ( ) ;
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
auto new_columns_list = std : : make_shared < ASTColumns > ( ) ;
new_columns_list - > set ( new_columns_list - > columns , new_columns ) ;
2021-06-01 22:03:08 +00:00
/// Skip indices and projections are not needed, because distributed table doesn't support it.
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
new_query . replace ( new_query . columns_list , new_columns_list ) ;
return new_query_ast ;
2018-02-20 21:03:38 +00:00
}
2020-03-18 00:57:00 +00:00
/// Replaces ENGINE and table name in a create query
2020-03-18 19:04:42 +00:00
std : : shared_ptr < ASTCreateQuery > rewriteCreateQueryStorage ( const ASTPtr & create_query_ast ,
const DatabaseAndTableName & new_table ,
const ASTPtr & new_storage_ast )
2018-02-20 21:03:38 +00:00
{
2020-02-19 15:01:08 +00:00
const auto & create = create_query_ast - > as < ASTCreateQuery & > ( ) ;
auto res = std : : make_shared < ASTCreateQuery > ( create ) ;
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
if ( create . storage = = nullptr | | new_storage_ast = = nullptr )
throw Exception ( " Storage is not specified " , ErrorCodes : : LOGICAL_ERROR ) ;
2019-04-04 09:22:54 +00:00
2020-02-19 15:01:08 +00:00
res - > database = new_table . first ;
res - > table = new_table . second ;
2019-04-04 09:22:54 +00:00
2020-02-19 15:01:08 +00:00
res - > children . clear ( ) ;
res - > set ( res - > columns_list , create . columns_list - > clone ( ) ) ;
res - > set ( res - > storage , new_storage_ast - > clone ( ) ) ;
2021-04-22 18:04:32 +00:00
/// Just to make it better and don't store additional flag like `is_table_created` somewhere else
res - > if_not_exists = true ;
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
return res ;
2018-02-20 21:03:38 +00:00
}
2017-11-14 13:13:24 +00:00
2020-02-20 18:58:00 +00:00
bool ClusterCopier : : tryDropPartitionPiece (
ShardPartition & task_partition ,
const size_t current_piece_number ,
const zkutil : : ZooKeeperPtr & zookeeper ,
const CleanStateClock & clean_state_clock )
2017-10-13 19:13:41 +00:00
{
2020-02-19 15:01:08 +00:00
if ( is_safe_mode )
throw Exception ( " DROP PARTITION is prohibited in safe mode " , ErrorCodes : : NOT_IMPLEMENTED ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
TaskTable & task_table = task_partition . task_shard . task_table ;
2020-02-20 18:58:00 +00:00
ShardPartitionPiece & partition_piece = task_partition . pieces [ current_piece_number ] ;
2017-10-13 19:13:41 +00:00
2020-02-20 18:58:00 +00:00
const String current_shards_path = partition_piece . getPartitionPieceShardsPath ( ) ;
const String current_partition_active_workers_dir = partition_piece . getPartitionPieceActiveWorkersPath ( ) ;
const String is_dirty_flag_path = partition_piece . getPartitionPieceIsDirtyPath ( ) ;
const String dirty_cleaner_path = partition_piece . getPartitionPieceCleanerPath ( ) ;
const String is_dirty_cleaned_path = partition_piece . getPartitionPieceIsCleanedPath ( ) ;
2018-02-07 13:02:47 +00:00
2020-02-19 15:01:08 +00:00
zkutil : : EphemeralNodeHolder : : Ptr cleaner_holder ;
try
2017-10-13 19:13:41 +00:00
{
2020-02-18 13:39:22 +00:00
cleaner_holder = zkutil : : EphemeralNodeHolder : : create ( dirty_cleaner_path , * zookeeper , host_id ) ;
2017-10-13 19:13:41 +00:00
}
2020-02-19 15:01:08 +00:00
catch ( const Coordination : : Exception & e )
2017-10-13 19:13:41 +00:00
{
2020-06-12 15:09:12 +00:00
if ( e . code = = Coordination : : Error : : ZNODEEXISTS )
2020-02-19 15:01:08 +00:00
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Partition {} piece {} is cleaning now by somebody, sleep " , task_partition . name , toString ( current_piece_number ) ) ;
2020-02-19 15:01:08 +00:00
std : : this_thread : : sleep_for ( default_sleep_time ) ;
return false ;
}
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
throw ;
2017-10-13 19:13:41 +00:00
}
2018-01-11 12:23:59 +00:00
2020-02-18 13:39:22 +00:00
Coordination : : Stat stat { } ;
2020-02-19 15:01:08 +00:00
if ( zookeeper - > exists ( current_partition_active_workers_dir , & stat ) )
2018-01-11 12:23:59 +00:00
{
2020-02-19 15:01:08 +00:00
if ( stat . numChildren ! = 0 )
2018-01-25 12:18:27 +00:00
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Partition {} contains {} active workers while trying to drop it. Going to sleep. " , task_partition . name , stat . numChildren ) ;
2020-02-19 15:01:08 +00:00
std : : this_thread : : sleep_for ( default_sleep_time ) ;
return false ;
2018-01-25 12:18:27 +00:00
}
else
{
2020-02-19 15:01:08 +00:00
zookeeper - > remove ( current_partition_active_workers_dir ) ;
2018-01-25 12:18:27 +00:00
}
2018-01-11 12:23:59 +00:00
}
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
{
zkutil : : EphemeralNodeHolder : : Ptr active_workers_lock ;
try
{
active_workers_lock = zkutil : : EphemeralNodeHolder : : create ( current_partition_active_workers_dir , * zookeeper , host_id ) ;
}
catch ( const Coordination : : Exception & e )
{
2020-06-12 15:09:12 +00:00
if ( e . code = = Coordination : : Error : : ZNODEEXISTS )
2020-02-19 15:01:08 +00:00
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Partition {} is being filled now by somebody, sleep " , task_partition . name ) ;
2020-02-19 15:01:08 +00:00
return false ;
}
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
throw ;
}
2017-11-14 13:13:24 +00:00
2020-02-19 15:01:08 +00:00
// Lock the dirty flag
zookeeper - > set ( is_dirty_flag_path , host_id , clean_state_clock . discovery_version . value ( ) ) ;
2020-02-20 18:58:00 +00:00
zookeeper - > tryRemove ( partition_piece . getPartitionPieceCleanStartPath ( ) ) ;
2020-02-18 13:39:22 +00:00
CleanStateClock my_clock ( zookeeper , is_dirty_flag_path , is_dirty_cleaned_path ) ;
2017-11-14 13:13:24 +00:00
2020-02-19 15:01:08 +00:00
/// Remove all status nodes
2017-11-14 13:13:24 +00:00
{
2020-02-19 15:01:08 +00:00
Strings children ;
2020-06-12 15:09:12 +00:00
if ( zookeeper - > tryGetChildren ( current_shards_path , children ) = = Coordination : : Error : : ZOK )
2020-02-19 15:01:08 +00:00
for ( const auto & child : children )
{
zookeeper - > removeRecursive ( current_shards_path + " / " + child ) ;
}
2017-11-14 13:13:24 +00:00
}
2020-03-12 19:46:48 +00:00
DatabaseAndTableName original_table = task_table . table_push ;
DatabaseAndTableName helping_table = DatabaseAndTableName ( original_table . first , original_table . second + " _piece_ " + toString ( current_piece_number ) ) ;
String query = " ALTER TABLE " + getQuotedTable ( helping_table ) ;
2020-11-21 04:32:29 +00:00
query + = ( ( task_partition . name = = " 'all' " ) ? " DROP PARTITION ID " : " DROP PARTITION " ) + task_partition . name + " " ;
2017-11-14 13:13:24 +00:00
2021-04-27 12:34:56 +00:00
/// TODO: use this statement after servers will be updated up to 1.1.54310
// query += " DROP PARTITION ID '" + task_partition.name + "'";
2020-02-19 15:01:08 +00:00
ClusterPtr & cluster_push = task_table . cluster_push ;
Settings settings_push = task_cluster - > settings_push ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// It is important, DROP PARTITION must be done synchronously
settings_push . replication_alter_partitions_sync = 2 ;
2017-10-13 19:13:41 +00:00
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Execute distributed DROP PARTITION: {} " , query ) ;
2020-03-12 16:48:28 +00:00
/// We have to drop partition_piece on each replica
2020-03-17 18:07:54 +00:00
size_t num_shards = executeQueryOnCluster (
2020-03-10 20:04:08 +00:00
cluster_push , query ,
2020-05-30 17:53:55 +00:00
settings_push ,
2020-03-12 16:48:28 +00:00
ClusterExecutionMode : : ON_EACH_NODE ) ;
2017-10-13 19:13:41 +00:00
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " DROP PARTITION was successfully executed on {} nodes of a cluster. " , num_shards ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// Update the locking node
if ( ! my_clock . is_stale ( ) )
{
zookeeper - > set ( is_dirty_flag_path , host_id , my_clock . discovery_version . value ( ) ) ;
if ( my_clock . clean_state_version )
2020-02-18 13:39:22 +00:00
zookeeper - > set ( is_dirty_cleaned_path , host_id , my_clock . clean_state_version . value ( ) ) ;
2020-02-19 15:01:08 +00:00
else
2020-02-18 13:39:22 +00:00
zookeeper - > create ( is_dirty_cleaned_path , host_id , zkutil : : CreateMode : : Persistent ) ;
2020-02-19 15:01:08 +00:00
}
else
2017-11-14 17:45:15 +00:00
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Clean state is altered when dropping the partition, cowardly bailing " ) ;
2020-02-19 15:01:08 +00:00
/// clean state is stale
return false ;
}
2017-10-13 19:13:41 +00:00
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Partition {} piece {} was dropped on cluster {} " , task_partition . name , toString ( current_piece_number ) , task_table . cluster_push_name ) ;
2020-06-12 15:09:12 +00:00
if ( zookeeper - > tryCreate ( current_shards_path , host_id , zkutil : : CreateMode : : Persistent ) = = Coordination : : Error : : ZNODEEXISTS )
2020-02-19 15:01:08 +00:00
zookeeper - > set ( current_shards_path , host_id ) ;
}
2017-10-13 19:13:41 +00:00
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Partition {} piece {} is safe for work now. " , task_partition . name , toString ( current_piece_number ) ) ;
2020-02-19 15:01:08 +00:00
return true ;
}
2018-02-13 18:42:59 +00:00
2020-02-19 15:01:08 +00:00
bool ClusterCopier : : tryProcessTable ( const ConnectionTimeouts & timeouts , TaskTable & task_table )
2017-10-13 19:13:41 +00:00
{
2021-03-23 07:04:25 +00:00
/// Create destination table
2021-03-19 12:51:29 +00:00
TaskStatus task_status = TaskStatus : : Error ;
task_status = tryCreateDestinationTable ( timeouts , task_table ) ;
/// Exit if success
if ( task_status ! = TaskStatus : : Finished )
{
LOG_WARNING ( log , " Create destination Tale Failed " ) ;
return false ;
}
2021-04-29 19:16:51 +00:00
/// Set all_partitions_count for table in Zookeeper
auto zookeeper = getContext ( ) - > getZooKeeper ( ) ;
while ( true )
{
Coordination : : Stat stat ;
auto status_json = zookeeper - > get ( task_zookeeper_path + " /status " , & stat ) ;
auto statuses = StatusAccumulator : : fromJSON ( status_json ) ;
/// Exit if someone already set the initial value for this table.
if ( statuses - > find ( task_table . name_in_config ) ! = statuses - > end ( ) )
break ;
( * statuses ) [ task_table . name_in_config ] = StatusAccumulator : : TableStatus
{
/*all_partitions_count=*/ task_table . ordered_partition_names . size ( ) ,
/*processed_partition_count=*/ 0
} ;
auto statuses_to_commit = StatusAccumulator : : serializeToJSON ( statuses ) ;
auto error = zookeeper - > trySet ( task_zookeeper_path + " /status " , statuses_to_commit , stat . version ) ;
if ( error = = Coordination : : Error : : ZOK )
break ;
}
2020-02-19 15:01:08 +00:00
/// An heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint
bool previous_shard_is_instantly_finished = false ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// Process each partition that is present in cluster
for ( const String & partition_name : task_table . ordered_partition_names )
{
if ( ! task_table . cluster_partitions . count ( partition_name ) )
throw Exception ( " There are no expected partition " + partition_name + " . It is a bug " , ErrorCodes : : LOGICAL_ERROR ) ;
2018-02-13 18:42:59 +00:00
2020-02-19 15:01:08 +00:00
ClusterPartition & cluster_partition = task_table . cluster_partitions [ partition_name ] ;
2018-02-13 18:42:59 +00:00
2020-02-19 15:01:08 +00:00
Stopwatch watch ;
2020-02-18 13:39:22 +00:00
/// We will check all the shards of the table and check if they contain current partition.
2020-02-19 15:01:08 +00:00
TasksShard expected_shards ;
UInt64 num_failed_shards = 0 ;
2019-12-02 02:05:30 +00:00
2020-02-19 15:01:08 +00:00
+ + cluster_partition . total_tries ;
2017-11-09 18:06:36 +00:00
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Processing partition {} for the whole cluster " , partition_name ) ;
2018-02-13 18:42:59 +00:00
2020-02-19 15:01:08 +00:00
/// Process each source shard having current partition and copy current partition
/// NOTE: shards are sorted by "distance" to current host
bool has_shard_to_process = false ;
for ( const TaskShardPtr & shard : task_table . all_shards )
2018-03-05 00:47:25 +00:00
{
2020-02-19 15:01:08 +00:00
/// Does shard have a node with current partition?
if ( shard - > partition_tasks . count ( partition_name ) = = 0 )
2018-03-05 00:47:25 +00:00
{
2020-02-19 15:01:08 +00:00
/// If not, did we check existence of that partition previously?
if ( shard - > checked_partitions . count ( partition_name ) = = 0 )
2018-03-05 00:47:25 +00:00
{
2020-02-19 15:01:08 +00:00
auto check_shard_has_partition = [ & ] ( ) { return checkShardHasPartition ( timeouts , * shard , partition_name ) ; } ;
bool has_partition = retry ( check_shard_has_partition ) ;
2017-11-09 18:06:36 +00:00
2020-02-19 15:01:08 +00:00
shard - > checked_partitions . emplace ( partition_name ) ;
2019-11-11 06:53:21 +00:00
2020-02-19 15:01:08 +00:00
if ( has_partition )
{
2020-03-12 19:46:48 +00:00
const size_t number_of_splits = task_table . number_of_splits ;
shard - > partition_tasks . emplace ( partition_name , ShardPartition ( * shard , partition_name , number_of_splits ) ) ;
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Discovered partition {} in shard {} " , partition_name , shard - > getDescription ( ) ) ;
2020-02-18 13:39:22 +00:00
/// To save references in the future.
auto shard_partition_it = shard - > partition_tasks . find ( partition_name ) ;
PartitionPieces & shard_partition_pieces = shard_partition_it - > second . pieces ;
for ( size_t piece_number = 0 ; piece_number < number_of_splits ; + + piece_number )
{
auto res = checkPresentPartitionPiecesOnCurrentShard ( timeouts , * shard , partition_name , piece_number ) ;
shard_partition_pieces . emplace_back ( shard_partition_it - > second , piece_number , res ) ;
}
2020-02-19 15:01:08 +00:00
}
else
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Found that shard {} does not contain current partition {} " , shard - > getDescription ( ) , partition_name ) ;
2020-02-19 15:01:08 +00:00
continue ;
}
}
else
2017-11-09 18:06:36 +00:00
{
2020-02-19 15:01:08 +00:00
/// We have already checked that partition, but did not discover it
previous_shard_is_instantly_finished = true ;
continue ;
2017-11-09 18:06:36 +00:00
}
2020-02-19 15:01:08 +00:00
}
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
auto it_shard_partition = shard - > partition_tasks . find ( partition_name ) ;
2020-02-18 13:39:22 +00:00
/// Previously when we discovered that shard does not contain current partition, we skipped it.
/// At this moment partition have to be present.
2020-02-19 15:01:08 +00:00
if ( it_shard_partition = = shard - > partition_tasks . end ( ) )
2020-02-18 13:39:22 +00:00
throw Exception ( " There are no such partition in a shard. This is a bug. " , ErrorCodes : : LOGICAL_ERROR ) ;
2020-02-19 15:01:08 +00:00
auto & partition = it_shard_partition - > second ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
expected_shards . emplace_back ( shard ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// Do not sleep if there is a sequence of already processed shards to increase startup
bool is_unprioritized_task = ! previous_shard_is_instantly_finished & & shard - > priority . is_remote ;
2021-03-19 12:51:29 +00:00
task_status = TaskStatus : : Error ;
2020-02-19 15:01:08 +00:00
bool was_error = false ;
has_shard_to_process = true ;
for ( UInt64 try_num = 0 ; try_num < max_shard_partition_tries ; + + try_num )
2017-11-14 16:59:45 +00:00
{
2020-02-19 15:01:08 +00:00
task_status = tryProcessPartitionTask ( timeouts , partition , is_unprioritized_task ) ;
2017-11-14 16:59:45 +00:00
2020-02-19 15:01:08 +00:00
/// Exit if success
2020-03-18 13:25:49 +00:00
if ( task_status = = TaskStatus : : Finished )
2020-02-19 15:01:08 +00:00
break ;
2017-11-14 16:59:45 +00:00
2020-02-19 15:01:08 +00:00
was_error = true ;
2017-11-14 16:59:45 +00:00
2020-02-19 15:01:08 +00:00
/// Skip if the task is being processed by someone
2020-03-18 13:25:49 +00:00
if ( task_status = = TaskStatus : : Active )
2020-02-19 15:01:08 +00:00
break ;
2017-11-14 16:59:45 +00:00
2020-02-19 15:01:08 +00:00
/// Repeat on errors
std : : this_thread : : sleep_for ( default_sleep_time ) ;
2017-11-14 16:59:45 +00:00
}
2020-02-19 15:01:08 +00:00
2020-03-18 13:25:49 +00:00
if ( task_status = = TaskStatus : : Error )
2020-02-19 15:01:08 +00:00
+ + num_failed_shards ;
previous_shard_is_instantly_finished = ! was_error ;
2017-11-14 16:59:45 +00:00
}
2020-02-19 15:01:08 +00:00
cluster_partition . elapsed_time_seconds + = watch . elapsedSeconds ( ) ;
/// Check that whole cluster partition is done
/// Firstly check the number of failed partition tasks, then look into ZooKeeper and ensure that each partition is done
2020-03-13 14:19:20 +00:00
bool partition_copying_is_done = num_failed_shards = = 0 ;
2020-02-19 15:01:08 +00:00
try
2017-10-13 19:13:41 +00:00
{
2020-03-13 14:19:20 +00:00
partition_copying_is_done =
2020-02-18 13:39:22 +00:00
! has_shard_to_process
2020-03-13 14:19:20 +00:00
| | ( partition_copying_is_done & & checkAllPiecesInPartitionAreDone ( task_table , partition_name , expected_shards ) ) ;
2017-10-13 19:13:41 +00:00
}
2020-02-19 15:01:08 +00:00
catch ( . . . )
2017-10-13 19:13:41 +00:00
{
2020-02-19 15:01:08 +00:00
tryLogCurrentException ( log ) ;
2020-03-13 14:19:20 +00:00
partition_copying_is_done = false ;
}
bool partition_moving_is_done = false ;
/// Try to move only if all pieces were copied.
if ( partition_copying_is_done )
{
2020-03-16 21:05:38 +00:00
for ( UInt64 try_num = 0 ; try_num < max_shard_partition_piece_tries_for_alter ; + + try_num )
2020-03-13 14:19:20 +00:00
{
2020-03-16 21:05:38 +00:00
try
{
auto res = tryMoveAllPiecesToDestinationTable ( task_table , partition_name ) ;
2020-03-18 13:25:49 +00:00
/// Exit and mark current task is done.
if ( res = = TaskStatus : : Finished )
2020-03-16 21:05:38 +00:00
{
partition_moving_is_done = true ;
break ;
}
2020-03-18 13:25:49 +00:00
/// Exit if this task is active.
if ( res = = TaskStatus : : Active )
2020-03-17 16:23:47 +00:00
break ;
2020-03-16 21:05:38 +00:00
2020-03-18 13:25:49 +00:00
/// Repeat on errors.
2020-03-17 16:23:47 +00:00
std : : this_thread : : sleep_for ( default_sleep_time ) ;
2020-03-16 21:05:38 +00:00
}
2020-03-17 18:07:54 +00:00
catch ( . . . )
{
2020-08-08 01:21:04 +00:00
tryLogCurrentException ( log , " Some error occurred while moving pieces to destination table for partition " + partition_name ) ;
2020-03-16 21:05:38 +00:00
}
2020-03-13 14:19:20 +00:00
}
2017-10-13 19:13:41 +00:00
}
2020-03-13 14:19:20 +00:00
if ( partition_copying_is_done & & partition_moving_is_done )
2017-10-13 19:13:41 +00:00
{
2020-02-19 15:01:08 +00:00
task_table . finished_cluster_partitions . emplace ( partition_name ) ;
2018-01-09 19:12:43 +00:00
2020-02-19 15:01:08 +00:00
task_table . bytes_copied + = cluster_partition . bytes_copied ;
task_table . rows_copied + = cluster_partition . rows_copied ;
double elapsed = cluster_partition . elapsed_time_seconds ;
2018-01-09 19:12:43 +00:00
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " It took {} seconds to copy partition {}: {} uncompressed bytes, {} rows and {} source blocks are copied " ,
2020-05-23 22:21:29 +00:00
elapsed , partition_name ,
formatReadableSizeWithDecimalSuffix ( cluster_partition . bytes_copied ) ,
formatReadableQuantity ( cluster_partition . rows_copied ) ,
cluster_partition . blocks_copied ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
if ( cluster_partition . rows_copied )
2017-11-09 18:06:36 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Average partition speed: {} per second. " , formatReadableSizeWithDecimalSuffix ( cluster_partition . bytes_copied / elapsed ) ) ;
2017-11-09 18:06:36 +00:00
}
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
if ( task_table . rows_copied )
2017-10-13 19:13:41 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Average table {} speed: {} per second. " , task_table . table_id , formatReadableSizeWithDecimalSuffix ( task_table . bytes_copied / elapsed ) ) ;
2020-02-19 15:01:08 +00:00
}
}
}
2017-11-09 18:06:36 +00:00
2020-02-19 15:01:08 +00:00
UInt64 required_partitions = task_table . cluster_partitions . size ( ) ;
UInt64 finished_partitions = task_table . finished_cluster_partitions . size ( ) ;
bool table_is_done = finished_partitions > = required_partitions ;
2017-11-09 18:06:36 +00:00
2020-02-19 15:01:08 +00:00
if ( ! table_is_done )
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Table {} is not processed yet.Copied {} of {}, will retry " , task_table . table_id , finished_partitions , required_partitions ) ;
2020-02-19 15:01:08 +00:00
}
2021-01-28 02:06:31 +00:00
else
{
/// Delete helping tables in case that whole table is done
dropHelpingTables ( task_table ) ;
}
2018-03-14 19:07:57 +00:00
2020-02-19 15:01:08 +00:00
return table_is_done ;
}
2017-11-09 18:06:36 +00:00
2021-03-19 12:51:29 +00:00
/// Tries to create the destination (original) table, if it does not exist yet, on the push cluster.
/// The definition is taken from the pull table of the first shard in `task_table.all_shards`
/// with the table name and engine replaced by the push ones.
/// A failure of the creation itself is only logged (another worker may have been first),
/// so TaskStatus::Finished is returned whenever the function returns.
TaskStatus ClusterCopier::tryCreateDestinationTable(const ConnectionTimeouts & timeouts, TaskTable & task_table)
{
    const TaskShardPtr first_shard = task_table.all_shards.at(0);

    /// We need to update table definitions for each part, it could be changed after ALTER
    first_shard->current_pull_table_create_query = getCreateTableForPullShard(timeouts, *first_shard);

    try
    {
        auto push_create_ast = rewriteCreateQueryStorage(
            first_shard->current_pull_table_create_query,
            task_table.table_push,
            task_table.engine_push_ast);

        auto & push_create = push_create_ast->as<ASTCreateQuery &>();
        push_create.if_not_exists = true;
        InterpreterCreateQuery::prepareOnClusterQuery(push_create, getContext(), task_table.cluster_push_name);

        String create_statement = queryToString(push_create_ast);
        LOG_INFO(log, " Create destination tables. Query: \n {} ", create_statement);

        UInt64 executed_count = executeQueryOnCluster(
            task_table.cluster_push,
            create_statement,
            task_cluster->settings_push,
            ClusterExecutionMode::ON_EACH_NODE);

        LOG_INFO(
            log,
            " Destination tables {} have been created on {} shards of {} ",
            getQuotedTable(task_table.table_push),
            executed_count,
            task_table.cluster_push->getShardCount());
    }
    catch (...)
    {
        tryLogCurrentException(log, " Error while creating original table. Maybe we are not first. ");
    }

    return TaskStatus::Finished;
}
2020-02-18 13:39:22 +00:00
/// Job for copying partition from particular shard.
2020-03-18 13:25:49 +00:00
TaskStatus ClusterCopier : : tryProcessPartitionTask ( const ConnectionTimeouts & timeouts , ShardPartition & task_partition , bool is_unprioritized_task )
2020-02-19 15:01:08 +00:00
{
2020-03-18 13:25:49 +00:00
TaskStatus res ;
2017-11-09 18:06:36 +00:00
2020-02-19 15:01:08 +00:00
try
{
2020-02-18 13:39:22 +00:00
res = iterateThroughAllPiecesInPartition ( timeouts , task_partition , is_unprioritized_task ) ;
2020-02-19 15:01:08 +00:00
}
catch ( . . . )
{
tryLogCurrentException ( log , " An error occurred while processing partition " + task_partition . name ) ;
2020-03-18 13:25:49 +00:00
res = TaskStatus : : Error ;
2020-02-19 15:01:08 +00:00
}
2017-11-09 18:06:36 +00:00
2020-02-19 15:01:08 +00:00
/// At the end of each task check if the config is updated
try
{
updateConfigIfNeeded ( ) ;
}
catch ( . . . )
{
tryLogCurrentException ( log , " An error occurred while updating the config " ) ;
}
2018-03-26 18:39:28 +00:00
2020-02-19 15:01:08 +00:00
return res ;
}
2017-11-09 18:06:36 +00:00
2020-03-18 13:25:49 +00:00
/// Processes every piece of a partition on one shard, retrying each piece on errors.
///  - `timeouts`: connection timeouts passed down to the piece processing.
///  - `task_partition`: partition whose pieces are processed; the number of pieces is taken
///    from `number_of_splits` of the owning table.
///  - `is_unprioritized_task`: passed down to the piece processing unchanged.
/// Returns TaskStatus::Error if at least one piece ended with an error, otherwise
/// TaskStatus::Active if at least one piece is being processed by someone else,
/// otherwise TaskStatus::Finished.
TaskStatus ClusterCopier::iterateThroughAllPiecesInPartition(const ConnectionTimeouts & timeouts, ShardPartition & task_partition,
                                                             bool is_unprioritized_task)
{
    const size_t total_number_of_pieces = task_partition.task_shard.task_table.number_of_splits;

    bool was_failed_pieces = false;
    bool was_active_pieces = false;

    for (size_t piece_number = 0; piece_number < total_number_of_pieces; ++piece_number)
    {
        TaskStatus res{TaskStatus::Finished};

        for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
        {
            LOG_INFO(log, " Attempt number {} to process partition {} piece number {} on shard number {} with index {}. ",
                try_num, task_partition.name, piece_number,
                task_partition.task_shard.numberInCluster(),
                task_partition.task_shard.indexInCluster());

            res = processPartitionPieceTaskImpl(timeouts, task_partition, piece_number, is_unprioritized_task);

            /// Exit if success
            if (res == TaskStatus::Finished)
                break;

            /// Skip if the task is being processed by someone
            if (res == TaskStatus::Active)
                break;

            /// Repeat on errors
            std::this_thread::sleep_for(default_sleep_time);
        }

        /// Accumulate over all pieces: the status of a later piece must not hide
        /// a failed or still active earlier piece.
        was_active_pieces |= (res == TaskStatus::Active);
        was_failed_pieces |= (res == TaskStatus::Error);
    }

    if (was_failed_pieces)
        return TaskStatus::Error;

    if (was_active_pieces)
        return TaskStatus::Active;

    return TaskStatus::Finished;
}
2020-02-20 17:26:20 +00:00
2020-03-18 13:25:49 +00:00
TaskStatus ClusterCopier : : processPartitionPieceTaskImpl (
2020-02-20 17:26:20 +00:00
const ConnectionTimeouts & timeouts , ShardPartition & task_partition ,
const size_t current_piece_number , bool is_unprioritized_task )
2020-02-19 15:01:08 +00:00
{
TaskShard & task_shard = task_partition . task_shard ;
TaskTable & task_table = task_shard . task_table ;
2020-02-20 17:26:20 +00:00
ClusterPartition & cluster_partition = task_table . getClusterPartition ( task_partition . name ) ;
2020-02-18 13:39:22 +00:00
ShardPartitionPiece & partition_piece = task_partition . pieces [ current_piece_number ] ;
const size_t number_of_splits = task_table . number_of_splits ;
const String primary_key_comma_separated = task_table . primary_key_comma_separated ;
2017-11-09 18:06:36 +00:00
2020-02-19 15:01:08 +00:00
/// We need to update table definitions for each partition, it could be changed after ALTER
2020-02-21 16:00:50 +00:00
createShardInternalTables ( timeouts , task_shard , true ) ;
auto split_table_for_current_piece = task_shard . list_of_split_tables_on_shard [ current_piece_number ] ;
2018-02-07 13:02:47 +00:00
2021-04-10 23:33:54 +00:00
auto zookeeper = getContext ( ) - > getZooKeeper ( ) ;
2017-10-13 19:13:41 +00:00
2020-02-20 17:26:20 +00:00
const String piece_is_dirty_flag_path = partition_piece . getPartitionPieceIsDirtyPath ( ) ;
const String piece_is_dirty_cleaned_path = partition_piece . getPartitionPieceIsCleanedPath ( ) ;
2020-02-18 13:39:22 +00:00
const String current_task_piece_is_active_path = partition_piece . getActiveWorkerPath ( ) ;
2020-02-20 17:26:20 +00:00
const String current_task_piece_status_path = partition_piece . getShardStatusPath ( ) ;
2018-01-09 19:12:43 +00:00
2020-02-19 15:01:08 +00:00
/// Auxiliary functions:
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// Creates is_dirty node to initialize DROP PARTITION
2020-02-20 17:26:20 +00:00
auto create_is_dirty_node = [ & ] ( const CleanStateClock & clock )
2020-02-19 15:01:08 +00:00
{
if ( clock . is_stale ( ) )
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Clean state clock is stale while setting dirty flag, cowardly bailing " ) ;
2020-02-19 15:01:08 +00:00
else if ( ! clock . is_clean ( ) )
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Thank you, Captain Obvious " ) ;
2020-02-19 15:01:08 +00:00
else if ( clock . discovery_version )
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Updating clean state clock " ) ;
2020-02-18 13:39:22 +00:00
zookeeper - > set ( piece_is_dirty_flag_path , host_id , clock . discovery_version . value ( ) ) ;
2020-02-19 15:01:08 +00:00
}
else
2017-10-13 19:13:41 +00:00
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Creating clean state clock " ) ;
2020-02-18 13:39:22 +00:00
zookeeper - > create ( piece_is_dirty_flag_path , host_id , zkutil : : CreateMode : : Persistent ) ;
2017-10-13 19:13:41 +00:00
}
2020-02-19 15:01:08 +00:00
} ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// Returns SELECT query filtering current partition and applying user filter
2020-02-20 17:26:20 +00:00
auto get_select_query = [ & ] ( const DatabaseAndTableName & from_table , const String & fields , bool enable_splitting , String limit = " " )
2017-10-13 19:13:41 +00:00
{
2020-02-19 15:01:08 +00:00
String query ;
2021-09-11 21:09:34 +00:00
query + = " WITH " + task_partition . name + " AS partition_key " ;
2020-02-19 15:01:08 +00:00
query + = " SELECT " + fields + " FROM " + getQuotedTable ( from_table ) ;
2020-04-21 17:37:40 +00:00
2020-04-22 04:38:39 +00:00
if ( enable_splitting & & experimental_use_sample_offset )
query + = " SAMPLE 1/ " + toString ( number_of_splits ) + " OFFSET " + toString ( current_piece_number ) + " / " + toString ( number_of_splits ) ;
2020-04-21 17:37:40 +00:00
2020-02-19 15:01:08 +00:00
/// TODO: Bad, it is better to rewrite with ASTLiteral(partition_key_field)
2021-09-11 21:09:34 +00:00
query + = " WHERE ( " + queryToString ( task_table . engine_push_partition_key_ast ) + " = partition_key) " ;
2020-02-18 13:39:22 +00:00
2020-04-22 04:38:39 +00:00
if ( enable_splitting & & ! experimental_use_sample_offset )
query + = " AND ( cityHash64( " + primary_key_comma_separated + " ) % " + toString ( number_of_splits ) + " = " + toString ( current_piece_number ) + " ) " ;
2020-02-18 13:39:22 +00:00
2020-02-19 15:01:08 +00:00
if ( ! task_table . where_condition_str . empty ( ) )
query + = " AND ( " + task_table . where_condition_str + " ) " ;
2020-04-21 17:37:40 +00:00
2020-02-19 15:01:08 +00:00
if ( ! limit . empty ( ) )
query + = " LIMIT " + limit ;
2017-10-13 19:13:41 +00:00
2021-04-22 23:54:57 +00:00
query + = " FORMAT Native " ;
2020-02-19 15:01:08 +00:00
ParserQuery p_query ( query . data ( ) + query . size ( ) ) ;
2020-04-15 20:28:05 +00:00
2021-04-10 23:33:54 +00:00
const auto & settings = getContext ( ) - > getSettingsRef ( ) ;
2020-04-15 20:28:05 +00:00
return parseQuery ( p_query , query , settings . max_query_size , settings . max_parser_depth ) ;
2020-02-19 15:01:08 +00:00
} ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// Load balancing
2020-02-18 13:39:22 +00:00
auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed ( zookeeper , current_task_piece_status_path , is_unprioritized_task ) ;
2017-10-13 19:13:41 +00:00
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Processing {} " , current_task_piece_status_path ) ;
2017-10-13 19:13:41 +00:00
2020-02-21 16:00:50 +00:00
const String piece_status_path = partition_piece . getPartitionPieceShardsPath ( ) ;
2017-10-13 19:13:41 +00:00
2020-02-21 16:00:50 +00:00
CleanStateClock clean_state_clock ( zookeeper , piece_is_dirty_flag_path , piece_is_dirty_cleaned_path ) ;
const bool is_clean = checkPartitionPieceIsClean ( zookeeper , clean_state_clock , piece_status_path ) ;
2017-10-13 19:13:41 +00:00
2020-02-20 17:26:20 +00:00
/// Do not start if partition piece is dirty, try to clean it
2020-02-21 16:00:50 +00:00
if ( is_clean )
2018-02-20 21:03:38 +00:00
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Partition {} piece {} appears to be clean " , task_partition . name , current_piece_number ) ;
2020-02-18 13:39:22 +00:00
zookeeper - > createAncestors ( current_task_piece_status_path ) ;
2018-02-20 21:03:38 +00:00
}
2020-02-19 15:01:08 +00:00
else
2018-02-20 21:03:38 +00:00
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Partition {} piece {} is dirty, try to drop it " , task_partition . name , current_piece_number ) ;
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
try
{
2020-02-20 18:58:00 +00:00
tryDropPartitionPiece ( task_partition , current_piece_number , zookeeper , clean_state_clock ) ;
2020-02-19 15:01:08 +00:00
}
catch ( . . . )
{
tryLogCurrentException ( log , " An error occurred when clean partition " ) ;
}
2018-02-20 21:03:38 +00:00
2020-03-18 13:25:49 +00:00
return TaskStatus : : Error ;
2018-02-20 21:03:38 +00:00
}
2020-02-19 15:01:08 +00:00
/// Create ephemeral node to mark that we are active and process the partition
2020-02-18 13:39:22 +00:00
zookeeper - > createAncestors ( current_task_piece_is_active_path ) ;
2020-02-19 15:01:08 +00:00
zkutil : : EphemeralNodeHolderPtr partition_task_node_holder ;
try
2017-10-13 19:13:41 +00:00
{
2020-02-18 13:39:22 +00:00
partition_task_node_holder = zkutil : : EphemeralNodeHolder : : create ( current_task_piece_is_active_path , * zookeeper , host_id ) ;
2020-02-19 15:01:08 +00:00
}
catch ( const Coordination : : Exception & e )
{
2020-06-12 15:09:12 +00:00
if ( e . code = = Coordination : : Error : : ZNODEEXISTS )
2017-10-13 19:13:41 +00:00
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Someone is already processing {} " , current_task_piece_is_active_path ) ;
2020-03-18 13:25:49 +00:00
return TaskStatus : : Active ;
2017-10-13 19:13:41 +00:00
}
2020-02-19 15:01:08 +00:00
throw ;
}
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
/// Exit if task has been already processed;
/// create blocking node to signal cleaning up if it is abandoned
{
String status_data ;
2020-02-18 13:39:22 +00:00
if ( zookeeper - > tryGet ( current_task_piece_status_path , status_data ) )
2017-10-13 19:13:41 +00:00
{
2020-02-19 15:01:08 +00:00
TaskStateWithOwner status = TaskStateWithOwner : : fromString ( status_data ) ;
if ( status . state = = TaskState : : Finished )
2017-10-13 19:13:41 +00:00
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Task {} has been successfully executed by {} " , current_task_piece_status_path , status . owner ) ;
2020-03-18 13:25:49 +00:00
return TaskStatus : : Finished ;
2017-10-13 19:13:41 +00:00
}
2020-02-20 17:26:20 +00:00
/// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process.
/// Initialize DROP PARTITION
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Task {} has not been successfully finished by {}. Partition will be dropped and refilled. " , current_task_piece_status_path , status . owner ) ;
2018-02-20 21:03:38 +00:00
2020-02-19 15:01:08 +00:00
create_is_dirty_node ( clean_state_clock ) ;
2020-03-18 13:25:49 +00:00
return TaskStatus : : Error ;
2020-02-19 15:01:08 +00:00
}
2017-10-13 19:13:41 +00:00
}
2020-03-10 20:04:08 +00:00
2021-04-22 18:04:32 +00:00
/// Try create table (if not exists) on each shard
/// We have to create this table even in case that partition piece is empty
2021-06-03 23:58:47 +00:00
/// This is significant, because we will have simpler code
2021-04-22 18:04:32 +00:00
{
/// 1) Get columns description from any replica of destination cluster
/// 2) Change ENGINE, database and table name
/// 3) Create helping table on the whole destination cluster
auto & settings_push = task_cluster - > settings_push ;
auto connection = task_table . cluster_push - > getAnyShardInfo ( ) . pool - > get ( timeouts , & settings_push , true ) ;
String create_query = getRemoteCreateTable ( task_shard . task_table . table_push , * connection , settings_push ) ;
2021-04-29 22:56:41 +00:00
2021-04-22 18:04:32 +00:00
ParserCreateQuery parser_create_query ;
auto create_query_ast = parseQuery ( parser_create_query , create_query , settings_push . max_query_size , settings_push . max_parser_depth ) ;
/// Define helping table database and name for current partition piece
2021-06-03 23:58:47 +00:00
DatabaseAndTableName database_and_table_for_current_piece
{
task_table . table_push . first ,
task_table . table_push . second + " _piece_ " + toString ( current_piece_number )
} ;
2021-04-22 18:04:32 +00:00
2021-04-29 22:56:41 +00:00
2021-04-22 18:04:32 +00:00
auto new_engine_push_ast = task_table . engine_push_ast ;
if ( task_table . isReplicatedTable ( ) )
new_engine_push_ast = task_table . rewriteReplicatedCreateQueryToPlain ( ) ;
/// Take columns definition from destination table, new database and table name, and new engine (non replicated variant of MergeTree)
auto create_query_push_ast = rewriteCreateQueryStorage ( create_query_ast , database_and_table_for_current_piece , new_engine_push_ast ) ;
String query = queryToString ( create_query_push_ast ) ;
2021-06-03 15:50:44 +00:00
LOG_INFO ( log , " Create destination tables. Query: \n {} " , query ) ;
2021-06-03 23:58:47 +00:00
UInt64 shards = executeQueryOnCluster ( task_table . cluster_push , query , task_cluster - > settings_push , ClusterExecutionMode : : ON_EACH_NODE ) ;
2021-04-22 18:04:32 +00:00
LOG_INFO (
log ,
" Destination tables {} have been created on {} shards of {} " ,
getQuotedTable ( task_table . table_push ) ,
shards ,
task_table . cluster_push - > getShardCount ( ) ) ;
}
2020-03-10 20:04:08 +00:00
/// Exit if current piece is absent on this shard. Also mark it as finished, because we will check
/// whether each shard have processed each partitition (and its pieces).
if ( partition_piece . is_absent_piece )
{
String state_finished = TaskStateWithOwner : : getData ( TaskState : : Finished , host_id ) ;
auto res = zookeeper - > tryCreate ( current_task_piece_status_path , state_finished , zkutil : : CreateMode : : Persistent ) ;
2020-06-12 15:09:12 +00:00
if ( res = = Coordination : : Error : : ZNODEEXISTS )
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Partition {} piece {} is absent on current replica of a shard. But other replicas have already marked it as done. " , task_partition . name , current_piece_number ) ;
2020-06-12 15:09:12 +00:00
if ( res = = Coordination : : Error : : ZOK )
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Partition {} piece {} is absent on current replica of a shard. Will mark it as done. Other replicas will do the same. " , task_partition . name , current_piece_number ) ;
2020-03-18 13:25:49 +00:00
return TaskStatus : : Finished ;
2020-03-10 20:04:08 +00:00
}
2020-02-19 15:01:08 +00:00
/// Check that destination partition is empty if we are first worker
/// NOTE: this check is incorrect if pull and push tables have different partition key!
String clean_start_status ;
2020-02-20 17:26:20 +00:00
if ( ! zookeeper - > tryGet ( partition_piece . getPartitionPieceCleanStartPath ( ) , clean_start_status ) | | clean_start_status ! = " ok " )
2018-03-05 00:47:25 +00:00
{
2020-02-20 17:26:20 +00:00
zookeeper - > createIfNotExists ( partition_piece . getPartitionPieceCleanStartPath ( ) , " " ) ;
auto checker = zkutil : : EphemeralNodeHolder : : create ( partition_piece . getPartitionPieceCleanStartPath ( ) + " /checker " ,
2020-02-18 13:39:22 +00:00
* zookeeper , host_id ) ;
2020-02-19 15:01:08 +00:00
// Maybe we are the first worker
2020-02-21 16:00:50 +00:00
ASTPtr query_select_ast = get_select_query ( split_table_for_current_piece , " count() " , /*enable_splitting*/ true ) ;
2020-02-19 15:01:08 +00:00
UInt64 count ;
{
2021-04-10 23:33:54 +00:00
auto local_context = Context : : createCopy ( context ) ;
2020-02-19 15:01:08 +00:00
// Use pull (i.e. readonly) settings, but fetch data from destination servers
2021-04-10 23:33:54 +00:00
local_context - > setSettings ( task_cluster - > settings_pull ) ;
local_context - > setSetting ( " skip_unavailable_shards " , true ) ;
2020-02-19 15:01:08 +00:00
2021-09-16 17:40:42 +00:00
Block block = getBlockWithAllStreamData ( InterpreterFactory : : get ( query_select_ast , local_context ) - > execute ( ) . pipeline ) ;
2020-02-19 15:01:08 +00:00
count = ( block ) ? block . safeGetByPosition ( 0 ) . column - > getUInt ( 0 ) : 0 ;
}
2019-08-20 18:05:01 +00:00
2020-02-19 15:01:08 +00:00
if ( count ! = 0 )
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Partition {} piece {}is not empty. In contains {} rows. " , task_partition . name , current_piece_number , count ) ;
2020-02-18 13:39:22 +00:00
Coordination : : Stat stat_shards { } ;
2020-02-20 17:26:20 +00:00
zookeeper - > get ( partition_piece . getPartitionPieceShardsPath ( ) , & stat_shards ) ;
2018-03-05 00:47:25 +00:00
2020-02-19 15:01:08 +00:00
/// NOTE: partition is still fresh if dirt discovery happens before cleaning
if ( stat_shards . numChildren = = 0 )
{
2020-05-23 22:24:01 +00:00
LOG_WARNING ( log , " There are no workers for partition {} piece {}, but destination table contains {} rows. Partition will be dropped and refilled. " , task_partition . name , toString ( current_piece_number ) , count ) ;
2018-03-05 00:47:25 +00:00
2020-02-19 15:01:08 +00:00
create_is_dirty_node ( clean_state_clock ) ;
2020-03-18 13:25:49 +00:00
return TaskStatus : : Error ;
2020-02-19 15:01:08 +00:00
}
}
2020-02-20 17:26:20 +00:00
zookeeper - > set ( partition_piece . getPartitionPieceCleanStartPath ( ) , " ok " ) ;
2020-02-19 15:01:08 +00:00
}
/// At this point, we need to sync that the destination table is clean
/// before any actual work
2018-03-05 00:47:25 +00:00
2020-02-19 15:01:08 +00:00
/// Try start processing, create node about it
{
String start_state = TaskStateWithOwner : : getData ( TaskState : : Started , host_id ) ;
2020-07-31 21:23:16 +00:00
CleanStateClock new_clean_state_clock ( zookeeper , piece_is_dirty_flag_path , piece_is_dirty_cleaned_path ) ;
2020-02-19 15:01:08 +00:00
if ( clean_state_clock ! = new_clean_state_clock )
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Partition {} piece {} clean state changed, cowardly bailing " , task_partition . name , toString ( current_piece_number ) ) ;
2020-03-18 13:25:49 +00:00
return TaskStatus : : Error ;
2020-02-19 15:01:08 +00:00
}
else if ( ! new_clean_state_clock . is_clean ( ) )
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Partition {} piece {} is dirty and will be dropped and refilled " , task_partition . name , toString ( current_piece_number ) ) ;
2020-02-19 15:01:08 +00:00
create_is_dirty_node ( new_clean_state_clock ) ;
2020-03-18 13:25:49 +00:00
return TaskStatus : : Error ;
2020-02-19 15:01:08 +00:00
}
2020-02-18 13:39:22 +00:00
zookeeper - > create ( current_task_piece_status_path , start_state , zkutil : : CreateMode : : Persistent ) ;
2018-03-05 00:47:25 +00:00
}
2017-11-09 18:06:36 +00:00
2020-02-19 15:01:08 +00:00
/// Do the copying
{
bool inject_fault = false ;
if ( copy_fault_probability > 0 )
2017-11-09 18:06:36 +00:00
{
2020-02-19 15:01:08 +00:00
double value = std : : uniform_real_distribution < > ( 0 , 1 ) ( task_table . task_cluster . random_engine ) ;
inject_fault = value < copy_fault_probability ;
2017-11-09 18:06:36 +00:00
}
2020-02-19 15:01:08 +00:00
// Select all fields
2020-03-11 19:55:27 +00:00
ASTPtr query_select_ast = get_select_query ( task_shard . table_read_shard , " * " , /*enable_splitting*/ true , inject_fault ? " 1 " : " " ) ;
2017-10-13 19:13:41 +00:00
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Executing SELECT query and pull from {} : {} " , task_shard . getDescription ( ) , queryToString ( query_select_ast ) ) ;
2020-02-19 15:01:08 +00:00
ASTPtr query_insert_ast ;
2017-10-13 19:13:41 +00:00
{
2020-02-19 15:01:08 +00:00
String query ;
2021-04-22 23:54:57 +00:00
query + = " INSERT INTO " + getQuotedTable ( split_table_for_current_piece ) + " FORMAT Native " ;
2017-11-09 18:06:36 +00:00
2020-02-19 15:01:08 +00:00
ParserQuery p_query ( query . data ( ) + query . size ( ) ) ;
2021-04-10 23:33:54 +00:00
const auto & settings = getContext ( ) - > getSettingsRef ( ) ;
2020-04-15 20:28:05 +00:00
query_insert_ast = parseQuery ( p_query , query , settings . max_query_size , settings . max_parser_depth ) ;
2020-02-19 15:01:08 +00:00
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Executing INSERT query: {} " , query ) ;
2020-02-19 15:01:08 +00:00
}
try
{
2021-04-10 23:33:54 +00:00
auto context_select = Context : : createCopy ( context ) ;
2020-08-01 22:43:43 +00:00
context_select - > setSettings ( task_cluster - > settings_pull ) ;
2021-04-10 23:33:54 +00:00
auto context_insert = Context : : createCopy ( context ) ;
2020-08-01 22:43:43 +00:00
context_insert - > setSettings ( task_cluster - > settings_push ) ;
2020-02-19 15:01:08 +00:00
/// Custom INSERT SELECT implementation
2021-09-16 17:40:42 +00:00
QueryPipeline input ;
QueryPipeline output ;
2017-10-13 19:13:41 +00:00
{
2021-04-10 23:33:54 +00:00
BlockIO io_select = InterpreterFactory : : get ( query_select_ast , context_select ) - > execute ( ) ;
BlockIO io_insert = InterpreterFactory : : get ( query_insert_ast , context_insert ) - > execute ( ) ;
2017-11-09 18:06:36 +00:00
2021-09-16 17:40:42 +00:00
output = std : : move ( io_insert . pipeline ) ;
2021-04-22 18:04:32 +00:00
/// Add converting actions to make it possible to copy blocks with slightly different schema
2021-09-03 17:29:36 +00:00
const auto & select_block = io_select . pipeline . getHeader ( ) ;
2021-09-16 17:40:42 +00:00
const auto & insert_block = output . getHeader ( ) ;
2021-04-22 18:04:32 +00:00
auto actions_dag = ActionsDAG : : makeConvertingActions (
select_block . getColumnsWithTypeAndName ( ) ,
insert_block . getColumnsWithTypeAndName ( ) ,
ActionsDAG : : MatchColumnsMode : : Position ) ;
auto actions = std : : make_shared < ExpressionActions > ( actions_dag , ExpressionActionsSettings : : fromContext ( getContext ( ) ) ) ;
2021-09-16 17:40:42 +00:00
QueryPipelineBuilder builder ;
builder . init ( std : : move ( io_select . pipeline ) ) ;
builder . addSimpleTransform ( [ & ] ( const Block & header )
2021-09-03 17:29:36 +00:00
{
return std : : make_shared < ExpressionTransform > ( header , actions ) ;
} ) ;
2021-09-16 17:40:42 +00:00
input = QueryPipelineBuilder : : getPipeline ( std : : move ( builder ) ) ;
2017-10-13 19:13:41 +00:00
}
2020-02-19 15:01:08 +00:00
/// Fail-fast optimization to abort copying when the current clean state expires
std : : future < Coordination : : ExistsResponse > future_is_dirty_checker ;
Stopwatch watch ( CLOCK_MONOTONIC_COARSE ) ;
constexpr UInt64 check_period_milliseconds = 500 ;
/// Will asynchronously check that ZooKeeper connection and is_dirty flag appearing while copying data
auto cancel_check = [ & ] ( )
2017-10-13 19:13:41 +00:00
{
2020-02-19 15:01:08 +00:00
if ( zookeeper - > expired ( ) )
throw Exception ( " ZooKeeper session is expired, cancel INSERT SELECT " , ErrorCodes : : UNFINISHED ) ;
2018-02-07 13:02:47 +00:00
2020-02-19 15:01:08 +00:00
if ( ! future_is_dirty_checker . valid ( ) )
2020-02-18 13:39:22 +00:00
future_is_dirty_checker = zookeeper - > asyncExists ( piece_is_dirty_flag_path ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// check_period_milliseconds should less than average insert time of single block
/// Otherwise, the insertion will slow a little bit
if ( watch . elapsedMilliseconds ( ) > = check_period_milliseconds )
2017-10-13 19:13:41 +00:00
{
2020-02-19 15:01:08 +00:00
Coordination : : ExistsResponse status = future_is_dirty_checker . get ( ) ;
2017-11-09 18:06:36 +00:00
2020-06-12 15:09:12 +00:00
if ( status . error ! = Coordination : : Error : : ZNONODE )
2018-02-13 18:42:59 +00:00
{
2020-02-19 15:01:08 +00:00
LogicalClock dirt_discovery_epoch ( status . stat . mzxid ) ;
if ( dirt_discovery_epoch = = clean_state_clock . discovery_zxid )
return false ;
throw Exception ( " Partition is dirty, cancel INSERT SELECT " , ErrorCodes : : UNFINISHED ) ;
2017-10-13 19:13:41 +00:00
}
}
2020-02-19 15:01:08 +00:00
return false ;
} ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// Update statistics
/// It is quite rough: bytes_copied don't take into account DROP PARTITION.
auto update_stats = [ & cluster_partition ] ( const Block & block )
{
cluster_partition . bytes_copied + = block . bytes ( ) ;
cluster_partition . rows_copied + = block . rows ( ) ;
cluster_partition . blocks_copied + = 1 ;
} ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
/// Main work is here
2021-09-03 17:29:36 +00:00
PullingPipelineExecutor pulling_executor ( input ) ;
PushingPipelineExecutor pushing_executor ( output ) ;
Block data ;
bool is_cancelled = false ;
while ( pulling_executor . pull ( data ) )
{
if ( cancel_check ( ) )
{
is_cancelled = true ;
pushing_executor . cancel ( ) ;
pushing_executor . cancel ( ) ;
break ;
}
pushing_executor . push ( data ) ;
update_stats ( data ) ;
}
if ( ! is_cancelled )
pushing_executor . finish ( ) ;
2017-11-09 18:06:36 +00:00
2020-02-19 15:01:08 +00:00
// Just in case
if ( future_is_dirty_checker . valid ( ) )
future_is_dirty_checker . get ( ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
if ( inject_fault )
throw Exception ( " Copy fault injection is activated " , ErrorCodes : : UNFINISHED ) ;
}
catch ( . . . )
{
tryLogCurrentException ( log , " An error occurred during copying, partition will be marked as dirty " ) ;
2020-03-12 16:48:28 +00:00
create_is_dirty_node ( clean_state_clock ) ;
2020-03-18 13:25:49 +00:00
return TaskStatus : : Error ;
2020-02-19 15:01:08 +00:00
}
2017-10-13 19:13:41 +00:00
}
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Partition {} piece {} copied. But not moved to original destination table. " , task_partition . name , toString ( current_piece_number ) ) ;
2020-02-21 16:00:50 +00:00
2020-02-19 15:01:08 +00:00
/// Finalize the processing, change state of current partition task (and also check is_dirty flag)
{
String state_finished = TaskStateWithOwner : : getData ( TaskState : : Finished , host_id ) ;
2020-02-18 13:39:22 +00:00
CleanStateClock new_clean_state_clock ( zookeeper , piece_is_dirty_flag_path , piece_is_dirty_cleaned_path ) ;
2020-02-19 15:01:08 +00:00
if ( clean_state_clock ! = new_clean_state_clock )
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Partition {} piece {} clean state changed, cowardly bailing " , task_partition . name , toString ( current_piece_number ) ) ;
2020-03-18 13:25:49 +00:00
return TaskStatus : : Error ;
2020-02-19 15:01:08 +00:00
}
else if ( ! new_clean_state_clock . is_clean ( ) )
{
2020-05-23 22:24:01 +00:00
LOG_INFO ( log , " Partition {} piece {} became dirty and will be dropped and refilled " , task_partition . name , toString ( current_piece_number ) ) ;
2020-02-19 15:01:08 +00:00
create_is_dirty_node ( new_clean_state_clock ) ;
2020-03-18 13:25:49 +00:00
return TaskStatus : : Error ;
2020-02-19 15:01:08 +00:00
}
2020-02-18 13:39:22 +00:00
zookeeper - > set ( current_task_piece_status_path , state_finished , 0 ) ;
2020-02-19 15:01:08 +00:00
}
2018-02-13 18:42:59 +00:00
2020-03-18 13:25:49 +00:00
return TaskStatus : : Finished ;
2020-02-19 15:01:08 +00:00
}
2018-01-09 19:12:43 +00:00
2020-02-19 15:01:08 +00:00
void ClusterCopier : : dropAndCreateLocalTable ( const ASTPtr & create_ast )
{
const auto & create = create_ast - > as < ASTCreateQuery & > ( ) ;
dropLocalTableIfExists ( { create . database , create . table } ) ;
2017-10-13 19:13:41 +00:00
2021-04-10 23:33:54 +00:00
InterpreterCreateQuery interpreter ( create_ast , getContext ( ) ) ;
2020-02-19 15:01:08 +00:00
interpreter . execute ( ) ;
}
2018-02-13 18:42:59 +00:00
2020-02-19 15:01:08 +00:00
void ClusterCopier : : dropLocalTableIfExists ( const DatabaseAndTableName & table_name ) const
{
auto drop_ast = std : : make_shared < ASTDropQuery > ( ) ;
drop_ast - > if_exists = true ;
drop_ast - > database = table_name . first ;
drop_ast - > table = table_name . second ;
2017-11-09 18:06:36 +00:00
2021-04-10 23:33:54 +00:00
InterpreterDropQuery interpreter ( drop_ast , getContext ( ) ) ;
2020-02-19 15:01:08 +00:00
interpreter . execute ( ) ;
}
2017-10-13 19:13:41 +00:00
2021-03-19 12:51:29 +00:00
void ClusterCopier : : dropHelpingTablesByPieceNumber ( const TaskTable & task_table , size_t current_piece_number )
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Removing helping tables piece {} " , current_piece_number ) ;
2021-03-19 12:51:29 +00:00
DatabaseAndTableName original_table = task_table . table_push ;
DatabaseAndTableName helping_table
= DatabaseAndTableName ( original_table . first , original_table . second + " _piece_ " + toString ( current_piece_number ) ) ;
String query = " DROP TABLE IF EXISTS " + getQuotedTable ( helping_table ) ;
const ClusterPtr & cluster_push = task_table . cluster_push ;
Settings settings_push = task_cluster - > settings_push ;
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Execute distributed DROP TABLE: {} " , query ) ;
2021-03-19 12:51:29 +00:00
/// We have to drop partition_piece on each replica
2021-04-22 18:04:32 +00:00
UInt64 num_nodes = executeQueryOnCluster ( cluster_push , query , settings_push , ClusterExecutionMode : : ON_EACH_NODE ) ;
2021-03-19 12:51:29 +00:00
LOG_INFO ( log , " DROP TABLE query was successfully executed on {} nodes. " , toString ( num_nodes ) ) ;
}
2020-03-13 16:25:07 +00:00
void ClusterCopier : : dropHelpingTables ( const TaskTable & task_table )
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Removing helping tables " ) ;
2020-03-13 16:25:07 +00:00
for ( size_t current_piece_number = 0 ; current_piece_number < task_table . number_of_splits ; + + current_piece_number )
{
2021-03-19 12:51:29 +00:00
dropHelpingTablesByPieceNumber ( task_table , current_piece_number ) ;
2020-03-13 16:25:07 +00:00
}
}
2020-03-18 18:35:58 +00:00
void ClusterCopier : : dropParticularPartitionPieceFromAllHelpingTables ( const TaskTable & task_table , const String & partition_name )
{
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Try drop partition partition from all helping tables. " ) ;
2020-03-18 18:35:58 +00:00
for ( size_t current_piece_number = 0 ; current_piece_number < task_table . number_of_splits ; + + current_piece_number )
{
DatabaseAndTableName original_table = task_table . table_push ;
DatabaseAndTableName helping_table = DatabaseAndTableName ( original_table . first , original_table . second + " _piece_ " + toString ( current_piece_number ) ) ;
2020-11-21 04:32:29 +00:00
String query = " ALTER TABLE " + getQuotedTable ( helping_table ) + ( ( partition_name = = " 'all' " ) ? " DROP PARTITION ID " : " DROP PARTITION " ) + partition_name ;
2020-03-18 18:35:58 +00:00
const ClusterPtr & cluster_push = task_table . cluster_push ;
Settings settings_push = task_cluster - > settings_push ;
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Execute distributed DROP PARTITION: {} " , query ) ;
2020-03-18 18:35:58 +00:00
/// We have to drop partition_piece on each replica
UInt64 num_nodes = executeQueryOnCluster (
cluster_push , query ,
2020-05-30 17:53:55 +00:00
settings_push ,
2020-03-18 18:35:58 +00:00
ClusterExecutionMode : : ON_EACH_NODE ) ;
2021-03-19 12:51:29 +00:00
LOG_INFO ( log , " DROP PARTITION query was successfully executed on {} nodes. " , toString ( num_nodes ) ) ;
2020-03-18 18:35:58 +00:00
}
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " All helping tables dropped partition {} " , partition_name ) ;
2020-03-18 18:35:58 +00:00
}
2021-07-14 13:17:30 +00:00
/// Runs `SHOW CREATE TABLE` for `table` over the given connection and returns the statement text.
/// The query is executed with a copy of the copier's context that has `settings` applied.
String ClusterCopier::getRemoteCreateTable(
    const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
{
    auto remote_context = Context::createCopy(context);
    remote_context->setSettings(settings);

    const String show_query = "SHOW CREATE TABLE " + getQuotedTable(table);

    auto remote_executor = std::make_shared<RemoteQueryExecutor>(
        connection, show_query, InterpreterShowCreateQuery::getSampleBlock(), remote_context);
    auto remote_source = std::make_shared<RemoteSource>(remote_executor, false, false);

    Block block = getBlockWithAllStreamData(QueryPipeline(remote_source));

    /// The statement is taken from the first row of the first column of the result.
    const auto & statement_column = typeid_cast<const ColumnString &>(*block.safeGetByPosition(0).column);
    return statement_column.getDataAt(0).toString();
}
2018-01-22 18:33:18 +00:00
2021-04-22 18:04:32 +00:00
2020-02-19 15:01:08 +00:00
/// Fetches the current CREATE statement of the source (pull) table from the given shard
/// and returns it parsed into an AST.
ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{
    /// Fetch and parse (possibly) new definition
    auto & settings_pull = task_cluster->settings_pull;
    auto connection_entry = task_shard.info.pool->get(timeouts, &settings_pull, true);

    const String create_statement = getRemoteCreateTable(task_shard.task_table.table_pull, *connection_entry, settings_pull);

    /// Parser limits come from the copier's own context settings.
    const auto & local_settings = getContext()->getSettingsRef();
    ParserCreateQuery create_parser;
    return parseQuery(create_parser, create_statement, local_settings.max_query_size, local_settings.max_parser_depth);
}
2018-01-22 18:33:18 +00:00
2021-04-22 18:04:32 +00:00
2020-02-18 13:39:22 +00:00
/// If it is implicitly asked to create split Distributed table for certain piece on current shard, we will do it.
2020-02-20 10:01:02 +00:00
void ClusterCopier : : createShardInternalTables ( const ConnectionTimeouts & timeouts ,
2020-02-21 16:00:50 +00:00
TaskShard & task_shard , bool create_split )
2017-10-13 19:13:41 +00:00
{
2020-02-19 15:01:08 +00:00
TaskTable & task_table = task_shard . task_table ;
2018-01-22 18:33:18 +00:00
2020-02-19 15:01:08 +00:00
/// We need to update table definitions for each part, it could be changed after ALTER
task_shard . current_pull_table_create_query = getCreateTableForPullShard ( timeouts , task_shard ) ;
2018-01-22 18:33:18 +00:00
2020-02-19 15:01:08 +00:00
/// Create local Distributed tables:
/// a table fetching data from current shard and a table inserting data to the whole destination cluster
String read_shard_prefix = " .read_shard_ " + toString ( task_shard . indexInCluster ( ) ) + " . " ;
String split_shard_prefix = " .split. " ;
task_shard . table_read_shard = DatabaseAndTableName ( working_database_name , read_shard_prefix + task_table . table_id ) ;
2020-02-21 16:00:50 +00:00
task_shard . main_table_split_shard = DatabaseAndTableName ( working_database_name , split_shard_prefix + task_table . table_id ) ;
2021-06-15 19:55:21 +00:00
for ( const auto & piece_number : collections : : range ( 0 , task_table . number_of_splits ) )
2020-02-21 16:00:50 +00:00
{
task_shard . list_of_split_tables_on_shard [ piece_number ] =
DatabaseAndTableName ( working_database_name , split_shard_prefix + task_table . table_id + " _piece_ " + toString ( piece_number ) ) ;
}
2018-02-07 13:02:47 +00:00
2020-02-19 15:01:08 +00:00
/// Create special cluster with single shard
String shard_read_cluster_name = read_shard_prefix + task_table . cluster_pull_name ;
ClusterPtr cluster_pull_current_shard = task_table . cluster_pull - > getClusterWithSingleShard ( task_shard . indexInCluster ( ) ) ;
2021-04-10 23:33:54 +00:00
getContext ( ) - > setCluster ( shard_read_cluster_name , cluster_pull_current_shard ) ;
2018-01-22 18:33:18 +00:00
2020-02-19 15:01:08 +00:00
auto storage_shard_ast = createASTStorageDistributed ( shard_read_cluster_name , task_table . table_pull . first , task_table . table_pull . second ) ;
2018-05-14 14:12:33 +00:00
2021-04-22 22:32:16 +00:00
auto create_query_ast = removeAliasMaterializedAndTTLColumnsFromCreateQuery (
2021-04-22 23:54:57 +00:00
task_shard . current_pull_table_create_query ,
2021-04-22 22:32:16 +00:00
task_table . allow_to_copy_alias_and_materialized_columns ) ;
2018-01-22 18:33:18 +00:00
2020-02-21 16:00:50 +00:00
auto create_table_pull_ast = rewriteCreateQueryStorage ( create_query_ast , task_shard . table_read_shard , storage_shard_ast ) ;
2020-02-19 15:01:08 +00:00
dropAndCreateLocalTable ( create_table_pull_ast ) ;
2018-05-14 14:12:33 +00:00
2020-02-19 15:01:08 +00:00
if ( create_split )
2020-02-21 16:00:50 +00:00
{
auto create_table_split_piece_ast = rewriteCreateQueryStorage (
create_query_ast ,
task_shard . main_table_split_shard ,
task_table . main_engine_split_ast ) ;
2020-02-18 13:39:22 +00:00
dropAndCreateLocalTable ( create_table_split_piece_ast ) ;
2020-02-21 16:00:50 +00:00
2020-08-08 01:21:04 +00:00
/// Create auxiliary split tables for each piece
2021-06-15 19:55:21 +00:00
for ( const auto & piece_number : collections : : range ( 0 , task_table . number_of_splits ) )
2020-02-21 16:00:50 +00:00
{
const auto & storage_piece_split_ast = task_table . auxiliary_engine_split_asts [ piece_number ] ;
create_table_split_piece_ast = rewriteCreateQueryStorage (
create_query_ast ,
task_shard . list_of_split_tables_on_shard [ piece_number ] ,
storage_piece_split_ast ) ;
dropAndCreateLocalTable ( create_table_split_piece_ast ) ;
}
}
2018-01-22 18:33:18 +00:00
}
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
std : : set < String > ClusterCopier : : getShardPartitions ( const ConnectionTimeouts & timeouts , TaskShard & task_shard )
2018-01-22 18:33:18 +00:00
{
2020-11-21 04:32:29 +00:00
std : : set < String > res ;
2020-02-19 15:01:08 +00:00
createShardInternalTables ( timeouts , task_shard , false ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
TaskTable & task_table = task_shard . task_table ;
2017-10-13 19:13:41 +00:00
2020-11-21 04:32:29 +00:00
const String & partition_name = queryToString ( task_table . engine_push_partition_key_ast ) ;
if ( partition_name = = " 'all' " )
{
res . emplace ( " 'all' " ) ;
return res ;
}
2020-02-19 15:01:08 +00:00
String query ;
{
WriteBufferFromOwnString wb ;
2020-11-21 04:32:29 +00:00
wb < < " SELECT DISTINCT " < < partition_name < < " AS partition FROM "
2020-02-19 15:01:08 +00:00
< < " " < < getQuotedTable ( task_shard . table_read_shard ) < < " ORDER BY partition DESC " ;
query = wb . str ( ) ;
}
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
ParserQuery parser_query ( query . data ( ) + query . size ( ) ) ;
2021-04-10 23:33:54 +00:00
const auto & settings = getContext ( ) - > getSettingsRef ( ) ;
2020-04-15 20:28:05 +00:00
ASTPtr query_ast = parseQuery ( parser_query , query , settings . max_query_size , settings . max_parser_depth ) ;
2020-02-19 15:01:08 +00:00
2021-06-03 15:50:44 +00:00
LOG_INFO ( log , " Computing destination partition set, executing query: \n {} " , query ) ;
2020-02-19 15:01:08 +00:00
2021-04-10 23:33:54 +00:00
auto local_context = Context : : createCopy ( context ) ;
local_context - > setSettings ( task_cluster - > settings_pull ) ;
2021-09-16 17:40:42 +00:00
Block block = getBlockWithAllStreamData ( InterpreterFactory : : get ( query_ast , local_context ) - > execute ( ) . pipeline ) ;
2020-02-19 15:01:08 +00:00
if ( block )
{
ColumnWithTypeAndName & column = block . getByPosition ( 0 ) ;
task_shard . partition_key_column = column ;
for ( size_t i = 0 ; i < column . column - > size ( ) ; + + i )
{
WriteBufferFromOwnString wb ;
2021-03-09 14:46:52 +00:00
column . type - > getDefaultSerialization ( ) - > serializeTextQuoted ( * column . column , i , wb , FormatSettings ( ) ) ;
2020-02-19 15:01:08 +00:00
res . emplace ( wb . str ( ) ) ;
}
}
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " There are {} destination partitions in shard {} " , res . size ( ) , task_shard . getDescription ( ) ) ;
2017-10-13 19:13:41 +00:00
2020-02-19 15:01:08 +00:00
return res ;
}
2017-11-15 17:09:16 +00:00
2020-02-20 10:01:02 +00:00
bool ClusterCopier : : checkShardHasPartition ( const ConnectionTimeouts & timeouts ,
TaskShard & task_shard , const String & partition_quoted_name )
2018-01-22 18:33:18 +00:00
{
2020-02-19 15:01:08 +00:00
createShardInternalTables ( timeouts , task_shard , false ) ;
TaskTable & task_table = task_shard . task_table ;
2021-04-22 18:04:32 +00:00
WriteBufferFromOwnString ss ;
2021-09-11 21:09:34 +00:00
ss < < " WITH " + partition_quoted_name + " AS partition_key " ;
2021-04-22 18:04:32 +00:00
ss < < " SELECT 1 FROM " < < getQuotedTable ( task_shard . table_read_shard ) ;
2021-09-11 21:09:34 +00:00
ss < < " WHERE ( " < < queryToString ( task_table . engine_push_partition_key_ast ) < < " = partition_key) " ;
2020-02-19 15:01:08 +00:00
if ( ! task_table . where_condition_str . empty ( ) )
2021-04-22 18:04:32 +00:00
ss < < " AND ( " < < task_table . where_condition_str < < " ) " ;
ss < < " LIMIT 1 " ;
auto query = ss . str ( ) ;
2020-02-19 15:01:08 +00:00
ParserQuery parser_query ( query . data ( ) + query . size ( ) ) ;
2021-04-10 23:33:54 +00:00
const auto & settings = getContext ( ) - > getSettingsRef ( ) ;
2020-04-15 20:28:05 +00:00
ASTPtr query_ast = parseQuery ( parser_query , query , settings . max_query_size , settings . max_parser_depth ) ;
2020-02-19 15:01:08 +00:00
2021-04-22 23:54:57 +00:00
LOG_INFO ( log , " Checking shard {} for partition {} existence, executing query: \n {} " ,
2021-04-22 18:04:32 +00:00
task_shard . getDescription ( ) , partition_quoted_name , query_ast - > formatForErrorMessage ( ) ) ;
2021-04-10 23:33:54 +00:00
auto local_context = Context : : createCopy ( context ) ;
local_context - > setSettings ( task_cluster - > settings_pull ) ;
2021-09-16 17:40:42 +00:00
auto pipeline = InterpreterFactory : : get ( query_ast , local_context ) - > execute ( ) . pipeline ;
PullingPipelineExecutor executor ( pipeline ) ;
Block block ;
executor . pull ( block ) ;
return block . rows ( ) ! = 0 ;
2018-01-22 18:33:18 +00:00
}
2017-10-13 19:13:41 +00:00
2020-02-18 13:39:22 +00:00
bool ClusterCopier : : checkPresentPartitionPiecesOnCurrentShard ( const ConnectionTimeouts & timeouts ,
2020-02-20 10:01:02 +00:00
TaskShard & task_shard , const String & partition_quoted_name , size_t current_piece_number )
2020-02-18 13:39:22 +00:00
{
createShardInternalTables ( timeouts , task_shard , false ) ;
TaskTable & task_table = task_shard . task_table ;
const size_t number_of_splits = task_table . number_of_splits ;
const String & primary_key_comma_separated = task_table . primary_key_comma_separated ;
2020-04-21 17:37:40 +00:00
UNUSED ( primary_key_comma_separated ) ;
2021-09-11 21:09:34 +00:00
std : : string query ;
query + = " WITH " + partition_quoted_name + " AS partition_key " ;
query + = " SELECT 1 FROM " + getQuotedTable ( task_shard . table_read_shard ) ;
2020-04-21 17:37:40 +00:00
if ( experimental_use_sample_offset )
query + = " SAMPLE 1/ " + toString ( number_of_splits ) + " OFFSET " + toString ( current_piece_number ) + " / " + toString ( number_of_splits ) ;
2021-09-11 21:09:34 +00:00
query + = " WHERE ( " + queryToString ( task_table . engine_push_partition_key_ast ) + " = partition_key) " ;
2020-04-21 17:37:40 +00:00
if ( ! experimental_use_sample_offset )
query + = " AND (cityHash64( " + primary_key_comma_separated + " ) % "
+ std : : to_string ( number_of_splits ) + " = " + std : : to_string ( current_piece_number ) + " ) " ;
2020-02-18 13:39:22 +00:00
if ( ! task_table . where_condition_str . empty ( ) )
query + = " AND ( " + task_table . where_condition_str + " ) " ;
query + = " LIMIT 1 " ;
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Checking shard {} for partition {} piece {} existence, executing query: \n \u001b [36m {} " , task_shard . getDescription ( ) , partition_quoted_name , std : : to_string ( current_piece_number ) , query ) ;
2020-02-18 13:39:22 +00:00
ParserQuery parser_query ( query . data ( ) + query . size ( ) ) ;
2021-04-10 23:33:54 +00:00
const auto & settings = getContext ( ) - > getSettingsRef ( ) ;
2020-04-15 20:28:05 +00:00
ASTPtr query_ast = parseQuery ( parser_query , query , settings . max_query_size , settings . max_parser_depth ) ;
2020-02-18 13:39:22 +00:00
2021-04-10 23:33:54 +00:00
auto local_context = Context : : createCopy ( context ) ;
local_context - > setSettings ( task_cluster - > settings_pull ) ;
2021-09-16 17:40:42 +00:00
auto pipeline = InterpreterFactory : : get ( query_ast , local_context ) - > execute ( ) . pipeline ;
PullingPipelineExecutor executor ( pipeline ) ;
Block result ;
executor . pull ( result ) ;
if ( result . rows ( ) ! = 0 )
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Partition {} piece number {} is PRESENT on shard {} " , partition_quoted_name , std : : to_string ( current_piece_number ) , task_shard . getDescription ( ) ) ;
2020-02-18 13:39:22 +00:00
else
2021-04-22 18:04:32 +00:00
LOG_INFO ( log , " Partition {} piece number {} is ABSENT on shard {} " , partition_quoted_name , std : : to_string ( current_piece_number ) , task_shard . getDescription ( ) ) ;
2021-09-16 17:40:42 +00:00
return result . rows ( ) ! = 0 ;
2020-02-18 13:39:22 +00:00
}
2017-10-13 19:13:41 +00:00
2021-04-22 18:04:32 +00:00
2020-02-18 13:39:22 +00:00
/** Executes simple query (without output streams, for example DDL queries) on each shard of the cluster
* Returns number of shards for which at least one replica executed query successfully
*/
2020-02-19 15:01:08 +00:00
UInt64 ClusterCopier : : executeQueryOnCluster (
2020-02-18 13:39:22 +00:00
const ClusterPtr & cluster ,
const String & query ,
2020-05-30 17:53:55 +00:00
const Settings & current_settings ,
2021-04-22 18:04:32 +00:00
ClusterExecutionMode execution_mode ) const
2018-01-22 18:33:18 +00:00
{
2021-04-22 18:04:32 +00:00
ClusterPtr cluster_for_query = cluster ;
2020-03-10 20:04:08 +00:00
if ( execution_mode = = ClusterExecutionMode : : ON_EACH_NODE )
2021-04-22 18:04:32 +00:00
cluster_for_query = cluster - > getClusterWithReplicasAsShards ( current_settings ) ;
2020-02-19 15:01:08 +00:00
2021-04-22 18:04:32 +00:00
std : : vector < std : : shared_ptr < Connection > > connections ;
connections . reserve ( cluster - > getShardCount ( ) ) ;
2020-05-30 17:53:55 +00:00
2021-04-22 18:04:32 +00:00
std : : atomic < UInt64 > successfully_executed = 0 ;
2017-10-13 19:13:41 +00:00
2021-04-22 18:04:32 +00:00
for ( const auto & replicas : cluster_for_query - > getShardsAddresses ( ) )
{
2021-04-29 19:33:34 +00:00
for ( const auto & node : replicas )
2020-02-19 15:01:08 +00:00
{
2021-04-29 19:33:34 +00:00
try
{
connections . emplace_back ( std : : make_shared < Connection > (
node . host_name , node . port , node . default_database ,
node . user , node . password , node . cluster , node . cluster_secret ,
" ClusterCopier " , node . compression , node . secure
) ) ;
2021-04-22 18:04:32 +00:00
2021-04-29 19:33:34 +00:00
/// We execute only Alter, Create and Drop queries.
const auto header = Block { } ;
2021-04-22 18:04:32 +00:00
2021-04-29 19:33:34 +00:00
/// For unknown reason global context is passed to IStorage::read() method
/// So, task_identifier is passed as constructor argument. It is more obvious.
auto remote_query_executor = std : : make_shared < RemoteQueryExecutor > (
2021-07-14 13:17:30 +00:00
* connections . back ( ) , query , header , getContext ( ) ,
/*throttler=*/ nullptr , Scalars ( ) , Tables ( ) , QueryProcessingStage : : Complete ) ;
2021-04-22 18:04:32 +00:00
2021-04-29 19:33:34 +00:00
try
{
remote_query_executor - > sendQuery ( ) ;
}
catch ( . . . )
{
2021-09-20 22:19:40 +00:00
LOG_WARNING ( log , " Node with address {} seems to be unreachable. " , node . host_name ) ;
2021-04-29 19:33:34 +00:00
continue ;
}
while ( true )
{
auto block = remote_query_executor - > read ( ) ;
if ( ! block )
break ;
}
remote_query_executor - > finish ( ) ;
+ + successfully_executed ;
break ;
2021-04-22 18:04:32 +00:00
}
catch ( . . . )
{
2021-04-29 19:33:34 +00:00
LOG_WARNING ( log , " An error occurred while processing query : \n {} " , query ) ;
tryLogCurrentException ( log ) ;
continue ;
2021-04-22 18:04:32 +00:00
}
2020-02-19 15:01:08 +00:00
}
2020-03-10 20:04:08 +00:00
}
2020-02-19 15:01:08 +00:00
2021-04-22 18:04:32 +00:00
return successfully_executed . load ( ) ;
2020-02-18 13:39:22 +00:00
}
2020-05-30 17:53:55 +00:00
2017-10-13 19:13:41 +00:00
}