Add broken_data_files/broken_data_compressed_bytes into distribution_queue

This commit is contained in:
Azat Khuzhin 2021-05-04 22:16:36 +03:00
parent 164cad403c
commit 74269882f7
6 changed files with 62 additions and 52 deletions

View File

@ -18,6 +18,10 @@ Columns:
- `data_compressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Size of compressed data in local files, in bytes.
- `broken_data_files` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of files that have been marked as broken (due to an error).
- `broken_data_compressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Size of compressed data in broken files, in bytes.
- `last_exception` ([String](../../sql-reference/data-types/string.md)) — Text message about the last error that occurred (if any).
**Example**

View File

@ -368,20 +368,20 @@ void StorageDistributedDirectoryMonitor::run()
{
do_sleep = !processFiles(files);
std::lock_guard metrics_lock(metrics_mutex);
last_exception = std::exception_ptr{};
std::lock_guard status_lock(status_mutex);
status.last_exception = std::exception_ptr{};
}
catch (...)
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
do_sleep = true;
++error_count;
++status.error_count;
sleep_time = std::min(
std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))},
std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(status.error_count))},
max_sleep_time);
tryLogCurrentException(getLoggerName().data());
last_exception = std::current_exception();
status.last_exception = std::current_exception();
}
}
else
@ -392,9 +392,9 @@ void StorageDistributedDirectoryMonitor::run()
const auto now = std::chrono::system_clock::now();
if (now - last_decrease_time > decrease_error_count_period)
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
error_count /= 2;
status.error_count /= 2;
last_decrease_time = now;
}
@ -502,16 +502,16 @@ std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
}
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
if (files_count != files.size())
LOG_TRACE(log, "Files set to {} (was {})", files.size(), files_count);
if (bytes_count != new_bytes_count)
LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, bytes_count);
if (status.files_count != files.size())
LOG_TRACE(log, "Files set to {} (was {})", files.size(), status.files_count);
if (status.bytes_count != new_bytes_count)
LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, status.bytes_count);
metric_pending_files.changeTo(files.size());
files_count = files.size();
bytes_count = new_bytes_count;
status.files_count = files.size();
status.bytes_count = new_bytes_count;
}
return files;
@ -828,10 +828,10 @@ bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t
return false;
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
metric_pending_files.add();
bytes_count += file_size;
++files_count;
status.bytes_count += file_size;
++status.files_count;
}
return task_handle->scheduleAfter(ms, false);
@ -839,16 +839,9 @@ bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t
StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::getStatus()
{
std::lock_guard metrics_lock(metrics_mutex);
return Status{
path,
last_exception,
error_count,
files_count,
bytes_count,
monitor_blocker.isCancelled(),
};
std::lock_guard status_lock(status_mutex);
Status current_status{status, path, monitor_blocker.isCancelled()};
return current_status;
}
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
@ -977,11 +970,15 @@ void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_p
Poco::File file(file_path);
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
size_t file_size = file.getSize();
--files_count;
bytes_count -= file_size;
--status.files_count;
status.bytes_count -= file_size;
++status.broken_files_count;
status.broken_bytes_count += file_size;
}
file.renameTo(broken_file_path);
@ -995,10 +992,10 @@ void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_pat
size_t file_size = file.getSize();
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
metric_pending_files.sub();
--files_count;
bytes_count -= file_size;
--status.files_count;
status.bytes_count -= file_size;
}
file.remove();
@ -1027,7 +1024,7 @@ void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_rela
std::lock_guard lock{mutex};
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
relative_path = new_relative_path;
path = disk->getPath() + relative_path + '/';
}

View File

@ -50,15 +50,23 @@ public:
/// For scheduling via DistributedBlockOutputStream
bool addAndSchedule(size_t file_size, size_t ms);
/// Mutable counters and last-error state tracked by the directory monitor.
/// The monitor keeps a single instance (`status`) and, in the code visible here,
/// takes `status_mutex` before every read or write of it.
/// `Status` derives from this struct, so these fields are what getStatus() hands
/// to system.distribution_queue.
struct InternalStatus
{
/// Set to std::current_exception() when run() catches an error;
/// reset to an empty exception_ptr after processFiles() returns without throwing.
std::exception_ptr last_exception;
/// Incremented on each caught error in run(); feeds the exponential
/// sleep_time computation there and is halved once decrease_error_count_period elapses.
size_t error_count = 0;
/// Number of pending files: overwritten by getFiles(), incremented by addAndSchedule(),
/// decremented by markAsBroken() and markAsSend().
size_t files_count = 0;
/// Total size in bytes of the pending files; updated alongside files_count.
size_t bytes_count = 0;
/// Number of files moved aside by markAsBroken().
size_t broken_files_count = 0;
/// Accumulated size in bytes of the files counted in broken_files_count.
size_t broken_bytes_count = 0;
};
/// system.distribution_queue interface
struct Status
struct Status : InternalStatus
{
std::string path;
std::exception_ptr last_exception;
size_t error_count;
size_t files_count;
size_t bytes_count;
bool is_blocked;
bool is_blocked = false;
};
Status getStatus();
@ -92,11 +100,8 @@ private:
struct BatchHeader;
struct Batch;
std::mutex metrics_mutex;
size_t error_count = 0;
size_t files_count = 0;
size_t bytes_count = 0;
std::exception_ptr last_exception;
std::mutex status_mutex;
InternalStatus status;
const std::chrono::milliseconds default_sleep_time;
std::chrono::milliseconds sleep_time;

View File

@ -98,6 +98,8 @@ NamesAndTypesList StorageSystemDistributionQueue::getNamesAndTypes()
{ "error_count", std::make_shared<DataTypeUInt64>() },
{ "data_files", std::make_shared<DataTypeUInt64>() },
{ "data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
{ "broken_data_files", std::make_shared<DataTypeUInt64>() },
{ "broken_data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
{ "last_exception", std::make_shared<DataTypeString>() },
};
}
@ -181,6 +183,8 @@ void StorageSystemDistributionQueue::fillData(MutableColumns & res_columns, Cont
res_columns[col_num++]->insert(status.error_count);
res_columns[col_num++]->insert(status.files_count);
res_columns[col_num++]->insert(status.bytes_count);
res_columns[col_num++]->insert(status.broken_files_count);
res_columns[col_num++]->insert(status.broken_bytes_count);
if (status.last_exception)
res_columns[col_num++]->insert(getExceptionMessage(status.last_exception, false));

View File

@ -1,6 +1,6 @@
INSERT
1 0 1 1
1 0 1 1 0 0
FLUSH
1 0 0 0
1 0 0 0 0 0
UNBLOCK
0 0 0 0
0 0 0 0 0 0

View File

@ -10,15 +10,15 @@ select * from system.distribution_queue;
select 'INSERT';
system stop distributed sends dist_01293;
insert into dist_01293 select * from numbers(10);
select is_blocked, error_count, data_files, data_compressed_bytes>100 from system.distribution_queue where database = currentDatabase();
select is_blocked, error_count, data_files, data_compressed_bytes>100, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase();
system flush distributed dist_01293;
select 'FLUSH';
select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue where database = currentDatabase();
select is_blocked, error_count, data_files, data_compressed_bytes, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase();
select 'UNBLOCK';
system start distributed sends dist_01293;
select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue where database = currentDatabase();
select is_blocked, error_count, data_files, data_compressed_bytes, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase();
drop table null_01293;
drop table dist_01293;