2020-02-19 15:01:08 +00:00
# include "ClusterCopierApp.h"
2020-07-04 13:54:24 +00:00
# include <Common/StatusFile.h>
2020-08-25 18:15:36 +00:00
# include <Common/TerminalSize.h>
2023-02-03 10:54:49 +00:00
# include <IO/ConnectionTimeouts.h>
2020-10-29 03:39:43 +00:00
# include <Formats/registerFormats.h>
2022-04-27 15:05:45 +00:00
# include <Common/scope_guard_safe.h>
2020-08-25 18:15:36 +00:00
# include <unistd.h>
2021-05-16 22:06:09 +00:00
# include <filesystem>
2020-07-04 13:54:24 +00:00
2021-05-16 22:06:09 +00:00
namespace fs = std : : filesystem ;
2020-02-19 15:01:08 +00:00
namespace DB
{
/// ClusterCopierApp
void ClusterCopierApp : : initialize ( Poco : : Util : : Application & self )
{
is_help = config ( ) . has ( " help " ) ;
if ( is_help )
return ;
config_xml_path = config ( ) . getString ( " config-file " ) ;
task_path = config ( ) . getString ( " task-path " ) ;
2021-04-22 18:04:32 +00:00
log_level = config ( ) . getString ( " log-level " , " info " ) ;
2020-02-19 15:01:08 +00:00
is_safe_mode = config ( ) . has ( " safe-mode " ) ;
2021-04-29 19:16:51 +00:00
is_status_mode = config ( ) . has ( " status " ) ;
2020-02-19 15:01:08 +00:00
if ( config ( ) . has ( " copy-fault-probability " ) )
copy_fault_probability = std : : max ( std : : min ( config ( ) . getDouble ( " copy-fault-probability " ) , 1.0 ) , 0.0 ) ;
2020-03-16 21:05:38 +00:00
if ( config ( ) . has ( " move-fault-probability " ) )
move_fault_probability = std : : max ( std : : min ( config ( ) . getDouble ( " move-fault-probability " ) , 1.0 ) , 0.0 ) ;
2021-05-16 22:06:09 +00:00
base_dir = ( config ( ) . has ( " base-dir " ) ) ? config ( ) . getString ( " base-dir " ) : fs : : current_path ( ) . string ( ) ;
2020-04-21 17:37:40 +00:00
2021-10-09 12:31:13 +00:00
max_table_tries = std : : max < size_t > ( config ( ) . getUInt ( " max-table-tries " , 3 ) , 1 ) ;
max_shard_partition_tries = std : : max < size_t > ( config ( ) . getUInt ( " max-shard-partition-tries " , 3 ) , 1 ) ;
max_shard_partition_piece_tries_for_alter = std : : max < size_t > ( config ( ) . getUInt ( " max-shard-partition-piece-tries-for-alter " , 10 ) , 1 ) ;
retry_delay_ms = std : : chrono : : milliseconds ( std : : max < size_t > ( config ( ) . getUInt ( " retry-delay-ms " , 1000 ) , 100 ) ) ;
2020-04-21 17:37:40 +00:00
if ( config ( ) . has ( " experimental-use-sample-offset " ) )
experimental_use_sample_offset = config ( ) . getBool ( " experimental-use-sample-offset " ) ;
2020-02-19 15:01:08 +00:00
// process_id is '<hostname>#<start_timestamp>_<pid>'
time_t timestamp = Poco : : Timestamp ( ) . epochTime ( ) ;
auto curr_pid = Poco : : Process : : id ( ) ;
process_id = std : : to_string ( DateLUT : : instance ( ) . toNumYYYYMMDDhhmmss ( timestamp ) ) + " _ " + std : : to_string ( curr_pid ) ;
host_id = escapeForFileName ( getFQDNOrHostName ( ) ) + ' # ' + process_id ;
2021-05-24 16:03:09 +00:00
process_path = fs : : weakly_canonical ( fs : : path ( base_dir ) / ( " clickhouse-copier_ " + process_id ) ) ;
2021-05-16 22:06:09 +00:00
fs : : create_directories ( process_path ) ;
2020-02-19 15:01:08 +00:00
/// Override variables for BaseDaemon
if ( config ( ) . has ( " log-level " ) )
config ( ) . setString ( " logger.level " , config ( ) . getString ( " log-level " ) ) ;
if ( config ( ) . has ( " base-dir " ) | | ! config ( ) . has ( " logger.log " ) )
2021-05-16 22:06:09 +00:00
config ( ) . setString ( " logger.log " , fs : : path ( process_path ) / " log.log " ) ;
2020-02-19 15:01:08 +00:00
if ( config ( ) . has ( " base-dir " ) | | ! config ( ) . has ( " logger.errorlog " ) )
2021-05-16 22:06:09 +00:00
config ( ) . setString ( " logger.errorlog " , fs : : path ( process_path ) / " log.err.log " ) ;
2020-02-19 15:01:08 +00:00
Base : : initialize ( self ) ;
}
void ClusterCopierApp : : handleHelp ( const std : : string & , const std : : string & )
{
2020-08-25 18:15:36 +00:00
uint16_t terminal_width = 0 ;
if ( isatty ( STDIN_FILENO ) )
terminal_width = getTerminalWidth ( ) ;
2020-03-23 02:12:31 +00:00
Poco : : Util : : HelpFormatter help_formatter ( options ( ) ) ;
2020-08-25 18:15:36 +00:00
if ( terminal_width )
help_formatter . setWidth ( terminal_width ) ;
2020-03-23 02:12:31 +00:00
help_formatter . setCommand ( commandName ( ) ) ;
help_formatter . setHeader ( " Copies tables from one cluster to another " ) ;
help_formatter . setUsage ( " --config-file <config-file> --task-path <task-path> " ) ;
help_formatter . format ( std : : cerr ) ;
2020-02-19 15:01:08 +00:00
stopOptionsProcessing ( ) ;
}
void ClusterCopierApp : : defineOptions ( Poco : : Util : : OptionSet & options )
{
Base : : defineOptions ( options ) ;
options . addOption ( Poco : : Util : : Option ( " task-path " , " " , " path to task in ZooKeeper " )
2020-02-11 18:34:48 +00:00
. argument ( " task-path " ) . binding ( " task-path " ) ) ;
2020-02-19 15:01:08 +00:00
options . addOption ( Poco : : Util : : Option ( " task-file " , " " , " path to task file for uploading in ZooKeeper to task-path " )
2020-02-11 18:34:48 +00:00
. argument ( " task-file " ) . binding ( " task-file " ) ) ;
2020-02-19 15:01:08 +00:00
options . addOption ( Poco : : Util : : Option ( " task-upload-force " , " " , " Force upload task-file even node already exists " )
2020-02-11 18:34:48 +00:00
. argument ( " task-upload-force " ) . binding ( " task-upload-force " ) ) ;
2020-02-19 15:01:08 +00:00
options . addOption ( Poco : : Util : : Option ( " safe-mode " , " " , " disables ALTER DROP PARTITION in case of errors " )
2020-02-11 18:34:48 +00:00
. binding ( " safe-mode " ) ) ;
2020-02-19 15:01:08 +00:00
options . addOption ( Poco : : Util : : Option ( " copy-fault-probability " , " " , " the copying fails with specified probability (used to test partition state recovering) " )
2020-02-11 18:34:48 +00:00
. argument ( " copy-fault-probability " ) . binding ( " copy-fault-probability " ) ) ;
2020-03-16 21:05:38 +00:00
options . addOption ( Poco : : Util : : Option ( " move-fault-probability " , " " , " the moving fails with specified probability (used to test partition state recovering) " )
. argument ( " move-fault-probability " ) . binding ( " move-fault-probability " ) ) ;
2020-02-19 15:01:08 +00:00
options . addOption ( Poco : : Util : : Option ( " log-level " , " " , " sets log level " )
2020-02-11 18:34:48 +00:00
. argument ( " log-level " ) . binding ( " log-level " ) ) ;
2020-02-19 15:01:08 +00:00
options . addOption ( Poco : : Util : : Option ( " base-dir " , " " , " base directory for copiers, consecutive copier launches will populate /base-dir/launch_id/* directories " )
2020-02-11 18:34:48 +00:00
. argument ( " base-dir " ) . binding ( " base-dir " ) ) ;
2020-04-21 17:37:40 +00:00
options . addOption ( Poco : : Util : : Option ( " experimental-use-sample-offset " , " " , " Use SAMPLE OFFSET query instead of cityHash64(PRIMARY KEY) % n == k " )
. argument ( " experimental-use-sample-offset " ) . binding ( " experimental-use-sample-offset " ) ) ;
2021-04-29 19:16:51 +00:00
options . addOption ( Poco : : Util : : Option ( " status " , " " , " Get for status for current execution " ) . binding ( " status " ) ) ;
2020-02-19 15:01:08 +00:00
2021-10-09 12:31:13 +00:00
options . addOption ( Poco : : Util : : Option ( " max-table-tries " , " " , " Number of tries for the copy table task " )
. argument ( " max-table-tries " ) . binding ( " max-table-tries " ) ) ;
options . addOption ( Poco : : Util : : Option ( " max-shard-partition-tries " , " " , " Number of tries for the copy one partition task " )
. argument ( " max-shard-partition-tries " ) . binding ( " max-shard-partition-tries " ) ) ;
options . addOption ( Poco : : Util : : Option ( " max-shard-partition-piece-tries-for-alter " , " " , " Number of tries for final ALTER ATTACH to destination table " )
. argument ( " max-shard-partition-piece-tries-for-alter " ) . binding ( " max-shard-partition-piece-tries-for-alter " ) ) ;
options . addOption ( Poco : : Util : : Option ( " retry-delay-ms " , " " , " Delay between task retries " )
. argument ( " retry-delay-ms " ) . binding ( " retry-delay-ms " ) ) ;
2020-02-19 15:01:08 +00:00
using Me = std : : decay_t < decltype ( * this ) > ;
options . addOption ( Poco : : Util : : Option ( " help " , " " , " produce this help message " ) . binding ( " help " )
2020-02-11 18:34:48 +00:00
. callback ( Poco : : Util : : OptionCallback < Me > ( this , & Me : : handleHelp ) ) ) ;
2020-02-19 15:01:08 +00:00
}
void ClusterCopierApp : : mainImpl ( )
{
2021-04-29 19:16:51 +00:00
/// Status command
{
if ( is_status_mode )
{
SharedContextHolder shared_context = Context : : createShared ( ) ;
auto context = Context : : createGlobal ( shared_context . get ( ) ) ;
context - > makeGlobalContext ( ) ;
SCOPE_EXIT_SAFE ( context - > shutdown ( ) ) ;
auto zookeeper = context - > getZooKeeper ( ) ;
auto status_json = zookeeper - > get ( task_path + " /status " ) ;
LOG_INFO ( & logger ( ) , " {} " , status_json ) ;
std : : cout < < status_json < < std : : endl ;
context - > resetZooKeeper ( ) ;
return ;
}
}
2020-07-04 13:54:24 +00:00
StatusFile status_file ( process_path + " /status " , StatusFile : : write_full_info ) ;
2020-02-19 15:01:08 +00:00
ThreadStatus thread_status ;
2020-05-18 08:08:55 +00:00
auto * log = & logger ( ) ;
2020-09-17 12:15:05 +00:00
LOG_INFO ( log , " Starting clickhouse-copier (id {}, host_id {}, path {}, revision {}) " , process_id , host_id , process_path , ClickHouseRevision : : getVersionRevision ( ) ) ;
2020-02-19 15:01:08 +00:00
2020-04-17 09:47:40 +00:00
SharedContextHolder shared_context = Context : : createShared ( ) ;
2021-04-10 23:33:54 +00:00
auto context = Context : : createGlobal ( shared_context . get ( ) ) ;
2020-02-19 15:01:08 +00:00
context - > makeGlobalContext ( ) ;
2021-04-04 09:23:40 +00:00
SCOPE_EXIT_SAFE ( context - > shutdown ( ) ) ;
2020-02-19 15:01:08 +00:00
context - > setConfig ( loaded_config . configuration ) ;
context - > setApplicationType ( Context : : ApplicationType : : LOCAL ) ;
2020-07-26 16:23:08 +00:00
context - > setPath ( process_path + " / " ) ;
2020-02-19 15:01:08 +00:00
registerFunctions ( ) ;
registerAggregateFunctions ( ) ;
registerTableFunctions ( ) ;
registerStorages ( ) ;
registerDictionaries ( ) ;
2022-11-19 08:09:24 +00:00
registerDisks ( /* global_skip_access_check= */ true ) ;
2020-10-29 03:39:43 +00:00
registerFormats ( ) ;
2020-02-19 15:01:08 +00:00
static const std : : string default_database = " _local " ;
2021-04-10 23:33:54 +00:00
DatabaseCatalog : : instance ( ) . attachDatabase ( default_database , std : : make_shared < DatabaseMemory > ( default_database , context ) ) ;
2020-02-19 15:01:08 +00:00
context - > setCurrentDatabase ( default_database ) ;
2021-11-15 07:32:30 +00:00
/// Disable queries logging, since:
/// - There are bits that is not allowed for global context, like adding factories info (for the query_log)
/// - And anyway it is useless for copier.
context - > setSetting ( " log_queries " , false ) ;
auto local_context = Context : : createCopy ( context ) ;
2020-02-19 15:01:08 +00:00
/// Initialize query scope just in case.
2021-11-15 07:32:30 +00:00
CurrentThread : : QueryScope query_scope ( local_context ) ;
2020-02-19 15:01:08 +00:00
2021-11-15 07:32:30 +00:00
auto copier = std : : make_unique < ClusterCopier > (
task_path , host_id , default_database , local_context , log ) ;
2020-02-19 15:01:08 +00:00
copier - > setSafeMode ( is_safe_mode ) ;
copier - > setCopyFaultProbability ( copy_fault_probability ) ;
2020-03-16 21:05:38 +00:00
copier - > setMoveFaultProbability ( move_fault_probability ) ;
2021-10-09 12:31:13 +00:00
copier - > setMaxTableTries ( max_table_tries ) ;
copier - > setMaxShardPartitionTries ( max_shard_partition_tries ) ;
copier - > setMaxShardPartitionPieceTriesForAlter ( max_shard_partition_piece_tries_for_alter ) ;
copier - > setRetryDelayMs ( retry_delay_ms ) ;
2020-04-21 17:37:40 +00:00
copier - > setExperimentalUseSampleOffset ( experimental_use_sample_offset ) ;
2020-02-19 15:01:08 +00:00
auto task_file = config ( ) . getString ( " task-file " , " " ) ;
if ( ! task_file . empty ( ) )
copier - > uploadTaskDescription ( task_path , task_file , config ( ) . getBool ( " task-upload-force " , false ) ) ;
copier - > init ( ) ;
copier - > process ( ConnectionTimeouts : : getTCPTimeoutsWithoutFailover ( context - > getSettingsRef ( ) ) ) ;
/// Reset ZooKeeper before removing ClusterCopier.
/// Otherwise zookeeper watch can call callback which use already removed ClusterCopier object.
context - > resetZooKeeper ( ) ;
}
int ClusterCopierApp : : main ( const std : : vector < std : : string > & )
{
if ( is_help )
return 0 ;
try
{
mainImpl ( ) ;
}
catch ( . . . )
{
tryLogCurrentException ( & Poco : : Logger : : root ( ) , __PRETTY_FUNCTION__ ) ;
auto code = getCurrentExceptionCode ( ) ;
return ( code ) ? code : - 1 ;
}
return 0 ;
}
}
# pragma GCC diagnostic ignored "-Wunused-function"
# pragma GCC diagnostic ignored "-Wmissing-declarations"
int mainEntryClickHouseClusterCopier ( int argc , char * * argv )
{
try
{
DB : : ClusterCopierApp app ;
return app . run ( argc , argv ) ;
}
catch ( . . . )
{
std : : cerr < < DB : : getCurrentExceptionMessage ( true ) < < " \n " ;
auto code = DB : : getCurrentExceptionCode ( ) ;
return ( code ) ? code : - 1 ;
}
}