Disable part sendings and fetches before ALTER. [#CLICKHOUSE-3343]

This commit is contained in:
Vitaliy Lyudvichenko 2017-10-06 19:53:55 +03:00 committed by alexey-milovidov
parent 83925f1d5e
commit 62ea1133dd
17 changed files with 128 additions and 104 deletions

View File

@ -0,0 +1,59 @@
#pragma once
#include <atomic>
namespace DB
{
/// An atomic variable that is used to block and interrupt certain actions
/// If it is not zero then actions related with it should be considered as interrupted
struct ActionBlocker
{
mutable std::atomic<int> counter{0};
bool isCancelled() const { return counter > 0; }
/// Temporarily blocks merges (while the returned object is alive)
struct BlockHolder;
BlockHolder cancel() const { return BlockHolder(this); }
// Cancel the actions forever.
void cancelForever() const { ++counter; }
/// Blocks related action while a BlockerHolder instance exists
struct BlockHolder
{
explicit BlockHolder(const ActionBlocker * var_ = nullptr) : var(var_)
{
if (var)
++var->counter;
}
BlockHolder(BlockHolder && other) noexcept
{
var = other.var;
other.var = nullptr;
}
BlockHolder & operator=(BlockHolder && other) noexcept
{
var = other.var;
other.var = nullptr;
return *this;
}
BlockHolder(const BlockHolder & other) = delete;
BlockHolder & operator=(const BlockHolder & other) = delete;
~BlockHolder()
{
if (var)
--var->counter;
}
private:
const ActionBlocker * var = nullptr;
};
};
}

View File

@ -10,7 +10,7 @@ namespace DB
namespace
{
void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::atomic<bool> * is_cancelled)
void copyDataImpl(ReadBuffer & from, WriteBuffer & to, bool check_bytes, size_t bytes, std::atomic<int> * is_cancelled)
{
/// If read to the end of the buffer, eof() either fills the buffer with new data and moves the cursor to the beginning, or returns false.
while (bytes > 0 && !from.eof())
@ -55,7 +55,7 @@ void copyData(ReadBuffer & from, WriteBuffer & to)
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), nullptr);
}
void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic<bool> & is_cancelled)
void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic<int> & is_cancelled)
{
copyDataImpl(from, to, false, std::numeric_limits<size_t>::max(), &is_cancelled);
}
@ -70,7 +70,7 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes)
copyDataImpl(from, to, true, bytes, nullptr);
}
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic<bool> & is_cancelled)
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic<int> & is_cancelled)
{
copyDataImpl(from, to, true, bytes, &is_cancelled);
}

View File

@ -21,8 +21,8 @@ void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes);
/** The same, with the condition to cancel.
*/
void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic<bool> & is_cancelled);
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic<bool> & is_cancelled);
void copyData(ReadBuffer & from, WriteBuffer & to, std::atomic<int> & is_cancelled);
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::atomic<int> & is_cancelled);
void copyData(ReadBuffer & from, WriteBuffer & to, std::function<void()> cancellation_hook);
void copyData(ReadBuffer & from, WriteBuffer & to, size_t bytes, std::function<void()> cancellation_hook);

View File

@ -6,9 +6,11 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Common/ActionBlocker.h>
#include <Core/Types.h>
#include <map>
#include <atomic>
#include <utility>
#include <Poco/Net/HTMLForm.h>
namespace Poco { namespace Net { class HTTPServerResponse; } }
@ -67,11 +69,8 @@ public:
virtual void processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) = 0;
virtual ~InterserverIOEndpoint() {}
void cancel() { is_cancelled = true; }
protected:
/// You need to stop the data transfer.
std::atomic<bool> is_cancelled {false};
/// You need to stop the data transfer if blocker is activated.
ActionBlocker blocker;
};
using InterserverIOEndpointPtr = std::shared_ptr<InterserverIOEndpoint>;
@ -88,7 +87,7 @@ public:
std::lock_guard<std::mutex> lock(mutex);
if (endpoint_map.count(name))
throw Exception("Duplicate interserver IO endpoint: " + name, ErrorCodes::DUPLICATE_INTERSERVER_IO_ENDPOINT);
endpoint_map[name] = endpoint;
endpoint_map[name] = std::move(endpoint);
}
void removeEndpoint(const String & name)
@ -119,7 +118,7 @@ class InterserverIOEndpointHolder
{
public:
InterserverIOEndpointHolder(const String & name_, InterserverIOEndpointPtr endpoint_, InterserverIOHandler & handler_)
: name(name_), endpoint(endpoint_), handler(handler_)
: name(name_), endpoint(std::move(endpoint_)), handler(handler_)
{
handler.addEndpoint(name, endpoint);
}
@ -143,7 +142,9 @@ public:
}
}
void cancel() { endpoint->cancel(); }
ActionBlocker & getBlocker() { return endpoint->blocker; }
void cancelForever() { getBlocker().cancelForever(); }
ActionBlocker::BlockHolder cancel() { return getBlocker().cancel(); }
private:
String name;

