ClickHouse/dbms/src/Storages/Distributed/DirectoryMonitor.cpp

#include <DataStreams/RemoteBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Common/escapeForFileName.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/ClickHouseRevision.h>
#include <Common/SipHash.h>
#include <Common/quoteString.h>
#include <Common/hex.h>
#include <common/StringRef.h>
#include <Interpreters/Context.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h>
#include <boost/algorithm/string/find_iterator.hpp>
#include <boost/algorithm/string/finder.hpp>
#include <Poco/DirectoryIterator.h>
namespace CurrentMetrics
{
extern const Metric DistributedSend;
extern const Metric DistributedFilesToInsert;
}
namespace DB
{
namespace ErrorCodes
{
extern const int ABORTED;
extern const int UNKNOWN_CODEC;
extern const int CANNOT_DECOMPRESS;
extern const int INCORRECT_FILE_NAME;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int TOO_LARGE_SIZE_COMPRESSED;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CANNOT_READ_ALL_DATA;
extern const int CORRUPTED_DATA;
}
namespace
{
static constexpr const std::chrono::minutes decrease_error_count_period{5};
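/// The directory name encodes one or more shard addresses separated by ','.
/// Split it and create a connection pool for each encoded address.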
template <typename PoolFactory>
ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory)
{
ConnectionPoolPtrs pools;
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
{
Cluster::Address address = Cluster::Address::fromFullString(boost::copy_range<std::string>(*it));
pools.emplace_back(factory(address));
}
return pools;
}
void assertChecksum(CityHash_v1_0_2::uint128 expected, CityHash_v1_0_2::uint128 calculated)
{
if (expected != calculated)
{
String message = "Checksum of extra info doesn't match: corrupted data."
" Reference: " + getHexUIntLowercase(expected.first) + getHexUIntLowercase(expected.second)
+ ". Actual: " + getHexUIntLowercase(calculated.first) + getHexUIntLowercase(calculated.second)
+ ".";
throw Exception(message, ErrorCodes::CHECKSUM_DOESNT_MATCH);
}
}
}
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributed & storage_, const std::string & name_, const ConnectionPoolPtr & pool_, ActionBlocker & monitor_blocker_)
: storage(storage_), pool{pool_}, path{storage.path + name_ + '/'}
, current_batch_file_path{path + "current_batch.txt"}
, default_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
, sleep_time{default_sleep_time}
, max_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_max_sleep_time_ms.totalMilliseconds()}
, log{&Logger::get(getLoggerName())}
, monitor_blocker(monitor_blocker_)
{
const Settings & settings = storage.global_context.getSettingsRef();
should_batch_inserts = settings.distributed_directory_monitor_batch_inserts;
min_batched_block_size_rows = settings.min_insert_block_size_rows;
min_batched_block_size_bytes = settings.min_insert_block_size_bytes;
}
StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
{
if (!quit)
{
{
quit = true;
std::lock_guard lock{mutex};
}
cond.notify_one();
thread.join();
}
}
void StorageDistributedDirectoryMonitor::flushAllData()
{
if (!quit)
{
std::unique_lock lock{mutex};
processFiles();
}
}
void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
{
if (!quit)
{
{
quit = true;
std::lock_guard lock{mutex};
}
cond.notify_one();
thread.join();
}
Poco::File(path).remove(true);
}
void StorageDistributedDirectoryMonitor::run()
{
setThreadName("DistrDirMonitor");
std::unique_lock lock{mutex};
const auto quit_requested = [this] { return quit.load(std::memory_order_relaxed); };
while (!quit_requested())
{
auto do_sleep = true;
if (!monitor_blocker.isCancelled())
{
try
{
do_sleep = !processFiles();
}
catch (...)
{
do_sleep = true;
++error_count;
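/// Exponential backoff: double the sleep interval for each consecutive error, capped at max_sleep_time.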
sleep_time = std::min(
std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))},
max_sleep_time);
tryLogCurrentException(getLoggerName().data());
}
}
else
{
LOG_DEBUG(log, "Skipping send data over distributed table.");
}
if (do_sleep)
cond.wait_for(lock, sleep_time, quit_requested);
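/// Decay the error counter: halve it after every decrease_error_count_period (5 minutes).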
const auto now = std::chrono::system_clock::now();
if (now - last_decrease_time > decrease_error_count_period)
{
error_count /= 2;
last_decrease_time = now;
}
}
}
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
{
const auto pool_factory = [&storage] (const Cluster::Address & address) -> ConnectionPoolPtr
{
const auto & cluster = storage.getCluster();
const auto & shards_info = cluster->getShardsInfo();
const auto & shards_addresses = cluster->getShardsAddresses();
/// Existing connection pools have a higher priority.
for (size_t shard_index = 0; shard_index < shards_info.size(); ++shard_index)
{
const Cluster::Addresses & replicas_addresses = shards_addresses[shard_index];
for (size_t replica_index = 0; replica_index < replicas_addresses.size(); ++replica_index)
{
const Cluster::Address & replica_address = replicas_addresses[replica_index];
if (address == replica_address)
return shards_info[shard_index].per_replica_pools[replica_index];
}
}
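/// The address is not present in the cluster configuration; create a dedicated pool for it.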
return std::make_shared<ConnectionPool>(
1, address.host_name, address.port, address.default_database, address.user, address.password,
storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure);
};
auto pools = createPoolsForAddresses(name, pool_factory);
const auto settings = storage.global_context.getSettings();
return pools.size() == 1 ? pools.front() : std::make_shared<ConnectionPoolWithFailover>(pools, LoadBalancing::RANDOM,
settings.distributed_replica_error_half_life.totalSeconds(), settings.distributed_replica_error_cap);
}
bool StorageDistributedDirectoryMonitor::processFiles()
{
std::map<UInt64, std::string> files;
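/// Collect all pending .bin files, ordered by the numeric index encoded in the file name.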
Poco::DirectoryIterator end;
for (Poco::DirectoryIterator it{path}; it != end; ++it)
{
const auto & file_path_str = it->path();
Poco::Path file_path{file_path_str};
if (!it->isDirectory() && startsWith(file_path.getExtension().data(), "bin"))
files[parse<UInt64>(file_path.getBaseName())] = file_path_str;
}
if (files.empty())
return false;
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedFilesToInsert, CurrentMetrics::Value(files.size())};
if (should_batch_inserts)
{
processFilesWithBatching(files);
}
else
{
for (const auto & file : files)
{
if (quit)
return true;
processFile(file.second);
}
}
return true;
}
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
{
LOG_TRACE(log, "Started processing `" << file_path << '`');
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef());
auto connection = pool->get(timeouts);
try
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
ReadBufferFromFile in{file_path};
Settings insert_settings;
std::string insert_query;
readQueryAndSettings(in, insert_settings, insert_query);
RemoteBlockOutputStream remote{*connection, timeouts, insert_query, &insert_settings};
remote.writePrefix();
remote.writePrepared(in);
remote.writeSuffix();
}
catch (const Exception & e)
{
maybeMarkAsBroken(file_path, e);
throw;
}
Poco::File{file_path}.remove();
LOG_TRACE(log, "Finished processing `" << file_path << '`');
}
void StorageDistributedDirectoryMonitor::readQueryAndSettings(
ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const
{
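/// The file starts with a varint. Depending on its value this is either the signature of the new
/// format (extra info with settings and checksums), the signature of the old settings format,
/// or simply the size of the insert query (the oldest format).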
UInt64 query_size;
readVarUInt(query_size, in);
if (query_size == DBMS_DISTRIBUTED_SIGNATURE_EXTRA_INFO)
{
UInt64 initiator_revision;
CityHash_v1_0_2::uint128 expected;
CityHash_v1_0_2::uint128 calculated;
/// Read extra information.
String extra_info_as_string;
readStringBinary(extra_info_as_string, in);
/// Guard against out-of-bounds access here; the remaining cases are checked inside the read*() helpers.
if (extra_info_as_string.size() < sizeof(expected))
throw Exception("Not enough data", ErrorCodes::CORRUPTED_DATA);
StringRef extra_info_ref(extra_info_as_string.data(), extra_info_as_string.size() - sizeof(expected));
ReadBufferFromMemory extra_info(extra_info_ref.data, extra_info_ref.size);
ReadBuffer checksum(extra_info_as_string.data(), extra_info_as_string.size(), extra_info_ref.size);
readVarUInt(initiator_revision, extra_info);
if (ClickHouseRevision::get() < initiator_revision)
{
LOG_WARNING(
log,
"ClickHouse shard version is older than ClickHouse initiator version. "
<< "It may lack support for new features.");
}
/// Checksum of the extra info itself (covers all of it except this trailing checksum).
readPODBinary(expected, checksum);
calculated = CityHash_v1_0_2::CityHash128(extra_info_ref.data, extra_info_ref.size);
assertChecksum(expected, calculated);
insert_settings.deserialize(extra_info);
/// Read query
readStringBinary(insert_query, in);
/// Query checksum
readPODBinary(expected, extra_info);
calculated = CityHash_v1_0_2::CityHash128(insert_query.data(), insert_query.size());
assertChecksum(expected, calculated);
/// Add handling new data here, for example:
/// if (initiator_revision >= DBMS_MIN_REVISION_WITH_MY_NEW_DATA)
/// readVarUInt(my_new_data, extra_info);
return;
}
if (query_size == DBMS_DISTRIBUTED_SIGNATURE_SETTINGS_OLD_FORMAT)
{
insert_settings.deserialize(in, SettingsBinaryFormat::OLD);
readVarUInt(query_size, in);
}
insert_query.resize(query_size);
in.readStrict(insert_query.data(), query_size);
}
struct StorageDistributedDirectoryMonitor::BatchHeader
{
Settings settings;
String query;
Block sample_block;
BatchHeader(Settings settings_, String query_, Block sample_block_)
: settings(std::move(settings_))
, query(std::move(query_))
, sample_block(std::move(sample_block_))
{
}
bool operator==(const BatchHeader & other) const
{
return settings == other.settings && query == other.query &&
blocksHaveEqualStructure(sample_block, other.sample_block);
}
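/// Hashes the query text together with the column types of the sample block; used to group files into batches.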
struct Hash
{
size_t operator()(const BatchHeader & batch_header) const
{
SipHash hash_state;
hash_state.update(batch_header.query.data(), batch_header.query.size());
size_t num_columns = batch_header.sample_block.columns();
for (size_t i = 0; i < num_columns; ++i)
{
const String & type_name = batch_header.sample_block.getByPosition(i).type->getName();
hash_state.update(type_name.data(), type_name.size());
}
return hash_state.get64();
}
};
};
struct StorageDistributedDirectoryMonitor::Batch
{
std::vector<UInt64> file_indices;
size_t total_rows = 0;
size_t total_bytes = 0;
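/// Set when the batch was restored from the current_batch file after a restart.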
bool recovered = false;
StorageDistributedDirectoryMonitor & parent;
const std::map<UInt64, String> & file_index_to_path;
Batch(
StorageDistributedDirectoryMonitor & parent_,
const std::map<UInt64, String> & file_index_to_path_)
: parent(parent_), file_index_to_path(file_index_to_path_)
{}
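/// The batch is ready to be sent when no size limits are configured,
/// or when either the row or the byte threshold has been reached.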
bool isEnoughSize() const
{
return (!parent.min_batched_block_size_rows && !parent.min_batched_block_size_bytes)
|| (parent.min_batched_block_size_rows && total_rows >= parent.min_batched_block_size_rows)
|| (parent.min_batched_block_size_bytes && total_bytes >= parent.min_batched_block_size_bytes);
}
void send()
{
if (file_indices.empty())
return;
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
if (!recovered)
{
/// For deduplication in Replicated tables to work, we must try to re-send
/// exactly the same batches in case of an error.
/// So we save the contents of the current batch to the current_batch_file_path file
/// and truncate it afterwards if everything went well.
/// Temporary file is required for atomicity.
String tmp_file{parent.current_batch_file_path + ".tmp"};
if (Poco::File{tmp_file}.exists())
LOG_ERROR(parent.log, "Temporary file " << backQuote(tmp_file) << " exists. Unclean shutdown?");
{
WriteBufferFromFile out{tmp_file, O_WRONLY | O_TRUNC | O_CREAT};
writeText(out);
}
Poco::File{tmp_file}.renameTo(parent.current_batch_file_path);
}
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context.getSettingsRef());
auto connection = parent.pool->get(timeouts);
bool batch_broken = false;
try
{
Settings insert_settings;
String insert_query;
std::unique_ptr<RemoteBlockOutputStream> remote;
bool first = true;
for (UInt64 file_idx : file_indices)
{
auto file_path = file_index_to_path.find(file_idx);
if (file_path == file_index_to_path.end())
{
LOG_ERROR(parent.log, "Failed to send batch: file with index " << file_idx << " is absent");
batch_broken = true;
break;
}
ReadBufferFromFile in(file_path->second);
parent.readQueryAndSettings(in, insert_settings, insert_query);
if (first)
{
first = false;
remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts, insert_query, &insert_settings);
remote->writePrefix();
}
remote->writePrepared(in);
}
if (remote)
remote->writeSuffix();
}
catch (const Exception & e)
{
if (isFileBrokenErrorCode(e.code()))
{
tryLogCurrentException(parent.log, "Failed to send batch due to");
batch_broken = true;
}
else
throw;
}
if (!batch_broken)
{
LOG_TRACE(parent.log, "Sent a batch of " << file_indices.size() << " files.");
for (UInt64 file_index : file_indices)
Poco::File{file_index_to_path.at(file_index)}.remove();
}
else
{
LOG_ERROR(parent.log, "Marking a batch of " << file_indices.size() << " files as broken.");
for (UInt64 file_idx : file_indices)
{
auto file_path = file_index_to_path.find(file_idx);
if (file_path != file_index_to_path.end())
parent.markAsBroken(file_path->second);
}
}
file_indices.clear();
total_rows = 0;
total_bytes = 0;
recovered = false;
Poco::File{parent.current_batch_file_path}.setSize(0);
}
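/// The current_batch file simply lists the file indices of the batch, one per line.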
void writeText(WriteBuffer & out)
{
for (UInt64 file_idx : file_indices)
out << file_idx << '\n';
}
void readText(ReadBuffer & in)
{
while (!in.eof())
{
UInt64 idx;
in >> idx >> "\n";
file_indices.push_back(idx);
}
recovered = true;
}
};
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
{
std::unordered_set<UInt64> file_indices_to_skip;
if (Poco::File{current_batch_file_path}.exists())
{
/// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
Batch batch(*this, files);
ReadBufferFromFile in{current_batch_file_path};
batch.readText(in);
file_indices_to_skip.insert(batch.file_indices.begin(), batch.file_indices.end());
batch.send();
}
std::unordered_map<BatchHeader, Batch, BatchHeader::Hash> header_to_batch;
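/// Group the remaining files by (settings, query, block structure) and send them in batches.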
for (const auto & file : files)
{
if (quit)
return;
UInt64 file_idx = file.first;
const String & file_path = file.second;
if (file_indices_to_skip.count(file_idx))
continue;
size_t total_rows = 0;
size_t total_bytes = 0;
Block sample_block;
Settings insert_settings;
String insert_query;
try
{
/// Determine the metadata of the current file and check that it is not broken.
ReadBufferFromFile in{file_path};
readQueryAndSettings(in, insert_settings, insert_query);
CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, ClickHouseRevision::get());
block_in.readPrefix();
while (Block block = block_in.read())
{
total_rows += block.rows();
total_bytes += block.bytes();
if (!sample_block)
sample_block = block.cloneEmpty();
}
block_in.readSuffix();
}
catch (const Exception & e)
{
if (maybeMarkAsBroken(file_path, e))
{
tryLogCurrentException(log, "File is marked broken due to");
continue;
}
else
throw;
}
BatchHeader batch_header(std::move(insert_settings), std::move(insert_query), std::move(sample_block));
Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;
batch.file_indices.push_back(file_idx);
batch.total_rows += total_rows;
batch.total_bytes += total_bytes;
if (batch.isEnoughSize())
batch.send();
}
for (auto & kv : header_to_batch)
{
Batch & batch = kv.second;
batch.send();
}
Poco::File{current_batch_file_path}.remove();
}
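/// Error codes that indicate the file on disk is corrupted, rather than a transient send failure.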
bool StorageDistributedDirectoryMonitor::isFileBrokenErrorCode(int code)
{
return code == ErrorCodes::CHECKSUM_DOESNT_MATCH
|| code == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED
|| code == ErrorCodes::CANNOT_READ_ALL_DATA
|| code == ErrorCodes::UNKNOWN_CODEC
|| code == ErrorCodes::CANNOT_DECOMPRESS
|| code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF;
}
void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_path) const
{
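/// Move the file into the 'broken/' subdirectory next to it.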
const auto last_path_separator_pos = file_path.rfind('/');
const auto & base_path = file_path.substr(0, last_path_separator_pos + 1);
const auto & file_name = file_path.substr(last_path_separator_pos + 1);
const auto & broken_path = base_path + "broken/";
const auto & broken_file_path = broken_path + file_name;
Poco::File{broken_path}.createDirectory();
Poco::File{file_path}.renameTo(broken_file_path);
LOG_ERROR(log, "Renamed `" << file_path << "` to `" << broken_file_path << '`');
}
bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e) const
{
/// Mark the file as broken if necessary.
if (isFileBrokenErrorCode(e.code()))
{
markAsBroken(file_path);
return true;
}
else
return false;
}
std::string StorageDistributedDirectoryMonitor::getLoggerName() const
{
return storage.table_name + '.' + storage.getName() + ".DirectoryMonitor";
}
}