Revert "Merge pull request #44922 from azat/dist/async-INSERT-metrics"

There are the following problems with this patch:
- Looses files on exception
- Existing current_batch.txt on startup leads to ENOENT error and hung
  of distributed sends without ATTACH/DETACH
- Race between creating the queue for sending at table startup and
  INSERT, if it had been created from INSERT, then it will not be
  initialized from disk

They were addressed in #45491, but it makes code more cmoplex and plus
since, likely, the release is comming, it is better to revert the
change.

This reverts commit 94604f71b7, reversing
changes made to 80f6a45376.
This commit is contained in:
Azat Khuzhin 2023-01-21 22:37:46 +01:00
parent f0fda580d0
commit a55798626a
6 changed files with 167 additions and 189 deletions

View File

@ -60,7 +60,6 @@ namespace ErrorCodes
extern const int TOO_MANY_PARTITIONS;
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int LOGICAL_ERROR;
}
@ -366,22 +365,18 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
const std::string & relative_path_,
ConnectionPoolPtr pool_,
ActionBlocker & monitor_blocker_,
BackgroundSchedulePool & bg_pool,
bool initialize_from_disk)
BackgroundSchedulePool & bg_pool)
: storage(storage_)
, pool(std::move(pool_))
, disk(disk_)
, relative_path(relative_path_)
, path(fs::path(disk->getPath()) / relative_path / "")
, broken_relative_path(fs::path(relative_path) / "broken")
, broken_path(fs::path(path) / "broken" / "")
, should_batch_inserts(storage.getDistributedSettingsRef().monitor_batch_inserts)
, split_batch_on_failure(storage.getDistributedSettingsRef().monitor_split_batch_on_failure)
, dir_fsync(storage.getDistributedSettingsRef().fsync_directories)
, min_batched_block_size_rows(storage.getContext()->getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.getContext()->getSettingsRef().min_insert_block_size_bytes)
, current_batch_file_path(path + "current_batch.txt")
, pending_files(std::numeric_limits<size_t>::max())
, default_sleep_time(storage.getDistributedSettingsRef().monitor_sleep_time_ms.totalMilliseconds())
, sleep_time(default_sleep_time)
, max_sleep_time(storage.getDistributedSettingsRef().monitor_max_sleep_time_ms.totalMilliseconds())
@ -390,11 +385,6 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0)
, metric_broken_files(CurrentMetrics::BrokenDistributedFilesToInsert, 0)
{
fs::create_directory(broken_path);
if (initialize_from_disk)
initializeFilesFromDisk();
task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); });
task_handle->activateAndSchedule();
}
@ -402,29 +392,35 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
{
if (!pending_files.isFinished())
if (!quit)
{
pending_files.clearAndFinish();
quit = true;
task_handle->deactivate();
}
}
void StorageDistributedDirectoryMonitor::flushAllData()
{
if (pending_files.isFinished())
if (quit)
return;
std::lock_guard lock{mutex};
if (!hasPendingFiles())
return;
processFiles();
const auto & files = getFiles();
if (!files.empty())
{
processFiles(files);
/// Update counters.
getFiles();
}
}
void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
{
if (!pending_files.isFinished())
if (!quit)
{
pending_files.clearAndFinish();
quit = true;
task_handle->deactivate();
}
@ -438,21 +434,19 @@ void StorageDistributedDirectoryMonitor::run()
std::lock_guard lock{mutex};
bool do_sleep = false;
while (!pending_files.isFinished())
while (!quit)
{
do_sleep = true;
if (!hasPendingFiles())
const auto & files = getFiles();
if (files.empty())
break;
if (!monitor_blocker.isCancelled())
{
try
{
processFiles();
/// No errors while processing existing files.
/// Let's see maybe there are more files to process.
do_sleep = false;
do_sleep = !processFiles(files);
std::lock_guard status_lock(status_mutex);
status.last_exception = std::exception_ptr{};
@ -476,7 +470,9 @@ void StorageDistributedDirectoryMonitor::run()
}
}
else
{
LOG_DEBUG(log, "Skipping send data over distributed table.");
}
const auto now = std::chrono::system_clock::now();
if (now - last_decrease_time > decrease_error_count_period)
@ -491,7 +487,10 @@ void StorageDistributedDirectoryMonitor::run()
break;
}
if (!pending_files.isFinished() && do_sleep)
/// Update counters.
getFiles();
if (!quit && do_sleep)
task_handle->scheduleAfter(sleep_time.count());
}
@ -569,83 +568,41 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
settings.distributed_replica_error_cap);
}
bool StorageDistributedDirectoryMonitor::hasPendingFiles() const
{
return fs::exists(current_batch_file_path) || !current_batch_file.empty() || !pending_files.empty();
}
void StorageDistributedDirectoryMonitor::initializeFilesFromDisk()
std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
{
/// NOTE: This method does not requires to hold status_mutex, hence, no TSA
/// annotations in the header file.
std::map<UInt64, std::string> files;
fs::directory_iterator end;
/// Initialize pending files
for (fs::directory_iterator it{path}; it != end; ++it)
{
size_t bytes_count = 0;
for (fs::directory_iterator it{path}; it != end; ++it)
const auto & file_path_str = it->path();
if (!it->is_directory() && startsWith(fs::path(file_path_str).extension(), ".bin"))
{
const auto & file_path = it->path();
const auto & base_name = file_path.stem().string();
if (!it->is_directory() && startsWith(fs::path(file_path).extension(), ".bin") && parse<UInt64>(base_name))
{
const std::string & file_path_str = file_path.string();
if (!pending_files.push(file_path_str))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add pending file");
bytes_count += fs::file_size(file_path);
}
else if (base_name != "tmp" && base_name != "broken")
{
/// It is OK to log current_batch.txt here too (useful for debugging).
LOG_WARNING(log, "Unexpected file {} in {}", file_path.string(), path);
}
files[parse<UInt64>(fs::path(file_path_str).stem())] = file_path_str;
}
LOG_TRACE(log, "Files set to {}", pending_files.size());
LOG_TRACE(log, "Bytes set to {}", bytes_count);
metric_pending_files.changeTo(pending_files.size());
status.files_count = pending_files.size();
status.bytes_count = bytes_count;
}
/// Initialize broken files
{
size_t broken_bytes_count = 0;
size_t broken_files = 0;
for (fs::directory_iterator it{broken_path}; it != end; ++it)
{
const auto & file_path = it->path();
if (!it->is_directory() && startsWith(fs::path(file_path).extension(), ".bin") && parse<UInt64>(file_path.stem()))
broken_bytes_count += fs::file_size(file_path);
else
LOG_WARNING(log, "Unexpected file {} in {}", file_path.string(), broken_path);
}
LOG_TRACE(log, "Broken files set to {}", broken_files);
LOG_TRACE(log, "Broken bytes set to {}", broken_bytes_count);
metric_broken_files.changeTo(broken_files);
status.broken_files_count = broken_files;
status.broken_bytes_count = broken_bytes_count;
}
return files;
}
void StorageDistributedDirectoryMonitor::processFiles()
bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std::string> & files)
{
if (should_batch_inserts)
processFilesWithBatching();
{
processFilesWithBatching(files);
}
else
{
/// Process unprocessed file.
if (!current_batch_file.empty())
processFile(current_batch_file);
for (const auto & file : files)
{
if (quit)
return true;
while (pending_files.tryPop(current_batch_file))
processFile(current_batch_file);
processFile(file.second);
}
}
return true;
}
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
@ -692,11 +649,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
thread_trace_context->root_span.addAttribute(std::current_exception());
e.addMessage(fmt::format("While sending {}", file_path));
if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
{
markAsBroken(file_path);
current_batch_file.clear();
}
maybeMarkAsBroken(file_path, e);
throw;
}
catch (...)
@ -709,7 +662,6 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
markAsSend(file_path);
current_batch_file.clear();
LOG_TRACE(log, "Finished processing `{}` (took {} ms)", file_path, watch.elapsedMilliseconds());
}
@ -749,19 +701,23 @@ struct StorageDistributedDirectoryMonitor::BatchHeader
struct StorageDistributedDirectoryMonitor::Batch
{
std::vector<UInt64> file_indices;
size_t total_rows = 0;
size_t total_bytes = 0;
bool recovered = false;
StorageDistributedDirectoryMonitor & parent;
std::vector<std::string> files;
const std::map<UInt64, String> & file_index_to_path;
bool split_batch_on_failure = true;
bool fsync = false;
bool dir_fsync = false;
explicit Batch(StorageDistributedDirectoryMonitor & parent_)
Batch(
StorageDistributedDirectoryMonitor & parent_,
const std::map<UInt64, String> & file_index_to_path_)
: parent(parent_)
, file_index_to_path(file_index_to_path_)
, split_batch_on_failure(parent.split_batch_on_failure)
, fsync(parent.storage.getDistributedSettingsRef().fsync_after_insert)
, dir_fsync(parent.dir_fsync)
@ -776,7 +732,7 @@ struct StorageDistributedDirectoryMonitor::Batch
void send()
{
if (files.empty())
if (file_indices.empty())
return;
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
@ -819,7 +775,7 @@ struct StorageDistributedDirectoryMonitor::Batch
}
catch (const Exception & e)
{
if (split_batch_on_failure && files.size() > 1 && isSplittableErrorCode(e.code(), e.isRemoteException()))
if (split_batch_on_failure && file_indices.size() > 1 && isSplittableErrorCode(e.code(), e.isRemoteException()))
{
tryLogCurrentException(parent.log, "Trying to split batch due to");
sendSeparateFiles();
@ -839,28 +795,44 @@ struct StorageDistributedDirectoryMonitor::Batch
}
else
{
e.addMessage(fmt::format("While sending a batch of {} files, files: {}", files.size(), fmt::join(files, "\n")));
std::vector<std::string> files;
for (const auto && file_info : file_index_to_path | boost::adaptors::indexed())
{
if (file_info.index() > 8)
{
files.push_back("...");
break;
}
files.push_back(file_info.value().second);
}
e.addMessage(fmt::format("While sending batch, nums: {}, files: {}", file_index_to_path.size(), fmt::join(files, "\n")));
throw;
}
}
if (!batch_broken)
{
LOG_TRACE(parent.log, "Sent a batch of {} files (took {} ms).", files.size(), watch.elapsedMilliseconds());
LOG_TRACE(parent.log, "Sent a batch of {} files (took {} ms).", file_indices.size(), watch.elapsedMilliseconds());
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, parent.disk, parent.relative_path);
for (const auto & file : files)
parent.markAsSend(file);
for (UInt64 file_index : file_indices)
parent.markAsSend(file_index_to_path.at(file_index));
}
else if (!batch_marked_as_broken)
{
LOG_ERROR(parent.log, "Marking a batch of {} files as broken, files: {}", files.size(), fmt::join(files, "\n"));
LOG_ERROR(parent.log, "Marking a batch of {} files as broken.", file_indices.size());
for (const auto & file : files)
parent.markAsBroken(file);
for (UInt64 file_idx : file_indices)
{
auto file_path = file_index_to_path.find(file_idx);
if (file_path != file_index_to_path.end())
parent.markAsBroken(file_path->second);
}
}
files.clear();
file_indices.clear();
total_rows = 0;
total_bytes = 0;
recovered = false;
@ -870,11 +842,8 @@ struct StorageDistributedDirectoryMonitor::Batch
void writeText(WriteBuffer & out)
{
for (const auto & file : files)
{
UInt64 file_index = parse<UInt64>(fs::path(file).stem());
out << file_index << '\n';
}
for (UInt64 file_idx : file_indices)
out << file_idx << '\n';
}
void readText(ReadBuffer & in)
@ -883,9 +852,8 @@ struct StorageDistributedDirectoryMonitor::Batch
{
UInt64 idx;
in >> idx >> "\n";
files.push_back(fmt::format("{}/{}.bin", parent.path, idx));
file_indices.push_back(idx);
}
recovered = true;
}
@ -897,9 +865,14 @@ private:
IConnectionPool::Entry connection;
for (const auto & file : files)
for (UInt64 file_idx : file_indices)
{
ReadBufferFromFile in(file);
auto file_path = file_index_to_path.find(file_idx);
if (file_path == file_index_to_path.end())
throw Exception(ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO,
"Failed to send batch: file with index {} is absent", file_idx);
ReadBufferFromFile in(file_path->second);
const auto & distributed_header = readDistributedHeader(in, parent.log);
OpenTelemetry::TracingContextHolder thread_trace_context(__PRETTY_FUNCTION__,
@ -913,7 +886,7 @@ private:
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
LOG_DEBUG(parent.log, "Sending a batch of {} files to {} ({} rows, {} bytes).",
files.size(),
file_indices.size(),
connection->getDescription(),
formatReadableQuantity(total_rows),
formatReadableSizeWithBinarySuffix(total_bytes));
@ -934,11 +907,19 @@ private:
{
size_t broken_files = 0;
for (const auto & file : files)
for (UInt64 file_idx : file_indices)
{
auto file_path = file_index_to_path.find(file_idx);
if (file_path == file_index_to_path.end())
{
LOG_ERROR(parent.log, "Failed to send one file from batch: file with index {} is absent", file_idx);
++broken_files;
continue;
}
try
{
ReadBufferFromFile in(file);
ReadBufferFromFile in(file_path->second);
const auto & distributed_header = readDistributedHeader(in, parent.log);
// this function is called in a separated thread, so we set up the trace context from the file
@ -960,11 +941,9 @@ private:
}
catch (Exception & e)
{
if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
{
parent.markAsBroken(file);
++broken_files;
}
e.addMessage(fmt::format("While sending {}", file_path->second));
parent.maybeMarkAsBroken(file_path->second, e);
++broken_files;
}
}
@ -1044,18 +1023,13 @@ std::shared_ptr<ISource> StorageDistributedDirectoryMonitor::createSourceFromFil
return std::make_shared<DirectoryMonitorSource>(file_name);
}
bool StorageDistributedDirectoryMonitor::addAndSchedule(const std::string & file_path, size_t file_size, size_t ms)
bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t ms)
{
/// NOTE: It is better not to throw in this case, since the file is already
/// on disk (see DistributedSink), and it will be processed next time.
if (pending_files.isFinished())
if (quit)
return false;
if (!pending_files.push(file_path))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add pending file");
{
std::lock_guard lock(status_mutex);
std::lock_guard status_lock(status_mutex);
metric_pending_files.add();
status.bytes_count += file_size;
++status.files_count;
@ -1071,25 +1045,33 @@ StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::g
return current_status;
}
void StorageDistributedDirectoryMonitor::processFilesWithBatching()
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
{
/// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
std::unordered_set<UInt64> file_indices_to_skip;
if (fs::exists(current_batch_file_path))
{
Batch batch(*this);
/// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
Batch batch(*this, files);
ReadBufferFromFile in{current_batch_file_path};
batch.readText(in);
file_indices_to_skip.insert(batch.file_indices.begin(), batch.file_indices.end());
batch.send();
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
fs::remove(current_batch_file_path);
}
std::unordered_map<BatchHeader, Batch, BatchHeader::Hash> header_to_batch;
std::string file_path;
while (pending_files.tryPop(file_path))
for (const auto & file : files)
{
if (quit)
return;
UInt64 file_idx = file.first;
const String & file_path = file.second;
if (file_indices_to_skip.contains(file_idx))
continue;
size_t total_rows = 0;
size_t total_bytes = 0;
Block header;
@ -1128,9 +1110,8 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching()
}
catch (const Exception & e)
{
if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
if (maybeMarkAsBroken(file_path, e))
{
markAsBroken(file_path);
tryLogCurrentException(log, "File is marked broken due to");
continue;
}
@ -1144,9 +1125,9 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching()
std::move(distributed_header.client_info),
std::move(header)
);
Batch & batch = header_to_batch.try_emplace(batch_header, *this).first->second;
Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;
batch.files.push_back(file_path);
batch.file_indices.push_back(file_idx);
batch.total_rows += total_rows;
batch.total_bytes += total_bytes;
@ -1174,10 +1155,16 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching()
void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_path)
{
const String & broken_file_path = fs::path(broken_path) / fs::path(file_path).filename();
const auto last_path_separator_pos = file_path.rfind('/');
const auto & base_path = file_path.substr(0, last_path_separator_pos + 1);
const auto & file_name = file_path.substr(last_path_separator_pos + 1);
const String & broken_path = fs::path(base_path) / "broken/";
const String & broken_file_path = fs::path(broken_path) / file_name;
fs::create_directory(broken_path);
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, broken_relative_path);
auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, fs::path(relative_path) / "broken/");
{
std::lock_guard status_lock(status_mutex);
@ -1211,9 +1198,21 @@ void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_pat
fs::remove(file_path);
}
bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e)
{
/// Mark file as broken if necessary.
if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
{
markAsBroken(file_path);
return true;
}
else
return false;
}
std::string StorageDistributedDirectoryMonitor::getLoggerName() const
{
return storage.getStorageID().getFullTableName() + ".DirectoryMonitor." + disk->getName();
return storage.getStorageID().getFullTableName() + ".DirectoryMonitor";
}
void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_relative_path)

