Merge branch 'master' into patch-2

Alexey Milovidov 2023-03-15 02:03:52 +03:00 committed by GitHub
commit f4f7e110b6
21 changed files with 177 additions and 99 deletions


@@ -750,12 +750,9 @@ bool BackupCoordinationRemote::hasConcurrentBackups(const std::atomic<size_t> &)
if (existing_backup_uuid == toString(backup_uuid))
continue;
String status;
if (zk->tryGet(root_zookeeper_path + "/" + existing_backup_path + "/stage", status))
{
if (status != Stage::COMPLETED)
return true;
}
const auto status = zk->get(root_zookeeper_path + "/" + existing_backup_path + "/stage");
if (status != Stage::COMPLETED)
return true;
}
zk->createIfNotExists(backup_stage_path, "");


@@ -800,6 +800,7 @@ class IColumn;
M(Bool, input_format_tsv_detect_header, true, "Automatically detect header with names and types in TSV format", 0) \
M(Bool, input_format_custom_detect_header, true, "Automatically detect header with names and types in CustomSeparated format", 0) \
M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \
M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \
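Note: the new input_format_parquet_max_block_size setting can be tuned per query. A minimal usage sketch, assuming a local file named data.parquet (the file name and the value 16384 are illustrative, not part of this change):
SELECT count()
FROM file('data.parquet', 'Parquet')
SETTINGS input_format_parquet_max_block_size = 16384;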


@@ -117,6 +117,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference;
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.color = settings.output_format_pretty_color;


@@ -211,6 +211,7 @@ struct FormatSettings
std::unordered_set<int> skip_row_groups = {};
bool output_string_as_string = false;
bool output_fixed_string_as_fixed_byte_array = true;
UInt64 max_block_size = 8192;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
} parquet;


@@ -53,6 +53,10 @@ struct ToDateImpl
{
static constexpr auto name = "toDate";
static inline UInt16 execute(const DecimalUtils::DecimalComponents<DateTime64> & t, const DateLUTImpl & time_zone)
{
return static_cast<UInt16>(time_zone.toDayNum(t.whole));
}
static inline UInt16 execute(Int64 t, const DateLUTImpl & time_zone)
{
return UInt16(time_zone.toDayNum(t));
@@ -69,6 +73,10 @@ struct ToDateImpl
{
return d;
}
static inline DecimalUtils::DecimalComponents<DateTime64> executeExtendedResult(const DecimalUtils::DecimalComponents<DateTime64> & t, const DateLUTImpl & time_zone)
{
return {time_zone.toDayNum(t.whole), 0};
}
using FactorTransform = ZeroTransform;
};
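Note: these overloads let the toDate transform consume DateTime64 components (whole seconds plus fractional part), which the DateTime64 monotonicity handling later in this commit builds on. For reference, a sketch of the user-visible behaviour (values are illustrative):
SELECT toDate(toDateTime64('2023-03-13 23:59:59.999', 3, 'UTC'));  -- 2023-03-13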


@@ -46,36 +46,57 @@ public:
{
if constexpr (std::is_same_v<typename Transform::FactorTransform, ZeroTransform>)
return { .is_monotonic = true, .is_always_monotonic = true };
const IFunction::Monotonicity is_monotonic = { .is_monotonic = true };
const IFunction::Monotonicity is_not_monotonic;
const DateLUTImpl * date_lut = &DateLUT::instance();
if (const auto * timezone = dynamic_cast<const TimezoneMixin *>(&type))
date_lut = &timezone->getTimeZone();
if (left.isNull() || right.isNull())
return is_not_monotonic;
/// The function is monotonous on the [left, right] segment, if the factor transformation returns the same values for them.
if (checkAndGetDataType<DataTypeDate>(&type))
{
return Transform::FactorTransform::execute(UInt16(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt16(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDate32>(&type))
{
return Transform::FactorTransform::execute(Int32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(Int32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else
{
return Transform::FactorTransform::execute(UInt32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
const IFunction::Monotonicity is_monotonic = { .is_monotonic = true };
const IFunction::Monotonicity is_not_monotonic;
const DateLUTImpl * date_lut = &DateLUT::instance();
if (const auto * timezone = dynamic_cast<const TimezoneMixin *>(&type))
date_lut = &timezone->getTimeZone();
if (left.isNull() || right.isNull())
return is_not_monotonic;
const auto * type_ptr = &type;
if (const auto * nullable_type = checkAndGetDataType<DataTypeNullable>(type_ptr))
type_ptr = nullable_type->getNestedType().get();
/// The function is monotonous on the [left, right] segment, if the factor transformation returns the same values for them.
if (checkAndGetDataType<DataTypeDate>(type_ptr))
{
return Transform::FactorTransform::execute(UInt16(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt16(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDate32>(type_ptr))
{
return Transform::FactorTransform::execute(Int32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(Int32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else if (checkAndGetDataType<DataTypeDateTime>(type_ptr))
{
return Transform::FactorTransform::execute(UInt32(left.get<UInt64>()), *date_lut)
== Transform::FactorTransform::execute(UInt32(right.get<UInt64>()), *date_lut)
? is_monotonic : is_not_monotonic;
}
else
{
assert(checkAndGetDataType<DataTypeDateTime64>(type_ptr));
const auto & left_date_time = left.get<DateTime64>();
TransformDateTime64<typename Transform::FactorTransform> transformer_left(left_date_time.getScale());
const auto & right_date_time = right.get<DateTime64>();
TransformDateTime64<typename Transform::FactorTransform> transformer_right(right_date_time.getScale());
return transformer_left.execute(left_date_time.getValue(), *date_lut)
== transformer_right.execute(right_date_time.getValue(), *date_lut)
? is_monotonic : is_not_monotonic;
}
}
}


@@ -20,5 +20,6 @@ using FunctionPositionCaseInsensitive = FunctionsStringSearch<PositionImpl<NameP
REGISTER_FUNCTION(PositionCaseInsensitive)
{
factory.registerFunction<FunctionPositionCaseInsensitive>();
factory.registerAlias("instr", NamePositionCaseInsensitive::name, FunctionFactory::CaseInsensitive);
}
}
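Note: the alias is registered case-insensitively, so instr and INSTR both resolve to positionCaseInsensitive. A short usage sketch (results follow positionCaseInsensitive semantics):
SELECT instr('Hello, world!', 'WORLD');  -- 8: matching is case-insensitive
SELECT instr('Hello, world!', 'xyz');    -- 0: substring not found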


@@ -1442,7 +1442,8 @@ void Aggregator::prepareAggregateInstructions(
aggregate_columns[i][j] = materialized_columns.back().get();
/// Sparse columns without defaults may be handled incorrectly.
if (aggregate_columns[i][j]->getNumberOfDefaultRows() == 0)
if (aggregate_columns[i][j]->isSparse()
&& aggregate_columns[i][j]->getNumberOfDefaultRows() == 0)
allow_sparse_arguments = false;
auto full_column = allow_sparse_arguments


@@ -93,7 +93,7 @@ static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::Ch
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data());
const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data()) + chunk->offset();
column_data.insert_assume_reserved(raw_data, raw_data + chunk->length());
}
return {std::move(internal_column), std::move(internal_type), column_name};
@@ -159,8 +159,8 @@ static ColumnWithTypeAndName readColumnWithFixedStringData(std::shared_ptr<arrow
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
{
arrow::FixedSizeBinaryArray & chunk = dynamic_cast<arrow::FixedSizeBinaryArray &>(*(arrow_column->chunk(chunk_i)));
std::shared_ptr<arrow::Buffer> buffer = chunk.values();
column_chars_t.insert_assume_reserved(buffer->data(), buffer->data() + buffer->size());
const uint8_t * raw_data = chunk.raw_values();
column_chars_t.insert_assume_reserved(raw_data, raw_data + fixed_len * chunk.length());
}
return {std::move(internal_column), std::move(internal_type), column_name};
}
@@ -178,9 +178,6 @@ static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr<arrow::Ch
if (chunk.length() == 0)
continue;
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk.data()->buffers[1];
for (size_t bool_i = 0; bool_i != static_cast<size_t>(chunk.length()); ++bool_i)
column_data.emplace_back(chunk.Value(bool_i));
}
@@ -402,7 +399,7 @@ static ColumnWithTypeAndName readColumnWithIndexesDataImpl(std::shared_ptr<arrow
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
const auto * data = reinterpret_cast<const NumericType *>(buffer->data());
const auto * data = reinterpret_cast<const NumericType *>(buffer->data()) + chunk->offset();
/// Check that indexes are correct (protection against corrupted files)
/// Note that on null values index can be arbitrary value.
@@ -554,8 +551,7 @@ static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(std::shared_ptr<arrow:
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
{
auto & chunk = dynamic_cast<arrow::BinaryArray &>(*(arrow_column->chunk(chunk_i)));
std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
const auto * raw_data = reinterpret_cast<const IPv6 *>(buffer->data());
const auto * raw_data = reinterpret_cast<const IPv6 *>(chunk.raw_data() + chunk.raw_value_offsets()[0]);
data.insert_assume_reserved(raw_data, raw_data + chunk.length());
}
return {std::move(internal_column), std::move(internal_type), column_name};


@@ -45,38 +45,42 @@ Chunk ParquetBlockInputFormat::generate()
block_missing_values.clear();
if (!file_reader)
{
prepareReader();
file_reader->set_batch_size(format_settings.parquet.max_block_size);
std::vector<int> row_group_indices;
for (int i = 0; i < row_group_total; ++i)
{
if (!skip_row_groups.contains(i))
row_group_indices.emplace_back(i);
}
auto read_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &current_record_batch_reader);
if (!read_status.ok())
throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString());
}
if (is_stopped)
return {};
while (row_group_current < row_group_total && skip_row_groups.contains(row_group_current))
++row_group_current;
if (row_group_current >= row_group_total)
return res;
std::shared_ptr<arrow::Table> table;
std::unique_ptr<::arrow::RecordBatchReader> rbr;
std::vector<int> row_group_indices { row_group_current };
arrow::Status get_batch_reader_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &rbr);
if (!get_batch_reader_status.ok())
auto batch = current_record_batch_reader->Next();
if (!batch.ok())
{
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}",
get_batch_reader_status.ToString());
batch.status().ToString());
}
if (*batch)
{
auto tmp_table = arrow::Table::FromRecordBatches({*batch});
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
arrow_column_to_ch_column->arrowTableToCHChunk(res, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
}
else
{
return {};
}
arrow::Status read_status = rbr->ReadAll(&table);
if (!read_status.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString());
++row_group_current;
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
arrow_column_to_ch_column->arrowTableToCHChunk(res, table, table->num_rows(), block_missing_values_ptr);
return res;
}
@@ -85,6 +89,7 @@ void ParquetBlockInputFormat::resetParser()
IInputFormat::resetParser();
file_reader.reset();
current_record_batch_reader.reset();
column_indices.clear();
row_group_current = 0;
block_missing_values.clear();


@@ -205,16 +205,10 @@ void DistributedAsyncInsertDirectoryQueue::run()
/// No errors while processing existing files.
/// Let's see maybe there are more files to process.
do_sleep = false;
std::lock_guard status_lock(status_mutex);
status.last_exception = std::exception_ptr{};
}
catch (...)
{
std::lock_guard status_lock(status_mutex);
do_sleep = true;
++status.error_count;
tryLogCurrentException(getLoggerName().data());
UInt64 q = doubleToUInt64(std::exp2(status.error_count));
std::chrono::milliseconds new_sleep_time(default_sleep_time.count() * q);
@@ -223,9 +217,7 @@ void DistributedAsyncInsertDirectoryQueue::run()
else
sleep_time = std::min(new_sleep_time, max_sleep_time);
tryLogCurrentException(getLoggerName().data());
status.last_exception = std::current_exception();
status.last_exception_time = std::chrono::system_clock::now();
do_sleep = true;
}
}
else
@@ -393,6 +385,7 @@ void DistributedAsyncInsertDirectoryQueue::initializeFilesFromDisk()
}
}
void DistributedAsyncInsertDirectoryQueue::processFiles()
try
{
if (should_batch_inserts)
processFilesWithBatching();
@@ -405,6 +398,19 @@ void DistributedAsyncInsertDirectoryQueue::processFiles()
while (pending_files.tryPop(current_file))
processFile(current_file);
}
std::lock_guard status_lock(status_mutex);
status.last_exception = std::exception_ptr{};
}
catch (...)
{
std::lock_guard status_lock(status_mutex);
++status.error_count;
status.last_exception = std::current_exception();
status.last_exception_time = std::chrono::system_clock::now();
throw;
}
void DistributedAsyncInsertDirectoryQueue::processFile(const std::string & file_path)


@@ -1946,6 +1946,8 @@ def reportCoverage(args):
def reportLogStats(args):
clickhouse_execute(args, "SYSTEM FLUSH LOGS")
query = """
WITH
120 AS mins,


@@ -9,8 +9,7 @@ from helpers.test_tools import TSV, assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
num_nodes = 4
ddl_task_timeout = 640
num_nodes = 10
def generate_cluster_def():
@@ -86,7 +85,7 @@ def drop_after_test():
node0.query(
"DROP TABLE IF EXISTS tbl ON CLUSTER 'cluster' NO DELAY",
settings={
"distributed_ddl_task_timeout": ddl_task_timeout,
"distributed_ddl_task_timeout": 360,
},
)
@@ -108,7 +107,7 @@ def create_and_fill_table():
"ORDER BY x"
)
for i in range(num_nodes):
nodes[i].query(f"INSERT INTO tbl SELECT number FROM numbers(80000000)")
nodes[i].query(f"INSERT INTO tbl SELECT number FROM numbers(40000000)")
# All the tests have concurrent backup/restores with same backup names
@@ -146,7 +145,7 @@ def test_concurrent_backups_on_same_node():
nodes[0].query(
f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY",
settings={
"distributed_ddl_task_timeout": ddl_task_timeout,
"distributed_ddl_task_timeout": 360,
},
)
nodes[0].query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
@@ -203,7 +202,7 @@ def test_concurrent_restores_on_same_node():
nodes[0].query(
f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY",
settings={
"distributed_ddl_task_timeout": ddl_task_timeout,
"distributed_ddl_task_timeout": 360,
},
)
restore_id = (
@@ -227,44 +226,44 @@ def test_concurrent_restores_on_different_node():
backup_name = new_backup_name()
id = (
nodes[1]
nodes[0]
.query(f"BACKUP TABLE tbl ON CLUSTER 'cluster' TO {backup_name} ASYNC")
.split("\t")[0]
)
assert_eq_with_retry(
nodes[1],
nodes[0],
f"SELECT status FROM system.backups WHERE status == 'CREATING_BACKUP' AND id = '{id}'",
"CREATING_BACKUP",
)
assert_eq_with_retry(
nodes[1],
nodes[0],
f"SELECT status FROM system.backups WHERE status == 'BACKUP_CREATED' AND id = '{id}'",
"BACKUP_CREATED",
)
nodes[1].query(
nodes[0].query(
f"DROP TABLE tbl ON CLUSTER 'cluster' NO DELAY",
settings={
"distributed_ddl_task_timeout": ddl_task_timeout,
"distributed_ddl_task_timeout": 360,
},
)
restore_id = (
nodes[1]
nodes[0]
.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name} ASYNC")
.split("\t")[0]
)
assert_eq_with_retry(
nodes[1],
f"SELECT status FROM system.backups WHERE status == 'RESTORING' AND id == '{restore_id}'",
nodes[0],
f"SELECT status FROM system.backups WHERE status == 'RESTORING'",
"RESTORING",
)
assert "Concurrent restores not supported" in nodes[0].query_and_get_error(
assert "Concurrent restores not supported" in nodes[1].query_and_get_error(
f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}"
)
assert_eq_with_retry(
nodes[1],
nodes[0],
f"SELECT status FROM system.backups WHERE status == 'RESTORED' AND id == '{restore_id}'",
"RESTORED",
)


@@ -1,3 +1,5 @@
masked flush only
3,"default:*@127%2E0%2E0%2E1:9000,default:*@127%2E0%2E0%2E2:9000","AUTHENTICATION_FAILED",1
masked
3,"default:*@127%2E0%2E0%2E1:9000,default:*@127%2E0%2E0%2E2:9000","AUTHENTICATION_FAILED",1
no masking


@@ -9,6 +9,20 @@ drop table if exists dist_01555;
drop table if exists data_01555;
create table data_01555 (key Int) Engine=Null();
--
-- masked flush only
--
SELECT 'masked flush only';
create table dist_01555 (key Int) Engine=Distributed(test_cluster_with_incorrect_pw, currentDatabase(), data_01555, key);
system stop distributed sends dist_01555;
insert into dist_01555 values (1)(2);
-- since test_cluster_with_incorrect_pw contains incorrect password ignore error
system flush distributed dist_01555; -- { serverError 516 }
select length(splitByChar('*', data_path)), replaceRegexpOne(data_path, '^.*/([^/]*)/' , '\\1'), extract(last_exception, 'AUTHENTICATION_FAILED'), dateDiff('s', last_exception_time, now()) < 5 from system.distribution_queue where database = currentDatabase() and table = 'dist_01555' format CSV;
drop table dist_01555;
--
-- masked
--
@@ -29,7 +43,6 @@ SELECT 'no masking';
create table dist_01555 (key Int) Engine=Distributed(test_shard_localhost, currentDatabase(), data_01555, key);
insert into dist_01555 values (1)(2);
-- since test_cluster_with_incorrect_pw contains incorrect password ignore error
system flush distributed dist_01555;
select length(splitByChar('*', data_path)), replaceRegexpOne(data_path, '^.*/([^/]*)/' , '\\1') from system.distribution_queue where database = currentDatabase() and table = 'dist_01555' format CSV;


@@ -1,2 +1,2 @@
10 querystart OK
10 queryfinish OK
11 queryfinish OK
11 querystart OK


@@ -9,6 +9,8 @@ set log_queries=1;
drop table if exists log_proxy_02572;
drop table if exists push_to_logs_proxy_mv_02572;
-- create log tables
system flush logs;
create table log_proxy_02572 as system.query_log engine=Distributed('test_shard_localhost', currentDatabase(), 'receiver_02572');
create materialized view push_to_logs_proxy_mv_02572 to log_proxy_02572 as select * from system.query_log;
@@ -23,4 +25,6 @@ system flush logs;
-- lower() to pass through clickhouse-test "exception" check
select count(), lower(type::String), errorCodeToName(exception_code)
from system.query_log
where current_database = currentDatabase() group by 2, 3;
where current_database = currentDatabase()
group by 2, 3
order by 2;


@@ -0,0 +1 @@
22 0 1


@@ -0,0 +1,15 @@
DROP TABLE IF EXISTS 02680_datetime64_monotonic_check;
CREATE TABLE 02680_datetime64_monotonic_check (`t` DateTime64(3), `x` Nullable(Decimal(18, 14)))
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(t)
ORDER BY x SETTINGS allow_nullable_key = 1;
INSERT INTO 02680_datetime64_monotonic_check VALUES (toDateTime64('2023-03-13 00:00:00', 3, 'Asia/Jerusalem'), 123);
SELECT toHour(toTimeZone(t, 'UTC')) AS toHour_UTC, toHour(toTimeZone(t, 'Asia/Jerusalem')) AS toHour_Israel, count()
FROM 02680_datetime64_monotonic_check
WHERE toHour_Israel = 0
GROUP BY toHour_UTC, toHour_Israel;
DROP TABLE 02680_datetime64_monotonic_check;


@@ -0,0 +1,2 @@
select INSTR('hello', 'e');
select INSTR('hELlo', 'L');