View File

@ -47,7 +47,7 @@ std::string Service::getId(const std::string & node_id) const
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{
if (is_cancelled)
if (blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
String part_name = params.get("part");
@ -120,9 +120,9 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
ReadBufferFromFile file_in(path);
HashingWriteBuffer hashing_out(out);
copyData(file_in, hashing_out, is_cancelled);
copyData(file_in, hashing_out, blocker.counter);
if (is_cancelled)
if (blocker.isCancelled())
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
if (hashing_out.count() != size)
@ -181,14 +181,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
return fetchPartImpl(part_name, replica_path, host, port, "", to_detached);
}
MergeTreeData::MutableDataPartPtr Fetcher::fetchShardedPart(
const InterserverIOEndpointLocation & location,
const String & part_name,
size_t shard_no)
{
return fetchPartImpl(part_name, location.name, location.host, location.port, toString(shard_no), true);
}
MergeTreeData::MutableDataPartPtr Fetcher::fetchPartImpl(
const String & part_name,
const String & replica_path,
@ -241,9 +233,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPartImpl(
WriteBufferFromFile file_out(absolute_part_path + file_name);
HashingWriteBuffer hashing_out(file_out);
copyData(in, hashing_out, file_size, is_cancelled);
copyData(in, hashing_out, file_size, blocker.counter);
if (is_cancelled)
if (blocker.isCancelled())
{
/// NOTE The is_cancelled flag also makes sense to check every time you read over the network, performing a poll with a not very large timeout.
/// And now we check it only between read chunks (in the `copyData` function).

View File

@ -54,14 +54,8 @@ public:
int port,
bool to_detached = false);
/// Method for resharding. Downloads a sharded part
/// from the specified shard to the `to_detached` folder.
MergeTreeData::MutableDataPartPtr fetchShardedPart(
const InterserverIOEndpointLocation & location,
const String & part_name,
size_t shard_no);
void cancel() { is_cancelled = true; }
/// You need to stop the data transfer.
ActionBlocker blocker;
private:
MergeTreeData::MutableDataPartPtr fetchPartImpl(
@ -74,8 +68,6 @@ private:
private:
MergeTreeData & data;
/// You need to stop the data transfer.
std::atomic<bool> is_cancelled {false};
Logger * log;
};

View File

@ -479,7 +479,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
{
static const String TMP_PREFIX = "tmp_merge_";
if (isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
const MergeTreeData::DataPartsVector & parts = future_part.parts;
@ -633,7 +633,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
const size_t initial_reservation = disk_reservation ? disk_reservation->getSize() : 0;
Block block;
while (!isCancelled() && (block = merged_stream->read()))
while (!merges_blocker.isCancelled() && (block = merged_stream->read()))
{
rows_written += block.rows();
to.write(block);
@ -656,7 +656,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
merged_stream->readSuffix();
merged_stream.reset();
if (isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
MergeTreeData::DataPart::Checksums checksums_gathered_columns;
@ -727,7 +727,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
merge_entry->progress = progress_before + column_sizes.columnProgress(column_name, sum_input_rows_exact, sum_input_rows_exact);
if (isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);
}
@ -1097,7 +1097,7 @@ size_t MergeTreeDataMerger::estimateDiskSpaceForMerge(const MergeTreeData::DataP
void MergeTreeDataMerger::abortReshardPartitionIfRequested()
{
if (isCancelled())
if (merges_blocker.isCancelled())
throw Exception("Cancelled partition resharding", ErrorCodes::ABORTED);
if (cancellation_hook)

View File

@ -4,6 +4,8 @@
#include <Storages/MergeTree/DiskSpaceMonitor.h>
#include <atomic>
#include <functional>
#include <Common/ActionBlocker.h>
namespace DB
{
@ -110,42 +112,15 @@ private:
*/
MergeTreeData::DataPartsVector selectAllPartsFromPartition(const String & partition_id);
/** Temporarily cancel merges.
*/
class BlockerImpl
{
public:
BlockerImpl(MergeTreeDataMerger * merger_) : merger(merger_)
{
++merger->cancelled;
}
~BlockerImpl()
{
--merger->cancelled;
}
private:
MergeTreeDataMerger * merger;
};
public:
/** Cancel all merges. All currently running 'mergeParts' methods will throw exception soon.
* All new calls to 'mergeParts' will throw exception till all 'Blocker' objects will be destroyed.
/** Is used to cancel all merges. On cancel() call all currently running 'mergeParts' methods will throw exception soon.
* All new calls to 'mergeParts' will throw exception till all 'BlockHolder' objects will be destroyed.
*/
using Blocker = std::unique_ptr<BlockerImpl>;
Blocker cancel() { return std::make_unique<BlockerImpl>(this); }
/** Cancel all merges forever.
*/
void cancelForever() { ++cancelled; }
bool isCancelled() const { return cancelled > 0; }
public:
ActionBlocker merges_blocker;
enum class MergeAlgorithm
{
Horizontal, /// per-row merge of all columns
Horizontal, /// per-row merge of all columns
Vertical /// per-row merge of PK columns, per-column gather for non-PK columns
};
@ -166,8 +141,6 @@ private:
CancellationHook cancellation_hook;
std::atomic<int> cancelled {0};
void abortReshardPartitionIfRequested();
};

View File

@ -39,7 +39,7 @@ std::string Service::getId(const std::string & node_id) const
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{
if (is_cancelled)
if (blocker.isCancelled())
throw Exception{"RemoteDiskSpaceMonitor service terminated", ErrorCodes::ABORTED};
size_t free_space = DiskSpaceMonitor::getUnreservedFreeSpace(context.getPath());

View File

@ -39,7 +39,7 @@ std::string Service::getId(const std::string & node_id) const
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{
if (is_cancelled)
if (blocker.isCancelled())
throw Exception{"RemoteQueryExecutor service terminated", ErrorCodes::ABORTED};
std::string query = params.get("query");

View File

@ -67,10 +67,10 @@ void ReplicatedMergeTreeAlterThread::run()
{
/// If you need to lock table structure, then suspend merges.
MergeTreeDataMerger::Blocker merge_blocker;
ActionBlocker::BlockHolder merge_blocker;
if (changed_version || force_recheck_parts)
merge_blocker = storage.merger.cancel();
merge_blocker = storage.merger.merges_blocker.cancel();
MergeTreeData::DataParts parts;
@ -80,6 +80,14 @@ void ReplicatedMergeTreeAlterThread::run()
/// Temporarily cancel part checks to avoid locking for long time.
auto temporarily_stop_part_checks = storage.part_check_thread.temporarilyStop();
/// Temporarily cancel parts sending
ActionBlocker::BlockHolder data_parts_exchange_blocker;
if (storage.data_parts_exchange_endpoint_holder)
data_parts_exchange_blocker = storage.data_parts_exchange_endpoint_holder->cancel();
/// Temporarily cancel part fetches
auto fetches_blocker = storage.fetcher.blocker.cancel();
LOG_INFO(log, "Changed version of 'columns' node in ZooKeeper. Waiting for structure write lock.");
auto table_lock = storage.lockStructureForAlter(__PRETTY_FUNCTION__);

View File

@ -594,7 +594,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
sum_parts_size_in_bytes += part->size_in_bytes;
}
if (merger.isCancelled())
if (merger.merges_blocker.isCancelled())
{
String reason = "Not executing log entry for part " + entry.new_part_name + " because merges are cancelled now.";
LOG_DEBUG(log, reason);

View File

@ -160,22 +160,22 @@ void ReplicatedMergeTreeRestartingThread::run()
try
{
storage.endpoint_holder->cancel();
storage.endpoint_holder = nullptr;
storage.data_parts_exchange_endpoint_holder->cancelForever();
storage.data_parts_exchange_endpoint_holder = nullptr;
storage.disk_space_monitor_endpoint_holder->cancel();
storage.disk_space_monitor_endpoint_holder->cancelForever();
storage.disk_space_monitor_endpoint_holder = nullptr;
storage.sharded_partition_uploader_endpoint_holder->cancel();
storage.sharded_partition_uploader_endpoint_holder->cancelForever();
storage.sharded_partition_uploader_endpoint_holder = nullptr;
storage.remote_query_executor_endpoint_holder->cancel();
storage.remote_query_executor_endpoint_holder->cancelForever();
storage.remote_query_executor_endpoint_holder = nullptr;
storage.remote_part_checker_endpoint_holder->cancel();
storage.remote_part_checker_endpoint_holder->cancelForever();
storage.remote_part_checker_endpoint_holder = nullptr;
storage.merger.cancelForever();
storage.merger.merges_blocker.cancelForever();
partialShutdown();

View File

@ -82,9 +82,9 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
WriteBufferFromFile file_out{absolute_part_path + file_name};
HashingWriteBuffer hashing_out{file_out};
copyData(body, hashing_out, file_size, is_cancelled);
copyData(body, hashing_out, file_size, blocker.counter);
if (is_cancelled)
if (blocker.isCancelled())
{
part_file.remove(true);
throw Exception{"Fetching of part was cancelled", ErrorCodes::ABORTED};
@ -96,8 +96,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
if (expected_hash != hashing_out.getHash())
throw Exception{"Checksum mismatch for file " + absolute_part_path + file_name + " transferred from " + replica_path};
if (file_name != "checksums.txt" &&
file_name != "columns.txt")
if (file_name != "checksums.txt" && file_name != "columns.txt")
checksums.addFile(file_name, file_size, expected_hash);
}

View File

@ -91,7 +91,7 @@ void StorageMergeTree::shutdown()
if (shutdown_called)
return;
shutdown_called = true;
merger.cancelForever();
merger.merges_blocker.cancelForever();
if (merge_task_handle)
background_pool.removeTask(merge_task_handle);
}
@ -151,7 +151,7 @@ void StorageMergeTree::alter(
const Context & context)
{
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
auto merge_blocker = merger.cancel();
auto merge_blocker = merger.merges_blocker.cancel();
auto table_soft_lock = lockDataForAlter(__PRETTY_FUNCTION__);
@ -402,7 +402,7 @@ void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Fi
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger.cancel();
auto merge_blocker = merger.merges_blocker.cancel();
/// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
auto lock_read_structure = lockStructure(false, __PRETTY_FUNCTION__);
@ -462,7 +462,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & query, const ASTPtr & partit
{
/// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger.cancel();
auto merge_blocker = merger.merges_blocker.cancel();
/// Waits for completion of merge and does not start new ones.
auto lock = lockForAlter(__PRETTY_FUNCTION__);

View File

@ -335,7 +335,7 @@ StoragePtr StorageReplicatedMergeTree::create(
{
{
InterserverIOEndpointPtr endpoint = std::make_shared<DataPartsExchange::Service>(res->data, res_ptr);
res->endpoint_holder = get_endpoint_holder(endpoint);
res->data_parts_exchange_endpoint_holder = get_endpoint_holder(endpoint);
}
/// Services for resharding.
@ -2275,7 +2275,7 @@ void StorageReplicatedMergeTree::shutdown()
* Because restarting_thread will wait for finishing of tasks in background pool,
* and parts are fetched in that tasks.
*/
fetcher.cancel();
fetcher.blocker.cancelForever();
if (restarting_thread)
{
@ -2283,36 +2283,36 @@ void StorageReplicatedMergeTree::shutdown()
restarting_thread.reset();
}
if (endpoint_holder)
if (data_parts_exchange_endpoint_holder)
{
endpoint_holder->cancel();
endpoint_holder = nullptr;
data_parts_exchange_endpoint_holder->cancelForever();
data_parts_exchange_endpoint_holder = nullptr;
}
if (disk_space_monitor_endpoint_holder)
{
disk_space_monitor_endpoint_holder->cancel();
disk_space_monitor_endpoint_holder->cancelForever();
disk_space_monitor_endpoint_holder = nullptr;
}
disk_space_monitor_client.cancel();
if (sharded_partition_uploader_endpoint_holder)
{
sharded_partition_uploader_endpoint_holder->cancel();
sharded_partition_uploader_endpoint_holder->cancelForever();
sharded_partition_uploader_endpoint_holder = nullptr;
}
sharded_partition_uploader_client.cancel();
if (remote_query_executor_endpoint_holder)
{
remote_query_executor_endpoint_holder->cancel();
remote_query_executor_endpoint_holder->cancelForever();
remote_query_executor_endpoint_holder = nullptr;
}
remote_query_executor_client.cancel();
if (remote_part_checker_endpoint_holder)
{
remote_part_checker_endpoint_holder->cancel();
remote_part_checker_endpoint_holder->cancelForever();
remote_part_checker_endpoint_holder = nullptr;
}
}

View File

@ -258,7 +258,7 @@ private:
bool is_leader_node = false;
std::mutex leader_node_mutex;
InterserverIOEndpointHolderPtr endpoint_holder;
InterserverIOEndpointHolderPtr data_parts_exchange_endpoint_holder;
InterserverIOEndpointHolderPtr disk_space_monitor_endpoint_holder;
InterserverIOEndpointHolderPtr sharded_partition_uploader_endpoint_holder;
InterserverIOEndpointHolderPtr remote_query_executor_endpoint_holder;