Merge remote-tracking branch 'ClickHouse/master' into change_date

This commit is contained in:
Robert Schulze 2024-05-22 21:07:20 +00:00
commit bb4f373038
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A
90 changed files with 1151 additions and 275 deletions

View File

@ -42,25 +42,25 @@ At a minimum, the following information should be added (but add more as needed)
> Information about CI checks: https://clickhouse.com/docs/en/development/continuous-integration/ > Information about CI checks: https://clickhouse.com/docs/en/development/continuous-integration/
<details> <details>
<summary>Modify your CI run</summary> <summary>CI Settings</summary>
**NOTE:** If your merge the PR with modified CI you **MUST KNOW** what you are doing **NOTE:** If your merge the PR with modified CI you **MUST KNOW** what you are doing
**NOTE:** Checked options will be applied if set before CI RunConfig/PrepareRunConfig step **NOTE:** Checked options will be applied if set before CI RunConfig/PrepareRunConfig step
#### Include tests (required builds will be added automatically): #### Run these jobs only (required builds will be added automatically):
- [ ] <!---ci_include_fast--> Fast test
- [ ] <!---ci_include_integration--> Integration Tests - [ ] <!---ci_include_integration--> Integration Tests
- [ ] <!---ci_include_stateless--> Stateless tests - [ ] <!---ci_include_stateless--> Stateless tests
- [ ] <!---ci_include_stateful--> Stateful tests - [ ] <!---ci_include_stateful--> Stateful tests
- [ ] <!---ci_include_unit--> Unit tests - [ ] <!---ci_include_unit--> Unit tests
- [ ] <!---ci_include_performance--> Performance tests - [ ] <!---ci_include_performance--> Performance tests
- [ ] <!---ci_include_aarch64--> All with aarch64
- [ ] <!---ci_include_asan--> All with ASAN - [ ] <!---ci_include_asan--> All with ASAN
- [ ] <!---ci_include_tsan--> All with TSAN - [ ] <!---ci_include_tsan--> All with TSAN
- [ ] <!---ci_include_analyzer--> All with Analyzer - [ ] <!---ci_include_analyzer--> All with Analyzer
- [ ] <!---ci_include_azure --> All with Azure - [ ] <!---ci_include_azure --> All with Azure
- [ ] <!---ci_include_KEYWORD--> Add your option here - [ ] <!---ci_include_KEYWORD--> Add your option here
#### Exclude tests: #### Deny these jobs:
- [ ] <!---ci_exclude_fast--> Fast test - [ ] <!---ci_exclude_fast--> Fast test
- [ ] <!---ci_exclude_integration--> Integration Tests - [ ] <!---ci_exclude_integration--> Integration Tests
- [ ] <!---ci_exclude_stateless--> Stateless tests - [ ] <!---ci_exclude_stateless--> Stateless tests
@ -72,7 +72,6 @@ At a minimum, the following information should be added (but add more as needed)
- [ ] <!---ci_exclude_ubsan--> All with UBSAN - [ ] <!---ci_exclude_ubsan--> All with UBSAN
- [ ] <!---ci_exclude_coverage--> All with Coverage - [ ] <!---ci_exclude_coverage--> All with Coverage
- [ ] <!---ci_exclude_aarch64--> All with Aarch64 - [ ] <!---ci_exclude_aarch64--> All with Aarch64
- [ ] <!---ci_exclude_KEYWORD--> Add your option here
#### Extra options: #### Extra options:
- [ ] <!---do_not_test--> do not test (only style check) - [ ] <!---do_not_test--> do not test (only style check)

View File

@ -22,6 +22,9 @@ jobs:
clear-repository: true # to ensure correct digests clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get version fetch-depth: 0 # to get version
filter: tree:0 filter: tree:0
- name: Cancel PR workflow
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --cancel-previous-run
- name: Python unit tests - name: Python unit tests
run: | run: |
cd "$GITHUB_WORKSPACE/tests/ci" cd "$GITHUB_WORKSPACE/tests/ci"

View File

@ -197,6 +197,7 @@ SELECT * FROM nestedt FORMAT TSV
- [input_format_tsv_enum_as_number](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_enum_as_number) - treat inserted enum values in TSV formats as enum indices. Default value - `false`. - [input_format_tsv_enum_as_number](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_enum_as_number) - treat inserted enum values in TSV formats as enum indices. Default value - `false`.
- [input_format_tsv_use_best_effort_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_use_best_effort_in_schema_inference) - use some tweaks and heuristics to infer schema in TSV format. If disabled, all fields will be inferred as Strings. Default value - `true`. - [input_format_tsv_use_best_effort_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_use_best_effort_in_schema_inference) - use some tweaks and heuristics to infer schema in TSV format. If disabled, all fields will be inferred as Strings. Default value - `true`.
- [output_format_tsv_crlf_end_of_line](/docs/en/operations/settings/settings-formats.md/#output_format_tsv_crlf_end_of_line) - if it is set true, end of line in TSV output format will be `\r\n` instead of `\n`. Default value - `false`. - [output_format_tsv_crlf_end_of_line](/docs/en/operations/settings/settings-formats.md/#output_format_tsv_crlf_end_of_line) - if it is set true, end of line in TSV output format will be `\r\n` instead of `\n`. Default value - `false`.
- [input_format_tsv_crlf_end_of_line](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_crlf_end_of_line) - if it is set true, end of line in TSV input format will be `\r\n` instead of `\n`. Default value - `false`.
- [input_format_tsv_skip_first_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_first_lines) - skip specified number of lines at the beginning of data. Default value - `0`. - [input_format_tsv_skip_first_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_first_lines) - skip specified number of lines at the beginning of data. Default value - `0`.
- [input_format_tsv_detect_header](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_detect_header) - automatically detect header with names and types in TSV format. Default value - `true`. - [input_format_tsv_detect_header](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_detect_header) - automatically detect header with names and types in TSV format. Default value - `true`.
- [input_format_tsv_skip_trailing_empty_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_trailing_empty_lines) - skip trailing empty lines at the end of data. Default value - `false`. - [input_format_tsv_skip_trailing_empty_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_trailing_empty_lines) - skip trailing empty lines at the end of data. Default value - `false`.

View File

@ -561,6 +561,25 @@ Default value: 5000
<max_table_num_to_warn>400</max_table_num_to_warn> <max_table_num_to_warn>400</max_table_num_to_warn>
``` ```
## max\_view\_num\_to\_warn {#max-view-num-to-warn}
If the number of attached views exceeds the specified value, clickhouse server will add warning messages to `system.warnings` table.
Default value: 10000
**Example**
``` xml
<max_view_num_to_warn>400</max_view_num_to_warn>
```
## max\_dictionary\_num\_to\_warn {#max-dictionary-num-to-warn}
If the number of attached dictionaries exceeds the specified value, clickhouse server will add warning messages to `system.warnings` table.
Default value: 1000
**Example**
``` xml
<max_dictionary_num_to_warn>400</max_dictionary_num_to_warn>
```
## max\_part\_num\_to\_warn {#max-part-num-to-warn} ## max\_part\_num\_to\_warn {#max-part-num-to-warn}
If the number of active parts exceeds the specified value, clickhouse server will add warning messages to `system.warnings` table. If the number of active parts exceeds the specified value, clickhouse server will add warning messages to `system.warnings` table.

View File

@ -831,7 +831,13 @@ Default value: `0`.
### output_format_tsv_crlf_end_of_line {#output_format_tsv_crlf_end_of_line} ### output_format_tsv_crlf_end_of_line {#output_format_tsv_crlf_end_of_line}
Use DOC/Windows-style line separator (CRLF) in TSV instead of Unix style (LF). Use DOS/Windows-style line separator (CRLF) in TSV instead of Unix style (LF).
Disabled by default.
### input_format_tsv_crlf_end_of_line {#input_format_tsv_crlf_end_of_line}
Use DOS/Windows-style line separator (CRLF) for TSV input files instead of Unix style (LF).
Disabled by default. Disabled by default.

View File

@ -119,6 +119,7 @@ Hello\nworld
Hello\ Hello\
world world
``` ```
`\n\r` (CRLF) поддерживается с помощью настройки `input_format_tsv_crlf_end_of_line`.
Второй вариант поддерживается, так как его использует MySQL при записи tab-separated дампа. Второй вариант поддерживается, так как его использует MySQL при записи tab-separated дампа.

View File

@ -1178,7 +1178,7 @@ void Client::processConfig()
pager = config().getString("pager", ""); pager = config().getString("pager", "");
setDefaultFormatsFromConfiguration(); setDefaultFormatsAndCompressionFromConfiguration();
global_context->setClientName(std::string(DEFAULT_CLIENT_NAME)); global_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
global_context->setQueryKindInitial(); global_context->setQueryKindInitial();

View File

@ -607,7 +607,7 @@ void LocalServer::processConfig()
if (config().has("macros")) if (config().has("macros"))
global_context->setMacros(std::make_unique<Macros>(config(), "macros", log)); global_context->setMacros(std::make_unique<Macros>(config(), "macros", log));
setDefaultFormatsFromConfiguration(); setDefaultFormatsAndCompressionFromConfiguration();
/// Sets external authenticators config (LDAP, Kerberos). /// Sets external authenticators config (LDAP, Kerberos).
global_context->setExternalAuthenticatorsConfig(config()); global_context->setExternalAuthenticatorsConfig(config());

View File

@ -1476,6 +1476,8 @@ try
global_context->setMaxTableSizeToDrop(new_server_settings.max_table_size_to_drop); global_context->setMaxTableSizeToDrop(new_server_settings.max_table_size_to_drop);
global_context->setMaxPartitionSizeToDrop(new_server_settings.max_partition_size_to_drop); global_context->setMaxPartitionSizeToDrop(new_server_settings.max_partition_size_to_drop);
global_context->setMaxTableNumToWarn(new_server_settings.max_table_num_to_warn); global_context->setMaxTableNumToWarn(new_server_settings.max_table_num_to_warn);
global_context->setMaxViewNumToWarn(new_server_settings.max_view_num_to_warn);
global_context->setMaxDictionaryNumToWarn(new_server_settings.max_dictionary_num_to_warn);
global_context->setMaxDatabaseNumToWarn(new_server_settings.max_database_num_to_warn); global_context->setMaxDatabaseNumToWarn(new_server_settings.max_database_num_to_warn);
global_context->setMaxPartNumToWarn(new_server_settings.max_part_num_to_warn); global_context->setMaxPartNumToWarn(new_server_settings.max_part_num_to_warn);

View File

@ -4617,6 +4617,36 @@ QueryAnalyzer::QueryTreeNodesWithNames QueryAnalyzer::resolveUnqualifiedMatcher(
std::unordered_set<std::string> table_expression_column_names_to_skip; std::unordered_set<std::string> table_expression_column_names_to_skip;
QueryTreeNodesWithNames result;
if (matcher_node_typed.getMatcherType() == MatcherNodeType::COLUMNS_LIST)
{
auto identifiers = matcher_node_typed.getColumnsIdentifiers();
result.reserve(identifiers.size());
for (const auto & identifier : identifiers)
{
auto resolve_result = tryResolveIdentifier(IdentifierLookup{identifier, IdentifierLookupContext::EXPRESSION}, scope);
if (!resolve_result.isResolved())
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Unknown identifier '{}' inside COLUMNS matcher. In scope {}",
identifier.getFullName(), scope.dump());
// TODO: Introduce IdentifierLookupContext::COLUMN and get rid of this check
auto * resolved_column = resolve_result.resolved_identifier->as<ColumnNode>();
if (!resolved_column)
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Identifier '{}' inside COLUMNS matcher must resolve into a column, but got {}. In scope {}",
identifier.getFullName(),
resolve_result.resolved_identifier->getNodeTypeName(),
scope.scope_node->formatASTForErrorMessage());
result.emplace_back(resolve_result.resolved_identifier, resolved_column->getColumnName());
}
return result;
}
result.resize(matcher_node_typed.getColumnsIdentifiers().size());
for (auto & table_expression : table_expressions_stack) for (auto & table_expression : table_expressions_stack)
{ {
bool table_expression_in_resolve_process = nearest_query_scope->table_expressions_in_resolve_process.contains(table_expression.get()); bool table_expression_in_resolve_process = nearest_query_scope->table_expressions_in_resolve_process.contains(table_expression.get());
@ -4784,8 +4814,6 @@ QueryAnalyzer::QueryTreeNodesWithNames QueryAnalyzer::resolveUnqualifiedMatcher(
table_expressions_column_nodes_with_names_stack.push_back(std::move(matched_column_nodes_with_names)); table_expressions_column_nodes_with_names_stack.push_back(std::move(matched_column_nodes_with_names));
} }
QueryTreeNodesWithNames result;
for (auto & table_expression_column_nodes_with_names : table_expressions_column_nodes_with_names_stack) for (auto & table_expression_column_nodes_with_names : table_expressions_column_nodes_with_names_stack)
{ {
for (auto && table_expression_column_node_with_name : table_expression_column_nodes_with_names) for (auto && table_expression_column_node_with_name : table_expression_column_nodes_with_names)

View File

@ -21,6 +21,7 @@
#include <Common/StringUtils.h> #include <Common/StringUtils.h>
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <Common/NetException.h> #include <Common/NetException.h>
#include <Common/tryGetFileNameByFileDescriptor.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
@ -643,6 +644,9 @@ try
bool extras_into_stdout = need_render_progress || logs_into_stdout; bool extras_into_stdout = need_render_progress || logs_into_stdout;
bool select_only_into_file = select_into_file && !select_into_file_and_stdout; bool select_only_into_file = select_into_file && !select_into_file_and_stdout;
if (!out_file_buf && default_output_compression_method != CompressionMethod::None)
out_file_buf = wrapWriteBufferWithCompressionMethod(out_buf, default_output_compression_method, 3, 0);
/// It is not clear how to write progress and logs /// It is not clear how to write progress and logs
/// intermixed with data with parallel formatting. /// intermixed with data with parallel formatting.
/// It may increase code complexity significantly. /// It may increase code complexity significantly.
@ -735,7 +739,7 @@ bool ClientBase::isRegularFile(int fd)
return fstat(fd, &file_stat) == 0 && S_ISREG(file_stat.st_mode); return fstat(fd, &file_stat) == 0 && S_ISREG(file_stat.st_mode);
} }
void ClientBase::setDefaultFormatsFromConfiguration() void ClientBase::setDefaultFormatsAndCompressionFromConfiguration()
{ {
if (config().has("output-format")) if (config().has("output-format"))
{ {
@ -759,6 +763,10 @@ void ClientBase::setDefaultFormatsFromConfiguration()
default_output_format = *format_from_file_name; default_output_format = *format_from_file_name;
else else
default_output_format = "TSV"; default_output_format = "TSV";
std::optional<String> file_name = tryGetFileNameFromFileDescriptor(STDOUT_FILENO);
if (file_name)
default_output_compression_method = chooseCompressionMethod(*file_name, "");
} }
else if (is_interactive) else if (is_interactive)
{ {

View File

@ -190,7 +190,7 @@ protected:
/// Adjust some settings after command line options and config had been processed. /// Adjust some settings after command line options and config had been processed.
void adjustSettings(); void adjustSettings();
void setDefaultFormatsFromConfiguration(); void setDefaultFormatsAndCompressionFromConfiguration();
void initTTYBuffer(ProgressOption progress); void initTTYBuffer(ProgressOption progress);
@ -224,6 +224,7 @@ protected:
String pager; String pager;
String default_output_format; /// Query results output format. String default_output_format; /// Query results output format.
CompressionMethod default_output_compression_method = CompressionMethod::None;
String default_input_format; /// Tables' format for clickhouse-local. String default_input_format; /// Tables' format for clickhouse-local.
bool select_into_file = false; /// If writing result INTO OUTFILE. It affects progress rendering. bool select_into_file = false; /// If writing result INTO OUTFILE. It affects progress rendering.

View File

@ -30,6 +30,7 @@ namespace ErrorCodes
extern const int ASYNC_LOAD_CYCLE; extern const int ASYNC_LOAD_CYCLE;
extern const int ASYNC_LOAD_FAILED; extern const int ASYNC_LOAD_FAILED;
extern const int ASYNC_LOAD_CANCELED; extern const int ASYNC_LOAD_CANCELED;
extern const int ASYNC_LOAD_WAIT_FAILED;
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
@ -433,7 +434,7 @@ void AsyncLoader::wait(const LoadJobPtr & job, bool no_throw)
std::unique_lock job_lock{job->mutex}; std::unique_lock job_lock{job->mutex};
wait(job_lock, job); wait(job_lock, job);
if (!no_throw && job->load_exception) if (!no_throw && job->load_exception)
std::rethrow_exception(job->load_exception); throw Exception(ErrorCodes::ASYNC_LOAD_WAIT_FAILED, "Waited job failed: {}", getExceptionMessage(job->load_exception, /* with_stacktrace = */ false));
} }
void AsyncLoader::remove(const LoadJobSet & jobs) void AsyncLoader::remove(const LoadJobSet & jobs)

View File

@ -224,6 +224,8 @@
M(PartsActive, "Active data part, used by current and upcoming SELECTs.") \ M(PartsActive, "Active data part, used by current and upcoming SELECTs.") \
M(AttachedDatabase, "Active database, used by current and upcoming SELECTs.") \ M(AttachedDatabase, "Active database, used by current and upcoming SELECTs.") \
M(AttachedTable, "Active table, used by current and upcoming SELECTs.") \ M(AttachedTable, "Active table, used by current and upcoming SELECTs.") \
M(AttachedView, "Active view, used by current and upcoming SELECTs.") \
M(AttachedDictionary, "Active dictionary, used by current and upcoming SELECTs.") \
M(PartsOutdated, "Not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes.") \ M(PartsOutdated, "Not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes.") \
M(PartsDeleting, "Not active data part with identity refcounter, it is deleting right now by a cleaner.") \ M(PartsDeleting, "Not active data part with identity refcounter, it is deleting right now by a cleaner.") \
M(PartsDeleteOnDestroy, "Part was moved to another disk and should be deleted in own destructor.") \ M(PartsDeleteOnDestroy, "Part was moved to another disk and should be deleted in own destructor.") \

View File

@ -600,6 +600,7 @@
M(719, QUERY_CACHE_USED_WITH_SYSTEM_TABLE) \ M(719, QUERY_CACHE_USED_WITH_SYSTEM_TABLE) \
M(720, USER_EXPIRED) \ M(720, USER_EXPIRED) \
M(721, DEPRECATED_FUNCTION) \ M(721, DEPRECATED_FUNCTION) \
M(722, ASYNC_LOAD_WAIT_FAILED) \
\ \
M(900, DISTRIBUTED_CACHE_ERROR) \ M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \ M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@ -40,6 +40,7 @@ static struct InitFiu
REGULAR(use_delayed_remote_source) \ REGULAR(use_delayed_remote_source) \
REGULAR(cluster_discovery_faults) \ REGULAR(cluster_discovery_faults) \
REGULAR(replicated_sends_failpoint) \ REGULAR(replicated_sends_failpoint) \
REGULAR(stripe_log_sink_write_fallpoint)\
ONCE(smt_commit_merge_mutate_zk_fail_after_op) \ ONCE(smt_commit_merge_mutate_zk_fail_after_op) \
ONCE(smt_commit_merge_mutate_zk_fail_before_op) \ ONCE(smt_commit_merge_mutate_zk_fail_before_op) \
ONCE(smt_commit_write_zk_fail_after_op) \ ONCE(smt_commit_write_zk_fail_after_op) \
@ -58,6 +59,7 @@ static struct InitFiu
ONCE(execute_query_calling_empty_set_result_func_on_exception) \ ONCE(execute_query_calling_empty_set_result_func_on_exception) \
ONCE(receive_timeout_on_table_status_response) ONCE(receive_timeout_on_table_status_response)
namespace FailPoints namespace FailPoints
{ {
#define M(NAME) extern const char(NAME)[] = #NAME ""; #define M(NAME) extern const char(NAME)[] = #NAME "";

View File

@ -35,6 +35,7 @@ namespace DB::ErrorCodes
extern const int ASYNC_LOAD_CYCLE; extern const int ASYNC_LOAD_CYCLE;
extern const int ASYNC_LOAD_FAILED; extern const int ASYNC_LOAD_FAILED;
extern const int ASYNC_LOAD_CANCELED; extern const int ASYNC_LOAD_CANCELED;
extern const int ASYNC_LOAD_WAIT_FAILED;
} }
struct Initializer { struct Initializer {
@ -262,7 +263,8 @@ TEST(AsyncLoader, CancelPendingJob)
} }
catch (Exception & e) catch (Exception & e)
{ {
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
} }
} }
@ -288,7 +290,8 @@ TEST(AsyncLoader, CancelPendingTask)
} }
catch (Exception & e) catch (Exception & e)
{ {
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED); ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
} }
try try
@ -298,7 +301,8 @@ TEST(AsyncLoader, CancelPendingTask)
} }
catch (Exception & e) catch (Exception & e)
{ {
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED); ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
} }
} }
@ -325,7 +329,8 @@ TEST(AsyncLoader, CancelPendingDependency)
} }
catch (Exception & e) catch (Exception & e)
{ {
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED); ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
} }
try try
@ -335,7 +340,8 @@ TEST(AsyncLoader, CancelPendingDependency)
} }
catch (Exception & e) catch (Exception & e)
{ {
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED); ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
} }
} }
@ -451,8 +457,9 @@ TEST(AsyncLoader, JobFailure)
} }
catch (Exception & e) catch (Exception & e)
{ {
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_FAILED); ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().find(error_message) != String::npos); ASSERT_TRUE(e.message().contains(error_message));
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_FAILED"));
} }
} }
@ -489,8 +496,9 @@ TEST(AsyncLoader, ScheduleJobWithFailedDependencies)
} }
catch (Exception & e) catch (Exception & e)
{ {
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().find(error_message) != String::npos); ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
ASSERT_TRUE(e.message().contains(error_message));
} }
try try
{ {
@ -499,8 +507,9 @@ TEST(AsyncLoader, ScheduleJobWithFailedDependencies)
} }
catch (Exception & e) catch (Exception & e)
{ {
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().find(error_message) != String::npos); ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
ASSERT_TRUE(e.message().contains(error_message));
} }
} }
@ -531,7 +540,8 @@ TEST(AsyncLoader, ScheduleJobWithCanceledDependencies)
} }
catch (Exception & e) catch (Exception & e)
{ {
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
} }
try try
{ {
@ -540,7 +550,8 @@ TEST(AsyncLoader, ScheduleJobWithCanceledDependencies)
} }
catch (Exception & e) catch (Exception & e)
{ {
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
} }
} }

