#include <Interpreters/DDLWorker.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <Storages/IStorage.h>
#include <Storages/StorageDistributed.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/Context.h>
#include <Access/AccessRightsElement.h>
#include <Access/ContextAccess.h>
#include <Common/DNSResolver.h>
#include <Common/Macros.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/isLocalAddress.h>
#include <Common/quoteString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>
#include <Poco/Net/NetException.h>
#include <common/sleep.h>
#include <common/getFQDNOrHostName.h>
#include <random>
#include <pcg_random.hpp>

namespace DB
{

namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_FORMAT_VERSION;
    extern const int INCONSISTENT_CLUSTER_DEFINITION;
    extern const int TIMEOUT_EXCEEDED;
    extern const int UNKNOWN_TYPE_OF_QUERY;
    extern const int UNFINISHED;
    extern const int QUERY_IS_PROHIBITED;
}

namespace
{

struct HostID
{
    String host_name;
    UInt16 port;

    HostID() = default;

    explicit HostID(const Cluster::Address & address)
        : host_name(address.host_name), port(address.port) {}

    static HostID fromString(const String & host_port_str)
    {
        HostID res;
        std::tie(res.host_name, res.port) = Cluster::Address::fromString(host_port_str);
        return res;
    }
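
    /// toString() must round-trip through fromString() above: it is used to build
    /// ZooKeeper node names (e.g. ".../finished/<host_id_str>"), while
    /// readableString() is a plain "host:port" form meant for logs and error messages.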
    String toString() const
    {
        return Cluster::Address::toString(host_name, port);
    }

    String readableString() const
    {
        return host_name + ":" + DB::toString(port);
    }

    bool isLocalAddress(UInt16 clickhouse_port) const
    {
        try
        {
            return DB::isLocalAddress(DNSResolver::instance().resolveAddress(host_name, port), clickhouse_port);
        }
        catch (const Poco::Net::NetException &)
        {
            /// Avoid "Host not found" exceptions
            return false;
        }
    }

    static String applyToString(const HostID & host_id)
    {
        return host_id.toString();
    }
};

}

struct DDLLogEntry
{
    String query;
    std::vector<HostID> hosts;
    String initiator; // optional

    static constexpr int CURRENT_VERSION = 1;

    String toString()
    {
        WriteBufferFromOwnString wb;

        Strings host_id_strings(hosts.size());
        std::transform(hosts.begin(), hosts.end(), host_id_strings.begin(), HostID::applyToString);

        auto version = CURRENT_VERSION;
        wb << "version: " << version << "\n";
        wb << "query: " << escape << query << "\n";
        wb << "hosts: " << host_id_strings << "\n";
        wb << "initiator: " << initiator << "\n";

        return wb.str();
    }

    void parse(const String & data)
    {
        ReadBufferFromString rb(data);

        int version;
        rb >> "version: " >> version >> "\n";

        if (version != CURRENT_VERSION)
            throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}", version);

        Strings host_id_strings;
        rb >> "query: " >> escape >> query >> "\n";
        rb >> "hosts: " >> host_id_strings >> "\n";

        if (!rb.eof())
            rb >> "initiator: " >> initiator >> "\n";
        else
            initiator.clear();

        assertEOF(rb);

        hosts.resize(host_id_strings.size());
        std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString);
    }
};
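
/// A serialized entry, as stored in the queue znode, looks roughly like this
/// (illustrative; the hosts vector is written in the usual text format for string arrays):
///
///     version: 1
///     query: CREATE TABLE test.foo ON CLUSTER bar (x UInt64) ...
///     hosts: ['host1:9000','host2:9000']
///     initiator: host1:9000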


struct DDLTask
{
    /// Stages of the task lifetime correspond to the ordering of these data fields:

    /// Stage 1: parse the entry
    String entry_name;
    String entry_path;
    DDLLogEntry entry;

    /// Stage 2: resolve host_id and check that the local host is in the entry's host list
    HostID host_id;
    String host_id_str;

    /// Stage 3.1: parse the query
    ASTPtr query;
    ASTQueryWithOnCluster * query_on_cluster = nullptr;

    /// Stage 3.2: check the cluster and find the host in the cluster
    String cluster_name;
    ClusterPtr cluster;
    Cluster::Address address_in_cluster;
    size_t host_shard_num;
    size_t host_replica_num;

    /// Stage 3.3: execute the query
    ExecutionStatus execution_status;
    bool was_executed = false;

    /// Stage 4: commit results to ZooKeeper
};


namespace
{

/** Caveats: usage of locks in ZooKeeper is incorrect in 99% of cases,
  * and highlights your poor understanding of distributed systems.
  *
  * It's only correct if all the operations that are performed under lock
  * are atomically checking that the lock still holds,
  * or if we ensure that these operations will be undone if the lock is lost
  * (due to ZooKeeper session loss), which is very difficult to achieve.
  *
  * It's Ok if every operation that we perform under lock is actually an operation in ZooKeeper.
  *
  * In the 1% of cases when you can correctly use a lock, the logic is complex enough that you don't need this class.
  *
  * TLDR: Don't use this code.
  * We only have a few cases of its usage and it will be removed.
  */
class ZooKeeperLock
{
public:
    /// lock_prefix - path where the ephemeral lock node will be created
    /// lock_name - the name of the ephemeral lock node
    ZooKeeperLock(
        const zkutil::ZooKeeperPtr & zookeeper_,
        const std::string & lock_prefix_,
        const std::string & lock_name_,
        const std::string & lock_message_ = "")
    :
        zookeeper(zookeeper_),
        lock_path(lock_prefix_ + "/" + lock_name_),
        lock_message(lock_message_),
        log(&Poco::Logger::get("zkutil::Lock"))
    {
        zookeeper->createIfNotExists(lock_prefix_, "");
    }

    ~ZooKeeperLock()
    {
        try
        {
            unlock();
        }
        catch (...)
        {
            DB::tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    void unlock()
    {
        Coordination::Stat stat;
        std::string dummy;
        bool result = zookeeper->tryGet(lock_path, dummy, &stat);

        if (result && stat.ephemeralOwner == zookeeper->getClientID())
            zookeeper->remove(lock_path, -1);
        else
            LOG_WARNING(log, "Lock is lost. It is normal if the session has expired. Path: {}/{}", lock_path, lock_message);
    }

    bool tryLock()
    {
        std::string dummy;
        Coordination::Error code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral, dummy);

        if (code == Coordination::Error::ZNODEEXISTS)
        {
            return false;
        }
        else if (code == Coordination::Error::ZOK)
        {
            return true;
        }
        else
        {
            throw Coordination::Exception(code);
        }
    }

private:
    zkutil::ZooKeeperPtr zookeeper;

    std::string lock_path;
    std::string lock_message;
    Poco::Logger * log;
};

std::unique_ptr<ZooKeeperLock> createSimpleZooKeeperLock(
    const zkutil::ZooKeeperPtr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
{
    return std::make_unique<ZooKeeperLock>(zookeeper, lock_prefix, lock_name, lock_message);
}
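
/// A minimal usage sketch of the lock above (the pattern used by cleanupQueue()
/// and tryExecuteQueryOnLeaderReplica() below):
///
///     auto lock = createSimpleZooKeeperLock(zookeeper, task_path, "lock", host_fqdn_id);
///     if (lock->tryLock())
///     {
///         /// ... ZooKeeper-only operations ...
///         lock->unlock();
///     }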

}

static bool isSupportedAlterType(int type)
{
    static const std::unordered_set<int> unsupported_alter_types{
        ASTAlterCommand::ATTACH_PARTITION,
        ASTAlterCommand::REPLACE_PARTITION,
        ASTAlterCommand::FETCH_PARTITION,
        ASTAlterCommand::FREEZE_PARTITION,
        ASTAlterCommand::FREEZE_ALL,
        ASTAlterCommand::NO_TYPE,
    };

    return unsupported_alter_types.count(type) == 0;
}
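

/// The constructor below reads its settings from the config section passed via `prefix`
/// (typically "distributed_ddl"). An illustrative server config fragment, assuming the
/// usual nested-XML layout of Poco AbstractConfiguration keys:
///
///     <distributed_ddl>
///         <task_max_lifetime>604800</task_max_lifetime>
///         <cleanup_delay_period>60</cleanup_delay_period>
///         <max_tasks_in_queue>1000</max_tasks_in_queue>
///         <profile>default</profile>
///     </distributed_ddl>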

DDLWorker::DDLWorker(int pool_size_, const std::string & zk_root_dir, Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix)
    : context(context_)
    , log(&Poco::Logger::get("DDLWorker"))
    , pool_size(pool_size_)
    , worker_pool(pool_size_)
{
    last_tasks.reserve(pool_size);

    queue_dir = zk_root_dir;
    if (queue_dir.back() == '/')
        queue_dir.resize(queue_dir.size() - 1);

    if (config)
    {
        task_max_lifetime = config->getUInt64(prefix + ".task_max_lifetime", static_cast<UInt64>(task_max_lifetime));
        cleanup_delay_period = config->getUInt64(prefix + ".cleanup_delay_period", static_cast<UInt64>(cleanup_delay_period));
        max_tasks_in_queue = std::max<UInt64>(1, config->getUInt64(prefix + ".max_tasks_in_queue", max_tasks_in_queue));

        if (config->has(prefix + ".profile"))
            context.setSetting("profile", config->getString(prefix + ".profile"));
    }

    if (context.getSettingsRef().readonly)
    {
        LOG_WARNING(log, "Distributed DDL worker is run with readonly settings, it will not be able to execute DDL queries. Set appropriate system_profile or distributed_ddl.profile to fix this.");
    }

    host_fqdn = getFQDNOrHostName();
    host_fqdn_id = Cluster::Address::toString(host_fqdn, context.getTCPPort());

    main_thread = ThreadFromGlobalPool(&DDLWorker::runMainThread, this);
    cleanup_thread = ThreadFromGlobalPool(&DDLWorker::runCleanupThread, this);
}


DDLWorker::~DDLWorker()
{
    stop_flag = true;
    queue_updated_event->set();
    cleanup_event->set();
    worker_pool.wait();
    main_thread.join();
    cleanup_thread.join();
}


DDLWorker::ZooKeeperPtr DDLWorker::tryGetZooKeeper() const
{
    std::lock_guard lock(zookeeper_mutex);
    return current_zookeeper;
}

DDLWorker::ZooKeeperPtr DDLWorker::getAndSetZooKeeper()
{
    std::lock_guard lock(zookeeper_mutex);

    if (!current_zookeeper || current_zookeeper->expired())
        current_zookeeper = context.getZooKeeper();

    return current_zookeeper;
}

void DDLWorker::recoverZooKeeper()
{
    LOG_DEBUG(log, "Recovering ZooKeeper session after: {}", getCurrentExceptionMessage(false));

    while (!stop_flag)
    {
        try
        {
            getAndSetZooKeeper();
            break;
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
            sleepForSeconds(5);
        }
    }
}


DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper)
{
    String node_data;
    String entry_path = queue_dir + "/" + entry_name;

    if (!zookeeper->tryGet(entry_path, node_data))
    {
        /// It is Ok that the node could have been deleted just now: it means that the current host was not in the node's host list.
        out_reason = "The task was deleted";
        return {};
    }

    auto task = std::make_unique<DDLTask>();
    task->entry_name = entry_name;
    task->entry_path = entry_path;

    try
    {
        task->entry.parse(node_data);
    }
    catch (...)
    {
        /// What should we do if we cannot even parse the host name, and therefore cannot properly submit the execution status?
        /// We can try to create the failure node using the FQDN: if it equals the host name in the cluster config, the attempt will succeed.
        /// Otherwise, that node will be ignored by DDLQueryStatusInputStream.

        tryLogCurrentException(log, "Cannot parse DDL task " + entry_name + ", will try to send error status");

        String status = ExecutionStatus::fromCurrentException().serializeText();
        try
        {
            createStatusDirs(entry_path, zookeeper);
            zookeeper->tryCreate(entry_path + "/finished/" + host_fqdn_id, status, zkutil::CreateMode::Persistent);
        }
        catch (...)
        {
            tryLogCurrentException(log, "Can't report the task has invalid format");
        }

        out_reason = "Incorrect task format";
        return {};
    }

    bool host_in_hostlist = false;
    for (const HostID & host : task->entry.hosts)
    {
        auto maybe_secure_port = context.getTCPPortSecure();

        /// The port is considered local if it matches the TCP or TCP secure port that the server is listening on.
        bool is_local_port = (maybe_secure_port && host.isLocalAddress(*maybe_secure_port))
            || host.isLocalAddress(context.getTCPPort());

        if (!is_local_port)
            continue;

        if (host_in_hostlist)
        {
            /// This check could be a little slow
            LOG_WARNING(log, "There are two identical ClickHouse instances in task {}: {} and {}. Will use the first one only.", entry_name, task->host_id.readableString(), host.readableString());
        }
        else
        {
            host_in_hostlist = true;
            task->host_id = host;
            task->host_id_str = host.toString();
        }
    }

    if (!host_in_hostlist)
    {
        out_reason = "There is no local address in the host list";
        return {};
    }

    return task;
}


static void filterAndSortQueueNodes(Strings & all_nodes)
{
    all_nodes.erase(std::remove_if(all_nodes.begin(), all_nodes.end(), [] (const String & s) { return !startsWith(s, "query-"); }), all_nodes.end());
    std::sort(all_nodes.begin(), all_nodes.end());
}
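
/// Queue entries are sequential znodes named "query-NNNNNNNNNN", so after filtering,
/// lexicographic order equals creation order (e.g. query-0000000009 < query-0000000010).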

void DDLWorker::scheduleTasks()
{
    LOG_DEBUG(log, "Scheduling tasks");
    auto zookeeper = tryGetZooKeeper();

    Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
    filterAndSortQueueNodes(queue_nodes);
    if (queue_nodes.empty())
        return;
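
    /// On server startup we consider every queued entry; otherwise we only look at
    /// entries strictly newer than the last entry we have already seen.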
    bool server_startup = last_tasks.empty();

    auto begin_node = server_startup
        ? queue_nodes.begin()
        : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_tasks.back());

    for (auto it = begin_node; it != queue_nodes.end(); ++it)
    {
        String entry_name = *it;

        String reason;
        auto task = initAndCheckTask(entry_name, reason, zookeeper);
        if (!task)
        {
            LOG_DEBUG(log, "Will not execute task {}: {}", entry_name, reason);
            saveTask(entry_name);
            continue;
        }

        bool already_processed = zookeeper->exists(task->entry_path + "/finished/" + task->host_id_str);
        if (!server_startup && !task->was_executed && already_processed)
        {
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                "Server expects that DDL task {} should be processed, but it was already processed according to ZK",
                entry_name);
        }

        if (!already_processed)
        {
            worker_pool.scheduleOrThrowOnError([this, task_ptr = task.release()]()
            {
                setThreadName("DDLWorkerExec");
                enqueueTask(DDLTaskPtr(task_ptr));
            });
        }
        else
        {
            LOG_DEBUG(log, "Task {} ({}) has been already processed", entry_name, task->entry.query);
        }

        saveTask(entry_name);

        if (stop_flag)
            break;
    }
}

void DDLWorker::saveTask(const String & entry_name)
{
    if (last_tasks.size() == pool_size)
    {
        last_tasks.erase(last_tasks.begin());
    }
    last_tasks.emplace_back(entry_name);
}


/// Parses the query and resolves the cluster and the host in the cluster
void DDLWorker::parseQueryAndResolveHost(DDLTask & task)
{
    {
        const char * begin = task.entry.query.data();
        const char * end = begin + task.entry.query.size();

        ParserQuery parser_query(end);
        String description;
        task.query = parseQuery(parser_query, begin, end, description, 0, context.getSettingsRef().max_parser_depth);
    }

    // XXX: serious design flaw since `ASTQueryWithOnCluster` is not inherited from `IAST`!
    if (!task.query || !(task.query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(task.query.get())))
        throw Exception("Received unknown DDL query", ErrorCodes::UNKNOWN_TYPE_OF_QUERY);

    task.cluster_name = task.query_on_cluster->cluster;
    task.cluster = context.tryGetCluster(task.cluster_name);
    if (!task.cluster)
        throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
            "DDL task {} contains current host {} in cluster {}, but there is no such cluster here.",
            task.entry_name, task.host_id.readableString(), task.cluster_name);

    /// Try to find the host from the task host list in the cluster.
    /// First, try to find an exact match (host name and port should be literally equal).
    /// If that attempt fails, try to find the host by resolving the host name of each instance.

    const auto & shards = task.cluster->getShardsAddresses();
    bool found_exact_match = false;
    String default_database;

    for (size_t shard_num = 0; shard_num < shards.size(); ++shard_num)
    {
        for (size_t replica_num = 0; replica_num < shards[shard_num].size(); ++replica_num)
        {
            const Cluster::Address & address = shards[shard_num][replica_num];

            if (address.host_name == task.host_id.host_name && address.port == task.host_id.port)
            {
                if (found_exact_match)
                {
                    if (default_database == address.default_database)
                    {
                        throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
                            "There are two exactly the same ClickHouse instances {} in cluster {}",
                            address.readableString(), task.cluster_name);
                    }
                    else
                    {
                        /* Circular replication is used.
                         * It is when every physical node contains
                         * replicas of different shards of the same table.
                         * To distinguish one replica from another on the same node,
                         * every shard is placed into a separate database.
                         */
                        is_circular_replicated = true;
                        auto * query_with_table = dynamic_cast<ASTQueryWithTableAndOutput *>(task.query.get());
                        if (!query_with_table || query_with_table->database.empty())
                        {
                            throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
                                "For a distributed DDL on a circular replicated cluster, the table name must be qualified by the database name.");
                        }
                        if (default_database == query_with_table->database)
                            return;
                    }
                }
                found_exact_match = true;
                task.host_shard_num = shard_num;
                task.host_replica_num = replica_num;
                task.address_in_cluster = address;
                default_database = address.default_database;
            }
        }
    }

    if (found_exact_match)
        return;

    LOG_WARNING(log, "Could not find an exact match for host {} from task {} in the cluster {} definition. Will try to find it using host name resolution.", task.host_id.readableString(), task.entry_name, task.cluster_name);

    bool found_via_resolving = false;
    for (size_t shard_num = 0; shard_num < shards.size(); ++shard_num)
    {
        for (size_t replica_num = 0; replica_num < shards[shard_num].size(); ++replica_num)
        {
            const Cluster::Address & address = shards[shard_num][replica_num];

            if (auto resolved = address.getResolvedAddress();
                resolved && (isLocalAddress(*resolved, context.getTCPPort())
                    || (context.getTCPPortSecure() && isLocalAddress(*resolved, *context.getTCPPortSecure()))))
            {
                if (found_via_resolving)
                {
                    throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
                        "There are two identical ClickHouse instances in cluster {}: {} and {}",
                        task.cluster_name, task.address_in_cluster.readableString(), address.readableString());
                }
                else
                {
                    found_via_resolving = true;
                    task.host_shard_num = shard_num;
                    task.host_replica_num = replica_num;
                    task.address_in_cluster = address;
                }
            }
        }
    }

    if (!found_via_resolving)
    {
        throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
            "Host {} was not found in the definition of cluster {}",
            task.host_id.readableString(), task.cluster_name);
    }
    else
    {
        LOG_INFO(log, "Resolved host {} from task {} as host {} in definition of cluster {}", task.host_id.readableString(), task.entry_name, task.address_in_cluster.readableString(), task.cluster_name);
    }
}


bool DDLWorker::tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status)
{
    /// Add a special comment at the start of the query to easily identify DDL-produced queries in query_log
    String query_prefix = "/* ddl_entry=" + task.entry_name + " */ ";
    String query_to_execute = query_prefix + query;
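
    /// So the text that reaches executeQuery() looks like (illustrative):
    ///     /* ddl_entry=query-0000000123 */ ALTER TABLE ...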
    ReadBufferFromString istr(query_to_execute);
    String dummy_string;
    WriteBufferFromString ostr(dummy_string);

    try
    {
        auto current_context = std::make_unique<Context>(context);
        current_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
        current_context->setCurrentQueryId(""); // generate random query_id
        executeQuery(istr, ostr, false, *current_context, {});
    }
    catch (...)
    {
        status = ExecutionStatus::fromCurrentException();
        tryLogCurrentException(log, "Query " + query + " wasn't finished successfully");

        return false;
    }

    status = ExecutionStatus(0);
    LOG_DEBUG(log, "Executed query: {}", query);

    return true;
}

void DDLWorker::attachToThreadGroup()
{
    if (thread_group)
    {
        /// Put all threads into one thread pool
        CurrentThread::attachToIfDetached(thread_group);
    }
    else
    {
        CurrentThread::initializeQuery();
        thread_group = CurrentThread::getGroup();
    }
}


void DDLWorker::enqueueTask(DDLTaskPtr task_ptr)
{
    auto & task = *task_ptr;

    while (!stop_flag)
    {
        try
        {
            processTask(task);
            return;
        }
        catch (const Coordination::Exception & e)
        {
            if (Coordination::isHardwareError(e.code))
            {
                recoverZooKeeper();
            }
            else if (e.code == Coordination::Error::ZNONODE)
            {
                LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true));
                // TODO: retry?
            }
            else
            {
                LOG_ERROR(log, "Unexpected ZooKeeper error: {}.", getCurrentExceptionMessage(true));
                return;
            }
        }
        catch (...)
        {
            LOG_WARNING(log, "An error occurred while processing task {} ({}): {}", task.entry_name, task.entry.query, getCurrentExceptionMessage(true));
        }
    }
}

void DDLWorker::processTask(DDLTask & task)
{
    auto zookeeper = tryGetZooKeeper();

    LOG_DEBUG(log, "Processing task {} ({})", task.entry_name, task.entry.query);

    String dummy;
    String active_node_path = task.entry_path + "/active/" + task.host_id_str;
    String finished_node_path = task.entry_path + "/finished/" + task.host_id_str;

    auto code = zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy);

    if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
    {
        // Ok
    }
    else if (code == Coordination::Error::ZNONODE)
    {
        /// There is no parent
        createStatusDirs(task.entry_path, zookeeper);
        if (Coordination::Error::ZOK != zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy))
            throw Coordination::Exception(code, active_node_path);
    }
    else
        throw Coordination::Exception(code, active_node_path);

    if (!task.was_executed)
    {
        try
        {
            is_circular_replicated = false;
            parseQueryAndResolveHost(task);

            ASTPtr rewritten_ast = task.query_on_cluster->getRewrittenASTWithoutOnCluster(task.address_in_cluster.default_database);
            String rewritten_query = queryToString(rewritten_ast);
            LOG_DEBUG(log, "Executing query: {}", rewritten_query);

            if (auto * query_with_table = dynamic_cast<ASTQueryWithTableAndOutput *>(rewritten_ast.get()); query_with_table)
            {
                StoragePtr storage;
                if (!query_with_table->table.empty())
                {
                    /// It's not CREATE DATABASE
                    auto table_id = context.tryResolveStorageID(*query_with_table, Context::ResolveOrdinary);
                    storage = DatabaseCatalog::instance().tryGetTable(table_id, context);
                }

                /// For some reason we check consistency of cluster definition only
                /// in case of ALTER query, but not in case of CREATE/DROP etc.
                /// It's strange, but this behaviour has existed for a long time and we cannot change it.
                if (storage && query_with_table->as<ASTAlterQuery>())
                    checkShardConfig(query_with_table->table, task, storage);

                if (storage && taskShouldBeExecutedOnLeader(rewritten_ast, storage) && !is_circular_replicated)
                    tryExecuteQueryOnLeaderReplica(task, storage, rewritten_query, task.entry_path, zookeeper);
                else
                    tryExecuteQuery(rewritten_query, task, task.execution_status);
            }
            else
                tryExecuteQuery(rewritten_query, task, task.execution_status);
        }
        catch (const Coordination::Exception &)
        {
            throw;
        }
        catch (...)
        {
            tryLogCurrentException(log, "An error occurred before execution of DDL task: ");
            task.execution_status = ExecutionStatus::fromCurrentException("An error occurred before execution");
        }

        /// We need to distinguish ZK errors that occurred before and after query execution
        task.was_executed = true;
    }

    /// FIXME: if the server fails right here, the task will be executed twice. We need a WAL here.

    /// Delete the active flag and create the finish flag
    Coordination::Requests ops;
    ops.emplace_back(zkutil::makeRemoveRequest(active_node_path, -1));
    ops.emplace_back(zkutil::makeCreateRequest(finished_node_path, task.execution_status.serializeText(), zkutil::CreateMode::Persistent));
    zookeeper->multi(ops);
}


bool DDLWorker::taskShouldBeExecutedOnLeader(const ASTPtr ast_ddl, const StoragePtr storage)
{
    /// Pure DROP queries have to be executed on each node separately
    if (auto * query = ast_ddl->as<ASTDropQuery>(); query && query->kind != ASTDropQuery::Kind::Truncate)
        return false;

    if (!ast_ddl->as<ASTAlterQuery>() && !ast_ddl->as<ASTOptimizeQuery>() && !ast_ddl->as<ASTDropQuery>())
        return false;

    return storage->supportsReplication();
}


void DDLWorker::checkShardConfig(const String & table, const DDLTask & task, StoragePtr storage) const
{
    const auto & shard_info = task.cluster->getShardsInfo().at(task.host_shard_num);
    bool config_is_replicated_shard = shard_info.hasInternalReplication();

    if (dynamic_cast<const StorageDistributed *>(storage.get()))
    {
        LOG_TRACE(log, "Table {} is distributed, skip checking config.", backQuote(table));
        return;
    }

    if (storage->supportsReplication() && !config_is_replicated_shard)
    {
        throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
            "Table {} is replicated, but shard #{} isn't replicated according to its cluster definition. "
            "Possibly <internal_replication>true</internal_replication> is forgotten in the cluster config.",
            backQuote(table), task.host_shard_num + 1);
    }

    if (!storage->supportsReplication() && config_is_replicated_shard)
    {
        throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
            "Table {} isn't replicated, but shard #{} is replicated according to its cluster definition",
            backQuote(table), task.host_shard_num + 1);
    }
}

bool DDLWorker::tryExecuteQueryOnLeaderReplica(
    DDLTask & task,
    StoragePtr storage,
    const String & rewritten_query,
    const String & node_path,
    const ZooKeeperPtr & zookeeper)
{
    StorageReplicatedMergeTree * replicated_storage = dynamic_cast<StorageReplicatedMergeTree *>(storage.get());

    /// In case we will develop a new replicated storage
    if (!replicated_storage)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Storage type '{}' is not supported by distributed DDL", storage->getName());

    /// Generate a unique name for the shard node; it will be used so that the query is executed by only a single host.
    /// The shard node name has the format 'replica_name1,replica_name2,...,replica_nameN',
    /// where replica_name is 'replica_config_host_name:replica_port'.
    auto get_shard_name = [] (const Cluster::Addresses & shard_addresses)
    {
        Strings replica_names;
        for (const Cluster::Address & address : shard_addresses)
            replica_names.emplace_back(address.readableString());
        std::sort(replica_names.begin(), replica_names.end());

        String res;
        for (auto it = replica_names.begin(); it != replica_names.end(); ++it)
            res += *it + (std::next(it) != replica_names.end() ? "," : "");

        return res;
    };
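
    /// E.g. for a two-replica shard the shard node name looks like (illustrative host names):
    ///     "replica1.example.com:9000,replica2.example.com:9000"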

    String shard_node_name = get_shard_name(task.cluster->getShardsAddresses().at(task.host_shard_num));
    String shard_path = node_path + "/shards/" + shard_node_name;
    String is_executed_path = shard_path + "/executed";
    String tries_to_execute_path = shard_path + "/tries_to_execute";
    zookeeper->createAncestors(shard_path + "/");

    /// The node either already exists, or we create it, or we get an exception
    zookeeper->tryCreate(tries_to_execute_path, "0", zkutil::CreateMode::Persistent);

    static constexpr int MAX_TRIES_TO_EXECUTE = 3;

    String executed_by;

    zkutil::EventPtr event = std::make_shared<Poco::Event>();
    if (zookeeper->tryGet(is_executed_path, executed_by))
    {
        LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, executed_by);
        return true;
    }

    pcg64 rng(randomSeed());

    auto lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str);

    bool executed_by_leader = false;
    while (true)
    {
        StorageReplicatedMergeTree::Status status;
        replicated_storage->getStatus(status);

        /// Any replica that is a leader tries to take the lock
        if (status.is_leader && lock->tryLock())
        {
            /// In a replicated merge tree we can have multiple leaders. So we can
            /// be a "leader", but another "leader" replica may have already executed
            /// this task.
            if (zookeeper->tryGet(is_executed_path, executed_by))
            {
                LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, executed_by);
                executed_by_leader = true;
                break;
            }

            /// Doing it exclusively
            size_t counter = parse<int>(zookeeper->get(tries_to_execute_path));
            if (counter > MAX_TRIES_TO_EXECUTE)
                break;

            zookeeper->set(tries_to_execute_path, toString(counter + 1));

            /// If the leader unexpectedly changes, this method will return false,
            /// and on the next iteration the new leader will take the lock
            if (tryExecuteQuery(rewritten_query, task, task.execution_status))
            {
                zookeeper->create(is_executed_path, task.host_id_str, zkutil::CreateMode::Persistent);
                executed_by_leader = true;
                break;
            }

            lock->unlock();
        }

        if (event->tryWait(std::uniform_int_distribution<int>(0, 1000)(rng)))
        {
            executed_by_leader = true;
            break;
        }
        else if (parse<int>(zookeeper->get(tries_to_execute_path)) > MAX_TRIES_TO_EXECUTE)
        {
            /// Nobody will try to execute the query again
            break;
        }
    }

    /// Not executed by a leader, so it was not executed at all
    if (!executed_by_leader)
    {
        task.execution_status = ExecutionStatus(ErrorCodes::NOT_IMPLEMENTED, "Cannot execute replicated DDL query");
        return false;
    }

    LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));
    return true;
}


void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper)
{
    LOG_DEBUG(log, "Cleaning queue");

    Strings queue_nodes = zookeeper->getChildren(queue_dir);
    filterAndSortQueueNodes(queue_nodes);

    size_t num_outdated_nodes = (queue_nodes.size() > max_tasks_in_queue) ? queue_nodes.size() - max_tasks_in_queue : 0;
    auto first_non_outdated_node = queue_nodes.begin() + num_outdated_nodes;

    for (auto it = queue_nodes.cbegin(); it < queue_nodes.cend(); ++it)
    {
        if (stop_flag)
            return;

        String node_name = *it;
        String node_path = queue_dir + "/" + node_name;
        String lock_path = node_path + "/lock";

        Coordination::Stat stat;
        String dummy;

        try
        {
            /// Already deleted
            if (!zookeeper->exists(node_path, &stat))
                continue;

            /// Delete the node if its lifetime has expired (according to the task_max_lifetime parameter)
            constexpr UInt64 zookeeper_time_resolution = 1000;
            Int64 zookeeper_time_seconds = stat.ctime / zookeeper_time_resolution;
            bool node_lifetime_is_expired = zookeeper_time_seconds + task_max_lifetime < current_time_seconds;

            /// If there are too many nodes in the task queue (> max_tasks_in_queue), delete the oldest ones
            bool node_is_outside_max_window = it < first_non_outdated_node;

            if (!node_lifetime_is_expired && !node_is_outside_max_window)
                continue;

            /// Skip if there are active nodes (it is a weak guard)
            if (zookeeper->exists(node_path + "/active", &stat) && stat.numChildren > 0)
            {
                LOG_INFO(log, "Task {} should be deleted, but there are active workers. Skipping it.", node_name);
                continue;
            }

            /// Usage of the lock is not necessary now (tryRemoveRecursive correctly removes the node in the presence of concurrent cleaners),
            /// but the lock will be required to implement the system.distributed_ddl_queue table
            auto lock = createSimpleZooKeeperLock(zookeeper, node_path, "lock", host_fqdn_id);
            if (!lock->tryLock())
            {
                LOG_INFO(log, "Task {} should be deleted, but it is locked. Skipping it.", node_name);
                continue;
            }

            if (node_lifetime_is_expired)
                LOG_INFO(log, "Lifetime of task {} is expired, deleting it", node_name);
            else if (node_is_outside_max_window)
                LOG_INFO(log, "Task {} is outdated, deleting it", node_name);

            /// Deleting
            {
                Strings children = zookeeper->getChildren(node_path);
                for (const String & child : children)
                {
                    if (child != "lock")
                        zookeeper->tryRemoveRecursive(node_path + "/" + child);
                }

                /// Remove the lock node and its parent atomically
                Coordination::Requests ops;
                ops.emplace_back(zkutil::makeRemoveRequest(lock_path, -1));
                ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1));
                zookeeper->multi(ops);
            }
        }
        catch (...)
        {
            LOG_INFO(log, "An error occurred while checking and cleaning task {} from queue: {}", node_name, getCurrentExceptionMessage(false));
        }
    }
}


/// Try to create nonexistent "status" dirs for a node
void DDLWorker::createStatusDirs(const std::string & node_path, const ZooKeeperPtr & zookeeper)
{
    Coordination::Requests ops;
    {
        Coordination::CreateRequest request;
        request.path = node_path + "/active";
        ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request)));
    }
    {
        Coordination::CreateRequest request;
        request.path = node_path + "/finished";
        ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request)));
    }

    Coordination::Responses responses;
    Coordination::Error code = zookeeper->tryMulti(ops, responses);
    if (code != Coordination::Error::ZOK
        && code != Coordination::Error::ZNODEEXISTS)
        throw Coordination::Exception(code);
}


String DDLWorker::enqueueQuery(DDLLogEntry & entry)
{
    if (entry.hosts.empty())
        throw Exception("Empty host list in a distributed DDL task", ErrorCodes::LOGICAL_ERROR);

    auto zookeeper = getAndSetZooKeeper();

    String query_path_prefix = queue_dir + "/query-";
    zookeeper->createAncestors(query_path_prefix);

    String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);

    /// Optional step
    try
    {
        createStatusDirs(node_path, zookeeper);
    }
    catch (...)
    {
        LOG_INFO(log, "An error occurred while creating auxiliary ZooKeeper directories in {}. They will be created later. Error: {}", node_path, getCurrentExceptionMessage(true));
    }

    return node_path;
}
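
/// The resulting ZooKeeper layout of a single task (illustrative):
///
///     <queue_dir>/query-0000000123            <- the serialized DDLLogEntry
///     <queue_dir>/query-0000000123/active/*   <- ephemeral nodes of hosts currently executing it
///     <queue_dir>/query-0000000123/finished/* <- persistent per-host nodes holding the ExecutionStatus
///
/// Replicated queries additionally create /shards/<shard_node_name>/{executed,lock,tries_to_execute}
/// under the task node (see tryExecuteQueryOnLeaderReplica above).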


void DDLWorker::runMainThread()
{
    setThreadName("DDLWorker");
    LOG_DEBUG(log, "Started DDLWorker thread");

    bool initialized = false;
    do
    {
        try
        {
            auto zookeeper = getAndSetZooKeeper();
            zookeeper->createAncestors(queue_dir + "/");
            initialized = true;
        }
        catch (const Coordination::Exception & e)
        {
            if (!Coordination::isHardwareError(e.code))
                throw;  /// A logical error.

            tryLogCurrentException(__PRETTY_FUNCTION__);

            /// Avoid busy loop when ZooKeeper is not available.
            sleepForSeconds(1);
        }
        catch (...)
        {
            tryLogCurrentException(log, "Terminating. Cannot initialize DDL queue.");
            return;
        }
    }
    while (!initialized && !stop_flag);

    while (!stop_flag)
    {
        try
        {
            attachToThreadGroup();

            cleanup_event->set();
            scheduleTasks();

            LOG_DEBUG(log, "Waiting for queue updates");
            queue_updated_event->wait();
        }
        catch (const Coordination::Exception & e)
        {
            if (Coordination::isHardwareError(e.code))
            {
                recoverZooKeeper();
            }
            else if (e.code == Coordination::Error::ZNONODE)
            {
                LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true));
            }
            else
            {
                LOG_ERROR(log, "Unexpected ZooKeeper error: {}. Terminating.", getCurrentExceptionMessage(true));
                return;
            }
        }
        catch (...)
        {
            tryLogCurrentException(log, "Unexpected error, will terminate:");
            return;
        }
    }
}


void DDLWorker::runCleanupThread()
{
    setThreadName("DDLWorkerClnr");
    LOG_DEBUG(log, "Started DDLWorker cleanup thread");

    Int64 last_cleanup_time_seconds = 0;
    while (!stop_flag)
    {
        try
        {
            cleanup_event->wait();
            if (stop_flag)
                break;

            Int64 current_time_seconds = Poco::Timestamp().epochTime();
            if (last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + cleanup_delay_period)
            {
                LOG_TRACE(log, "Too early to clean queue, will do it later.");
                continue;
            }

            auto zookeeper = tryGetZooKeeper();
            if (zookeeper->expired())
                continue;

            cleanupQueue(current_time_seconds, zookeeper);
            last_cleanup_time_seconds = current_time_seconds;
        }
        catch (...)
        {
            tryLogCurrentException(log, __PRETTY_FUNCTION__);
        }
    }
}


class DDLQueryStatusInputStream : public IBlockInputStream
{
public:
    DDLQueryStatusInputStream(const String & zk_node_path, const DDLLogEntry & entry, const Context & context_)
        : node_path(zk_node_path), context(context_), watch(CLOCK_MONOTONIC_COARSE), log(&Poco::Logger::get("DDLQueryStatusInputStream"))
    {
        sample = Block{
            {std::make_shared<DataTypeString>(), "host"},
            {std::make_shared<DataTypeUInt16>(), "port"},
            {std::make_shared<DataTypeInt64>(), "status"},
            {std::make_shared<DataTypeString>(), "error"},
            {std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
            {std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
        };

        for (const HostID & host : entry.hosts)
            waiting_hosts.emplace(host.toString());

        addTotalRowsApprox(entry.hosts.size());

        timeout_seconds = context.getSettingsRef().distributed_ddl_task_timeout;
    }
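
    /// Each block returned by readImpl() below contains one row per host that has just
    /// finished, e.g. (illustrative): ('host1', 9000, 0, '', 1, 0).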
String getName ( ) const override
{
2019-02-28 07:27:30 +00:00
return " DDLQueryStatusInputStream " ;
2017-04-18 15:44:31 +00:00
}
2018-06-03 20:39:06 +00:00
Block getHeader ( ) const override { return sample ; }
2018-01-06 18:10:44 +00:00
2017-04-18 15:44:31 +00:00
Block readImpl ( ) override
{
Block res ;
2017-07-27 13:11:16 +00:00
if ( num_hosts_finished > = waiting_hosts . size ( ) )
2018-04-17 19:33:58 +00:00
{
if ( first_exception )
throw Exception ( * first_exception ) ;
2017-04-18 15:44:31 +00:00
return res ;
2018-04-17 19:33:58 +00:00
}
2017-04-18 15:44:31 +00:00
auto zookeeper = context . getZooKeeper ( ) ;
size_t try_number = 0 ;
2018-04-17 19:33:58 +00:00
while ( res . rows ( ) = = 0 )
2017-04-18 15:44:31 +00:00
{
2018-03-05 21:09:39 +00:00
if ( isCancelled ( ) )
2018-04-17 19:33:58 +00:00
{
if ( first_exception )
throw Exception ( * first_exception ) ;
2017-04-18 15:44:31 +00:00
return res ;
2018-04-17 19:33:58 +00:00
}
2017-04-18 15:44:31 +00:00
2018-02-28 13:23:40 +00:00
if ( timeout_seconds > = 0 & & watch . elapsedSeconds ( ) > timeout_seconds )
2017-08-12 20:00:00 +00:00
{
2018-02-28 13:23:40 +00:00
size_t num_unfinished_hosts = waiting_hosts . size ( ) - num_hosts_finished ;
size_t num_active_hosts = current_active_hosts . size ( ) ;
2020-09-02 22:35:47 +00:00
throw Exception ( ErrorCodes : : TIMEOUT_EXCEEDED ,
" Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
" There are {} unfinished hosts ({} of them are currently active), they are going to execute the query in background " ,
node_path , timeout_seconds , num_unfinished_hosts , num_active_hosts ) ;
2017-08-12 20:00:00 +00:00
}
2017-04-27 15:19:11 +00:00
2017-04-18 15:44:31 +00:00
if ( num_hosts_finished ! = 0 | | try_number ! = 0 )
2018-02-28 13:23:40 +00:00
{
2020-09-02 22:35:47 +00:00
sleepForMilliseconds ( std : : min < size_t > ( 1000 , 50 * ( try_number + 1 ) ) ) ;
2018-02-28 13:23:40 +00:00
}
2017-04-18 15:44:31 +00:00
2017-04-27 15:19:11 +00:00
/// TODO: add shared lock
2017-04-18 15:44:31 +00:00
if ( ! zookeeper - > exists ( node_path ) )
2017-04-21 12:39:28 +00:00
{
2020-09-02 22:35:47 +00:00
throw Exception ( ErrorCodes : : UNFINISHED ,
" Cannot provide query execution status. The query's node {} has been deleted by the cleaner since it was finished (or its lifetime is expired) " ,
node_path ) ;
2017-04-21 12:39:28 +00:00
}
2017-04-18 15:44:31 +00:00
2017-07-27 11:30:27 +00:00
Strings new_hosts = getNewAndUpdate ( getChildrenAllowNoNode ( zookeeper , node_path + " /finished " ) ) ;
2017-04-18 15:44:31 +00:00
+ + try_number ;
if ( new_hosts . empty ( ) )
continue ;
2018-02-28 13:23:40 +00:00
current_active_hosts = getChildrenAllowNoNode ( zookeeper , node_path + " /active " ) ;
2017-04-21 12:39:28 +00:00

            MutableColumns columns = sample.cloneEmptyColumns();
            for (const String & host_id : new_hosts)
            {
                ExecutionStatus status(-1, "Cannot obtain error message");
                {
                    String status_data;
                    if (zookeeper->tryGet(node_path + "/finished/" + host_id, status_data))
                        status.tryDeserializeText(status_data);
                }

                auto [host, port] = Cluster::Address::fromString(host_id);

                if (status.code != 0 && first_exception == nullptr)
                    first_exception = std::make_unique<Exception>(status.code, "There was an error on [{}:{}]: {}", host, port, status.message);

                ++num_hosts_finished;

                columns[0]->insert(host);
                columns[1]->insert(port);
                columns[2]->insert(status.code);
                columns[3]->insert(status.message);
                columns[4]->insert(waiting_hosts.size() - num_hosts_finished);
                columns[5]->insert(current_active_hosts.size());
            }
            res = sample.cloneWithColumns(std::move(columns));
        }

        return res;
    }

    Block getSampleBlock() const
    {
        return sample.cloneEmpty();
    }

    ~DDLQueryStatusInputStream() override = default;

private:
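
    /// Returns the children of the node, treating a missing node as an empty list.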
    static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
    {
        Strings res;
        Coordination::Error code = zookeeper->tryGetChildren(node_path, res);
        if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
            throw Coordination::Exception(code, node_path);
        return res;
    }
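
    /// Returns which hosts from /finished are new since the previous call and records them;
    /// hosts that are not part of the task's host list are logged once and then ignored.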
    Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts)
    {
        Strings diff;
        for (const String & host : current_list_of_finished_hosts)
        {
            if (!waiting_hosts.count(host))
            {
                if (!ignoring_hosts.count(host))
                {
                    ignoring_hosts.emplace(host);
                    LOG_INFO(log, "Unexpected host {} appeared in task {}", host, node_path);
                }
                continue;
            }

            if (!finished_hosts.count(host))
            {
                diff.emplace_back(host);
                finished_hosts.emplace(host);
            }
        }
        return diff;
    }

    String node_path;
    const Context & context;
    Stopwatch watch;
    Poco::Logger * log;

    Block sample;

    NameSet waiting_hosts;  /// hosts from the task's host list
    NameSet finished_hosts; /// hosts from the host list that have finished
    NameSet ignoring_hosts; /// hosts that appeared but are not in the host list
    Strings current_active_hosts; /// hosts that were in the active state at the last check
    size_t num_hosts_finished = 0;

    /// Save the first detected error and throw it at the end of execution
    std::unique_ptr<Exception> first_exception;

    Int64 timeout_seconds = 120;
};


BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context, AccessRightsElements && query_requires_access, bool query_requires_grant_option)
{
    /// Remove FORMAT <fmt> and INTO OUTFILE <file> if they exist, so that these
    /// initiator-side output clauses do not end up in the query text sent to other hosts.
    ASTPtr query_ptr = query_ptr_->clone();
    ASTQueryWithOutput::resetOutputASTIfExist(*query_ptr);

    // XXX: serious design flaw since `ASTQueryWithOnCluster` is not inherited from `IAST`!
    auto * query = dynamic_cast<ASTQueryWithOnCluster *>(query_ptr.get());
    if (!query)
    {
        throw Exception("Distributed execution is not supported for such DDL queries", ErrorCodes::NOT_IMPLEMENTED);
    }

    if (!context.getSettingsRef().allow_distributed_ddl)
        throw Exception("Distributed DDL queries are prohibited for the user", ErrorCodes::QUERY_IS_PROHIBITED);

    if (const auto * query_alter = query_ptr->as<ASTAlterQuery>())
    {
        for (const auto & command : query_alter->command_list->commands)
        {
            if (!isSupportedAlterType(command->type))
                throw Exception("Unsupported type of ALTER query", ErrorCodes::NOT_IMPLEMENTED);
        }
    }

    query->cluster = context.getMacros()->expand(query->cluster);
    ClusterPtr cluster = context.getCluster(query->cluster);
    DDLWorker & ddl_worker = context.getDDLWorker();

    /// Enumerate the hosts which will be used to send the query.
    Cluster::AddressesWithFailover shards = cluster->getShardsAddresses();
    std::vector<HostID> hosts;
    for (const auto & shard : shards)
    {
        for (const auto & addr : shard)
            hosts.emplace_back(addr);
    }
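    /// Note that every replica of every shard is enlisted, so the task addresses each server of the cluster.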

    if (hosts.empty())
        throw Exception("No hosts defined to execute distributed DDL query", ErrorCodes::LOGICAL_ERROR);

    /// The current database in a distributed query needs to be replaced with either
    /// the local current database or a shard's default database.
    bool need_replace_current_database
        = (std::find_if(
               query_requires_access.begin(),
               query_requires_access.end(),
               [](const AccessRightsElement & elem) { return elem.isEmptyDatabase(); })
           != query_requires_access.end());

    bool use_local_default_database = false;
    const String & current_database = context.getCurrentDatabase();

    if (need_replace_current_database)
    {
        Strings shard_default_databases;
        for (const auto & shard : shards)
        {
            for (const auto & addr : shard)
            {
                if (!addr.default_database.empty())
                    shard_default_databases.push_back(addr.default_database);
                else
                    use_local_default_database = true;
            }
        }
        std::sort(shard_default_databases.begin(), shard_default_databases.end());
        shard_default_databases.erase(std::unique(shard_default_databases.begin(), shard_default_databases.end()), shard_default_databases.end());
        assert(use_local_default_database || !shard_default_databases.empty());

        if (use_local_default_database && !shard_default_databases.empty())
            throw Exception("Mixed local default DB and shard default DB in DDL query", ErrorCodes::NOT_IMPLEMENTED);

        if (use_local_default_database)
        {
            query_requires_access.replaceEmptyDatabase(current_database);
        }
        else
        {
            for (size_t i = 0; i != query_requires_access.size();)
            {
                auto & element = query_requires_access[i];
                if (element.isEmptyDatabase())
                {
                    /// Expand the element into one copy per distinct shard default database.
                    query_requires_access.insert(query_requires_access.begin() + i + 1, shard_default_databases.size() - 1, element);
                    for (size_t j = 0; j != shard_default_databases.size(); ++j)
                        query_requires_access[i + j].replaceEmptyDatabase(shard_default_databases[j]);
                    i += shard_default_databases.size();
                }
                else
                    ++i;
            }
        }
    }
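
    /// For example (a hypothetical illustration): with shard default databases {db1, db2},
    /// an access element over an empty database expands into two elements, one for db1.<table>
    /// and one for db2.<table>.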

    AddDefaultDatabaseVisitor visitor(current_database, !use_local_default_database);
    visitor.visitDDL(query_ptr);

    /// Check access rights, assume that all servers have the same users config
    if (query_requires_grant_option)
        context.getAccess()->checkGrantOption(query_requires_access);
    else
        context.checkAccess(query_requires_access);

    DDLLogEntry entry;
    entry.hosts = std::move(hosts);
    entry.query = queryToString(query_ptr);
    entry.initiator = ddl_worker.getCommonHostID();
    String node_path = ddl_worker.enqueueQuery(entry);
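    /// node_path is the ZooKeeper path of the task node created for this query (for example,
    /// something like <distributed_ddl_root>/query-0000000123; the exact naming is an
    /// implementation detail of DDLWorker).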

    BlockIO io;
    /// distributed_ddl_task_timeout == 0 means fire-and-forget: do not wait for the hosts' statuses.
    if (context.getSettingsRef().distributed_ddl_task_timeout == 0)
        return io;

    auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, context);
    io.in = std::move(stream);
    return io;
}
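
/// This overload copies the access elements so that the rvalue overload above can modify them in place.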
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, const AccessRightsElements & query_requires_access, bool query_requires_grant_option)
{
    return executeDDLQueryOnCluster(query_ptr, context, AccessRightsElements{query_requires_access}, query_requires_grant_option);
}

BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context)
{
    return executeDDLQueryOnCluster(query_ptr_, context, {});
}
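
/// A minimal usage sketch (hypothetical, for illustration only): the status stream returned
/// for an ON CLUSTER query can be drained like any other block input stream.
///
///     BlockIO io = executeDDLQueryOnCluster(ast, context);
///     if (io.in)
///         while (Block block = io.in->read())
///             ; /// one row per host that finished since the previous read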

}