#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Interpreters/PartLog.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Common/SipHash.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <IO/Operators.h>

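/// Overview of the insert path implemented below: write() splits the inserted block into parts,
/// writes each part to the filesystem under a temporary name, and passes it to commitPart().
/// commitPart() allocates a block number in ZooKeeper (with optional deduplication by block id),
/// publishes the part together with a GET_PART log entry in a single multi-request, and, if a quorum
/// is requested, waits until enough replicas have received the part.
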
namespace ProfileEvents
{
    extern const Event DuplicatedInsertedBlocks;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int TOO_FEW_LIVE_REPLICAS;
    extern const int UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE;
    extern const int UNEXPECTED_ZOOKEEPER_ERROR;
    extern const int NO_ZOOKEEPER;
    extern const int READONLY;
    extern const int UNKNOWN_STATUS_OF_INSERT;
    extern const int INSERT_WAS_DEDUPLICATED;
    extern const int TIMEOUT_EXCEEDED;
    extern const int NO_ACTIVE_REPLICAS;
    extern const int DUPLICATE_DATA_PART;
    extern const int LOGICAL_ERROR;
}


ReplicatedMergeTreeBlockOutputStream::ReplicatedMergeTreeBlockOutputStream(
    StorageReplicatedMergeTree & storage_, size_t quorum_, size_t quorum_timeout_ms_, size_t max_parts_per_block_, bool deduplicate_)
    : storage(storage_), quorum(quorum_), quorum_timeout_ms(quorum_timeout_ms_), max_parts_per_block(max_parts_per_block_), deduplicate(deduplicate_)
    , log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
{
    /// The quorum value `1` has the same meaning as if it is disabled.
    if (quorum == 1)
        quorum = 0;
}


Block ReplicatedMergeTreeBlockOutputStream::getHeader() const
{
    return storage.getSampleBlock();
}


/// Allows verifying that the ZooKeeper session is still alive.
static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
{
    if (!zookeeper)
        throw Exception("No ZooKeeper session.", ErrorCodes::NO_ZOOKEEPER);

    if (zookeeper->expired())
        throw Exception("ZooKeeper session has expired.", ErrorCodes::NO_ZOOKEEPER);
}


void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper)
{
    quorum_info.status_path = storage.zookeeper_path + "/quorum/status";

    std::future<Coordination::GetResponse> quorum_status_future = zookeeper->asyncTryGet(quorum_info.status_path);
    std::future<Coordination::GetResponse> is_active_future = zookeeper->asyncTryGet(storage.replica_path + "/is_active");
    std::future<Coordination::GetResponse> host_future = zookeeper->asyncTryGet(storage.replica_path + "/host");

    /// List of live replicas. All of them register an ephemeral node for leader_election.

    Coordination::Stat leader_election_stat;
    zookeeper->get(storage.zookeeper_path + "/leader_election", &leader_election_stat);

    if (leader_election_stat.numChildren < static_cast<int32_t>(quorum))
        throw Exception("Number of alive replicas ("
            + toString(leader_election_stat.numChildren) + ") is less than requested quorum (" + toString(quorum) + ").",
            ErrorCodes::TOO_FEW_LIVE_REPLICAS);

    /** Is there a quorum pending for the last part that required one?
      * Writes of all parts with quorum enabled are linearly ordered.
      * This means that at any moment there can be at most one part
      *  for which the quorum is required but not yet reached.
      * Information about that part is stored in the `/quorum/status` node.
      * When the quorum is reached, the node is deleted.
      */

    auto quorum_status = quorum_status_future.get();
    if (quorum_status.error != Coordination::Error::ZNONODE)
        throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status.data,
            ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);

    /// Both checks are implicitly made also later (otherwise there would be a race condition).

    auto is_active = is_active_future.get();
    auto host = host_future.get();

    if (is_active.error == Coordination::Error::ZNONODE || host.error == Coordination::Error::ZNONODE)
        throw Exception("Replica is not active right now", ErrorCodes::READONLY);
    quorum_info.is_active_node_value = is_active.data;
    quorum_info.is_active_node_version = is_active.stat.version;
    quorum_info.host_node_version = host.stat.version;
}


void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
{
    last_block_is_duplicate = false;

    /// TODO Is it possible to not lock the table structure here?
    storage.delayInsertOrThrowIfNeeded(&storage.partial_shutdown_event);

    auto zookeeper = storage.getZooKeeper();
    assertSessionIsNotExpired(zookeeper);

    /** If the write uses a quorum, check that the required number of replicas is currently live,
      * and that the quorum has been reached for all previous parts that required one.
      * Also check that the replica was not reinitialized or disabled during the insertion
      *  (by the value of the `is_active` node).
      * TODO The logic is too complex; it can probably be simplified.
      */
    if (quorum)
        checkQuorumPrecondition(zookeeper);
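
    /// A single inserted block may touch several partitions; split it into one chunk per partition,
    /// each of which becomes a separate part. max_parts_per_block limits how many parts one insert may create.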
    auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block);

    for (auto & current_block : part_blocks)
    {
        Stopwatch watch;

        /// Write part to the filesystem under a temporary name. Calculate a checksum.

        MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block);

        String block_id;

        if (deduplicate)
        {
            SipHash hash;
            part->checksums.computeTotalChecksumDataOnly(hash);
            union
            {
                char bytes[16];
                UInt64 words[2];
            } hash_value;
            hash.get128(hash_value.bytes);

            /// The deduplication ID is composed of the partition identifier and a hash of the data:
            /// the same data inserted into the same partition twice is not inserted again.
            block_id = part->info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);
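            /// The resulting id has the form "<partition_id>_<word0>_<word1>", where the two words are the
            /// 64-bit halves of the 128-bit SipHash of the part data.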

            LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows());
        }
        else
        {
            LOG_DEBUG(log, "Wrote block with {} rows", current_block.block.rows());
        }

        try
        {
            commitPart(zookeeper, part, block_id);

            /// Set a special error code if the block is duplicate.
            int error = (deduplicate && last_block_is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0;
            PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus(error));
        }
        catch (...)
        {
            PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
            throw;
        }
    }
}


void ReplicatedMergeTreeBlockOutputStream::writeExistingPart(MergeTreeData::MutableDataPartPtr & part)
{
    last_block_is_duplicate = false;

    /// NOTE: No delay in this case. That's Ok.

    auto zookeeper = storage.getZooKeeper();
    assertSessionIsNotExpired(zookeeper);

    if (quorum)
        checkQuorumPrecondition(zookeeper);

    Stopwatch watch;

    try
    {
        commitPart(zookeeper, part, "");
        PartLog::addNewPart(storage.global_context, part, watch.elapsed());
    }
    catch (...)
    {
        PartLog::addNewPart(storage.global_context, part, watch.elapsed(), ExecutionStatus::fromCurrentException(__PRETTY_FUNCTION__));
        throw;
    }
}


void ReplicatedMergeTreeBlockOutputStream::commitPart(
    zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id)
{
    storage.check(part->getColumns());
    assertSessionIsNotExpired(zookeeper);

    String temporary_part_name = part->name;

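    /// Retry loop: we return early below if the block turns out to be a duplicate that already exists
    /// locally, retry (via `continue`) if a concurrent insert creates the same block id between our
    /// deduplication check and the ZooKeeper multi-request, and `break` once the part has been committed.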
    while (true)
    {
        /// Obtain an incremental block number and lock it. The lock holds our intention to add the block
        /// to the filesystem. We remove the lock just after renaming the part. In case of an exception,
        /// the block number will be marked as abandoned.
        /// Also make the deduplication check. If a duplicate is detected, no nodes are created.

        /// Allocate a new block number and check for duplicates.
        bool deduplicate_block = !block_id.empty();
        String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : "";
        auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path);

        Int64 block_number = 0;
        String existing_part_name;
        if (block_number_lock)
        {
            block_number = block_number_lock->getNumber();

            /// Set part attributes according to part_number. Prepare an entry for the log.

            part->info.min_block = block_number;
            part->info.max_block = block_number;
            part->info.level = 0;

            part->name = part->getNewName(part->info);
        }
        else
        {
            /// This block was already written to some replica. Get the part name for it.
            /// Note: a race condition with DROP PARTITION is possible. The user will get a "No node" exception, and that is Ok.
            existing_part_name = zookeeper->get(storage.zookeeper_path + "/blocks/" + block_id);

            /// If it exists on our replica, ignore it.
            if (storage.getActiveContainingPart(existing_part_name))
            {
                LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it.", block_id, existing_part_name);
                part->is_duplicate = true;
                last_block_is_duplicate = true;
                ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
                return;
            }

            LOG_INFO(log, "Block with ID {} already exists on other replicas as part {}; will write it locally with that name.",
                block_id, existing_part_name);

            /// If the part does not exist locally, we will write a new part with the existing name.
            /// Note that it may also appear on the filesystem right now, in PreCommitted state, due to concurrent inserts of the same data.
            /// This will be checked when we try to rename the directory.

            part->name = existing_part_name;
            part->info = MergeTreePartInfo::fromPartName(existing_part_name, storage.format_version);

            /// Don't do the subsequent duplicate check.
            block_id_path.clear();
        }

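        /// The GET_PART entry in the replication log tells the other replicas to fetch this part.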
        StorageReplicatedMergeTree::LogEntry log_entry;
        log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
        log_entry.create_time = time(nullptr);
        log_entry.source_replica = storage.replica_name;
        log_entry.new_part_name = part->name;
        log_entry.quorum = quorum;
        log_entry.block_id = block_id;

        /// Simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.

        /// Information about the part.
        Coordination::Requests ops;

        storage.getCommitPartOps(ops, part, block_id_path);

        /// Replication log.
        ops.emplace_back(zkutil::makeCreateRequest(
            storage.zookeeper_path + "/log/log-",
            log_entry.toString(),
            zkutil::CreateMode::PersistentSequential));

        /// Deletes the information that the block number is used for writing.
        if (block_number_lock)
            block_number_lock->getUnlockOps(ops);

        /** If a quorum is needed, create a node in which the quorum is tracked.
          * (If such a node already exists, someone else has managed to start another quorum write at the same time,
          *  and that quorum has not been reached yet.
          *  The next quorum write cannot be started until the previous one completes.)
          */
        if (quorum) /// TODO Duplicate blocks.
        {
            ReplicatedMergeTreeQuorumEntry quorum_entry;
            quorum_entry.part_name = part->name;
            quorum_entry.required_number_of_replicas = quorum;
            quorum_entry.replicas.insert(storage.replica_name);

            /** At this point, this node contains the information that the current replica received the part.
              * When other replicas receive this part (in the usual way, by processing the replication log),
              *  they will add themselves to the contents of this node.
              * When it contains information about `quorum` number of replicas, the node is deleted,
              *  which indicates that the quorum has been reached.
              */

            ops.emplace_back(
                zkutil::makeCreateRequest(
                    quorum_info.status_path,
                    quorum_entry.toString(),
                    zkutil::CreateMode::Persistent));

            /// Make sure that during the insertion the replica was not reinitialized or disabled (e.g. when the server is shutting down).
            ops.emplace_back(
                zkutil::makeCheckRequest(
                    storage.replica_path + "/is_active",
                    quorum_info.is_active_node_version));

            /// Unfortunately, just checking the above is not enough, because the `is_active` node can be deleted and reappear with the same version.
            /// But then the `host` value will change. We check that too.
            /// It's great that these two nodes change in the same transaction (see MergeTreeRestartingThread).
            ops.emplace_back(
                zkutil::makeCheckRequest(
                    storage.replica_path + "/host",
                    quorum_info.host_node_version));
        }

        MergeTreeData::Transaction transaction(storage); /// If we cannot add the part to ZooKeeper, we'll remove it from the working set again.
        bool renamed = false;
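
        /// renameTempPartAndAdd() may throw DUPLICATE_DATA_PART if a part with the same name already exists
        /// (a concurrent insert of the same data); in that case we swallow the exception and handle it
        /// below via `renamed == false`.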
        try
        {
            renamed = storage.renameTempPartAndAdd(part, nullptr, &transaction);
        }
        catch (const Exception & e)
        {
            if (e.code() != ErrorCodes::DUPLICATE_DATA_PART)
                throw;
        }

        if (!renamed)
        {
            if (!existing_part_name.empty())
            {
                LOG_INFO(log, "Block with ID {} is duplicate and its part {} is already written by a concurrent request; ignoring it.",
                    block_id, existing_part_name);
                return;
            }
            else
                throw Exception("Part with name " + part->name + " is already written by a concurrent request."
                    " It should not happen for non-duplicate data parts because unique names are assigned for them. It's a bug",
                    ErrorCodes::LOGICAL_ERROR);
        }

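        /// Everything prepared above (part metadata, the GET_PART log entry, the block number unlock and,
        /// for quorum inserts, the quorum node plus the `is_active`/`host` version checks) is applied
        /// atomically in a single ZooKeeper multi-request.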
        Coordination::Responses responses;
        Coordination::Error multi_code = zookeeper->tryMultiNoThrow(ops, responses); /// 1 RTT

        if (multi_code == Coordination::Error::ZOK)
        {
            transaction.commit();
            storage.merge_selecting_task->schedule();

            /// Lock nodes have already been deleted, do not delete them in the destructor.
            if (block_number_lock)
                block_number_lock->assumeUnlocked();
        }
        else if (multi_code == Coordination::Error::ZCONNECTIONLOSS
            || multi_code == Coordination::Error::ZOPERATIONTIMEOUT)
        {
            /** If the connection is lost, we do not know whether the changes were applied.
              * If they were, the inserted block appeared in `/blocks/` and cannot be inserted again,
              *  so we cannot delete the local part.
              */
            transaction.commit();
            storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);

            /// We do not know whether or not the data has been inserted.
            throw Exception("Unknown status, client must retry. Reason: " + String(Coordination::errorMessage(multi_code)),
                ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
        }
        else if (Coordination::isUserError(multi_code))
        {
            String failed_op_path = zkutil::KeeperMultiException(multi_code, ops, responses).getPathForFirstFailedOp();

            if (multi_code == Coordination::Error::ZNODEEXISTS && deduplicate_block && failed_op_path == block_id_path)
            {
                /// A block with the same id has just appeared in the table (or on another replica); roll back the insertion.
                LOG_INFO(log, "Block with ID {} already exists (it just appeared). Renaming part {} back to {}. Will retry write.",
                    block_id, part->name, temporary_part_name);

                transaction.rollback();

                part->is_duplicate = true;
                part->is_temp = true;
                part->state = MergeTreeDataPartState::Temporary;
                part->renameTo(temporary_part_name);

                continue;
            }
            else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
            {
                transaction.rollback();

                throw Exception("Another quorum insert has been already started", ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
            }
            else
            {
                /// NOTE: We could get here if the node with the quorum existed but was quickly removed.
                transaction.rollback();
                throw Exception("Unexpected logical error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
                    + Coordination::errorMessage(multi_code) + ", path " + failed_op_path,
                    ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
            }
        }
        else if (Coordination::isHardwareError(multi_code))
        {
            transaction.rollback();
            throw Exception("Unrecoverable network error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
                + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
        }
        else
        {
            transaction.rollback();
            throw Exception("Unexpected ZooKeeper error while adding block " + toString(block_number) + " with ID '" + block_id + "': "
                + Coordination::errorMessage(multi_code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
        }

        break;
    }

    if (quorum)
    {
        /// We are waiting for the quorum to be satisfied.
        LOG_TRACE(log, "Waiting for quorum");

        String quorum_status_path = storage.zookeeper_path + "/quorum/status";

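        /// Wait until the `/quorum/status` node disappears (the quorum has been reached) or starts
        /// referring to a different part (the quorum was reached and a newer quorum insert has begun).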
        try
        {
            while (true)
            {
                zkutil::EventPtr event = std::make_shared<Poco::Event>();

                std::string value;
                /// `get` instead of `exists` so that `watch` does not leak if the node is no longer there.
                if (!zookeeper->tryGet(quorum_status_path, value, nullptr, event))
                    break;

                ReplicatedMergeTreeQuorumEntry quorum_entry(value);

                /// The node could have disappeared and then reappeared for the next insert.
                if (quorum_entry.part_name != part->name)
                    break;

                if (!event->tryWait(quorum_timeout_ms))
                    throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);
            }

            /// What if the current replica has ceased to be active in the meantime, and the quorum was marked as failed and deleted?
            String value;
            if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr)
                || value != quorum_info.is_active_node_value)
                throw Exception("Replica became inactive while waiting for quorum", ErrorCodes::NO_ACTIVE_REPLICAS);
        }
        catch (...)
        {
            /// We do not know whether or not the data has been inserted
            /// - whether other replicas had time to download the part and mark the quorum as done.
            throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false),
                ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
        }

        LOG_TRACE(log, "Quorum satisfied");
    }
}


void ReplicatedMergeTreeBlockOutputStream::writePrefix()
{
    storage.throwInsertIfNeeded();
}

}