View File

@ -1,7 +1,6 @@
#pragma once
#include <Core/BackgroundSchedulePool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Client/ConnectionPool.h>
#include <atomic>
@ -39,8 +38,7 @@ public:
const std::string & relative_path_,
ConnectionPoolPtr pool_,
ActionBlocker & monitor_blocker_,
BackgroundSchedulePool & bg_pool,
bool initialize_from_disk);
BackgroundSchedulePool & bg_pool);
~StorageDistributedDirectoryMonitor();
@ -55,7 +53,7 @@ public:
static std::shared_ptr<ISource> createSourceFromFile(const String & file_name);
/// For scheduling via DistributedSink.
bool addAndSchedule(const std::string & file_path, size_t file_size, size_t ms);
bool addAndSchedule(size_t file_size, size_t ms);
struct InternalStatus
{
@ -80,15 +78,14 @@ public:
private:
void run();
bool hasPendingFiles() const;
void initializeFilesFromDisk();
void processFiles();
std::map<UInt64, std::string> getFiles();
bool processFiles(const std::map<UInt64, std::string> & files);
void processFile(const std::string & file_path);
void processFilesWithBatching();
void processFilesWithBatching(const std::map<UInt64, std::string> & files);
void markAsBroken(const std::string & file_path);
void markAsSend(const std::string & file_path);
bool maybeMarkAsBroken(const std::string & file_path, const Exception & e);
std::string getLoggerName() const;
@ -98,33 +95,25 @@ private:
DiskPtr disk;
std::string relative_path;
std::string path;
std::string broken_relative_path;
std::string broken_path;
const bool should_batch_inserts = false;
const bool split_batch_on_failure = true;
const bool dir_fsync = false;
const size_t min_batched_block_size_rows = 0;
const size_t min_batched_block_size_bytes = 0;
/// This is pending data (due to some error) for should_batch_inserts==true
std::string current_batch_file_path;
/// This is pending data (due to some error) for should_batch_inserts==false
std::string current_batch_file;
String current_batch_file_path;
struct BatchHeader;
struct Batch;
std::mutex status_mutex;
InternalStatus status;
ConcurrentBoundedQueue<std::string> pending_files;
const std::chrono::milliseconds default_sleep_time;
std::chrono::milliseconds sleep_time;
const std::chrono::milliseconds max_sleep_time;
std::chrono::time_point<std::chrono::system_clock> last_decrease_time {std::chrono::system_clock::now()};
std::atomic<bool> quit {false};
std::mutex mutex;
Poco::Logger * log;
ActionBlocker & monitor_blocker;

View File

@ -724,9 +724,6 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
return guard;
};
std::vector<std::string> bin_files;
bin_files.reserve(dir_names.size());
auto it = dir_names.begin();
/// on first iteration write block to a temporary directory for subsequent
/// hardlinking to ensure the inode is not freed until we're done
@ -805,8 +802,8 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
}
// Create hardlink here to reuse increment number
bin_files.push_back(fs::path(path) / file_name);
createHardLink(first_file_tmp_path, bin_files.back());
const std::string block_file_path(fs::path(path) / file_name);
createHardLink(first_file_tmp_path, block_file_path);
auto dir_sync_guard = make_directory_sync_guard(*it);
}
++it;
@ -817,8 +814,8 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
const std::string path(fs::path(disk_path) / (data_path + *it));
fs::create_directory(path);
bin_files.push_back(fs::path(path) / (toString(storage.file_names_increment.get()) + ".bin"));
createHardLink(first_file_tmp_path, bin_files.back());
const std::string block_file_path(fs::path(path) / (toString(storage.file_names_increment.get()) + ".bin"));
createHardLink(first_file_tmp_path, block_file_path);
auto dir_sync_guard = make_directory_sync_guard(*it);
}
@ -829,13 +826,10 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
/// Notify
auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;
for (size_t i = 0; i < dir_names.size(); ++i)
for (const auto & dir_name : dir_names)
{
const auto & dir_name = dir_names[i];
const auto & bin_file = bin_files[i];
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name, /* startup= */ false);
directory_monitor.addAndSchedule(bin_file, file_size, sleep_ms.totalMilliseconds());
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name);
directory_monitor.addAndSchedule(file_size, sleep_ms.totalMilliseconds());
}
}

