2017-04-01 09:19:00 +00:00
|
|
|
#include <DataStreams/RemoteBlockOutputStream.h>
|
2017-07-10 15:28:04 +00:00
|
|
|
#include <DataStreams/NativeBlockInputStream.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/escapeForFileName.h>
|
|
|
|
#include <Common/setThreadName.h>
|
|
|
|
#include <Common/CurrentMetrics.h>
|
2018-01-15 19:07:47 +00:00
|
|
|
#include <Common/StringUtils/StringUtils.h>
|
2017-07-10 15:28:04 +00:00
|
|
|
#include <Common/ClickHouseRevision.h>
|
2017-07-21 12:03:37 +00:00
|
|
|
#include <Common/SipHash.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Interpreters/Context.h>
|
|
|
|
#include <Storages/Distributed/DirectoryMonitor.h>
|
|
|
|
#include <IO/ReadBufferFromFile.h>
|
2017-07-10 15:28:04 +00:00
|
|
|
#include <IO/WriteBufferFromFile.h>
|
2018-12-28 18:15:26 +00:00
|
|
|
#include <Compression/CompressedReadBuffer.h>
|
2017-07-10 15:28:04 +00:00
|
|
|
#include <IO/Operators.h>
|
2016-12-12 03:33:34 +00:00
|
|
|
|
|
|
|
#include <boost/algorithm/string/find_iterator.hpp>
|
|
|
|
#include <boost/algorithm/string/finder.hpp>
|
|
|
|
|
|
|
|
#include <Poco/DirectoryIterator.h>
|
|
|
|
|
|
|
|
|
|
|
|
namespace CurrentMetrics
{
    /// A CurrentMetrics::Increment on this metric is held in scope while processFile()
    /// and Batch::send() are sending data to the remote server.
    extern const Metric DistributedSend;
}
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace ErrorCodes
{
    extern const int INCORRECT_FILE_NAME;
    /// The codes below are the ones isFileBrokenErrorCode() treats as "file is broken".
    extern const int CHECKSUM_DOESNT_MATCH;
    extern const int TOO_LARGE_SIZE_COMPRESSED;
    extern const int ATTEMPT_TO_READ_AFTER_EOF;
    /// Used by isFileBrokenErrorCode(); declared here explicitly instead of relying on
    /// a transitive include (re-declaring an extern constant is harmless).
    extern const int CANNOT_READ_ALL_DATA;
}
|
|
|
|
|
|
|
|
|
|
|
|
namespace
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
static constexpr const std::chrono::seconds max_sleep_time{30};
|
|
|
|
static constexpr const std::chrono::minutes decrease_error_count_period{5};
|
|
|
|
|
|
|
|
template <typename PoolFactory>
|
2017-04-19 17:40:55 +00:00
|
|
|
ConnectionPoolPtrs createPoolsForAddresses(const std::string & name, PoolFactory && factory)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2017-04-19 17:40:55 +00:00
|
|
|
ConnectionPoolPtrs pools;
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it)
|
|
|
|
{
|
2019-01-21 19:45:26 +00:00
|
|
|
Cluster::Address address = Cluster::Address::fromFullString(boost::copy_range<std::string>(*it));
|
2018-12-02 02:17:08 +00:00
|
|
|
pools.emplace_back(factory(address));
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return pools;
|
|
|
|
}
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2017-08-10 03:45:57 +00:00
|
|
|
StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(StorageDistributed & storage, const std::string & name, const ConnectionPoolPtr & pool)
|
2017-07-27 15:24:39 +00:00
|
|
|
: storage(storage), pool{pool}, path{storage.path + name + '/'}
|
2017-07-10 15:28:04 +00:00
|
|
|
, current_batch_file_path{path + "current_batch.txt"}
|
2019-01-04 12:10:00 +00:00
|
|
|
, default_sleep_time{storage.global_context.getSettingsRef().distributed_directory_monitor_sleep_time_ms.totalMilliseconds()}
|
2017-04-01 07:20:54 +00:00
|
|
|
, sleep_time{default_sleep_time}
|
|
|
|
, log{&Logger::get(getLoggerName())}
|
2016-12-12 03:33:34 +00:00
|
|
|
{
|
2019-01-04 12:10:00 +00:00
|
|
|
const Settings & settings = storage.global_context.getSettingsRef();
|
2017-07-10 15:28:04 +00:00
|
|
|
should_batch_inserts = settings.distributed_directory_monitor_batch_inserts;
|
|
|
|
min_batched_block_size_rows = settings.min_insert_block_size_rows;
|
|
|
|
min_batched_block_size_bytes = settings.min_insert_block_size_bytes;
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
2017-02-13 10:38:50 +00:00
|
|
|
|
2016-12-12 03:33:34 +00:00
|
|
|
/// Stops the background thread, unless `quit` is already set (e.g. by shutdownAndDropAllData()).
StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
{
    if (quit)
        return;

    /// Set the flag first, then briefly take the mutex: run() checks `quit` under this mutex
    /// (wait predicate), so after this point it either sees the flag or is already waiting
    /// and will be woken by notify_one().
    quit = true;
    {
        std::lock_guard lock{mutex};
    }

    cond.notify_one();
    thread.join();
}
|
|
|
|
|
|
|
|
|
|
|
|
void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
|
|
|
|
{
|
|
|
|
if (!quit)
|
|
|
|
{
|
|
|
|
{
|
|
|
|
quit = true;
|
2019-01-02 06:44:36 +00:00
|
|
|
std::lock_guard lock{mutex};
|
2018-04-21 00:35:20 +00:00
|
|
|
}
|
|
|
|
cond.notify_one();
|
|
|
|
thread.join();
|
|
|
|
}
|
|
|
|
|
|
|
|
Poco::File(path).remove(true);
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
2017-02-13 10:38:50 +00:00
|
|
|
|
2016-12-12 03:33:34 +00:00
|
|
|
void StorageDistributedDirectoryMonitor::run()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
setThreadName("DistrDirMonitor");
|
|
|
|
|
2019-01-02 06:44:36 +00:00
|
|
|
std::unique_lock lock{mutex};
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-09-09 23:07:32 +00:00
|
|
|
const auto quit_requested = [this] { return quit.load(std::memory_order_relaxed); };
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
while (!quit_requested())
|
|
|
|
{
|
|
|
|
auto do_sleep = true;
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
|
|
|
do_sleep = !findFiles();
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
do_sleep = true;
|
|
|
|
++error_count;
|
|
|
|
sleep_time = std::min(
|
|
|
|
std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))},
|
|
|
|
std::chrono::milliseconds{max_sleep_time});
|
|
|
|
tryLogCurrentException(getLoggerName().data());
|
2019-01-04 14:18:49 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
if (do_sleep)
|
|
|
|
cond.wait_for(lock, sleep_time, quit_requested);
|
|
|
|
|
|
|
|
const auto now = std::chrono::system_clock::now();
|
|
|
|
if (now - last_decrease_time > decrease_error_count_period)
|
|
|
|
{
|
|
|
|
error_count /= 2;
|
|
|
|
last_decrease_time = now;
|
|
|
|
}
|
|
|
|
}
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
2017-02-13 10:38:50 +00:00
|
|
|
|
2017-07-27 15:24:39 +00:00
|
|
|
/// Builds the connection pool used to send data for the directory `name`.
///
/// `name` is a comma-separated list of addresses. For each address the pool already owned by the
/// storage's cluster for that replica is reused if one exists; otherwise a dedicated
/// single-connection pool is created. More than one address yields a ConnectionPoolWithFailover
/// (random load balancing) over the per-address pools.
ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage)
{
    auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef());

    const auto make_pool = [&storage, &timeouts] (const Cluster::Address & address) -> ConnectionPoolPtr
    {
        const auto & cluster = storage.getCluster();
        const auto & shards_info = cluster->getShardsInfo();
        const auto & shards_addresses = cluster->getShardsAddresses();

        /// Existing connection pools have a higher priority.
        for (size_t shard = 0; shard < shards_info.size(); ++shard)
        {
            const Cluster::Addresses & replicas = shards_addresses[shard];

            for (size_t replica = 0; replica < replicas.size(); ++replica)
            {
                const Cluster::Address & candidate = replicas[replica];
                if (address == candidate)
                    return shards_info[shard].per_replica_pools[replica];
            }
        }

        /// No pool for this address in the cluster: create a dedicated one.
        return std::make_shared<ConnectionPool>(
            1, address.host_name, address.port, address.default_database, address.user, address.password, timeouts,
            storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure);
    };

    auto pools = createPoolsForAddresses(name, make_pool);

    if (pools.size() == 1)
        return pools.front();

    return std::make_shared<ConnectionPoolWithFailover>(pools, LoadBalancing::RANDOM);
}
|
|
|
|
|
2017-02-13 10:38:50 +00:00
|
|
|
|
2016-12-12 03:33:34 +00:00
|
|
|
bool StorageDistributedDirectoryMonitor::findFiles()
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
std::map<UInt64, std::string> files;
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
Poco::DirectoryIterator end;
|
|
|
|
for (Poco::DirectoryIterator it{path}; it != end; ++it)
|
|
|
|
{
|
|
|
|
const auto & file_path_str = it->path();
|
|
|
|
Poco::Path file_path{file_path_str};
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (!it->isDirectory() && startsWith(file_path.getExtension().data(), "bin"))
|
|
|
|
files[parse<UInt64>(file_path.getBaseName())] = file_path_str;
|
|
|
|
}
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
if (files.empty())
|
|
|
|
return false;
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-07-10 15:28:04 +00:00
|
|
|
if (should_batch_inserts)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2017-07-10 15:28:04 +00:00
|
|
|
processFilesWithBatching(files);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
for (const auto & file : files)
|
|
|
|
{
|
|
|
|
if (quit)
|
|
|
|
return true;
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-07-10 15:28:04 +00:00
|
|
|
processFile(file.second);
|
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
return true;
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
LOG_TRACE(log, "Started processing `" << file_path << '`');
|
|
|
|
auto connection = pool->get();
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
try
|
|
|
|
{
|
|
|
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
ReadBufferFromFile in{file_path};
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2019-04-08 10:04:26 +00:00
|
|
|
Settings insert_settings;
|
2017-04-01 07:20:54 +00:00
|
|
|
std::string insert_query;
|
2019-04-08 10:04:26 +00:00
|
|
|
readQueryAndSettings(in, insert_settings, insert_query);
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2019-04-08 10:04:26 +00:00
|
|
|
RemoteBlockOutputStream remote{*connection, insert_query, &insert_settings};
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
remote.writePrefix();
|
|
|
|
remote.writePrepared(in);
|
|
|
|
remote.writeSuffix();
|
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
2017-07-10 15:28:04 +00:00
|
|
|
maybeMarkAsBroken(file_path, e);
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
|
|
|
|
Poco::File{file_path}.remove();
|
|
|
|
|
|
|
|
LOG_TRACE(log, "Finished processing `" << file_path << '`');
|
|
|
|
}
|
|
|
|
|
2019-04-08 10:04:26 +00:00
|
|
|
/// Reads the header of a pending file from `in`.
///
/// The first varUInt is compared with DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER. If it matches,
/// serialized settings follow (deserialized into `insert_settings`) and then another varUInt
/// with the query length. Otherwise the first varUInt is itself the query length and
/// `insert_settings` is left untouched. The query bytes are then read into `insert_query`
/// (readStrict throws if the buffer ends early).
void StorageDistributedDirectoryMonitor::readQueryAndSettings(
    ReadBuffer & in, Settings & insert_settings, std::string & insert_query) const
{
    UInt64 query_size;
    readVarUInt(query_size, in);

    const bool has_settings = query_size == UInt64(DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER);
    if (has_settings)
    {
        insert_settings.deserialize(in);
        readVarUInt(query_size, in);
    }

    insert_query.resize(query_size);
    in.readStrict(insert_query.data(), query_size);
}
|
|
|
|
|
2017-07-21 12:03:37 +00:00
|
|
|
struct StorageDistributedDirectoryMonitor::BatchHeader
|
|
|
|
{
|
2019-04-08 10:04:26 +00:00
|
|
|
Settings settings;
|
2017-07-21 12:03:37 +00:00
|
|
|
String query;
|
|
|
|
Block sample_block;
|
|
|
|
|
2019-04-08 10:04:26 +00:00
|
|
|
BatchHeader(Settings settings_, String query_, Block sample_block_)
|
|
|
|
: settings(std::move(settings_))
|
|
|
|
, query(std::move(query_))
|
2017-07-21 12:03:37 +00:00
|
|
|
, sample_block(std::move(sample_block_))
|
|
|
|
{
|
|
|
|
}
|
|
|
|
|
|
|
|
bool operator==(const BatchHeader & other) const
|
|
|
|
{
|
2019-04-08 10:04:26 +00:00
|
|
|
return settings == other.settings && query == other.query &&
|
|
|
|
blocksHaveEqualStructure(sample_block, other.sample_block);
|
2017-07-21 12:03:37 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
struct Hash
|
|
|
|
{
|
|
|
|
size_t operator()(const BatchHeader & batch_header) const
|
|
|
|
{
|
|
|
|
SipHash hash_state;
|
|
|
|
hash_state.update(batch_header.query.data(), batch_header.query.size());
|
|
|
|
|
|
|
|
size_t num_columns = batch_header.sample_block.columns();
|
|
|
|
for (size_t i = 0; i < num_columns; ++i)
|
|
|
|
{
|
|
|
|
const String & type_name = batch_header.sample_block.getByPosition(i).type->getName();
|
|
|
|
hash_state.update(type_name.data(), type_name.size());
|
|
|
|
}
|
|
|
|
|
|
|
|
return hash_state.get64();
|
|
|
|
}
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
2017-07-10 15:28:04 +00:00
|
|
|
struct StorageDistributedDirectoryMonitor::Batch
|
|
|
|
{
|
|
|
|
std::vector<UInt64> file_indices;
|
|
|
|
size_t total_rows = 0;
|
|
|
|
size_t total_bytes = 0;
|
2018-03-27 18:59:53 +00:00
|
|
|
bool recovered = false;
|
2017-07-10 15:28:04 +00:00
|
|
|
|
|
|
|
StorageDistributedDirectoryMonitor & parent;
|
2018-03-27 18:59:53 +00:00
|
|
|
const std::map<UInt64, String> & file_index_to_path;
|
2017-07-10 15:28:04 +00:00
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
Batch(
|
|
|
|
StorageDistributedDirectoryMonitor & parent_,
|
|
|
|
const std::map<UInt64, String> & file_index_to_path_)
|
|
|
|
: parent(parent_), file_index_to_path(file_index_to_path_)
|
|
|
|
{}
|
2017-07-10 15:28:04 +00:00
|
|
|
|
|
|
|
bool isEnoughSize() const
|
|
|
|
{
|
|
|
|
return (!parent.min_batched_block_size_rows && !parent.min_batched_block_size_bytes)
|
|
|
|
|| (parent.min_batched_block_size_rows && total_rows >= parent.min_batched_block_size_rows)
|
|
|
|
|| (parent.min_batched_block_size_bytes && total_bytes >= parent.min_batched_block_size_bytes);
|
|
|
|
}
|
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
void send()
|
2017-07-10 15:28:04 +00:00
|
|
|
{
|
|
|
|
if (file_indices.empty())
|
|
|
|
return;
|
|
|
|
|
|
|
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
if (!recovered)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2017-07-10 15:28:04 +00:00
|
|
|
/// For deduplication in Replicated tables to work, in case of error
|
|
|
|
/// we must try to re-send exactly the same batches.
|
|
|
|
/// So we save contents of the current batch into the current_batch_file_path file
|
|
|
|
/// and truncate it afterwards if all went well.
|
|
|
|
WriteBufferFromFile out{parent.current_batch_file_path};
|
|
|
|
writeText(out);
|
|
|
|
}
|
|
|
|
|
|
|
|
auto connection = parent.pool->get();
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
bool batch_broken = false;
|
|
|
|
try
|
2017-07-10 15:28:04 +00:00
|
|
|
{
|
2019-04-08 10:04:26 +00:00
|
|
|
Settings insert_settings;
|
2018-03-27 18:59:53 +00:00
|
|
|
String insert_query;
|
|
|
|
std::unique_ptr<RemoteBlockOutputStream> remote;
|
|
|
|
bool first = true;
|
2017-07-10 15:28:04 +00:00
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
for (UInt64 file_idx : file_indices)
|
2017-07-10 15:28:04 +00:00
|
|
|
{
|
2018-03-27 18:59:53 +00:00
|
|
|
auto file_path = file_index_to_path.find(file_idx);
|
|
|
|
if (file_path == file_index_to_path.end())
|
|
|
|
{
|
|
|
|
LOG_ERROR(parent.log, "Failed to send batch: file with index " << file_idx << " is absent");
|
|
|
|
batch_broken = true;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
ReadBufferFromFile in(file_path->second);
|
2019-04-08 10:04:26 +00:00
|
|
|
parent.readQueryAndSettings(in, insert_settings, insert_query);
|
2018-03-27 18:59:53 +00:00
|
|
|
|
|
|
|
if (first)
|
|
|
|
{
|
|
|
|
first = false;
|
2019-04-08 10:04:26 +00:00
|
|
|
remote = std::make_unique<RemoteBlockOutputStream>(*connection, insert_query, &insert_settings);
|
2018-03-27 18:59:53 +00:00
|
|
|
remote->writePrefix();
|
|
|
|
}
|
|
|
|
|
|
|
|
remote->writePrepared(in);
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
remote->writeSuffix();
|
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
|
|
|
if (isFileBrokenErrorCode(e.code()))
|
|
|
|
{
|
|
|
|
tryLogCurrentException(parent.log, "Failed to send batch due to");
|
|
|
|
batch_broken = true;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
throw;
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
if (!batch_broken)
|
|
|
|
{
|
|
|
|
LOG_TRACE(parent.log, "Sent a batch of " << file_indices.size() << " files.");
|
|
|
|
|
|
|
|
for (UInt64 file_index : file_indices)
|
|
|
|
Poco::File{file_index_to_path.at(file_index)}.remove();
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
LOG_ERROR(parent.log, "Marking a batch of " << file_indices.size() << " files as broken.");
|
2017-07-10 15:28:04 +00:00
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
for (UInt64 file_idx : file_indices)
|
|
|
|
{
|
|
|
|
auto file_path = file_index_to_path.find(file_idx);
|
|
|
|
if (file_path != file_index_to_path.end())
|
|
|
|
parent.markAsBroken(file_path->second);
|
|
|
|
}
|
|
|
|
}
|
2017-07-10 15:28:04 +00:00
|
|
|
|
|
|
|
file_indices.clear();
|
|
|
|
total_rows = 0;
|
|
|
|
total_bytes = 0;
|
2018-03-27 17:46:53 +00:00
|
|
|
recovered = false;
|
2017-07-10 15:28:04 +00:00
|
|
|
|
|
|
|
Poco::File{parent.current_batch_file_path}.setSize(0);
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-07-10 15:28:04 +00:00
|
|
|
void writeText(WriteBuffer & out)
|
|
|
|
{
|
|
|
|
for (UInt64 file_idx : file_indices)
|
|
|
|
out << file_idx << '\n';
|
|
|
|
}
|
2016-12-12 03:33:34 +00:00
|
|
|
|
2017-07-10 15:28:04 +00:00
|
|
|
void readText(ReadBuffer & in)
|
|
|
|
{
|
|
|
|
while (!in.eof())
|
|
|
|
{
|
|
|
|
UInt64 idx;
|
|
|
|
in >> idx >> "\n";
|
|
|
|
file_indices.push_back(idx);
|
|
|
|
}
|
2018-03-27 18:59:53 +00:00
|
|
|
recovered = true;
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2018-04-21 00:35:20 +00:00
|
|
|
|
2017-07-10 15:28:04 +00:00
|
|
|
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
|
|
|
|
{
|
|
|
|
std::unordered_set<UInt64> file_indices_to_skip;
|
|
|
|
|
|
|
|
if (Poco::File{current_batch_file_path}.exists())
|
|
|
|
{
|
|
|
|
/// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
|
2018-03-27 18:59:53 +00:00
|
|
|
Batch batch(*this, files);
|
2017-07-10 15:28:04 +00:00
|
|
|
ReadBufferFromFile in{current_batch_file_path};
|
|
|
|
batch.readText(in);
|
|
|
|
file_indices_to_skip.insert(batch.file_indices.begin(), batch.file_indices.end());
|
2018-03-27 18:59:53 +00:00
|
|
|
batch.send();
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
|
|
|
|
2017-07-21 12:03:37 +00:00
|
|
|
std::unordered_map<BatchHeader, Batch, BatchHeader::Hash> header_to_batch;
|
2017-07-10 15:28:04 +00:00
|
|
|
|
|
|
|
for (const auto & file : files)
|
|
|
|
{
|
|
|
|
if (quit)
|
|
|
|
return;
|
|
|
|
|
|
|
|
UInt64 file_idx = file.first;
|
|
|
|
const String & file_path = file.second;
|
|
|
|
|
|
|
|
if (file_indices_to_skip.count(file_idx))
|
|
|
|
continue;
|
|
|
|
|
|
|
|
size_t total_rows = 0;
|
|
|
|
size_t total_bytes = 0;
|
2017-07-21 12:03:37 +00:00
|
|
|
Block sample_block;
|
2019-04-08 10:04:26 +00:00
|
|
|
Settings insert_settings;
|
2018-03-27 17:46:53 +00:00
|
|
|
String insert_query;
|
2017-07-10 15:28:04 +00:00
|
|
|
try
|
|
|
|
{
|
2018-03-27 17:46:53 +00:00
|
|
|
/// Determine metadata of the current file and check if it is not broken.
|
|
|
|
ReadBufferFromFile in{file_path};
|
2019-04-08 10:04:26 +00:00
|
|
|
insert_settings.deserialize(in);
|
2018-03-27 17:46:53 +00:00
|
|
|
readStringBinary(insert_query, in);
|
|
|
|
|
|
|
|
CompressedReadBuffer decompressing_in(in);
|
|
|
|
NativeBlockInputStream block_in(decompressing_in, ClickHouseRevision::get());
|
|
|
|
block_in.readPrefix();
|
|
|
|
|
2017-07-10 15:28:04 +00:00
|
|
|
while (Block block = block_in.read())
|
|
|
|
{
|
|
|
|
total_rows += block.rows();
|
|
|
|
total_bytes += block.bytes();
|
2017-07-21 12:03:37 +00:00
|
|
|
|
|
|
|
if (!sample_block)
|
|
|
|
sample_block = block.cloneEmpty();
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
2018-03-27 17:46:53 +00:00
|
|
|
block_in.readSuffix();
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
|
|
|
if (maybeMarkAsBroken(file_path, e))
|
|
|
|
{
|
|
|
|
tryLogCurrentException(log, "File is marked broken due to");
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
|
2019-04-08 10:04:26 +00:00
|
|
|
BatchHeader batch_header(std::move(insert_settings), std::move(insert_query), std::move(sample_block));
|
2018-03-27 18:59:53 +00:00
|
|
|
Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;
|
2017-07-21 12:03:37 +00:00
|
|
|
|
2017-07-10 15:28:04 +00:00
|
|
|
batch.file_indices.push_back(file_idx);
|
|
|
|
batch.total_rows += total_rows;
|
|
|
|
batch.total_bytes += total_bytes;
|
|
|
|
|
|
|
|
if (batch.isEnoughSize())
|
2018-03-27 18:59:53 +00:00
|
|
|
batch.send();
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
|
|
|
|
2017-07-21 12:03:37 +00:00
|
|
|
for (auto & kv : header_to_batch)
|
2017-07-10 15:28:04 +00:00
|
|
|
{
|
|
|
|
Batch & batch = kv.second;
|
2018-03-27 18:59:53 +00:00
|
|
|
batch.send();
|
2017-07-10 15:28:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Poco::File{current_batch_file_path}.remove();
|
|
|
|
}
|
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
bool StorageDistributedDirectoryMonitor::isFileBrokenErrorCode(int code)
|
2017-07-10 15:28:04 +00:00
|
|
|
{
|
2018-03-27 18:59:53 +00:00
|
|
|
return code == ErrorCodes::CHECKSUM_DOESNT_MATCH
|
2017-07-10 15:28:04 +00:00
|
|
|
|| code == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED
|
|
|
|
|| code == ErrorCodes::CANNOT_READ_ALL_DATA
|
2018-03-27 18:59:53 +00:00
|
|
|
|| code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF;
|
|
|
|
}
|
|
|
|
|
|
|
|
void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_path) const
|
|
|
|
{
|
|
|
|
const auto last_path_separator_pos = file_path.rfind('/');
|
2019-01-04 12:10:00 +00:00
|
|
|
const auto & base_path = file_path.substr(0, last_path_separator_pos + 1);
|
2018-03-27 18:59:53 +00:00
|
|
|
const auto & file_name = file_path.substr(last_path_separator_pos + 1);
|
2019-01-04 12:10:00 +00:00
|
|
|
const auto & broken_path = base_path + "broken/";
|
2018-03-27 18:59:53 +00:00
|
|
|
const auto & broken_file_path = broken_path + file_name;
|
2017-07-10 15:28:04 +00:00
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
Poco::File{broken_path}.createDirectory();
|
|
|
|
Poco::File{file_path}.renameTo(broken_file_path);
|
2017-07-10 15:28:04 +00:00
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
LOG_ERROR(log, "Renamed `" << file_path << "` to `" << broken_file_path << '`');
|
|
|
|
}
|
2017-07-10 15:28:04 +00:00
|
|
|
|
2018-03-27 18:59:53 +00:00
|
|
|
bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e) const
|
|
|
|
{
|
|
|
|
/// mark file as broken if necessary
|
|
|
|
if (isFileBrokenErrorCode(e.code()))
|
|
|
|
{
|
|
|
|
markAsBroken(file_path);
|
2017-07-10 15:28:04 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
return false;
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
std::string StorageDistributedDirectoryMonitor::getLoggerName() const
|
|
|
|
{
|
2018-03-06 20:18:34 +00:00
|
|
|
return storage.table_name + '.' + storage.getName() + ".DirectoryMonitor";
|
2016-12-12 03:33:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|