2017-10-13 19:13:41 +00:00
# include "ClusterCopier.h"
2018-01-11 20:51:30 +00:00
# include <chrono>
2018-01-25 12:18:27 +00:00
2017-10-13 19:13:41 +00:00
# include <Poco/Util/XMLConfiguration.h>
# include <Poco/Logger.h>
# include <Poco/ConsoleChannel.h>
# include <Poco/FormattingChannel.h>
# include <Poco/PatternFormatter.h>
# include <Poco/UUIDGenerator.h>
# include <Poco/File.h>
2017-11-14 17:45:15 +00:00
# include <Poco/Process.h>
2018-01-11 20:51:30 +00:00
# include <Poco/FileChannel.h>
# include <Poco/SplitterChannel.h>
# include <Poco/Util/HelpFormatter.h>
2018-01-25 12:18:27 +00:00
# include <boost/algorithm/string.hpp>
# include <pcg_random.hpp>
2017-10-13 19:13:41 +00:00
2018-04-03 17:37:30 +00:00
# include <common/logger_useful.h>
# include <common/ThreadPool.h>
# include <daemon/OwnPatternFormatter.h>
2017-10-13 19:13:41 +00:00
# include <Common/Exception.h>
# include <Common/ZooKeeper/ZooKeeper.h>
2018-04-03 17:37:30 +00:00
# include <Common/ZooKeeper/KeeperException.h>
2017-10-13 19:13:41 +00:00
# include <Common/getFQDNOrHostName.h>
2018-03-14 19:07:57 +00:00
# include <Common/isLocalAddress.h>
2018-04-03 17:37:30 +00:00
# include <Common/typeid_cast.h>
# include <Common/ClickHouseRevision.h>
# include <Common/formatReadable.h>
2018-04-19 13:56:14 +00:00
# include <Common/DNSResolver.h>
2018-06-20 17:49:52 +00:00
# include <Common/CurrentThread.h>
2018-04-03 17:37:30 +00:00
# include <Common/escapeForFileName.h>
2018-06-05 19:46:49 +00:00
# include <Common/getNumberOfPhysicalCPUCores.h>
2017-10-13 19:13:41 +00:00
# include <Client/Connection.h>
# include <Interpreters/Context.h>
# include <Interpreters/Cluster.h>
2018-03-07 13:52:09 +00:00
# include <Interpreters/InterpreterFactory.h>
2018-02-15 18:54:12 +00:00
# include <Interpreters/InterpreterExistsQuery.h>
# include <Interpreters/InterpreterShowCreateQuery.h>
2017-11-14 17:45:15 +00:00
# include <Interpreters/InterpreterDropQuery.h>
# include <Interpreters/InterpreterCreateQuery.h>
2017-10-13 19:13:41 +00:00
# include <Columns/ColumnString.h>
# include <Columns/ColumnsNumber.h>
2018-03-14 19:07:57 +00:00
# include <DataTypes/DataTypeString.h>
2017-10-13 19:13:41 +00:00
# include <Parsers/ParserCreateQuery.h>
# include <Parsers/parseQuery.h>
2017-11-14 17:45:15 +00:00
# include <Parsers/ParserQuery.h>
# include <Parsers/ASTCreateQuery.h>
# include <Parsers/queryToString.h>
# include <Parsers/ASTDropQuery.h>
# include <Parsers/ASTLiteral.h>
2018-02-20 21:03:38 +00:00
# include <Parsers/ASTExpressionList.h>
2018-06-10 19:22:49 +00:00
# include <Formats/FormatSettings.h>
2017-10-13 19:13:41 +00:00
# include <DataStreams/RemoteBlockInputStream.h>
# include <DataStreams/SquashingBlockInputStream.h>
2018-03-14 19:07:57 +00:00
# include <DataStreams/AsynchronousBlockInputStream.h>
2017-10-13 19:13:41 +00:00
# include <DataStreams/copyData.h>
# include <DataStreams/NullBlockOutputStream.h>
2017-11-14 17:45:15 +00:00
# include <IO/Operators.h>
2017-10-13 19:13:41 +00:00
# include <IO/ReadBufferFromString.h>
# include <Functions/registerFunctions.h>
# include <TableFunctions/registerTableFunctions.h>
# include <AggregateFunctions/registerAggregateFunctions.h>
2018-01-11 20:51:30 +00:00
# include <Storages/registerStorages.h>
2018-03-14 19:07:57 +00:00
# include <Storages/StorageDistributed.h>
# include <Databases/DatabaseMemory.h>
2018-06-05 20:09:51 +00:00
# include <Common/StatusFile.h>
2018-03-29 20:21:01 +00:00
# include <daemon/OwnPatternFormatter.h>
2017-10-13 19:13:41 +00:00
namespace DB
{
namespace ErrorCodes
{
extern const int NO_ZOOKEEPER ;
extern const int BAD_ARGUMENTS ;
extern const int UNKNOWN_TABLE ;
2017-11-09 18:06:36 +00:00
extern const int UNFINISHED ;
2018-01-25 12:18:27 +00:00
extern const int UNKNOWN_ELEMENT_IN_CONFIG ;
2017-10-13 19:13:41 +00:00
}
using ConfigurationPtr = Poco : : AutoPtr < Poco : : Util : : AbstractConfiguration > ;
2018-01-25 12:18:27 +00:00
static ConfigurationPtr getConfigurationFromXMLString ( const std : : string & xml_data )
2017-10-13 19:13:41 +00:00
{
2018-01-25 12:18:27 +00:00
std : : stringstream ss ( xml_data ) ;
Poco : : XML : : InputSource input_source { ss } ;
return { new Poco : : Util : : XMLConfiguration { & input_source } } ;
2017-10-13 19:13:41 +00:00
}
namespace
{
2018-02-20 21:03:38 +00:00
2017-10-13 19:13:41 +00:00
using DatabaseAndTableName = std : : pair < String , String > ;
2018-02-20 21:03:38 +00:00
String getDatabaseDotTable ( const String & database , const String & table )
{
return backQuoteIfNeed ( database ) + " . " + backQuoteIfNeed ( table ) ;
}
String getDatabaseDotTable ( const DatabaseAndTableName & db_and_table )
{
return getDatabaseDotTable ( db_and_table . first , db_and_table . second ) ;
}
2017-10-13 19:13:41 +00:00
enum class TaskState
{
Started = 0 ,
Finished ,
Unknown
} ;
2017-11-14 17:45:15 +00:00
/// Used to mark status of shard partition tasks
2017-10-13 19:13:41 +00:00
struct TaskStateWithOwner
{
TaskStateWithOwner ( ) = default ;
TaskStateWithOwner ( TaskState state , const String & owner ) : state ( state ) , owner ( owner ) { }
TaskState state { TaskState : : Unknown } ;
String owner ;
2017-11-09 18:06:36 +00:00
static String getData ( TaskState state , const String & owner )
{
return TaskStateWithOwner ( state , owner ) . toString ( ) ;
}
2017-10-13 19:13:41 +00:00
String toString ( )
{
WriteBufferFromOwnString wb ;
2018-01-25 12:18:27 +00:00
wb < < static_cast < UInt32 > ( state ) < < " \n " < < escape < < owner ;
2017-10-13 19:13:41 +00:00
return wb . str ( ) ;
}
static TaskStateWithOwner fromString ( const String & data )
{
ReadBufferFromString rb ( data ) ;
TaskStateWithOwner res ;
UInt32 state ;
2018-01-25 12:18:27 +00:00
rb > > state > > " \n " > > escape > > res . owner ;
2017-10-13 19:13:41 +00:00
if ( state > = static_cast < int > ( TaskState : : Unknown ) )
throw Exception ( " Unknown state " + data , ErrorCodes : : LOGICAL_ERROR ) ;
res . state = static_cast < TaskState > ( state ) ;
return res ;
}
} ;
2017-11-14 13:13:24 +00:00
/// Hierarchical description of the tasks
2018-02-20 21:03:38 +00:00
struct ShardPartition ;
2017-11-09 18:06:36 +00:00
struct TaskShard ;
struct TaskTable ;
struct TaskCluster ;
2018-02-07 13:02:47 +00:00
struct ClusterPartition ;
2017-11-09 18:06:36 +00:00
2018-03-05 00:47:25 +00:00
using TasksPartition = std : : map < String , ShardPartition , std : : greater < > > ;
2017-11-09 18:06:36 +00:00
using ShardInfo = Cluster : : ShardInfo ;
2017-11-14 17:45:15 +00:00
using TaskShardPtr = std : : shared_ptr < TaskShard > ;
using TasksShard = std : : vector < TaskShardPtr > ;
2017-11-09 18:06:36 +00:00
using TasksTable = std : : list < TaskTable > ;
2018-03-05 00:47:25 +00:00
using ClusterPartitions = std : : map < String , ClusterPartition , std : : greater < > > ;
2018-02-07 13:02:47 +00:00
2018-02-08 11:07:58 +00:00
2018-02-20 21:03:38 +00:00
/// Just destination partition of a shard
struct ShardPartition
2017-10-13 19:13:41 +00:00
{
2018-02-20 21:03:38 +00:00
ShardPartition ( TaskShard & parent , const String & name_quoted_ ) : task_shard ( parent ) , name ( name_quoted_ ) { }
2017-10-13 19:13:41 +00:00
2017-11-14 13:13:24 +00:00
String getPartitionPath ( ) const ;
2017-11-09 18:06:36 +00:00
String getCommonPartitionIsDirtyPath ( ) const ;
2017-11-14 13:13:24 +00:00
String getPartitionActiveWorkersPath ( ) const ;
2017-11-09 18:06:36 +00:00
String getActiveWorkerPath ( ) const ;
2017-11-14 13:13:24 +00:00
String getPartitionShardsPath ( ) const ;
2017-11-09 18:06:36 +00:00
String getShardStatusPath ( ) const ;
2017-10-13 19:13:41 +00:00
TaskShard & task_shard ;
String name ;
} ;
2017-11-14 13:13:24 +00:00
struct ShardPriority
{
UInt8 is_remote = 1 ;
size_t hostname_difference = 0 ;
UInt8 random = 0 ;
2018-01-25 12:18:27 +00:00
static bool greaterPriority ( const ShardPriority & current , const ShardPriority & other )
2017-11-14 13:13:24 +00:00
{
2018-01-25 12:18:27 +00:00
return std : : forward_as_tuple ( current . is_remote , current . hostname_difference , current . random )
< std : : forward_as_tuple ( other . is_remote , other . hostname_difference , other . random ) ;
2017-11-14 13:13:24 +00:00
}
} ;
2017-10-13 19:13:41 +00:00
struct TaskShard
{
TaskShard ( TaskTable & parent , const ShardInfo & info_ ) : task_table ( parent ) , info ( info_ ) { }
TaskTable & task_table ;
ShardInfo info ;
2017-11-14 13:13:24 +00:00
UInt32 numberInCluster ( ) const { return info . shard_num ; }
UInt32 indexInCluster ( ) const { return info . shard_num - 1 ; }
2017-10-13 19:13:41 +00:00
2018-02-20 21:03:38 +00:00
String getDescription ( ) const ;
2018-03-01 18:33:27 +00:00
String getHostNameExample ( ) const ;
2017-11-14 13:13:24 +00:00
2018-03-05 00:47:25 +00:00
/// Used to sort clusters by their proximity
2017-11-14 13:13:24 +00:00
ShardPriority priority ;
2018-02-20 21:03:38 +00:00
/// Column with unique destination partitions (computed from engine_push_partition_key expr.) in the shard
ColumnWithTypeAndName partition_key_column ;
/// There is a task for each destination partition
TasksPartition partition_tasks ;
2018-03-05 00:47:25 +00:00
/// Which partitions have been checked for existence
/// If some partition from this lists is exists, it is in partition_tasks
std : : set < String > checked_partitions ;
2018-02-20 21:03:38 +00:00
/// Last CREATE TABLE query of the table of the shard
ASTPtr current_pull_table_create_query ;
/// Internal distributed tables
DatabaseAndTableName table_read_shard ;
DatabaseAndTableName table_split_shard ;
} ;
2018-03-05 00:47:25 +00:00
/// Contains info about all shards that contain a partition
2018-02-20 21:03:38 +00:00
struct ClusterPartition
{
2018-03-05 00:47:25 +00:00
double elapsed_time_seconds = 0 ;
2018-02-20 21:03:38 +00:00
UInt64 bytes_copied = 0 ;
UInt64 rows_copied = 0 ;
2018-03-11 18:36:09 +00:00
UInt64 blocks_copied = 0 ;
2018-02-20 21:03:38 +00:00
size_t total_tries = 0 ;
2017-10-13 19:13:41 +00:00
} ;
2018-02-20 21:03:38 +00:00
2017-10-13 19:13:41 +00:00
struct TaskTable
{
TaskTable ( TaskCluster & parent , const Poco : : Util : : AbstractConfiguration & config , const String & prefix ,
const String & table_key ) ;
TaskCluster & task_cluster ;
2017-11-14 13:13:24 +00:00
String getPartitionPath ( const String & partition_name ) const ;
String getPartitionIsDirtyPath ( const String & partition_name ) const ;
2017-10-13 19:13:41 +00:00
String name_in_config ;
2018-02-07 13:02:47 +00:00
/// Used as task ID
String table_id ;
2017-10-13 19:13:41 +00:00
/// Source cluster and table
String cluster_pull_name ;
DatabaseAndTableName table_pull ;
/// Destination cluster and table
String cluster_push_name ;
DatabaseAndTableName table_push ;
/// Storage of destination table
String engine_push_str ;
ASTPtr engine_push_ast ;
2018-02-20 21:03:38 +00:00
ASTPtr engine_push_partition_key_ast ;
2017-10-13 19:13:41 +00:00
2018-03-05 00:47:25 +00:00
/// A Distributed table definition used to split data
2017-10-13 19:13:41 +00:00
String sharding_key_str ;
ASTPtr sharding_key_ast ;
2017-11-09 18:06:36 +00:00
ASTPtr engine_split_ast ;
2017-10-13 19:13:41 +00:00
/// Additional WHERE expression to filter input data
String where_condition_str ;
ASTPtr where_condition_ast ;
/// Resolved clusters
ClusterPtr cluster_pull ;
ClusterPtr cluster_push ;
2018-01-11 12:23:59 +00:00
/// Filter partitions that should be copied
bool has_enabled_partitions = false ;
2018-02-20 21:03:38 +00:00
Strings enabled_partitions ;
NameSet enabled_partitions_set ;
2018-01-11 12:23:59 +00:00
2017-10-13 19:13:41 +00:00
/// Prioritized list of shards
2017-11-14 17:45:15 +00:00
TasksShard all_shards ;
TasksShard local_shards ;
2017-10-13 19:13:41 +00:00
2018-02-07 13:02:47 +00:00
ClusterPartitions cluster_partitions ;
2018-02-08 11:07:58 +00:00
NameSet finished_cluster_partitions ;
2018-03-02 12:28:00 +00:00
/// Parition names to process in user-specified order
Strings ordered_partition_names ;
2018-02-07 13:02:47 +00:00
ClusterPartition & getClusterPartition ( const String & partition_name )
{
auto it = cluster_partitions . find ( partition_name ) ;
if ( it = = cluster_partitions . end ( ) )
throw Exception ( " There are no cluster partition " + partition_name + " in " + table_id , ErrorCodes : : LOGICAL_ERROR ) ;
return it - > second ;
}
Stopwatch watch ;
UInt64 bytes_copied = 0 ;
UInt64 rows_copied = 0 ;
2017-11-09 18:06:36 +00:00
2018-01-25 12:18:27 +00:00
template < typename RandomEngine >
void initShards ( RandomEngine & & random_engine ) ;
2017-10-13 19:13:41 +00:00
} ;
2018-02-20 21:03:38 +00:00
2017-10-13 19:13:41 +00:00
struct TaskCluster
{
2018-02-13 18:42:59 +00:00
TaskCluster ( const String & task_zookeeper_path_ , const String & default_local_database_ )
: task_zookeeper_path ( task_zookeeper_path_ ) , default_local_database ( default_local_database_ ) { }
void loadTasks ( const Poco : : Util : : AbstractConfiguration & config , const String & base_key = " " ) ;
2018-02-20 21:03:38 +00:00
/// Set (or update) settings and max_workers param
2018-02-13 18:42:59 +00:00
void reloadSettings ( const Poco : : Util : : AbstractConfiguration & config , const String & base_key = " " ) ;
2017-10-13 19:13:41 +00:00
/// Base node for all tasks. Its structure:
/// workers/ - directory with active workers (amount of them is less or equal max_workers)
/// description - node with task configuration
/// table_table1/ - directories with per-partition copying status
String task_zookeeper_path ;
2018-02-13 18:42:59 +00:00
/// Database used to create temporary Distributed tables
String default_local_database ;
2017-10-13 19:13:41 +00:00
/// Limits number of simultaneous workers
size_t max_workers = 0 ;
2018-02-07 13:02:47 +00:00
/// Base settings for pull and push
Settings settings_common ;
2017-10-13 19:13:41 +00:00
/// Settings used to fetch data
Settings settings_pull ;
/// Settings used to insert data
Settings settings_push ;
2018-02-13 18:42:59 +00:00
String clusters_prefix ;
2017-10-13 19:13:41 +00:00
/// Subtasks
TasksTable table_tasks ;
2018-01-25 12:18:27 +00:00
std : : random_device random_device ;
pcg64 random_engine ;
2017-10-13 19:13:41 +00:00
} ;
2018-03-25 00:15:52 +00:00
struct MultiTransactionInfo
{
int32_t code ;
2018-08-25 01:58:14 +00:00
Coordination : : Requests requests ;
Coordination : : Responses responses ;
2018-03-25 00:15:52 +00:00
} ;
2017-11-09 18:06:36 +00:00
/// Atomically checks that is_dirty node is not exists, and made the remaining op
/// Returns relative number of failed operation in the second field (the passed op has 0 index)
2018-03-25 00:15:52 +00:00
static MultiTransactionInfo checkNoNodeAndCommit (
2017-11-09 18:06:36 +00:00
const zkutil : : ZooKeeperPtr & zookeeper ,
const String & checking_node_path ,
2018-08-25 01:58:14 +00:00
Coordination : : RequestPtr & & op )
2017-11-09 18:06:36 +00:00
{
2018-03-25 00:15:52 +00:00
MultiTransactionInfo info ;
info . requests . emplace_back ( zkutil : : makeCreateRequest ( checking_node_path , " " , zkutil : : CreateMode : : Persistent ) ) ;
info . requests . emplace_back ( zkutil : : makeRemoveRequest ( checking_node_path , - 1 ) ) ;
info . requests . emplace_back ( std : : move ( op ) ) ;
info . code = zookeeper - > tryMulti ( info . requests , info . responses ) ;
2018-03-13 20:36:22 +00:00
return info ;
2017-11-09 18:06:36 +00:00
}
// Creates AST representing 'ENGINE = Distributed(cluster, db, table, [sharding_key])
2017-10-13 19:13:41 +00:00
std : : shared_ptr < ASTStorage > createASTStorageDistributed (
const String & cluster_name , const String & database , const String & table , const ASTPtr & sharding_key_ast = nullptr )
{
auto args = std : : make_shared < ASTExpressionList > ( ) ;
2018-02-26 03:37:08 +00:00
args - > children . emplace_back ( std : : make_shared < ASTLiteral > ( cluster_name ) ) ;
args - > children . emplace_back ( std : : make_shared < ASTIdentifier > ( database ) ) ;
args - > children . emplace_back ( std : : make_shared < ASTIdentifier > ( table ) ) ;
2017-10-13 19:13:41 +00:00
if ( sharding_key_ast )
args - > children . emplace_back ( sharding_key_ast ) ;
auto engine = std : : make_shared < ASTFunction > ( ) ;
engine - > name = " Distributed " ;
engine - > arguments = args ;
auto storage = std : : make_shared < ASTStorage > ( ) ;
storage - > set ( storage - > engine , engine ) ;
return storage ;
}
BlockInputStreamPtr squashStreamIntoOneBlock ( const BlockInputStreamPtr & stream )
{
return std : : make_shared < SquashingBlockInputStream > (
stream ,
std : : numeric_limits < size_t > : : max ( ) ,
std : : numeric_limits < size_t > : : max ( )
) ;
}
Block getBlockWithAllStreamData ( const BlockInputStreamPtr & stream )
{
return squashStreamIntoOneBlock ( stream ) - > read ( ) ;
}
2018-02-20 21:03:38 +00:00
/// Path getters
2017-10-13 19:13:41 +00:00
2017-11-14 13:13:24 +00:00
String TaskTable : : getPartitionPath ( const String & partition_name ) const
2017-10-13 19:13:41 +00:00
{
2018-02-20 21:03:38 +00:00
return task_cluster . task_zookeeper_path // root
+ " /tables/ " + table_id // tables/dst_cluster.merge.hits
+ " / " + escapeForFileName ( partition_name ) ; // 201701
2017-11-14 13:13:24 +00:00
}
2018-02-20 21:03:38 +00:00
String ShardPartition : : getPartitionPath ( ) const
2017-11-14 13:13:24 +00:00
{
return task_shard . task_table . getPartitionPath ( name ) ;
2017-10-13 19:13:41 +00:00
}
2018-02-20 21:03:38 +00:00
String ShardPartition : : getShardStatusPath ( ) const
2017-11-09 18:06:36 +00:00
{
2017-11-14 13:13:24 +00:00
// /root/table_test.hits/201701/1
return getPartitionPath ( ) + " /shards/ " + toString ( task_shard . numberInCluster ( ) ) ;
2017-11-09 18:06:36 +00:00
}
2018-02-20 21:03:38 +00:00
String ShardPartition : : getPartitionShardsPath ( ) const
2017-11-09 18:06:36 +00:00
{
2017-11-14 13:13:24 +00:00
return getPartitionPath ( ) + " /shards " ;
2017-11-09 18:06:36 +00:00
}
2018-02-20 21:03:38 +00:00
String ShardPartition : : getPartitionActiveWorkersPath ( ) const
2017-10-13 19:13:41 +00:00
{
2018-01-25 12:18:27 +00:00
return getPartitionPath ( ) + " /partition_active_workers " ;
2017-10-13 19:13:41 +00:00
}
2018-02-20 21:03:38 +00:00
String ShardPartition : : getActiveWorkerPath ( ) const
2017-11-09 18:06:36 +00:00
{
2018-01-09 19:12:43 +00:00
return getPartitionActiveWorkersPath ( ) + " / " + toString ( task_shard . numberInCluster ( ) ) ;
2017-11-09 18:06:36 +00:00
}
2018-02-20 21:03:38 +00:00
String ShardPartition : : getCommonPartitionIsDirtyPath ( ) const
2017-11-09 18:06:36 +00:00
{
2017-11-14 13:13:24 +00:00
return getPartitionPath ( ) + " /is_dirty " ;
}
String TaskTable : : getPartitionIsDirtyPath ( const String & partition_name ) const
{
return getPartitionPath ( partition_name ) + " /is_dirty " ;
2017-11-09 18:06:36 +00:00
}
2017-10-13 19:13:41 +00:00
2018-02-20 21:03:38 +00:00
String DB : : TaskShard : : getDescription ( ) const
{
2018-03-01 18:33:27 +00:00
std : : stringstream ss ;
ss < < " N " < < numberInCluster ( )
2018-03-02 12:28:00 +00:00
< < " (having a replica " < < getHostNameExample ( )
< < " , pull table " + getDatabaseDotTable ( task_table . table_pull )
2018-03-01 18:33:27 +00:00
< < " of cluster " + task_table . cluster_pull_name < < " ) " ;
return ss . str ( ) ;
2018-02-20 21:03:38 +00:00
}
2018-03-01 18:33:27 +00:00
String DB : : TaskShard : : getHostNameExample ( ) const
{
auto & replicas = task_table . cluster_pull - > getShardsAddresses ( ) . at ( indexInCluster ( ) ) ;
return replicas . at ( 0 ) . readableString ( ) ;
}
2018-02-20 21:03:38 +00:00
static bool isExtedndedDefinitionStorage ( const ASTPtr & storage_ast )
{
const ASTStorage & storage = typeid_cast < const ASTStorage & > ( * storage_ast ) ;
return storage . partition_by | | storage . order_by | | storage . sample_by ;
}
static ASTPtr extractPartitionKey ( const ASTPtr & storage_ast )
{
String storage_str = queryToString ( storage_ast ) ;
const ASTStorage & storage = typeid_cast < const ASTStorage & > ( * storage_ast ) ;
const ASTFunction & engine = typeid_cast < const ASTFunction & > ( * storage . engine ) ;
if ( ! endsWith ( engine . name , " MergeTree " ) )
{
throw Exception ( " Unsupported engine was specified in " + storage_str + " , only *MergeTree engines are supported " ,
ErrorCodes : : BAD_ARGUMENTS ) ;
}
ASTPtr arguments_ast = engine . arguments - > clone ( ) ;
ASTs & arguments = typeid_cast < ASTExpressionList & > ( * arguments_ast ) . children ;
if ( isExtedndedDefinitionStorage ( storage_ast ) )
{
if ( storage . partition_by )
return storage . partition_by - > clone ( ) ;
static const char * all = " all " ;
2018-02-26 03:37:08 +00:00
return std : : make_shared < ASTLiteral > ( Field ( all , strlen ( all ) ) ) ;
2018-02-20 21:03:38 +00:00
}
else
{
bool is_replicated = startsWith ( engine . name , " Replicated " ) ;
size_t min_args = is_replicated ? 3 : 1 ;
if ( arguments . size ( ) < min_args )
throw Exception ( " Expected at least " + toString ( min_args ) + " arguments in " + storage_str , ErrorCodes : : BAD_ARGUMENTS ) ;
ASTPtr & month_arg = is_replicated ? arguments [ 2 ] : arguments [ 1 ] ;
return makeASTFunction ( " toYYYYMM " , month_arg - > clone ( ) ) ;
}
}
2017-11-14 13:13:24 +00:00
2017-10-13 19:13:41 +00:00
TaskTable : : TaskTable ( TaskCluster & parent , const Poco : : Util : : AbstractConfiguration & config , const String & prefix_ ,
const String & table_key )
: task_cluster ( parent )
{
String table_prefix = prefix_ + " . " + table_key + " . " ;
name_in_config = table_key ;
cluster_pull_name = config . getString ( table_prefix + " cluster_pull " ) ;
cluster_push_name = config . getString ( table_prefix + " cluster_push " ) ;
table_pull . first = config . getString ( table_prefix + " database_pull " ) ;
table_pull . second = config . getString ( table_prefix + " table_pull " ) ;
table_push . first = config . getString ( table_prefix + " database_push " ) ;
table_push . second = config . getString ( table_prefix + " table_push " ) ;
2018-02-07 13:02:47 +00:00
/// Used as node name in ZooKeeper
table_id = escapeForFileName ( cluster_push_name )
+ " . " + escapeForFileName ( table_push . first )
+ " . " + escapeForFileName ( table_push . second ) ;
2017-10-13 19:13:41 +00:00
engine_push_str = config . getString ( table_prefix + " engine " ) ;
{
ParserStorage parser_storage ;
2018-04-16 15:11:13 +00:00
engine_push_ast = parseQuery ( parser_storage , engine_push_str , 0 ) ;
2018-02-20 21:03:38 +00:00
engine_push_partition_key_ast = extractPartitionKey ( engine_push_ast ) ;
2017-10-13 19:13:41 +00:00
}
sharding_key_str = config . getString ( table_prefix + " sharding_key " ) ;
{
ParserExpressionWithOptionalAlias parser_expression ( false ) ;
2018-04-16 15:11:13 +00:00
sharding_key_ast = parseQuery ( parser_expression , sharding_key_str , 0 ) ;
2017-11-09 18:06:36 +00:00
engine_split_ast = createASTStorageDistributed ( cluster_push_name , table_push . first , table_push . second , sharding_key_ast ) ;
2017-10-13 19:13:41 +00:00
}
where_condition_str = config . getString ( table_prefix + " where_condition " , " " ) ;
if ( ! where_condition_str . empty ( ) )
{
ParserExpressionWithOptionalAlias parser_expression ( false ) ;
2018-04-16 15:11:13 +00:00
where_condition_ast = parseQuery ( parser_expression , where_condition_str , 0 ) ;
2017-10-13 19:13:41 +00:00
// Will use canonical expression form
where_condition_str = queryToString ( where_condition_ast ) ;
}
2018-01-11 12:23:59 +00:00
2018-01-25 12:18:27 +00:00
String enabled_partitions_prefix = table_prefix + " enabled_partitions " ;
has_enabled_partitions = config . has ( enabled_partitions_prefix ) ;
2018-01-11 12:23:59 +00:00
if ( has_enabled_partitions )
{
2018-01-25 12:18:27 +00:00
Strings keys ;
config . keys ( enabled_partitions_prefix , keys ) ;
if ( keys . empty ( ) )
{
/// Parse list of partition from space-separated string
String partitions_str = config . getString ( table_prefix + " enabled_partitions " ) ;
boost : : trim_if ( partitions_str , isWhitespaceASCII ) ;
2018-02-20 21:03:38 +00:00
boost : : split ( enabled_partitions , partitions_str , isWhitespaceASCII , boost : : token_compress_on ) ;
2018-01-25 12:18:27 +00:00
}
else
{
/// Parse sequence of <partition>...</partition>
for ( const String & key : keys )
{
if ( ! startsWith ( key , " partition " ) )
throw Exception ( " Unknown key " + key + " in " + enabled_partitions_prefix , ErrorCodes : : UNKNOWN_ELEMENT_IN_CONFIG ) ;
2018-02-20 21:03:38 +00:00
enabled_partitions . emplace_back ( config . getString ( enabled_partitions_prefix + " . " + key ) ) ;
2018-01-25 12:18:27 +00:00
}
}
2018-02-20 21:03:38 +00:00
std : : copy ( enabled_partitions . begin ( ) , enabled_partitions . end ( ) , std : : inserter ( enabled_partitions_set , enabled_partitions_set . begin ( ) ) ) ;
2018-01-11 12:23:59 +00:00
}
2017-10-13 19:13:41 +00:00
}
2017-11-14 17:45:15 +00:00
static ShardPriority getReplicasPriority ( const Cluster : : Addresses & replicas , const std : : string & local_hostname , UInt8 random )
2017-11-14 13:13:24 +00:00
{
ShardPriority res ;
if ( replicas . empty ( ) )
return res ;
res . is_remote = 1 ;
for ( auto & replica : replicas )
{
2018-04-19 13:56:14 +00:00
if ( isLocalAddress ( DNSResolver : : instance ( ) . resolveHost ( replica . host_name ) ) )
2017-11-14 13:13:24 +00:00
{
res . is_remote = 0 ;
break ;
}
}
res . hostname_difference = std : : numeric_limits < size_t > : : max ( ) ;
for ( auto & replica : replicas )
{
size_t difference = getHostNameDifference ( local_hostname , replica . host_name ) ;
res . hostname_difference = std : : min ( difference , res . hostname_difference ) ;
}
res . random = random ;
return res ;
}
2018-01-25 12:18:27 +00:00
template < typename RandomEngine >
void TaskTable : : initShards ( RandomEngine & & random_engine )
2017-10-13 19:13:41 +00:00
{
2017-11-14 13:13:24 +00:00
const String & fqdn_name = getFQDNOrHostName ( ) ;
2018-01-25 12:18:27 +00:00
std : : uniform_int_distribution < UInt8 > get_urand ( 0 , std : : numeric_limits < UInt8 > : : max ( ) ) ;
2017-10-13 19:13:41 +00:00
2017-11-14 13:13:24 +00:00
// Compute the priority
for ( auto & shard_info : cluster_pull - > getShardsInfo ( ) )
2017-10-13 19:13:41 +00:00
{
2017-11-14 17:45:15 +00:00
TaskShardPtr task_shard = std : : make_shared < TaskShard > ( * this , shard_info ) ;
const auto & replicas = cluster_pull - > getShardsAddresses ( ) . at ( task_shard - > indexInCluster ( ) ) ;
2018-01-25 12:18:27 +00:00
task_shard - > priority = getReplicasPriority ( replicas , fqdn_name , get_urand ( random_engine ) ) ;
2017-10-13 19:13:41 +00:00
2017-11-14 17:45:15 +00:00
all_shards . emplace_back ( task_shard ) ;
2017-11-14 13:13:24 +00:00
}
2017-10-13 19:13:41 +00:00
2017-11-14 13:13:24 +00:00
// Sort by priority
2017-11-14 17:45:15 +00:00
std : : sort ( all_shards . begin ( ) , all_shards . end ( ) ,
[ ] ( const TaskShardPtr & lhs , const TaskShardPtr & rhs )
{
2018-01-25 12:18:27 +00:00
return ShardPriority : : greaterPriority ( lhs - > priority , rhs - > priority ) ;
2017-11-14 17:45:15 +00:00
} ) ;
2017-10-13 19:13:41 +00:00
2017-11-14 17:45:15 +00:00
// Cut local shards
auto it_first_remote = std : : lower_bound ( all_shards . begin ( ) , all_shards . end ( ) , 1 ,
[ ] ( const TaskShardPtr & lhs , UInt8 is_remote )
{
return lhs - > priority . is_remote < is_remote ;
} ) ;
2017-10-13 19:13:41 +00:00
2017-11-14 17:45:15 +00:00
local_shards . assign ( all_shards . begin ( ) , it_first_remote ) ;
2017-10-13 19:13:41 +00:00
}
2018-02-13 18:42:59 +00:00
void DB : : TaskCluster : : loadTasks ( const Poco : : Util : : AbstractConfiguration & config , const String & base_key )
2017-10-13 19:13:41 +00:00
{
String prefix = base_key . empty ( ) ? " " : base_key + " . " ;
2018-02-13 18:42:59 +00:00
clusters_prefix = prefix + " remote_servers " ;
if ( ! config . has ( clusters_prefix ) )
throw Exception ( " You should specify list of clusters in " + clusters_prefix , ErrorCodes : : BAD_ARGUMENTS ) ;
Poco : : Util : : AbstractConfiguration : : Keys tables_keys ;
config . keys ( prefix + " tables " , tables_keys ) ;
for ( const auto & table_key : tables_keys )
{
table_tasks . emplace_back ( * this , config , prefix + " tables " , table_key ) ;
}
}
2017-10-13 19:13:41 +00:00
2018-02-13 18:42:59 +00:00
void DB : : TaskCluster : : reloadSettings ( const Poco : : Util : : AbstractConfiguration & config , const String & base_key )
{
String prefix = base_key . empty ( ) ? " " : base_key + " . " ;
2017-10-13 19:13:41 +00:00
max_workers = config . getUInt64 ( prefix + " max_workers " ) ;
2018-02-13 18:42:59 +00:00
settings_common = Settings ( ) ;
2017-10-13 19:13:41 +00:00
if ( config . has ( prefix + " settings " ) )
2018-02-07 13:02:47 +00:00
settings_common . loadSettingsFromConfig ( prefix + " settings " , config ) ;
2017-10-13 19:13:41 +00:00
2018-02-13 18:42:59 +00:00
settings_pull = settings_common ;
2017-10-13 19:13:41 +00:00
if ( config . has ( prefix + " settings_pull " ) )
settings_pull . loadSettingsFromConfig ( prefix + " settings_pull " , config ) ;
2018-02-13 18:42:59 +00:00
settings_push = settings_common ;
2017-10-13 19:13:41 +00:00
if ( config . has ( prefix + " settings_push " ) )
settings_push . loadSettingsFromConfig ( prefix + " settings_push " , config ) ;
2018-03-30 16:25:26 +00:00
auto set_default_value = [ ] ( auto & & setting , auto & & default_value )
{
setting = setting . changed ? setting . value : default_value ;
} ;
2018-02-13 18:42:59 +00:00
/// Override important settings
2018-03-11 00:15:26 +00:00
settings_pull . readonly = 1 ;
2018-02-13 18:42:59 +00:00
settings_push . insert_distributed_sync = 1 ;
2018-03-30 16:25:26 +00:00
set_default_value ( settings_pull . load_balancing , LoadBalancing : : NEAREST_HOSTNAME ) ;
set_default_value ( settings_pull . max_threads , 1 ) ;
set_default_value ( settings_pull . max_block_size , 8192UL ) ;
set_default_value ( settings_pull . preferred_block_size_bytes , 0 ) ;
set_default_value ( settings_push . insert_distributed_timeout , 0 ) ;
2017-10-13 19:13:41 +00:00
}
2018-02-13 18:42:59 +00:00
2017-10-13 19:13:41 +00:00
} // end of an anonymous namespace
class ClusterCopier
{
public :
2018-04-03 19:43:33 +00:00
ClusterCopier ( const String & task_path_ ,
2017-10-13 19:13:41 +00:00
const String & host_id_ ,
const String & proxy_database_name_ ,
Context & context_ )
:
task_zookeeper_path ( task_path_ ) ,
host_id ( host_id_ ) ,
working_database_name ( proxy_database_name_ ) ,
context ( context_ ) ,
log ( & Poco : : Logger : : get ( " ClusterCopier " ) )
{
}
void init ( )
{
2018-04-03 19:43:33 +00:00
auto zookeeper = context . getZooKeeper ( ) ;
2017-10-13 19:13:41 +00:00
2018-08-25 01:58:14 +00:00
task_description_watch_callback = [ this ] ( const Coordination : : WatchResponse & )
2018-02-13 18:42:59 +00:00
{
UInt64 version = + + task_descprtion_version ;
LOG_DEBUG ( log , " Task description should be updated, local version " < < version ) ;
} ;
2017-10-13 19:13:41 +00:00
2018-02-13 18:42:59 +00:00
task_description_path = task_zookeeper_path + " /description " ;
task_cluster = std : : make_unique < TaskCluster > ( task_zookeeper_path , working_database_name ) ;
2017-10-13 19:13:41 +00:00
2018-02-13 18:42:59 +00:00
reloadTaskDescription ( ) ;
task_cluster_initial_config = task_cluster_current_config ;
2017-11-09 18:06:36 +00:00
2018-02-13 18:42:59 +00:00
task_cluster - > loadTasks ( * task_cluster_initial_config ) ;
context . setClustersConfig ( task_cluster_initial_config , task_cluster - > clusters_prefix ) ;
2017-10-13 19:13:41 +00:00
/// Set up shards and their priority
2018-01-25 12:18:27 +00:00
task_cluster - > random_engine . seed ( task_cluster - > random_device ( ) ) ;
2017-10-13 19:13:41 +00:00
for ( auto & task_table : task_cluster - > table_tasks )
{
task_table . cluster_pull = context . getCluster ( task_table . cluster_pull_name ) ;
task_table . cluster_push = context . getCluster ( task_table . cluster_push_name ) ;
2018-01-25 12:18:27 +00:00
task_table . initShards ( task_cluster - > random_engine ) ;
2017-10-13 19:13:41 +00:00
}
2018-03-01 18:33:27 +00:00
LOG_DEBUG ( log , " Will process " < < task_cluster - > table_tasks . size ( ) < < " table tasks " ) ;
2017-10-13 19:13:41 +00:00
2018-03-01 18:33:27 +00:00
/// Do not initialize tables, will make deferred initialization in process()
2018-02-20 21:03:38 +00:00
2018-04-03 19:43:33 +00:00
zookeeper - > createAncestors ( getWorkersPathVersion ( ) + " / " ) ;
zookeeper - > createAncestors ( getWorkersPath ( ) + " / " ) ;
2018-03-01 18:33:27 +00:00
}
2018-03-02 12:28:00 +00:00
template < typename T >
2018-03-05 00:47:25 +00:00
decltype ( auto ) retry ( T & & func , size_t max_tries = 100 )
2018-03-01 18:33:27 +00:00
{
2018-03-02 12:28:00 +00:00
std : : exception_ptr exception ;
for ( size_t try_number = 1 ; try_number < = max_tries ; + + try_number )
{
try
{
return func ( ) ;
}
catch ( . . . )
2017-10-13 19:13:41 +00:00
{
2018-03-02 12:28:00 +00:00
exception = std : : current_exception ( ) ;
if ( try_number < max_tries )
2017-10-13 19:13:41 +00:00
{
2018-03-02 12:28:00 +00:00
tryLogCurrentException ( log , " Will retry " ) ;
std : : this_thread : : sleep_for ( default_sleep_time ) ;
2017-10-13 19:13:41 +00:00
}
2018-03-02 12:28:00 +00:00
}
}
2017-10-13 19:13:41 +00:00
2018-03-02 12:28:00 +00:00
std : : rethrow_exception ( exception ) ;
2018-06-03 20:39:06 +00:00
}
2017-10-13 19:13:41 +00:00
2018-03-02 12:28:00 +00:00
void discoverShardPartitions ( const TaskShardPtr & task_shard )
{
TaskTable & task_table = task_shard - > task_table ;
2017-10-13 19:13:41 +00:00
2018-03-05 00:47:25 +00:00
LOG_INFO ( log , " Discover partitions of shard " < < task_shard - > getDescription ( ) ) ;
2018-02-20 21:03:38 +00:00
2018-03-02 12:28:00 +00:00
auto get_partitions = [ & ] ( ) { return getShardPartitions ( * task_shard ) ; } ;
auto existing_partitions_names = retry ( get_partitions , 60 ) ;
Strings filtered_partitions_names ;
Strings missing_partitions ;
2018-02-20 21:03:38 +00:00
2018-03-01 18:33:27 +00:00
/// Check that user specified correct partition names
auto check_partition_format = [ ] ( const DataTypePtr & type , const String & partition_text_quoted )
2017-10-13 19:13:41 +00:00
{
2018-03-01 18:33:27 +00:00
MutableColumnPtr column_dummy = type - > createColumn ( ) ;
ReadBufferFromString rb ( partition_text_quoted ) ;
2018-02-20 21:03:38 +00:00
2018-03-01 18:33:27 +00:00
try
{
2018-06-08 02:24:27 +00:00
type - > deserializeTextQuoted ( * column_dummy , rb , FormatSettings ( ) ) ;
2018-03-01 18:33:27 +00:00
}
catch ( Exception & e )
{
throw Exception ( " Partition " + partition_text_quoted + " has incorrect format. " + e . displayText ( ) , ErrorCodes : : BAD_ARGUMENTS ) ;
}
} ;
2018-01-11 12:23:59 +00:00
2018-03-02 12:28:00 +00:00
if ( task_table . has_enabled_partitions )
2018-03-01 18:33:27 +00:00
{
2018-03-02 12:28:00 +00:00
/// Process partition in order specified by <enabled_partitions/>
for ( const String & partition_name : task_table . enabled_partitions )
2017-10-13 19:13:41 +00:00
{
2018-03-02 12:28:00 +00:00
/// Check that user specified correct partition names
check_partition_format ( task_shard - > partition_key_column . type , partition_name ) ;
2017-10-13 19:13:41 +00:00
2018-03-02 12:28:00 +00:00
auto it = existing_partitions_names . find ( partition_name ) ;
2017-10-13 19:13:41 +00:00
2018-03-02 12:28:00 +00:00
/// Do not process partition if it is not in enabled_partitions list
if ( it = = existing_partitions_names . end ( ) )
2018-02-20 21:03:38 +00:00
{
2018-03-02 12:28:00 +00:00
missing_partitions . emplace_back ( partition_name ) ;
continue ;
2018-02-20 21:03:38 +00:00
}
2018-03-02 12:28:00 +00:00
filtered_partitions_names . emplace_back ( * it ) ;
2018-03-01 18:33:27 +00:00
}
2018-02-07 13:02:47 +00:00
2018-03-02 12:28:00 +00:00
for ( const String & partition_name : existing_partitions_names )
2018-03-01 18:33:27 +00:00
{
2018-03-02 12:28:00 +00:00
if ( ! task_table . enabled_partitions_set . count ( partition_name ) )
{
LOG_DEBUG ( log , " Partition " < < partition_name < < " will not be processed, since it is not in "
< < " enabled_partitions of " < < task_table . table_id ) ;
2017-11-09 18:06:36 +00:00
}
2017-10-13 19:13:41 +00:00
}
}
2018-03-02 12:28:00 +00:00
else
{
for ( const String & partition_name : existing_partitions_names )
filtered_partitions_names . emplace_back ( partition_name ) ;
}
2017-10-13 19:13:41 +00:00
2018-03-02 12:28:00 +00:00
for ( const String & partition_name : filtered_partitions_names )
2018-03-05 00:47:25 +00:00
{
2018-03-02 12:28:00 +00:00
task_shard - > partition_tasks . emplace ( partition_name , ShardPartition ( * task_shard , partition_name ) ) ;
2018-03-05 00:47:25 +00:00
task_shard - > checked_partitions . emplace ( partition_name , true ) ;
}
2017-10-13 19:13:41 +00:00
2018-03-02 12:28:00 +00:00
if ( ! missing_partitions . empty ( ) )
{
std : : stringstream ss ;
for ( const String & missing_partition : missing_partitions )
ss < < " " < < missing_partition ;
2018-03-01 18:33:27 +00:00
2018-03-02 12:28:00 +00:00
LOG_WARNING ( log , " There are no " < < missing_partitions . size ( ) < < " partitions from enabled_partitions in shard "
< < task_shard - > getDescription ( ) < < " : " < < ss . str ( ) ) ;
}
LOG_DEBUG ( log , " Will copy " < < task_shard - > partition_tasks . size ( ) < < " partitions from shard " < < task_shard - > getDescription ( ) ) ;
}
2018-03-01 18:33:27 +00:00
2018-03-02 12:28:00 +00:00
/// Compute set of partitions, assume set of partitions aren't changed during the processing
void discoverTablePartitions ( TaskTable & task_table , size_t num_threads = 0 )
{
/// Fetch partitions list from a shard
2018-03-01 18:33:27 +00:00
{
2018-03-02 12:28:00 +00:00
ThreadPool thread_pool ( num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores ( ) ) ;
2018-03-01 18:33:27 +00:00
for ( const TaskShardPtr & task_shard : task_table . all_shards )
2018-03-02 12:28:00 +00:00
thread_pool . schedule ( [ this , task_shard ] ( ) { discoverShardPartitions ( task_shard ) ; } ) ;
2018-03-01 18:33:27 +00:00
LOG_DEBUG ( log , " Waiting for " < < thread_pool . active ( ) < < " setup jobs " ) ;
thread_pool . wait ( ) ;
2017-10-13 19:13:41 +00:00
}
}
2018-02-13 18:42:59 +00:00
void reloadTaskDescription ( )
{
2018-04-03 19:43:33 +00:00
auto zookeeper = context . getZooKeeper ( ) ;
2018-03-06 14:36:40 +00:00
task_description_watch_zookeeper = zookeeper ;
2018-02-13 18:42:59 +00:00
String task_config_str ;
2018-08-25 01:58:14 +00:00
Coordination : : Stat stat ;
2018-02-13 18:42:59 +00:00
int code ;
2018-03-06 14:36:40 +00:00
zookeeper - > tryGetWatch ( task_description_path , task_config_str , & stat , task_description_watch_callback , & code ) ;
2018-03-24 00:45:04 +00:00
if ( code )
2018-02-13 18:42:59 +00:00
throw Exception ( " Can't get description node " + task_description_path , ErrorCodes : : BAD_ARGUMENTS ) ;
LOG_DEBUG ( log , " Loading description, zxid= " < < task_descprtion_current_stat . czxid ) ;
auto config = getConfigurationFromXMLString ( task_config_str ) ;
/// Setup settings
task_cluster - > reloadSettings ( * config ) ;
context . getSettingsRef ( ) = task_cluster - > settings_common ;
task_cluster_current_config = config ;
task_descprtion_current_stat = stat ;
}
void updateConfigIfNeeded ( )
{
UInt64 version_to_update = task_descprtion_version ;
2018-03-06 14:36:40 +00:00
bool is_outdated_version = task_descprtion_current_version ! = version_to_update ;
bool is_expired_session = ! task_description_watch_zookeeper | | task_description_watch_zookeeper - > expired ( ) ;
if ( ! is_outdated_version & & ! is_expired_session )
2018-02-13 18:42:59 +00:00
return ;
2018-02-14 23:01:34 +00:00
LOG_DEBUG ( log , " Updating task description " ) ;
reloadTaskDescription ( ) ;
2018-02-13 18:42:59 +00:00
task_descprtion_current_version = version_to_update ;
}
2018-02-08 11:07:58 +00:00
void process ( )
2017-10-13 19:13:41 +00:00
{
2018-02-08 11:07:58 +00:00
for ( TaskTable & task_table : task_cluster - > table_tasks )
2017-10-13 19:13:41 +00:00
{
2018-03-02 12:28:00 +00:00
LOG_INFO ( log , " Process table task " < < task_table . table_id < < " with "
< < task_table . all_shards . size ( ) < < " shards, " < < task_table . local_shards . size ( ) < < " of them are local ones " ) ;
2017-11-09 18:06:36 +00:00
2018-02-08 11:07:58 +00:00
if ( task_table . all_shards . empty ( ) )
continue ;
2018-03-02 12:28:00 +00:00
/// Discover partitions of each shard and total set of partitions
if ( ! task_table . has_enabled_partitions )
2017-11-14 13:13:24 +00:00
{
2018-03-02 12:28:00 +00:00
/// If there are no specified enabled_partitions, we must discover them manually
discoverTablePartitions ( task_table ) ;
2017-11-14 13:13:24 +00:00
2018-03-02 12:28:00 +00:00
/// After partitions of each shard are initialized, initialize cluster partitions
for ( const TaskShardPtr & task_shard : task_table . all_shards )
2017-11-14 13:13:24 +00:00
{
2018-03-05 00:47:25 +00:00
for ( const auto & partition_elem : task_shard - > partition_tasks )
2018-03-02 12:28:00 +00:00
{
2018-03-05 00:47:25 +00:00
const String & partition_name = partition_elem . first ;
task_table . cluster_partitions . emplace ( partition_name , ClusterPartition { } ) ;
2018-03-02 12:28:00 +00:00
}
2018-02-08 11:07:58 +00:00
}
2017-11-14 13:13:24 +00:00
2018-03-02 12:28:00 +00:00
for ( auto & partition_elem : task_table . cluster_partitions )
2018-02-08 11:07:58 +00:00
{
2018-03-02 12:28:00 +00:00
const String & partition_name = partition_elem . first ;
2018-02-07 13:02:47 +00:00
2018-03-05 00:47:25 +00:00
for ( const TaskShardPtr & task_shard : task_table . all_shards )
task_shard - > checked_partitions . emplace ( partition_name ) ;
2018-02-07 13:02:47 +00:00
2018-03-02 12:28:00 +00:00
task_table . ordered_partition_names . emplace_back ( partition_name ) ;
2018-02-07 13:02:47 +00:00
}
2018-03-02 12:28:00 +00:00
}
else
{
/// If enabled_partitions are specified, assume that each shard has all partitions
/// We will refine partition set of each shard in future
2018-02-08 11:07:58 +00:00
2018-03-02 12:28:00 +00:00
for ( const String & partition_name : task_table . enabled_partitions )
2018-02-07 13:02:47 +00:00
{
2018-03-02 12:28:00 +00:00
task_table . cluster_partitions . emplace ( partition_name , ClusterPartition { } ) ;
task_table . ordered_partition_names . emplace_back ( partition_name ) ;
2018-02-07 13:02:47 +00:00
}
2017-11-14 13:13:24 +00:00
}
2018-02-08 11:07:58 +00:00
task_table . watch . restart ( ) ;
/// Retry table processing
2018-03-06 14:36:40 +00:00
bool table_is_done = false ;
for ( size_t num_table_tries = 0 ; num_table_tries < max_table_tries ; + + num_table_tries )
2018-02-08 11:07:58 +00:00
{
2018-03-06 14:36:40 +00:00
if ( tryProcessTable ( task_table ) )
{
table_is_done = true ;
break ;
}
2018-02-08 11:07:58 +00:00
}
if ( ! table_is_done )
{
throw Exception ( " Too many tries to process table " + task_table . table_id + " . Abort remaining execution " ,
ErrorCodes : : UNFINISHED ) ;
}
}
2017-11-14 13:13:24 +00:00
}
2018-01-09 19:12:43 +00:00
/// Disables DROP PARTITION commands that used to clear data after errors
void setSafeMode ( bool is_safe_mode_ = true )
{
is_safe_mode = is_safe_mode_ ;
}
void setCopyFaultProbability ( double copy_fault_probability_ )
{
copy_fault_probability = copy_fault_probability_ ;
}
2018-03-05 00:47:25 +00:00
protected :
String getWorkersPath ( ) const
{
return task_cluster - > task_zookeeper_path + " /task_active_workers " ;
}
2018-03-14 20:19:25 +00:00
String getWorkersPathVersion ( ) const
{
return getWorkersPath ( ) + " _version " ;
}
2018-03-05 00:47:25 +00:00
String getCurrentWorkerNodePath ( ) const
{
return getWorkersPath ( ) + " / " + host_id ;
}
zkutil : : EphemeralNodeHolder : : Ptr createTaskWorkerNodeAndWaitIfNeed ( const zkutil : : ZooKeeperPtr & zookeeper ,
2018-03-14 20:19:25 +00:00
const String & description , bool unprioritized )
2018-03-05 00:47:25 +00:00
{
2018-03-14 20:19:25 +00:00
std : : chrono : : milliseconds current_sleep_time = default_sleep_time ;
static constexpr std : : chrono : : milliseconds max_sleep_time ( 30000 ) ; // 30 sec
if ( unprioritized )
std : : this_thread : : sleep_for ( current_sleep_time ) ;
String workers_version_path = getWorkersPathVersion ( ) ;
String workers_path = getWorkersPath ( ) ;
String current_worker_path = getCurrentWorkerNodePath ( ) ;
2018-04-09 17:25:37 +00:00
size_t num_bad_version_errors = 0 ;
2018-03-05 00:47:25 +00:00
while ( true )
{
2018-04-09 17:25:37 +00:00
updateConfigIfNeeded ( ) ;
2018-08-25 01:58:14 +00:00
Coordination : : Stat stat ;
2018-03-14 20:19:25 +00:00
zookeeper - > get ( workers_version_path , & stat ) ;
auto version = stat . version ;
zookeeper - > get ( workers_path , & stat ) ;
2018-03-05 00:47:25 +00:00
if ( static_cast < size_t > ( stat . numChildren ) > = task_cluster - > max_workers )
{
LOG_DEBUG ( log , " Too many workers ( " < < stat . numChildren < < " , maximum " < < task_cluster - > max_workers < < " ) "
< < " . Postpone processing " < < description ) ;
2018-04-09 17:25:37 +00:00
if ( unprioritized )
current_sleep_time = std : : min ( max_sleep_time , current_sleep_time + default_sleep_time ) ;
std : : this_thread : : sleep_for ( current_sleep_time ) ;
num_bad_version_errors = 0 ;
2018-03-05 00:47:25 +00:00
}
else
{
2018-08-25 01:58:14 +00:00
Coordination : : Requests ops ;
2018-03-24 00:45:04 +00:00
ops . emplace_back ( zkutil : : makeSetRequest ( workers_version_path , description , version ) ) ;
ops . emplace_back ( zkutil : : makeCreateRequest ( current_worker_path , description , zkutil : : CreateMode : : Ephemeral ) ) ;
2018-08-25 01:58:14 +00:00
Coordination : : Responses responses ;
2018-03-25 00:15:52 +00:00
auto code = zookeeper - > tryMulti ( ops , responses ) ;
2018-03-14 20:19:25 +00:00
2018-08-25 01:58:14 +00:00
if ( code = = Coordination : : ZOK | | code = = Coordination : : ZNODEEXISTS )
2018-03-14 20:19:25 +00:00
return std : : make_shared < zkutil : : EphemeralNodeHolder > ( current_worker_path , * zookeeper , false , false , description ) ;
2018-08-25 01:58:14 +00:00
if ( code = = Coordination : : ZBADVERSION )
2018-03-14 20:19:25 +00:00
{
2018-04-09 17:25:37 +00:00
+ + num_bad_version_errors ;
/// Try to make fast retries
if ( num_bad_version_errors > 3 )
{
LOG_DEBUG ( log , " A concurrent worker has just been added, will check free worker slots again " ) ;
2018-05-08 12:56:32 +00:00
std : : chrono : : milliseconds random_sleep_time ( std : : uniform_int_distribution < int > ( 1 , 1000 ) ( task_cluster - > random_engine ) ) ;
std : : this_thread : : sleep_for ( random_sleep_time ) ;
num_bad_version_errors = 0 ;
2018-04-09 17:25:37 +00:00
}
2018-03-14 20:19:25 +00:00
}
else
2018-08-25 01:58:14 +00:00
throw Coordination : : Exception ( code ) ;
2018-03-05 00:47:25 +00:00
}
}
}
2017-11-14 13:13:24 +00:00
/** Checks that the whole partition of a table was copied. We should do it carefully due to dirty lock.
* State of some task could be changed during the processing .
* We have to ensure that all shards have the finished state and there are no dirty flag .
* Moreover , we have to check status twice and check zxid , because state could be changed during the checking .
*/
2017-11-14 17:45:15 +00:00
bool checkPartitionIsDone ( const TaskTable & task_table , const String & partition_name , const TasksShard & shards_with_partition )
2017-11-14 13:13:24 +00:00
{
LOG_DEBUG ( log , " Check that all shards processed partition " < < partition_name < < " successfully " ) ;
2018-04-03 19:43:33 +00:00
auto zookeeper = context . getZooKeeper ( ) ;
2017-11-14 13:13:24 +00:00
Strings status_paths ;
for ( auto & shard : shards_with_partition )
{
2018-02-20 21:03:38 +00:00
ShardPartition & task_shard_partition = shard - > partition_tasks . find ( partition_name ) - > second ;
2017-11-14 13:13:24 +00:00
status_paths . emplace_back ( task_shard_partition . getShardStatusPath ( ) ) ;
}
std : : vector < int64_t > zxid1 , zxid2 ;
try
{
2018-05-08 12:56:32 +00:00
std : : vector < zkutil : : ZooKeeper : : FutureGet > get_futures ;
2017-11-14 13:13:24 +00:00
for ( const String & path : status_paths )
2018-03-30 16:25:26 +00:00
get_futures . emplace_back ( zookeeper - > asyncGet ( path ) ) ;
// Check that state is Finished and remember zxid
for ( auto & future : get_futures )
2017-10-13 19:13:41 +00:00
{
2018-03-30 16:25:26 +00:00
auto res = future . get ( ) ;
2018-05-08 12:56:32 +00:00
TaskStateWithOwner status = TaskStateWithOwner : : fromString ( res . data ) ;
2017-11-14 13:13:24 +00:00
if ( status . state ! = TaskState : : Finished )
2017-10-13 19:13:41 +00:00
{
2018-05-08 12:56:32 +00:00
LOG_INFO ( log , " The task " < < res . data < < " is being rewritten by " < < status . owner < < " . Partition will be rechecked " ) ;
2017-11-14 13:13:24 +00:00
return false ;
2017-10-13 19:13:41 +00:00
}
2018-03-30 16:25:26 +00:00
zxid1 . push_back ( res . stat . pzxid ) ;
2017-11-14 13:13:24 +00:00
}
// Check that partition is not dirty
if ( zookeeper - > exists ( task_table . getPartitionIsDirtyPath ( partition_name ) ) )
{
LOG_INFO ( log , " Partition " < < partition_name < < " become dirty " ) ;
return false ;
}
2018-03-30 16:25:26 +00:00
get_futures . clear ( ) ;
for ( const String & path : status_paths )
get_futures . emplace_back ( zookeeper - > asyncGet ( path ) ) ;
2017-11-14 13:13:24 +00:00
// Remember zxid of states again
2018-03-30 16:25:26 +00:00
for ( auto & future : get_futures )
2017-11-14 13:13:24 +00:00
{
2018-03-30 16:25:26 +00:00
auto res = future . get ( ) ;
zxid2 . push_back ( res . stat . pzxid ) ;
2017-11-14 13:13:24 +00:00
}
}
2018-08-25 01:58:14 +00:00
catch ( const Coordination : : Exception & e )
2017-11-14 13:13:24 +00:00
{
LOG_INFO ( log , " A ZooKeeper error occurred while checking partition " < < partition_name
2018-03-26 18:39:28 +00:00
< < " . Will recheck the partition. Error: " < < e . displayText ( ) ) ;
2017-11-14 13:13:24 +00:00
return false ;
}
// If all task is finished and zxid is not changed then partition could not become dirty again
for ( size_t shard_num = 0 ; shard_num < status_paths . size ( ) ; + + shard_num )
{
if ( zxid1 [ shard_num ] ! = zxid2 [ shard_num ] )
{
LOG_INFO ( log , " The task " < < status_paths [ shard_num ] < < " is being modified now. Partition will be rechecked " ) ;
return false ;
2017-10-13 19:13:41 +00:00
}
}
2017-11-14 13:13:24 +00:00
LOG_INFO ( log , " Partition " < < partition_name < < " is copied successfully " ) ;
return true ;
2017-10-13 19:13:41 +00:00
}
2018-02-20 21:03:38 +00:00
/// Removes MATERIALIZED and ALIAS columns from create table query
2018-02-07 13:02:47 +00:00
static ASTPtr removeAliasColumnsFromCreateQuery ( const ASTPtr & query_ast )
{
const ASTs & column_asts = typeid_cast < ASTCreateQuery & > ( * query_ast ) . columns - > children ;
auto new_columns = std : : make_shared < ASTExpressionList > ( ) ;
for ( const ASTPtr & column_ast : column_asts )
{
const ASTColumnDeclaration & column = typeid_cast < const ASTColumnDeclaration & > ( * column_ast ) ;
if ( ! column . default_specifier . empty ( ) )
{
2018-03-12 13:47:01 +00:00
ColumnDefaultKind kind = columnDefaultKindFromString ( column . default_specifier ) ;
if ( kind = = ColumnDefaultKind : : Materialized | | kind = = ColumnDefaultKind : : Alias )
2018-02-07 13:02:47 +00:00
continue ;
}
new_columns - > children . emplace_back ( column_ast - > clone ( ) ) ;
}
ASTPtr new_query_ast = query_ast - > clone ( ) ;
ASTCreateQuery & new_query = typeid_cast < ASTCreateQuery & > ( * new_query_ast ) ;
new_query . columns = new_columns . get ( ) ;
new_query . children . at ( 0 ) = std : : move ( new_columns ) ;
return new_query_ast ;
}
2018-02-20 21:03:38 +00:00
/// Replaces ENGINE and table name in a create query
2018-02-07 13:02:47 +00:00
std : : shared_ptr < ASTCreateQuery > rewriteCreateQueryStorage ( const ASTPtr & create_query_ast , const DatabaseAndTableName & new_table , const ASTPtr & new_storage_ast )
2017-10-13 19:13:41 +00:00
{
2018-02-07 13:02:47 +00:00
ASTCreateQuery & create = typeid_cast < ASTCreateQuery & > ( * create_query_ast ) ;
2017-10-13 19:13:41 +00:00
auto res = std : : make_shared < ASTCreateQuery > ( create ) ;
if ( create . storage = = nullptr | | new_storage_ast = = nullptr )
throw Exception ( " Storage is not specified " , ErrorCodes : : LOGICAL_ERROR ) ;
res - > database = new_table . first ;
res - > table = new_table . second ;
res - > children . clear ( ) ;
res - > set ( res - > columns , create . columns - > clone ( ) ) ;
res - > set ( res - > storage , new_storage_ast - > clone ( ) ) ;
return res ;
}
2018-02-20 21:03:38 +00:00
bool tryDropPartition ( ShardPartition & task_partition , const zkutil : : ZooKeeperPtr & zookeeper )
2017-11-09 18:06:36 +00:00
{
2018-01-09 19:12:43 +00:00
if ( is_safe_mode )
throw Exception ( " DROP PARTITION is prohibited in safe mode " , ErrorCodes : : NOT_IMPLEMENTED ) ;
2017-11-09 18:06:36 +00:00
TaskTable & task_table = task_partition . task_shard . task_table ;
2017-11-14 13:13:24 +00:00
String current_shards_path = task_partition . getPartitionShardsPath ( ) ;
2018-01-09 19:12:43 +00:00
String current_partition_active_workers_dir = task_partition . getPartitionActiveWorkersPath ( ) ;
2017-11-09 18:06:36 +00:00
String is_dirty_flag_path = task_partition . getCommonPartitionIsDirtyPath ( ) ;
String dirt_cleaner_path = is_dirty_flag_path + " /cleaner " ;
zkutil : : EphemeralNodeHolder : : Ptr cleaner_holder ;
try
{
cleaner_holder = zkutil : : EphemeralNodeHolder : : create ( dirt_cleaner_path , * zookeeper , host_id ) ;
}
2018-08-25 01:58:14 +00:00
catch ( const Coordination : : Exception & e )
2017-11-09 18:06:36 +00:00
{
2018-08-25 01:58:14 +00:00
if ( e . code = = Coordination : : ZNODEEXISTS )
2017-11-09 18:06:36 +00:00
{
LOG_DEBUG ( log , " Partition " < < task_partition . name < < " is cleaning now by somebody, sleep " ) ;
std : : this_thread : : sleep_for ( default_sleep_time ) ;
return false ;
}
throw ;
}
2018-08-25 01:58:14 +00:00
Coordination : : Stat stat ;
2018-01-09 19:12:43 +00:00
if ( zookeeper - > exists ( current_partition_active_workers_dir , & stat ) )
2017-11-09 18:06:36 +00:00
{
if ( stat . numChildren ! = 0 )
{
LOG_DEBUG ( log , " Partition " < < task_partition . name < < " contains " < < stat . numChildren < < " active workers, sleep " ) ;
std : : this_thread : : sleep_for ( default_sleep_time ) ;
return false ;
}
}
/// Remove all status nodes
zookeeper - > tryRemoveRecursive ( current_shards_path ) ;
2017-11-14 16:59:45 +00:00
String query = " ALTER TABLE " + getDatabaseDotTable ( task_table . table_push ) ;
query + = " DROP PARTITION " + task_partition . name + " " ;
/// TODO: use this statement after servers will be updated up to 1.1.54310
// query += " DROP PARTITION ID '" + task_partition.name + "'";
2017-11-09 18:06:36 +00:00
ClusterPtr & cluster_push = task_table . cluster_push ;
2018-01-09 19:12:43 +00:00
Settings settings_push = task_cluster - > settings_push ;
/// It is important, DROP PARTITION must be done synchronously
settings_push . replication_alter_partitions_sync = 2 ;
2017-11-09 18:06:36 +00:00
LOG_DEBUG ( log , " Execute distributed DROP PARTITION: " < < query ) ;
/// Limit number of max executing replicas to 1
2018-02-07 13:02:47 +00:00
size_t num_shards = executeQueryOnCluster ( cluster_push , query , nullptr , & settings_push , PoolMode : : GET_ONE , 1 ) ;
2017-11-09 18:06:36 +00:00
if ( num_shards < cluster_push - > getShardCount ( ) )
{
LOG_INFO ( log , " DROP PARTITION wasn't successfully executed on " < < cluster_push - > getShardCount ( ) - num_shards < < " shards " ) ;
return false ;
}
/// Remove the locking node
2018-08-25 01:58:14 +00:00
Coordination : : Requests requests ;
2018-04-01 17:37:09 +00:00
requests . emplace_back ( zkutil : : makeRemoveRequest ( dirt_cleaner_path , - 1 ) ) ;
requests . emplace_back ( zkutil : : makeRemoveRequest ( is_dirty_flag_path , - 1 ) ) ;
zookeeper - > multi ( requests ) ;
2017-11-14 16:59:45 +00:00
LOG_INFO ( log , " Partition " < < task_partition . name < < " was dropped on cluster " < < task_table . cluster_push_name ) ;
2017-11-09 18:06:36 +00:00
return true ;
}
2018-02-13 18:42:59 +00:00
2018-03-05 00:47:25 +00:00
static constexpr size_t max_table_tries = 1000 ;
static constexpr size_t max_shard_partition_tries = 600 ;
bool tryProcessTable ( TaskTable & task_table )
2017-10-13 19:13:41 +00:00
{
2018-04-09 17:25:37 +00:00
/// An heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint
bool previous_shard_is_instantly_finished = false ;
2018-03-05 00:47:25 +00:00
/// Process each partition that is present in cluster
for ( const String & partition_name : task_table . ordered_partition_names )
{
if ( ! task_table . cluster_partitions . count ( partition_name ) )
throw Exception ( " There are no expected partition " + partition_name + " . It is a bug " , ErrorCodes : : LOGICAL_ERROR ) ;
ClusterPartition & cluster_partition = task_table . cluster_partitions [ partition_name ] ;
Stopwatch watch ;
TasksShard expected_shards ;
size_t num_failed_shards = 0 ;
+ + cluster_partition . total_tries ;
LOG_DEBUG ( log , " Processing partition " < < partition_name < < " for the whole cluster " ) ;
/// Process each source shard having current partition and copy current partition
/// NOTE: shards are sorted by "distance" to current host
for ( const TaskShardPtr & shard : task_table . all_shards )
{
/// Does shard have a node with current partition?
if ( shard - > partition_tasks . count ( partition_name ) = = 0 )
{
/// If not, did we check existence of that partition previously?
if ( shard - > checked_partitions . count ( partition_name ) = = 0 )
{
auto check_shard_has_partition = [ & ] ( ) { return checkShardHasPartition ( * shard , partition_name ) ; } ;
bool has_partition = retry ( check_shard_has_partition ) ;
shard - > checked_partitions . emplace ( partition_name ) ;
if ( has_partition )
{
shard - > partition_tasks . emplace ( partition_name , ShardPartition ( * shard , partition_name ) ) ;
LOG_DEBUG ( log , " Discovered partition " < < partition_name < < " in shard " < < shard - > getDescription ( ) ) ;
}
else
{
LOG_DEBUG ( log , " Found that shard " < < shard - > getDescription ( ) < < " does not contain current partition " < < partition_name ) ;
continue ;
}
}
else
{
/// We have already checked that partition, but did not discover it
2018-04-09 17:25:37 +00:00
previous_shard_is_instantly_finished = true ;
2018-03-05 00:47:25 +00:00
continue ;
}
}
auto it_shard_partition = shard - > partition_tasks . find ( partition_name ) ;
if ( it_shard_partition = = shard - > partition_tasks . end ( ) )
throw Exception ( " There are no such partition in a shard. This is a bug. " , ErrorCodes : : LOGICAL_ERROR ) ;
auto & partition = it_shard_partition - > second ;
expected_shards . emplace_back ( shard ) ;
2018-03-26 18:39:28 +00:00
/// Do not sleep if there is a sequence of already processed shards to increase startup
2018-05-08 12:56:32 +00:00
bool is_unprioritized_task = ! previous_shard_is_instantly_finished & & shard - > priority . is_remote ;
2018-03-05 00:47:25 +00:00
PartitionTaskStatus task_status = PartitionTaskStatus : : Error ;
2018-03-26 18:39:28 +00:00
bool was_error = false ;
2018-03-05 00:47:25 +00:00
for ( size_t try_num = 0 ; try_num < max_shard_partition_tries ; + + try_num )
{
2018-05-08 12:56:32 +00:00
task_status = tryProcessPartitionTask ( partition , is_unprioritized_task ) ;
2018-03-05 00:47:25 +00:00
/// Exit if success
if ( task_status = = PartitionTaskStatus : : Finished )
break ;
2018-03-26 18:39:28 +00:00
was_error = true ;
2018-03-05 00:47:25 +00:00
/// Skip if the task is being processed by someone
if ( task_status = = PartitionTaskStatus : : Active )
break ;
/// Repeat on errors
std : : this_thread : : sleep_for ( default_sleep_time ) ;
}
if ( task_status = = PartitionTaskStatus : : Error )
+ + num_failed_shards ;
2018-03-26 18:39:28 +00:00
previous_shard_is_instantly_finished = ! was_error ;
2018-03-05 00:47:25 +00:00
}
cluster_partition . elapsed_time_seconds + = watch . elapsedSeconds ( ) ;
/// Check that whole cluster partition is done
/// Firstly check number failed partition tasks, than look into ZooKeeper and ensure that each partition is done
bool partition_is_done = num_failed_shards = = 0 ;
try
{
partition_is_done = partition_is_done & & checkPartitionIsDone ( task_table , partition_name , expected_shards ) ;
}
catch ( . . . )
{
tryLogCurrentException ( log ) ;
partition_is_done = false ;
}
if ( partition_is_done )
{
task_table . finished_cluster_partitions . emplace ( partition_name ) ;
task_table . bytes_copied + = cluster_partition . bytes_copied ;
task_table . rows_copied + = cluster_partition . rows_copied ;
double elapsed = cluster_partition . elapsed_time_seconds ;
LOG_INFO ( log , " It took " < < std : : fixed < < std : : setprecision ( 2 ) < < elapsed < < " seconds to copy partition " < < partition_name
2018-03-11 18:36:09 +00:00
< < " : " < < formatReadableSizeWithDecimalSuffix ( cluster_partition . bytes_copied ) < < " uncompressed bytes "
< < " , " < < formatReadableQuantity ( cluster_partition . rows_copied ) < < " rows "
< < " and " < < cluster_partition . blocks_copied < < " source blocks are copied " ) ;
2018-03-05 00:47:25 +00:00
if ( cluster_partition . rows_copied )
{
LOG_INFO ( log , " Average partition speed: "
< < formatReadableSizeWithDecimalSuffix ( cluster_partition . bytes_copied / elapsed ) < < " per second. " ) ;
}
if ( task_table . rows_copied )
{
LOG_INFO ( log , " Average table " < < task_table . table_id < < " speed: "
2018-03-11 18:36:09 +00:00
< < formatReadableSizeWithDecimalSuffix ( task_table . bytes_copied / elapsed ) < < " per second. " ) ;
2018-03-05 00:47:25 +00:00
}
}
}
size_t required_partitions = task_table . cluster_partitions . size ( ) ;
size_t finished_partitions = task_table . finished_cluster_partitions . size ( ) ;
bool table_is_done = finished_partitions > = required_partitions ;
if ( ! table_is_done )
{
LOG_INFO ( log , " Table " + task_table . table_id + " is not processed yet. "
< < " Copied " < < finished_partitions < < " of " < < required_partitions < < " , will retry " ) ;
}
return table_is_done ;
}
/// Execution status of a task
enum class PartitionTaskStatus
{
Active ,
Finished ,
Error ,
} ;
2018-05-08 12:56:32 +00:00
PartitionTaskStatus tryProcessPartitionTask ( ShardPartition & task_partition , bool is_unprioritized_task )
2017-10-13 19:13:41 +00:00
{
2018-03-05 00:47:25 +00:00
PartitionTaskStatus res ;
2018-02-13 18:42:59 +00:00
2017-11-09 18:06:36 +00:00
try
{
2018-05-08 12:56:32 +00:00
res = processPartitionTaskImpl ( task_partition , is_unprioritized_task ) ;
2017-11-09 18:06:36 +00:00
}
catch ( . . . )
{
tryLogCurrentException ( log , " An error occurred while processing partition " + task_partition . name ) ;
2018-03-05 00:47:25 +00:00
res = PartitionTaskStatus : : Error ;
2017-11-09 18:06:36 +00:00
}
2018-02-13 18:42:59 +00:00
/// At the end of each task check if the config is updated
try
{
updateConfigIfNeeded ( ) ;
}
catch ( . . . )
{
tryLogCurrentException ( log , " An error occurred while updating the config " ) ;
}
return res ;
2017-11-09 18:06:36 +00:00
}
2018-05-08 12:56:32 +00:00
PartitionTaskStatus processPartitionTaskImpl ( ShardPartition & task_partition , bool is_unprioritized_task )
2017-11-09 18:06:36 +00:00
{
TaskShard & task_shard = task_partition . task_shard ;
TaskTable & task_table = task_shard . task_table ;
2018-02-07 13:02:47 +00:00
ClusterPartition & cluster_partition = task_table . getClusterPartition ( task_partition . name ) ;
2017-11-09 18:06:36 +00:00
2018-04-03 19:43:33 +00:00
auto zookeeper = context . getZooKeeper ( ) ;
2017-10-13 19:13:41 +00:00
2017-11-09 18:06:36 +00:00
String is_dirty_flag_path = task_partition . getCommonPartitionIsDirtyPath ( ) ;
String current_task_is_active_path = task_partition . getActiveWorkerPath ( ) ;
String current_task_status_path = task_partition . getShardStatusPath ( ) ;
2017-10-13 19:13:41 +00:00
2017-11-14 16:59:45 +00:00
/// Auxiliary functions:
/// Creates is_dirty node to initialize DROP PARTITION
auto create_is_dirty_node = [ & ] ( )
{
auto code = zookeeper - > tryCreate ( is_dirty_flag_path , current_task_status_path , zkutil : : CreateMode : : Persistent ) ;
2018-08-25 01:58:14 +00:00
if ( code & & code ! = Coordination : : ZNODEEXISTS )
throw Coordination : : Exception ( code , is_dirty_flag_path ) ;
2017-11-14 16:59:45 +00:00
} ;
/// Returns SELECT query filtering current partition and applying user filter
2018-01-09 19:12:43 +00:00
auto get_select_query = [ & ] ( const DatabaseAndTableName & from_table , const String & fields , String limit = " " )
2017-11-14 16:59:45 +00:00
{
String query ;
query + = " SELECT " + fields + " FROM " + getDatabaseDotTable ( from_table ) ;
2018-03-11 18:36:09 +00:00
/// TODO: Bad, it is better to rewrite with ASTLiteral(partition_key_field)
2018-09-21 10:46:58 +00:00
query + = " WHERE ( " + queryToString ( task_table . engine_push_partition_key_ast ) + " = ( " + task_partition . name + " AS partition_key)) " ;
2017-11-14 16:59:45 +00:00
if ( ! task_table . where_condition_str . empty ( ) )
query + = " AND ( " + task_table . where_condition_str + " ) " ;
2018-01-09 19:12:43 +00:00
if ( ! limit . empty ( ) )
query + = " LIMIT " + limit ;
2017-11-14 16:59:45 +00:00
ParserQuery p_query ( query . data ( ) + query . size ( ) ) ;
2018-04-16 15:11:13 +00:00
return parseQuery ( p_query , query , 0 ) ;
2017-11-14 16:59:45 +00:00
} ;
2017-10-13 19:13:41 +00:00
/// Load balancing
2018-05-08 12:56:32 +00:00
auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed ( zookeeper , current_task_status_path , is_unprioritized_task ) ;
2017-11-09 18:06:36 +00:00
LOG_DEBUG ( log , " Processing " < < current_task_status_path ) ;
/// Do not start if partition is dirty, try to clean it
if ( zookeeper - > exists ( is_dirty_flag_path ) )
{
LOG_DEBUG ( log , " Partition " < < task_partition . name < < " is dirty, try to drop it " ) ;
try
{
tryDropPartition ( task_partition , zookeeper ) ;
}
catch ( . . . )
{
2018-05-08 12:56:32 +00:00
tryLogCurrentException ( log , " An error occurred when clean partition " ) ;
2017-11-09 18:06:36 +00:00
}
2018-03-05 00:47:25 +00:00
return PartitionTaskStatus : : Error ;
2017-11-09 18:06:36 +00:00
}
2017-10-13 19:13:41 +00:00
2018-01-09 19:12:43 +00:00
/// Create ephemeral node to mark that we are active and process the partition
2017-11-09 18:06:36 +00:00
zookeeper - > createAncestors ( current_task_is_active_path ) ;
2017-10-13 19:13:41 +00:00
zkutil : : EphemeralNodeHolderPtr partition_task_node_holder ;
try
{
2017-11-09 18:06:36 +00:00
partition_task_node_holder = zkutil : : EphemeralNodeHolder : : create ( current_task_is_active_path , * zookeeper , host_id ) ;
2017-10-13 19:13:41 +00:00
}
2018-08-25 01:58:14 +00:00
catch ( const Coordination : : Exception & e )
2017-10-13 19:13:41 +00:00
{
2018-08-25 01:58:14 +00:00
if ( e . code = = Coordination : : ZNODEEXISTS )
2017-10-13 19:13:41 +00:00
{
2017-11-09 18:06:36 +00:00
LOG_DEBUG ( log , " Someone is already processing " < < current_task_is_active_path ) ;
2018-03-05 00:47:25 +00:00
return PartitionTaskStatus : : Active ;
2017-10-13 19:13:41 +00:00
}
throw ;
}
2017-11-09 18:06:36 +00:00
/// Exit if task has been already processed, create blocking node if it is abandoned
{
String status_data ;
if ( zookeeper - > tryGet ( current_task_status_path , status_data ) )
{
TaskStateWithOwner status = TaskStateWithOwner : : fromString ( status_data ) ;
if ( status . state = = TaskState : : Finished )
{
LOG_DEBUG ( log , " Task " < < current_task_status_path < < " has been successfully executed by " < < status . owner ) ;
2018-03-05 00:47:25 +00:00
return PartitionTaskStatus : : Finished ;
2017-11-09 18:06:36 +00:00
}
2017-10-13 19:13:41 +00:00
2017-11-09 18:06:36 +00:00
// Task is abandoned, initialize DROP PARTITION
LOG_DEBUG ( log , " Task " < < current_task_status_path < < " has not been successfully finished by " < < status . owner ) ;
2017-10-13 19:13:41 +00:00
2017-11-14 17:45:15 +00:00
create_is_dirty_node ( ) ;
2018-03-05 00:47:25 +00:00
return PartitionTaskStatus : : Error ;
2017-11-09 18:06:36 +00:00
}
}
2017-10-13 19:13:41 +00:00
2017-11-09 18:06:36 +00:00
zookeeper - > createAncestors ( current_task_status_path ) ;
2017-10-13 19:13:41 +00:00
2018-02-20 21:03:38 +00:00
/// We need to update table definitions for each partition, it could be changed after ALTER
createShardInternalTables ( task_shard ) ;
2017-10-13 19:13:41 +00:00
2017-11-14 16:59:45 +00:00
/// Check that destination partition is empty if we are first worker
/// NOTE: this check is incorrect if pull and push tables have different partition key!
{
2018-02-20 21:03:38 +00:00
ASTPtr query_select_ast = get_select_query ( task_shard . table_split_shard , " count() " ) ;
2017-11-14 16:59:45 +00:00
UInt64 count ;
{
Context local_context = context ;
// Use pull (i.e. readonly) settings, but fetch data from destination servers
2018-01-11 20:51:30 +00:00
local_context . getSettingsRef ( ) = task_cluster - > settings_pull ;
local_context . getSettingsRef ( ) . skip_unavailable_shards = true ;
2017-11-14 16:59:45 +00:00
2018-03-07 13:52:09 +00:00
Block block = getBlockWithAllStreamData ( InterpreterFactory : : get ( query_select_ast , local_context ) - > execute ( ) . in ) ;
2017-11-14 16:59:45 +00:00
count = ( block ) ? block . safeGetByPosition ( 0 ) . column - > getUInt ( 0 ) : 0 ;
}
if ( count ! = 0 )
{
2018-08-25 01:58:14 +00:00
Coordination : : Stat stat_shards ;
2017-11-14 16:59:45 +00:00
zookeeper - > get ( task_partition . getPartitionShardsPath ( ) , & stat_shards ) ;
if ( stat_shards . numChildren = = 0 )
{
LOG_WARNING ( log , " There are no any workers for partition " < < task_partition . name
< < " , but destination table contains " < < count < < " rows "
< < " . Partition will be dropped and refilled. " ) ;
create_is_dirty_node ( ) ;
2018-03-05 00:47:25 +00:00
return PartitionTaskStatus : : Error ;
2017-11-14 16:59:45 +00:00
}
}
}
2017-11-09 18:06:36 +00:00
/// Try start processing, create node about it
2017-10-13 19:13:41 +00:00
{
2017-11-09 18:06:36 +00:00
String start_state = TaskStateWithOwner : : getData ( TaskState : : Started , host_id ) ;
2018-03-24 00:45:04 +00:00
auto op_create = zkutil : : makeCreateRequest ( current_task_status_path , start_state , zkutil : : CreateMode : : Persistent ) ;
2018-03-25 00:15:52 +00:00
MultiTransactionInfo info = checkNoNodeAndCommit ( zookeeper , is_dirty_flag_path , std : : move ( op_create ) ) ;
2017-11-09 18:06:36 +00:00
2018-03-24 00:45:04 +00:00
if ( info . code )
2017-10-13 19:13:41 +00:00
{
2018-03-25 00:15:52 +00:00
zkutil : : KeeperMultiException exception ( info . code , info . requests , info . responses ) ;
if ( exception . getPathForFirstFailedOp ( ) = = is_dirty_flag_path )
2017-11-09 18:06:36 +00:00
{
LOG_INFO ( log , " Partition " < < task_partition . name < < " is dirty and will be dropped and refilled " ) ;
2018-03-05 00:47:25 +00:00
return PartitionTaskStatus : : Error ;
2017-11-09 18:06:36 +00:00
}
2018-03-25 00:15:52 +00:00
throw exception ;
2017-10-13 19:13:41 +00:00
}
}
/// Try create table (if not exists) on each shard
{
2018-02-20 21:03:38 +00:00
auto create_query_push_ast = rewriteCreateQueryStorage ( task_shard . current_pull_table_create_query , task_table . table_push , task_table . engine_push_ast ) ;
2017-11-09 18:06:36 +00:00
typeid_cast < ASTCreateQuery & > ( * create_query_push_ast ) . if_not_exists = true ;
String query = queryToString ( create_query_push_ast ) ;
2017-10-13 19:13:41 +00:00
2018-02-20 21:03:38 +00:00
LOG_DEBUG ( log , " Create destination tables. Query: " < < query ) ;
2018-02-13 18:42:59 +00:00
size_t shards = executeQueryOnCluster ( task_table . cluster_push , query , create_query_push_ast , & task_cluster - > settings_push ,
PoolMode : : GET_MANY ) ;
2018-02-20 21:03:38 +00:00
LOG_DEBUG ( log , " Destination tables " < < getDatabaseDotTable ( task_table . table_push ) < < " have been created on " < < shards
< < " shards of " < < task_table . cluster_push - > getShardCount ( ) ) ;
2017-10-13 19:13:41 +00:00
}
2017-11-09 18:06:36 +00:00
/// Do the copying
2017-10-13 19:13:41 +00:00
{
2018-01-09 19:12:43 +00:00
bool inject_fault = false ;
if ( copy_fault_probability > 0 )
{
2018-05-08 12:56:32 +00:00
double value = std : : uniform_real_distribution < > ( 0 , 1 ) ( task_table . task_cluster . random_engine ) ;
2018-01-09 19:12:43 +00:00
inject_fault = value < copy_fault_probability ;
}
2017-11-14 16:59:45 +00:00
// Select all fields
2018-02-20 21:03:38 +00:00
ASTPtr query_select_ast = get_select_query ( task_shard . table_read_shard , " * " , inject_fault ? " 1 " : " " ) ;
2018-01-09 19:12:43 +00:00
2018-03-01 18:33:27 +00:00
LOG_DEBUG ( log , " Executing SELECT query and pull from " < < task_shard . getDescription ( )
< < " : " < < queryToString ( query_select_ast ) ) ;
2017-10-13 19:13:41 +00:00
2017-11-09 18:06:36 +00:00
ASTPtr query_insert_ast ;
{
String query ;
2018-02-20 21:03:38 +00:00
query + = " INSERT INTO " + getDatabaseDotTable ( task_shard . table_split_shard ) + " VALUES " ;
2017-11-09 18:06:36 +00:00
ParserQuery p_query ( query . data ( ) + query . size ( ) ) ;
2018-04-16 15:11:13 +00:00
query_insert_ast = parseQuery ( p_query , query , 0 ) ;
2017-11-09 18:06:36 +00:00
LOG_DEBUG ( log , " Executing INSERT query: " < < query ) ;
}
2017-10-13 19:13:41 +00:00
try
{
2017-11-09 18:06:36 +00:00
/// Custom INSERT SELECT implementation
Context context_select = context ;
context_select . getSettingsRef ( ) = task_cluster - > settings_pull ;
Context context_insert = context ;
context_insert . getSettingsRef ( ) = task_cluster - > settings_push ;
2018-03-14 19:07:57 +00:00
BlockInputStreamPtr input ;
BlockOutputStreamPtr output ;
{
BlockIO io_select = InterpreterFactory : : get ( query_select_ast , context_select ) - > execute ( ) ;
BlockIO io_insert = InterpreterFactory : : get ( query_insert_ast , context_insert ) - > execute ( ) ;
2018-03-30 16:25:26 +00:00
input = io_select . in ;
2018-03-14 19:07:57 +00:00
output = io_insert . out ;
}
2017-11-09 18:06:36 +00:00
2018-08-25 01:58:14 +00:00
std : : future < Coordination : : ExistsResponse > future_is_dirty_checker ;
2017-11-09 18:06:36 +00:00
Stopwatch watch ( CLOCK_MONOTONIC_COARSE ) ;
constexpr size_t check_period_milliseconds = 500 ;
/// Will asynchronously check that ZooKeeper connection and is_dirty flag appearing while copy data
auto cancel_check = [ & ] ( )
{
if ( zookeeper - > expired ( ) )
throw Exception ( " ZooKeeper session is expired, cancel INSERT SELECT " , ErrorCodes : : UNFINISHED ) ;
2018-06-04 15:26:20 +00:00
if ( ! future_is_dirty_checker . valid ( ) )
future_is_dirty_checker = zookeeper - > asyncExists ( is_dirty_flag_path ) ;
2018-03-26 18:39:28 +00:00
/// check_period_milliseconds should less than average insert time of single block
/// Otherwise, the insertion will slow a little bit
if ( watch . elapsedMilliseconds ( ) > = check_period_milliseconds )
2017-11-09 18:06:36 +00:00
{
2018-08-25 01:58:14 +00:00
Coordination : : ExistsResponse status = future_is_dirty_checker . get ( ) ;
2017-11-09 18:06:36 +00:00
2018-08-25 01:58:14 +00:00
if ( status . error ! = Coordination : : ZNONODE )
2017-11-09 18:06:36 +00:00
throw Exception ( " Partition is dirty, cancel INSERT SELECT " , ErrorCodes : : UNFINISHED ) ;
}
return false ;
} ;
2018-02-07 13:02:47 +00:00
/// Update statistics
/// It is quite rough: bytes_copied don't take into account DROP PARTITION.
2018-03-11 18:36:09 +00:00
auto update_stats = [ & cluster_partition ] ( const Block & block )
2018-02-07 13:02:47 +00:00
{
2018-03-11 18:36:09 +00:00
cluster_partition . bytes_copied + = block . bytes ( ) ;
cluster_partition . rows_copied + = block . rows ( ) ;
cluster_partition . blocks_copied + = 1 ;
} ;
2018-02-07 13:02:47 +00:00
2017-11-09 18:06:36 +00:00
/// Main work is here
2018-03-14 19:07:57 +00:00
copyData ( * input , * output , cancel_check , update_stats ) ;
2017-10-13 19:13:41 +00:00
2017-11-09 18:06:36 +00:00
// Just in case
2018-06-04 15:26:20 +00:00
if ( future_is_dirty_checker . valid ( ) )
future_is_dirty_checker . get ( ) ;
2018-01-09 19:12:43 +00:00
if ( inject_fault )
throw Exception ( " Copy fault injection is activated " , ErrorCodes : : UNFINISHED ) ;
2017-10-13 19:13:41 +00:00
}
catch ( . . . )
{
2017-11-09 18:06:36 +00:00
tryLogCurrentException ( log , " An error occurred during copying, partition will be marked as dirty " ) ;
2018-03-05 00:47:25 +00:00
return PartitionTaskStatus : : Error ;
2017-10-13 19:13:41 +00:00
}
}
2017-11-09 18:06:36 +00:00
/// Finalize the processing, change state of current partition task (and also check is_dirty flag)
2017-10-13 19:13:41 +00:00
{
2017-11-09 18:06:36 +00:00
String state_finished = TaskStateWithOwner : : getData ( TaskState : : Finished , host_id ) ;
2018-03-24 00:45:04 +00:00
auto op_set = zkutil : : makeSetRequest ( current_task_status_path , state_finished , 0 ) ;
2018-03-25 00:15:52 +00:00
MultiTransactionInfo info = checkNoNodeAndCommit ( zookeeper , is_dirty_flag_path , std : : move ( op_set ) ) ;
2017-11-09 18:06:36 +00:00
2018-03-24 00:45:04 +00:00
if ( info . code )
2017-11-09 18:06:36 +00:00
{
2018-03-25 00:15:52 +00:00
zkutil : : KeeperMultiException exception ( info . code , info . requests , info . responses ) ;
if ( exception . getPathForFirstFailedOp ( ) = = is_dirty_flag_path )
2017-11-09 18:06:36 +00:00
LOG_INFO ( log , " Partition " < < task_partition . name < < " became dirty and will be dropped and refilled " ) ;
else
2018-01-22 18:55:19 +00:00
LOG_INFO ( log , " Someone made the node abandoned. Will refill partition. " < < zkutil : : ZooKeeper : : error2string ( info . code ) ) ;
2017-11-09 18:06:36 +00:00
2018-03-05 00:47:25 +00:00
return PartitionTaskStatus : : Error ;
2017-11-09 18:06:36 +00:00
}
2017-10-13 19:13:41 +00:00
}
2017-11-14 17:45:15 +00:00
LOG_INFO ( log , " Partition " < < task_partition . name < < " copied " ) ;
2018-03-05 00:47:25 +00:00
return PartitionTaskStatus : : Finished ;
2017-10-13 19:13:41 +00:00
}
void dropAndCreateLocalTable ( const ASTPtr & create_ast )
{
auto & create = typeid_cast < ASTCreateQuery & > ( * create_ast ) ;
dropLocalTableIfExists ( { create . database , create . table } ) ;
InterpreterCreateQuery interpreter ( create_ast , context ) ;
interpreter . execute ( ) ;
}
void dropLocalTableIfExists ( const DatabaseAndTableName & table_name ) const
{
auto drop_ast = std : : make_shared < ASTDropQuery > ( ) ;
drop_ast - > if_exists = true ;
drop_ast - > database = table_name . first ;
drop_ast - > table = table_name . second ;
InterpreterDropQuery interpreter ( drop_ast , context ) ;
interpreter . execute ( ) ;
}
String getRemoteCreateTable ( const DatabaseAndTableName & table , Connection & connection , const Settings * settings = nullptr )
{
String query = " SHOW CREATE TABLE " + getDatabaseDotTable ( table ) ;
2018-02-15 18:54:12 +00:00
Block block = getBlockWithAllStreamData ( std : : make_shared < RemoteBlockInputStream > (
connection , query , InterpreterShowCreateQuery : : getSampleBlock ( ) , context , settings ) ) ;
2017-10-13 19:13:41 +00:00
2018-01-11 20:51:30 +00:00
return typeid_cast < const ColumnString & > ( * block . safeGetByPosition ( 0 ) . column ) . getDataAt ( 0 ) . toString ( ) ;
2017-10-13 19:13:41 +00:00
}
2018-02-20 21:03:38 +00:00
ASTPtr getCreateTableForPullShard ( TaskShard & task_shard )
{
/// Fetch and parse (possibly) new definition
auto connection_entry = task_shard . info . pool - > get ( & task_cluster - > settings_pull ) ;
String create_query_pull_str = getRemoteCreateTable ( task_shard . task_table . table_pull , * connection_entry ,
& task_cluster - > settings_pull ) ;
ParserCreateQuery parser_create_query ;
2018-04-16 15:11:13 +00:00
return parseQuery ( parser_create_query , create_query_pull_str , 0 ) ;
2018-02-20 21:03:38 +00:00
}
2018-03-01 18:33:27 +00:00
void createShardInternalTables ( TaskShard & task_shard , bool create_split = true )
2018-02-20 21:03:38 +00:00
{
TaskTable & task_table = task_shard . task_table ;
/// We need to update table definitions for each part, it could be changed after ALTER
task_shard . current_pull_table_create_query = getCreateTableForPullShard ( task_shard ) ;
/// Create local Distributed tables:
/// a table fetching data from current shard and a table inserting data to the whole destination cluster
String read_shard_prefix = " .read_shard_ " + toString ( task_shard . indexInCluster ( ) ) + " . " ;
String split_shard_prefix = " .split. " ;
task_shard . table_read_shard = DatabaseAndTableName ( working_database_name , read_shard_prefix + task_table . table_id ) ;
task_shard . table_split_shard = DatabaseAndTableName ( working_database_name , split_shard_prefix + task_table . table_id ) ;
/// Create special cluster with single shard
String shard_read_cluster_name = read_shard_prefix + task_table . cluster_pull_name ;
ClusterPtr cluster_pull_current_shard = task_table . cluster_pull - > getClusterWithSingleShard ( task_shard . indexInCluster ( ) ) ;
context . setCluster ( shard_read_cluster_name , cluster_pull_current_shard ) ;
auto storage_shard_ast = createASTStorageDistributed ( shard_read_cluster_name , task_table . table_pull . first , task_table . table_pull . second ) ;
const auto & storage_split_ast = task_table . engine_split_ast ;
auto create_query_ast = removeAliasColumnsFromCreateQuery ( task_shard . current_pull_table_create_query ) ;
auto create_table_pull_ast = rewriteCreateQueryStorage ( create_query_ast , task_shard . table_read_shard , storage_shard_ast ) ;
auto create_table_split_ast = rewriteCreateQueryStorage ( create_query_ast , task_shard . table_split_shard , storage_split_ast ) ;
dropAndCreateLocalTable ( create_table_pull_ast ) ;
2018-03-01 18:33:27 +00:00
if ( create_split )
dropAndCreateLocalTable ( create_table_split_ast ) ;
2018-02-20 21:03:38 +00:00
}
std : : set < String > getShardPartitions ( TaskShard & task_shard )
2017-10-13 19:13:41 +00:00
{
2018-03-01 18:33:27 +00:00
createShardInternalTables ( task_shard , false ) ;
2018-02-20 21:03:38 +00:00
TaskTable & task_table = task_shard . task_table ;
String query ;
2017-10-13 19:13:41 +00:00
{
WriteBufferFromOwnString wb ;
2018-02-20 21:03:38 +00:00
wb < < " SELECT DISTINCT " < < queryToString ( task_table . engine_push_partition_key_ast ) < < " AS partition FROM "
< < " " < < getDatabaseDotTable ( task_shard . table_read_shard ) < < " ORDER BY partition DESC " ;
query = wb . str ( ) ;
2017-10-13 19:13:41 +00:00
}
2018-02-20 21:03:38 +00:00
ParserQuery parser_query ( query . data ( ) + query . size ( ) ) ;
2018-04-16 15:11:13 +00:00
ASTPtr query_ast = parseQuery ( parser_query , query , 0 ) ;
2018-02-20 21:03:38 +00:00
2018-03-05 00:47:25 +00:00
LOG_DEBUG ( log , " Computing destination partition set, executing query: " < < query ) ;
2018-02-20 21:03:38 +00:00
Context local_context = context ;
2018-03-01 18:33:27 +00:00
local_context . setSettings ( task_cluster - > settings_pull ) ;
2018-03-07 13:52:09 +00:00
Block block = getBlockWithAllStreamData ( InterpreterFactory : : get ( query_ast , local_context ) - > execute ( ) . in ) ;
2018-02-20 21:03:38 +00:00
std : : set < String > res ;
2017-10-13 19:13:41 +00:00
if ( block )
{
2018-02-20 21:03:38 +00:00
ColumnWithTypeAndName & column = block . getByPosition ( 0 ) ;
task_shard . partition_key_column = column ;
for ( size_t i = 0 ; i < column . column - > size ( ) ; + + i )
2017-10-13 19:13:41 +00:00
{
2018-02-20 21:03:38 +00:00
WriteBufferFromOwnString wb ;
2018-06-08 02:24:27 +00:00
column . type - > serializeTextQuoted ( * column . column , i , wb , FormatSettings ( ) ) ;
2018-02-20 21:03:38 +00:00
res . emplace ( wb . str ( ) ) ;
2017-10-13 19:13:41 +00:00
}
}
2018-02-20 21:03:38 +00:00
LOG_DEBUG ( log , " There are " < < res . size ( ) < < " destination partitions in shard " < < task_shard . getDescription ( ) ) ;
2017-10-13 19:13:41 +00:00
return res ;
}
2018-03-05 00:47:25 +00:00
bool checkShardHasPartition ( TaskShard & task_shard , const String & partition_quoted_name )
{
createShardInternalTables ( task_shard , false ) ;
TaskTable & task_table = task_shard . task_table ;
String query ;
{
WriteBufferFromOwnString wb ;
wb < < " SELECT 1 "
< < " FROM " < < getDatabaseDotTable ( task_shard . table_read_shard )
< < " WHERE " < < queryToString ( task_table . engine_push_partition_key_ast ) < < " = " < < partition_quoted_name
< < " LIMIT 1 " ;
query = wb . str ( ) ;
}
LOG_DEBUG ( log , " Checking shard " < < task_shard . getDescription ( ) < < " for partition "
< < partition_quoted_name < < " existence, executing query: " < < query ) ;
ParserQuery parser_query ( query . data ( ) + query . size ( ) ) ;
2018-04-16 15:11:13 +00:00
ASTPtr query_ast = parseQuery ( parser_query , query , 0 ) ;
2018-03-05 00:47:25 +00:00
Context local_context = context ;
local_context . setSettings ( task_cluster - > settings_pull ) ;
2018-03-11 18:36:09 +00:00
return InterpreterFactory : : get ( query_ast , local_context ) - > execute ( ) . in - > read ( ) . rows ( ) ! = 0 ;
2018-03-05 00:47:25 +00:00
}
2017-11-09 18:06:36 +00:00
/** Executes simple query (without output streams, for example DDL queries) on each shard of the cluster
2018-02-15 18:54:12 +00:00
* Returns number of shards for which at least one replica executed query successfully
*/
2017-11-09 18:06:36 +00:00
size_t executeQueryOnCluster (
const ClusterPtr & cluster ,
const String & query ,
const ASTPtr & query_ast_ = nullptr ,
const Settings * settings = nullptr ,
PoolMode pool_mode = PoolMode : : GET_ALL ,
size_t max_successful_executions_per_shard = 0 ) const
2017-10-13 19:13:41 +00:00
{
auto num_shards = cluster - > getShardsInfo ( ) . size ( ) ;
2017-11-09 18:06:36 +00:00
std : : vector < size_t > per_shard_num_successful_replicas ( num_shards , 0 ) ;
ASTPtr query_ast ;
if ( query_ast_ = = nullptr )
{
ParserQuery p_query ( query . data ( ) + query . size ( ) ) ;
2018-04-16 15:11:13 +00:00
query_ast = parseQuery ( p_query , query , 0 ) ;
2017-11-09 18:06:36 +00:00
}
else
query_ast = query_ast_ ;
2017-10-13 19:13:41 +00:00
/// We need to execute query on one replica at least
auto do_for_shard = [ & ] ( size_t shard_index )
{
const Cluster : : ShardInfo & shard = cluster - > getShardsInfo ( ) . at ( shard_index ) ;
2017-11-09 18:06:36 +00:00
size_t & num_successful_executions = per_shard_num_successful_replicas . at ( shard_index ) ;
num_successful_executions = 0 ;
auto increment_and_check_exit = [ & ] ( )
{
+ + num_successful_executions ;
return max_successful_executions_per_shard & & num_successful_executions > = max_successful_executions_per_shard ;
} ;
2017-10-13 19:13:41 +00:00
2018-02-07 13:02:47 +00:00
size_t num_replicas = cluster - > getShardsAddresses ( ) . at ( shard_index ) . size ( ) ;
size_t num_local_replicas = shard . getLocalNodeCount ( ) ;
size_t num_remote_replicas = num_replicas - num_local_replicas ;
2017-10-13 19:13:41 +00:00
/// In that case we don't have local replicas, but do it just in case
2018-02-07 13:02:47 +00:00
for ( size_t i = 0 ; i < num_local_replicas ; + + i )
2017-10-13 19:13:41 +00:00
{
2017-11-09 18:06:36 +00:00
auto interpreter = InterpreterFactory : : get ( query_ast , context ) ;
interpreter - > execute ( ) ;
if ( increment_and_check_exit ( ) )
return ;
2017-10-13 19:13:41 +00:00
}
/// Will try to make as many as possible queries
if ( shard . hasRemoteConnections ( ) )
{
2018-02-13 18:42:59 +00:00
Settings current_settings = settings ? * settings : task_cluster - > settings_common ;
2018-02-07 13:02:47 +00:00
current_settings . max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1 ;
2018-02-20 21:03:38 +00:00
auto connections = shard . pool - > getMany ( & current_settings , pool_mode ) ;
2017-10-13 19:13:41 +00:00
for ( auto & connection : connections )
{
2018-02-13 18:42:59 +00:00
if ( connection . isNull ( ) )
continue ;
try
2017-10-13 19:13:41 +00:00
{
2018-02-22 11:40:23 +00:00
/// CREATE TABLE and DROP PARTITION queries return empty block
RemoteBlockInputStream stream { * connection , query , Block { } , context , & current_settings } ;
NullBlockOutputStream output { Block { } } ;
2018-02-13 18:42:59 +00:00
copyData ( stream , output ) ;
2017-11-09 18:06:36 +00:00
2018-02-13 18:42:59 +00:00
if ( increment_and_check_exit ( ) )
return ;
}
2018-08-26 01:36:41 +00:00
catch ( const Exception & )
2018-02-13 18:42:59 +00:00
{
LOG_INFO ( log , getCurrentExceptionMessage ( false , true ) ) ;
2017-10-13 19:13:41 +00:00
}
}
}
} ;
2017-11-09 18:06:36 +00:00
{
ThreadPool thread_pool ( std : : min ( num_shards , getNumberOfPhysicalCPUCores ( ) ) ) ;
2017-10-13 19:13:41 +00:00
2017-11-09 18:06:36 +00:00
for ( size_t shard_index = 0 ; shard_index < num_shards ; + + shard_index )
thread_pool . schedule ( [ = ] { do_for_shard ( shard_index ) ; } ) ;
2017-10-13 19:13:41 +00:00
2017-11-09 18:06:36 +00:00
thread_pool . wait ( ) ;
}
size_t successful_shards = 0 ;
for ( size_t num_replicas : per_shard_num_successful_replicas )
successful_shards + = ( num_replicas > 0 ) ;
2017-10-13 19:13:41 +00:00
2017-11-09 18:06:36 +00:00
return successful_shards ;
2017-10-13 19:13:41 +00:00
}
private :
String task_zookeeper_path ;
2018-02-13 18:42:59 +00:00
String task_description_path ;
2017-10-13 19:13:41 +00:00
String host_id ;
String working_database_name ;
2018-03-06 14:36:40 +00:00
/// Auto update config stuff
2018-02-13 18:42:59 +00:00
UInt64 task_descprtion_current_version = 1 ;
std : : atomic < UInt64 > task_descprtion_version { 1 } ;
2018-08-25 01:58:14 +00:00
Coordination : : WatchCallback task_description_watch_callback ;
2018-03-06 14:36:40 +00:00
/// ZooKeeper session used to set the callback
zkutil : : ZooKeeperPtr task_description_watch_zookeeper ;
2018-02-13 18:42:59 +00:00
ConfigurationPtr task_cluster_initial_config ;
ConfigurationPtr task_cluster_current_config ;
2018-08-25 01:58:14 +00:00
Coordination : : Stat task_descprtion_current_stat ;
2018-01-09 19:12:43 +00:00
2017-10-13 19:13:41 +00:00
std : : unique_ptr < TaskCluster > task_cluster ;
2018-02-13 18:42:59 +00:00
bool is_safe_mode = false ;
double copy_fault_probability = 0.0 ;
2017-10-13 19:13:41 +00:00
Context & context ;
Poco : : Logger * log ;
2017-11-09 18:06:36 +00:00
std : : chrono : : milliseconds default_sleep_time { 1000 } ;
2017-10-13 19:13:41 +00:00
} ;
2018-01-22 18:33:18 +00:00
/// ClusterCopierApp
void ClusterCopierApp : : initialize ( Poco : : Util : : Application & self )
2017-10-13 19:13:41 +00:00
{
2018-01-22 18:33:18 +00:00
is_help = config ( ) . has ( " help " ) ;
if ( is_help )
return ;
config_xml_path = config ( ) . getString ( " config-file " ) ;
task_path = config ( ) . getString ( " task-path " ) ;
log_level = config ( ) . getString ( " log-level " , " debug " ) ;
is_safe_mode = config ( ) . has ( " safe-mode " ) ;
if ( config ( ) . has ( " copy-fault-probability " ) )
copy_fault_probability = std : : max ( std : : min ( config ( ) . getDouble ( " copy-fault-probability " ) , 1.0 ) , 0.0 ) ;
base_dir = ( config ( ) . has ( " base-dir " ) ) ? config ( ) . getString ( " base-dir " ) : Poco : : Path : : current ( ) ;
2018-02-07 13:02:47 +00:00
// process_id is '<hostname>#<start_timestamp>_<pid>'
time_t timestamp = Poco : : Timestamp ( ) . epochTime ( ) ;
auto pid = Poco : : Process : : id ( ) ;
process_id = std : : to_string ( DateLUT : : instance ( ) . toNumYYYYMMDDhhmmss ( timestamp ) ) + " _ " + std : : to_string ( pid ) ;
2018-01-22 18:33:18 +00:00
host_id = escapeForFileName ( getFQDNOrHostName ( ) ) + ' # ' + process_id ;
process_path = Poco : : Path ( base_dir + " /clickhouse-copier_ " + process_id ) . absolute ( ) . toString ( ) ;
Poco : : File ( process_path ) . createDirectories ( ) ;
2018-05-14 14:12:33 +00:00
/// Override variables for BaseDaemon
if ( config ( ) . has ( " log-level " ) )
config ( ) . setString ( " logger.level " , config ( ) . getString ( " log-level " ) ) ;
if ( config ( ) . has ( " base-dir " ) | | ! config ( ) . has ( " logger.log " ) )
config ( ) . setString ( " logger.log " , process_path + " /log.log " ) ;
2018-01-22 18:33:18 +00:00
2018-05-14 14:12:33 +00:00
if ( config ( ) . has ( " base-dir " ) | | ! config ( ) . has ( " logger.errorlog " ) )
config ( ) . setString ( " logger.errorlog " , process_path + " /log.err.log " ) ;
Base : : initialize ( self ) ;
2018-01-22 18:33:18 +00:00
}
2017-10-13 19:13:41 +00:00
2018-01-22 18:33:18 +00:00
void ClusterCopierApp : : handleHelp ( const std : : string & , const std : : string & )
{
Poco : : Util : : HelpFormatter helpFormatter ( options ( ) ) ;
helpFormatter . setCommand ( commandName ( ) ) ;
helpFormatter . setHeader ( " Copies tables from one cluster to another " ) ;
helpFormatter . setUsage ( " --config-file <config-file> --task-path <task-path> " ) ;
helpFormatter . format ( std : : cerr ) ;
2017-10-13 19:13:41 +00:00
2018-01-22 18:33:18 +00:00
stopOptionsProcessing ( ) ;
}
2017-10-13 19:13:41 +00:00
2018-01-22 18:33:18 +00:00
void ClusterCopierApp : : defineOptions ( Poco : : Util : : OptionSet & options )
{
2018-05-14 14:12:33 +00:00
Base : : defineOptions ( options ) ;
2018-02-07 13:02:47 +00:00
2018-01-22 18:33:18 +00:00
options . addOption ( Poco : : Util : : Option ( " task-path " , " " , " path to task in ZooKeeper " )
. argument ( " task-path " ) . binding ( " task-path " ) ) ;
options . addOption ( Poco : : Util : : Option ( " safe-mode " , " " , " disables ALTER DROP PARTITION in case of errors " )
. binding ( " safe-mode " ) ) ;
options . addOption ( Poco : : Util : : Option ( " copy-fault-probability " , " " , " the copying fails with specified probability (used to test partition state recovering) " )
. argument ( " copy-fault-probability " ) . binding ( " copy-fault-probability " ) ) ;
options . addOption ( Poco : : Util : : Option ( " log-level " , " " , " sets log level " )
. argument ( " log-level " ) . binding ( " log-level " ) ) ;
options . addOption ( Poco : : Util : : Option ( " base-dir " , " " , " base directory for copiers, consequitive copier launches will populate /base-dir/launch_id/* directories " )
. argument ( " base-dir " ) . binding ( " base-dir " ) ) ;
using Me = std : : decay_t < decltype ( * this ) > ;
options . addOption ( Poco : : Util : : Option ( " help " , " " , " produce this help message " ) . binding ( " help " )
. callback ( Poco : : Util : : OptionCallback < Me > ( this , & Me : : handleHelp ) ) ) ;
}
2017-10-13 19:13:41 +00:00
2017-11-15 17:09:16 +00:00
2018-01-22 18:33:18 +00:00
void ClusterCopierApp : : mainImpl ( )
{
StatusFile status_file ( process_path + " /status " ) ;
2017-11-14 13:13:24 +00:00
2018-05-14 14:12:33 +00:00
auto log = & logger ( ) ;
2018-01-22 18:33:18 +00:00
LOG_INFO ( log , " Starting clickhouse-copier ( "
< < " id " < < process_id < < " , "
< < " host_id " < < host_id < < " , "
< < " path " < < process_path < < " , "
< < " revision " < < ClickHouseRevision : : get ( ) < < " ) " ) ;
2017-11-15 17:09:16 +00:00
2018-01-22 18:33:18 +00:00
auto context = std : : make_unique < Context > ( Context : : createGlobal ( ) ) ;
SCOPE_EXIT ( context - > shutdown ( ) ) ;
2017-10-13 19:13:41 +00:00
2018-05-14 14:12:33 +00:00
context - > setConfig ( loaded_config . configuration ) ;
2018-01-22 18:33:18 +00:00
context - > setGlobalContext ( * context ) ;
context - > setApplicationType ( Context : : ApplicationType : : LOCAL ) ;
context - > setPath ( process_path ) ;
2017-10-13 19:13:41 +00:00
2018-01-22 18:33:18 +00:00
registerFunctions ( ) ;
registerAggregateFunctions ( ) ;
registerTableFunctions ( ) ;
registerStorages ( ) ;
2017-10-13 19:13:41 +00:00
2018-01-22 18:33:18 +00:00
static const std : : string default_database = " _local " ;
context - > addDatabase ( default_database , std : : make_shared < DatabaseMemory > ( default_database ) ) ;
context - > setCurrentDatabase ( default_database ) ;
2017-10-13 19:13:41 +00:00
2018-06-20 17:49:52 +00:00
/// Initialize query scope just in case.
CurrentThread : : QueryScope query_scope ( * context ) ;
2018-05-08 12:56:32 +00:00
auto copier = std : : make_unique < ClusterCopier > ( task_path , host_id , default_database , * context ) ;
2018-01-22 18:33:18 +00:00
copier - > setSafeMode ( is_safe_mode ) ;
copier - > setCopyFaultProbability ( copy_fault_probability ) ;
copier - > init ( ) ;
copier - > process ( ) ;
}
2017-10-13 19:13:41 +00:00
2018-01-22 18:33:18 +00:00
int ClusterCopierApp : : main ( const std : : vector < std : : string > & )
{
if ( is_help )
return 0 ;
2017-10-13 19:13:41 +00:00
2018-01-22 18:33:18 +00:00
try
{
mainImpl ( ) ;
}
catch ( . . . )
{
2018-02-20 21:03:38 +00:00
tryLogCurrentException ( & Poco : : Logger : : root ( ) , __PRETTY_FUNCTION__ ) ;
2018-01-22 18:33:18 +00:00
auto code = getCurrentExceptionCode ( ) ;
2017-10-13 19:13:41 +00:00
2018-01-22 18:33:18 +00:00
return ( code ) ? code : - 1 ;
}
return 0 ;
}
2017-10-13 19:13:41 +00:00
}
int mainEntryClickHouseClusterCopier ( int argc , char * * argv )
{
try
{
DB : : ClusterCopierApp app ;
2017-11-15 17:09:16 +00:00
return app . run ( argc , argv ) ;
2017-10-13 19:13:41 +00:00
}
catch ( . . . )
{
std : : cerr < < DB : : getCurrentExceptionMessage ( true ) < < " \n " ;
auto code = DB : : getCurrentExceptionCode ( ) ;
2017-11-14 17:45:15 +00:00
return ( code ) ? code : - 1 ;
2017-10-13 19:13:41 +00:00
}
}