View File

@ -0,0 +1,33 @@
#include <Common/tryGetFileNameByFileDescriptor.h>
#ifdef OS_LINUX
# include <unistd.h>
#elif defined(OS_DARWIN)
# include <fcntl.h>
#endif
#include <fmt/format.h>
namespace DB
{
std::optional<String> tryGetFileNameFromFileDescriptor(int fd)
{
#ifdef OS_LINUX
std::string proc_path = fmt::format("/proc/self/fd/{}", fd);
char file_path[PATH_MAX] = {'\0'};
if (readlink(proc_path.c_str(), file_path, sizeof(file_path) - 1) != -1)
return file_path;
return std::nullopt;
#elif defined(OS_DARWIN)
char file_path[PATH_MAX] = {'\0'};
if (fcntl(fd, F_GETPATH, file_path) != -1)
return file_path;
return std::nullopt;
#else
(void)fd;
return std::nullopt;
#endif
}
}

View File

@ -0,0 +1,10 @@
#pragma once
#include <optional>
#include <base/types.h>
namespace DB
{
/// Supports only Linux/MacOS. On other platforms, returns nullopt.
std::optional<String> tryGetFileNameFromFileDescriptor(int fd);
}

View File

@ -97,6 +97,8 @@ namespace DB
M(UInt64, max_table_size_to_drop, 50000000000lu, "If size of a table is greater than this value (in bytes) than table could not be dropped with any DROP query.", 0) \ M(UInt64, max_table_size_to_drop, 50000000000lu, "If size of a table is greater than this value (in bytes) than table could not be dropped with any DROP query.", 0) \
M(UInt64, max_partition_size_to_drop, 50000000000lu, "Same as max_table_size_to_drop, but for the partitions.", 0) \ M(UInt64, max_partition_size_to_drop, 50000000000lu, "Same as max_table_size_to_drop, but for the partitions.", 0) \
M(UInt64, max_table_num_to_warn, 5000lu, "If number of tables is greater than this value, server will create a warning that will displayed to user.", 0) \ M(UInt64, max_table_num_to_warn, 5000lu, "If number of tables is greater than this value, server will create a warning that will displayed to user.", 0) \
M(UInt64, max_view_num_to_warn, 10000lu, "If number of views is greater than this value, server will create a warning that will displayed to user.", 0) \
M(UInt64, max_dictionary_num_to_warn, 1000lu, "If number of dictionaries is greater than this value, server will create a warning that will displayed to user.", 0) \
M(UInt64, max_database_num_to_warn, 1000lu, "If number of databases is greater than this value, server will create a warning that will displayed to user.", 0) \ M(UInt64, max_database_num_to_warn, 1000lu, "If number of databases is greater than this value, server will create a warning that will displayed to user.", 0) \
M(UInt64, max_part_num_to_warn, 100000lu, "If number of databases is greater than this value, server will create a warning that will displayed to user.", 0) \ M(UInt64, max_part_num_to_warn, 100000lu, "If number of databases is greater than this value, server will create a warning that will displayed to user.", 0) \
M(UInt64, concurrent_threads_soft_limit_num, 0, "Sets how many concurrent thread can be allocated before applying CPU pressure. Zero means unlimited.", 0) \ M(UInt64, concurrent_threads_soft_limit_num, 0, "Sets how many concurrent thread can be allocated before applying CPU pressure. Zero means unlimited.", 0) \

View File

@ -1079,6 +1079,7 @@ class IColumn;
M(Bool, input_format_csv_skip_trailing_empty_lines, false, "Skip trailing empty lines in CSV format", 0) \ M(Bool, input_format_csv_skip_trailing_empty_lines, false, "Skip trailing empty lines in CSV format", 0) \
M(Bool, input_format_tsv_skip_trailing_empty_lines, false, "Skip trailing empty lines in TSV format", 0) \ M(Bool, input_format_tsv_skip_trailing_empty_lines, false, "Skip trailing empty lines in TSV format", 0) \
M(Bool, input_format_custom_skip_trailing_empty_lines, false, "Skip trailing empty lines in CustomSeparated format", 0) \ M(Bool, input_format_custom_skip_trailing_empty_lines, false, "Skip trailing empty lines in CustomSeparated format", 0) \
M(Bool, input_format_tsv_crlf_end_of_line, false, "If it is set true, file function will read TSV format with \\r\\n instead of \\n.", 0) \
\ \
M(Bool, input_format_native_allow_types_conversion, true, "Allow data types conversion in Native input format", 0) \ M(Bool, input_format_native_allow_types_conversion, true, "Allow data types conversion in Native input format", 0) \
\ \

View File

@ -87,6 +87,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{ {
{"24.5", {{"allow_deprecated_functions", true, false, "Allow usage of deprecated functions"}, {"24.5", {{"allow_deprecated_functions", true, false, "Allow usage of deprecated functions"},
{"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."}, {"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},
{"input_format_tsv_crlf_end_of_line", false, false, "Enables reading of CRLF line endings with TSV formats"},
{"output_format_parquet_use_custom_encoder", false, true, "Enable custom Parquet encoder."}, {"output_format_parquet_use_custom_encoder", false, true, "Enable custom Parquet encoder."},
{"cross_join_min_rows_to_compress", 0, 10000000, "A new setting."}, {"cross_join_min_rows_to_compress", 0, 10000000, "A new setting."},
{"cross_join_min_bytes_to_compress", 0, 1_GiB, "A new setting."}, {"cross_join_min_bytes_to_compress", 0, 1_GiB, "A new setting."},

View File

@ -146,10 +146,10 @@ void SerializationAggregateFunction::serializeTextEscaped(const IColumn & column
} }
void SerializationAggregateFunction::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const void SerializationAggregateFunction::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{ {
String s; String s;
readEscapedString(s, istr); settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(s, istr) : readEscapedString(s, istr);
deserializeFromString(function, column, s, version); deserializeFromString(function, column, s, version);
} }

View File

@ -242,8 +242,10 @@ void SerializationBool::deserializeTextEscaped(IColumn & column, ReadBuffer & is
{ {
if (istr.eof()) if (istr.eof())
throw Exception(ErrorCodes::CANNOT_PARSE_BOOL, "Expected boolean value but get EOF."); throw Exception(ErrorCodes::CANNOT_PARSE_BOOL, "Expected boolean value but get EOF.");
if (settings.tsv.crlf_end_of_line_input)
deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'; }); deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n' || *buf.position() == '\r'; });
else
deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'; });
} }
bool SerializationBool::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const bool SerializationBool::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const

View File

@ -75,7 +75,7 @@ void SerializationCustomSimpleText::serializeTextEscaped(const IColumn & column,
void SerializationCustomSimpleText::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const void SerializationCustomSimpleText::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{ {
String str; String str;
readEscapedString(str, istr); settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(str, istr) : readEscapedString(str, istr);
deserializeFromString(*this, column, str, settings); deserializeFromString(*this, column, str, settings);
} }

View File

@ -29,7 +29,7 @@ void SerializationEnum<Type>::deserializeTextEscaped(IColumn & column, ReadBuffe
{ {
/// NOTE It would be nice to do without creating a temporary object - at least extract std::string out. /// NOTE It would be nice to do without creating a temporary object - at least extract std::string out.
std::string field_name; std::string field_name;
readEscapedString(field_name, istr); settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field_name, istr) : readEscapedString(field_name, istr);
assert_cast<ColumnType &>(column).getData().push_back(ref_enum_values.getValue(StringRef(field_name), true)); assert_cast<ColumnType &>(column).getData().push_back(ref_enum_values.getValue(StringRef(field_name), true));
} }
} }

View File

@ -10,8 +10,10 @@
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/VarInt.h> #include <IO/VarInt.h>
#include "Common/PODArray.h"
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/assert_cast.h> #include <Common/assert_cast.h>
#include "base/types.h"
namespace DB namespace DB
{ {
@ -183,14 +185,17 @@ static inline bool tryRead(const SerializationFixedString & self, IColumn & colu
} }
void SerializationFixedString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const void SerializationFixedString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{ {
read(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto(data, istr); }); read(*this, column, [&istr, &settings](ColumnFixedString::Chars & data)
{
settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<ColumnFixedString::Chars,true>(data, istr) : readEscapedStringInto<ColumnFixedString::Chars,false>(data, istr);
});
} }
bool SerializationFixedString::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const bool SerializationFixedString::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{ {
return tryRead(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto(data, istr); return true; }); return tryRead(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto<PaddedPODArray<UInt8>,false>(data, istr); return true; });
} }

View File

@ -286,7 +286,7 @@ bool SerializationNullable::tryDeserializeNullRaw(DB::ReadBuffer & istr, const D
} }
template<typename ReturnType, bool escaped> template<typename ReturnType, bool escaped>
ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const SerializationPtr & nested_serialization, bool & is_null) ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const SerializationPtr & nested_serialization, bool & is_null)
{ {
static constexpr bool throw_exception = std::is_same_v<ReturnType, void>; static constexpr bool throw_exception = std::is_same_v<ReturnType, void>;
@ -319,10 +319,10 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
/// Check if we have enough data in buffer to check if it's a null. /// Check if we have enough data in buffer to check if it's a null.
if (istr.available() > null_representation.size()) if (istr.available() > null_representation.size())
{ {
auto check_for_null = [&null_representation](ReadBuffer & buf) auto check_for_null = [&null_representation, &settings](ReadBuffer & buf)
{ {
auto * pos = buf.position(); auto * pos = buf.position();
if (checkString(null_representation, buf) && (*buf.position() == '\t' || *buf.position() == '\n')) if (checkString(null_representation, buf) && (*buf.position() == '\t' || *buf.position() == '\n' || (settings.tsv.crlf_end_of_line_input && *buf.position() == '\r')))
return true; return true;
buf.position() = pos; buf.position() = pos;
return false; return false;
@ -334,14 +334,14 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
/// Use PeekableReadBuffer to make a checkpoint before checking null /// Use PeekableReadBuffer to make a checkpoint before checking null
/// representation and rollback if check was failed. /// representation and rollback if check was failed.
PeekableReadBuffer peekable_buf(istr, true); PeekableReadBuffer peekable_buf(istr, true);
auto check_for_null = [&null_representation](ReadBuffer & buf_) auto check_for_null = [&null_representation, &settings](ReadBuffer & buf_)
{ {
auto & buf = assert_cast<PeekableReadBuffer &>(buf_); auto & buf = assert_cast<PeekableReadBuffer &>(buf_);
buf.setCheckpoint(); buf.setCheckpoint();
SCOPE_EXIT(buf.dropCheckpoint()); SCOPE_EXIT(buf.dropCheckpoint());
if (checkString(null_representation, buf) && (buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'))
return true;
if (checkString(null_representation, buf) && (buf.eof() || *buf.position() == '\t' || *buf.position() == '\n' || (settings.tsv.crlf_end_of_line_input && *buf.position() == '\r')))
return true;
buf.rollbackToCheckpoint(); buf.rollbackToCheckpoint();
return false; return false;
}; };
@ -371,7 +371,10 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
if (null_representation.find('\t') != std::string::npos || null_representation.find('\n') != std::string::npos) if (null_representation.find('\t') != std::string::npos || null_representation.find('\n') != std::string::npos)
throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "TSV custom null representation " throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "TSV custom null representation "
"containing '\\t' or '\\n' may not work correctly for large input."); "containing '\\t' or '\\n' may not work correctly for large input.");
if (settings.tsv.crlf_end_of_line_input && null_representation.find('\r') != std::string::npos)
throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "TSV custom null representation "
"containing '\\r' may not work correctly for large input.");
WriteBufferFromOwnString parsed_value; WriteBufferFromOwnString parsed_value;
if constexpr (escaped) if constexpr (escaped)

View File

@ -104,9 +104,9 @@ void SerializationObject<Parser>::deserializeWholeText(IColumn & column, ReadBuf
} }
template <typename Parser> template <typename Parser>
void SerializationObject<Parser>::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const void SerializationObject<Parser>::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{ {
deserializeTextImpl(column, [&](String & s) { readEscapedString(s, istr); }); deserializeTextImpl(column, [&](String & s) { settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(s, istr) : readEscapedString(s, istr); });
} }
template <typename Parser> template <typename Parser>

View File

@ -147,7 +147,6 @@ void SerializationString::serializeBinaryBulk(const IColumn & column, WriteBuffe
} }
} }
template <int UNROLL_TIMES> template <int UNROLL_TIMES>
static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars & data, ColumnString::Offsets & offsets, ReadBuffer & istr, size_t limit) static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars & data, ColumnString::Offsets & offsets, ReadBuffer & istr, size_t limit)
{ {
@ -324,14 +323,17 @@ bool SerializationString::tryDeserializeWholeText(IColumn & column, ReadBuffer &
return read<bool>(column, [&](ColumnString::Chars & data) { readStringUntilEOFInto(data, istr); return true; }); return read<bool>(column, [&](ColumnString::Chars & data) { readStringUntilEOFInto(data, istr); return true; });
} }
void SerializationString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const void SerializationString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{ {
read<void>(column, [&](ColumnString::Chars & data) { readEscapedStringInto(data, istr); }); read<void>(column, [&](ColumnString::Chars & data)
{
settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<PaddedPODArray<UInt8>,true>(data, istr) : readEscapedStringInto<PaddedPODArray<UInt8>,false>(data, istr);
});
} }
bool SerializationString::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const bool SerializationString::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{ {
return read<bool>(column, [&](ColumnString::Chars & data) { readEscapedStringInto(data, istr); return true; }); return read<bool>(column, [&](ColumnString::Chars & data) { readEscapedStringInto<PaddedPODArray<UInt8>,true>(data, istr); return true; });
} }
void SerializationString::serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const void SerializationString::serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const

View File

@ -599,14 +599,14 @@ void SerializationVariant::serializeTextEscaped(const IColumn & column, size_t r
bool SerializationVariant::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const bool SerializationVariant::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{ {
String field; String field;
readEscapedString(field, istr); settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field, istr) : readEscapedString(field, istr);
return tryDeserializeTextEscapedImpl(column, field, settings); return tryDeserializeTextEscapedImpl(column, field, settings);
} }
void SerializationVariant::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const void SerializationVariant::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{ {
String field; String field;
readEscapedString(field, istr); settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field, istr) : readEscapedString(field, istr);
if (!tryDeserializeTextEscapedImpl(column, field, settings)) if (!tryDeserializeTextEscapedImpl(column, field, settings))
throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot parse escaped value of type {} here: {}", variant_name, field); throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot parse escaped value of type {} here: {}", variant_name, field);
} }

