Revert "Revert "Merge pull request #44922 from azat/dist/async-INSERT-metrics""

This is a revert of the revert, since there will be follow-up patches to
address the issues.

This reverts commit a55798626a.
Azat Khuzhin 2023-01-22 13:20:38 +01:00
parent 66634cfd7c
commit 3f892e52ab
6 changed files with 190 additions and 175 deletions
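
For context, the core of the re-applied change replaces the directory monitor's atomic quit flag and periodic directory rescans (the old getFiles() map) with a concurrent queue of pending file paths: DistributedSink pushes each written .bin file via addAndSchedule(), the monitor drains the queue in processFiles()/processFilesWithBatching(), and shutdown becomes clearAndFinish()/isFinished() on the queue. This also lets the files/bytes counters and the DistributedFilesToInsert / BrokenDistributedFilesToInsert metrics be maintained incrementally instead of being recomputed by rescanning the directory. The sketch below is a minimal, self-contained stand-in for the queue semantics the code relies on; it is an illustration only, not ClickHouse's actual ConcurrentBoundedQueue (which is bounded and also supports blocking operations).

#include <mutex>
#include <queue>
#include <string>

/// Simplified stand-in (illustration only, not Common/ConcurrentBoundedQueue.h)
/// showing the push / tryPop / clearAndFinish / isFinished semantics used by
/// the directory monitor after this change.
class PendingFilesQueue
{
public:
    /// INSERT path (cf. addAndSchedule): returns false once finished,
    /// so callers can skip scheduling instead of throwing.
    bool push(std::string path)
    {
        std::lock_guard lock(mutex);
        if (finished)
            return false;
        paths.push(std::move(path));
        return true;
    }

    /// Monitor loop (cf. processFiles): non-blocking pop of the next file.
    bool tryPop(std::string & path)
    {
        std::lock_guard lock(mutex);
        if (paths.empty())
            return false;
        path = std::move(paths.front());
        paths.pop();
        return true;
    }

    /// Replaces the old quit flag: drop everything pending, reject new pushes.
    void clearAndFinish()
    {
        std::lock_guard lock(mutex);
        paths = {};
        finished = true;
    }

    bool isFinished() const { std::lock_guard lock(mutex); return finished; }
    bool empty() const { std::lock_guard lock(mutex); return paths.empty(); }
    size_t size() const { std::lock_guard lock(mutex); return paths.size(); }

private:
    mutable std::mutex mutex;
    std::queue<std::string> paths;
    bool finished = false;
};

In the diff below, initializeFilesFromDisk() and hasPendingFiles() populate and query this queue on startup, while addAndSchedule() adds files written by DistributedSink at INSERT time.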

@ -59,6 +59,7 @@ namespace ErrorCodes
extern const int TOO_MANY_PARTITIONS;
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int LOGICAL_ERROR;
}
@ -363,18 +364,22 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
const std::string & relative_path_,
ConnectionPoolPtr pool_,
ActionBlocker & monitor_blocker_,
BackgroundSchedulePool & bg_pool)
BackgroundSchedulePool & bg_pool,
bool initialize_from_disk)
: storage(storage_)
, pool(std::move(pool_))
, disk(disk_)
, relative_path(relative_path_)
, path(fs::path(disk->getPath()) / relative_path / "")
, broken_relative_path(fs::path(relative_path) / "broken")
, broken_path(fs::path(path) / "broken" / "")
, should_batch_inserts(storage.getDistributedSettingsRef().monitor_batch_inserts)
, split_batch_on_failure(storage.getDistributedSettingsRef().monitor_split_batch_on_failure)
, dir_fsync(storage.getDistributedSettingsRef().fsync_directories)
, min_batched_block_size_rows(storage.getContext()->getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.getContext()->getSettingsRef().min_insert_block_size_bytes)
, current_batch_file_path(path + "current_batch.txt")
, pending_files(std::numeric_limits<size_t>::max())
, default_sleep_time(storage.getDistributedSettingsRef().monitor_sleep_time_ms.totalMilliseconds())
, sleep_time(default_sleep_time)
, max_sleep_time(storage.getDistributedSettingsRef().monitor_max_sleep_time_ms.totalMilliseconds())
@ -383,6 +388,11 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0)
, metric_broken_files(CurrentMetrics::BrokenDistributedFilesToInsert, 0)
{
fs::create_directory(broken_path);
if (initialize_from_disk)
initializeFilesFromDisk();
task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); });
task_handle->activateAndSchedule();
}
@ -390,35 +400,29 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
{
if (!quit)
if (!pending_files.isFinished())
{
quit = true;
pending_files.clearAndFinish();
task_handle->deactivate();
}
}
void StorageDistributedDirectoryMonitor::flushAllData()
{
if (quit)
if (pending_files.isFinished())
return;
std::lock_guard lock{mutex};
const auto & files = getFiles();
if (!files.empty())
{
processFiles(files);
/// Update counters.
getFiles();
}
if (!hasPendingFiles())
return;
processFiles();
}
void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
{
if (!quit)
if (!pending_files.isFinished())
{
quit = true;
pending_files.clearAndFinish();
task_handle->deactivate();
}
@ -432,19 +436,21 @@ void StorageDistributedDirectoryMonitor::run()
std::lock_guard lock{mutex};
bool do_sleep = false;
while (!quit)
while (!pending_files.isFinished())
{
do_sleep = true;
const auto & files = getFiles();
if (files.empty())
if (!hasPendingFiles())
break;
if (!monitor_blocker.isCancelled())
{
try
{
do_sleep = !processFiles(files);
processFiles();
/// No errors while processing existing files.
/// Let's see maybe there are more files to process.
do_sleep = false;
std::lock_guard status_lock(status_mutex);
status.last_exception = std::exception_ptr{};
@ -469,9 +475,7 @@ void StorageDistributedDirectoryMonitor::run()
}
}
else
{
LOG_DEBUG(log, "Skipping send data over distributed table.");
}
const auto now = std::chrono::system_clock::now();
if (now - last_decrease_time > decrease_error_count_period)
@ -486,10 +490,7 @@ void StorageDistributedDirectoryMonitor::run()
break;
}
/// Update counters.
getFiles();
if (!quit && do_sleep)
if (!pending_files.isFinished() && do_sleep)
task_handle->scheduleAfter(sleep_time.count());
}
@ -567,41 +568,83 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
settings.distributed_replica_error_cap);
}
std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
bool StorageDistributedDirectoryMonitor::hasPendingFiles() const
{
std::map<UInt64, std::string> files;
return fs::exists(current_batch_file_path) || !current_batch_file.empty() || !pending_files.empty();
}
void StorageDistributedDirectoryMonitor::initializeFilesFromDisk()
{
/// NOTE: This method does not require holding status_mutex, hence no TSA
/// annotations in the header file.
fs::directory_iterator end;
for (fs::directory_iterator it{path}; it != end; ++it)
/// Initialize pending files
{
const auto & file_path_str = it->path();
if (!it->is_directory() && startsWith(fs::path(file_path_str).extension(), ".bin"))
size_t bytes_count = 0;
for (fs::directory_iterator it{path}; it != end; ++it)
{
files[parse<UInt64>(fs::path(file_path_str).stem())] = file_path_str;
const auto & file_path = it->path();
const auto & base_name = file_path.stem().string();
if (!it->is_directory() && startsWith(fs::path(file_path).extension(), ".bin") && parse<UInt64>(base_name))
{
const std::string & file_path_str = file_path.string();
if (!pending_files.push(file_path_str))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add pending file");
bytes_count += fs::file_size(file_path);
}
else if (base_name != "tmp" && base_name != "broken")
{
/// It is OK to log current_batch.txt here too (useful for debugging).
LOG_WARNING(log, "Unexpected file {} in {}", file_path.string(), path);
}
}
LOG_TRACE(log, "Files set to {}", pending_files.size());
LOG_TRACE(log, "Bytes set to {}", bytes_count);
metric_pending_files.changeTo(pending_files.size());
status.files_count = pending_files.size();
status.bytes_count = bytes_count;
}
return files;
/// Initialize broken files
{
size_t broken_bytes_count = 0;
size_t broken_files = 0;
for (fs::directory_iterator it{broken_path}; it != end; ++it)
{
const auto & file_path = it->path();
if (!it->is_directory() && startsWith(fs::path(file_path).extension(), ".bin") && parse<UInt64>(file_path.stem()))
broken_bytes_count += fs::file_size(file_path);
else
LOG_WARNING(log, "Unexpected file {} in {}", file_path.string(), broken_path);
}
LOG_TRACE(log, "Broken files set to {}", broken_files);
LOG_TRACE(log, "Broken bytes set to {}", broken_bytes_count);
metric_broken_files.changeTo(broken_files);
status.broken_files_count = broken_files;
status.broken_bytes_count = broken_bytes_count;
}
}
bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std::string> & files)
void StorageDistributedDirectoryMonitor::processFiles()
{
if (should_batch_inserts)
{
processFilesWithBatching(files);
}
processFilesWithBatching();
else
{
for (const auto & file : files)
{
if (quit)
return true;
/// Process unprocessed file.
if (!current_batch_file.empty())
processFile(current_batch_file);
processFile(file.second);
}
while (pending_files.tryPop(current_batch_file))
processFile(current_batch_file);
}
return true;
}
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
@ -648,7 +691,11 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
thread_trace_context->root_span.addAttribute(std::current_exception());
e.addMessage(fmt::format("While sending {}", file_path));
maybeMarkAsBroken(file_path, e);
if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
{
markAsBroken(file_path);
current_batch_file.clear();
}
throw;
}
catch (...)
@ -661,6 +708,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
markAsSend(file_path);
current_batch_file.clear();
LOG_TRACE(log, "Finished processing `{}` (took {} ms)", file_path, watch.elapsedMilliseconds());
}
@ -700,25 +748,19 @@ struct StorageDistributedDirectoryMonitor::BatchHeader
struct StorageDistributedDirectoryMonitor::Batch
{
/// File indexes for this batch.
std::vector<UInt64> file_indices;
size_t total_rows = 0;
size_t total_bytes = 0;
bool recovered = false;
StorageDistributedDirectoryMonitor & parent;
/// Information about all available indexes (not only for the current batch).
const std::map<UInt64, String> & file_index_to_path;
std::vector<std::string> files;
bool split_batch_on_failure = true;
bool fsync = false;
bool dir_fsync = false;
Batch(
StorageDistributedDirectoryMonitor & parent_,
const std::map<UInt64, String> & file_index_to_path_)
explicit Batch(StorageDistributedDirectoryMonitor & parent_)
: parent(parent_)
, file_index_to_path(file_index_to_path_)
, split_batch_on_failure(parent.split_batch_on_failure)
, fsync(parent.storage.getDistributedSettingsRef().fsync_after_insert)
, dir_fsync(parent.dir_fsync)
@ -733,7 +775,7 @@ struct StorageDistributedDirectoryMonitor::Batch
void send()
{
if (file_indices.empty())
if (files.empty())
return;
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
@ -776,7 +818,7 @@ struct StorageDistributedDirectoryMonitor::Batch
}
catch (const Exception & e)
{
if (split_batch_on_failure && file_indices.size() > 1 && isSplittableErrorCode(e.code(), e.isRemoteException()))
if (split_batch_on_failure && files.size() > 1 && isSplittableErrorCode(e.code(), e.isRemoteException()))
{
tryLogCurrentException(parent.log, "Trying to split batch due to");
sendSeparateFiles();
@ -796,49 +838,28 @@ struct StorageDistributedDirectoryMonitor::Batch
}
else
{
std::vector<std::string> files;
for (auto file_index_info : file_indices | boost::adaptors::indexed())
{
if (file_index_info.index() > 8)
{
files.push_back("...");
break;
}
auto file_index = file_index_info.value();
auto file_path = file_index_to_path.find(file_index);
if (file_path != file_index_to_path.end())
files.push_back(file_path->second);
else
files.push_back(fmt::format("#{}.bin (deleted)", file_index));
}
e.addMessage(fmt::format("While sending batch, size: {}, files: {}", file_indices.size(), fmt::join(files, "\n")));
e.addMessage(fmt::format("While sending a batch of {} files, files: {}", files.size(), fmt::join(files, "\n")));
throw;
}
}
if (!batch_broken)
{
LOG_TRACE(parent.log, "Sent a batch of {} files (took {} ms).", file_indices.size(), watch.elapsedMilliseconds());
LOG_TRACE(parent.log, "Sent a batch of {} files (took {} ms).", files.size(), watch.elapsedMilliseconds());
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, parent.disk, parent.relative_path);
for (UInt64 file_index : file_indices)
parent.markAsSend(file_index_to_path.at(file_index));
for (const auto & file : files)
parent.markAsSend(file);
}
else if (!batch_marked_as_broken)
{
LOG_ERROR(parent.log, "Marking a batch of {} files as broken.", file_indices.size());
LOG_ERROR(parent.log, "Marking a batch of {} files as broken, files: {}", files.size(), fmt::join(files, "\n"));
for (UInt64 file_idx : file_indices)
{
auto file_path = file_index_to_path.find(file_idx);
if (file_path != file_index_to_path.end())
parent.markAsBroken(file_path->second);
}
for (const auto & file : files)
parent.markAsBroken(file);
}
file_indices.clear();
files.clear();
total_rows = 0;
total_bytes = 0;
recovered = false;
@ -848,8 +869,11 @@ struct StorageDistributedDirectoryMonitor::Batch
void writeText(WriteBuffer & out)
{
for (UInt64 file_idx : file_indices)
out << file_idx << '\n';
for (const auto & file : files)
{
UInt64 file_index = parse<UInt64>(fs::path(file).stem());
out << file_index << '\n';
}
}
void readText(ReadBuffer & in)
@ -858,8 +882,9 @@ struct StorageDistributedDirectoryMonitor::Batch
{
UInt64 idx;
in >> idx >> "\n";
file_indices.push_back(idx);
files.push_back(fmt::format("{}/{}.bin", parent.path, idx));
}
recovered = true;
}
@ -871,14 +896,9 @@ private:
IConnectionPool::Entry connection;
for (UInt64 file_idx : file_indices)
for (const auto & file : files)
{
auto file_path = file_index_to_path.find(file_idx);
if (file_path == file_index_to_path.end())
throw Exception(ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO,
"Failed to send batch: file with index {} is absent", file_idx);
ReadBufferFromFile in(file_path->second);
ReadBufferFromFile in(file);
const auto & distributed_header = readDistributedHeader(in, parent.log);
OpenTelemetry::TracingContextHolder thread_trace_context(__PRETTY_FUNCTION__,
@ -892,7 +912,7 @@ private:
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
LOG_DEBUG(parent.log, "Sending a batch of {} files to {} ({} rows, {} bytes).",
file_indices.size(),
files.size(),
connection->getDescription(),
formatReadableQuantity(total_rows),
formatReadableSizeWithBinarySuffix(total_bytes));
@ -913,19 +933,11 @@ private:
{
size_t broken_files = 0;
for (UInt64 file_idx : file_indices)
for (const auto & file : files)
{
auto file_path = file_index_to_path.find(file_idx);
if (file_path == file_index_to_path.end())
{
LOG_ERROR(parent.log, "Failed to send one file from batch: file with index {} is absent", file_idx);
++broken_files;
continue;
}
try
{
ReadBufferFromFile in(file_path->second);
ReadBufferFromFile in(file);
const auto & distributed_header = readDistributedHeader(in, parent.log);
// this function is called in a separated thread, so we set up the trace context from the file
@ -947,9 +959,11 @@ private:
}
catch (Exception & e)
{
e.addMessage(fmt::format("While sending {}", file_path->second));
parent.maybeMarkAsBroken(file_path->second, e);
++broken_files;
if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
{
parent.markAsBroken(file);
++broken_files;
}
}
}
@ -1029,13 +1043,18 @@ std::shared_ptr<ISource> StorageDistributedDirectoryMonitor::createSourceFromFil
return std::make_shared<DirectoryMonitorSource>(file_name);
}
bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t ms)
bool StorageDistributedDirectoryMonitor::addAndSchedule(const std::string & file_path, size_t file_size, size_t ms)
{
if (quit)
/// NOTE: It is better not to throw in this case, since the file is already
/// on disk (see DistributedSink), and it will be processed next time.
if (pending_files.isFinished())
return false;
if (!pending_files.push(file_path))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add pending file");
{
std::lock_guard status_lock(status_mutex);
std::lock_guard lock(status_mutex);
metric_pending_files.add();
status.bytes_count += file_size;
++status.files_count;
@ -1051,33 +1070,25 @@ StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::g
return current_status;
}
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
void StorageDistributedDirectoryMonitor::processFilesWithBatching()
{
std::unordered_set<UInt64> file_indices_to_skip;
/// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
if (fs::exists(current_batch_file_path))
{
/// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
Batch batch(*this, files);
Batch batch(*this);
ReadBufferFromFile in{current_batch_file_path};
batch.readText(in);
file_indices_to_skip.insert(batch.file_indices.begin(), batch.file_indices.end());
batch.send();
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
fs::remove(current_batch_file_path);
}
std::unordered_map<BatchHeader, Batch, BatchHeader::Hash> header_to_batch;
for (const auto & file : files)
std::string file_path;
while (pending_files.tryPop(file_path))
{
if (quit)
return;
UInt64 file_idx = file.first;
const String & file_path = file.second;
if (file_indices_to_skip.contains(file_idx))
continue;
size_t total_rows = 0;
size_t total_bytes = 0;
Block header;
@ -1116,8 +1127,9 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
}
catch (const Exception & e)
{
if (maybeMarkAsBroken(file_path, e))
if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
{
markAsBroken(file_path);
tryLogCurrentException(log, "File is marked broken due to");
continue;
}
@ -1131,9 +1143,9 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
std::move(distributed_header.client_info),
std::move(header)
);
Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;
Batch & batch = header_to_batch.try_emplace(batch_header, *this).first->second;
batch.file_indices.push_back(file_idx);
batch.files.push_back(file_path);
batch.total_rows += total_rows;
batch.total_bytes += total_bytes;
@ -1161,16 +1173,10 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_path)
{
const auto last_path_separator_pos = file_path.rfind('/');
const auto & base_path = file_path.substr(0, last_path_separator_pos + 1);
const auto & file_name = file_path.substr(last_path_separator_pos + 1);
const String & broken_path = fs::path(base_path) / "broken/";
const String & broken_file_path = fs::path(broken_path) / file_name;
fs::create_directory(broken_path);
const String & broken_file_path = fs::path(broken_path) / fs::path(file_path).filename();
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, fs::path(relative_path) / "broken/");
auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, broken_relative_path);
{
std::lock_guard status_lock(status_mutex);
@ -1204,21 +1210,9 @@ void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_pat
fs::remove(file_path);
}
bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e)
{
/// Mark file as broken if necessary.
if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
{
markAsBroken(file_path);
return true;
}
else
return false;
}
std::string StorageDistributedDirectoryMonitor::getLoggerName() const
{
return storage.getStorageID().getFullTableName() + ".DirectoryMonitor";
return storage.getStorageID().getFullTableName() + ".DirectoryMonitor." + disk->getName();
}
void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_relative_path)

@ -1,6 +1,7 @@
#pragma once
#include <Core/BackgroundSchedulePool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Client/ConnectionPool.h>
#include <atomic>
@ -38,7 +39,8 @@ public:
const std::string & relative_path_,
ConnectionPoolPtr pool_,
ActionBlocker & monitor_blocker_,
BackgroundSchedulePool & bg_pool);
BackgroundSchedulePool & bg_pool,
bool initialize_from_disk);
~StorageDistributedDirectoryMonitor();
@ -53,7 +55,7 @@ public:
static std::shared_ptr<ISource> createSourceFromFile(const String & file_name);
/// For scheduling via DistributedSink.
bool addAndSchedule(size_t file_size, size_t ms);
bool addAndSchedule(const std::string & file_path, size_t file_size, size_t ms);
struct InternalStatus
{
@ -79,14 +81,15 @@ public:
private:
void run();
std::map<UInt64, std::string> getFiles();
bool processFiles(const std::map<UInt64, std::string> & files);
bool hasPendingFiles() const;
void initializeFilesFromDisk();
void processFiles();
void processFile(const std::string & file_path);
void processFilesWithBatching(const std::map<UInt64, std::string> & files);
void processFilesWithBatching();
void markAsBroken(const std::string & file_path);
void markAsSend(const std::string & file_path);
bool maybeMarkAsBroken(const std::string & file_path, const Exception & e);
std::string getLoggerName() const;
@ -96,25 +99,33 @@ private:
DiskPtr disk;
std::string relative_path;
std::string path;
std::string broken_relative_path;
std::string broken_path;
const bool should_batch_inserts = false;
const bool split_batch_on_failure = true;
const bool dir_fsync = false;
const size_t min_batched_block_size_rows = 0;
const size_t min_batched_block_size_bytes = 0;
String current_batch_file_path;
/// This is pending data (due to some error) for should_batch_inserts==true
std::string current_batch_file_path;
/// This is pending data (due to some error) for should_batch_inserts==false
std::string current_batch_file;
struct BatchHeader;
struct Batch;
std::mutex status_mutex;
InternalStatus status;
ConcurrentBoundedQueue<std::string> pending_files;
const std::chrono::milliseconds default_sleep_time;
std::chrono::milliseconds sleep_time;
const std::chrono::milliseconds max_sleep_time;
std::chrono::time_point<std::chrono::system_clock> last_decrease_time {std::chrono::system_clock::now()};
std::atomic<bool> quit {false};
std::mutex mutex;
Poco::Logger * log;
ActionBlocker & monitor_blocker;

@ -724,6 +724,9 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
return guard;
};
std::vector<std::string> bin_files;
bin_files.reserve(dir_names.size());
auto it = dir_names.begin();
/// on first iteration write block to a temporary directory for subsequent
/// hardlinking to ensure the inode is not freed until we're done
@ -802,8 +805,8 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
}
// Create hardlink here to reuse increment number
const std::string block_file_path(fs::path(path) / file_name);
createHardLink(first_file_tmp_path, block_file_path);
bin_files.push_back(fs::path(path) / file_name);
createHardLink(first_file_tmp_path, bin_files.back());
auto dir_sync_guard = make_directory_sync_guard(*it);
}
++it;
@ -814,8 +817,8 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
const std::string path(fs::path(disk_path) / (data_path + *it));
fs::create_directory(path);
const std::string block_file_path(fs::path(path) / (toString(storage.file_names_increment.get()) + ".bin"));
createHardLink(first_file_tmp_path, block_file_path);
bin_files.push_back(fs::path(path) / (toString(storage.file_names_increment.get()) + ".bin"));
createHardLink(first_file_tmp_path, bin_files.back());
auto dir_sync_guard = make_directory_sync_guard(*it);
}
@ -826,10 +829,13 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
/// Notify
auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;
for (const auto & dir_name : dir_names)
for (size_t i = 0; i < dir_names.size(); ++i)
{
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
directory_monitor.addAndSchedule(file_size, sleep_ms.totalMilliseconds());
const auto & dir_name = dir_names[i];
const auto & bin_file = bin_files[i];
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name, /* startup= */ false);
directory_monitor.addAndSchedule(bin_file, file_size, sleep_ms.totalMilliseconds());
}
}

@ -1208,12 +1208,15 @@ void StorageDistributed::createDirectoryMonitors(const DiskPtr & disk)
const auto & dir_path = it->path();
if (std::filesystem::is_directory(dir_path))
{
/// Created by DistributedSink
const auto & tmp_path = dir_path / "tmp";
/// "tmp" created by DistributedSink
if (std::filesystem::is_directory(tmp_path) && std::filesystem::is_empty(tmp_path))
std::filesystem::remove(tmp_path);
const auto & broken_path = dir_path / "broken";
if (std::filesystem::is_directory(broken_path) && std::filesystem::is_empty(broken_path))
std::filesystem::remove(broken_path);
if (std::filesystem::is_empty(dir_path))
{
LOG_DEBUG(log, "Removing {} (used for async INSERT into Distributed)", dir_path.string());
@ -1222,14 +1225,14 @@ void StorageDistributed::createDirectoryMonitors(const DiskPtr & disk)
}
else
{
requireDirectoryMonitor(disk, dir_path.filename().string());
requireDirectoryMonitor(disk, dir_path.filename().string(), /* startup= */ true);
}
}
}
}
StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name)
StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup)
{
const std::string & disk_path = disk->getPath();
const std::string key(disk_path + name);
@ -1243,7 +1246,8 @@ StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(
*this, disk, relative_data_path + name,
node_data.connection_pool,
monitors_blocker,
getContext()->getDistributedSchedulePool());
getContext()->getDistributedSchedulePool(),
/* initialize_from_disk= */ startup);
}
return *node_data.directory_monitor;
}

@ -166,7 +166,7 @@ private:
/// create directory monitors for each existing subdirectory
void createDirectoryMonitors(const DiskPtr & disk);
/// ensure directory monitor thread and connection pool creation by disk and subdirectory name
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name);
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup);
/// Return list of metrics for all created monitors
/// (note that monitors are created lazily, i.e. until at least one INSERT executed)

@ -1,7 +1,7 @@
<Warning> DistributedSink: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done.
<Warning> DistributedSink: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done.
<Warning> default.dist_01683.DirectoryMonitor: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done
<Warning> default.dist_01683.DirectoryMonitor: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done
<Warning> default.dist_01683.DirectoryMonitor.default: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done
<Warning> default.dist_01683.DirectoryMonitor.default: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done
1
1
2