2017-04-01 09:19:00 +00:00
|
|
|
#include <DataStreams/RemoteBlockOutputStream.h>
|
2017-07-10 15:28:04 +00:00
|
|
|
#include <DataStreams/NativeBlockInputStream.h>
|
2021-07-20 18:18:43 +00:00
|
|
|
#include <Processors/Sources/SourceWithProgress.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/escapeForFileName.h>
|
|
|
|
#include <Common/CurrentMetrics.h>
|
2018-01-15 19:07:47 +00:00
|
|
|
#include <Common/StringUtils/StringUtils.h>
|
2017-07-21 12:03:37 +00:00
|
|
|
#include <Common/SipHash.h>
|
2019-11-05 19:31:07 +00:00
|
|
|
#include <Common/quoteString.h>
|
2019-11-10 11:11:57 +00:00
|
|
|
#include <Common/hex.h>
|
2020-06-03 23:50:47 +00:00
|
|
|
#include <Common/ActionBlocker.h>
|
2021-04-07 20:56:15 +00:00
|
|
|
#include <Common/formatReadable.h>
|
|
|
|
#include <Common/Stopwatch.h>
|
2021-10-02 07:13:14 +00:00
|
|
|
#include <base/StringRef.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Interpreters/Context.h>
|
2020-06-13 16:31:28 +00:00
|
|
|
#include <Interpreters/Cluster.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/Distributed/DirectoryMonitor.h>
|
2021-10-03 09:51:09 +00:00
|
|
|
#include <Storages/Distributed/Defines.h>
|
2020-06-03 23:50:47 +00:00
|
|
|
#include <Storages/StorageDistributed.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <IO/ReadBufferFromFile.h>
|
2019-11-16 23:12:35 +00:00
|
|
|
#include <IO/ReadBufferFromString.h>
|
2017-07-10 15:28:04 +00:00
|
|
|
#include <IO/WriteBufferFromFile.h>
|
2018-12-28 18:15:26 +00:00
|
|
|
#include <Compression/CompressedReadBuffer.h>
|
2021-01-10 12:37:54 +00:00
|
|
|
#include <Compression/CheckingCompressedReadBuffer.h>
|
2019-03-01 23:09:36 +00:00
|
|
|
#include <IO/ConnectionTimeouts.h>
|
2020-12-10 22:05:02 +00:00
|
|
|
#include <IO/ConnectionTimeoutsContext.h>
|
2017-07-10 15:28:04 +00:00
|
|
|
#include <IO/Operators.h>
|
2021-01-09 12:26:37 +00:00
|
|
|
#include <Disks/IDisk.h>
|
2016-12-12 03:33:34 +00:00
|
|
|
#include <boost/algorithm/string/find_iterator.hpp>
|
|
|
|
#include <boost/algorithm/string/finder.hpp>
|
2021-07-16 19:27:30 +00:00
|
|
|
#include <boost/range/adaptor/indexed.hpp>
|
2021-04-27 00:05:43 +00:00
|
|
|
#include <filesystem>
|
2016-12-12 03:33:34 +00:00
|
|
|
|
|
|
|
|
|
|
|
namespace CurrentMetrics
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
extern const Metric DistributedSend;
|
2019-08-22 01:30:49 +00:00
|
|
|
extern const Metric DistributedFilesToInsert;
|
2021-05-04 19:42:07 +00:00
|
|
|
extern const Metric BrokenDistributedFilesToInsert;
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
2021-04-27 00:05:43 +00:00
|
|
|
namespace fs = std::filesystem;
|
|
|
|
|
2016-12-12 03:33:34 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes referenced by this file; they are only declared here and defined elsewhere.
namespace ErrorCodes
{
    extern const int CANNOT_READ_ALL_DATA;
    extern const int UNKNOWN_CODEC;
    extern const int CANNOT_DECOMPRESS;
    extern const int CHECKSUM_DOESNT_MATCH;
    extern const int TOO_LARGE_SIZE_COMPRESSED;
    extern const int ATTEMPT_TO_READ_AFTER_EOF;
    extern const int EMPTY_DATA_PASSED;
    extern const int INCORRECT_FILE_NAME;
    extern const int MEMORY_LIMIT_EXCEEDED;
    extern const int DISTRIBUTED_BROKEN_BATCH_INFO;
    extern const int DISTRIBUTED_BROKEN_BATCH_FILES;
    extern const int TOO_MANY_PARTS;
    extern const int TOO_MANY_BYTES;
    extern const int TOO_MANY_ROWS_OR_BYTES;
    extern const int TOO_MANY_PARTITIONS;
    extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
    extern const int ARGUMENT_OUT_OF_BOUND;
}
|
|
|
|
|
|
|
|
|
|
|
|
namespace
|
|
|
|
{
|
2020-03-09 01:22:33 +00:00
|
|
|
/// Minimal interval between two consecutive halvings of the error counter in run().
constexpr std::chrono::minutes decrease_error_count_period{5};
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
template <typename PoolFactory>
|
Drop replicas from dirname for internal_replication=true
Under use_compact_format_in_distributed_parts_names=1 and
internal_replication=true the server encodes all replicas for the
directory name for async INSERT into Distributed, and the directory name
looks like:
shard1_replica1,shard1_replica2,shard3_replica3
This is required for creating connections (to specific replicas only),
but in case of internal_replication=true, this can be avoided, since
this path will always includes all replicas.
This patch replaces all replicas with "_all_replicas" marker.
Note, that initial problem was that this path may overflow the NAME_MAX
if you will have more then 15 replicas, and the server will fail to
create the directory.
Also note, that changed directory name should not be a problem, since:
- empty directories will be removed since #16729
- and replicas encoded in the directory name is also supported anyway.
2021-06-20 13:50:01 +00:00
|
|
|
ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory, const Cluster::ShardsInfo & shards_info, Poco::Logger * log)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2017-04-19 17:40:55 +00:00
|
|
|
ConnectionPoolPtrs pools;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
Drop replicas from dirname for internal_replication=true
Under use_compact_format_in_distributed_parts_names=1 and
internal_replication=true the server encodes all replicas for the
directory name for async INSERT into Distributed, and the directory name
looks like:
shard1_replica1,shard1_replica2,shard3_replica3
This is required for creating connections (to specific replicas only),
but in case of internal_replication=true, this can be avoided, since
this path will always includes all replicas.
This patch replaces all replicas with "_all_replicas" marker.
Note, that initial problem was that this path may overflow the NAME_MAX
if you will have more then 15 replicas, and the server will fail to
create the directory.
Also note, that changed directory name should not be a problem, since:
- empty directories will be removed since #16729
- and replicas encoded in the directory name is also supported anyway.
2021-06-20 13:50:01 +00:00
|
|
|
auto make_connection = [&](const Cluster::Address & address)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2021-02-15 07:45:19 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
pools.emplace_back(factory(address));
|
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
|
|
|
if (e.code() == ErrorCodes::INCORRECT_FILE_NAME)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(log);
|
Drop replicas from dirname for internal_replication=true
Under use_compact_format_in_distributed_parts_names=1 and
internal_replication=true the server encodes all replicas for the
directory name for async INSERT into Distributed, and the directory name
looks like:
shard1_replica1,shard1_replica2,shard3_replica3
This is required for creating connections (to specific replicas only),
but in case of internal_replication=true, this can be avoided, since
this path will always includes all replicas.
This patch replaces all replicas with "_all_replicas" marker.
Note, that initial problem was that this path may overflow the NAME_MAX
if you will have more then 15 replicas, and the server will fail to
create the directory.
Also note, that changed directory name should not be a problem, since:
- empty directories will be removed since #16729
- and replicas encoded in the directory name is also supported anyway.
2021-06-20 13:50:01 +00:00
|
|
|
return;
|
2021-02-15 07:45:19 +00:00
|
|
|
}
|
|
|
|
throw;
|
|
|
|
}
|
Drop replicas from dirname for internal_replication=true
Under use_compact_format_in_distributed_parts_names=1 and
internal_replication=true the server encodes all replicas for the
directory name for async INSERT into Distributed, and the directory name
looks like:
shard1_replica1,shard1_replica2,shard3_replica3
This is required for creating connections (to specific replicas only),
but in case of internal_replication=true, this can be avoided, since
this path will always includes all replicas.
This patch replaces all replicas with "_all_replicas" marker.
Note, that initial problem was that this path may overflow the NAME_MAX
if you will have more then 15 replicas, and the server will fail to
create the directory.
Also note, that changed directory name should not be a problem, since:
- empty directories will be removed since #16729
- and replicas encoded in the directory name is also supported anyway.
2021-06-20 13:50:01 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
|
|
|
|
{
|
|
|
|
const std::string & dirname = boost::copy_range<std::string>(*it);
|
|
|
|
Cluster::Address address = Cluster::Address::fromFullString(dirname);
|
|
|
|
if (address.shard_index && dirname.ends_with("_all_replicas"))
|
|
|
|
{
|
|
|
|
if (address.shard_index > shards_info.size())
|
|
|
|
{
|
|
|
|
LOG_ERROR(log, "No shard with shard_index={} ({})", address.shard_index, name);
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
const auto & shard_info = shards_info[address.shard_index - 1];
|
|
|
|
size_t replicas = shard_info.per_replica_pools.size();
|
|
|
|
|
|
|
|
for (size_t replica_index = 1; replica_index <= replicas; ++replica_index)
|
|
|
|
{
|
|
|
|
address.replica_index = replica_index;
|
|
|
|
make_connection(address);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
make_connection(address);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return pools;
|
|
|
|
}
|
2019-11-10 11:11:57 +00:00
|
|
|
|
|
|
|
/// Throws CHECKSUM_DOESNT_MATCH if the two checksums differ.
/// The exception message contains both values in lowercase hex.
void assertChecksum(CityHash_v1_0_2::uint128 expected, CityHash_v1_0_2::uint128 calculated)
{
    if (expected == calculated)
        return;

    String message = "Checksum of extra info doesn't match: corrupted data."
        " Reference: ";
    message += getHexUIntLowercase(expected.first);
    message += getHexUIntLowercase(expected.second);
    message += ". Actual: ";
    message += getHexUIntLowercase(calculated.first);
    message += getHexUIntLowercase(calculated.second);
    message += ".";

    throw Exception(message, ErrorCodes::CHECKSUM_DOESNT_MATCH);
}
|
|
|
|
|
2021-01-10 11:39:46 +00:00
|
|
|
struct DistributedHeader
|
|
|
|
{
|
|
|
|
Settings insert_settings;
|
|
|
|
std::string insert_query;
|
|
|
|
ClientInfo client_info;
|
2021-01-10 12:03:22 +00:00
|
|
|
|
|
|
|
/// .bin file cannot have zero rows/bytes.
|
|
|
|
size_t rows = 0;
|
|
|
|
size_t bytes = 0;
|
2021-01-14 19:36:30 +00:00
|
|
|
|
2021-04-02 04:09:39 +00:00
|
|
|
/// dumpStructure() of the header -- obsolete
|
|
|
|
std::string block_header_string;
|
|
|
|
Block block_header;
|
2021-01-10 11:39:46 +00:00
|
|
|
};
|
|
|
|
|
2021-04-02 04:09:39 +00:00
|
|
|
/// Reads the header of a batch file from `in`.
///
/// The first varint of the file selects the layout:
///  - DBMS_DISTRIBUTED_SIGNATURE_HEADER: a checksummed header string follows, which contains
///    the initiator revision, the query, the settings and then optional trailing sections
///    (client info; rows/bytes/structure string; header block), each read only if data is left;
///  - DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT: binary settings followed by the query;
///  - anything else: the value is the size of the query text that follows.
///
/// Throws CHECKSUM_DOESNT_MATCH (via assertChecksum) for a corrupted header and
/// CANNOT_READ_ALL_DATA if the header block section is present but yields an empty block.
DistributedHeader readDistributedHeader(ReadBufferFromFile & in, Poco::Logger * log)
{
    DistributedHeader result;

    UInt64 signature_or_size;
    readVarUInt(signature_or_size, in);

    if (signature_or_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER)
    {
        /// The whole header is stored as one string, followed by its checksum.
        String raw_header;
        readStringBinary(raw_header, in);

        CityHash_v1_0_2::uint128 stored_checksum;
        readPODBinary(stored_checksum, in);
        assertChecksum(stored_checksum, CityHash_v1_0_2::CityHash128(raw_header.data(), raw_header.size()));

        /// Now parse the verified string piece by piece.
        ReadBufferFromString header_in(raw_header);

        UInt64 initiator_revision;
        readVarUInt(initiator_revision, header_in);
        if (DBMS_TCP_PROTOCOL_VERSION < initiator_revision)
        {
            LOG_WARNING(log, "ClickHouse shard version is older than ClickHouse initiator version. It may lack support for new features.");
        }

        readStringBinary(result.insert_query, header_in);
        result.insert_settings.read(header_in);

        /// Optional section: client info.
        if (header_in.hasPendingData())
            result.client_info.read(header_in, initiator_revision);

        /// Optional section: rows, bytes and the textual structure.
        if (header_in.hasPendingData())
        {
            readVarUInt(result.rows, header_in);
            readVarUInt(result.bytes, header_in);
            readStringBinary(result.block_header_string, header_in);
        }

        /// Optional section: header block in native format.
        if (header_in.hasPendingData())
        {
            NativeBlockInputStream header_block_in(header_in, DBMS_TCP_PROTOCOL_VERSION);
            result.block_header = header_block_in.read();
            if (!result.block_header)
                throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read header from the {} batch", in.getFileName());
        }

        /// Add handling new data here, for example:
        ///
        /// if (header_in.hasPendingData())
        ///     readVarUInt(my_new_data, header_in);
        ///
        /// And note that it is safe, because we have checksum and size for header.

        return result;
    }

    if (signature_or_size == DBMS_DISTRIBUTED_SIGNATURE_HEADER_OLD_FORMAT)
    {
        result.insert_settings.read(in, SettingsWriteFormat::BINARY);
        readStringBinary(result.insert_query, in);
        return result;
    }

    /// No signature at all: the value that was read is the length of the query.
    result.insert_query.resize(signature_or_size);
    in.readStrict(result.insert_query.data(), signature_or_size);

    return result;
}
|
2021-01-19 19:22:58 +00:00
|
|
|
|
|
|
|
/// remote_error argument is used to decide whether some errors should be
|
|
|
|
/// ignored or not, in particular:
|
|
|
|
///
|
|
|
|
/// - ATTEMPT_TO_READ_AFTER_EOF should not be ignored
|
|
|
|
/// if we receive it from remote (receiver), since:
|
|
|
|
/// - the sender will got ATTEMPT_TO_READ_AFTER_EOF when the client just go away,
|
|
|
|
/// i.e. server had been restarted
|
|
|
|
/// - since #18853 the file will be checked on the sender locally, and
|
|
|
|
/// if there is something wrong with the file itself, we will receive
|
|
|
|
/// ATTEMPT_TO_READ_AFTER_EOF not from the remote at first
|
|
|
|
/// and mark batch as broken.
|
|
|
|
bool isFileBrokenErrorCode(int code, bool remote_error)
|
|
|
|
{
|
|
|
|
return code == ErrorCodes::CHECKSUM_DOESNT_MATCH
|
2021-01-22 18:26:47 +00:00
|
|
|
|| code == ErrorCodes::EMPTY_DATA_PASSED
|
2021-01-19 19:22:58 +00:00
|
|
|
|| code == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED
|
|
|
|
|| code == ErrorCodes::CANNOT_READ_ALL_DATA
|
|
|
|
|| code == ErrorCodes::UNKNOWN_CODEC
|
|
|
|
|| code == ErrorCodes::CANNOT_DECOMPRESS
|
2021-05-03 07:52:45 +00:00
|
|
|
|| code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO
|
2021-05-03 07:52:51 +00:00
|
|
|
|| code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_FILES
|
2021-01-19 19:22:58 +00:00
|
|
|
|| (!remote_error && code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
|
|
|
|
}
|
2021-01-26 13:29:45 +00:00
|
|
|
|
2021-05-03 07:52:45 +00:00
|
|
|
/// Can the batch be split and send files from batch one-by-one instead?
|
2021-05-03 07:52:51 +00:00
|
|
|
bool isSplittableErrorCode(int code, bool remote)
|
2021-05-03 07:52:45 +00:00
|
|
|
{
|
|
|
|
return code == ErrorCodes::MEMORY_LIMIT_EXCEEDED
|
|
|
|
/// FunctionRange::max_elements and similar
|
|
|
|
|| code == ErrorCodes::ARGUMENT_OUT_OF_BOUND
|
|
|
|
|| code == ErrorCodes::TOO_MANY_PARTS
|
|
|
|
|| code == ErrorCodes::TOO_MANY_BYTES
|
|
|
|
|| code == ErrorCodes::TOO_MANY_ROWS_OR_BYTES
|
|
|
|
|| code == ErrorCodes::TOO_MANY_PARTITIONS
|
|
|
|
|| code == ErrorCodes::DISTRIBUTED_TOO_MANY_PENDING_BYTES
|
|
|
|
|| code == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO
|
2021-05-03 07:52:51 +00:00
|
|
|
|| isFileBrokenErrorCode(code, remote)
|
2021-05-03 07:52:45 +00:00
|
|
|
;
|
|
|
|
}
|
|
|
|
|
2021-01-26 13:29:45 +00:00
|
|
|
/// Returns the disk's directory sync guard for `path` when `dir_fsync` is set, nullptr otherwise.
SyncGuardPtr getDirectorySyncGuard(bool dir_fsync, const DiskPtr & disk, const String & path)
{
    if (!dir_fsync)
        return nullptr;

    return disk->getDirectorySyncGuard(path);
}
|
2021-02-01 18:02:36 +00:00
|
|
|
|
2021-09-03 17:29:36 +00:00
|
|
|
/// Decompresses `in`, reads it block by block in native format, converts every block
/// to the structure of the remote header (columns are matched by name) and writes it to `remote`.
void writeAndConvert(RemoteInserter & remote, ReadBufferFromFile & in)
{
    CompressedReadBuffer uncompressed_in(in);
    NativeBlockInputStream blocks_in(uncompressed_in, DBMS_TCP_PROTOCOL_VERSION);

    blocks_in.readPrefix();

    while (true)
    {
        Block block = blocks_in.read();
        if (!block)
            break;

        /// The conversion is built from the structure of each block that was read.
        auto conversion_dag = ActionsDAG::makeConvertingActions(
            block.cloneEmpty().getColumnsWithTypeAndName(),
            remote.getHeader().getColumnsWithTypeAndName(),
            ActionsDAG::MatchColumnsMode::Name);
        auto conversion = std::make_shared<ExpressionActions>(std::move(conversion_dag));

        conversion->execute(block);
        remote.write(block);
    }

    blocks_in.readSuffix();
}
|
2021-02-01 18:02:36 +00:00
|
|
|
|
2021-04-13 21:53:39 +00:00
|
|
|
void writeRemoteConvert(
|
|
|
|
const DistributedHeader & distributed_header,
|
2021-09-03 17:29:36 +00:00
|
|
|
RemoteInserter & remote,
|
2021-04-13 21:53:39 +00:00
|
|
|
bool compression_expected,
|
|
|
|
ReadBufferFromFile & in,
|
|
|
|
Poco::Logger * log)
|
2021-04-02 04:09:39 +00:00
|
|
|
{
|
|
|
|
if (!remote.getHeader())
|
2021-02-01 18:02:36 +00:00
|
|
|
{
|
|
|
|
CheckingCompressedReadBuffer checking_in(in);
|
|
|
|
remote.writePrepared(checking_in);
|
2021-04-02 04:09:39 +00:00
|
|
|
return;
|
2021-02-01 18:02:36 +00:00
|
|
|
}
|
2021-04-02 04:09:39 +00:00
|
|
|
|
|
|
|
/// This is old format, that does not have header for the block in the file header,
|
|
|
|
/// applying ConvertingBlockInputStream in this case is not a big overhead.
|
|
|
|
///
|
|
|
|
/// Anyway we can get header only from the first block, which contain all rows anyway.
|
|
|
|
if (!distributed_header.block_header)
|
|
|
|
{
|
|
|
|
LOG_TRACE(log, "Processing batch {} with old format (no header)", in.getFileName());
|
|
|
|
|
|
|
|
writeAndConvert(remote, in);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!blocksHaveEqualStructure(distributed_header.block_header, remote.getHeader()))
|
|
|
|
{
|
|
|
|
LOG_WARNING(log,
|
|
|
|
"Structure does not match (remote: {}, local: {}), implicit conversion will be done",
|
|
|
|
remote.getHeader().dumpStructure(), distributed_header.block_header.dumpStructure());
|
|
|
|
|
|
|
|
writeAndConvert(remote, in);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2021-04-13 21:53:39 +00:00
|
|
|
/// If connection does not use compression, we have to uncompress the data.
|
|
|
|
if (!compression_expected)
|
|
|
|
{
|
|
|
|
writeAndConvert(remote, in);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Otherwise write data as it was already prepared (more efficient path).
|
2021-04-02 04:09:39 +00:00
|
|
|
CheckingCompressedReadBuffer checking_in(in);
|
|
|
|
remote.writePrepared(checking_in);
|
2021-02-01 18:02:36 +00:00
|
|
|
}
|
2021-07-17 07:53:20 +00:00
|
|
|
|
|
|
|
/// Converts a double to uint64_t with saturation:
///  - NaN, zero and negative values give 0;
///  - values greater than or equal to the uint64_t maximum (including +inf) give that maximum;
///  - everything else is truncated towards zero.
///
/// A plain static_cast is undefined behavior when the truncated value does not fit
/// into the destination type, so both ends of the range are handled explicitly.
uint64_t doubleToUInt64(double d)
{
    constexpr uint64_t max_value = std::numeric_limits<uint64_t>::max();

    /// Written as a negated comparison so that NaN is caught as well.
    if (!(d > 0))
        return 0;

    /// The limit is not representable as double, the conversion rounds it up to 2^64,
    /// which is exactly the first value that does not fit.
    if (d >= static_cast<double>(max_value))
        return max_value;

    return static_cast<uint64_t>(d);
}
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2019-04-08 05:13:16 +00:00
|
|
|
/// Initializes the monitor from the storage settings and starts its background task.
///
/// `path` is the disk path joined with `relative_path_` and an empty component,
/// and `current_batch_file_path` is "current_batch.txt" appended to it.
/// The task created in `bg_pool` is named "<logger name>/Bg", calls run(),
/// and is activated and scheduled right away.
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
    StorageDistributed & storage_,
    const DiskPtr & disk_,
    const std::string & relative_path_,
    ConnectionPoolPtr pool_,
    ActionBlocker & monitor_blocker_,
    BackgroundSchedulePool & bg_pool)
    : storage(storage_)
    , pool(std::move(pool_))
    , disk(disk_)
    , relative_path(relative_path_)
    , path(fs::path(disk->getPath()) / relative_path / "")
    , should_batch_inserts(storage.getDistributedSettingsRef().monitor_batch_inserts)
    , split_batch_on_failure(storage.getDistributedSettingsRef().monitor_split_batch_on_failure)
    , dir_fsync(storage.getDistributedSettingsRef().fsync_directories)
    , min_batched_block_size_rows(storage.getContext()->getSettingsRef().min_insert_block_size_rows)
    , min_batched_block_size_bytes(storage.getContext()->getSettingsRef().min_insert_block_size_bytes)
    , current_batch_file_path(path + "current_batch.txt")
    , default_sleep_time(storage.getDistributedSettingsRef().monitor_sleep_time_ms.totalMilliseconds())
    , sleep_time(default_sleep_time)
    , max_sleep_time(storage.getDistributedSettingsRef().monitor_max_sleep_time_ms.totalMilliseconds())
    , log(&Poco::Logger::get(getLoggerName()))
    , monitor_blocker(monitor_blocker_)
    , metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0)
    , metric_broken_files(CurrentMetrics::BrokenDistributedFilesToInsert, 0)
{
    const auto task_name = getLoggerName() + "/Bg";
    task_handle = bg_pool.createTask(task_name, [this] { run(); });
    task_handle->activateAndSchedule();
}
|
|
|
|
|
2017-02-13 10:38:50 +00:00
|
|
|
|
2016-12-12 03:33:34 +00:00
|
|
|
/// Stops the background task unless the monitor has already been asked to quit.
StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
{
    if (quit)
        return;

    quit = true;
    task_handle->deactivate();
}
|
|
|
|
|
2019-05-10 04:19:02 +00:00
|
|
|
void StorageDistributedDirectoryMonitor::flushAllData()
|
2019-04-08 05:13:16 +00:00
|
|
|
{
|
2020-06-02 23:47:32 +00:00
|
|
|
if (quit)
|
|
|
|
return;
|
|
|
|
|
2021-01-26 18:45:37 +00:00
|
|
|
std::lock_guard lock{mutex};
|
2020-06-02 23:47:32 +00:00
|
|
|
|
2020-08-26 21:43:00 +00:00
|
|
|
const auto & files = getFiles();
|
2020-06-02 23:47:32 +00:00
|
|
|
if (!files.empty())
|
2019-04-22 15:11:16 +00:00
|
|
|
{
|
2020-08-26 21:43:00 +00:00
|
|
|
processFiles(files);
|
2020-06-02 23:47:32 +00:00
|
|
|
|
|
|
|
/// Update counters
|
2020-08-26 21:43:00 +00:00
|
|
|
getFiles();
|
2019-04-22 15:11:16 +00:00
|
|
|
}
|
2019-04-08 05:13:16 +00:00
|
|
|
}
|
2018-04-21 00:35:20 +00:00
|
|
|
|
|
|
|
void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
|
|
|
|
{
|
|
|
|
if (!quit)
|
|
|
|
{
|
2020-04-14 17:23:06 +00:00
|
|
|
quit = true;
|
2020-04-14 18:12:08 +00:00
|
|
|
task_handle->deactivate();
|
2018-04-21 00:35:20 +00:00
|
|
|
}
|
|
|
|
|
2021-01-26 13:29:45 +00:00
|
|
|
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
|
2021-04-27 00:05:43 +00:00
|
|
|
fs::remove_all(path);
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
2017-02-13 10:38:50 +00:00
|
|
|
|
2016-12-12 03:33:34 +00:00
|
|
|
void StorageDistributedDirectoryMonitor::run()
|
|
|
|
{
|
2021-01-26 18:45:37 +00:00
|
|
|
std::lock_guard lock{mutex};
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2020-05-10 09:32:51 +00:00
|
|
|
bool do_sleep = false;
|
2020-04-14 18:12:08 +00:00
|
|
|
while (!quit)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2020-05-10 09:32:51 +00:00
|
|
|
do_sleep = true;
|
2020-06-02 23:47:32 +00:00
|
|
|
|
2020-08-26 21:43:00 +00:00
|
|
|
const auto & files = getFiles();
|
2020-06-02 23:47:32 +00:00
|
|
|
if (files.empty())
|
|
|
|
break;
|
|
|
|
|
2019-04-08 05:13:16 +00:00
|
|
|
if (!monitor_blocker.isCancelled())
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2019-04-08 05:13:16 +00:00
|
|
|
try
|
|
|
|
{
|
2020-08-26 21:43:00 +00:00
|
|
|
do_sleep = !processFiles(files);
|
2020-06-03 08:22:48 +00:00
|
|
|
|
2021-05-04 19:16:36 +00:00
|
|
|
std::lock_guard status_lock(status_mutex);
|
|
|
|
status.last_exception = std::exception_ptr{};
|
2019-04-08 05:13:16 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
2021-05-04 19:16:36 +00:00
|
|
|
std::lock_guard status_lock(status_mutex);
|
2020-06-03 08:22:48 +00:00
|
|
|
|
2019-04-08 05:13:16 +00:00
|
|
|
do_sleep = true;
|
2021-05-04 19:16:36 +00:00
|
|
|
++status.error_count;
|
2021-07-17 07:53:20 +00:00
|
|
|
|
|
|
|
UInt64 q = doubleToUInt64(std::exp2(status.error_count));
|
|
|
|
std::chrono::milliseconds new_sleep_time(default_sleep_time.count() * q);
|
|
|
|
if (new_sleep_time.count() < 0)
|
|
|
|
sleep_time = max_sleep_time;
|
|
|
|
else
|
|
|
|
sleep_time = std::min(new_sleep_time, max_sleep_time);
|
|
|
|
|
2019-04-08 05:13:16 +00:00
|
|
|
tryLogCurrentException(getLoggerName().data());
|
2021-05-04 19:16:36 +00:00
|
|
|
status.last_exception = std::current_exception();
|
2019-04-08 05:13:16 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2019-04-22 15:11:16 +00:00
|
|
|
else
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2020-05-23 22:24:01 +00:00
|
|
|
LOG_DEBUG(log, "Skipping send data over distributed table.");
|
2019-01-04 14:18:49 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
const auto now = std::chrono::system_clock::now();
|
|
|
|
if (now - last_decrease_time > decrease_error_count_period)
|
|
|
|
{
|
2021-05-04 19:16:36 +00:00
|
|
|
std::lock_guard status_lock(status_mutex);
|
2020-06-03 08:22:48 +00:00
|
|
|
|
2021-05-04 19:16:36 +00:00
|
|
|
status.error_count /= 2;
|
2017-04-01 07:20:54 +00:00
|
|
|
last_decrease_time = now;
|
|
|
|
}
|
2020-04-14 18:12:08 +00:00
|
|
|
|
|
|
|
if (do_sleep)
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2020-06-02 23:47:32 +00:00
|
|
|
/// Update counters
|
2020-08-26 21:43:00 +00:00
|
|
|
getFiles();
|
2020-06-02 23:47:32 +00:00
|
|
|
|
2020-05-10 09:32:51 +00:00
|
|
|
if (!quit && do_sleep)
|
2020-04-25 23:00:03 +00:00
|
|
|
task_handle->scheduleAfter(sleep_time.count());
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
2017-02-13 10:38:50 +00:00
|
|
|
|
2017-07-27 15:24:39 +00:00
|
|
|
/// Creates the connection pool for the directory `name` (a comma-separated list of addresses).
///
/// For every address the pool is chosen as follows:
///  - new format shard{shard_index}_replica{replica_index} (indexes start from 1):
///    the corresponding per-replica pool of the cluster is used, and INCORRECT_FILE_NAME
///    is thrown if replica_index is zero or one of the indexes is out of range;
///  - otherwise an existing per-replica pool whose address has the same user, password,
///    host name, port, default database and secure flag is reused;
///  - otherwise a new ConnectionPool with a single connection is created.
///
/// If exactly one pool was created it is returned as is, otherwise the pools are wrapped
/// into ConnectionPoolWithFailover configured from the context settings.
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
{
    const auto pool_factory = [&storage, &name] (const Cluster::Address & address) -> ConnectionPoolPtr
    {
        const auto & cluster = storage.getCluster();
        const auto & shards_info = cluster->getShardsInfo();
        const auto & shards_addresses = cluster->getShardsAddresses();

        /// check new format shard{shard_index}_replica{replica_index}
        /// (shard_index and replica_index starts from 1)
        if (address.shard_index != 0)
        {
            if (!address.replica_index)
                throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
                    "Wrong replica_index={} ({})", address.replica_index, name);

            if (address.shard_index > shards_info.size())
                throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
                    "No shard with shard_index={} ({})", address.shard_index, name);

            const auto & replica_pools = shards_info[address.shard_index - 1].per_replica_pools;
            if (address.replica_index > replica_pools.size())
                throw Exception(ErrorCodes::INCORRECT_FILE_NAME,
                    "No shard with replica_index={} ({})", address.replica_index, name);

            return replica_pools[address.replica_index - 1];
        }

        const auto same_endpoint = [&address](const Cluster::Address & candidate)
        {
            return address.user == candidate.user
                && address.password == candidate.password
                && address.host_name == candidate.host_name
                && address.port == candidate.port
                && address.default_database == candidate.default_database
                && address.secure == candidate.secure;
        };

        /// existing connections pool have a higher priority
        for (size_t shard = 0; shard < shards_info.size(); ++shard)
        {
            const Cluster::Addresses & shard_replicas = shards_addresses[shard];

            for (size_t replica = 0; replica < shard_replicas.size(); ++replica)
            {
                if (same_endpoint(shard_replicas[replica]))
                    return shards_info[shard].per_replica_pools[replica];
            }
        }

        /// Nothing in the cluster matches, so create a dedicated pool for this address.
        return std::make_shared<ConnectionPool>(
            1, /* max_connections */
            address.host_name,
            address.port,
            address.default_database,
            address.user,
            address.password,
            address.cluster,
            address.cluster_secret,
            storage.getName() + '_' + address.user, /* client */
            Protocol::Compression::Enable,
            address.secure);
    };

    auto pools = createPoolsForAddresses(name, pool_factory, storage.getCluster()->getShardsInfo(), storage.log);

    const auto settings = storage.getContext()->getSettings();

    if (pools.size() == 1)
        return pools.front();

    return std::make_shared<ConnectionPoolWithFailover>(
        pools,
        settings.load_balancing,
        settings.distributed_replica_error_half_life.totalSeconds(),
        settings.distributed_replica_error_cap);
}
|
|
|
|
|
2017-02-13 10:38:50 +00:00
|
|
|
|
2021-03-02 20:18:53 +00:00
|
|
|
std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
|
2016-12-12 03:33:34 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
std::map<UInt64, std::string> files;
|
2020-06-02 23:47:32 +00:00
|
|
|
size_t new_bytes_count = 0;
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2021-04-27 00:05:43 +00:00
|
|
|
fs::directory_iterator end;
|
|
|
|
for (fs::directory_iterator it{path}; it != end; ++it)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
|
|
|
const auto & file_path_str = it->path();
|
2021-04-27 17:22:39 +00:00
|
|
|
if (!it->is_directory() && startsWith(fs::path(file_path_str).extension(), ".bin"))
|
2020-06-02 23:47:32 +00:00
|
|
|
{
|
2021-04-27 17:22:39 +00:00
|
|
|
files[parse<UInt64>(fs::path(file_path_str).stem())] = file_path_str;
|
|
|
|
new_bytes_count += fs::file_size(fs::path(file_path_str));
|
2020-06-02 23:47:32 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2020-06-03 08:22:48 +00:00
|
|
|
{
|
2021-05-04 19:16:36 +00:00
|
|
|
std::lock_guard status_lock(status_mutex);
|
2021-01-26 18:45:37 +00:00
|
|
|
|
2021-05-04 19:16:36 +00:00
|
|
|
if (status.files_count != files.size())
|
|
|
|
LOG_TRACE(log, "Files set to {} (was {})", files.size(), status.files_count);
|
|
|
|
if (status.bytes_count != new_bytes_count)
|
|
|
|
LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, status.bytes_count);
|
2021-01-26 18:45:37 +00:00
|
|
|
|
2021-01-27 05:02:04 +00:00
|
|
|
metric_pending_files.changeTo(files.size());
|
2021-05-04 19:16:36 +00:00
|
|
|
status.files_count = files.size();
|
|
|
|
status.bytes_count = new_bytes_count;
|
2020-06-03 08:22:48 +00:00
|
|
|
}
|
|
|
|
|
2020-06-02 23:47:32 +00:00
|
|
|
return files;
|
|
|
|
}
|
2020-08-26 21:43:00 +00:00
|
|
|
bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std::string> & files)
|
2020-06-02 23:47:32 +00:00
|
|
|
{
|
2017-07-10 15:28:04 +00:00
|
|
|
if (should_batch_inserts)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2020-08-26 21:43:00 +00:00
|
|
|
processFilesWithBatching(files);
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
for (const auto & file : files)
|
|
|
|
{
|
|
|
|
if (quit)
|
|
|
|
return true;
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2020-08-26 21:43:00 +00:00
|
|
|
processFile(file.second);
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
return true;
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
2020-08-26 21:43:00 +00:00
|
|
|
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
|
2016-12-12 03:33:34 +00:00
|
|
|
{
|
2021-04-07 20:56:15 +00:00
|
|
|
Stopwatch watch;
|
2021-04-10 23:33:54 +00:00
|
|
|
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.getContext()->getSettingsRef());
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2021-01-10 12:37:54 +00:00
|
|
|
ReadBufferFromFile in(file_path);
|
2021-04-02 04:09:39 +00:00
|
|
|
const auto & distributed_header = readDistributedHeader(in, log);
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2021-04-15 18:00:16 +00:00
|
|
|
LOG_DEBUG(log, "Started processing `{}` ({} rows, {} bytes)", file_path,
|
2021-04-07 20:56:15 +00:00
|
|
|
formatReadableQuantity(distributed_header.rows),
|
|
|
|
formatReadableSizeWithBinarySuffix(distributed_header.bytes));
|
|
|
|
|
2021-04-02 04:09:39 +00:00
|
|
|
auto connection = pool->get(timeouts, &distributed_header.insert_settings);
|
2021-09-03 17:29:36 +00:00
|
|
|
RemoteInserter remote{*connection, timeouts,
|
2021-04-02 04:09:39 +00:00
|
|
|
distributed_header.insert_query,
|
|
|
|
distributed_header.insert_settings,
|
|
|
|
distributed_header.client_info};
|
2021-04-13 21:53:39 +00:00
|
|
|
bool compression_expected = connection->getCompression() == Protocol::Compression::Enable;
|
|
|
|
writeRemoteConvert(distributed_header, remote, compression_expected, in, log);
|
2021-09-03 17:29:36 +00:00
|
|
|
remote.onFinish();
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2021-05-01 08:22:04 +00:00
|
|
|
catch (Exception & e)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2021-05-01 08:22:04 +00:00
|
|
|
e.addMessage(fmt::format("While sending {}", file_path));
|
2017-07-10 15:28:04 +00:00
|
|
|
maybeMarkAsBroken(file_path, e);
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
|
2021-01-26 13:29:45 +00:00
|
|
|
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
|
2021-01-26 18:45:37 +00:00
|
|
|
markAsSend(file_path);
|
2021-04-07 20:56:15 +00:00
|
|
|
LOG_TRACE(log, "Finished processing `{}` (took {} ms)", file_path, watch.elapsedMilliseconds());
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
|
|
|
|
2017-07-21 12:03:37 +00:00
|
|
|
struct StorageDistributedDirectoryMonitor::BatchHeader
|
|
|
|
{
|
2019-04-08 10:04:26 +00:00
|
|
|
Settings settings;
|
2017-07-21 12:03:37 +00:00
|
|
|
String query;
|
2020-03-24 22:26:24 +00:00
|
|
|
ClientInfo client_info;
|
2021-04-02 04:09:39 +00:00
|
|
|
Block header;
|
2017-07-21 12:03:37 +00:00
|
|
|
|
2021-04-02 04:09:39 +00:00
|
|
|
BatchHeader(Settings settings_, String query_, ClientInfo client_info_, Block header_)
|
2019-04-08 10:04:26 +00:00
|
|
|
: settings(std::move(settings_))
|
|
|
|
, query(std::move(query_))
|
2020-03-24 22:26:24 +00:00
|
|
|
, client_info(std::move(client_info_))
|
2021-04-02 04:09:39 +00:00
|
|
|
, header(std::move(header_))
|
2017-07-21 12:03:37 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
|
|
|
bool operator==(const BatchHeader & other) const
|
|
|
|
{
|
2021-04-02 04:09:39 +00:00
|
|
|
return std::tie(settings, query, client_info.query_kind) ==
|
|
|
|
std::tie(other.settings, other.query, other.client_info.query_kind) &&
|
|
|
|
blocksHaveEqualStructure(header, other.header);
|
2017-07-21 12:03:37 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
struct Hash
|
|
|
|
{
|
|
|
|
size_t operator()(const BatchHeader & batch_header) const
|
|
|
|
{
|
|
|
|
SipHash hash_state;
|
|
|
|
hash_state.update(batch_header.query.data(), batch_header.query.size());
|
2021-04-02 04:09:39 +00:00
|
|
|
batch_header.header.updateHash(hash_state);
|
2017-07-21 12:03:37 +00:00
|
|
|
return hash_state.get64();
|
|
|
|
}
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
2017-07-10 15:28:04 +00:00
|
|
|
struct StorageDistributedDirectoryMonitor::Batch
|
|
|
|
{
|
|
|
|
std::vector<UInt64> file_indices;
|
|
|
|
size_t total_rows = 0;
|
|
|
|
size_t total_bytes = 0;
|
2018-03-27 18:59:53 +00:00
|
|
|
bool recovered = false;
|
2017-07-10 15:28:04 +00:00
|
|
|
|
|
|
|
StorageDistributedDirectoryMonitor & parent;
|
2018-03-27 18:59:53 +00:00
|
|
|
const std::map<UInt64, String> & file_index_to_path;
|
2017-07-10 15:28:04 +00:00
|
|
|
|
2021-05-03 07:52:45 +00:00
|
|
|
bool split_batch_on_failure = true;
|
2021-01-09 12:42:21 +00:00
|
|
|
bool fsync = false;
|
|
|
|
bool dir_fsync = false;
|
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
Batch(
|
|
|
|
StorageDistributedDirectoryMonitor & parent_,
|
|
|
|
const std::map<UInt64, String> & file_index_to_path_)
|
2021-01-09 12:42:21 +00:00
|
|
|
: parent(parent_)
|
|
|
|
, file_index_to_path(file_index_to_path_)
|
2021-05-03 07:52:45 +00:00
|
|
|
, split_batch_on_failure(parent.split_batch_on_failure)
|
2021-01-09 12:42:21 +00:00
|
|
|
, fsync(parent.storage.getDistributedSettingsRef().fsync_after_insert)
|
|
|
|
, dir_fsync(parent.dir_fsync)
|
2018-03-27 18:59:53 +00:00
|
|
|
{}
|
2017-07-10 15:28:04 +00:00
|
|
|
|
|
|
|
bool isEnoughSize() const
|
|
|
|
{
|
|
|
|
return (!parent.min_batched_block_size_rows && !parent.min_batched_block_size_bytes)
|
|
|
|
|| (parent.min_batched_block_size_rows && total_rows >= parent.min_batched_block_size_rows)
|
|
|
|
|| (parent.min_batched_block_size_bytes && total_bytes >= parent.min_batched_block_size_bytes);
|
|
|
|
}
|
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
void send()
|
2017-07-10 15:28:04 +00:00
|
|
|
{
|
|
|
|
if (file_indices.empty())
|
|
|
|
return;
|
|
|
|
|
|
|
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2021-04-07 20:56:15 +00:00
|
|
|
Stopwatch watch;
|
|
|
|
|
2021-04-15 18:00:16 +00:00
|
|
|
LOG_DEBUG(parent.log, "Sending a batch of {} files ({} rows, {} bytes).", file_indices.size(),
|
2021-04-07 20:56:15 +00:00
|
|
|
formatReadableQuantity(total_rows),
|
|
|
|
formatReadableSizeWithBinarySuffix(total_bytes));
|
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
if (!recovered)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2017-07-10 15:28:04 +00:00
|
|
|
/// For deduplication in Replicated tables to work, in case of error
|
|
|
|
/// we must try to re-send exactly the same batches.
|
|
|
|
/// So we save contents of the current batch into the current_batch_file_path file
|
|
|
|
/// and truncate it afterwards if all went well.
|
Write current batch for distributed send atomically (using .tmp + rename)
Otherwise the following can happen after reboot:
2019.11.01 11:46:12.217143 [ 187 ] {} <Error> dist.Distributed.DirectoryMonitor: Code: 27, e.displayText() = DB::Exception: Cannot parse input: expected \n before: S\'^A\0^]\0\0<BE>4^A\0r<87>\0\0<A2><D7>^D^Y\0<F2>{^E<CD>\0\0Hy\0\0<F2>^_^C\0^_&\0\0<FF><D3>\0\0
<8D><91>\0\0<C0>9\0\0<C0><B0>^A\0^G<AA>\0\0<B5><FE>^A\0<BF><A7>^A\0<9B><CB>^A\0I^R^A\0<B7><AB>^A\0<BC><8F>\0\0˲^B\0Zy\0\0<94><AA>\0\0<98>
<8F>\0\0\f<A5>\0\0^QN\0\0<E3><C6>\0\0<B1>6^B\0ɳ\0\0W<99>\0\0<B9><A2>\0\0:<BB>\0\0)<B1>\0\0#<8B>\0\0aW\0\0<ED>#\0\0<F1>@\0\0ˀ^B\0<D7><FC>\0\0<DF>, Stack trace:
0. 0x559e27222e60 StackTrace::StackTrace() /usr/bin/clickhouse
1. 0x559e27222c45 DB::Exception::Exception(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int) /usr/bin/clickhouse
2. 0x559e26de4473 ? /usr/bin/clickhouse
3. 0x559e272494b5 DB::assertString(char const*, DB::ReadBuffer&) /usr/bin/clickhouse
4. 0x559e2a5dab45 DB::StorageDistributedDirectoryMonitor::processFilesWithBatching(std::map<unsigned long, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::less<unsigned long>, std::allocator<std::pair<unsigned long const, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > > const&) /usr/bin/clickhouse
5. 0x559e2a5db5fa DB::StorageDistributedDirectoryMonitor::processFiles() /usr/bin/clickhouse
6. 0x559e2a5dba78 DB::StorageDistributedDirectoryMonitor::run() /usr/bin/clickhouse
7. 0x559e2a5ddbbc ThreadFromGlobalPool::ThreadFromGlobalPool<void (DB::StorageDistributedDirectoryMonitor::*)(), DB::StorageDistributedDirectoryMonitor*>(void (DB::StorageDistributedDirectoryMonitor::*&&)(), DB::StorageDistributedDirectoryMonitor*&&)::{lambda()#1}::operator()() const /usr/bin/clickhouse
8. 0x559e2726b07c ThreadPoolImpl<std::thread>::worker(std::_List_iterator<std::thread>) /usr/bin/clickhouse
9. 0x559e2bbc3640 ? /usr/bin/clickhouse
10. 0x7fbd62b3cfb7 start_thread /lib/x86_64-linux-gnu/libpthread-2.29.so
11. 0x7fbd62a692ef __clone /lib/x86_64-linux-gnu/libc-2.29.so
(version 19.17.1.1)
v2: remove fsync, to avoid possible stalls (https://github.com/ClickHouse/ClickHouse/pull/7600#discussion_r342010874)
2019-11-03 19:26:46 +00:00
|
|
|
|
|
|
|
/// Temporary file is required for atomicity.
|
|
|
|
String tmp_file{parent.current_batch_file_path + ".tmp"};
|
|
|
|
|
2021-01-26 13:29:45 +00:00
|
|
|
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, parent.disk, parent.relative_path);
|
2021-04-27 00:05:43 +00:00
|
|
|
if (fs::exists(tmp_file))
|
2020-05-23 22:24:01 +00:00
|
|
|
LOG_ERROR(parent.log, "Temporary file {} exists. Unclean shutdown?", backQuote(tmp_file));
|
Write current batch for distributed send atomically (using .tmp + rename)
Otherwise the following can happen after reboot:
2019.11.01 11:46:12.217143 [ 187 ] {} <Error> dist.Distributed.DirectoryMonitor: Code: 27, e.displayText() = DB::Exception: Cannot parse input: expected \n before: S\'^A\0^]\0\0<BE>4^A\0r<87>\0\0<A2><D7>^D^Y\0<F2>{^E<CD>\0\0Hy\0\0<F2>^_^C\0^_&\0\0<FF><D3>\0\0
<8D><91>\0\0<C0>9\0\0<C0><B0>^A\0^G<AA>\0\0<B5><FE>^A\0<BF><A7>^A\0<9B><CB>^A\0I^R^A\0<B7><AB>^A\0<BC><8F>\0\0˲^B\0Zy\0\0<94><AA>\0\0<98>
<8F>\0\0\f<A5>\0\0^QN\0\0<E3><C6>\0\0<B1>6^B\0ɳ\0\0W<99>\0\0<B9><A2>\0\0:<BB>\0\0)<B1>\0\0#<8B>\0\0aW\0\0<ED>#\0\0<F1>@\0\0ˀ^B\0<D7><FC>\0\0<DF>, Stack trace:
0. 0x559e27222e60 StackTrace::StackTrace() /usr/bin/clickhouse
1. 0x559e27222c45 DB::Exception::Exception(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int) /usr/bin/clickhouse
2. 0x559e26de4473 ? /usr/bin/clickhouse
3. 0x559e272494b5 DB::assertString(char const*, DB::ReadBuffer&) /usr/bin/clickhouse
4. 0x559e2a5dab45 DB::StorageDistributedDirectoryMonitor::processFilesWithBatching(std::map<unsigned long, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::less<unsigned long>, std::allocator<std::pair<unsigned long const, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > > const&) /usr/bin/clickhouse
5. 0x559e2a5db5fa DB::StorageDistributedDirectoryMonitor::processFiles() /usr/bin/clickhouse
6. 0x559e2a5dba78 DB::StorageDistributedDirectoryMonitor::run() /usr/bin/clickhouse
7. 0x559e2a5ddbbc ThreadFromGlobalPool::ThreadFromGlobalPool<void (DB::StorageDistributedDirectoryMonitor::*)(), DB::StorageDistributedDirectoryMonitor*>(void (DB::StorageDistributedDirectoryMonitor::*&&)(), DB::StorageDistributedDirectoryMonitor*&&)::{lambda()#1}::operator()() const /usr/bin/clickhouse
8. 0x559e2726b07c ThreadPoolImpl<std::thread>::worker(std::_List_iterator<std::thread>) /usr/bin/clickhouse
9. 0x559e2bbc3640 ? /usr/bin/clickhouse
10. 0x7fbd62b3cfb7 start_thread /lib/x86_64-linux-gnu/libpthread-2.29.so
11. 0x7fbd62a692ef __clone /lib/x86_64-linux-gnu/libc-2.29.so
(version 19.17.1.1)
v2: remove fsync, to avoid possible stalls (https://github.com/ClickHouse/ClickHouse/pull/7600#discussion_r342010874)
2019-11-03 19:26:46 +00:00
|
|
|
|
|
|
|
{
|
|
|
|
WriteBufferFromFile out{tmp_file, O_WRONLY | O_TRUNC | O_CREAT};
|
|
|
|
writeText(out);
|
2021-01-09 08:29:49 +00:00
|
|
|
|
|
|
|
out.finalize();
|
|
|
|
if (fsync)
|
|
|
|
out.sync();
|
Write current batch for distributed send atomically (using .tmp + rename)
Otherwise the following can happen after reboot:
2019.11.01 11:46:12.217143 [ 187 ] {} <Error> dist.Distributed.DirectoryMonitor: Code: 27, e.displayText() = DB::Exception: Cannot parse input: expected \n before: S\'^A\0^]\0\0<BE>4^A\0r<87>\0\0<A2><D7>^D^Y\0<F2>{^E<CD>\0\0Hy\0\0<F2>^_^C\0^_&\0\0<FF><D3>\0\0
<8D><91>\0\0<C0>9\0\0<C0><B0>^A\0^G<AA>\0\0<B5><FE>^A\0<BF><A7>^A\0<9B><CB>^A\0I^R^A\0<B7><AB>^A\0<BC><8F>\0\0˲^B\0Zy\0\0<94><AA>\0\0<98>
<8F>\0\0\f<A5>\0\0^QN\0\0<E3><C6>\0\0<B1>6^B\0ɳ\0\0W<99>\0\0<B9><A2>\0\0:<BB>\0\0)<B1>\0\0#<8B>\0\0aW\0\0<ED>#\0\0<F1>@\0\0ˀ^B\0<D7><FC>\0\0<DF>, Stack trace:
0. 0x559e27222e60 StackTrace::StackTrace() /usr/bin/clickhouse
1. 0x559e27222c45 DB::Exception::Exception(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, int) /usr/bin/clickhouse
2. 0x559e26de4473 ? /usr/bin/clickhouse
3. 0x559e272494b5 DB::assertString(char const*, DB::ReadBuffer&) /usr/bin/clickhouse
4. 0x559e2a5dab45 DB::StorageDistributedDirectoryMonitor::processFilesWithBatching(std::map<unsigned long, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::less<unsigned long>, std::allocator<std::pair<unsigned long const, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > > > const&) /usr/bin/clickhouse
5. 0x559e2a5db5fa DB::StorageDistributedDirectoryMonitor::processFiles() /usr/bin/clickhouse
6. 0x559e2a5dba78 DB::StorageDistributedDirectoryMonitor::run() /usr/bin/clickhouse
7. 0x559e2a5ddbbc ThreadFromGlobalPool::ThreadFromGlobalPool<void (DB::StorageDistributedDirectoryMonitor::*)(), DB::StorageDistributedDirectoryMonitor*>(void (DB::StorageDistributedDirectoryMonitor::*&&)(), DB::StorageDistributedDirectoryMonitor*&&)::{lambda()#1}::operator()() const /usr/bin/clickhouse
8. 0x559e2726b07c ThreadPoolImpl<std::thread>::worker(std::_List_iterator<std::thread>) /usr/bin/clickhouse
9. 0x559e2bbc3640 ? /usr/bin/clickhouse
10. 0x7fbd62b3cfb7 start_thread /lib/x86_64-linux-gnu/libpthread-2.29.so
11. 0x7fbd62a692ef __clone /lib/x86_64-linux-gnu/libc-2.29.so
(version 19.17.1.1)
v2: remove fsync, to avoid possible stalls (https://github.com/ClickHouse/ClickHouse/pull/7600#discussion_r342010874)
2019-11-03 19:26:46 +00:00
|
|
|
}
|
|
|
|
|
2021-04-27 00:05:43 +00:00
|
|
|
fs::rename(tmp_file, parent.current_batch_file_path);
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
2021-04-10 23:33:54 +00:00
|
|
|
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.getContext()->getSettingsRef());
|
2019-03-01 23:09:36 +00:00
|
|
|
auto connection = parent.pool->get(timeouts);
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
bool batch_broken = false;
|
2021-05-03 07:52:51 +00:00
|
|
|
bool batch_marked_as_broken = false;
|
2018-03-27 18:59:53 +00:00
|
|
|
try
|
2017-07-10 15:28:04 +00:00
|
|
|
{
|
2021-05-03 07:52:45 +00:00
|
|
|
try
|
2017-07-10 15:28:04 +00:00
|
|
|
{
|
2021-05-03 07:52:45 +00:00
|
|
|
sendBatch(*connection, timeouts);
|
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
2021-05-03 07:52:51 +00:00
|
|
|
if (split_batch_on_failure && isSplittableErrorCode(e.code(), e.isRemoteException()))
|
2018-03-27 18:59:53 +00:00
|
|
|
{
|
2021-05-03 07:52:45 +00:00
|
|
|
tryLogCurrentException(parent.log, "Trying to split batch due to");
|
|
|
|
sendSeparateFiles(*connection, timeouts);
|
2018-03-27 18:59:53 +00:00
|
|
|
}
|
2021-05-03 07:52:45 +00:00
|
|
|
else
|
|
|
|
throw;
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
2018-03-27 18:59:53 +00:00
|
|
|
}
|
2021-05-01 08:22:04 +00:00
|
|
|
catch (Exception & e)
|
2018-03-27 18:59:53 +00:00
|
|
|
{
|
2021-01-19 19:22:58 +00:00
|
|
|
if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
|
2018-03-27 18:59:53 +00:00
|
|
|
{
|
|
|
|
tryLogCurrentException(parent.log, "Failed to send batch due to");
|
|
|
|
batch_broken = true;
|
2021-05-03 07:52:51 +00:00
|
|
|
if (!e.isRemoteException() && e.code() == ErrorCodes::DISTRIBUTED_BROKEN_BATCH_FILES)
|
|
|
|
batch_marked_as_broken = true;
|
2018-03-27 18:59:53 +00:00
|
|
|
}
|
|
|
|
else
|
2021-05-01 08:22:04 +00:00
|
|
|
{
|
|
|
|
std::vector<std::string> files(file_index_to_path.size());
|
2021-07-16 19:27:30 +00:00
|
|
|
for (const auto && file_info : file_index_to_path | boost::adaptors::indexed())
|
|
|
|
files[file_info.index()] = file_info.value().second;
|
2021-05-01 08:22:04 +00:00
|
|
|
e.addMessage(fmt::format("While sending batch {}", fmt::join(files, "\n")));
|
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
throw;
|
2021-05-01 08:22:04 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
if (!batch_broken)
|
|
|
|
{
|
2021-04-07 20:56:15 +00:00
|
|
|
LOG_TRACE(parent.log, "Sent a batch of {} files (took {} ms).", file_indices.size(), watch.elapsedMilliseconds());
|
2018-03-27 18:59:53 +00:00
|
|
|
|
2021-01-26 13:29:45 +00:00
|
|
|
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, parent.disk, parent.relative_path);
|
2018-03-27 18:59:53 +00:00
|
|
|
for (UInt64 file_index : file_indices)
|
2021-01-26 18:45:37 +00:00
|
|
|
parent.markAsSend(file_index_to_path.at(file_index));
|
2018-03-27 18:59:53 +00:00
|
|
|
}
|
2021-05-03 07:52:51 +00:00
|
|
|
else if (!batch_marked_as_broken)
|
2018-03-27 18:59:53 +00:00
|
|
|
{
|
2020-05-23 22:24:01 +00:00
|
|
|
LOG_ERROR(parent.log, "Marking a batch of {} files as broken.", file_indices.size());
|
2017-07-10 15:28:04 +00:00
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
for (UInt64 file_idx : file_indices)
|
|
|
|
{
|
|
|
|
auto file_path = file_index_to_path.find(file_idx);
|
|
|
|
if (file_path != file_index_to_path.end())
|
|
|
|
parent.markAsBroken(file_path->second);
|
|
|
|
}
|
|
|
|
}
|
2017-07-10 15:28:04 +00:00
|
|
|
|
|
|
|
file_indices.clear();
|
|
|
|
total_rows = 0;
|
|
|
|
total_bytes = 0;
|
2018-03-27 17:46:53 +00:00
|
|
|
recovered = false;
|
2017-07-10 15:28:04 +00:00
|
|
|
|
2021-04-27 00:05:43 +00:00
|
|
|
fs::resize_file(parent.current_batch_file_path, 0);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-07-10 15:28:04 +00:00
|
|
|
void writeText(WriteBuffer & out)
|
|
|
|
{
|
|
|
|
for (UInt64 file_idx : file_indices)
|
|
|
|
out << file_idx << '\n';
|
|
|
|
}
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-07-10 15:28:04 +00:00
|
|
|
void readText(ReadBuffer & in)
|
|
|
|
{
|
|
|
|
while (!in.eof())
|
|
|
|
{
|
|
|
|
UInt64 idx;
|
|
|
|
in >> idx >> "\n";
|
|
|
|
file_indices.push_back(idx);
|
|
|
|
}
|
2018-03-27 18:59:53 +00:00
|
|
|
recovered = true;
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
2021-05-03 07:52:45 +00:00
|
|
|
|
|
|
|
private:
|
|
|
|
void sendBatch(Connection & connection, const ConnectionTimeouts & timeouts)
|
|
|
|
{
|
2021-09-03 17:29:36 +00:00
|
|
|
std::unique_ptr<RemoteInserter> remote;
|
2021-05-03 07:52:45 +00:00
|
|
|
|
|
|
|
for (UInt64 file_idx : file_indices)
|
|
|
|
{
|
|
|
|
auto file_path = file_index_to_path.find(file_idx);
|
|
|
|
if (file_path == file_index_to_path.end())
|
|
|
|
throw Exception(ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO,
|
|
|
|
"Failed to send batch: file with index {} is absent", file_idx);
|
|
|
|
|
|
|
|
ReadBufferFromFile in(file_path->second);
|
|
|
|
const auto & distributed_header = readDistributedHeader(in, parent.log);
|
|
|
|
|
|
|
|
if (!remote)
|
|
|
|
{
|
2021-09-03 17:29:36 +00:00
|
|
|
remote = std::make_unique<RemoteInserter>(connection, timeouts,
|
2021-05-03 07:52:45 +00:00
|
|
|
distributed_header.insert_query,
|
|
|
|
distributed_header.insert_settings,
|
|
|
|
distributed_header.client_info);
|
|
|
|
}
|
|
|
|
bool compression_expected = connection.getCompression() == Protocol::Compression::Enable;
|
|
|
|
writeRemoteConvert(distributed_header, *remote, compression_expected, in, parent.log);
|
|
|
|
}
|
|
|
|
|
|
|
|
if (remote)
|
2021-09-03 17:29:36 +00:00
|
|
|
remote->onFinish();
|
2021-05-03 07:52:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void sendSeparateFiles(Connection & connection, const ConnectionTimeouts & timeouts)
|
|
|
|
{
|
2021-05-03 07:52:51 +00:00
|
|
|
size_t broken_files = 0;
|
|
|
|
|
2021-05-03 07:52:45 +00:00
|
|
|
for (UInt64 file_idx : file_indices)
|
|
|
|
{
|
|
|
|
auto file_path = file_index_to_path.find(file_idx);
|
|
|
|
if (file_path == file_index_to_path.end())
|
|
|
|
{
|
|
|
|
LOG_ERROR(parent.log, "Failed to send one file from batch: file with index {} is absent", file_idx);
|
2021-05-03 07:52:51 +00:00
|
|
|
++broken_files;
|
2021-05-03 07:52:45 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2021-05-03 07:52:51 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
ReadBufferFromFile in(file_path->second);
|
|
|
|
const auto & distributed_header = readDistributedHeader(in, parent.log);
|
2021-05-03 07:52:45 +00:00
|
|
|
|
2021-09-03 17:29:36 +00:00
|
|
|
RemoteInserter remote(connection, timeouts,
|
2021-05-03 07:52:51 +00:00
|
|
|
distributed_header.insert_query,
|
|
|
|
distributed_header.insert_settings,
|
|
|
|
distributed_header.client_info);
|
|
|
|
bool compression_expected = connection.getCompression() == Protocol::Compression::Enable;
|
|
|
|
writeRemoteConvert(distributed_header, remote, compression_expected, in, parent.log);
|
2021-09-03 17:29:36 +00:00
|
|
|
remote.onFinish();
|
2021-05-03 07:52:51 +00:00
|
|
|
}
|
|
|
|
catch (Exception & e)
|
|
|
|
{
|
|
|
|
e.addMessage(fmt::format("While sending {}", file_path->second));
|
|
|
|
parent.maybeMarkAsBroken(file_path->second, e);
|
|
|
|
++broken_files;
|
|
|
|
}
|
2021-05-03 07:52:45 +00:00
|
|
|
}
|
2021-05-03 07:52:51 +00:00
|
|
|
|
|
|
|
if (broken_files)
|
|
|
|
throw Exception(ErrorCodes::DISTRIBUTED_BROKEN_BATCH_FILES,
|
|
|
|
"Failed to send {} files", broken_files);
|
2021-05-03 07:52:45 +00:00
|
|
|
}
|
2017-07-10 15:28:04 +00:00
|
|
|
};
|
|
|
|
|
2021-07-20 18:18:43 +00:00
|
|
|
class DirectoryMonitorSource : public SourceWithProgress
|
2020-01-04 14:45:11 +00:00
|
|
|
{
|
|
|
|
public:
|
2021-07-20 18:18:43 +00:00
|
|
|
|
|
|
|
struct Data
|
2020-01-04 14:45:11 +00:00
|
|
|
{
|
2021-07-20 18:18:43 +00:00
|
|
|
std::unique_ptr<ReadBufferFromFile> in;
|
|
|
|
std::unique_ptr<CompressedReadBuffer> decompressing_in;
|
|
|
|
std::unique_ptr<NativeBlockInputStream> block_in;
|
2020-01-04 18:33:16 +00:00
|
|
|
|
2021-07-21 12:16:13 +00:00
|
|
|
Poco::Logger * log = nullptr;
|
2020-01-04 14:45:11 +00:00
|
|
|
|
2021-07-20 18:18:43 +00:00
|
|
|
Block first_block;
|
2020-01-04 14:45:11 +00:00
|
|
|
|
2021-07-22 10:38:22 +00:00
|
|
|
explicit Data(const String & file_name)
|
|
|
|
{
|
|
|
|
in = std::make_unique<ReadBufferFromFile>(file_name);
|
|
|
|
decompressing_in = std::make_unique<CompressedReadBuffer>(*in);
|
|
|
|
block_in = std::make_unique<NativeBlockInputStream>(*decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
|
|
|
|
log = &Poco::Logger::get("DirectoryMonitorSource");
|
2020-01-04 14:45:11 +00:00
|
|
|
|
2021-07-22 10:38:22 +00:00
|
|
|
readDistributedHeader(*in, log);
|
2021-07-20 18:18:43 +00:00
|
|
|
|
2021-07-22 10:38:22 +00:00
|
|
|
block_in->readPrefix();
|
|
|
|
first_block = block_in->read();
|
|
|
|
}
|
|
|
|
|
|
|
|
Data(Data &&) = default;
|
|
|
|
};
|
2020-01-04 14:45:11 +00:00
|
|
|
|
2021-07-20 18:18:43 +00:00
|
|
|
explicit DirectoryMonitorSource(const String & file_name)
|
2021-07-22 10:38:22 +00:00
|
|
|
: DirectoryMonitorSource(Data(file_name))
|
|
|
|
{
|
|
|
|
}
|
|
|
|
|
|
|
|
explicit DirectoryMonitorSource(Data data_)
|
|
|
|
: SourceWithProgress(data_.first_block.cloneEmpty())
|
|
|
|
, data(std::move(data_))
|
2021-07-20 18:18:43 +00:00
|
|
|
{
|
|
|
|
}
|
2020-01-04 14:45:11 +00:00
|
|
|
|
2021-07-20 18:18:43 +00:00
|
|
|
String getName() const override { return "DirectoryMonitorSource"; }
|
2020-01-04 14:45:11 +00:00
|
|
|
|
2021-07-20 18:18:43 +00:00
|
|
|
protected:
|
|
|
|
Chunk generate() override
|
|
|
|
{
|
|
|
|
if (data.first_block)
|
|
|
|
{
|
|
|
|
size_t num_rows = data.first_block.rows();
|
|
|
|
Chunk res(data.first_block.getColumns(), num_rows);
|
|
|
|
data.first_block.clear();
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
auto block = data.block_in->read();
|
|
|
|
if (!block)
|
|
|
|
{
|
|
|
|
data.block_in->readSuffix();
|
|
|
|
return {};
|
|
|
|
}
|
2020-01-04 18:33:16 +00:00
|
|
|
|
2021-07-20 18:18:43 +00:00
|
|
|
size_t num_rows = block.rows();
|
|
|
|
return Chunk(block.getColumns(), num_rows);
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
Data data;
|
2020-01-04 14:45:11 +00:00
|
|
|
};
|
|
|
|
|
2021-09-16 17:40:42 +00:00
|
|
|
std::shared_ptr<ISource> StorageDistributedDirectoryMonitor::createSourceFromFile(const String & file_name)
|
2020-01-04 14:45:11 +00:00
|
|
|
{
|
2021-07-20 18:18:43 +00:00
|
|
|
return std::make_shared<DirectoryMonitorSource>(file_name);
|
2020-01-04 14:45:11 +00:00
|
|
|
}
|
2018-04-21 00:35:20 +00:00
|
|
|
|
2021-01-26 18:45:37 +00:00
|
|
|
bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t ms)
|
2020-04-14 18:12:08 +00:00
|
|
|
{
|
|
|
|
if (quit)
|
|
|
|
return false;
|
2021-01-26 18:45:37 +00:00
|
|
|
|
|
|
|
{
|
2021-05-04 19:16:36 +00:00
|
|
|
std::lock_guard status_lock(status_mutex);
|
2021-01-26 19:21:36 +00:00
|
|
|
metric_pending_files.add();
|
2021-05-04 19:16:36 +00:00
|
|
|
status.bytes_count += file_size;
|
|
|
|
++status.files_count;
|
2021-01-26 18:45:37 +00:00
|
|
|
}
|
|
|
|
|
2020-04-24 20:00:00 +00:00
|
|
|
return task_handle->scheduleAfter(ms, false);
|
2020-04-14 18:12:08 +00:00
|
|
|
}
|
|
|
|
|
2021-03-02 20:18:53 +00:00
|
|
|
/// Returns a snapshot of the internal status together with the monitored path
/// and the current cancellation state of `monitor_blocker`, taken under `status_mutex`.
StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::getStatus()
{
    std::lock_guard status_lock(status_mutex);
    return Status{status, path, monitor_blocker.isCancelled()};
}
|
|
|
|
|
2020-08-26 21:43:00 +00:00
|
|
|
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
|
2017-07-10 15:28:04 +00:00
|
|
|
{
|
|
|
|
std::unordered_set<UInt64> file_indices_to_skip;
|
|
|
|
|
2021-04-27 00:05:43 +00:00
|
|
|
if (fs::exists(current_batch_file_path))
|
2017-07-10 15:28:04 +00:00
|
|
|
{
|
|
|
|
/// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
|
2018-03-27 18:59:53 +00:00
|
|
|
Batch batch(*this, files);
|
2017-07-10 15:28:04 +00:00
|
|
|
ReadBufferFromFile in{current_batch_file_path};
|
|
|
|
batch.readText(in);
|
|
|
|
file_indices_to_skip.insert(batch.file_indices.begin(), batch.file_indices.end());
|
2018-03-27 18:59:53 +00:00
|
|
|
batch.send();
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
|
|
|
|
2017-07-21 12:03:37 +00:00
|
|
|
std::unordered_map<BatchHeader, Batch, BatchHeader::Hash> header_to_batch;
|
2017-07-10 15:28:04 +00:00
|
|
|
|
|
|
|
for (const auto & file : files)
|
|
|
|
{
|
|
|
|
if (quit)
|
|
|
|
return;
|
|
|
|
|
|
|
|
UInt64 file_idx = file.first;
|
|
|
|
const String & file_path = file.second;
|
|
|
|
|
|
|
|
if (file_indices_to_skip.count(file_idx))
|
|
|
|
continue;
|
|
|
|
|
|
|
|
size_t total_rows = 0;
|
|
|
|
size_t total_bytes = 0;
|
2021-04-02 04:09:39 +00:00
|
|
|
Block header;
|
|
|
|
DistributedHeader distributed_header;
|
2017-07-10 15:28:04 +00:00
|
|
|
try
|
|
|
|
{
|
2018-03-27 17:46:53 +00:00
|
|
|
/// Determine metadata of the current file and check if it is not broken.
|
|
|
|
ReadBufferFromFile in{file_path};
|
2021-04-02 04:09:39 +00:00
|
|
|
distributed_header = readDistributedHeader(in, log);
|
2018-03-27 17:46:53 +00:00
|
|
|
|
2021-04-02 04:09:39 +00:00
|
|
|
if (distributed_header.rows)
|
2017-07-10 15:28:04 +00:00
|
|
|
{
|
2021-04-02 04:09:39 +00:00
|
|
|
total_rows += distributed_header.rows;
|
|
|
|
total_bytes += distributed_header.bytes;
|
2021-01-10 12:03:22 +00:00
|
|
|
}
|
2021-04-02 04:09:39 +00:00
|
|
|
|
|
|
|
if (distributed_header.block_header)
|
|
|
|
header = distributed_header.block_header;
|
|
|
|
|
|
|
|
if (!total_rows || !header)
|
2021-01-10 12:03:22 +00:00
|
|
|
{
|
2021-04-15 18:00:16 +00:00
|
|
|
LOG_DEBUG(log, "Processing batch {} with old format (no header/rows)", in.getFileName());
|
2021-04-02 04:09:39 +00:00
|
|
|
|
2021-01-10 12:03:22 +00:00
|
|
|
CompressedReadBuffer decompressing_in(in);
|
|
|
|
NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);
|
|
|
|
block_in.readPrefix();
|
|
|
|
|
|
|
|
while (Block block = block_in.read())
|
|
|
|
{
|
|
|
|
total_rows += block.rows();
|
|
|
|
total_bytes += block.bytes();
|
|
|
|
|
2021-04-02 04:09:39 +00:00
|
|
|
if (!header)
|
|
|
|
header = block.cloneEmpty();
|
2021-01-10 12:03:22 +00:00
|
|
|
}
|
|
|
|
block_in.readSuffix();
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
|
|
|
if (maybeMarkAsBroken(file_path, e))
|
|
|
|
{
|
|
|
|
tryLogCurrentException(log, "File is marked broken due to");
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
|
2021-04-02 04:09:39 +00:00
|
|
|
BatchHeader batch_header(
|
|
|
|
std::move(distributed_header.insert_settings),
|
|
|
|
std::move(distributed_header.insert_query),
|
|
|
|
std::move(distributed_header.client_info),
|
|
|
|
std::move(header)
|
|
|
|
);
|
2018-03-27 18:59:53 +00:00
|
|
|
Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;
|
2017-07-21 12:03:37 +00:00
|
|
|
|
2017-07-10 15:28:04 +00:00
|
|
|
batch.file_indices.push_back(file_idx);
|
|
|
|
batch.total_rows += total_rows;
|
|
|
|
batch.total_bytes += total_bytes;
|
|
|
|
|
|
|
|
if (batch.isEnoughSize())
|
2020-05-27 10:07:38 +00:00
|
|
|
{
|
2018-03-27 18:59:53 +00:00
|
|
|
batch.send();
|
2020-05-27 10:07:38 +00:00
|
|
|
}
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
|
|
|
|
2017-07-21 12:03:37 +00:00
|
|
|
for (auto & kv : header_to_batch)
|
2017-07-10 15:28:04 +00:00
|
|
|
{
|
|
|
|
Batch & batch = kv.second;
|
2018-03-27 18:59:53 +00:00
|
|
|
batch.send();
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
|
|
|
|
2021-01-09 12:42:21 +00:00
|
|
|
{
|
2021-01-26 13:29:45 +00:00
|
|
|
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
|
2021-01-09 12:42:21 +00:00
|
|
|
|
|
|
|
/// current_batch.txt will not exist if there was no send
|
|
|
|
/// (this is the case when all files that were pending have been marked as broken)
|
2021-04-27 00:05:43 +00:00
|
|
|
if (fs::exists(current_batch_file_path))
|
|
|
|
fs::remove(current_batch_file_path);
|
2021-01-09 12:42:21 +00:00
|
|
|
}
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
|
|
|
|
2021-03-02 20:18:53 +00:00
|
|
|
void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_path)
|
2018-03-27 18:59:53 +00:00
|
|
|
{
|
|
|
|
const auto last_path_separator_pos = file_path.rfind('/');
|
2019-01-04 12:10:00 +00:00
|
|
|
const auto & base_path = file_path.substr(0, last_path_separator_pos + 1);
|
2018-03-27 18:59:53 +00:00
|
|
|
const auto & file_name = file_path.substr(last_path_separator_pos + 1);
|
2021-05-09 11:59:49 +00:00
|
|
|
const String & broken_path = fs::path(base_path) / "broken/";
|
|
|
|
const String & broken_file_path = fs::path(broken_path) / file_name;
|
2017-07-10 15:28:04 +00:00
|
|
|
|
2021-04-27 00:05:43 +00:00
|
|
|
fs::create_directory(broken_path);
|
2021-01-09 12:42:21 +00:00
|
|
|
|
2021-01-26 13:29:45 +00:00
|
|
|
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
|
2021-05-09 11:59:49 +00:00
|
|
|
auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, fs::path(relative_path) / "broken/");
|
2021-01-09 12:42:21 +00:00
|
|
|
|
2021-01-26 18:45:37 +00:00
|
|
|
{
|
2021-05-04 19:16:36 +00:00
|
|
|
std::lock_guard status_lock(status_mutex);
|
2021-01-26 18:45:37 +00:00
|
|
|
|
2021-04-27 00:05:43 +00:00
|
|
|
size_t file_size = fs::file_size(file_path);
|
2021-05-04 19:16:36 +00:00
|
|
|
|
|
|
|
--status.files_count;
|
|
|
|
status.bytes_count -= file_size;
|
|
|
|
|
|
|
|
++status.broken_files_count;
|
|
|
|
status.broken_bytes_count += file_size;
|
2021-05-04 19:42:07 +00:00
|
|
|
|
|
|
|
metric_broken_files.add();
|
2021-01-26 18:45:37 +00:00
|
|
|
}
|
|
|
|
|
2021-04-27 00:05:43 +00:00
|
|
|
fs::rename(file_path, broken_file_path);
|
2020-05-23 22:24:01 +00:00
|
|
|
LOG_ERROR(log, "Renamed `{}` to `{}`", file_path, broken_file_path);
|
2018-03-27 18:59:53 +00:00
|
|
|
}
|
2021-04-27 00:05:43 +00:00
|
|
|
|
2021-03-02 20:18:53 +00:00
|
|
|
void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_path)
|
2021-01-26 18:45:37 +00:00
|
|
|
{
|
2021-04-27 00:05:43 +00:00
|
|
|
size_t file_size = fs::file_size(file_path);
|
2021-01-27 05:02:04 +00:00
|
|
|
|
2021-01-26 18:45:37 +00:00
|
|
|
{
|
2021-05-04 19:16:36 +00:00
|
|
|
std::lock_guard status_lock(status_mutex);
|
2021-01-27 05:02:04 +00:00
|
|
|
metric_pending_files.sub();
|
2021-05-04 19:16:36 +00:00
|
|
|
--status.files_count;
|
|
|
|
status.bytes_count -= file_size;
|
2021-01-26 18:45:37 +00:00
|
|
|
}
|
|
|
|
|
2021-04-27 00:05:43 +00:00
|
|
|
fs::remove(file_path);
|
2021-01-26 18:45:37 +00:00
|
|
|
}
|
2017-07-10 15:28:04 +00:00
|
|
|
|
2021-03-02 20:18:53 +00:00
|
|
|
bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e)
|
2018-03-27 18:59:53 +00:00
|
|
|
{
|
|
|
|
/// mark file as broken if necessary
|
2021-01-19 19:22:58 +00:00
|
|
|
if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
|
2018-03-27 18:59:53 +00:00
|
|
|
{
|
|
|
|
markAsBroken(file_path);
|
2017-07-10 15:28:04 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
return false;
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
std::string StorageDistributedDirectoryMonitor::getLoggerName() const
|
|
|
|
{
|
2019-12-03 16:25:32 +00:00
|
|
|
return storage.getStorageID().getFullTableName() + ".DirectoryMonitor";
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
2021-01-09 12:26:37 +00:00
|
|
|
void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_relative_path)
|
2019-12-19 19:39:49 +00:00
|
|
|
{
|
2020-04-14 18:12:08 +00:00
|
|
|
task_handle->deactivate();
|
2020-08-20 13:36:22 +00:00
|
|
|
std::lock_guard lock{mutex};
|
2020-04-14 18:12:08 +00:00
|
|
|
|
2020-06-03 08:22:48 +00:00
|
|
|
{
|
2021-05-04 19:16:36 +00:00
|
|
|
std::lock_guard status_lock(status_mutex);
|
2021-01-09 12:26:37 +00:00
|
|
|
relative_path = new_relative_path;
|
2021-05-08 10:59:55 +00:00
|
|
|
path = fs::path(disk->getPath()) / relative_path / "";
|
2020-06-03 08:22:48 +00:00
|
|
|
}
|
2019-12-19 19:39:49 +00:00
|
|
|
current_batch_file_path = path + "current_batch.txt";
|
2020-04-14 18:12:08 +00:00
|
|
|
|
|
|
|
task_handle->activateAndSchedule();
|
2019-12-19 19:39:49 +00:00
|
|
|
}
|
|
|
|
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|