View File

@ -1,6 +1,14 @@
#include <Databases/DatabaseLazy.h>
#include <base/sort.h>
#include <iomanip>
#include <filesystem>
#include <Common/CurrentMetrics.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
#include <Common/scope_guard_safe.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Databases/DatabaseFactory.h> #include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseLazy.h>
#include <Databases/DatabaseOnDisk.h> #include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabasesCommon.h> #include <Databases/DatabasesCommon.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
@ -10,13 +18,7 @@
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
#include <Common/scope_guard_safe.h>
#include <base/sort.h>
#include <iomanip>
#include <filesystem>
namespace fs = std::filesystem; namespace fs = std::filesystem;

View File

@ -5,6 +5,7 @@
#include <span> #include <span>
#include <Databases/DatabaseAtomic.h> #include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseOrdinary.h> #include <Databases/DatabaseOrdinary.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h> #include <IO/WriteBufferFromFile.h>
@ -326,31 +327,36 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
StoragePtr table = detachTable(local_context, table_name); StoragePtr table = detachTable(local_context, table_name);
/// This is possible for Lazy database.
if (!table)
return;
bool renamed = false; bool renamed = false;
try try
{ {
fs::rename(table_metadata_path, table_metadata_path_drop); fs::rename(table_metadata_path, table_metadata_path_drop);
renamed = true; renamed = true;
table->drop(); // The table might be not loaded for Lazy database engine.
table->is_dropped = true; if (table)
{
fs::path table_data_dir(local_context->getPath() + table_data_path_relative); table->drop();
if (fs::exists(table_data_dir)) table->is_dropped = true;
(void)fs::remove_all(table_data_dir); }
} }
catch (...) catch (...)
{ {
LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true)); LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true));
attachTable(local_context, table_name, table, table_data_path_relative); if (table)
attachTable(local_context, table_name, table, table_data_path_relative);
if (renamed) if (renamed)
fs::rename(table_metadata_path_drop, table_metadata_path); fs::rename(table_metadata_path_drop, table_metadata_path);
throw; throw;
} }
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
if (disk->isReadOnly() || !disk->exists(table_data_path_relative))
continue;
LOG_INFO(log, "Removing data directory from disk {} with path {} for dropped table {} ", disk_name, table_data_path_relative, table_name);
disk->removeRecursive(table_data_path_relative);
}
(void)fs::remove(table_metadata_path_drop); (void)fs::remove(table_metadata_path_drop);
} }

View File

@ -76,20 +76,6 @@ static void setReplicatedEngine(ASTCreateQuery * create_query, ContextPtr contex
String replica_path = server_settings.default_replica_path; String replica_path = server_settings.default_replica_path;
String replica_name = server_settings.default_replica_name; String replica_name = server_settings.default_replica_name;
/// Check that replica path doesn't exist
Macros::MacroExpansionInfo info;
StorageID table_id = StorageID(create_query->getDatabase(), create_query->getTable(), create_query->uuid);
info.table_id = table_id;
info.expand_special_macros_only = false;
String zookeeper_path = context->getMacros()->expand(replica_path, info);
if (context->getZooKeeper()->exists(zookeeper_path))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Found existing ZooKeeper path {} while trying to convert table {} to replicated. Table will not be converted.",
zookeeper_path, backQuote(table_id.getFullTableName())
);
auto args = std::make_shared<ASTExpressionList>(); auto args = std::make_shared<ASTExpressionList>();
args->children.push_back(std::make_shared<ASTLiteral>(replica_path)); args->children.push_back(std::make_shared<ASTLiteral>(replica_path));
args->children.push_back(std::make_shared<ASTLiteral>(replica_name)); args->children.push_back(std::make_shared<ASTLiteral>(replica_name));

View File

@ -1,4 +1,10 @@
#include <Databases/DatabasesCommon.h> #include <Databases/DatabasesCommon.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/RestorerFromBackup.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentMetrics.h>
#include <Common/escapeForFileName.h>
#include <Interpreters/InterpreterCreateQuery.h> #include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h> #include <Interpreters/DatabaseCatalog.h>
@ -8,17 +14,8 @@
#include <Parsers/formatAST.h> #include <Parsers/formatAST.h>
#include <Storages/StorageDictionary.h> #include <Storages/StorageDictionary.h>
#include <Storages/StorageFactory.h> #include <Storages/StorageFactory.h>
#include <Common/typeid_cast.h> #include <Storages/Utils.h>
#include <Common/CurrentMetrics.h>
#include <Common/escapeForFileName.h>
#include <TableFunctions/TableFunctionFactory.h> #include <TableFunctions/TableFunctionFactory.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/RestorerFromBackup.h>
namespace CurrentMetrics
{
extern const Metric AttachedTable;
}
namespace DB namespace DB
@ -263,7 +260,7 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
res = it->second; res = it->second;
tables.erase(it); tables.erase(it);
res->is_detached = true; res->is_detached = true;
CurrentMetrics::sub(CurrentMetrics::AttachedTable, 1); CurrentMetrics::sub(getAttachedCounterForStorage(res), 1);
auto table_id = res->getStorageID(); auto table_id = res->getStorageID();
if (table_id.hasUUID()) if (table_id.hasUUID())
@ -304,7 +301,7 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c
/// It is important to reset is_detached here since in case of RENAME in /// It is important to reset is_detached here since in case of RENAME in
/// non-Atomic database the is_detached is set to true before RENAME. /// non-Atomic database the is_detached is set to true before RENAME.
table->is_detached = false; table->is_detached = false;
CurrentMetrics::add(CurrentMetrics::AttachedTable, 1); CurrentMetrics::add(getAttachedCounterForStorage(table), 1);
} }
void DatabaseWithOwnTablesBase::shutdown() void DatabaseWithOwnTablesBase::shutdown()

View File

@ -244,6 +244,13 @@ public:
return delegate_transaction->writeFile(wrapped_path, buf_size, mode, settings); return delegate_transaction->writeFile(wrapped_path, buf_size, mode, settings);
} }
/// Truncate file to the target size.
void truncateFile(const std::string & src_path, size_t target_size) override
{
auto wrapped_path = wrappedPath(src_path);
delegate_transaction->truncateFile(wrapped_path, target_size);
}
private: private:

View File

@ -2,10 +2,16 @@
#include <Disks/IDiskTransaction.h> #include <Disks/IDiskTransaction.h>
#include <IO/WriteBufferFromFileBase.h> #include <IO/WriteBufferFromFileBase.h>
#include <Common/Exception.h>
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/// Fake disk transaction implementation. /// Fake disk transaction implementation.
/// Just execute all operations immediately, commit is noop operation. /// Just execute all operations immediately, commit is noop operation.
/// No support for atomicity and rollback. /// No support for atomicity and rollback.
@ -134,6 +140,11 @@ public:
disk.createHardLink(src_path, dst_path); disk.createHardLink(src_path, dst_path);
} }
void truncateFile(const std::string & /* src_path */, size_t /* target_size */) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Operation `truncateFile` is not implemented");
}
private: private:
IDisk & disk; IDisk & disk;
}; };

View File

@ -128,6 +128,9 @@ public:
/// Create hardlink from `src_path` to `dst_path`. /// Create hardlink from `src_path` to `dst_path`.
virtual void createHardLink(const std::string & src_path, const std::string & dst_path) = 0; virtual void createHardLink(const std::string & src_path, const std::string & dst_path) = 0;
/// Truncate file to the target size.
virtual void truncateFile(const std::string & src_path, size_t target_size) = 0;
}; };
using DiskTransactionPtr = std::shared_ptr<IDiskTransaction>; using DiskTransactionPtr = std::shared_ptr<IDiskTransaction>;

View File

@ -133,6 +133,14 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
transaction->commit(); transaction->commit();
} }
void DiskObjectStorage::truncateFile(const String & path, size_t size)
{
LOG_TEST(log, "Truncate file operation {} to size : {}", path, size);
auto transaction = createObjectStorageTransaction();
transaction->truncateFile(path, size);
transaction->commit();
}
void DiskObjectStorage::copyFile( /// NOLINT void DiskObjectStorage::copyFile( /// NOLINT
const String & from_file_path, const String & from_file_path,
IDisk & to_disk, IDisk & to_disk,

View File

@ -84,6 +84,8 @@ public:
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override; void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void truncateFile(const String & path, size_t size) override;
MetadataStoragePtr getMetadataStorage() override { return metadata_storage; } MetadataStoragePtr getMetadataStorage() override { return metadata_storage; }
UInt32 getRefCount(const String & path) const override; UInt32 getRefCount(const String & path) const override;

View File

@ -15,6 +15,7 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int UNKNOWN_FORMAT; extern const int UNKNOWN_FORMAT;
extern const int LOGICAL_ERROR;
} }
void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf) void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
@ -207,6 +208,18 @@ void DiskObjectStorageMetadata::addObject(ObjectStorageKey key, size_t size)
keys_with_meta.emplace_back(std::move(key), ObjectMetadata{size, {}, {}}); keys_with_meta.emplace_back(std::move(key), ObjectMetadata{size, {}, {}});
} }
ObjectKeyWithMetadata DiskObjectStorageMetadata::popLastObject()
{
if (keys_with_meta.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't pop last object from metadata {}. Metadata already empty", metadata_file_path);
ObjectKeyWithMetadata object = std::move(keys_with_meta.back());
keys_with_meta.pop_back();
total_size -= object.metadata.size_bytes;
return object;
}
bool DiskObjectStorageMetadata::getWriteFullObjectKeySetting() bool DiskObjectStorageMetadata::getWriteFullObjectKeySetting()
{ {
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD #ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD

View File

@ -52,6 +52,7 @@ public:
void addObject(ObjectStorageKey key, size_t size); void addObject(ObjectStorageKey key, size_t size);
ObjectKeyWithMetadata popLastObject();
void deserialize(ReadBuffer & buf); void deserialize(ReadBuffer & buf);
void deserializeFromString(const std::string & data); void deserializeFromString(const std::string & data);

View File

@ -559,6 +559,54 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
} }
}; };
struct TruncateFileObjectStorageOperation final : public IDiskObjectStorageOperation
{
std::string path;
size_t size;
TruncateFileOperationOutcomePtr truncate_outcome;
TruncateFileObjectStorageOperation(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
const std::string & path_,
size_t size_)
: IDiskObjectStorageOperation(object_storage_, metadata_storage_)
, path(path_)
, size(size_)
{}
std::string getInfoForLog() const override
{
return fmt::format("TruncateFileObjectStorageOperation (path: {}, size: {})", path, size);
}
void execute(MetadataTransactionPtr tx) override
{
if (metadata_storage.exists(path))
{
if (!metadata_storage.isFile(path))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is not a file", path);
truncate_outcome = tx->truncateFile(path, size);
}
}
void undo() override
{
}
void finalize() override
{
if (!truncate_outcome)
return;
if (!truncate_outcome->objects_to_remove.empty())
object_storage.removeObjectsIfExist(truncate_outcome->objects_to_remove);
}
};
} }
void DiskObjectStorageTransaction::createDirectory(const std::string & path) void DiskObjectStorageTransaction::createDirectory(const std::string & path)
@ -598,6 +646,13 @@ void DiskObjectStorageTransaction::moveFile(const String & from_path, const Stri
})); }));
} }
void DiskObjectStorageTransaction::truncateFile(const String & path, size_t size)
{
operations_to_execute.emplace_back(
std::make_unique<TruncateFileObjectStorageOperation>(object_storage, metadata_storage, path, size)
);
}
void DiskObjectStorageTransaction::replaceFile(const std::string & from_path, const std::string & to_path) void DiskObjectStorageTransaction::replaceFile(const std::string & from_path, const std::string & to_path)
{ {
auto operation = std::make_unique<ReplaceFileObjectStorageOperation>(object_storage, metadata_storage, from_path, to_path); auto operation = std::make_unique<ReplaceFileObjectStorageOperation>(object_storage, metadata_storage, from_path, to_path);

View File

@ -92,6 +92,8 @@ public:
void createFile(const String & path) override; void createFile(const String & path) override;
void truncateFile(const String & path, size_t size) override;
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override; void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override;
/// writeFile is a difficult function for transactions. /// writeFile is a difficult function for transactions.

View File

@ -31,7 +31,15 @@ struct UnlinkMetadataFileOperationOutcome
UInt32 num_hardlinks = std::numeric_limits<UInt32>::max(); UInt32 num_hardlinks = std::numeric_limits<UInt32>::max();
}; };
struct TruncateFileOperationOutcome
{
StoredObjects objects_to_remove;
};
using UnlinkMetadataFileOperationOutcomePtr = std::shared_ptr<UnlinkMetadataFileOperationOutcome>; using UnlinkMetadataFileOperationOutcomePtr = std::shared_ptr<UnlinkMetadataFileOperationOutcome>;
using TruncateFileOperationOutcomePtr = std::shared_ptr<TruncateFileOperationOutcome>;
/// Tries to provide some "transactions" interface, which allow /// Tries to provide some "transactions" interface, which allow
/// to execute (commit) operations simultaneously. We don't provide /// to execute (commit) operations simultaneously. We don't provide
@ -143,6 +151,11 @@ public:
return nullptr; return nullptr;
} }
virtual TruncateFileOperationOutcomePtr truncateFile(const std::string & /* path */, size_t /* size */)
{
throwNotImplemented();
}
virtual ~IMetadataTransaction() = default; virtual ~IMetadataTransaction() = default;
protected: protected:

View File

@ -259,4 +259,12 @@ UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromDiskTransaction::unlink
return result; return result;
} }
TruncateFileOperationOutcomePtr MetadataStorageFromDiskTransaction::truncateFile(const std::string & path, size_t target_size)
{
auto operation = std::make_unique<TruncateMetadataFileOperation>(path, target_size, metadata_storage, *metadata_storage.getDisk());
auto result = operation->outcome;
addOperation(std::move(operation));
return result;
}
} }

