2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/MergeTree/DataPartsExchange.h>
|
2018-03-06 20:18:34 +00:00
|
|
|
#include <Storages/IStorage.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/CurrentMetrics.h>
|
|
|
|
#include <Common/NetException.h>
|
2017-07-13 20:58:19 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
2018-06-14 21:20:39 +00:00
|
|
|
#include <IO/HTTPCommon.h>
|
2017-04-06 18:32:00 +00:00
|
|
|
#include <IO/ReadWriteBufferFromHTTP.h>
|
2017-01-21 04:24:28 +00:00
|
|
|
#include <Poco/File.h>
|
2017-06-06 17:18:32 +00:00
|
|
|
#include <ext/scope_guard.h>
|
2017-04-06 13:03:23 +00:00
|
|
|
#include <Poco/Net/HTTPServerResponse.h>
|
2017-04-11 14:13:19 +00:00
|
|
|
#include <Poco/Net/HTTPRequest.h>
|
2014-07-22 13:49:52 +00:00
|
|
|
|
|
|
|
|
2016-10-24 04:06:27 +00:00
|
|
|
namespace CurrentMetrics
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
extern const Metric ReplicatedSend;
|
|
|
|
extern const Metric ReplicatedFetch;
|
2016-10-24 04:06:27 +00:00
|
|
|
}
|
|
|
|
|
2014-07-22 13:49:52 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
/// Error codes used in this file. They are only declared here; the definitions live elsewhere.
/// Every code that this file throws or inspects is listed, so that the file does not depend on
/// declarations that happen to be pulled in by other headers.
namespace ErrorCodes
{
    extern const int ABORTED;
    extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
    extern const int CANNOT_WRITE_TO_OSTREAM;
    extern const int DIRECTORY_ALREADY_EXISTS;
    extern const int NO_SUCH_DATA_PART;
    extern const int UNKNOWN_TABLE;
}
|
|
|
|
|
2016-01-28 01:00:27 +00:00
|
|
|
namespace DataPartsExchange
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Builds the name of the interserver endpoint for the given node:
/// the fixed prefix "DataPartsExchange:" followed by `node_id` as is.
std::string getEndpointId(const std::string & node_id)
{
    std::string endpoint_id{"DataPartsExchange:"};
    endpoint_id += node_id;
    return endpoint_id;
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
std::string Service::getId(const std::string & node_id) const
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
return getEndpointId(node_id);
|
2016-01-28 01:00:27 +00:00
|
|
|
}
|
2016-01-11 21:46:36 +00:00
|
|
|
|
2017-12-01 21:40:58 +00:00
|
|
|
/** Serves a request to send one data part.
  *
  * The part name is taken from the "part" parameter of `params`; the request body is not used.
  * What is written to `out`:
  *  - the number of files;
  *  - for every file: its name, its size, its contents and the hash of the contents.
  * The files are those listed in the checksums of the part plus "checksums.txt" and "columns.txt".
  *
  * If a limit on the number of parallel sends is reached, nothing is written to `out`:
  * `response` gets the HTTP_TOO_MANY_REQUESTS status with a "Retry-After" header and the function returns.
  *
  * Throws ABORTED if `blocker` is cancelled, UNKNOWN_TABLE if the storage does not exist anymore,
  * BAD_SIZE_OF_FILE_IN_DATA_PART if the number of bytes sent differs from the size of the file on disk.
  * On failures other than NetException, ABORTED and CANNOT_WRITE_TO_OSTREAM the part is reported as broken.
  */
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{
    if (blocker.isCancelled())
        throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);

    String part_name = params.get("part");

    /// Number of sends in progress. It is a function-local static, so it is shared by all Service objects.
    static std::atomic_uint total_sends {0};

    /// A limit equal to zero is not checked. The per-table limit is looked at only if the global one is not reached.
    bool limit_reached = data.settings.replicated_max_parallel_sends
        && total_sends >= data.settings.replicated_max_parallel_sends;
    if (!limit_reached)
        limit_reached = data.settings.replicated_max_parallel_sends_for_table
            && data.current_table_sends >= data.settings.replicated_max_parallel_sends_for_table;

    if (limit_reached)
    {
        response.setStatus(std::to_string(HTTP_TOO_MANY_REQUESTS));
        response.setReason("Too many concurrent fetches, try again later");
        response.set("Retry-After", "10");
        response.setChunkedTransferEncoding(false);
        return;
    }

    /// Both counters are decremented on any exit from the function.
    ++total_sends;
    SCOPE_EXIT({ --total_sends; });

    ++data.current_table_sends;
    SCOPE_EXIT({ --data.current_table_sends; });

    StoragePtr owned_storage = storage.lock();
    if (!owned_storage)
        throw Exception("The table was already dropped", ErrorCodes::UNKNOWN_TABLE);

    LOG_TRACE(log, "Sending part " << part_name);

    try
    {
        auto storage_lock = owned_storage->lockStructure(false, __PRETTY_FUNCTION__);

        MergeTreeData::DataPartPtr part = findPart(part_name);

        std::shared_lock<std::shared_mutex> part_lock(part->columns_lock);

        CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend};

        /// The list of files to send is the list of checksums of the part...
        MergeTreeData::DataPart::Checksums files_to_send = part->checksums;
        /// ... extended with the files that are not present there.
        files_to_send.files["checksums.txt"];
        files_to_send.files["columns.txt"];

        /// Checksums of what has been actually sent; compared with the checksums of the part at the end.
        MergeTreeData::DataPart::Checksums sent_checksums;

        writeBinary(files_to_send.files.size(), out);
        for (const auto & entry : files_to_send.files)
        {
            const String & file_name = entry.first;
            String file_path = data.getFullPath() + part_name + "/" + file_name;

            UInt64 size_on_disk = Poco::File(file_path).getSize();

            writeStringBinary(file_name, out);
            writeBinary(size_on_disk, out);

            ReadBufferFromFile file_in(file_path);
            HashingWriteBuffer hashing_out(out);
            copyData(file_in, hashing_out, blocker.getCounter());

            if (blocker.isCancelled())
                throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);

            if (hashing_out.count() != size_on_disk)
                throw Exception("Unexpected size of file " + file_path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);

            writePODBinary(hashing_out.getHash(), out);

            /// The two files added above are sent, but are not included into the checksums to compare.
            const bool is_added_file = file_name == "checksums.txt" || file_name == "columns.txt";
            if (!is_added_file)
                sent_checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash());
        }

        part->checksums.checkEqual(sent_checksums, false);
    }
    catch (const NetException &)
    {
        /// Network error or error on remote side. No need to enqueue part for check.
        throw;
    }
    catch (const Exception & e)
    {
        /// Cancellation and failure to write to the output stream do not lead to reporting the part.
        const bool is_benign = e.code() == ErrorCodes::ABORTED || e.code() == ErrorCodes::CANNOT_WRITE_TO_OSTREAM;
        if (!is_benign)
            data.reportBrokenPart(part_name);
        throw;
    }
    catch (...)
    {
        data.reportBrokenPart(part_name);
        throw;
    }
}
|
|
|
|
|
2016-01-28 01:00:27 +00:00
|
|
|
/// Returns the part called `name` if it exists in the PreCommitted, Committed or Outdated state.
/// Throws NO_SUCH_DATA_PART otherwise.
MergeTreeData::DataPartPtr Service::findPart(const String & name)
{
    /// It is important to include PreCommitted and Outdated parts here because remote replicas cannot reliably
    /// determine the local state of the part, so queries for the parts in these states are completely normal.
    auto found_part = data.getPartIfExists(
        name,
        {
            MergeTreeDataPartState::PreCommitted,
            MergeTreeDataPartState::Committed,
            MergeTreeDataPartState::Outdated
        });

    if (!found_part)
        throw Exception("No part " + name + " in table", ErrorCodes::NO_SUCH_DATA_PART);

    return found_part;
}
|
|
|
|
|
|
|
|
/** Downloads the part `part_name` from another replica into a temporary directory and returns it.
  *
  * part_name          - name of the part to download; passed as the "part" query parameter.
  * replica_path       - identifies the endpoint on the remote side (see getEndpointId);
  *                      also mentioned in error messages.
  * host, port         - address of the remote server.
  * timeouts           - passed to the HTTP buffer.
  * user, password     - HTTP basic credentials; they are set only if `user` is not empty.
  * interserver_scheme - scheme of the request URI.
  * to_detached        - if true, the temporary directory is created inside "detached/".
  * tmp_prefix_        - prefix of the temporary directory name; "tmp_fetch_" is used if it is empty.
  *
  * The data read from the remote side is: the number of files, then for every file its name,
  * its size, its contents and the hash of the contents. Nothing else is expected after the last file.
  *
  * The returned part has `is_temp` set and `relative_path` pointing to the temporary directory.
  *
  * Throws DIRECTORY_ALREADY_EXISTS if the temporary directory already exists,
  * ABORTED if `blocker` is cancelled (the temporary directory is removed in this case),
  * and exceptions without a specific code if a received file name is not a plain file name
  * or if the hash of the received contents differs from the hash sent by the remote side.
  */
MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
    const String & part_name,
    const String & replica_path,
    const String & host,
    int port,
    const ConnectionTimeouts & timeouts,
    const String & user,
    const String & password,
    const String & interserver_scheme,
    bool to_detached,
    const String & tmp_prefix_)
{
    Poco::URI uri;
    uri.setScheme(interserver_scheme);
    uri.setHost(host);
    uri.setPort(port);
    uri.setQueryParameters(
    {
        {"endpoint", getEndpointId(replica_path)},
        {"part", part_name},
        {"compress", "false"}
    });

    Poco::Net::HTTPBasicCredentials creds{};
    if (!user.empty())
    {
        creds.setUsername(user);
        creds.setPassword(password);
    }

    PooledReadWriteBufferFromHTTP in{
        uri,
        Poco::Net::HTTPRequest::HTTP_POST,
        {},
        timeouts,
        creds,
        DBMS_DEFAULT_BUFFER_SIZE,
        data.settings.replicated_max_parallel_fetches_for_host
    };

    static const String TMP_PREFIX = "tmp_fetch_";
    String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;

    String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
    String absolute_part_path = data.getFullPath() + relative_part_path + "/";
    Poco::File part_file(absolute_part_path);

    if (part_file.exists())
        throw Exception("Directory " + absolute_part_path + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);

    CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};

    part_file.createDirectory();

    MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, part_name);
    new_data_part->relative_path = relative_part_path;
    new_data_part->is_temp = true;

    size_t files = 0;
    readBinary(files, in);
    MergeTreeData::DataPart::Checksums checksums;
    for (size_t i = 0; i < files; ++i)
    {
        String file_name;
        UInt64 file_size = 0;

        readStringBinary(file_name, in);
        readBinary(file_size, in);

        /// The name was received from the remote side and is appended to the path of our directory below.
        /// Only a plain file name is accepted: with a path separator or ".." in it
        /// we would create or overwrite a file outside of the temporary directory of the part.
        /// NOTE(review): like the checksum mismatch below, this exception has no specific error code,
        /// because no suitable code is declared in this file - consider adding one.
        const bool is_plain_file_name = !file_name.empty()
            && file_name != "."
            && file_name != ".."
            && file_name.find('/') == String::npos
            && file_name.find('\0') == String::npos;

        if (!is_plain_file_name)
            throw Exception("Unexpected file name '" + file_name + "' received for part " + part_name
                + " transferred from " + replica_path);

        WriteBufferFromFile file_out(absolute_part_path + file_name);
        HashingWriteBuffer hashing_out(file_out);
        copyData(in, hashing_out, file_size, blocker.getCounter());

        if (blocker.isCancelled())
        {
            /// NOTE The is_cancelled flag also makes sense to check every time you read over the network, performing a poll with a not very large timeout.
            /// And now we check it only between read chunks (in the `copyData` function).
            part_file.remove(true);
            throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
        }

        MergeTreeDataPartChecksum::uint128 expected_hash;
        readPODBinary(expected_hash, in);

        if (expected_hash != hashing_out.getHash())
            throw Exception("Checksum mismatch for file " + absolute_part_path + file_name + " transferred from " + replica_path);

        /// These two files are received, but are not included into the checksums that are compared below.
        if (file_name != "checksums.txt" &&
            file_name != "columns.txt")
            checksums.addFile(file_name, file_size, expected_hash);
    }

    assertEOF(in);

    new_data_part->modification_time = time(nullptr);
    new_data_part->loadColumnsChecksumsIndexes(true, false);
    /// The checksums loaded from the received part must agree with what has been calculated while receiving.
    new_data_part->checksums.checkEqual(checksums, false);

    return new_data_part;
}
|
|
|
|
|
|
|
|
}
|
2016-01-28 01:00:27 +00:00
|
|
|
|
|
|
|
}
|