View File

@ -1204,15 +1204,12 @@ void StorageDistributed::createDirectoryMonitors(const DiskPtr & disk)
const auto & dir_path = it->path();
if (std::filesystem::is_directory(dir_path))
{
/// Created by DistributedSink
const auto & tmp_path = dir_path / "tmp";
/// "tmp" created by DistributedSink
if (std::filesystem::is_directory(tmp_path) && std::filesystem::is_empty(tmp_path))
std::filesystem::remove(tmp_path);
const auto & broken_path = dir_path / "broken";
if (std::filesystem::is_directory(broken_path) && std::filesystem::is_empty(broken_path))
std::filesystem::remove(broken_path);
if (std::filesystem::is_empty(dir_path))
{
LOG_DEBUG(log, "Removing {} (used for async INSERT into Distributed)", dir_path.string());
@ -1221,14 +1218,14 @@ void StorageDistributed::createDirectoryMonitors(const DiskPtr & disk)
}
else
{
requireDirectoryMonitor(disk, dir_path.filename().string(), /* startup= */ true);
requireDirectoryMonitor(disk, dir_path.filename().string());
}
}
}
}
StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup)
StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(const DiskPtr & disk, const std::string & name)
{
const std::string & disk_path = disk->getPath();
const std::string key(disk_path + name);
@ -1242,8 +1239,7 @@ StorageDistributedDirectoryMonitor& StorageDistributed::requireDirectoryMonitor(
*this, disk, relative_data_path + name,
node_data.connection_pool,
monitors_blocker,
getContext()->getDistributedSchedulePool(),
/* initialize_from_disk= */ startup);
getContext()->getDistributedSchedulePool());
}
return *node_data.directory_monitor;
}

View File

@ -166,7 +166,7 @@ private:
/// create directory monitors for each existing subdirectory
void createDirectoryMonitors(const DiskPtr & disk);
/// ensure directory monitor thread and connectoin pool creation by disk and subdirectory name
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name, bool startup);
StorageDistributedDirectoryMonitor & requireDirectoryMonitor(const DiskPtr & disk, const std::string & name);
/// Return list of metrics for all created monitors
/// (note that monitors are created lazily, i.e. until at least one INSERT executed)

View File

@ -1,7 +1,7 @@
<Warning> DistributedSink: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done.
<Warning> DistributedSink: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 1)), implicit conversion will be done.
<Warning> default.dist_01683.DirectoryMonitor.default: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done
<Warning> default.dist_01683.DirectoryMonitor.default: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done
<Warning> default.dist_01683.DirectoryMonitor: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done
<Warning> default.dist_01683.DirectoryMonitor: Structure does not match (remote: n Int8 Int8(size = 0), local: n UInt64 UInt64(size = 0)), implicit conversion will be done
1
1
2