View File

@ -129,6 +129,8 @@ public:
UnlinkMetadataFileOperationOutcomePtr unlinkMetadata(const std::string & path) override; UnlinkMetadataFileOperationOutcomePtr unlinkMetadata(const std::string & path) override;
TruncateFileOperationOutcomePtr truncateFile(const std::string & src_path, size_t target_size) override;
}; };

View File

@ -4,9 +4,12 @@
#include <Common/getRandomASCIIString.h> #include <Common/getRandomASCIIString.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <optional> #include <optional>
#include <ranges> #include <ranges>
#include <filesystem> #include <filesystem>
#include <utility>
namespace fs = std::filesystem; namespace fs = std::filesystem;
@ -14,6 +17,11 @@ namespace fs = std::filesystem;
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static std::string getTempFileName(const std::string & dir) static std::string getTempFileName(const std::string & dir)
{ {
return fs::path(dir) / getRandomASCIIString(32); return fs::path(dir) / getRandomASCIIString(32);
@ -341,6 +349,35 @@ void UnlinkMetadataFileOperation::undo(std::unique_lock<SharedMutex> & lock)
outcome->num_hardlinks++; outcome->num_hardlinks++;
} }
void TruncateMetadataFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
if (metadata_storage.exists(path))
{
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
while (metadata->getTotalSizeBytes() > target_size)
{
auto object_key_with_metadata = metadata->popLastObject();
outcome->objects_to_remove.emplace_back(object_key_with_metadata.key.serialize(), path, object_key_with_metadata.metadata.size_bytes);
}
if (metadata->getTotalSizeBytes() != target_size)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} can't be truncated to size {}", path, target_size);
}
LOG_TEST(getLogger("TruncateMetadataFileOperation"), "Going to remove {} blobs.", outcome->objects_to_remove.size());
write_operation = std::make_unique<WriteFileOperation>(path, disk, metadata->serializeToString());
write_operation->execute(metadata_lock);
}
}
void TruncateMetadataFileOperation::undo(std::unique_lock<SharedMutex> & lock)
{
if (write_operation)
write_operation->undo(lock);
}
void SetReadonlyFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock) void SetReadonlyFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{ {
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock); auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);

View File

@ -282,4 +282,34 @@ private:
std::unique_ptr<WriteFileOperation> write_operation; std::unique_ptr<WriteFileOperation> write_operation;
}; };
struct TruncateMetadataFileOperation final : public IMetadataOperation
{
const TruncateFileOperationOutcomePtr outcome = std::make_shared<TruncateFileOperationOutcome>();
TruncateMetadataFileOperation(
const std::string & path_,
size_t target_size_,
const MetadataStorageFromDisk & metadata_storage_,
IDisk & disk_)
: path(path_)
, target_size(target_size_)
, metadata_storage(metadata_storage_)
, disk(disk_)
{
}
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo(std::unique_lock<SharedMutex> & lock) override;
private:
std::string path;
size_t target_size;
const MetadataStorageFromDisk & metadata_storage;
IDisk & disk;
std::unique_ptr<WriteFileOperation> write_operation;
};
} }

View File

@ -76,7 +76,7 @@ void skipFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule esca
/// Empty field, just skip spaces /// Empty field, just skip spaces
break; break;
case FormatSettings::EscapingRule::Escaped: case FormatSettings::EscapingRule::Escaped:
readEscapedStringInto(out, buf); readEscapedStringInto<NullOutput,false>(out, buf);
break; break;
case FormatSettings::EscapingRule::Quoted: case FormatSettings::EscapingRule::Quoted:
readQuotedFieldInto(out, buf); readQuotedFieldInto(out, buf);

View File

@ -1,6 +1,7 @@
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <algorithm> #include <algorithm>
#include <unistd.h>
#include <Formats/FormatSettings.h> #include <Formats/FormatSettings.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h> #include <Interpreters/ProcessList.h>
@ -15,7 +16,7 @@
#include <Poco/URI.h> #include <Poco/URI.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/KnownObjectNames.h> #include <Common/KnownObjectNames.h>
#include <unistd.h> #include <Common/tryGetFileNameByFileDescriptor.h>
#include <boost/algorithm/string/case_conv.hpp> #include <boost/algorithm/string/case_conv.hpp>
@ -202,6 +203,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.tsv.try_detect_header = settings.input_format_tsv_detect_header; format_settings.tsv.try_detect_header = settings.input_format_tsv_detect_header;
format_settings.tsv.skip_trailing_empty_lines = settings.input_format_tsv_skip_trailing_empty_lines; format_settings.tsv.skip_trailing_empty_lines = settings.input_format_tsv_skip_trailing_empty_lines;
format_settings.tsv.allow_variable_number_of_columns = settings.input_format_tsv_allow_variable_number_of_columns; format_settings.tsv.allow_variable_number_of_columns = settings.input_format_tsv_allow_variable_number_of_columns;
format_settings.tsv.crlf_end_of_line_input = settings.input_format_tsv_crlf_end_of_line;
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals; format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
format_settings.values.allow_data_after_semicolon = settings.input_format_values_allow_data_after_semicolon; format_settings.values.allow_data_after_semicolon = settings.input_format_values_allow_data_after_semicolon;
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions; format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
@ -693,21 +695,12 @@ String FormatFactory::getFormatFromFileName(String file_name)
std::optional<String> FormatFactory::tryGetFormatFromFileDescriptor(int fd) std::optional<String> FormatFactory::tryGetFormatFromFileDescriptor(int fd)
{ {
#ifdef OS_LINUX std::optional<String> file_name = tryGetFileNameFromFileDescriptor(fd);
std::string proc_path = fmt::format("/proc/self/fd/{}", fd);
char file_path[PATH_MAX] = {'\0'}; if (file_name)
if (readlink(proc_path.c_str(), file_path, sizeof(file_path) - 1) != -1) return tryGetFormatFromFileName(*file_name);
return tryGetFormatFromFileName(file_path);
return std::nullopt; return std::nullopt;
#elif defined(OS_DARWIN)
char file_path[PATH_MAX] = {'\0'};
if (fcntl(fd, F_GETPATH, file_path) != -1)
return tryGetFormatFromFileName(file_path);
return std::nullopt;
#else
(void)fd;
return std::nullopt;
#endif
} }
String FormatFactory::getFormatFromFileDescriptor(int fd) String FormatFactory::getFormatFromFileDescriptor(int fd)

View File

@ -361,6 +361,7 @@ struct FormatSettings
bool try_detect_header = true; bool try_detect_header = true;
bool skip_trailing_empty_lines = false; bool skip_trailing_empty_lines = false;
bool allow_variable_number_of_columns = false; bool allow_variable_number_of_columns = false;
bool crlf_end_of_line_input = false;
} tsv{}; } tsv{};
struct struct

View File

