Merge branch 'master' into normalize-bigint

Alexey Milovidov 2021-05-05 15:01:23 +03:00
commit 35aba776e5
16 changed files with 114 additions and 151 deletions

contrib/librdkafka vendored

@ -1 +1 @@
Subproject commit cf11d0aa36d4738f2c9bf4377807661660f1be76
Subproject commit 43491d33ca2826531d1e3cae70d4bf1e5249e3c9


@ -430,7 +430,7 @@ Keys for syslog:
Default value: `LOG_USER` if `address` is specified, `LOG_DAEMON` otherwise.
- format – Message format. Possible values: `bsd` and `syslog`.
## send_crash_reports {#server_configuration_parameters-logger}
## send_crash_reports {#server_configuration_parameters-send_crash_reports}
Settings for opt-in sending of crash reports to the ClickHouse core developers team via [Sentry](https://sentry.io).
Enabling it, especially in pre-production environments, is highly appreciated.


@ -18,6 +18,10 @@ Columns:
- `data_compressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Size of compressed data in local files, in bytes.
- `broken_data_files` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of files that have been marked as broken (due to an error).
- `broken_data_compressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Size of compressed data in broken files, in bytes.
- `last_exception` ([String](../../sql-reference/data-types/string.md)) — Text message about the last error that occurred (if any).
**Example**
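
A minimal query sketch against this table (illustrative only; it assumes a Distributed table in the current database with queued or broken inserts, and is not the full example from the documentation page):

```sql
-- Show the per-shard send queue state, including the new broken_* columns
SELECT
    database,
    table,
    is_blocked,
    error_count,
    data_files,
    data_compressed_bytes,
    broken_data_files,
    broken_data_compressed_bytes,
    last_exception
FROM system.distribution_queue
WHERE database = currentDatabase()
FORMAT Vertical;
```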


@ -415,7 +415,7 @@ ClickHouse checks the conditions for `min_part_size` and `min_part
Default values: `LOG_USER` if `address` is specified, otherwise `LOG_DAEMON`
- format - message format. Possible values: `bsd` and `syslog`
## send_crash_reports {#server_configuration_parameters-logger}
## send_crash_reports {#server_configuration_parameters-send_crash_reports}
Settings for sending crash reports to the ClickHouse core developers team via [Sentry](https://sentry.io).
Enabling these settings, especially in a pre-production environment, can provide very valuable information and helps the development of ClickHouse.


@ -23,19 +23,9 @@ function _complete_for_clickhouse_entrypoint_bin()
fi
util="${words[1]}"
case "$prev" in
-C|--config-file|--config)
return
;;
# Argh... This looks like a bash bug...
# Redirections are passed to the completion function
# although it is managed by the shell directly...
'<'|'>'|'>>'|[12]'>'|[12]'>>')
return
;;
esac
if _complete_for_clickhouse_generic_bin_impl "$prev"; then
COMPREPLY=( $(compgen -W "$(_clickhouse_get_options "$cmd" "$util")" -- "$cur") )
fi
return 0
}


@ -15,6 +15,13 @@ shopt -s extglob
export _CLICKHOUSE_COMPLETION_LOADED=1
CLICKHOUSE_QueryProcessingStage=(
complete
fetch_columns
with_mergeable_state
with_mergeable_state_after_aggregation
)
function _clickhouse_bin_exist()
{ [ -x "$1" ] || command -v "$1" >& /dev/null; }
@ -30,6 +37,33 @@ function _clickhouse_get_options()
"$@" --help 2>&1 | awk -F '[ ,=<>]' '{ for (i=1; i <= NF; ++i) { if (substr($i, 0, 1) == "-" && length($i) > 1) print $i; } }' | sort -u
}
function _complete_for_clickhouse_generic_bin_impl()
{
local prev=$1 && shift
case "$prev" in
-C|--config-file|--config)
return 1
;;
--stage)
COMPREPLY=( $(compgen -W "${CLICKHOUSE_QueryProcessingStage[*]}" -- "$cur") )
return 1
;;
--host)
COMPREPLY=( $(compgen -A hostname -- "$cur") )
return 1
;;
# Argh... This looks like a bash bug...
# Redirections are passed to the completion function
# although it is managed by the shell directly...
'<'|'>'|'>>'|[12]'>'|[12]'>>')
return 1
;;
esac
return 0
}
function _complete_for_clickhouse_generic_bin()
{
local cur prev
@ -39,19 +73,9 @@ function _complete_for_clickhouse_generic_bin()
COMPREPLY=()
_get_comp_words_by_ref cur prev
case "$prev" in
-C|--config-file|--config)
return
;;
# Argh... This looks like a bash bug...
# Redirections are passed to the completion function
# although it is managed by the shell directly...
'<'|'>'|'>>'|[12]'>'|[12]'>>')
return
;;
esac
if _complete_for_clickhouse_generic_bin_impl "$prev"; then
COMPREPLY=( $(compgen -W "$(_clickhouse_get_options "$cmd")" -- "$cur") )
fi
return 0
}


@ -55,6 +55,7 @@
M(LocalThread, "Number of threads in local thread pools. The threads in local thread pools are taken from the global thread pool.") \
M(LocalThreadActive, "Number of threads in local thread pools running a task.") \
M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \
M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that have been marked as broken. This metric starts from 0 on server start. Number of files for every shard is summed.") \
M(TablesToDropQueueSize, "Number of dropped tables that are waiting for background data removal.") \
M(MaxDDLEntryID, "Max processed DDL entry of DDLWorker.") \
M(PartsTemporary, "The part is being generated now; it is not in the data_parts list.") \
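
The new counter is registered next to the existing DistributedFilesToInsert metric, so both can be observed through the system.metrics table; a minimal query sketch:

```sql
-- Current values of the pending/broken distributed-send metrics
SELECT metric, value, description
FROM system.metrics
WHERE metric IN ('DistributedFilesToInsert', 'BrokenDistributedFilesToInsert');
```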


@ -14,6 +14,7 @@ SRCS(
DiskFactory.cpp
DiskLocal.cpp
DiskMemory.cpp
DiskRestartProxy.cpp
DiskSelector.cpp
IDisk.cpp
IVolume.cpp


@ -47,6 +47,7 @@ SRCS(
ReadBufferAIO.cpp
ReadBufferFromFile.cpp
ReadBufferFromFileBase.cpp
ReadBufferFromFileDecorator.cpp
ReadBufferFromFileDescriptor.cpp
ReadBufferFromIStream.cpp
ReadBufferFromMemory.cpp
@ -57,6 +58,7 @@ SRCS(
UseSSL.cpp
WriteBufferFromFile.cpp
WriteBufferFromFileBase.cpp
WriteBufferFromFileDecorator.cpp
WriteBufferFromFileDescriptor.cpp
WriteBufferFromFileDescriptorDiscardOnFailure.cpp
WriteBufferFromHTTP.cpp


@ -36,6 +36,7 @@ namespace CurrentMetrics
{
extern const Metric DistributedSend;
extern const Metric DistributedFilesToInsert;
extern const Metric BrokenDistributedFilesToInsert;
}
namespace DB
@ -304,6 +305,7 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, log(&Poco::Logger::get(getLoggerName()))
, monitor_blocker(monitor_blocker_)
, metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0)
, metric_broken_files(CurrentMetrics::BrokenDistributedFilesToInsert, 0)
{
task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); });
task_handle->activateAndSchedule();
@ -368,20 +370,20 @@ void StorageDistributedDirectoryMonitor::run()
{
do_sleep = !processFiles(files);
std::lock_guard metrics_lock(metrics_mutex);
last_exception = std::exception_ptr{};
std::lock_guard status_lock(status_mutex);
status.last_exception = std::exception_ptr{};
}
catch (...)
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
do_sleep = true;
++error_count;
++status.error_count;
sleep_time = std::min(
std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))},
std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(status.error_count))},
max_sleep_time);
tryLogCurrentException(getLoggerName().data());
last_exception = std::current_exception();
status.last_exception = std::current_exception();
}
}
else
@ -392,9 +394,9 @@ void StorageDistributedDirectoryMonitor::run()
const auto now = std::chrono::system_clock::now();
if (now - last_decrease_time > decrease_error_count_period)
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
error_count /= 2;
status.error_count /= 2;
last_decrease_time = now;
}
@ -502,16 +504,16 @@ std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
}
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
if (files_count != files.size())
LOG_TRACE(log, "Files set to {} (was {})", files.size(), files_count);
if (bytes_count != new_bytes_count)
LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, bytes_count);
if (status.files_count != files.size())
LOG_TRACE(log, "Files set to {} (was {})", files.size(), status.files_count);
if (status.bytes_count != new_bytes_count)
LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, status.bytes_count);
metric_pending_files.changeTo(files.size());
files_count = files.size();
bytes_count = new_bytes_count;
status.files_count = files.size();
status.bytes_count = new_bytes_count;
}
return files;
@ -828,10 +830,10 @@ bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t
return false;
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
metric_pending_files.add();
bytes_count += file_size;
++files_count;
status.bytes_count += file_size;
++status.files_count;
}
return task_handle->scheduleAfter(ms, false);
@ -839,16 +841,9 @@ bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t
StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::getStatus()
{
std::lock_guard metrics_lock(metrics_mutex);
return Status{
path,
last_exception,
error_count,
files_count,
bytes_count,
monitor_blocker.isCancelled(),
};
std::lock_guard status_lock(status_mutex);
Status current_status{status, path, monitor_blocker.isCancelled()};
return current_status;
}
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
@ -977,11 +972,17 @@ void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_p
Poco::File file(file_path);
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
size_t file_size = file.getSize();
--files_count;
bytes_count -= file_size;
--status.files_count;
status.bytes_count -= file_size;
++status.broken_files_count;
status.broken_bytes_count += file_size;
metric_broken_files.add();
}
file.renameTo(broken_file_path);
@ -995,10 +996,10 @@ void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_pat
size_t file_size = file.getSize();
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
metric_pending_files.sub();
--files_count;
bytes_count -= file_size;
--status.files_count;
status.bytes_count -= file_size;
}
file.remove();
@ -1027,7 +1028,7 @@ void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_rela
std::lock_guard lock{mutex};
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
relative_path = new_relative_path;
path = disk->getPath() + relative_path + '/';
}


@ -50,15 +50,23 @@ public:
/// For scheduling via DistributedBlockOutputStream
bool addAndSchedule(size_t file_size, size_t ms);
struct InternalStatus
{
std::exception_ptr last_exception;
size_t error_count = 0;
size_t files_count = 0;
size_t bytes_count = 0;
size_t broken_files_count = 0;
size_t broken_bytes_count = 0;
};
/// system.distribution_queue interface
struct Status
struct Status : InternalStatus
{
std::string path;
std::exception_ptr last_exception;
size_t error_count;
size_t files_count;
size_t bytes_count;
bool is_blocked;
bool is_blocked = false;
};
Status getStatus();
@ -92,11 +100,8 @@ private:
struct BatchHeader;
struct Batch;
std::mutex metrics_mutex;
size_t error_count = 0;
size_t files_count = 0;
size_t bytes_count = 0;
std::exception_ptr last_exception;
std::mutex status_mutex;
InternalStatus status;
const std::chrono::milliseconds default_sleep_time;
std::chrono::milliseconds sleep_time;
@ -110,6 +115,7 @@ private:
BackgroundSchedulePoolTaskHolder task_handle;
CurrentMetrics::Increment metric_pending_files;
CurrentMetrics::Increment metric_broken_files;
friend class DirectoryMonitorBlockInputStream;
};


@ -98,6 +98,8 @@ NamesAndTypesList StorageSystemDistributionQueue::getNamesAndTypes()
{ "error_count", std::make_shared<DataTypeUInt64>() },
{ "data_files", std::make_shared<DataTypeUInt64>() },
{ "data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
{ "broken_data_files", std::make_shared<DataTypeUInt64>() },
{ "broken_data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
{ "last_exception", std::make_shared<DataTypeString>() },
};
}
@ -181,6 +183,8 @@ void StorageSystemDistributionQueue::fillData(MutableColumns & res_columns, Cont
res_columns[col_num++]->insert(status.error_count);
res_columns[col_num++]->insert(status.files_count);
res_columns[col_num++]->insert(status.bytes_count);
res_columns[col_num++]->insert(status.broken_files_count);
res_columns[col_num++]->insert(status.broken_bytes_count);
if (status.last_exception)
res_columns[col_num++]->insert(getExceptionMessage(status.last_exception, false));


@ -376,74 +376,6 @@ def test_in_memory(start_cluster):
"Wide\t1\n")
def test_in_memory_wal(start_cluster):
# Merges are disabled in config
for i in range(5):
insert_random_data('wal_table', node11, 50)
node12.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
def check(node, rows, parts):
node.query("SELECT count() FROM wal_table") == "{}\n".format(rows)
node.query(
"SELECT count() FROM system.parts WHERE table = 'wal_table' AND part_type = 'InMemory'") == "{}\n".format(
parts)
check(node11, 250, 5)
check(node12, 250, 5)
# WAL works for inserts
node11.restart_clickhouse(kill=True)
check(node11, 250, 5)
# WAL works for fetches
node12.restart_clickhouse(kill=True)
check(node12, 250, 5)
insert_random_data('wal_table', node11, 50)
node12.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
# Disable replication
with PartitionManager() as pm:
pm.partition_instances(node11, node12)
check(node11, 300, 6)
wal_file = "/var/lib/clickhouse/data/default/wal_table/wal.bin"
# Corrupt wal file
# Truncate it to its size minus 10 bytes
node11.exec_in_container(['bash', '-c', 'truncate --size="$(($(stat -c "%s" {}) - 10))" {}'.format(wal_file, wal_file)],
privileged=True, user='root')
node11.restart_clickhouse(kill=True)
# Broken part is lost, but the others are restored successfully
check(node11, 250, 5)
# WAL with blocks from 0 to 4
broken_wal_file = "/var/lib/clickhouse/data/default/wal_table/wal_0_4.bin"
# Check file exists
node11.exec_in_container(['bash', '-c', 'test -f {}'.format(broken_wal_file)])
# Fetch lost part from replica
node11.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
check(node11, 300, 6)
# Check that new data is written to a new wal, but the old one still exists for restoring
# Check file not empty
node11.exec_in_container(['bash', '-c', 'test -s {}'.format(wal_file)])
# Check file exists
node11.exec_in_container(['bash', '-c', 'test -f {}'.format(broken_wal_file)])
# Data is lost without WAL
node11.query("ALTER TABLE wal_table MODIFY SETTING in_memory_parts_enable_wal = 0")
with PartitionManager() as pm:
pm.partition_instances(node11, node12)
insert_random_data('wal_table', node11, 50)
check(node11, 350, 7)
node11.restart_clickhouse(kill=True)
check(node11, 300, 6)
def test_in_memory_wal_rotate(start_cluster):
# Write every part to a single wal
node11.query("ALTER TABLE restore_table MODIFY SETTING write_ahead_log_max_bytes = 10")


@ -2467,8 +2467,6 @@ def test_kafka_issue14202(kafka_cluster):
kafka_format = 'JSONEachRow';
''')
time.sleep(3)
instance.query(
'INSERT INTO test.kafka_q SELECT t, some_string FROM ( SELECT dt AS t, some_string FROM test.empty_table )')
# check instance is alive


@ -1,6 +1,6 @@
INSERT
1 0 1 1
1 0 1 1 0 0
FLUSH
1 0 0 0
1 0 0 0 0 0
UNBLOCK
0 0 0 0
0 0 0 0 0 0


@ -10,15 +10,15 @@ select * from system.distribution_queue;
select 'INSERT';
system stop distributed sends dist_01293;
insert into dist_01293 select * from numbers(10);
select is_blocked, error_count, data_files, data_compressed_bytes>100 from system.distribution_queue where database = currentDatabase();
select is_blocked, error_count, data_files, data_compressed_bytes>100, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase();
system flush distributed dist_01293;
select 'FLUSH';
select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue where database = currentDatabase();
select is_blocked, error_count, data_files, data_compressed_bytes, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase();
select 'UNBLOCK';
system start distributed sends dist_01293;
select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue where database = currentDatabase();
select is_blocked, error_count, data_files, data_compressed_bytes, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase();
drop table null_01293;
drop table dist_01293;