#include <Interpreters/DDLWorker.h>
#include <Interpreters/DDLTask.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <Storages/IStorage.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Common/setThreadName.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/isLocalAddress.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>
#include <common/sleep.h>
#include <common/getFQDNOrHostName.h>
#include <common/logger_useful.h>
#include <random>
#include <pcg_random.hpp>

namespace DB
{

namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
    extern const int LOGICAL_ERROR;
    extern const int INCONSISTENT_CLUSTER_DEFINITION;
    extern const int TIMEOUT_EXCEEDED;
    extern const int UNKNOWN_TYPE_OF_QUERY;
    extern const int UNFINISHED;
    extern const int QUERY_IS_PROHIBITED;
}

namespace
{

/** Caveats: usage of locks in ZooKeeper is incorrect in 99% of cases,
  * and highlights your poor understanding of distributed systems.
  *
  * It's only correct if all the operations that are performed under lock
  * are atomically checking that the lock still holds,
  * or if we ensure that these operations will be undone if the lock is lost
  * (due to ZooKeeper session loss), which is very difficult to achieve.
  *
  * It's Ok if every operation that we perform under lock is actually an operation in ZooKeeper.
  *
  * In 1% of cases when you can correctly use Lock, the logic is complex enough, so you don't need this class.
  *
  * TLDR: Don't use this code.
  * We only have a few cases of its usage and it will be removed.
  */
class ZooKeeperLock
{
public:
    /// lock_prefix - path where the ephemeral lock node will be created
    /// lock_name - the name of the ephemeral lock node
    ZooKeeperLock(
        const zkutil::ZooKeeperPtr & zookeeper_,
        const std::string & lock_prefix_,
        const std::string & lock_name_,
        const std::string & lock_message_ = "")
    :
        zookeeper(zookeeper_),
        lock_path(lock_prefix_ + "/" + lock_name_),
        lock_message(lock_message_),
        log(&Poco::Logger::get("zkutil::Lock"))
    {
        zookeeper->createIfNotExists(lock_prefix_, "");
    }

    ~ZooKeeperLock()
    {
        try
        {
            unlock();
        }
        catch (...)
        {
            DB::tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }

    void unlock()
    {
        Coordination::Stat stat;
        std::string dummy;
        bool result = zookeeper->tryGet(lock_path, dummy, &stat);

        if (result && stat.ephemeralOwner == zookeeper->getClientID())
            zookeeper->remove(lock_path, -1);
        else
            LOG_WARNING(log, "Lock is lost. It is normal if the session has expired. Path: {}/{}", lock_path, lock_message);
    }

    bool tryLock()
    {
        std::string dummy;
        Coordination::Error code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral, dummy);

        if (code == Coordination::Error::ZNODEEXISTS)
        {
            return false;
        }
        else if (code == Coordination::Error::ZOK)
        {
            return true;
        }
        else
        {
            throw Coordination::Exception(code);
        }
    }

private:
    zkutil::ZooKeeperPtr zookeeper;

    std::string lock_path;
    std::string lock_message;
    Poco::Logger * log;
};

std::unique_ptr<ZooKeeperLock> createSimpleZooKeeperLock(
    const zkutil::ZooKeeperPtr & zookeeper, const String & lock_prefix, const String & lock_name, const String & lock_message)
{
    return std::make_unique<ZooKeeperLock>(zookeeper, lock_prefix, lock_name, lock_message);
}

}
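
/// Illustrative sketch only (not part of the control flow): the lock above is always
/// obtained through createSimpleZooKeeperLock() and used with the try-and-skip pattern
/// that appears later in cleanupQueue() and tryExecuteQueryOnLeaderReplica(), roughly:
///
///     auto lock = createSimpleZooKeeperLock(zookeeper, node_path, "lock", host_fqdn_id);
///     if (!lock->tryLock())
///         return;            /// another host holds the ephemeral "lock" node, skip
///     ...                    /// do work that itself consists of ZooKeeper operations
///     lock->unlock();        /// the destructor also unlocks on scope exit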

DDLWorker::DDLWorker(int pool_size_, const std::string & zk_root_dir, const Context & context_, const Poco::Util::AbstractConfiguration * config, const String & prefix,
    std::optional<DatabaseReplicatedExtensions> database_replicated_ext_)
    : context(context_)
    , log(&Poco::Logger::get(database_replicated_ext_ ? fmt::format("DDLWorker ({})", database_replicated_ext_->database_name) : "DDLWorker"))
    , database_replicated_ext(std::move(database_replicated_ext_))
    , pool_size(pool_size_)
    , worker_pool(pool_size_)
{
    assert(!database_replicated_ext || pool_size == 1);
    last_tasks.reserve(pool_size);

    queue_dir = zk_root_dir;
    if (queue_dir.back() == '/')
        queue_dir.resize(queue_dir.size() - 1);

    if (config)
    {
        task_max_lifetime = config->getUInt64(prefix + ".task_max_lifetime", static_cast<UInt64>(task_max_lifetime));
        cleanup_delay_period = config->getUInt64(prefix + ".cleanup_delay_period", static_cast<UInt64>(cleanup_delay_period));
        max_tasks_in_queue = std::max<UInt64>(1, config->getUInt64(prefix + ".max_tasks_in_queue", max_tasks_in_queue));

        if (config->has(prefix + ".profile"))
            context.setSetting("profile", config->getString(prefix + ".profile"));
    }

    if (context.getSettingsRef().readonly)
    {
        LOG_WARNING(log, "Distributed DDL worker is run with readonly settings, it will not be able to execute DDL queries. Set appropriate system_profile or distributed_ddl.profile to fix this.");
    }

    host_fqdn = getFQDNOrHostName();
    host_fqdn_id = Cluster::Address::toString(host_fqdn, context.getTCPPort());

    main_thread = ThreadFromGlobalPool(&DDLWorker::runMainThread, this);
    cleanup_thread = ThreadFromGlobalPool(&DDLWorker::runCleanupThread, this);
}

void DDLWorker::shutdown()
{
    stop_flag = true;
    queue_updated_event->set();
    cleanup_event->set();
}

DDLWorker::~DDLWorker()
{
    shutdown();
    worker_pool.wait();
    main_thread.join();
    cleanup_thread.join();
}

ZooKeeperPtr DDLWorker::tryGetZooKeeper() const
{
    std::lock_guard lock(zookeeper_mutex);
    return current_zookeeper;
}

ZooKeeperPtr DDLWorker::getAndSetZooKeeper()
{
    std::lock_guard lock(zookeeper_mutex);

    if (!current_zookeeper || current_zookeeper->expired())
        current_zookeeper = context.getZooKeeper();

    return current_zookeeper;
}

void DDLWorker::recoverZooKeeper()
{
    LOG_DEBUG(log, "Recovering ZooKeeper session after: {}", getCurrentExceptionMessage(false));

    while (!stop_flag)
    {
        try
        {
            getAndSetZooKeeper();
            break;
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
            sleepForSeconds(5);
        }
    }
}

DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper)
{
    String node_data;
    String entry_path = queue_dir + "/" + entry_name;

    if (!zookeeper->tryGet(entry_path, node_data))
    {
        /// It is Ok that the node could be deleted just now. It means that there is no current host in the node's host list.
        out_reason = "The task was deleted";
        return {};
    }

    auto task = std::make_unique<DDLTask>();
    task->entry_name = entry_name;
    task->entry_path = entry_path;

    try
    {
        task->entry.parse(node_data);
    }
    catch (...)
    {
        /// What should we do if we cannot even parse the host name and therefore cannot properly submit the execution status?
        /// We can try to create the fail node using the FQDN; if it equals the host name in the cluster config, the attempt will be successful.
        /// Otherwise, that node will be ignored by DDLQueryStatusInputStream.

        tryLogCurrentException(log, "Cannot parse DDL task " + entry_name + ", will try to send error status");

        String status = ExecutionStatus::fromCurrentException().serializeText();
        try
        {
            createStatusDirs(entry_path, zookeeper);
            zookeeper->tryCreate(entry_path + "/finished/" + host_fqdn_id, status, zkutil::CreateMode::Persistent);
        }
        catch (...)
        {
            tryLogCurrentException(log, "Can't report the task has invalid format");
        }

        out_reason = "Incorrect task format";
        return {};
    }

    if (database_replicated_ext)
    {
        task->host_id.host_name = host_fqdn;
        task->host_id.port = context.getTCPPort();
        task->host_id_str = database_replicated_ext->shard_name + '|' + database_replicated_ext->replica_name;
        return task;
    }

    bool host_in_hostlist = false;
    for (const HostID & host : task->entry.hosts)
    {
        auto maybe_secure_port = context.getTCPPortSecure();

        /// The port is considered local if it matches TCP or TCP secure port that the server is listening.
        bool is_local_port = (maybe_secure_port && host.isLocalAddress(*maybe_secure_port))
            || host.isLocalAddress(context.getTCPPort());

        if (!is_local_port)
            continue;

        if (host_in_hostlist)
        {
            /// This check could be a little bit slow
            LOG_WARNING(log, "There are two identical ClickHouse instances in task {}: {} and {}. Will use the first one only.", entry_name, task->host_id.readableString(), host.readableString());
        }
        else
        {
            host_in_hostlist = true;
            task->host_id = host;
            task->host_id_str = host.toString();
        }
    }

    if (!host_in_hostlist)
    {
        out_reason = "There is no local address in host list";
        return {};
    }

    return task;
}

static void filterAndSortQueueNodes(Strings & all_nodes)
{
    all_nodes.erase(std::remove_if(all_nodes.begin(), all_nodes.end(), [] (const String & s) { return !startsWith(s, "query-"); }), all_nodes.end());
    std::sort(all_nodes.begin(), all_nodes.end());
}

void DDLWorker::scheduleTasks()
{
    LOG_DEBUG(log, "Scheduling tasks");
    auto zookeeper = tryGetZooKeeper();

    Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
    filterAndSortQueueNodes(queue_nodes);
    if (queue_nodes.empty())
        return;

    bool server_startup = last_tasks.empty();

    auto begin_node = server_startup
        ? queue_nodes.begin()
        : std::upper_bound(queue_nodes.begin(), queue_nodes.end(), last_tasks.back());

    for (auto it = begin_node; it != queue_nodes.end(); ++it)
    {
        String entry_name = *it;

        String reason;
        auto task = initAndCheckTask(entry_name, reason, zookeeper);
        if (!task)
        {
            LOG_DEBUG(log, "Will not execute task {}: {}", entry_name, reason);
            saveTask(entry_name);
            continue;
        }

        bool already_processed = zookeeper->exists(task->entry_path + "/finished/" + task->host_id_str);
        if (!server_startup && !task->was_executed && already_processed)
        {
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                "Server expects that DDL task {} should be processed, but it was already processed according to ZK",
                entry_name);
        }

        if (!already_processed)
        {
            worker_pool.scheduleOrThrowOnError([this, task_ptr = task.release()]()
            {
                setThreadName("DDLWorkerExec");
                enqueueTask(DDLTaskPtr(task_ptr));
            });
        }
        else
        {
            LOG_DEBUG(log, "Task {} ({}) has been already processed", entry_name, task->entry.query);
        }

        saveTask(entry_name);

        if (stop_flag)
            break;
    }
}

void DDLWorker::saveTask(const String & entry_name)
{
    if (last_tasks.size() == pool_size)
    {
        last_tasks.erase(last_tasks.begin());
    }
    last_tasks.emplace_back(entry_name);
}
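
/// Note on the scheduling window: last_tasks keeps at most pool_size of the most recently
/// scheduled entry names. scheduleTasks() uses last_tasks.back() as the lower bound for the
/// next scan of the queue, so already-scheduled entries are not picked up again while the
/// worker is running; on server startup the queue is scanned from the beginning.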

/// Parses query and resolves cluster and host in cluster
void DDLWorker::parseQueryAndResolveHost(DDLTask & task)
{
    {
        const char * begin = task.entry.query.data();
        const char * end = begin + task.entry.query.size();

        ParserQuery parser_query(end);
        String description;
        task.query = parseQuery(parser_query, begin, end, description, 0, context.getSettingsRef().max_parser_depth);
    }

    // XXX: serious design flaw since `ASTQueryWithOnCluster` is not inherited from `IAST`!
    if (!task.query || !(task.query_on_cluster = dynamic_cast<ASTQueryWithOnCluster *>(task.query.get())))
        throw Exception("Received unknown DDL query", ErrorCodes::UNKNOWN_TYPE_OF_QUERY);

    if (database_replicated_ext)
        return;

    task.cluster_name = task.query_on_cluster->cluster;
    task.cluster = context.tryGetCluster(task.cluster_name);
    if (!task.cluster)
        throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
            "DDL task {} contains current host {} in cluster {}, but there is no such cluster here.",
            task.entry_name, task.host_id.readableString(), task.cluster_name);

    /// Try to find the host from the task host list in the cluster.
    /// At first, try to find an exact match (host name and ports should be literally equal).
    /// If the attempt fails, try to find it by resolving the host name of each instance.
    const auto & shards = task.cluster->getShardsAddresses();

    bool found_exact_match = false;
    String default_database;
    for (size_t shard_num = 0; shard_num < shards.size(); ++shard_num)
    {
        for (size_t replica_num = 0; replica_num < shards[shard_num].size(); ++replica_num)
        {
            const Cluster::Address & address = shards[shard_num][replica_num];

            if (address.host_name == task.host_id.host_name && address.port == task.host_id.port)
            {
                if (found_exact_match)
                {
                    if (default_database == address.default_database)
                    {
                        throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
                            "There are two identical ClickHouse instances {} in cluster {}",
                            address.readableString(), task.cluster_name);
                    }
                    else
                    {
                        /* Circular replication is used.
                         * It is when every physical node contains
                         * replicas of different shards of the same table.
                         * To distinguish one replica from another on the same node,
                         * every shard is placed into a separate database.
                         * */
                        is_circular_replicated = true;
                        auto * query_with_table = dynamic_cast<ASTQueryWithTableAndOutput *>(task.query.get());
                        if (!query_with_table || query_with_table->database.empty())
                        {
                            throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
                                "For a distributed DDL on a circular replicated cluster, its table name must be qualified by the database name.");
                        }
                        if (default_database == query_with_table->database)
                            return;
                    }
                }
                found_exact_match = true;
                task.host_shard_num = shard_num;
                task.host_replica_num = replica_num;
                task.address_in_cluster = address;
                default_database = address.default_database;
            }
        }
    }

    if (found_exact_match)
        return;

    LOG_WARNING(log, "Did not find an exact match of host {} from task {} in cluster {} definition. Will try to find it using host name resolution.", task.host_id.readableString(), task.entry_name, task.cluster_name);

    bool found_via_resolving = false;
    for (size_t shard_num = 0; shard_num < shards.size(); ++shard_num)
    {
        for (size_t replica_num = 0; replica_num < shards[shard_num].size(); ++replica_num)
        {
            const Cluster::Address & address = shards[shard_num][replica_num];

            if (auto resolved = address.getResolvedAddress();
                resolved && (isLocalAddress(*resolved, context.getTCPPort())
                    || (context.getTCPPortSecure() && isLocalAddress(*resolved, *context.getTCPPortSecure()))))
            {
                if (found_via_resolving)
                {
                    throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
                        "There are two identical ClickHouse instances in cluster {}: {} and {}",
                        task.cluster_name, task.address_in_cluster.readableString(), address.readableString());
                }
                else
                {
                    found_via_resolving = true;
                    task.host_shard_num = shard_num;
                    task.host_replica_num = replica_num;
                    task.address_in_cluster = address;
                }
            }
        }
    }

    if (!found_via_resolving)
    {
        throw Exception(ErrorCodes::INCONSISTENT_CLUSTER_DEFINITION,
            "Host {} not found in the definition of cluster {}",
            task.host_id.readableString(), task.cluster_name);
    }
    else
    {
        LOG_INFO(log, "Resolved host {} from task {} as host {} in definition of cluster {}", task.host_id.readableString(), task.entry_name, task.address_in_cluster.readableString(), task.cluster_name);
    }
}

bool DDLWorker::tryExecuteQuery(const String & query, const DDLTask & task, ExecutionStatus & status)
{
    /// Add special comment at the start of query to easily identify DDL-produced queries in query_log
    String query_prefix = "/* ddl_entry=" + task.entry_name + " */ ";
    String query_to_execute = query_prefix + query;

    ReadBufferFromString istr(query_to_execute);
    String dummy_string;
    WriteBufferFromString ostr(dummy_string);

    try
    {
        auto current_context = std::make_unique<Context>(context);

        if (database_replicated_ext)
        {
            current_context->getClientInfo().query_kind
                = ClientInfo::QueryKind::REPLICATED_LOG_QUERY; //FIXME why do we need separate query kind?
            current_context->setCurrentDatabase(database_replicated_ext->database_name);
        }
        else
            current_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;

        current_context->setCurrentQueryId(""); // generate random query_id
        executeQuery(istr, ostr, false, *current_context, {});
    }
    catch (...)
    {
        status = ExecutionStatus::fromCurrentException();
        tryLogCurrentException(log, "Query " + query + " wasn't finished successfully");
        return false;
    }

    status = ExecutionStatus(0);
    LOG_DEBUG(log, "Executed query: {}", query);

    return true;
}

void DDLWorker::attachToThreadGroup()
{
    if (thread_group)
    {
        /// Put all threads to one thread pool
        CurrentThread::attachToIfDetached(thread_group);
    }
    else
    {
        CurrentThread::initializeQuery();
        thread_group = CurrentThread::getGroup();
    }
}

void DDLWorker::enqueueTask(DDLTaskPtr task_ptr)
{
    auto & task = *task_ptr;

    while (!stop_flag)
    {
        try
        {
            processTask(task);
            return;
        }
        catch (const Coordination::Exception & e)
        {
            if (Coordination::isHardwareError(e.code))
            {
                recoverZooKeeper();
            }
            else if (e.code == Coordination::Error::ZNONODE)
            {
                LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true));
                // TODO: retry?
            }
            else
            {
                LOG_ERROR(log, "Unexpected ZooKeeper error: {}.", getCurrentExceptionMessage(true));
                return;
            }
        }
        catch (...)
        {
            LOG_WARNING(log, "An error occurred while processing task {} ({}): {}", task.entry_name, task.entry.query, getCurrentExceptionMessage(true));
        }
    }
}

void DDLWorker::processTask(DDLTask & task)
{
    auto zookeeper = tryGetZooKeeper();

    LOG_DEBUG(log, "Processing task {} ({})", task.entry_name, task.entry.query);

    String dummy;
    String active_node_path = task.entry_path + "/active/" + task.host_id_str;
    String finished_node_path = task.entry_path + "/finished/" + task.host_id_str;

    auto code = zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy);

    if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
    {
        // Ok
    }
    else if (code == Coordination::Error::ZNONODE)
    {
        /// There is no parent
        //TODO why not to create parent before active_node?
        createStatusDirs(task.entry_path, zookeeper);
        if (Coordination::Error::ZOK != zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy))
            throw Coordination::Exception(code, active_node_path);
    }
    else
        throw Coordination::Exception(code, active_node_path);

    if (!task.was_executed)
    {
        try
        {
            is_circular_replicated = false;
            parseQueryAndResolveHost(task);

            ASTPtr rewritten_ast = task.query_on_cluster->getRewrittenASTWithoutOnCluster(task.address_in_cluster.default_database);
            String rewritten_query = queryToString(rewritten_ast);
            LOG_DEBUG(log, "Executing query: {}", rewritten_query);

            if (auto * query_with_table = dynamic_cast<ASTQueryWithTableAndOutput *>(rewritten_ast.get()); query_with_table)
            {
                StoragePtr storage;
                if (!query_with_table->table.empty())
                {
                    /// It's not CREATE DATABASE
                    auto table_id = context.tryResolveStorageID(*query_with_table, Context::ResolveOrdinary);
                    storage = DatabaseCatalog::instance().tryGetTable(table_id, context);
                }

                if (storage && taskShouldBeExecutedOnLeader(rewritten_ast, storage) && !is_circular_replicated)
                    tryExecuteQueryOnLeaderReplica(task, storage, rewritten_query, task.entry_path, zookeeper);
                else
                    tryExecuteQuery(rewritten_query, task, task.execution_status);
            }
            else
                tryExecuteQuery(rewritten_query, task, task.execution_status);
        }
        catch (const Coordination::Exception &)
        {
            throw;
        }
        catch (...)
        {
            tryLogCurrentException(log, "An error occurred before execution of DDL task: ");
            task.execution_status = ExecutionStatus::fromCurrentException("An error occurred before execution");
        }

        /// We need to distinguish ZK errors that occurred before and after query execution
        task.was_executed = true;
    }

    /// FIXME: if server fails right here, the task will be executed twice. We need WAL here.

    /// Delete active flag and create finish flag
    Coordination::Requests ops;
    ops.emplace_back(zkutil::makeRemoveRequest(active_node_path, -1));
    ops.emplace_back(zkutil::makeCreateRequest(finished_node_path, task.execution_status.serializeText(), zkutil::CreateMode::Persistent));
    zookeeper->multi(ops);
}

bool DDLWorker::taskShouldBeExecutedOnLeader(const ASTPtr ast_ddl, const StoragePtr storage)
{
    /// Pure DROP queries have to be executed on each node separately
    if (auto * query = ast_ddl->as<ASTDropQuery>(); query && query->kind != ASTDropQuery::Kind::Truncate)
        return false;

    if (!ast_ddl->as<ASTAlterQuery>() && !ast_ddl->as<ASTOptimizeQuery>() && !ast_ddl->as<ASTDropQuery>())
        return false;

    return storage->supportsReplication();
}

bool DDLWorker::tryExecuteQueryOnLeaderReplica(
    DDLTask & task,
    StoragePtr storage,
    const String & rewritten_query,
    const String & node_path,
    const ZooKeeperPtr & zookeeper)
{
    StorageReplicatedMergeTree * replicated_storage = dynamic_cast<StorageReplicatedMergeTree *>(storage.get());

    /// If we develop a new replicated storage
    if (!replicated_storage)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Storage type '{}' is not supported by distributed DDL", storage->getName());

    /// Generate a unique name for the shard node; it will be used to execute the query by only a single host.
    /// The shard node name has format 'replica_name1,replica_name2,...,replica_nameN',
    /// where replica_name is 'replica_config_host_name:replica_port'.
    auto get_shard_name = [] (const Cluster::Addresses & shard_addresses)
    {
        Strings replica_names;
        for (const Cluster::Address & address : shard_addresses)
            replica_names.emplace_back(address.readableString());
        std::sort(replica_names.begin(), replica_names.end());

        String res;
        for (auto it = replica_names.begin(); it != replica_names.end(); ++it)
            res += *it + (std::next(it) != replica_names.end() ? "," : "");

        return res;
    };

    String shard_node_name;
    if (database_replicated_ext)
        shard_node_name = database_replicated_ext->shard_name;
    else
        shard_node_name = get_shard_name(task.cluster->getShardsAddresses().at(task.host_shard_num));
    String shard_path = node_path + "/shards/" + shard_node_name;
    String is_executed_path = shard_path + "/executed";
    String tries_to_execute_path = shard_path + "/tries_to_execute";
    zookeeper->createAncestors(shard_path + "/");

    /// The node exists, or we will create it, or we will get an exception
    zookeeper->tryCreate(tries_to_execute_path, "0", zkutil::CreateMode::Persistent);

    static constexpr int MAX_TRIES_TO_EXECUTE = 3;

    String executed_by;

    zkutil::EventPtr event = std::make_shared<Poco::Event>();
    if (zookeeper->tryGet(is_executed_path, executed_by, nullptr, event))
    {
        LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, executed_by);
        return true;
    }

    pcg64 rng(randomSeed());

    auto lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str);

    bool executed_by_leader = false;
    while (true)
    {
        StorageReplicatedMergeTree::Status status;
        replicated_storage->getStatus(status);

        /// Any replica which is leader tries to take the lock
        if (status.is_leader && lock->tryLock())
        {
            /// In replicated merge tree we can have multiple leaders. So we can
            /// be "leader", but another "leader" replica may already execute
            /// this task.
            if (zookeeper->tryGet(is_executed_path, executed_by))
            {
                LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, executed_by);
                executed_by_leader = true;
                break;
            }

            /// Doing it exclusively
            size_t counter = parse<int>(zookeeper->get(tries_to_execute_path));
            if (counter > MAX_TRIES_TO_EXECUTE)
                break;

            zookeeper->set(tries_to_execute_path, toString(counter + 1));

            /// If the leader unexpectedly changes, this method will return false
            /// and on the next iteration the new leader will take the lock
            if (tryExecuteQuery(rewritten_query, task, task.execution_status))
            {
                zookeeper->create(is_executed_path, task.host_id_str, zkutil::CreateMode::Persistent);
                executed_by_leader = true;
                break;
            }

            lock->unlock();
        }

        if (event->tryWait(std::uniform_int_distribution<int>(0, 1000)(rng)))
        {
            LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));
            executed_by_leader = true;
            break;
        }
        else if (parse<int>(zookeeper->get(tries_to_execute_path)) > MAX_TRIES_TO_EXECUTE)
        {
            /// Nobody will try to execute the query again
            break;
        }
    }

    /// Not executed by the leader, so it was not executed at all
    if (!executed_by_leader)
    {
        task.execution_status = ExecutionStatus(ErrorCodes::NOT_IMPLEMENTED, "Cannot execute replicated DDL query");
        return false;
    }

    LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));
    return true;
}

void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper)
{
    LOG_DEBUG(log, "Cleaning queue");

    Strings queue_nodes = zookeeper->getChildren(queue_dir);
    filterAndSortQueueNodes(queue_nodes);

    size_t num_outdated_nodes = (queue_nodes.size() > max_tasks_in_queue) ? queue_nodes.size() - max_tasks_in_queue : 0;
    auto first_non_outdated_node = queue_nodes.begin() + num_outdated_nodes;

    for (auto it = queue_nodes.cbegin(); it < queue_nodes.cend(); ++it)
    {
        if (stop_flag)
            return;

        String node_name = *it;
        String node_path = queue_dir + "/" + node_name;
        String lock_path = node_path + "/lock";

        Coordination::Stat stat;
        String dummy;

        try
        {
            /// Already deleted
            if (!zookeeper->exists(node_path, &stat))
                continue;

            /// Delete the node if its lifetime is expired (according to the task_max_lifetime parameter)
            constexpr UInt64 zookeeper_time_resolution = 1000;
            Int64 zookeeper_time_seconds = stat.ctime / zookeeper_time_resolution;
            bool node_lifetime_is_expired = zookeeper_time_seconds + task_max_lifetime < current_time_seconds;

            /// If there are too many nodes in the task queue (> max_tasks_in_queue), delete the oldest ones
            bool node_is_outside_max_window = it < first_non_outdated_node;

            if (!node_lifetime_is_expired && !node_is_outside_max_window)
                continue;

            /// Skip if there are active nodes (it is a weak guard)
            if (zookeeper->exists(node_path + "/active", &stat) && stat.numChildren > 0)
            {
                LOG_INFO(log, "Task {} should be deleted, but there are active workers. Skipping it.", node_name);
                continue;
            }

            /// Usage of the lock is not necessary now (tryRemoveRecursive correctly removes the node in the presence of concurrent cleaners),
            /// but the lock will be required to implement the system.distributed_ddl_queue table
            auto lock = createSimpleZooKeeperLock(zookeeper, node_path, "lock", host_fqdn_id);
            if (!lock->tryLock())
            {
                LOG_INFO(log, "Task {} should be deleted, but it is locked. Skipping it.", node_name);
                continue;
            }

            if (node_lifetime_is_expired)
                LOG_INFO(log, "Lifetime of task {} is expired, deleting it", node_name);
            else if (node_is_outside_max_window)
                LOG_INFO(log, "Task {} is outdated, deleting it", node_name);

            /// Deleting
            {
                Strings children = zookeeper->getChildren(node_path);
                for (const String & child : children)
                {
                    if (child != "lock")
                        zookeeper->tryRemoveRecursive(node_path + "/" + child);
                }

                /// Remove the lock node and its parent atomically
                Coordination::Requests ops;
                ops.emplace_back(zkutil::makeRemoveRequest(lock_path, -1));
                ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1));
                zookeeper->multi(ops);
            }
        }
        catch (...)
        {
            LOG_INFO(log, "An error occurred while checking and cleaning task {} from queue: {}", node_name, getCurrentExceptionMessage(false));
        }
    }
}

/// Try to create nonexisting "status" dirs for a node
void DDLWorker::createStatusDirs(const std::string & node_path, const ZooKeeperPtr & zookeeper)
{
    Coordination::Requests ops;
    {
        Coordination::CreateRequest request;
        request.path = node_path + "/active";
        ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request)));
    }
    {
        Coordination::CreateRequest request;
        request.path = node_path + "/finished";
        ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request)));
    }

    Coordination::Responses responses;
    Coordination::Error code = zookeeper->tryMulti(ops, responses);
    if (code != Coordination::Error::ZOK
        && code != Coordination::Error::ZNODEEXISTS)
        throw Coordination::Exception(code);
}

String DDLWorker::enqueueQuery(DDLLogEntry & entry)
{
    if (entry.hosts.empty() && !database_replicated_ext)
        throw Exception("Empty host list in a distributed DDL task", ErrorCodes::LOGICAL_ERROR);

    auto zookeeper = getAndSetZooKeeper();

    String query_path_prefix = queue_dir + "/query-";
    zookeeper->createAncestors(query_path_prefix);

    String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);

    /// Optional step
    try
    {
        createStatusDirs(node_path, zookeeper);
    }
    catch (...)
    {
        LOG_INFO(log, "An error occurred while creating auxiliary ZooKeeper directories in {}. They will be created later. Error: {}", node_path, getCurrentExceptionMessage(true));
    }

    return node_path;
}
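
/// For reference, the ZooKeeper layout produced for each task by this class looks roughly
/// like the following (a sketch assembled from the paths used above, not an exhaustive spec):
///
///     <queue_dir>/query-0000000123                 value: serialized DDLLogEntry
///         /active/<host_id>                        ephemeral, set while a host processes the task
///         /finished/<host_id>                      value: serialized ExecutionStatus
///         /shards/<shard_name>/executed            host_id of the replica that executed the query
///         /shards/<shard_name>/tries_to_execute    counter limited by MAX_TRIES_TO_EXECUTE
///         /shards/<shard_name>/lock                ephemeral lock taken by the executing leader
///         /lock                                    ephemeral lock taken by the queue cleaner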

void DDLWorker::runMainThread()
{
    setThreadName("DDLWorker");
    LOG_DEBUG(log, "Started DDLWorker thread");

    bool initialized = false;
    do
    {
        try
        {
            auto zookeeper = getAndSetZooKeeper();
            zookeeper->createAncestors(queue_dir + "/");
            initialized = true;
        }
        catch (const Coordination::Exception & e)
        {
            if (!Coordination::isHardwareError(e.code))
                throw; /// A logical error.

            tryLogCurrentException(__PRETTY_FUNCTION__);

            /// Avoid busy loop when ZooKeeper is not available.
            sleepForSeconds(1);
        }
        catch (...)
        {
            tryLogCurrentException(log, "Terminating. Cannot initialize DDL queue.");
            return;
        }
    }
    while (!initialized && !stop_flag);

    while (!stop_flag)
    {
        try
        {
            attachToThreadGroup();

            cleanup_event->set();
            scheduleTasks();

            LOG_DEBUG(log, "Waiting for a watch");
            queue_updated_event->wait();
        }
        catch (const Coordination::Exception & e)
        {
            if (Coordination::isHardwareError(e.code))
            {
                recoverZooKeeper();
            }
            else if (e.code == Coordination::Error::ZNONODE)
            {
                LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true));
            }
            else
            {
                LOG_ERROR(log, "Unexpected ZooKeeper error: {}. Terminating.", getCurrentExceptionMessage(true));
                return;
            }
        }
        catch (...)
        {
            tryLogCurrentException(log, "Unexpected error, will terminate:");
            return;
        }
    }
}

void DDLWorker::runCleanupThread()
{
    setThreadName("DDLWorkerClnr");
    LOG_DEBUG(log, "Started DDLWorker cleanup thread");

    Int64 last_cleanup_time_seconds = 0;
    while (!stop_flag)
    {
        try
        {
            cleanup_event->wait();
            if (stop_flag)
                break;

            Int64 current_time_seconds = Poco::Timestamp().epochTime();
            if (last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + cleanup_delay_period)
            {
                LOG_TRACE(log, "Too early to clean queue, will do it later.");
                continue;
            }

            auto zookeeper = tryGetZooKeeper();
            if (zookeeper->expired())
                continue;

            cleanupQueue(current_time_seconds, zookeeper);
            last_cleanup_time_seconds = current_time_seconds;
        }
        catch (...)
        {
            tryLogCurrentException(log, __PRETTY_FUNCTION__);
        }
    }
}

}