@ -49,6 +49,8 @@
#include <base/bit_cast.h> #include <base/bit_cast.h>
#include <base/unaligned.h> #include <base/unaligned.h>
#include <algorithm>
namespace DB namespace DB
{ {
@ -75,17 +77,29 @@ namespace impl
ColumnPtr key0; ColumnPtr key0;
ColumnPtr key1; ColumnPtr key1;
bool is_const; bool is_const;
const ColumnArray::Offsets * offsets{};
size_t size() const size_t size() const
{ {
assert(key0 && key1); assert(key0 && key1);
assert(key0->size() == key1->size()); assert(key0->size() == key1->size());
assert(offsets == nullptr || offsets->size() == key0->size());
if (offsets != nullptr)
return offsets->back();
return key0->size(); return key0->size();
} }
SipHashKey getKey(size_t i) const SipHashKey getKey(size_t i) const
{ {
if (is_const) if (is_const)
i = 0; i = 0;
if (offsets != nullptr)
{
const auto *const begin = offsets->begin();
const auto * upper = std::upper_bound(begin, offsets->end(), i);
if (upper == offsets->end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "offset {} not found in function SipHashKeyColumns::getKey", i);
i = upper - begin;
}
const auto & key0data = assert_cast<const ColumnUInt64 &>(*key0).getData(); const auto & key0data = assert_cast<const ColumnUInt64 &>(*key0).getData();
const auto & key1data = assert_cast<const ColumnUInt64 &>(*key1).getData(); const auto & key1data = assert_cast<const ColumnUInt64 &>(*key1).getData();
return {key0data[i], key1data[i]}; return {key0data[i], key1data[i]};
@ -1112,7 +1126,15 @@ private:
typename ColumnVector<ToType>::Container vec_temp(nested_size); typename ColumnVector<ToType>::Container vec_temp(nested_size);
bool nested_is_first = true; bool nested_is_first = true;
executeForArgument(key_cols, nested_type, nested_column, vec_temp, nested_is_first);
if constexpr (Keyed)
{
KeyColumnsType key_cols_tmp{key_cols};
key_cols_tmp.offsets = &offsets;
executeForArgument(key_cols_tmp, nested_type, nested_column, vec_temp, nested_is_first);
}
else
executeForArgument(key_cols, nested_type, nested_column, vec_temp, nested_is_first);
const size_t size = offsets.size(); const size_t size = offsets.size();

View File

@ -352,7 +352,6 @@ static ReturnType parseComplexEscapeSequence(Vector & s, ReadBuffer & buf)
{ {
return error("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE); return error("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
} }
s.push_back(unhex2(hex_code)); s.push_back(unhex2(hex_code));
} }
else if (char_after_backslash == 'N') else if (char_after_backslash == 'N')
@ -608,13 +607,20 @@ static ReturnType parseJSONEscapeSequence(Vector & s, ReadBuffer & buf, bool kee
} }
template <typename Vector, bool parse_complex_escape_sequence> template <typename Vector, bool parse_complex_escape_sequence, bool support_crlf>
void readEscapedStringIntoImpl(Vector & s, ReadBuffer & buf) void readEscapedStringIntoImpl(Vector & s, ReadBuffer & buf)
{ {
while (!buf.eof()) while (!buf.eof())
{ {
char * next_pos = find_first_symbols<'\t', '\n', '\\'>(buf.position(), buf.buffer().end()); char * next_pos;
if constexpr (support_crlf)
{
next_pos = find_first_symbols<'\t', '\n', '\\','\r'>(buf.position(), buf.buffer().end());
}
else
{
next_pos = find_first_symbols<'\t', '\n', '\\'>(buf.position(), buf.buffer().end());
}
appendToStringOrVector(s, buf, next_pos); appendToStringOrVector(s, buf, next_pos);
buf.position() = next_pos; buf.position() = next_pos;
@ -641,25 +647,46 @@ void readEscapedStringIntoImpl(Vector & s, ReadBuffer & buf)
} }
} }
} }
if constexpr (support_crlf)
{
if (*buf.position() == '\r')
{
++buf.position();
if (!buf.eof() && *buf.position() != '\n')
{
s.push_back('\r');
continue;
}
return;
}
}
} }
} }
template <typename Vector> template <typename Vector, bool support_crlf>
void readEscapedStringInto(Vector & s, ReadBuffer & buf) void readEscapedStringInto(Vector & s, ReadBuffer & buf)
{ {
readEscapedStringIntoImpl<Vector, true>(s, buf); readEscapedStringIntoImpl<Vector, true, support_crlf>(s, buf);
} }
void readEscapedString(String & s, ReadBuffer & buf) void readEscapedString(String & s, ReadBuffer & buf)
{ {
s.clear(); s.clear();
readEscapedStringInto(s, buf); readEscapedStringInto<String,false>(s, buf);
} }
template void readEscapedStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf); void readEscapedStringCRLF(String & s, ReadBuffer & buf)
template void readEscapedStringInto<NullOutput>(NullOutput & s, ReadBuffer & buf); {
s.clear();
readEscapedStringInto<String,true>(s, buf);
}
template void readEscapedStringInto<PaddedPODArray<UInt8>,false>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readEscapedStringInto<NullOutput,false>(NullOutput & s, ReadBuffer & buf);
template void readEscapedStringInto<PaddedPODArray<UInt8>,true>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readEscapedStringInto<NullOutput,true>(NullOutput & s, ReadBuffer & buf);
/** If enable_sql_style_quoting == true, /** If enable_sql_style_quoting == true,
* strings like 'abc''def' will be parsed as abc'def. * strings like 'abc''def' will be parsed as abc'def.
@ -2069,7 +2096,14 @@ bool tryReadJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON &
void readTSVField(String & s, ReadBuffer & buf) void readTSVField(String & s, ReadBuffer & buf)
{ {
s.clear(); s.clear();
readEscapedStringIntoImpl<String, false>(s, buf); readEscapedStringIntoImpl<String, false, false>(s, buf);
} }
void readTSVFieldCRLF(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringIntoImpl<String, false, true>(s, buf);
}
} }

View File

@ -583,6 +583,8 @@ void readString(String & s, ReadBuffer & buf);
void readEscapedString(String & s, ReadBuffer & buf); void readEscapedString(String & s, ReadBuffer & buf);
void readEscapedStringCRLF(String & s, ReadBuffer & buf);
void readQuotedString(String & s, ReadBuffer & buf); void readQuotedString(String & s, ReadBuffer & buf);
void readQuotedStringWithSQLStyle(String & s, ReadBuffer & buf); void readQuotedStringWithSQLStyle(String & s, ReadBuffer & buf);
@ -645,7 +647,7 @@ void readStringInto(Vector & s, ReadBuffer & buf);
template <typename Vector> template <typename Vector>
void readNullTerminated(Vector & s, ReadBuffer & buf); void readNullTerminated(Vector & s, ReadBuffer & buf);
template <typename Vector> template <typename Vector, bool support_crlf>
void readEscapedStringInto(Vector & s, ReadBuffer & buf); void readEscapedStringInto(Vector & s, ReadBuffer & buf);
template <bool enable_sql_style_quoting, typename Vector> template <bool enable_sql_style_quoting, typename Vector>
@ -1901,6 +1903,7 @@ void readJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON & se
bool tryReadJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON & settings); bool tryReadJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON & settings);
void readTSVField(String & s, ReadBuffer & buf); void readTSVField(String & s, ReadBuffer & buf);
void readTSVFieldCRLF(String & s, ReadBuffer & buf);
/** Parse the escape sequence, which can be simple (one character after backslash) or more complex (multiple characters). /** Parse the escape sequence, which can be simple (one character after backslash) or more complex (multiple characters).
* It is assumed that the cursor is located on the `\` symbol * It is assumed that the cursor is located on the `\` symbol

View File

@ -160,6 +160,8 @@ namespace CurrentMetrics
extern const Metric TablesLoaderForegroundThreadsScheduled; extern const Metric TablesLoaderForegroundThreadsScheduled;
extern const Metric IOWriterThreadsScheduled; extern const Metric IOWriterThreadsScheduled;
extern const Metric AttachedTable; extern const Metric AttachedTable;
extern const Metric AttachedView;
extern const Metric AttachedDictionary;
extern const Metric AttachedDatabase; extern const Metric AttachedDatabase;
extern const Metric PartsActive; extern const Metric PartsActive;
} }
@ -359,6 +361,8 @@ struct ContextSharedPart : boost::noncopyable
/// No lock required for format_schema_path modified only during initialization /// No lock required for format_schema_path modified only during initialization
std::atomic_size_t max_database_num_to_warn = 1000lu; std::atomic_size_t max_database_num_to_warn = 1000lu;
std::atomic_size_t max_table_num_to_warn = 5000lu; std::atomic_size_t max_table_num_to_warn = 5000lu;
std::atomic_size_t max_view_num_to_warn = 10000lu;
std::atomic_size_t max_dictionary_num_to_warn = 1000lu;
std::atomic_size_t max_part_num_to_warn = 100000lu; std::atomic_size_t max_part_num_to_warn = 100000lu;
String format_schema_path; /// Path to a directory that contains schema files used by input formats. String format_schema_path; /// Path to a directory that contains schema files used by input formats.
String google_protos_path; /// Path to a directory that contains the proto files for the well-known Protobuf types. String google_protos_path; /// Path to a directory that contains the proto files for the well-known Protobuf types.
@ -935,6 +939,10 @@ Strings Context::getWarnings() const
common_warnings = shared->warnings; common_warnings = shared->warnings;
if (CurrentMetrics::get(CurrentMetrics::AttachedTable) > static_cast<Int64>(shared->max_table_num_to_warn)) if (CurrentMetrics::get(CurrentMetrics::AttachedTable) > static_cast<Int64>(shared->max_table_num_to_warn))
common_warnings.emplace_back(fmt::format("The number of attached tables is more than {}", shared->max_table_num_to_warn)); common_warnings.emplace_back(fmt::format("The number of attached tables is more than {}", shared->max_table_num_to_warn));
if (CurrentMetrics::get(CurrentMetrics::AttachedView) > static_cast<Int64>(shared->max_view_num_to_warn))
common_warnings.emplace_back(fmt::format("The number of attached views is more than {}", shared->max_view_num_to_warn));
if (CurrentMetrics::get(CurrentMetrics::AttachedDictionary) > static_cast<Int64>(shared->max_dictionary_num_to_warn))
common_warnings.emplace_back(fmt::format("The number of attached dictionaries is more than {}", shared->max_dictionary_num_to_warn));
if (CurrentMetrics::get(CurrentMetrics::AttachedDatabase) > static_cast<Int64>(shared->max_database_num_to_warn)) if (CurrentMetrics::get(CurrentMetrics::AttachedDatabase) > static_cast<Int64>(shared->max_database_num_to_warn))
common_warnings.emplace_back(fmt::format("The number of attached databases is more than {}", shared->max_database_num_to_warn)); common_warnings.emplace_back(fmt::format("The number of attached databases is more than {}", shared->max_database_num_to_warn));
if (CurrentMetrics::get(CurrentMetrics::PartsActive) > static_cast<Int64>(shared->max_part_num_to_warn)) if (CurrentMetrics::get(CurrentMetrics::PartsActive) > static_cast<Int64>(shared->max_part_num_to_warn))
@ -3711,6 +3719,18 @@ void Context::setMaxTableNumToWarn(size_t max_table_to_warn)
shared->max_table_num_to_warn= max_table_to_warn; shared->max_table_num_to_warn= max_table_to_warn;
} }
void Context::setMaxViewNumToWarn(size_t max_view_to_warn)
{
SharedLockGuard lock(shared->mutex);
shared->max_view_num_to_warn= max_view_to_warn;
}
void Context::setMaxDictionaryNumToWarn(size_t max_dictionary_to_warn)
{
SharedLockGuard lock(shared->mutex);
shared->max_dictionary_num_to_warn= max_dictionary_to_warn;
}
void Context::setMaxDatabaseNumToWarn(size_t max_database_to_warn) void Context::setMaxDatabaseNumToWarn(size_t max_database_to_warn)
{ {
SharedLockGuard lock(shared->mutex); SharedLockGuard lock(shared->mutex);

View File

@ -861,6 +861,8 @@ public:
const HTTPHeaderFilter & getHTTPHeaderFilter() const; const HTTPHeaderFilter & getHTTPHeaderFilter() const;
void setMaxTableNumToWarn(size_t max_table_to_warn); void setMaxTableNumToWarn(size_t max_table_to_warn);
void setMaxViewNumToWarn(size_t max_view_to_warn);
void setMaxDictionaryNumToWarn(size_t max_dictionary_to_warn);
void setMaxDatabaseNumToWarn(size_t max_database_to_warn); void setMaxDatabaseNumToWarn(size_t max_database_to_warn);
void setMaxPartNumToWarn(size_t max_part_to_warn); void setMaxPartNumToWarn(size_t max_part_to_warn);
/// The port that the server listens for executing SQL queries. /// The port that the server listens for executing SQL queries.

View File

@ -135,7 +135,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
/// If the key is not found, skip the value. /// If the key is not found, skip the value.
NullOutput sink; NullOutput sink;
readEscapedStringInto(sink, *in); readEscapedStringInto<NullOutput,false>(sink, *in);
} }
else else
{ {

View File

@ -10,6 +10,8 @@
#include <Formats/verbosePrintString.h> #include <Formats/verbosePrintString.h>
#include <Formats/EscapingRuleUtils.h> #include <Formats/EscapingRuleUtils.h>
#include <Processors/Formats/Impl/TabSeparatedRowInputFormat.h> #include <Processors/Formats/Impl/TabSeparatedRowInputFormat.h>
#include <boost/range/adaptor/map.hpp>
#include "Formats/FormatSettings.h"
namespace DB namespace DB
{ {
@ -28,7 +30,8 @@ static void checkForCarriageReturn(ReadBuffer & in)
throw Exception(ErrorCodes::INCORRECT_DATA, "\nYou have carriage return (\\r, 0x0D, ASCII 13) at end of first row." throw Exception(ErrorCodes::INCORRECT_DATA, "\nYou have carriage return (\\r, 0x0D, ASCII 13) at end of first row."
"\nIt's like your input data has DOS/Windows style line separators, that are illegal in TabSeparated format." "\nIt's like your input data has DOS/Windows style line separators, that are illegal in TabSeparated format."
" You must transform your file to Unix format." " You must transform your file to Unix format."
"\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r."); "\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r"
"\nor else enable setting 'input_format_tsv_crlf_end_of_line'");
} }
TabSeparatedRowInputFormat::TabSeparatedRowInputFormat( TabSeparatedRowInputFormat::TabSeparatedRowInputFormat(
@ -92,7 +95,12 @@ void TabSeparatedFormatReader::skipRowEndDelimiter()
if (buf->eof()) if (buf->eof())
return; return;
if (unlikely(first_row)) if (format_settings.tsv.crlf_end_of_line_input)
{
if (*buf->position() == '\r')
++buf->position();
}
else if (unlikely(first_row))
{ {
checkForCarriageReturn(*buf); checkForCarriageReturn(*buf);
first_row = false; first_row = false;
@ -105,14 +113,15 @@ template <bool read_string>
String TabSeparatedFormatReader::readFieldIntoString() String TabSeparatedFormatReader::readFieldIntoString()
{ {
String field; String field;
bool support_crlf = format_settings.tsv.crlf_end_of_line_input;
if (is_raw) if (is_raw)
readString(field, *buf); readString(field, *buf);
else else
{ {
if constexpr (read_string) if constexpr (read_string)
readEscapedString(field, *buf); support_crlf ? readEscapedStringCRLF(field, *buf) : readEscapedString(field, *buf);
else else
readTSVField(field, *buf); support_crlf ? readTSVFieldCRLF(field, *buf) : readTSVField(field, *buf);
} }
return field; return field;
} }
@ -123,7 +132,7 @@ void TabSeparatedFormatReader::skipField()
if (is_raw) if (is_raw)
readStringInto(out, *buf); readStringInto(out, *buf);
else else
readEscapedStringInto(out, *buf); format_settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<NullOutput,true>(out, *buf) : readEscapedStringInto<NullOutput,false>(out, *buf);
} }
void TabSeparatedFormatReader::skipHeaderRow() void TabSeparatedFormatReader::skipHeaderRow()
@ -155,7 +164,7 @@ bool TabSeparatedFormatReader::readField(IColumn & column, const DataTypePtr & t
const SerializationPtr & serialization, bool is_last_file_column, const String & /*column_name*/) const SerializationPtr & serialization, bool is_last_file_column, const String & /*column_name*/)
{ {
const bool at_delimiter = !is_last_file_column && !buf->eof() && *buf->position() == '\t'; const bool at_delimiter = !is_last_file_column && !buf->eof() && *buf->position() == '\t';
const bool at_last_column_line_end = is_last_file_column && (buf->eof() || *buf->position() == '\n'); const bool at_last_column_line_end = is_last_file_column && (buf->eof() || *buf->position() == '\n' || (format_settings.tsv.crlf_end_of_line_input && *buf->position() == '\r'));
if (format_settings.tsv.empty_as_default && (at_delimiter || at_last_column_line_end)) if (format_settings.tsv.empty_as_default && (at_delimiter || at_last_column_line_end))
{ {
@ -220,7 +229,10 @@ bool TabSeparatedFormatReader::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
try try
{ {
assertChar('\n', *buf); if (!format_settings.tsv.crlf_end_of_line_input)
assertChar('\n', *buf);
else
assertChar('\r', *buf);
} }
catch (const DB::Exception &) catch (const DB::Exception &)
{ {
@ -233,7 +245,10 @@ bool TabSeparatedFormatReader::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
else if (*buf->position() == '\r') else if (*buf->position() == '\r')
{ {
out << "ERROR: Carriage return found where line feed is expected." out << "ERROR: Carriage return found where line feed is expected."
" It's like your file has DOS/Windows style line separators, that is illegal in TabSeparated format.\n"; " It's like your file has DOS/Windows style line separators. \n"
"You must transform your file to Unix format. \n"
"But if you really need carriage return at end of string value of last column, you need to escape it as \\r \n"
"or else enable setting 'input_format_tsv_crlf_end_of_line'";
} }
else else
{ {
@ -348,7 +363,7 @@ void TabSeparatedFormatReader::skipRow()
bool TabSeparatedFormatReader::checkForEndOfRow() bool TabSeparatedFormatReader::checkForEndOfRow()
{ {
return buf->eof() || *buf->position() == '\n'; return buf->eof() || *buf->position() == '\n' || (format_settings.tsv.crlf_end_of_line_input && *buf->position() == '\r');
} }
TabSeparatedSchemaReader::TabSeparatedSchemaReader( TabSeparatedSchemaReader::TabSeparatedSchemaReader(

View File

@ -590,6 +590,9 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
LOG_DEBUG(log, "Waiting for {} to apply mutation {}", replica, mutation_id); LOG_DEBUG(log, "Waiting for {} to apply mutation {}", replica, mutation_id);
zkutil::EventPtr wait_event = std::make_shared<Poco::Event>(); zkutil::EventPtr wait_event = std::make_shared<Poco::Event>();
constexpr size_t MAX_RETRIES_ON_FAILED_MUTATION = 30;
size_t retries_on_failed_mutation = 0;
while (!partial_shutdown_called) while (!partial_shutdown_called)
{ {
/// Mutation maybe killed or whole replica was deleted. /// Mutation maybe killed or whole replica was deleted.
@ -637,18 +640,32 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
} }
} }
/// If mutation status is empty, than local replica may just not loaded it into memory.
if (mutation_status && !mutation_status->latest_fail_reason.empty())
{
LOG_DEBUG(log, "Mutation {} is done {} or failed {} (status: '{}')", mutation_id, mutation_status->is_done, !mutation_status->latest_fail_reason.empty(), mutation_status->latest_fail_reason);
break;
}
/// Replica can become inactive, so wait with timeout, if nothing happened -> recheck it /// Replica can become inactive, so wait with timeout, if nothing happened -> recheck it
if (!wait_event->tryWait(1000)) if (!wait_event->tryWait(1000))
{ {
LOG_TRACE(log, "Failed to wait for mutation '{}', will recheck", mutation_id); LOG_TRACE(log, "Failed to wait for mutation '{}', will recheck", mutation_id);
} }
/// If mutation status is empty, than local replica may just not loaded it into memory.
if (mutation_status && !mutation_status->latest_fail_reason.empty())
{
LOG_DEBUG(log, "Mutation {} is done {} or failed {} (status: '{}')", mutation_id, mutation_status->is_done, !mutation_status->latest_fail_reason.empty(), mutation_status->latest_fail_reason);
/// In some cases latest_fail_reason may be retryable and there's a chance it will be cleared after the next attempt
if (++retries_on_failed_mutation <= MAX_RETRIES_ON_FAILED_MUTATION)
continue;
if (mutation_status->is_done)
{
LOG_DEBUG(log, "Looks like mutation {} is done, rechecking", mutation_id);
continue;
}
/// It's still possible that latest_fail_reason will be cleared just before queue.getIncompleteMutationsStatus(...) below,
/// but it's unlikely. Anyway, rethrow the exception here to avoid exiting with is_done=false
checkMutationStatus(mutation_status, {mutation_id});
throw Exception(ErrorCodes::LOGICAL_ERROR, "checkMutationStatus didn't throw when checking status of {}: {}", mutation_id, mutation_status->latest_fail_reason);
}
} }
/// This replica inactive, don't check anything /// This replica inactive, don't check anything

View File

@ -5,6 +5,7 @@
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <IO/WriteBufferFromFileBase.h> #include <IO/WriteBufferFromFileBase.h>
#include <Compression/CompressedReadBuffer.h> #include <Compression/CompressedReadBuffer.h>
@ -53,8 +54,13 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED; extern const int TIMEOUT_EXCEEDED;
extern const int CANNOT_RESTORE_TABLE; extern const int CANNOT_RESTORE_TABLE;
extern const int NOT_IMPLEMENTED; extern const int NOT_IMPLEMENTED;
extern const int FAULT_INJECTED;
} }
namespace FailPoints
{
extern const char stripe_log_sink_write_fallpoint[];
}
/// NOTE: The lock `StorageStripeLog::rwlock` is NOT kept locked while reading, /// NOTE: The lock `StorageStripeLog::rwlock` is NOT kept locked while reading,
/// because we read ranges of data that do not change. /// because we read ranges of data that do not change.
@ -234,6 +240,11 @@ public:
/// Save the new indices. /// Save the new indices.
storage.saveIndices(lock); storage.saveIndices(lock);
// While executing save file sizes the exception might occurs. S3::TooManyRequests for example.
fiu_do_on(FailPoints::stripe_log_sink_write_fallpoint,
{
throw Exception(ErrorCodes::FAULT_INJECTED, "Injecting fault for inserting into StipeLog table");
});
/// Save the new file sizes. /// Save the new file sizes.
storage.saveFileSizes(lock); storage.saveFileSizes(lock);

30
src/Storages/Utils.cpp Normal file
View File

@ -0,0 +1,30 @@
#include <Storages/Utils.h>
#include <Storages/IStorage.h>
namespace CurrentMetrics
{
extern const Metric AttachedTable;
extern const Metric AttachedView;
extern const Metric AttachedDictionary;
}
namespace DB
{
CurrentMetrics::Metric getAttachedCounterForStorage(const StoragePtr & storage)
{
if (storage->isView())
{
return CurrentMetrics::AttachedView;
}
else if (storage->isDictionary())
{
return CurrentMetrics::AttachedDictionary;
}
else
{
return CurrentMetrics::AttachedTable;
}
}
}

10
src/Storages/Utils.h Normal file
View File

@ -0,0 +1,10 @@
#pragma once
#include <Common/CurrentMetrics.h>
#include <Storages/IStorage_fwd.h>
namespace DB
{
CurrentMetrics::Metric getAttachedCounterForStorage(const StoragePtr & storage);
}

View File

@ -44,6 +44,7 @@ from env_helper import (
REPORT_PATH, REPORT_PATH,
S3_BUILDS_BUCKET, S3_BUILDS_BUCKET,
TEMP_PATH, TEMP_PATH,
GITHUB_RUN_ID,
) )
from get_robot_token import get_best_robot_token from get_robot_token import get_best_robot_token
from git_helper import GIT_PREFIX, Git from git_helper import GIT_PREFIX, Git
@ -52,6 +53,7 @@ from github_helper import GitHub
from pr_info import PRInfo from pr_info import PRInfo
from report import ERROR, SUCCESS, BuildResult, JobReport from report import ERROR, SUCCESS, BuildResult, JobReport
from s3_helper import S3Helper from s3_helper import S3Helper
from ci_metadata import CiMetadata
from version_helper import get_version_from_repo from version_helper import get_version_from_repo
# pylint: disable=too-many-lines # pylint: disable=too-many-lines
@ -66,12 +68,12 @@ class PendingState:
class CiCache: class CiCache:
""" """
CI cache is a bunch of records. Record is a file stored under special location on s3. CI cache is a bunch of records. Record is a file stored under special location on s3.
The file name has following format The file name has a format:
<RECORD_TYPE>_[<ATTRIBUTES>]--<JOB_NAME>_<JOB_DIGEST>_<BATCH>_<NUM_BATCHES>.ci <RECORD_TYPE>_[<ATTRIBUTES>]--<JOB_NAME>_<JOB_DIGEST>_<BATCH>_<NUM_BATCHES>.ci
RECORD_TYPE: RECORD_TYPE:
SUCCESSFUL - for successfuly finished jobs SUCCESSFUL - for successful jobs
PENDING - for pending jobs PENDING - for pending jobs
ATTRIBUTES: ATTRIBUTES:
@ -503,7 +505,7 @@ class CiCache:
self, job: str, batch: int, num_batches: int, release_branch: bool self, job: str, batch: int, num_batches: int, release_branch: bool
) -> bool: ) -> bool:
""" """
checks if a given job have already been done successfuly checks if a given job have already been done successfully
""" """
return self.exist( return self.exist(
self.RecordType.SUCCESSFUL, job, batch, num_batches, release_branch self.RecordType.SUCCESSFUL, job, batch, num_batches, release_branch
@ -744,7 +746,7 @@ class CiOptions:
# list of specified jobs to run # list of specified jobs to run
ci_jobs: Optional[List[str]] = None ci_jobs: Optional[List[str]] = None
# btaches to run for all multi-batch jobs # batches to run for all multi-batch jobs
job_batches: Optional[List[int]] = None job_batches: Optional[List[int]] = None
do_not_test: bool = False do_not_test: bool = False
@ -948,7 +950,7 @@ class CiOptions:
jobs_params[job] = { jobs_params[job] = {
"batches": list(range(num_batches)), "batches": list(range(num_batches)),
"num_batches": num_batches, "num_batches": num_batches,
"run_if_ci_option_include_set": job_config.run_by_ci_option "run_by_ci_option": job_config.run_by_ci_option
and pr_info.is_pr, and pr_info.is_pr,
} }
@ -963,10 +965,7 @@ class CiOptions:
for job in jobs_to_do[:]: for job in jobs_to_do[:]:
job_param = jobs_params[job] job_param = jobs_params[job]
if ( if job_param["run_by_ci_option"] and job not in jobs_to_do_requested:
job_param["run_if_ci_option_include_set"]
and job not in jobs_to_do_requested
):
print( print(
f"Erasing job '{job}' from list because it's not in included set, but will run only by include" f"Erasing job '{job}' from list because it's not in included set, but will run only by include"
) )
@ -991,7 +990,11 @@ def normalize_check_name(check_name: str) -> str:
def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace: def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
# FIXME: consider switching to sub_parser for configure, pre, run, post actions parser.add_argument(
"--cancel-previous-run",
action="store_true",
help="Action that cancels previous running PR workflow if PR added into the Merge Queue",
)
parser.add_argument( parser.add_argument(
"--configure", "--configure",
action="store_true", action="store_true",
@ -1000,17 +1003,19 @@ def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
parser.add_argument( parser.add_argument(
"--update-gh-statuses", "--update-gh-statuses",
action="store_true", action="store_true",
help="Action that recreate success GH statuses for jobs that finished successfully in past and will be skipped this time", help="Action that recreate success GH statuses for jobs that finished successfully in past and will be "
"skipped this time",
) )
parser.add_argument( parser.add_argument(
"--pre", "--pre",
action="store_true", action="store_true",
help="Action that executes prerequesetes for the job provided in --job-name", help="Action that executes prerequisites for the job provided in --job-name",
) )
parser.add_argument( parser.add_argument(
"--run", "--run",
action="store_true", action="store_true",
help="Action that executes run action for specified --job-name. run_command must be configured for a given job name.", help="Action that executes run action for specified --job-name. run_command must be configured for a given "
"job name.",
) )
parser.add_argument( parser.add_argument(
"--post", "--post",
@ -1075,7 +1080,7 @@ def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
"--skip-jobs", "--skip-jobs",
action="store_true", action="store_true",
default=False, default=False,
help="skip fetching data about job runs, used in --configure action (for debugging and nigthly ci)", help="skip fetching data about job runs, used in --configure action (for debugging and nightly ci)",
) )
parser.add_argument( parser.add_argument(
"--force", "--force",
@ -1088,7 +1093,8 @@ def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
"--rebuild-all-binaries", "--rebuild-all-binaries",
action="store_true", action="store_true",
default=False, default=False,
help="[DEPRECATED. to be removed, once no wf use it] will create run config without skipping build jobs in any case, used in --configure action (for release branches)", help="[DEPRECATED. to be removed, once no wf use it] will create run config without skipping build jobs in "
"any case, used in --configure action (for release branches)",
) )
parser.add_argument( parser.add_argument(
"--commit-message", "--commit-message",
@ -1293,7 +1299,7 @@ def _configure_docker_jobs(docker_digest_or_latest: bool) -> Dict:
missing_amd64 = [] missing_amd64 = []
missing_aarch64 = [] missing_aarch64 = []
if not docker_digest_or_latest: if not docker_digest_or_latest:
# look for missing arm and amd images only among missing multiarch manifests @missing_multi_dict # look for missing arm and amd images only among missing multi-arch manifests @missing_multi_dict
# to avoid extra dockerhub api calls # to avoid extra dockerhub api calls
missing_amd64 = list( missing_amd64 = list(
check_missing_images_on_dockerhub(missing_multi_dict, "amd64") check_missing_images_on_dockerhub(missing_multi_dict, "amd64")
@ -1391,7 +1397,7 @@ def _configure_jobs(
): ):
continue continue
# fill job randomization buckets (for jobs with configured @random_bucket property)) # fill job randomization buckets (for jobs with configured @random_bucket property)
if job_config.random_bucket: if job_config.random_bucket:
if not job_config.random_bucket in randomization_buckets: if not job_config.random_bucket in randomization_buckets:
randomization_buckets[job_config.random_bucket] = set() randomization_buckets[job_config.random_bucket] = set()
@ -1440,8 +1446,7 @@ def _configure_jobs(
jobs_params[job] = { jobs_params[job] = {
"batches": batches_to_do, "batches": batches_to_do,
"num_batches": num_batches, "num_batches": num_batches,
"run_if_ci_option_include_set": job_config.run_by_ci_option "run_by_ci_option": job_config.run_by_ci_option and pr_info.is_pr,
and pr_info.is_pr,
} }
elif add_to_skip: elif add_to_skip:
# treat job as being skipped only if it's controlled by digest # treat job as being skipped only if it's controlled by digest
@ -1485,8 +1490,8 @@ def _configure_jobs(
def _generate_ci_stage_config(jobs_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: def _generate_ci_stage_config(jobs_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
""" """
populates GH Actions' workflow with real jobs populates GH Actions' workflow with real jobs
"Builds_1": [{"job_name": NAME, "runner_type": RUNER_TYPE}] "Builds_1": [{"job_name": NAME, "runner_type": RUNNER_TYPE}]
"Tests_1": [{"job_name": NAME, "runner_type": RUNER_TYPE}] "Tests_1": [{"job_name": NAME, "runner_type": RUNNER_TYPE}]
... ...
""" """
result = {} # type: Dict[str, Any] result = {} # type: Dict[str, Any]
@ -1577,7 +1582,7 @@ def _fetch_commit_tokens(message: str, pr_info: PRInfo) -> List[str]:
for match in matches for match in matches
if match in CILabels or match.startswith("job_") or match.startswith("batch_") if match in CILabels or match.startswith("job_") or match.startswith("batch_")
] ]
print(f"CI modifyers from commit message: [{res}]") print(f"CI modifiers from commit message: [{res}]")
res_2 = [] res_2 = []
if pr_info.is_pr: if pr_info.is_pr:
matches = [match[-1] for match in re.findall(pattern, pr_info.body)] matches = [match[-1] for match in re.findall(pattern, pr_info.body)]
@ -1588,7 +1593,7 @@ def _fetch_commit_tokens(message: str, pr_info: PRInfo) -> List[str]:
or match.startswith("job_") or match.startswith("job_")
or match.startswith("batch_") or match.startswith("batch_")
] ]
print(f"CI modifyers from PR body: [{res_2}]") print(f"CI modifiers from PR body: [{res_2}]")
return list(set(res + res_2)) return list(set(res + res_2))
@ -1654,7 +1659,7 @@ def _upload_build_artifacts(
report_url = ci_cache.upload_build_report(build_result) report_url = ci_cache.upload_build_report(build_result)
print(f"Report file has been uploaded to [{report_url}]") print(f"Report file has been uploaded to [{report_url}]")
# Upload head master binaries # Upload master head's binaries
static_bin_name = CI_CONFIG.build_config[build_name].static_binary_name static_bin_name = CI_CONFIG.build_config[build_name].static_binary_name
if pr_info.is_master and static_bin_name: if pr_info.is_master and static_bin_name:
# Full binary with debug info: # Full binary with debug info:
@ -1902,6 +1907,15 @@ def _get_ext_check_name(check_name: str) -> str:
return check_name_with_group return check_name_with_group
def _cancel_pr_wf(s3: S3Helper, pr_number: int) -> None:
run_id = CiMetadata(s3, pr_number).fetch_meta().run_id
if not run_id:
print(f"ERROR: FIX IT: Run id has not been found PR [{pr_number}]!")
else:
print(f"Canceling PR workflow run_id: [{run_id}], pr: [{pr_number}]")
GitHub.cancel_wf(run_id)
def main() -> int: def main() -> int:
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
exit_code = 0 exit_code = 0
@ -1930,6 +1944,12 @@ def main() -> int:
### CONFIGURE action: start ### CONFIGURE action: start
if args.configure: if args.configure:
if CI and pr_info.is_pr:
# store meta on s3 (now we need it only for PRs)
meta = CiMetadata(s3, pr_info.number)
meta.run_id = int(GITHUB_RUN_ID)
meta.push_meta()
ci_options = CiOptions.create_from_pr_message( ci_options = CiOptions.create_from_pr_message(
args.commit_message or None, update_from_api=True args.commit_message or None, update_from_api=True
) )
@ -2222,6 +2242,13 @@ def main() -> int:
assert indata, "Run config must be provided via --infile" assert indata, "Run config must be provided via --infile"
_update_gh_statuses_action(indata=indata, s3=s3) _update_gh_statuses_action(indata=indata, s3=s3)
### CANCEL PREVIOUS WORKFLOW RUN
elif args.cancel_previous_run:
assert (
pr_info.is_merge_queue
), "Currently it's supposed to be used in MQ wf to cancel running PR wf if any"
_cancel_pr_wf(s3, pr_info.merged_pr)
### print results ### print results
_print_results(result, args.outfile, args.pretty) _print_results(result, args.outfile, args.pretty)

View File

@ -52,9 +52,9 @@ class CILabels(metaclass=WithIter):
CI_SET_ARM = "ci_set_arm" CI_SET_ARM = "ci_set_arm"
CI_SET_INTEGRATION = "ci_set_integration" CI_SET_INTEGRATION = "ci_set_integration"
CI_SET_OLD_ANALYZER = "ci_set_old_analyzer" CI_SET_OLD_ANALYZER = "ci_set_old_analyzer"
CI_SET_STATLESS = "ci_set_stateless" CI_SET_STATELESS = "ci_set_stateless"
CI_SET_STATEFUL = "ci_set_stateful" CI_SET_STATEFUL = "ci_set_stateful"
CI_SET_STATLESS_ASAN = "ci_set_stateless_asan" CI_SET_STATELESS_ASAN = "ci_set_stateless_asan"
CI_SET_STATEFUL_ASAN = "ci_set_stateful_asan" CI_SET_STATEFUL_ASAN = "ci_set_stateful_asan"
libFuzzer = "libFuzzer" libFuzzer = "libFuzzer"
@ -206,7 +206,7 @@ class DigestConfig:
include_paths: List[Union[str, Path]] = field(default_factory=list) include_paths: List[Union[str, Path]] = field(default_factory=list)
# file suffixes to exclude from digest # file suffixes to exclude from digest
exclude_files: List[str] = field(default_factory=list) exclude_files: List[str] = field(default_factory=list)
# directories to exlude from digest # directories to exclude from digest
exclude_dirs: List[Union[str, Path]] = field(default_factory=list) exclude_dirs: List[Union[str, Path]] = field(default_factory=list)
# docker names to include into digest # docker names to include into digest
docker: List[str] = field(default_factory=list) docker: List[str] = field(default_factory=list)
@ -217,7 +217,7 @@ class DigestConfig:
@dataclass @dataclass
class LabelConfig: class LabelConfig:
""" """
configures different CI scenarious per GH label configures different CI scenarios per GH label
""" """
run_jobs: Iterable[str] = frozenset() run_jobs: Iterable[str] = frozenset()
@ -231,7 +231,7 @@ class JobConfig:
# configures digest calculation for the job # configures digest calculation for the job
digest: DigestConfig = field(default_factory=DigestConfig) digest: DigestConfig = field(default_factory=DigestConfig)
# will be triggered for the job if omited in CI workflow yml # will be triggered for the job if omitted in CI workflow yml
run_command: str = "" run_command: str = ""
# job timeout, seconds # job timeout, seconds
timeout: Optional[int] = None timeout: Optional[int] = None
@ -242,7 +242,7 @@ class JobConfig:
# to run always regardless of the job digest or/and label # to run always regardless of the job digest or/and label
run_always: bool = False run_always: bool = False
# if the job needs to be run on the release branch, including master (e.g. building packages, docker server). # if the job needs to be run on the release branch, including master (e.g. building packages, docker server).
# NOTE: Subsequent runs on the same branch with the similar digest are still considered skippable. # NOTE: Subsequent runs on the same branch with the similar digest are still considered skip-able.
required_on_release_branch: bool = False required_on_release_branch: bool = False
# job is for pr workflow only # job is for pr workflow only
pr_only: bool = False pr_only: bool = False
@ -470,7 +470,7 @@ compatibility_test_common_params = {
"digest": compatibility_check_digest, "digest": compatibility_check_digest,
"run_command": "compatibility_check.py", "run_command": "compatibility_check.py",
} }
statless_test_common_params = { stateless_test_common_params = {
"digest": stateless_check_digest, "digest": stateless_check_digest,
"run_command": 'functional_test_check.py "$CHECK_NAME" $KILL_TIMEOUT', "run_command": 'functional_test_check.py "$CHECK_NAME" $KILL_TIMEOUT',
"timeout": 10800, "timeout": 10800,
@ -665,7 +665,7 @@ class CIConfig:
# crosscompile - no arm required # crosscompile - no arm required
pass pass
else: else:
# switch to aarch64 runnner # switch to aarch64 runner
result += "-aarch64" result += "-aarch64"
return result return result
@ -712,7 +712,7 @@ class CIConfig:
break break
assert ( assert (
res res
), f"Error: Experimantal feature... Invlid request or not supported job [{check_name}]" ), f"Error: Experimental feature... Invalid request or not supported job [{check_name}]"
return res return res
def get_digest_config(self, check_name: str) -> DigestConfig: def get_digest_config(self, check_name: str) -> DigestConfig:
@ -815,16 +815,16 @@ class CIConfig:
f"The following names of the build report '{build_report_name}' " f"The following names of the build report '{build_report_name}' "
f"are missed in build_config: {missed_names}", f"are missed in build_config: {missed_names}",
) )
# And finally, all of tests' requirements must be in the builds # And finally, all tests' requirements must be in the builds
for test_name, test_config in self.test_configs.items(): for test_name, test_config in self.test_configs.items():
if test_config.required_build not in self.build_config.keys(): if test_config.required_build not in self.build_config.keys():
logging.error( logging.error(
"The requierment '%s' for '%s' is not found in builds", "The requirement '%s' for '%s' is not found in builds",
test_config, test_config,
test_name, test_name,
) )
errors.append( errors.append(
f"The requierment '{test_config}' for " f"The requirement '{test_config}' for "
f"'{test_name}' is not found in builds" f"'{test_name}' is not found in builds"
) )
@ -865,7 +865,7 @@ CI_CONFIG = CIConfig(
JobNames.INTEGRATION_TEST_ASAN_OLD_ANALYZER, JobNames.INTEGRATION_TEST_ASAN_OLD_ANALYZER,
] ]
), ),
CILabels.CI_SET_STATLESS: LabelConfig( CILabels.CI_SET_STATELESS: LabelConfig(
run_jobs=[ run_jobs=[
JobNames.STYLE_CHECK, JobNames.STYLE_CHECK,
JobNames.FAST_TEST, JobNames.FAST_TEST,
@ -873,7 +873,7 @@ CI_CONFIG = CIConfig(
JobNames.STATELESS_TEST_RELEASE, JobNames.STATELESS_TEST_RELEASE,
] ]
), ),
CILabels.CI_SET_STATLESS_ASAN: LabelConfig( CILabels.CI_SET_STATELESS_ASAN: LabelConfig(
run_jobs=[ run_jobs=[
JobNames.STYLE_CHECK, JobNames.STYLE_CHECK,
JobNames.FAST_TEST, JobNames.FAST_TEST,
@ -1180,49 +1180,49 @@ CI_CONFIG = CIConfig(
# End stateful tests for parallel replicas # End stateful tests for parallel replicas
JobNames.STATELESS_TEST_ASAN: TestConfig( JobNames.STATELESS_TEST_ASAN: TestConfig(
Build.PACKAGE_ASAN, Build.PACKAGE_ASAN,
job_config=JobConfig(num_batches=4, **statless_test_common_params), # type: ignore job_config=JobConfig(num_batches=4, **stateless_test_common_params), # type: ignore
), ),
JobNames.STATELESS_TEST_TSAN: TestConfig( JobNames.STATELESS_TEST_TSAN: TestConfig(
Build.PACKAGE_TSAN, Build.PACKAGE_TSAN,
job_config=JobConfig(num_batches=5, **statless_test_common_params), # type: ignore job_config=JobConfig(num_batches=5, **stateless_test_common_params), # type: ignore
), ),
JobNames.STATELESS_TEST_MSAN: TestConfig( JobNames.STATELESS_TEST_MSAN: TestConfig(
Build.PACKAGE_MSAN, Build.PACKAGE_MSAN,
job_config=JobConfig(num_batches=6, **statless_test_common_params), # type: ignore job_config=JobConfig(num_batches=6, **stateless_test_common_params), # type: ignore
), ),
JobNames.STATELESS_TEST_UBSAN: TestConfig( JobNames.STATELESS_TEST_UBSAN: TestConfig(
Build.PACKAGE_UBSAN, Build.PACKAGE_UBSAN,
job_config=JobConfig(num_batches=2, **statless_test_common_params), # type: ignore job_config=JobConfig(num_batches=2, **stateless_test_common_params), # type: ignore
), ),
JobNames.STATELESS_TEST_DEBUG: TestConfig( JobNames.STATELESS_TEST_DEBUG: TestConfig(
Build.PACKAGE_DEBUG, Build.PACKAGE_DEBUG,
job_config=JobConfig(num_batches=5, **statless_test_common_params), # type: ignore job_config=JobConfig(num_batches=5, **stateless_test_common_params), # type: ignore
), ),
JobNames.STATELESS_TEST_RELEASE: TestConfig( JobNames.STATELESS_TEST_RELEASE: TestConfig(
Build.PACKAGE_RELEASE, job_config=JobConfig(**statless_test_common_params) # type: ignore Build.PACKAGE_RELEASE, job_config=JobConfig(**stateless_test_common_params) # type: ignore
), ),
JobNames.STATELESS_TEST_RELEASE_COVERAGE: TestConfig( JobNames.STATELESS_TEST_RELEASE_COVERAGE: TestConfig(
Build.PACKAGE_RELEASE_COVERAGE, Build.PACKAGE_RELEASE_COVERAGE,
job_config=JobConfig(num_batches=6, **statless_test_common_params), # type: ignore job_config=JobConfig(num_batches=6, **stateless_test_common_params), # type: ignore
), ),
JobNames.STATELESS_TEST_AARCH64: TestConfig( JobNames.STATELESS_TEST_AARCH64: TestConfig(
Build.PACKAGE_AARCH64, job_config=JobConfig(**statless_test_common_params) # type: ignore Build.PACKAGE_AARCH64, job_config=JobConfig(**stateless_test_common_params) # type: ignore
), ),
JobNames.STATELESS_TEST_OLD_ANALYZER_S3_REPLICATED_RELEASE: TestConfig( JobNames.STATELESS_TEST_OLD_ANALYZER_S3_REPLICATED_RELEASE: TestConfig(
Build.PACKAGE_RELEASE, Build.PACKAGE_RELEASE,
job_config=JobConfig(num_batches=4, **statless_test_common_params), # type: ignore job_config=JobConfig(num_batches=4, **stateless_test_common_params), # type: ignore
), ),
JobNames.STATELESS_TEST_S3_DEBUG: TestConfig( JobNames.STATELESS_TEST_S3_DEBUG: TestConfig(
Build.PACKAGE_DEBUG, Build.PACKAGE_DEBUG,
job_config=JobConfig(num_batches=6, **statless_test_common_params), # type: ignore job_config=JobConfig(num_batches=6, **stateless_test_common_params), # type: ignore
), ),
JobNames.STATELESS_TEST_AZURE_ASAN: TestConfig( JobNames.STATELESS_TEST_AZURE_ASAN: TestConfig(
Build.PACKAGE_ASAN, Build.PACKAGE_ASAN,
job_config=JobConfig(num_batches=4, **statless_test_common_params, release_only=True, run_by_ci_option=True), # type: ignore job_config=JobConfig(num_batches=4, **stateless_test_common_params, release_only=True, run_by_ci_option=True), # type: ignore
), ),
JobNames.STATELESS_TEST_S3_TSAN: TestConfig( JobNames.STATELESS_TEST_S3_TSAN: TestConfig(
Build.PACKAGE_TSAN, Build.PACKAGE_TSAN,
job_config=JobConfig(num_batches=5, **statless_test_common_params), # type: ignore job_config=JobConfig(num_batches=5, **stateless_test_common_params), # type: ignore
), ),
JobNames.STRESS_TEST_DEBUG: TestConfig( JobNames.STRESS_TEST_DEBUG: TestConfig(
Build.PACKAGE_DEBUG, job_config=JobConfig(**stress_test_common_params) # type: ignore Build.PACKAGE_DEBUG, job_config=JobConfig(**stress_test_common_params) # type: ignore
@ -1271,8 +1271,7 @@ CI_CONFIG = CIConfig(
), ),
JobNames.INTEGRATION_TEST_ARM: TestConfig( JobNames.INTEGRATION_TEST_ARM: TestConfig(
Build.PACKAGE_AARCH64, Build.PACKAGE_AARCH64,
# add [run_by_label="test arm"] to not run in regular pr workflow by default job_config=JobConfig(num_batches=6, **integration_test_common_params), # type: ignore
job_config=JobConfig(num_batches=6, **integration_test_common_params, run_by_label="test arm"), # type: ignore
), ),
JobNames.INTEGRATION_TEST: TestConfig( JobNames.INTEGRATION_TEST: TestConfig(
Build.PACKAGE_RELEASE, Build.PACKAGE_RELEASE,
@ -1326,7 +1325,7 @@ CI_CONFIG = CIConfig(
JobNames.STATELESS_TEST_FLAKY_ASAN: TestConfig( JobNames.STATELESS_TEST_FLAKY_ASAN: TestConfig(
# replace to non-default # replace to non-default
Build.PACKAGE_ASAN, Build.PACKAGE_ASAN,
job_config=JobConfig(pr_only=True, **{**statless_test_common_params, "timeout": 3600}), # type: ignore job_config=JobConfig(pr_only=True, **{**stateless_test_common_params, "timeout": 3600}), # type: ignore
), ),
JobNames.JEPSEN_KEEPER: TestConfig( JobNames.JEPSEN_KEEPER: TestConfig(
Build.BINARY_RELEASE, Build.BINARY_RELEASE,
@ -1486,7 +1485,7 @@ CHECK_DESCRIPTIONS = [
"Checks if new added or modified tests are flaky by running them repeatedly, " "Checks if new added or modified tests are flaky by running them repeatedly, "
"in parallel, with more randomization. Functional tests are run 100 times " "in parallel, with more randomization. Functional tests are run 100 times "
"with address sanitizer, and additional randomization of thread scheduling. " "with address sanitizer, and additional randomization of thread scheduling. "
"Integrational tests are run up to 10 times. If at least once a new test has " "Integration tests are run up to 10 times. If at least once a new test has "
"failed, or was too long, this check will be red. We don't allow flaky tests, " "failed, or was too long, this check will be red. We don't allow flaky tests, "
'read <a href="https://clickhouse.com/blog/decorating-a-christmas-tree-with-' 'read <a href="https://clickhouse.com/blog/decorating-a-christmas-tree-with-'
'the-help-of-flaky-tests/">the doc</a>', 'the-help-of-flaky-tests/">the doc</a>',
@ -1576,7 +1575,7 @@ CHECK_DESCRIPTIONS = [
lambda x: x.startswith("ClickBench"), lambda x: x.startswith("ClickBench"),
), ),
CheckDescription( CheckDescription(
"Falback for unknown", "Fallback for unknown",
"There's no description for the check yet, please add it to " "There's no description for the check yet, please add it to "
"tests/ci/ci_config.py:CHECK_DESCRIPTIONS", "tests/ci/ci_config.py:CHECK_DESCRIPTIONS",
lambda x: True, lambda x: True,

116
tests/ci/ci_metadata.py Normal file
View File

@ -0,0 +1,116 @@
from pathlib import Path
from typing import Optional
from env_helper import (
S3_BUILDS_BUCKET,
TEMP_PATH,
)
from s3_helper import S3Helper
from ci_utils import GHActions
# pylint: disable=too-many-lines
class CiMetadata:
"""
CI Metadata class owns data like workflow run_id for a given pr, etc.
Goal is to have everything we need to manage workflows on S3 and rely on GH api as little as possible
"""
_S3_PREFIX = "CI_meta_v1"
_LOCAL_PATH = Path(TEMP_PATH) / "ci_meta"
_FILE_SUFFIX = ".cimd"
_FILENAME_RUN_ID = "run_id" + _FILE_SUFFIX
def __init__(
self,
s3: S3Helper,
pr_number: Optional[int] = None,
sha: Optional[str] = None,
git_ref: Optional[str] = None,
):
assert pr_number or (sha and git_ref)
self.sha = sha
self.pr_number = pr_number
self.git_ref = git_ref
self.s3 = s3
self.run_id = 0
if self.pr_number:
self.s3_path = f"{self._S3_PREFIX}/PRs/{self.pr_number}/"
else:
self.s3_path = f"{self._S3_PREFIX}/{self.git_ref}/{self.sha}/"
self._updated = False
if not self._LOCAL_PATH.exists():
self._LOCAL_PATH.mkdir(parents=True, exist_ok=True)
def fetch_meta(self):
"""
Fetches meta from s3
"""
# clean up
for file in self._LOCAL_PATH.glob("*" + self._FILE_SUFFIX):
file.unlink()
_ = self.s3.download_files(
bucket=S3_BUILDS_BUCKET,
s3_path=self.s3_path,
file_suffix=self._FILE_SUFFIX,
local_directory=self._LOCAL_PATH,
)
meta_files = Path(self._LOCAL_PATH).rglob("*" + self._FILE_SUFFIX)
for file_name in meta_files:
path_in_str = str(file_name)
with open(path_in_str, "r", encoding="utf-8") as f:
# Read all lines in the file
lines = f.readlines()
assert len(lines) == 1
if file_name.name == self._FILENAME_RUN_ID:
self.run_id = int(lines[0])
self._updated = True
return self
def push_meta(
self,
) -> None:
"""
Uploads meta on s3
"""
assert self.run_id
GHActions.print_in_group(
f"Storing workflow metadata: PR [{self.pr_number}]",
[f"run_id: {self.run_id}"],
)
local_file = self._LOCAL_PATH / self._FILENAME_RUN_ID
with open(local_file, "w", encoding="utf-8") as file:
file.write(f"{self.run_id}\n")
_ = self.s3.upload_file(
bucket=S3_BUILDS_BUCKET,
file_path=local_file,
s3_path=self.s3_path + local_file.name,
)
if __name__ == "__main__":
# TEST:
s3 = S3Helper()
a = CiMetadata(s3, 12345, "deadbeaf", "test_branch")
a.run_id = 111
a.push_meta()
b = CiMetadata(s3, 12345, "deadbeaf", "test_branch")
assert b.fetch_meta().run_id == a.run_id
a = CiMetadata(s3, 0, "deadbeaf", "test_branch")
a.run_id = 112
a.push_meta()
b = CiMetadata(s3, 0, "deadbeaf", "test_branch")
assert b.fetch_meta().run_id == a.run_id

View File

@ -9,6 +9,7 @@ from time import sleep
from typing import List, Optional, Tuple, Union from typing import List, Optional, Tuple, Union
import github import github
import requests
# explicit reimport # explicit reimport
# pylint: disable=useless-import-alias # pylint: disable=useless-import-alias
@ -21,6 +22,9 @@ from github.NamedUser import NamedUser as NamedUser
from github.PullRequest import PullRequest as PullRequest from github.PullRequest import PullRequest as PullRequest
from github.Repository import Repository as Repository from github.Repository import Repository as Repository
from env_helper import GITHUB_REPOSITORY
from get_robot_token import get_best_robot_token
# pylint: enable=useless-import-alias # pylint: enable=useless-import-alias
CACHE_PATH = p.join(p.dirname(p.realpath(__file__)), "gh_cache") CACHE_PATH = p.join(p.dirname(p.realpath(__file__)), "gh_cache")
@ -260,3 +264,18 @@ class GitHub(github.Github):
def retries(self, value: int) -> None: def retries(self, value: int) -> None:
assert isinstance(value, int) assert isinstance(value, int)
self._retries = value self._retries = value
# minimalistic static methods not using pygithub
@staticmethod
def cancel_wf(run_id, strict=False):
token = get_best_robot_token()
headers = {"Authorization": f"token {token}"}
url = f"https://api.github.com/repos/{GITHUB_REPOSITORY}/actions/runs/{run_id}/cancel"
try:
response = requests.post(url, headers=headers, timeout=10)
response.raise_for_status()
print(f"NOTE: Workflow [{run_id}] has been cancelled")
except Exception as ex:
print("ERROR: Got exception executing wf cancel request", ex)
if strict:
raise ex

View File

@ -161,7 +161,7 @@ class TestCIOptions(unittest.TestCase):
"Stateless tests (azure, asan)": { "Stateless tests (azure, asan)": {
"batches": list(range(3)), "batches": list(range(3)),
"num_batches": 3, "num_batches": 3,
"run_if_ci_option_include_set": True, "run_by_ci_option": True,
} }
} }
jobs_to_do, jobs_to_skip, job_params = ci_options.apply( jobs_to_do, jobs_to_skip, job_params = ci_options.apply(
@ -226,10 +226,10 @@ class TestCIOptions(unittest.TestCase):
job_params[job] = { job_params[job] = {
"batches": list(range(3)), "batches": list(range(3)),
"num_batches": 3, "num_batches": 3,
"run_if_ci_option_include_set": "azure" in job, "run_by_ci_option": "azure" in job,
} }
else: else:
job_params[job] = {"run_if_ci_option_include_set": False} job_params[job] = {"run_by_ci_option": False}
jobs_to_do, jobs_to_skip, job_params = ci_options.apply( jobs_to_do, jobs_to_skip, job_params = ci_options.apply(
jobs_to_do, jobs_to_skip, job_params, PRInfo() jobs_to_do, jobs_to_skip, job_params, PRInfo()

View File

@ -1,5 +1,7 @@
<clickhouse> <clickhouse>
<max_table_num_to_warn>5</max_table_num_to_warn> <max_table_num_to_warn>5</max_table_num_to_warn>
<max_view_num_to_warn>5</max_view_num_to_warn>
<max_dictionary_num_to_warn>5</max_dictionary_num_to_warn>
<max_database_num_to_warn>2</max_database_num_to_warn> <max_database_num_to_warn>2</max_database_num_to_warn>
<max_part_num_to_warn>10</max_part_num_to_warn> <max_part_num_to_warn>10</max_part_num_to_warn>
</clickhouse> </clickhouse>

View File

@ -0,0 +1,88 @@
import logging
import time
import pytest
import os
from helpers.cluster import ClickHouseCluster
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=["configs/storage_policy.xml"],
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def assert_objects_count(cluster, objects_count, path="data/"):
minio = cluster.minio_client
s3_objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
if objects_count != len(s3_objects):
for s3_object in s3_objects:
object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)
logging.info("Existing S3 object: %s", str(object_meta))
assert objects_count == len(s3_objects)
def list_of_files_on_ch_disk(node, disk, path):
disk_path = node.query(
f"SELECT path FROM system.disks WHERE name='{disk}'"
).splitlines()[0]
return node.exec_in_container(
["bash", "-c", f"ls {os.path.join(disk_path, path)}"], user="root"
)
@pytest.mark.parametrize(
"engine",
[
pytest.param("Log"),
],
)
@pytest.mark.parametrize(
"disk,check_s3",
[
pytest.param("default", False),
pytest.param("s3", True),
],
)
@pytest.mark.parametrize(
"delay",
[
pytest.param(0),
pytest.param(4),
],
)
def test_drop_table(cluster, engine, disk, check_s3, delay):
node = cluster.instances["node"]
node.query("DROP DATABASE IF EXISTS lazy")
node.query("CREATE DATABASE lazy ENGINE=Lazy(2)")
node.query(
"CREATE TABLE lazy.table (id UInt64) ENGINE={} SETTINGS disk = '{}'".format(
engine,
disk,
)
)
node.query("INSERT INTO lazy.table SELECT number FROM numbers(10)")
assert node.query("SELECT count(*) FROM lazy.table") == "10\n"
if delay:
time.sleep(delay)
node.query("DROP TABLE lazy.table SYNC")
if check_s3:
# There mustn't be any orphaned data
assert_objects_count(cluster, 0)
# Local data must be removed
assert list_of_files_on_ch_disk(node, disk, "data/lazy/") == ""

View File

@ -0,0 +1,34 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
<s3_no_retries>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<!-- ClickHouse starts earlier than custom S3 endpoint. Skip access check to avoid fail on start-up -->
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
<retry_attempts>1</retry_attempts>
<s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
<s3_max_single_read_retries>1</s3_max_single_read_retries>
<connect_timeout_ms>20000</connect_timeout_ms>
</s3_no_retries>
</disks>
<policies>
<s3_no_retries>
<volumes>
<main>
<disk>s3_no_retries</disk>
</main>
</volumes>
</s3_no_retries>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -11,7 +11,7 @@ def cluster():
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
cluster.add_instance( cluster.add_instance(
"node", "node",
main_configs=["configs/minio.xml", "configs/ssl.xml"], main_configs=["configs/storage_configuration.xml", "configs/ssl.xml"],
with_minio=True, with_minio=True,
) )
logging.info("Starting cluster...") logging.info("Starting cluster...")
@ -84,3 +84,39 @@ def test_log_family_s3(cluster, log_engine, files_overhead, files_overhead_per_i
assert_objects_count(cluster, 0) assert_objects_count(cluster, 0)
finally: finally:
node.query("DROP TABLE s3_test") node.query("DROP TABLE s3_test")
# Imitate case when error occurs while inserting into table.
# For examle S3::TooManyRequests.
# In that case we can update data file, but not the size file.
# So due to exception we should do truncate of the data file to undo the insert query.
# See FileChecker::repair().
def test_stripe_log_truncate(cluster):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE stripe_table (
a int
) ENGINE = StripeLog()
SETTINGS storage_policy='s3_no_retries'
"""
)
node.query("SYSTEM ENABLE FAILPOINT stripe_log_sink_write_fallpoint")
node.query(
"""
INSERT INTO stripe_table SELECT number FROM numbers(10)
""",
ignore_error=True,
)
node.query("SYSTEM DISABLE FAILPOINT stripe_log_sink_write_fallpoint")
node.query("SELECT count(*) FROM stripe_table") == "0\n"
node.query("INSERT INTO stripe_table SELECT number FROM numbers(10)")
node.query("SELECT count(*) FROM stripe_table") == "10\n"
# Make sure that everything is okey with the table after restart.
node.query("DETACH TABLE stripe_table")
node.query("ATTACH TABLE stripe_table")
assert node.query("DROP TABLE stripe_table") == ""

View File

@ -19,4 +19,4 @@
<shard>01</shard> <shard>01</shard>
</macros> </macros>
</clickhouse> </clickhouse>

View File

@ -15,6 +15,6 @@
<shard>01</shard> <shard>01</shard>
</macros> </macros>
<default_replica_path>/clickhouse/'/{database}/{table}/{uuid}</default_replica_path> <default_replica_path>/lol/kek/'/{uuid}</default_replica_path>
</clickhouse> </clickhouse>

View File

@ -6,7 +6,7 @@ cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance( ch1 = cluster.add_instance(
"ch1", "ch1",
main_configs=[ main_configs=[
"configs/config.d/clusters_zk_path.xml", "configs/config.d/clusters_unusual.xml",
"configs/config.d/distributed_ddl.xml", "configs/config.d/distributed_ddl.xml",
], ],
with_zookeeper=True, with_zookeeper=True,
@ -63,7 +63,7 @@ def check_tables():
) )
.strip() .strip()
.startswith( .startswith(
"ReplicatedReplacingMergeTree(\\'/clickhouse/\\\\\\'/{database}/{table}/{uuid}\\', \\'{replica}\\', D)" "ReplicatedReplacingMergeTree(\\'/lol/kek/\\\\\\'/{uuid}\\', \\'{replica}\\', D)"
) )
) )
assert ( assert (
@ -73,7 +73,7 @@ def check_tables():
) )
.strip() .strip()
.startswith( .startswith(
"ReplicatedVersionedCollapsingMergeTree(\\'/clickhouse/\\\\\\'/{database}/{table}/{uuid}\\', \\'{replica}\\', Sign, Version)" "ReplicatedVersionedCollapsingMergeTree(\\'/lol/kek/\\\\\\'/{uuid}\\', \\'{replica}\\', Sign, Version)"
) )
) )

View File

@ -1,69 +0,0 @@
import pytest
from test_modify_engine_on_restart.common import (
get_table_path,
set_convert_flags,
)
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance(
"ch1",
main_configs=[
"configs/config.d/clusters_zk_path.xml",
"configs/config.d/distributed_ddl.xml",
],
with_zookeeper=True,
macros={"replica": "node1"},
stay_alive=True,
)
database_name = "modify_engine_zk_path"
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def q(node, query):
return node.query(database=database_name, sql=query)
def test_modify_engine_fails_if_zk_path_exists(started_cluster):
ch1.query("CREATE DATABASE " + database_name)
q(
ch1,
"CREATE TABLE already_exists_1 ( A Int64, D Date, S String ) ENGINE MergeTree() PARTITION BY toYYYYMM(D) ORDER BY A;",
)
uuid = q(
ch1,
f"SELECT uuid FROM system.tables WHERE table = 'already_exists_1' and database = '{database_name}'",
).strip("'[]\n")
q(
ch1,
f"CREATE TABLE already_exists_2 ( A Int64, D Date, S String ) ENGINE ReplicatedMergeTree('/clickhouse/\\'/{database_name}/already_exists_1/{uuid}', 'r2') PARTITION BY toYYYYMM(D) ORDER BY A;",
)
set_convert_flags(ch1, database_name, ["already_exists_1"])
table_data_path = get_table_path(ch1, "already_exists_1", database_name)
ch1.stop_clickhouse()
ch1.start_clickhouse(retry_start=False, expected_to_fail=True)
# Check if we can cancel convertation
ch1.exec_in_container(
[
"bash",
"-c",
f"rm {table_data_path}convert_to_replicated",
]
)
ch1.start_clickhouse()

View File

@ -5,7 +5,7 @@ import time
import pytest import pytest
import logging import logging
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster, is_arm
from helpers.test_tools import TSV from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException from helpers.client import QueryRuntimeException
@ -18,6 +18,10 @@ from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsReques
from kafka.protocol.group import MemberAssignment from kafka.protocol.group import MemberAssignment
import socket import socket
if is_arm():
# skip due to no arm support for clickhouse/kerberos-kdc docker image
pytestmark = pytest.mark.skip
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance( instance = cluster.add_instance(
"instance", "instance",

View File

@ -61,6 +61,11 @@ CREATE TABLE github_events
) )
ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at); ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at);
with top_repos as ( select repo_name from github_events where event_type = 'WatchEvent' and toDate(created_at) = today() - 1 group by repo_name order by count() desc limit 100 union distinct select repo_name from github_events where event_type = 'WatchEvent' and toMonday(created_at) = toMonday(today() - interval 1 week) group by repo_name order by count() desc limit 100 union distinct select repo_name from github_events where event_type = 'WatchEvent' and toStartOfMonth(created_at) = toStartOfMonth(today()) - interval 1 month group by repo_name order by count() desc limit 100 union distinct select repo_name from github_events where event_type = 'WatchEvent' and toYear(created_at) = toYear(today()) - 1 group by repo_name order by count() desc limit 100 ), last_day as ( select repo_name, count() as count_last_day, rowNumberInAllBlocks() + 1 as position_last_day from github_events where repo_name in (select repo_name from top_repos) and toDate(created_at) = today() - 1 group by repo_name order by count_last_day desc ), last_week as ( select repo_name, count() as count_last_week, rowNumberInAllBlocks() + 1 as position_last_week from github_events where repo_name in (select repo_name from top_repos) and toMonday(created_at) = toMonday(today()) - interval 1 week group by repo_name order by count_last_week desc ), last_month as ( select repo_name, count() as count_last_month, rowNumberInAllBlocks() + 1 as position_last_month from github_events where repo_name in (select repo_name from top_repos) and toStartOfMonth(created_at) = toStartOfMonth(today()) - interval 1 month group by repo_name order by count_last_month desc ) select d.repo_name, columns(count) from last_day d join last_week w on d.repo_name = w.repo_name join last_month m on d.repo_name = m.repo_name; with
top_repos as ( select repo_name from github_events where event_type = 'WatchEvent' and toDate(created_at) = today() - 1 group by repo_name order by count() desc limit 100 union distinct select repo_name from github_events where event_type = 'WatchEvent' and toMonday(created_at) = toMonday(today() - interval 1 week) group by repo_name order by count() desc limit 100 union distinct select repo_name from github_events where event_type = 'WatchEvent' and toStartOfMonth(created_at) = toStartOfMonth(today()) - interval 1 month group by repo_name order by count() desc limit 100 union distinct select repo_name from github_events where event_type = 'WatchEvent' and toYear(created_at) = toYear(today()) - 1 group by repo_name order by count() desc limit 100 ),
last_day as ( select repo_name, count() as count_last_day, rowNumberInAllBlocks() + 1 as position_last_day from github_events where repo_name in (select repo_name from top_repos) and toDate(created_at) = today() - 1 group by repo_name order by count_last_day desc ),
last_week as ( select repo_name, count() as count_last_week, rowNumberInAllBlocks() + 1 as position_last_week from github_events where repo_name in (select repo_name from top_repos) and toMonday(created_at) = toMonday(today()) - interval 1 week group by repo_name order by count_last_week desc ),
last_month as ( select repo_name, count() as count_last_month, rowNumberInAllBlocks() + 1 as position_last_month from github_events where repo_name in (select repo_name from top_repos) and toStartOfMonth(created_at) = toStartOfMonth(today()) - interval 1 month group by repo_name order by count_last_month desc )
select d.repo_name, columns('count') from last_day d join last_week w on d.repo_name = w.repo_name join last_month m on d.repo_name = m.repo_name;
DROP TABLE github_events; DROP TABLE github_events;

View File

@ -236,3 +236,6 @@ Check asan bug
0 0
Check bug found fuzzing Check bug found fuzzing
9042C6691B1A75F0EA3314B6F55728BB 9042C6691B1A75F0EA3314B6F55728BB
Check bug 2 found fuzzing
608E1FF030C9E206185B112C2A25F1A7
ABB65AE97711A2E053E324ED88B1D08B

View File

@ -338,3 +338,10 @@ SELECT sipHash128((toUInt64(9223372036854775806), 1)) = sipHash128(1) GROUP BY s
SELECT 'Check bug found fuzzing'; SELECT 'Check bug found fuzzing';
SELECT [(255, 1048575)], sipHash128ReferenceKeyed((toUInt64(2147483646), toUInt64(9223372036854775807)), ([(NULL, 100), (NULL, NULL), (1024, 10)], toUInt64(2), toUInt64(1024)), ''), hex(sipHash128ReferenceKeyed((-9223372036854775807, 1.), '-1', NULL)), ('', toUInt64(65535), [(9223372036854775807, 9223372036854775806)], toUInt64(65536)), arrayJoin((NULL, 65537, 255), [(NULL, NULL)]) GROUP BY tupleElement((NULL, NULL, NULL, -1), toUInt64(2), 2) = NULL; -- { serverError NOT_IMPLEMENTED } SELECT [(255, 1048575)], sipHash128ReferenceKeyed((toUInt64(2147483646), toUInt64(9223372036854775807)), ([(NULL, 100), (NULL, NULL), (1024, 10)], toUInt64(2), toUInt64(1024)), ''), hex(sipHash128ReferenceKeyed((-9223372036854775807, 1.), '-1', NULL)), ('', toUInt64(65535), [(9223372036854775807, 9223372036854775806)], toUInt64(65536)), arrayJoin((NULL, 65537, 255), [(NULL, NULL)]) GROUP BY tupleElement((NULL, NULL, NULL, -1), toUInt64(2), 2) = NULL; -- { serverError NOT_IMPLEMENTED }
SELECT hex(sipHash128ReferenceKeyed((0::UInt64, 0::UInt64), ([1, 1]))); SELECT hex(sipHash128ReferenceKeyed((0::UInt64, 0::UInt64), ([1, 1])));
SELECT 'Check bug 2 found fuzzing';
DROP TABLE IF EXISTS sipHashKeyed_keys;
CREATE TABLE sipHashKeyed_keys (`a` Map(String, String)) ENGINE = Memory;
INSERT INTO sipHashKeyed_keys FORMAT VALUES ({'a':'b', 'c':'d'}), ({'e':'f', 'g':'h'});
SELECT hex(sipHash128ReferenceKeyed((0::UInt64, materialize(0::UInt64)), a)) FROM sipHashKeyed_keys ORDER BY a;
DROP TABLE sipHashKeyed_keys;

View File

@ -1,3 +1,5 @@
The number of attached tables is more than 5 The number of attached tables is more than 5
The number of attached views is more than 5
The number of attached dictionaries is more than 5
The number of attached databases is more than 2 The number of attached databases is more than 2
The number of active parts is more than 10 The number of active parts is more than 10

View File

@ -13,6 +13,39 @@ CREATE TABLE IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_9 (id
CREATE TABLE IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_10 (id Int32, str String) Engine=Memory; CREATE TABLE IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_10 (id Int32, str String) Engine=Memory;
CREATE TABLE IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_11 (id Int32, str String) Engine=Memory; CREATE TABLE IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_11 (id Int32, str String) Engine=Memory;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_1 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_1;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_2 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_2;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_3 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_3;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_4 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_4;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_5 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_5;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_6 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_6;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_7 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_7;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_8 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_8;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_9 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_9;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_10 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_10;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_11 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_11;
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_1 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_1'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_2 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_2'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_3 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_3'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_4 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_4'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_5 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_5'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_6 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_6'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_7 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_7'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_8 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_8'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_9 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_9'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_10 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_10'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DATABASE IF NOT EXISTS test_max_num_to_warn_1; CREATE DATABASE IF NOT EXISTS test_max_num_to_warn_1;
CREATE DATABASE IF NOT EXISTS test_max_num_to_warn_2; CREATE DATABASE IF NOT EXISTS test_max_num_to_warn_2;
CREATE DATABASE IF NOT EXISTS test_max_num_to_warn_3; CREATE DATABASE IF NOT EXISTS test_max_num_to_warn_3;
@ -37,7 +70,13 @@ INSERT INTO test_max_num_to_warn_02931.test_max_num_to_warn_9 VALUES (1, 'Hello'
INSERT INTO test_max_num_to_warn_02931.test_max_num_to_warn_10 VALUES (1, 'Hello'); INSERT INTO test_max_num_to_warn_02931.test_max_num_to_warn_10 VALUES (1, 'Hello');
INSERT INTO test_max_num_to_warn_02931.test_max_num_to_warn_11 VALUES (1, 'Hello'); INSERT INTO test_max_num_to_warn_02931.test_max_num_to_warn_11 VALUES (1, 'Hello');
SELECT * FROM system.warnings where message in ('The number of attached tables is more than 5', 'The number of attached databases is more than 2', 'The number of active parts is more than 10'); SELECT * FROM system.warnings where message in (
'The number of attached tables is more than 5',
'The number of attached views is more than 5',
'The number of attached dictionaries is more than 5',
'The number of attached databases is more than 2',
'The number of active parts is more than 10'
);
DROP DATABASE IF EXISTS test_max_num_to_warn_02931; DROP DATABASE IF EXISTS test_max_num_to_warn_02931;
DROP DATABASE IF EXISTS test_max_num_to_warn_1; DROP DATABASE IF EXISTS test_max_num_to_warn_1;

View File

@ -0,0 +1,11 @@
<-- Read UNIX endings -->
Akiba_Hebrew_Academy 2017-08-01 241
Aegithina_tiphia 2018-02-01 34
1971-72_Utah_Stars_season 2016-10-01 1
<-- Read DOS endings with setting input_format_tsv_crlf_end_of_line=1 -->
Akiba_Hebrew_Academy 2017-08-01 241
Aegithina_tiphia 2018-02-01 34
1971-72_Utah_Stars_season 2016-10-01 1

View File

@ -0,0 +1,29 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Data preparation step
USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
UNIX_ENDINGS="${CLICKHOUSE_TEST_UNIQUE_NAME}_data_without_crlf.tsv"
DOS_ENDINGS="${CLICKHOUSE_TEST_UNIQUE_NAME}_data_with_crlf.tsv"
DATA_FILE_UNIX_ENDINGS="${USER_FILES_PATH:?}/${UNIX_ENDINGS}"
DATA_FILE_DOS_ENDINGS="${USER_FILES_PATH:?}/${DOS_ENDINGS}"
touch $DATA_FILE_UNIX_ENDINGS
touch $DATA_FILE_DOS_ENDINGS
echo -ne "Akiba_Hebrew_Academy\t2017-08-01\t241\nAegithina_tiphia\t2018-02-01\t34\n1971-72_Utah_Stars_season\t2016-10-01\t1\n" > $DATA_FILE_UNIX_ENDINGS
echo -ne "Akiba_Hebrew_Academy\t2017-08-01\t241\r\nAegithina_tiphia\t2018-02-01\t34\r\n1971-72_Utah_Stars_season\t2016-10-01\t1\r\n" > $DATA_FILE_DOS_ENDINGS
echo -e "<-- Read UNIX endings -->\n"
$CLICKHOUSE_CLIENT --query "SELECT * FROM file(${UNIX_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32');"
$CLICKHOUSE_CLIENT --multiquery --query "SELECT * FROM file(${DOS_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32'); --{serverError 117}"
echo -e "\n<-- Read DOS endings with setting input_format_tsv_crlf_end_of_line=1 -->\n"
$CLICKHOUSE_CLIENT --query "SELECT * FROM file(${DOS_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32') SETTINGS input_format_tsv_crlf_end_of_line = 1;"
# Test teardown
rm $DATA_FILE_UNIX_ENDINGS
rm $DATA_FILE_DOS_ENDINGS

View File

@ -0,0 +1,2 @@
Hello, World! From client.
Hello, World! From local.

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
[ -e "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_client.gz ] && rm "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_client.gz
${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World! From client.')" > ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client.gz
gunzip ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client.gz
cat ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client
rm -f "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client"
[ -e "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_local.gz ] && rm "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_local.gz
${CLICKHOUSE_LOCAL} --query "SELECT * FROM (SELECT 'Hello, World! From local.')" > ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local.gz
gunzip ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local.gz
cat ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local
rm -f "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local"

View File

@ -0,0 +1 @@
4 3

View File

@ -0,0 +1,13 @@
CREATE TABLE test
(
foo String,
bar String,
)
ENGINE = MergeTree()
ORDER BY (foo, bar);
INSERT INTO test VALUES ('foo', 'bar1');
SELECT COLUMNS(bar, foo) APPLY (length) FROM test;
SELECT COLUMNS(bar, foo, xyz) APPLY (length) FROM test; -- { serverError UNKNOWN_IDENTIFIER }