diff --git a/CMakeLists.txt b/CMakeLists.txt index 2d51c1b242f..96ba2961d3a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -68,8 +68,9 @@ if (ENABLE_CHECK_HEAVY_BUILDS) set (RLIMIT_AS 20000000000) endif() - # For some files currently building RISCV64 might be too slow. TODO: Improve compilation times per file - if (ARCH_RISCV64) + # For some files currently building RISCV64/LOONGARCH64 might be too slow. + # TODO: Improve compilation times per file + if (ARCH_RISCV64 OR ARCH_LOONGARCH64) set (RLIMIT_CPU 1800) endif() diff --git a/docs/en/interfaces/formats.md b/docs/en/interfaces/formats.md index a137eb2bdf2..66d5bd2e574 100644 --- a/docs/en/interfaces/formats.md +++ b/docs/en/interfaces/formats.md @@ -197,6 +197,7 @@ SELECT * FROM nestedt FORMAT TSV - [input_format_tsv_enum_as_number](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_enum_as_number) - treat inserted enum values in TSV formats as enum indices. Default value - `false`. - [input_format_tsv_use_best_effort_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_use_best_effort_in_schema_inference) - use some tweaks and heuristics to infer schema in TSV format. If disabled, all fields will be inferred as Strings. Default value - `true`. - [output_format_tsv_crlf_end_of_line](/docs/en/operations/settings/settings-formats.md/#output_format_tsv_crlf_end_of_line) - if it is set true, end of line in TSV output format will be `\r\n` instead of `\n`. Default value - `false`. +- [input_format_tsv_crlf_end_of_line](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_crlf_end_of_line) - if it is set true, end of line in TSV input format will be `\r\n` instead of `\n`. Default value - `false`. - [input_format_tsv_skip_first_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_first_lines) - skip specified number of lines at the beginning of data. Default value - `0`. - [input_format_tsv_detect_header](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_detect_header) - automatically detect header with names and types in TSV format. Default value - `true`. - [input_format_tsv_skip_trailing_empty_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_trailing_empty_lines) - skip trailing empty lines at the end of data. Default value - `false`. diff --git a/docs/en/operations/settings/settings-formats.md b/docs/en/operations/settings/settings-formats.md index 6666f68c177..1a27b350652 100644 --- a/docs/en/operations/settings/settings-formats.md +++ b/docs/en/operations/settings/settings-formats.md @@ -831,7 +831,13 @@ Default value: `0`. ### output_format_tsv_crlf_end_of_line {#output_format_tsv_crlf_end_of_line} -Use DOC/Windows-style line separator (CRLF) in TSV instead of Unix style (LF). +Use DOS/Windows-style line separator (CRLF) in TSV instead of Unix style (LF). + +Disabled by default. + +### input_format_tsv_crlf_end_of_line {#input_format_tsv_crlf_end_of_line} + +Use DOS/Windows-style line separator (CRLF) for TSV input files instead of Unix style (LF). Disabled by default. diff --git a/docs/en/sql-reference/functions/other-functions.md b/docs/en/sql-reference/functions/other-functions.md index 11ee471d709..12b565d5358 100644 --- a/docs/en/sql-reference/functions/other-functions.md +++ b/docs/en/sql-reference/functions/other-functions.md @@ -998,17 +998,170 @@ SELECT version() Returns the build ID generated by a compiler for the running ClickHouse server binary. 
If executed in the context of a distributed table, this function generates a normal column with values relevant to each shard. Otherwise it produces a constant value.

-## blockNumber()
+## blockNumber

-Returns the sequence number of the data block where the row is located.
+Returns a monotonically increasing sequence number of the [block](../../development/architecture.md#block) containing the row.
+The returned block number is updated on a best-effort basis, i.e. it may not be fully accurate.

-## rowNumberInBlock() {#rowNumberInBlock}
+**Syntax**
+
+```sql
+blockNumber()
+```
+
+**Returned value**
+
+- Sequence number of the data block where the row is located. [UInt64](../data-types/int-uint.md).
+
+**Example**
+
+Query:
+
+```sql
+SELECT blockNumber()
+FROM
+(
+    SELECT *
+    FROM system.numbers
+    LIMIT 10
+) SETTINGS max_block_size = 2
+```
+
+Result:
+
+```response
+┌─blockNumber()─┐
+│             7 │
+│             7 │
+└───────────────┘
+┌─blockNumber()─┐
+│             8 │
+│             8 │
+└───────────────┘
+┌─blockNumber()─┐
+│             9 │
+│             9 │
+└───────────────┘
+┌─blockNumber()─┐
+│            10 │
+│            10 │
+└───────────────┘
+┌─blockNumber()─┐
+│            11 │
+│            11 │
+└───────────────┘
+```
+
+## rowNumberInBlock {#rowNumberInBlock}
+
+For each [block](../../development/architecture.md#block) processed by `rowNumberInBlock`, returns the number of the current row.
+The returned number starts at 0 for each block.
+
+**Syntax**
+
+```sql
+rowNumberInBlock()
+```
+
+**Returned value**
+
+- Ordinal number of the row in the data block starting from 0. [UInt64](../data-types/int-uint.md).
+
+**Example**
+
+Query:
+
+```sql
+SELECT rowNumberInBlock()
+FROM
+(
+    SELECT *
+    FROM system.numbers_mt
+    LIMIT 10
+) SETTINGS max_block_size = 2
+```
+
+Result:
+
+```response
+┌─rowNumberInBlock()─┐
+│                  0 │
+│                  1 │
+└────────────────────┘
+┌─rowNumberInBlock()─┐
+│                  0 │
+│                  1 │
+└────────────────────┘
+┌─rowNumberInBlock()─┐
+│                  0 │
+│                  1 │
+└────────────────────┘
+┌─rowNumberInBlock()─┐
+│                  0 │
+│                  1 │
+└────────────────────┘
+┌─rowNumberInBlock()─┐
+│                  0 │
+│                  1 │
+└────────────────────┘
+```
+
+## rowNumberInAllBlocks
+
+Returns a unique row number for each row processed by `rowNumberInAllBlocks`. The returned numbers start at 0.
+
+**Syntax**
+
+```sql
+rowNumberInAllBlocks()
+```
+
+**Returned value**
+
+- Ordinal number of the row across all blocks processed so far, starting from 0. [UInt64](../data-types/int-uint.md).
+
+**Example**
+
+Query:
+
+```sql
+SELECT rowNumberInAllBlocks()
+FROM
+(
+    SELECT *
+    FROM system.numbers_mt
+    LIMIT 10
+)
+SETTINGS max_block_size = 2
+```
+
+Result:
+
+```response
+┌─rowNumberInAllBlocks()─┐
+│                      0 │
+│                      1 │
+└────────────────────────┘
+┌─rowNumberInAllBlocks()─┐
+│                      4 │
+│                      5 │
+└────────────────────────┘
+┌─rowNumberInAllBlocks()─┐
+│                      2 │
+│                      3 │
+└────────────────────────┘
+┌─rowNumberInAllBlocks()─┐
+│                      6 │
+│                      7 │
+└────────────────────────┘
+┌─rowNumberInAllBlocks()─┐
+│                      8 │
+│                      9 │
+└────────────────────────┘
+```

-Returns the ordinal number of the row in the data block. Different data blocks are always recalculated.

-## rowNumberInAllBlocks

-Returns the ordinal number of the row in the data block. This function only considers the affected data blocks.

 ## neighbor

diff --git a/docs/ru/interfaces/formats.md b/docs/ru/interfaces/formats.md
index a9280de9c7b..4ed42b6fb22 100644
--- a/docs/ru/interfaces/formats.md
+++ b/docs/ru/interfaces/formats.md
@@ -119,6 +119,7 @@ Hello\nworld
 Hello\ world
 ```
+`\r\n` (CRLF) поддерживается с помощью настройки `input_format_tsv_crlf_end_of_line`.
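The CRLF support this patch threads through the input side comes down to one rule: when `input_format_tsv_crlf_end_of_line` is set, `\r` becomes an additional stop character while reading an escaped TSV field (this is exactly what the serialization hunks below do via `readEscapedStringCRLF`). A minimal standalone sketch of that idea — illustrative only, not ClickHouse's actual `readEscapedString`/`readEscapedStringCRLF`, which operate on `ReadBuffer`:

```cpp
#include <iostream>
#include <sstream>
#include <string>

// Sketch: read one escaped TSV field, optionally treating '\r' as a
// terminator (CRLF mode). Hypothetical helper, not the ClickHouse API.
std::string readTsvField(std::istream & in, bool support_crlf)
{
    std::string field;
    for (int c = in.get(); c != std::char_traits<char>::eof(); c = in.get())
    {
        if (c == '\t' || c == '\n' || (support_crlf && c == '\r'))
        {
            in.unget(); // leave the delimiter for the row-level parser
            break;
        }
        if (c == '\\') // escape sequence: keep the escaped byte (simplified)
        {
            int next = in.get();
            if (next == std::char_traits<char>::eof())
                break;
            field.push_back(static_cast<char>(next));
            continue;
        }
        field.push_back(static_cast<char>(c));
    }
    return field;
}

int main()
{
    std::istringstream row("a\tb\r\n");
    std::cout << readTsvField(row, /*support_crlf=*/true) << '\n'; // prints "a"
}
```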
Второй вариант поддерживается, так как его использует MySQL при записи tab-separated дампа. diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 01ed7d70b38..efe23d57478 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -1178,7 +1178,7 @@ void Client::processConfig() pager = config().getString("pager", ""); - setDefaultFormatsFromConfiguration(); + setDefaultFormatsAndCompressionFromConfiguration(); global_context->setClientName(std::string(DEFAULT_CLIENT_NAME)); global_context->setQueryKindInitial(); diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 6d1ebf8d30c..4d5cfb09e6a 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -607,7 +607,7 @@ void LocalServer::processConfig() if (config().has("macros")) global_context->setMacros(std::make_unique(config(), "macros", log)); - setDefaultFormatsFromConfiguration(); + setDefaultFormatsAndCompressionFromConfiguration(); /// Sets external authenticators config (LDAP, Kerberos). global_context->setExternalAuthenticatorsConfig(config()); diff --git a/src/AggregateFunctions/AggregateFunctionsArgMinArgMax.cpp b/src/AggregateFunctions/AggregateFunctionsArgMinArgMax.cpp index e8f40120152..9608ca26f37 100644 --- a/src/AggregateFunctions/AggregateFunctionsArgMinArgMax.cpp +++ b/src/AggregateFunctions/AggregateFunctionsArgMinArgMax.cpp @@ -14,7 +14,7 @@ struct Settings; namespace ErrorCodes { -extern const int CORRUPTED_DATA; +extern const int INCORRECT_DATA; extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int LOGICAL_ERROR; } @@ -198,7 +198,7 @@ public: this->data(place).value().read(buf, *serialization_val, arena); if (unlikely(this->data(place).value().has() != this->data(place).result().has())) throw Exception( - ErrorCodes::CORRUPTED_DATA, + ErrorCodes::INCORRECT_DATA, "Invalid state of the aggregate function {}: has_value ({}) != has_result ({})", getName(), this->data(place).value().has(), diff --git a/src/Analyzer/Passes/AutoFinalOnQueryPass.cpp b/src/Analyzer/Passes/AutoFinalOnQueryPass.cpp index 9bd044dd89c..70aa1a41548 100644 --- a/src/Analyzer/Passes/AutoFinalOnQueryPass.cpp +++ b/src/Analyzer/Passes/AutoFinalOnQueryPass.cpp @@ -42,7 +42,7 @@ private: return; const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage(); - bool is_final_supported = storage && storage->supportsFinal(); + bool is_final_supported = storage && !storage->isRemote() && storage->supportsFinal(); if (!is_final_supported) return; diff --git a/src/Analyzer/QueryTreePassManager.cpp b/src/Analyzer/QueryTreePassManager.cpp index 51f1fb6cc2f..f7919b6422c 100644 --- a/src/Analyzer/QueryTreePassManager.cpp +++ b/src/Analyzer/QueryTreePassManager.cpp @@ -192,7 +192,7 @@ void QueryTreePassManager::run(QueryTreeNodePtr query_tree_node) void QueryTreePassManager::runOnlyResolve(QueryTreeNodePtr query_tree_node) { // Run only QueryAnalysisPass and GroupingFunctionsResolvePass passes. 
- run(query_tree_node, 2); + run(query_tree_node, 3); } void QueryTreePassManager::run(QueryTreeNodePtr query_tree_node, size_t up_to_pass_index) @@ -249,6 +249,7 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze) { manager.addPass(std::make_unique(only_analyze)); manager.addPass(std::make_unique()); + manager.addPass(std::make_unique()); manager.addPass(std::make_unique()); manager.addPass(std::make_unique()); @@ -294,7 +295,6 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze) manager.addPass(std::make_unique()); - manager.addPass(std::make_unique()); manager.addPass(std::make_unique()); manager.addPass(std::make_unique()); diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index b6f821794f1..f8391c64d5a 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -643,6 +644,9 @@ try bool extras_into_stdout = need_render_progress || logs_into_stdout; bool select_only_into_file = select_into_file && !select_into_file_and_stdout; + if (!out_file_buf && default_output_compression_method != CompressionMethod::None) + out_file_buf = wrapWriteBufferWithCompressionMethod(out_buf, default_output_compression_method, 3, 0); + /// It is not clear how to write progress and logs /// intermixed with data with parallel formatting. /// It may increase code complexity significantly. @@ -735,7 +739,7 @@ bool ClientBase::isRegularFile(int fd) return fstat(fd, &file_stat) == 0 && S_ISREG(file_stat.st_mode); } -void ClientBase::setDefaultFormatsFromConfiguration() +void ClientBase::setDefaultFormatsAndCompressionFromConfiguration() { if (config().has("output-format")) { @@ -759,6 +763,10 @@ void ClientBase::setDefaultFormatsFromConfiguration() default_output_format = *format_from_file_name; else default_output_format = "TSV"; + + std::optional file_name = tryGetFileNameFromFileDescriptor(STDOUT_FILENO); + if (file_name) + default_output_compression_method = chooseCompressionMethod(*file_name, ""); } else if (is_interactive) { diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 64cbdbe8989..7a0489641c8 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -190,7 +190,7 @@ protected: /// Adjust some settings after command line options and config had been processed. void adjustSettings(); - void setDefaultFormatsFromConfiguration(); + void setDefaultFormatsAndCompressionFromConfiguration(); void initTTYBuffer(ProgressOption progress); @@ -224,6 +224,7 @@ protected: String pager; String default_output_format; /// Query results output format. + CompressionMethod default_output_compression_method = CompressionMethod::None; String default_input_format; /// Tables' format for clickhouse-local. bool select_into_file = false; /// If writing result INTO OUTFILE. It affects progress rendering. 
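With `setDefaultFormatsAndCompressionFromConfiguration`, the client now resolves the file behind `STDOUT_FILENO` and derives a default compression method from its extension, so something like `clickhouse-client --query '...' > result.tsv.gz` can pick up gzip without an explicit `INTO OUTFILE`. A sketch of the suffix lookup in the spirit of `chooseCompressionMethod` — the enum values and suffix table here are illustrative, not ClickHouse's exact mapping:

```cpp
#include <string>
#include <utility>

// Illustrative subset of compression methods selectable by file suffix.
enum class CompressionMethod { None, Gzip, Zstd, Lz4, Xz, Bzip2 };

CompressionMethod compressionFromFileName(const std::string & name)
{
    static const std::pair<const char *, CompressionMethod> suffixes[] = {
        {".gz", CompressionMethod::Gzip}, {".zst", CompressionMethod::Zstd},
        {".lz4", CompressionMethod::Lz4}, {".xz", CompressionMethod::Xz},
        {".bz2", CompressionMethod::Bzip2},
    };
    for (const auto & [suffix, method] : suffixes)
        if (name.ends_with(suffix)) // no suffix here is a suffix of another
            return method;
    return CompressionMethod::None;
}
```

Combined with `tryGetFileNameFromFileDescriptor`, this is enough to decide whether `out_buf` needs to be wrapped by `wrapWriteBufferWithCompressionMethod` before any bytes are written.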
diff --git a/src/Common/AsyncLoader.cpp b/src/Common/AsyncLoader.cpp index 9607333b9f7..cfb273b9058 100644 --- a/src/Common/AsyncLoader.cpp +++ b/src/Common/AsyncLoader.cpp @@ -30,6 +30,7 @@ namespace ErrorCodes extern const int ASYNC_LOAD_CYCLE; extern const int ASYNC_LOAD_FAILED; extern const int ASYNC_LOAD_CANCELED; + extern const int ASYNC_LOAD_WAIT_FAILED; extern const int LOGICAL_ERROR; } @@ -433,7 +434,7 @@ void AsyncLoader::wait(const LoadJobPtr & job, bool no_throw) std::unique_lock job_lock{job->mutex}; wait(job_lock, job); if (!no_throw && job->load_exception) - std::rethrow_exception(job->load_exception); + throw Exception(ErrorCodes::ASYNC_LOAD_WAIT_FAILED, "Waited job failed: {}", getExceptionMessage(job->load_exception, /* with_stacktrace = */ false)); } void AsyncLoader::remove(const LoadJobSet & jobs) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 44c051401ef..d21eaedcc2a 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -600,6 +600,7 @@ M(719, QUERY_CACHE_USED_WITH_SYSTEM_TABLE) \ M(720, USER_EXPIRED) \ M(721, DEPRECATED_FUNCTION) \ + M(722, ASYNC_LOAD_WAIT_FAILED) \ \ M(900, DISTRIBUTED_CACHE_ERROR) \ M(901, CANNOT_USE_DISTRIBUTED_CACHE) \ diff --git a/src/Common/FailPoint.cpp b/src/Common/FailPoint.cpp index 79b0b6a1fe1..5454cba8e2e 100644 --- a/src/Common/FailPoint.cpp +++ b/src/Common/FailPoint.cpp @@ -40,6 +40,7 @@ static struct InitFiu REGULAR(use_delayed_remote_source) \ REGULAR(cluster_discovery_faults) \ REGULAR(replicated_sends_failpoint) \ + REGULAR(stripe_log_sink_write_fallpoint)\ ONCE(smt_commit_merge_mutate_zk_fail_after_op) \ ONCE(smt_commit_merge_mutate_zk_fail_before_op) \ ONCE(smt_commit_write_zk_fail_after_op) \ @@ -58,6 +59,7 @@ static struct InitFiu ONCE(execute_query_calling_empty_set_result_func_on_exception) \ ONCE(receive_timeout_on_table_status_response) + namespace FailPoints { #define M(NAME) extern const char(NAME)[] = #NAME ""; diff --git a/src/Common/tests/gtest_async_loader.cpp b/src/Common/tests/gtest_async_loader.cpp index 174997ddf14..9fda58b9008 100644 --- a/src/Common/tests/gtest_async_loader.cpp +++ b/src/Common/tests/gtest_async_loader.cpp @@ -35,6 +35,7 @@ namespace DB::ErrorCodes extern const int ASYNC_LOAD_CYCLE; extern const int ASYNC_LOAD_FAILED; extern const int ASYNC_LOAD_CANCELED; + extern const int ASYNC_LOAD_WAIT_FAILED; } struct Initializer { @@ -262,7 +263,8 @@ TEST(AsyncLoader, CancelPendingJob) } catch (Exception & e) { - ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED); + ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED")); } } @@ -288,7 +290,8 @@ TEST(AsyncLoader, CancelPendingTask) } catch (Exception & e) { - ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED); + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED); + ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED")); } try @@ -298,7 +301,8 @@ TEST(AsyncLoader, CancelPendingTask) } catch (Exception & e) { - ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED); + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED); + ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED")); } } @@ -325,7 +329,8 @@ TEST(AsyncLoader, CancelPendingDependency) } catch (Exception & e) { - ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED); + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED); + ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED")); } try @@ -335,7 +340,8 @@ TEST(AsyncLoader, CancelPendingDependency) } catch (Exception & 
e) { - ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED); + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED); + ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED")); } } @@ -451,8 +457,9 @@ TEST(AsyncLoader, JobFailure) } catch (Exception & e) { - ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_FAILED); - ASSERT_TRUE(e.message().find(error_message) != String::npos); + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED); + ASSERT_TRUE(e.message().contains(error_message)); + ASSERT_TRUE(e.message().contains("ASYNC_LOAD_FAILED")); } } @@ -489,8 +496,9 @@ TEST(AsyncLoader, ScheduleJobWithFailedDependencies) } catch (Exception & e) { - ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); - ASSERT_TRUE(e.message().find(error_message) != String::npos); + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED); + ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED")); + ASSERT_TRUE(e.message().contains(error_message)); } try { @@ -499,8 +507,9 @@ TEST(AsyncLoader, ScheduleJobWithFailedDependencies) } catch (Exception & e) { - ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); - ASSERT_TRUE(e.message().find(error_message) != String::npos); + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED); + ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED")); + ASSERT_TRUE(e.message().contains(error_message)); } } @@ -531,7 +540,8 @@ TEST(AsyncLoader, ScheduleJobWithCanceledDependencies) } catch (Exception & e) { - ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED); + ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED")); } try { @@ -540,7 +550,8 @@ TEST(AsyncLoader, ScheduleJobWithCanceledDependencies) } catch (Exception & e) { - ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED); + ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED")); } } diff --git a/src/Common/tryGetFileNameByFileDescriptor.cpp b/src/Common/tryGetFileNameByFileDescriptor.cpp new file mode 100644 index 00000000000..47e81050388 --- /dev/null +++ b/src/Common/tryGetFileNameByFileDescriptor.cpp @@ -0,0 +1,33 @@ +#include + +#ifdef OS_LINUX +# include +#elif defined(OS_DARWIN) +# include +#endif + +#include + + +namespace DB +{ +std::optional tryGetFileNameFromFileDescriptor(int fd) +{ +#ifdef OS_LINUX + std::string proc_path = fmt::format("/proc/self/fd/{}", fd); + char file_path[PATH_MAX] = {'\0'}; + if (readlink(proc_path.c_str(), file_path, sizeof(file_path) - 1) != -1) + return file_path; + return std::nullopt; +#elif defined(OS_DARWIN) + char file_path[PATH_MAX] = {'\0'}; + if (fcntl(fd, F_GETPATH, file_path) != -1) + return file_path; + return std::nullopt; +#else + (void)fd; + return std::nullopt; +#endif +} + +} diff --git a/src/Common/tryGetFileNameByFileDescriptor.h b/src/Common/tryGetFileNameByFileDescriptor.h new file mode 100644 index 00000000000..c38ccb4f851 --- /dev/null +++ b/src/Common/tryGetFileNameByFileDescriptor.h @@ -0,0 +1,10 @@ +#pragma once + +#include +#include + +namespace DB +{ +/// Supports only Linux/MacOS. On other platforms, returns nullopt. 
+std::optional tryGetFileNameFromFileDescriptor(int fd); +} diff --git a/src/Core/Settings.h b/src/Core/Settings.h index c555b5cb208..8ca965f7e9b 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -80,6 +80,7 @@ class IColumn; M(UInt64, connections_with_failover_max_tries, 3, "The maximum number of attempts to connect to replicas.", 0) \ M(UInt64, s3_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to S3 (some implementations does not supports variable size parts).", 0) \ M(UInt64, azure_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to Azure blob storage.", 0) \ + M(UInt64, azure_max_blocks_in_multipart_upload, 50000, "Maximum number of blocks in multipart upload for Azure.", 0) \ M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \ M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \ M(UInt64, azure_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to Azure blob storage.", 0) \ @@ -1078,6 +1079,7 @@ class IColumn; M(Bool, input_format_csv_skip_trailing_empty_lines, false, "Skip trailing empty lines in CSV format", 0) \ M(Bool, input_format_tsv_skip_trailing_empty_lines, false, "Skip trailing empty lines in TSV format", 0) \ M(Bool, input_format_custom_skip_trailing_empty_lines, false, "Skip trailing empty lines in CustomSeparated format", 0) \ + M(Bool, input_format_tsv_crlf_end_of_line, false, "If it is set true, file function will read TSV format with \\r\\n instead of \\n.", 0) \ \ M(Bool, input_format_native_allow_types_conversion, true, "Allow data types conversion in Native input format", 0) \ \ diff --git a/src/Core/SettingsChangesHistory.h b/src/Core/SettingsChangesHistory.h index 65c8934cb23..21552a336c0 100644 --- a/src/Core/SettingsChangesHistory.h +++ b/src/Core/SettingsChangesHistory.h @@ -87,12 +87,14 @@ static std::map sett { {"24.5", {{"allow_deprecated_functions", true, false, "Allow usage of deprecated functions"}, {"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. 
t1.y < t2.y."}, + {"input_format_tsv_crlf_end_of_line", false, false, "Enables reading of CRLF line endings with TSV formats"}, {"output_format_parquet_use_custom_encoder", false, true, "Enable custom Parquet encoder."}, {"cross_join_min_rows_to_compress", 0, 10000000, "A new setting."}, {"cross_join_min_bytes_to_compress", 0, 1_GiB, "A new setting."}, {"http_max_chunk_size", 0, 0, "Internal limitation"}, {"prefer_external_sort_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging."}, {"input_format_force_null_for_omitted_fields", false, false, "Disable type-defaults for omitted fields when needed"}, + {"azure_max_blocks_in_multipart_upload", 50000, 50000, "Maximum number of blocks in multipart upload for Azure."}, }}, {"24.4", {{"input_format_json_throw_on_bad_escape_sequence", true, true, "Allow to save JSON strings with bad escape sequences"}, {"max_parsing_threads", 0, 0, "Add a separate setting to control number of threads in parallel parsing from files"}, diff --git a/src/Core/SettingsQuirks.cpp b/src/Core/SettingsQuirks.cpp index 5e7d02dc448..5541cc19653 100644 --- a/src/Core/SettingsQuirks.cpp +++ b/src/Core/SettingsQuirks.cpp @@ -92,7 +92,7 @@ void applySettingsQuirks(Settings & settings, LoggerPtr log) void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log) { - auto getCurrentValue = [¤t_settings](const std::string_view name) -> Field + auto get_current_value = [¤t_settings](const std::string_view name) -> Field { Field current_value; bool has_current_value = current_settings.tryGet(name, current_value); @@ -100,7 +100,7 @@ void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log) return current_value; }; - UInt64 max_threads = getCurrentValue("max_threads").get(); + UInt64 max_threads = get_current_value("max_threads").get(); UInt64 max_threads_max_value = 256 * getNumberOfPhysicalCPUCores(); if (max_threads > max_threads_max_value) { @@ -109,7 +109,7 @@ void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log) current_settings.set("max_threads", max_threads_max_value); } - constexpr UInt64 max_sane_block_rows_size = 4294967296; // 2^32 + static constexpr UInt64 max_sane_block_rows_size = 4294967296; // 2^32 std::unordered_set block_rows_settings{ "max_block_size", "max_insert_block_size", @@ -120,13 +120,21 @@ void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log) "input_format_parquet_max_block_size"}; for (auto const & setting : block_rows_settings) { - auto block_size = getCurrentValue(setting).get(); - if (block_size > max_sane_block_rows_size) + if (auto block_size = get_current_value(setting).get(); + block_size > max_sane_block_rows_size) { if (log) LOG_WARNING(log, "Sanity check: '{}' value is too high ({}). Reduced to {}", setting, block_size, max_sane_block_rows_size); current_settings.set(setting, max_sane_block_rows_size); } } + + if (auto max_block_size = get_current_value("max_block_size").get(); max_block_size == 0) + { + if (log) + LOG_WARNING(log, "Sanity check: 'max_block_size' cannot be 0. 
Set to default value {}", DEFAULT_BLOCK_SIZE); + current_settings.set("max_block_size", DEFAULT_BLOCK_SIZE); + } } + } diff --git a/src/DataTypes/Serializations/SerializationAggregateFunction.cpp b/src/DataTypes/Serializations/SerializationAggregateFunction.cpp index bab7c1d4cf2..55f7641e058 100644 --- a/src/DataTypes/Serializations/SerializationAggregateFunction.cpp +++ b/src/DataTypes/Serializations/SerializationAggregateFunction.cpp @@ -146,10 +146,10 @@ void SerializationAggregateFunction::serializeTextEscaped(const IColumn & column } -void SerializationAggregateFunction::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const +void SerializationAggregateFunction::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const { String s; - readEscapedString(s, istr); + settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(s, istr) : readEscapedString(s, istr); deserializeFromString(function, column, s, version); } diff --git a/src/DataTypes/Serializations/SerializationBool.cpp b/src/DataTypes/Serializations/SerializationBool.cpp index b63f25ddc35..a71c8a91cef 100644 --- a/src/DataTypes/Serializations/SerializationBool.cpp +++ b/src/DataTypes/Serializations/SerializationBool.cpp @@ -242,8 +242,10 @@ void SerializationBool::deserializeTextEscaped(IColumn & column, ReadBuffer & is { if (istr.eof()) throw Exception(ErrorCodes::CANNOT_PARSE_BOOL, "Expected boolean value but get EOF."); - - deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'; }); + if (settings.tsv.crlf_end_of_line_input) + deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n' || *buf.position() == '\r'; }); + else + deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'; }); } bool SerializationBool::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const diff --git a/src/DataTypes/Serializations/SerializationCustomSimpleText.cpp b/src/DataTypes/Serializations/SerializationCustomSimpleText.cpp index 938fd050173..1ba16f8492e 100644 --- a/src/DataTypes/Serializations/SerializationCustomSimpleText.cpp +++ b/src/DataTypes/Serializations/SerializationCustomSimpleText.cpp @@ -75,7 +75,7 @@ void SerializationCustomSimpleText::serializeTextEscaped(const IColumn & column, void SerializationCustomSimpleText::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const { String str; - readEscapedString(str, istr); + settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(str, istr) : readEscapedString(str, istr); deserializeFromString(*this, column, str, settings); } diff --git a/src/DataTypes/Serializations/SerializationEnum.cpp b/src/DataTypes/Serializations/SerializationEnum.cpp index d72442eec99..6d36c6a9a96 100644 --- a/src/DataTypes/Serializations/SerializationEnum.cpp +++ b/src/DataTypes/Serializations/SerializationEnum.cpp @@ -29,7 +29,7 @@ void SerializationEnum::deserializeTextEscaped(IColumn & column, ReadBuffe { /// NOTE It would be nice to do without creating a temporary object - at least extract std::string out. std::string field_name; - readEscapedString(field_name, istr); + settings.tsv.crlf_end_of_line_input ? 
readEscapedStringCRLF(field_name, istr) : readEscapedString(field_name, istr);
     assert_cast<ColumnType &>(column).getData().push_back(ref_enum_values.getValue(StringRef(field_name), true));
 }
 }
diff --git a/src/DataTypes/Serializations/SerializationFixedString.cpp b/src/DataTypes/Serializations/SerializationFixedString.cpp
index 481ae2a6165..f919dc16d33 100644
--- a/src/DataTypes/Serializations/SerializationFixedString.cpp
+++ b/src/DataTypes/Serializations/SerializationFixedString.cpp
@@ -10,8 +10,10 @@
 #include
 #include
+#include "Common/PODArray.h"
 #include
 #include
+#include "base/types.h"

 namespace DB
 {
@@ -183,14 +185,17 @@ static inline bool tryRead(const SerializationFixedString & self, IColumn & colu
 }

-void SerializationFixedString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
+void SerializationFixedString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
 {
-    read(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto(data, istr); });
+    read(*this, column, [&istr, &settings](ColumnFixedString::Chars & data)
+    {
+        settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<PaddedPODArray<UInt8>, true>(data, istr) : readEscapedStringInto<PaddedPODArray<UInt8>, false>(data, istr);
+    });
 }

 bool SerializationFixedString::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
 {
-    return tryRead(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto(data, istr); return true; });
+    return tryRead(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto<PaddedPODArray<UInt8>, false>(data, istr); return true; });
 }

diff --git a/src/DataTypes/Serializations/SerializationNullable.cpp b/src/DataTypes/Serializations/SerializationNullable.cpp
index 4d31451f92d..06361e24aa2 100644
--- a/src/DataTypes/Serializations/SerializationNullable.cpp
+++ b/src/DataTypes/Serializations/SerializationNullable.cpp
@@ -286,7 +286,7 @@ bool SerializationNullable::tryDeserializeNullRaw(DB::ReadBuffer & istr, const D
 }

-template <typename ReturnType>
+template <typename ReturnType, bool escaped>
 ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const SerializationPtr & nested_serialization, bool & is_null)
 {
     static constexpr bool throw_exception = std::is_same_v<ReturnType, void>;
@@ -319,10 +319,10 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
         /// Check if we have enough data in buffer to check if it's a null.
         if (istr.available() > null_representation.size())
         {
-            auto check_for_null = [&null_representation](ReadBuffer & buf)
+            auto check_for_null = [&null_representation, &settings](ReadBuffer & buf)
             {
                 auto * pos = buf.position();
-                if (checkString(null_representation, buf) && (*buf.position() == '\t' || *buf.position() == '\n'))
+                if (checkString(null_representation, buf) && (*buf.position() == '\t' || *buf.position() == '\n' || (settings.tsv.crlf_end_of_line_input && *buf.position() == '\r')))
                     return true;
                 buf.position() = pos;
                 return false;
             };
@@ -334,14 +334,14 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
             /// Use PeekableReadBuffer to make a checkpoint before checking null
             /// representation and rollback if check was failed.
PeekableReadBuffer peekable_buf(istr, true); - auto check_for_null = [&null_representation](ReadBuffer & buf_) + auto check_for_null = [&null_representation, &settings](ReadBuffer & buf_) { auto & buf = assert_cast(buf_); buf.setCheckpoint(); SCOPE_EXIT(buf.dropCheckpoint()); - if (checkString(null_representation, buf) && (buf.eof() || *buf.position() == '\t' || *buf.position() == '\n')) - return true; + if (checkString(null_representation, buf) && (buf.eof() || *buf.position() == '\t' || *buf.position() == '\n' || (settings.tsv.crlf_end_of_line_input && *buf.position() == '\r'))) + return true; buf.rollbackToCheckpoint(); return false; }; @@ -371,7 +371,10 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr, if (null_representation.find('\t') != std::string::npos || null_representation.find('\n') != std::string::npos) throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "TSV custom null representation " - "containing '\\t' or '\\n' may not work correctly for large input."); + "containing '\\t' or '\\n' may not work correctly for large input."); + if (settings.tsv.crlf_end_of_line_input && null_representation.find('\r') != std::string::npos) + throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "TSV custom null representation " + "containing '\\r' may not work correctly for large input."); WriteBufferFromOwnString parsed_value; if constexpr (escaped) diff --git a/src/DataTypes/Serializations/SerializationObject.cpp b/src/DataTypes/Serializations/SerializationObject.cpp index 67bf7af7799..c82968ce090 100644 --- a/src/DataTypes/Serializations/SerializationObject.cpp +++ b/src/DataTypes/Serializations/SerializationObject.cpp @@ -104,9 +104,9 @@ void SerializationObject::deserializeWholeText(IColumn & column, ReadBuf } template -void SerializationObject::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const +void SerializationObject::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const { - deserializeTextImpl(column, [&](String & s) { readEscapedString(s, istr); }); + deserializeTextImpl(column, [&](String & s) { settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(s, istr) : readEscapedString(s, istr); }); } template diff --git a/src/DataTypes/Serializations/SerializationString.cpp b/src/DataTypes/Serializations/SerializationString.cpp index 8abaa3bd5ea..9e39ab23709 100644 --- a/src/DataTypes/Serializations/SerializationString.cpp +++ b/src/DataTypes/Serializations/SerializationString.cpp @@ -147,7 +147,6 @@ void SerializationString::serializeBinaryBulk(const IColumn & column, WriteBuffe } } - template static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars & data, ColumnString::Offsets & offsets, ReadBuffer & istr, size_t limit) { @@ -324,14 +323,17 @@ bool SerializationString::tryDeserializeWholeText(IColumn & column, ReadBuffer & return read(column, [&](ColumnString::Chars & data) { readStringUntilEOFInto(data, istr); return true; }); } -void SerializationString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const +void SerializationString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const { - read(column, [&](ColumnString::Chars & data) { readEscapedStringInto(data, istr); }); + read(column, [&](ColumnString::Chars & data) + { + settings.tsv.crlf_end_of_line_input ? 
readEscapedStringInto<PaddedPODArray<UInt8>, true>(data, istr) : readEscapedStringInto<PaddedPODArray<UInt8>, false>(data, istr);
+    });
 }

 bool SerializationString::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
 {
-    return read(column, [&](ColumnString::Chars & data) { readEscapedStringInto(data, istr); return true; });
+    return read(column, [&](ColumnString::Chars & data) { readEscapedStringInto<PaddedPODArray<UInt8>, true>(data, istr); return true; });
 }

 void SerializationString::serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const
diff --git a/src/DataTypes/Serializations/SerializationVariant.cpp b/src/DataTypes/Serializations/SerializationVariant.cpp
index 300686ff8d3..dbd7355944b 100644
--- a/src/DataTypes/Serializations/SerializationVariant.cpp
+++ b/src/DataTypes/Serializations/SerializationVariant.cpp
@@ -599,14 +599,14 @@ void SerializationVariant::serializeTextEscaped(const IColumn & column, size_t r
 bool SerializationVariant::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
 {
     String field;
-    readEscapedString(field, istr);
+    settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field, istr) : readEscapedString(field, istr);
     return tryDeserializeTextEscapedImpl(column, field, settings);
 }

 void SerializationVariant::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
 {
     String field;
-    readEscapedString(field, istr);
+    settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field, istr) : readEscapedString(field, istr);
     if (!tryDeserializeTextEscapedImpl(column, field, settings))
         throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot parse escaped value of type {} here: {}", variant_name, field);
 }
diff --git a/src/Databases/DatabaseOnDisk.cpp b/src/Databases/DatabaseOnDisk.cpp
index 161be35f129..5cb4198e1a2 100644
--- a/src/Databases/DatabaseOnDisk.cpp
+++ b/src/Databases/DatabaseOnDisk.cpp
@@ -5,6 +5,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -326,31 +327,36 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na

     StoragePtr table = detachTable(local_context, table_name);

-    /// This is possible for Lazy database.
-    if (!table)
-        return;

     bool renamed = false;
     try
     {
         fs::rename(table_metadata_path, table_metadata_path_drop);
         renamed = true;
-        table->drop();
-        table->is_dropped = true;
-
-        fs::path table_data_dir(local_context->getPath() + table_data_path_relative);
-        if (fs::exists(table_data_dir))
-            (void)fs::remove_all(table_data_dir);
+        // The table might not be loaded for the Lazy database engine.
+        if (table)
+        {
+            table->drop();
+            table->is_dropped = true;
+        }
     }
     catch (...)
{ LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true)); - attachTable(local_context, table_name, table, table_data_path_relative); + if (table) + attachTable(local_context, table_name, table, table_data_path_relative); if (renamed) fs::rename(table_metadata_path_drop, table_metadata_path); throw; } + for (const auto & [disk_name, disk] : getContext()->getDisksMap()) + { + if (disk->isReadOnly() || !disk->exists(table_data_path_relative)) + continue; + + LOG_INFO(log, "Removing data directory from disk {} with path {} for dropped table {} ", disk_name, table_data_path_relative, table_name); + disk->removeRecursive(table_data_path_relative); + } (void)fs::remove(table_metadata_path_drop); } diff --git a/src/Databases/DatabaseOrdinary.cpp b/src/Databases/DatabaseOrdinary.cpp index 5d36f1cc3d6..58fa7f01947 100644 --- a/src/Databases/DatabaseOrdinary.cpp +++ b/src/Databases/DatabaseOrdinary.cpp @@ -76,6 +76,20 @@ static void setReplicatedEngine(ASTCreateQuery * create_query, ContextPtr contex String replica_path = server_settings.default_replica_path; String replica_name = server_settings.default_replica_name; + /// Check that replica path doesn't exist + Macros::MacroExpansionInfo info; + StorageID table_id = StorageID(create_query->getDatabase(), create_query->getTable(), create_query->uuid); + info.table_id = table_id; + info.expand_special_macros_only = false; + + String zookeeper_path = context->getMacros()->expand(replica_path, info); + if (context->getZooKeeper()->exists(zookeeper_path)) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Found existing ZooKeeper path {} while trying to convert table {} to replicated. Table will not be converted.", + zookeeper_path, backQuote(table_id.getFullTableName()) + ); + auto args = std::make_shared(); args->children.push_back(std::make_shared(replica_path)); args->children.push_back(std::make_shared(replica_name)); diff --git a/src/Dictionaries/SSDCacheDictionaryStorage.h b/src/Dictionaries/SSDCacheDictionaryStorage.h index e3eea71cd9a..f0b56cbf529 100644 --- a/src/Dictionaries/SSDCacheDictionaryStorage.h +++ b/src/Dictionaries/SSDCacheDictionaryStorage.h @@ -721,11 +721,10 @@ public: if (!block.checkCheckSum()) { std::string calculated_check_sum = std::to_string(block.calculateCheckSum()); - std::string check_sum = std::to_string(block.getCheckSum()); + std::string expected_check_sum = std::to_string(block.getCheckSum()); throw Exception(ErrorCodes::CORRUPTED_DATA, - "Cache data corrupted. Checksum validation failed. Calculated {} in block {}", - calculated_check_sum, - check_sum); + "Cache data corrupted. Checksum validation failed. Calculated {} expected in block {}, in file {}", + calculated_check_sum, expected_check_sum, file_path); } func(blocks_to_fetch[block_to_fetch_index], block.getBlockData()); diff --git a/src/Disks/DiskEncryptedTransaction.h b/src/Disks/DiskEncryptedTransaction.h index 94bd8ef5378..26f4c979bf8 100644 --- a/src/Disks/DiskEncryptedTransaction.h +++ b/src/Disks/DiskEncryptedTransaction.h @@ -244,6 +244,13 @@ public: return delegate_transaction->writeFile(wrapped_path, buf_size, mode, settings); } + /// Truncate file to the target size. 
+ void truncateFile(const std::string & src_path, size_t target_size) override + { + auto wrapped_path = wrappedPath(src_path); + delegate_transaction->truncateFile(wrapped_path, target_size); + } + private: diff --git a/src/Disks/FakeDiskTransaction.h b/src/Disks/FakeDiskTransaction.h index f83642eee56..69f08de2517 100644 --- a/src/Disks/FakeDiskTransaction.h +++ b/src/Disks/FakeDiskTransaction.h @@ -2,10 +2,16 @@ #include #include +#include namespace DB { +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + /// Fake disk transaction implementation. /// Just execute all operations immediately, commit is noop operation. /// No support for atomicity and rollback. @@ -134,6 +140,11 @@ public: disk.createHardLink(src_path, dst_path); } + void truncateFile(const std::string & /* src_path */, size_t /* target_size */) override + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Operation `truncateFile` is not implemented"); + } + private: IDisk & disk; }; diff --git a/src/Disks/IDiskTransaction.h b/src/Disks/IDiskTransaction.h index 49fcdde1a4f..fc84281baea 100644 --- a/src/Disks/IDiskTransaction.h +++ b/src/Disks/IDiskTransaction.h @@ -128,6 +128,9 @@ public: /// Create hardlink from `src_path` to `dst_path`. virtual void createHardLink(const std::string & src_path, const std::string & dst_path) = 0; + + /// Truncate file to the target size. + virtual void truncateFile(const std::string & src_path, size_t target_size) = 0; }; using DiskTransactionPtr = std::shared_ptr; diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp index a535b007541..bae58f0b9c6 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.cpp @@ -257,6 +257,7 @@ std::unique_ptr getAzureBlobStorageSettings(const Po settings->max_upload_part_size = config.getUInt64(config_prefix + ".max_upload_part_size", context->getSettings().azure_max_upload_part_size); settings->max_single_part_copy_size = config.getUInt64(config_prefix + ".max_single_part_copy_size", context->getSettings().azure_max_single_part_copy_size); settings->use_native_copy = config.getBool(config_prefix + ".use_native_copy", false); + settings->max_blocks_in_multipart_upload = config.getUInt64(config_prefix + ".max_blocks_in_multipart_upload", 50000); settings->max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".max_unexpected_write_error_retries", context->getSettings().azure_max_unexpected_write_error_retries); settings->max_inflight_parts_for_one_file = config.getUInt64(config_prefix + ".max_inflight_parts_for_one_file", context->getSettings().azure_max_inflight_parts_for_one_file); settings->strict_upload_part_size = config.getUInt64(config_prefix + ".strict_upload_part_size", context->getSettings().azure_strict_upload_part_size); diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h index f12ebb68dbb..c3062def763 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h +++ b/src/Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h @@ -63,6 +63,7 @@ struct AzureObjectStorageSettings bool use_native_copy = false; size_t max_unexpected_write_error_retries = 4; size_t max_inflight_parts_for_one_file = 20; + size_t max_blocks_in_multipart_upload = 50000; size_t strict_upload_part_size = 0; size_t upload_part_size_multiply_factor = 2; 
size_t upload_part_size_multiply_parts_count_threshold = 500; diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.cpp b/src/Disks/ObjectStorages/DiskObjectStorage.cpp index f6980d1e8f1..d4ff9bc0b79 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorage.cpp @@ -133,6 +133,14 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat transaction->commit(); } +void DiskObjectStorage::truncateFile(const String & path, size_t size) +{ + LOG_TEST(log, "Truncate file operation {} to size : {}", path, size); + auto transaction = createObjectStorageTransaction(); + transaction->truncateFile(path, size); + transaction->commit(); +} + void DiskObjectStorage::copyFile( /// NOLINT const String & from_file_path, IDisk & to_disk, diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.h b/src/Disks/ObjectStorages/DiskObjectStorage.h index fe8a5e2844a..2a27ddf89a7 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.h +++ b/src/Disks/ObjectStorages/DiskObjectStorage.h @@ -84,6 +84,8 @@ public: void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override; + void truncateFile(const String & path, size_t size) override; + MetadataStoragePtr getMetadataStorage() override { return metadata_storage; } UInt32 getRefCount(const String & path) const override; diff --git a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp index 19b8b51384f..44854633d65 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp @@ -15,6 +15,7 @@ namespace DB namespace ErrorCodes { extern const int UNKNOWN_FORMAT; + extern const int LOGICAL_ERROR; } void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf) @@ -207,6 +208,18 @@ void DiskObjectStorageMetadata::addObject(ObjectStorageKey key, size_t size) keys_with_meta.emplace_back(std::move(key), ObjectMetadata{size, {}, {}}); } +ObjectKeyWithMetadata DiskObjectStorageMetadata::popLastObject() +{ + if (keys_with_meta.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't pop last object from metadata {}. 
Metadata already empty", metadata_file_path);
+
+    ObjectKeyWithMetadata object = std::move(keys_with_meta.back());
+    keys_with_meta.pop_back();
+    total_size -= object.metadata.size_bytes;
+
+    return object;
+}
+
 bool DiskObjectStorageMetadata::getWriteFullObjectKeySetting()
 {
 #ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD
diff --git a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h
index 729d93af10d..4f45f5b7ddf 100644
--- a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h
+++ b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h
@@ -52,6 +52,7 @@ public:

     void addObject(ObjectStorageKey key, size_t size);

+    ObjectKeyWithMetadata popLastObject();

     void deserialize(ReadBuffer & buf);
     void deserializeFromString(const std::string & data);
diff --git a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
index 4e364e44624..e7c85bea1c6 100644
--- a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
+++ b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
@@ -559,6 +559,54 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
     }
 };

+struct TruncateFileObjectStorageOperation final : public IDiskObjectStorageOperation
+{
+    std::string path;
+    size_t size;
+
+    TruncateFileOperationOutcomePtr truncate_outcome;
+
+    TruncateFileObjectStorageOperation(
+        IObjectStorage & object_storage_,
+        IMetadataStorage & metadata_storage_,
+        const std::string & path_,
+        size_t size_)
+        : IDiskObjectStorageOperation(object_storage_, metadata_storage_)
+        , path(path_)
+        , size(size_)
+    {}
+
+    std::string getInfoForLog() const override
+    {
+        return fmt::format("TruncateFileObjectStorageOperation (path: {}, size: {})", path, size);
+    }
+
+    void execute(MetadataTransactionPtr tx) override
+    {
+        if (metadata_storage.exists(path))
+        {
+            if (!metadata_storage.isFile(path))
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is not a file", path);
+
+            truncate_outcome = tx->truncateFile(path, size);
+        }
+    }
+
+    void undo() override
+    {
+    }
+
+    void finalize() override
+    {
+        if (!truncate_outcome)
+            return;
+
+        if (!truncate_outcome->objects_to_remove.empty())
+            object_storage.removeObjectsIfExist(truncate_outcome->objects_to_remove);
+    }
+};
+
 }

 void DiskObjectStorageTransaction::createDirectory(const std::string & path)
@@ -598,6 +646,13 @@ void DiskObjectStorageTransaction::moveFile(const String & from_path, const Stri
     }));
 }

+void DiskObjectStorageTransaction::truncateFile(const String & path, size_t size)
+{
+    operations_to_execute.emplace_back(
+        std::make_unique<TruncateFileObjectStorageOperation>(object_storage, metadata_storage, path, size)
+    );
+}
+
 void DiskObjectStorageTransaction::replaceFile(const std::string & from_path, const std::string & to_path)
 {
     auto operation = std::make_unique<ReplaceFileObjectStorageOperation>(object_storage, metadata_storage, from_path, to_path);
diff --git a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h
index 67044751b84..23f66990d54 100644
--- a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h
+++ b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.h
@@ -92,6 +92,8 @@ public:

     void createFile(const String & path) override;

+    void truncateFile(const String & path, size_t size) override;
+
     void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override;

     /// writeFile is a difficult function for transactions.
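Taken together, the two layers split the truncate work: the metadata transaction (`truncateFile` → `TruncateMetadataFileOperation`) trims whole blob references off the tail of the metadata file, while `TruncateFileObjectStorageOperation` above deletes those blobs only in `finalize()`, i.e. after the transaction has committed, so a rollback never loses data. A simplified sketch of that bookkeeping, with stand-in types for `DiskObjectStorageMetadata`/`StoredObject`:

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Stand-in for one blob entry of the metadata file (assumption, simplified).
struct BlobRef { std::string key; size_t size_bytes; };

// Pop whole trailing blobs until the recorded size matches target_size;
// return the popped blobs so the caller can delete them after commit.
std::vector<BlobRef> truncateToSize(std::vector<BlobRef> & blobs, size_t & total_size, size_t target_size)
{
    std::vector<BlobRef> to_remove;
    while (total_size > target_size && !blobs.empty())
    {
        total_size -= blobs.back().size_bytes;
        to_remove.push_back(std::move(blobs.back()));
        blobs.pop_back();
    }
    if (total_size != target_size) // truncation only works on blob boundaries
        throw std::logic_error("can't truncate to a size inside a blob");
    return to_remove; // removed from object storage in finalize(), post-commit
}
```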
diff --git a/src/Disks/ObjectStorages/IMetadataStorage.h b/src/Disks/ObjectStorages/IMetadataStorage.h
index 168160f61a6..bed24849ed6 100644
--- a/src/Disks/ObjectStorages/IMetadataStorage.h
+++ b/src/Disks/ObjectStorages/IMetadataStorage.h
@@ -31,7 +31,15 @@ struct UnlinkMetadataFileOperationOutcome
     UInt32 num_hardlinks = std::numeric_limits<UInt32>::max();
 };

+struct TruncateFileOperationOutcome
+{
+    StoredObjects objects_to_remove;
+};
+
+
 using UnlinkMetadataFileOperationOutcomePtr = std::shared_ptr<UnlinkMetadataFileOperationOutcome>;
+using TruncateFileOperationOutcomePtr = std::shared_ptr<TruncateFileOperationOutcome>;
+

 /// Tries to provide some "transactions" interface, which allow
 /// to execute (commit) operations simultaneously. We don't provide
@@ -143,6 +151,11 @@ public:
         return nullptr;
     }

+    virtual TruncateFileOperationOutcomePtr truncateFile(const std::string & /* path */, size_t /* size */)
+    {
+        throwNotImplemented();
+    }
+
     virtual ~IMetadataTransaction() = default;

 protected:
diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp b/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp
index ab952888419..493470982cb 100644
--- a/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp
+++ b/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp
@@ -259,4 +259,12 @@ UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromDiskTransaction::unlink
     return result;
 }

+TruncateFileOperationOutcomePtr MetadataStorageFromDiskTransaction::truncateFile(const std::string & path, size_t target_size)
+{
+    auto operation = std::make_unique<TruncateMetadataFileOperation>(path, target_size, metadata_storage, *metadata_storage.getDisk());
+    auto result = operation->outcome;
+    addOperation(std::move(operation));
+    return result;
+}
+
 }
diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDisk.h b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h
index df16bf76a3c..8096b3b4565 100644
--- a/src/Disks/ObjectStorages/MetadataStorageFromDisk.h
+++ b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h
@@ -129,6 +129,8 @@ public:

     UnlinkMetadataFileOperationOutcomePtr unlinkMetadata(const std::string & path) override;

+    TruncateFileOperationOutcomePtr truncateFile(const std::string & src_path, size_t target_size) override;
+
 };
diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp
index 194a735f64f..79d1f4a1f7c 100644
--- a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp
+++ b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp
@@ -4,9 +4,12 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 #include
+#include

 namespace fs = std::filesystem;

 namespace DB
 {

+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
 static std::string getTempFileName(const std::string & dir)
 {
     return fs::path(dir) / getRandomASCIIString(32);
 }
@@ -341,6 +349,35 @@ void UnlinkMetadataFileOperation::undo(std::unique_lock<SharedMutex> & lock)
     outcome->num_hardlinks++;
 }

+void TruncateMetadataFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
+{
+    if (metadata_storage.exists(path))
+    {
+        auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
+        while (metadata->getTotalSizeBytes() > target_size)
+        {
+            auto object_key_with_metadata = metadata->popLastObject();
+            outcome->objects_to_remove.emplace_back(object_key_with_metadata.key.serialize(), path, object_key_with_metadata.metadata.size_bytes);
+        }
+        if (metadata->getTotalSizeBytes() != target_size)
+        {
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} can't be truncated to size {}", path, target_size);
+        }
+        LOG_TEST(getLogger("TruncateMetadataFileOperation"), "Going to remove {} blobs.", outcome->objects_to_remove.size());
+
+        write_operation = std::make_unique<WriteFileOperation>(path, disk, metadata->serializeToString());
+
+        write_operation->execute(metadata_lock);
+    }
+}
+
+void TruncateMetadataFileOperation::undo(std::unique_lock<SharedMutex> & lock)
+{
+    if (write_operation)
+        write_operation->undo(lock);
+}
+
+
 void SetReadonlyFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
 {
     auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h
index 3df29833f44..26f9f6460a4 100644
--- a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h
+++ b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h
@@ -282,4 +282,34 @@ private:
     std::unique_ptr<WriteFileOperation> write_operation;
 };

+struct TruncateMetadataFileOperation final : public IMetadataOperation
+{
+    const TruncateFileOperationOutcomePtr outcome = std::make_shared<TruncateFileOperationOutcome>();
+
+    TruncateMetadataFileOperation(
+        const std::string & path_,
+        size_t target_size_,
+        const MetadataStorageFromDisk & metadata_storage_,
+        IDisk & disk_)
+        : path(path_)
+        , target_size(target_size_)
+        , metadata_storage(metadata_storage_)
+        , disk(disk_)
+    {
+    }
+
+    void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
+
+    void undo(std::unique_lock<SharedMutex> & lock) override;
+
+private:
+    std::string path;
+    size_t target_size;
+
+    const MetadataStorageFromDisk & metadata_storage;
+    IDisk & disk;
+
+    std::unique_ptr<WriteFileOperation> write_operation;
+};
+
 }
diff --git a/src/Disks/ObjectStorages/ObjectStorageFactory.cpp b/src/Disks/ObjectStorages/ObjectStorageFactory.cpp
index cc53054c775..c83b9247b99 100644
--- a/src/Disks/ObjectStorages/ObjectStorageFactory.cpp
+++ b/src/Disks/ObjectStorages/ObjectStorageFactory.cpp
@@ -306,7 +306,6 @@ void registerAzureObjectStorage(ObjectStorageFactory & factory)
         bool /* skip_access_check */) -> ObjectStoragePtr
     {
         AzureBlobStorageEndpoint endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
-        std::string endpoint_string = endpoint.getEndpoint();

         return createObjectStorage<AzureObjectStorage>(
             ObjectStorageType::Azure, config, config_prefix, name,
diff --git a/src/Formats/EscapingRuleUtils.cpp b/src/Formats/EscapingRuleUtils.cpp
index 3edade639df..89a7a31d033 100644
--- a/src/Formats/EscapingRuleUtils.cpp
+++ b/src/Formats/EscapingRuleUtils.cpp
@@ -76,7 +76,7 @@ void skipFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule esca
             /// Empty field, just skip spaces
             break;
         case FormatSettings::EscapingRule::Escaped:
-            readEscapedStringInto(out, buf);
+            readEscapedStringInto<NullOutput, false>(out, buf);
             break;
         case FormatSettings::EscapingRule::Quoted:
             readQuotedFieldInto(out, buf);
diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp
index 3199445864d..90630d30c20 100644
--- a/src/Formats/FormatFactory.cpp
+++ b/src/Formats/FormatFactory.cpp
@@ -1,6 +1,7 @@
 #include

 #include
+#include <Common/tryGetFileNameByFileDescriptor.h>
 #include
 #include
 #include
@@ -15,7 +16,7 @@
 #include
 #include
 #include
-#include
+#include
 #include

@@ -202,6 +203,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
     format_settings.tsv.try_detect_header = settings.input_format_tsv_detect_header;
     format_settings.tsv.skip_trailing_empty_lines = settings.input_format_tsv_skip_trailing_empty_lines;
     format_settings.tsv.allow_variable_number_of_columns = settings.input_format_tsv_allow_variable_number_of_columns;
+    format_settings.tsv.crlf_end_of_line_input = settings.input_format_tsv_crlf_end_of_line;
     format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
     format_settings.values.allow_data_after_semicolon = settings.input_format_values_allow_data_after_semicolon;
     format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
@@ -693,21 +695,12 @@ String FormatFactory::getFormatFromFileName(String file_name)
 
 std::optional<String> FormatFactory::tryGetFormatFromFileDescriptor(int fd)
 {
-#ifdef OS_LINUX
-    std::string proc_path = fmt::format("/proc/self/fd/{}", fd);
-    char file_path[PATH_MAX] = {'\0'};
-    if (readlink(proc_path.c_str(), file_path, sizeof(file_path) - 1) != -1)
-        return tryGetFormatFromFileName(file_path);
+    std::optional<String> file_name = tryGetFileNameFromFileDescriptor(fd);
+
+    if (file_name)
+        return tryGetFormatFromFileName(*file_name);
+
     return std::nullopt;
-#elif defined(OS_DARWIN)
-    char file_path[PATH_MAX] = {'\0'};
-    if (fcntl(fd, F_GETPATH, file_path) != -1)
-        return tryGetFormatFromFileName(file_path);
-    return std::nullopt;
-#else
-    (void)fd;
-    return std::nullopt;
-#endif
 }
 
 String FormatFactory::getFormatFromFileDescriptor(int fd)
diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h
index f29fc51af6a..cada75d340c 100644
--- a/src/Formats/FormatSettings.h
+++ b/src/Formats/FormatSettings.h
@@ -361,6 +361,7 @@ struct FormatSettings
         bool try_detect_header = true;
         bool skip_trailing_empty_lines = false;
         bool allow_variable_number_of_columns = false;
+        bool crlf_end_of_line_input = false;
     } tsv{};
 
     struct
diff --git a/src/Functions/CMakeLists.txt b/src/Functions/CMakeLists.txt
index 751e8cf5103..c52b00150ec 100644
--- a/src/Functions/CMakeLists.txt
+++ b/src/Functions/CMakeLists.txt
@@ -31,7 +31,7 @@ extract_into_parent_list(clickhouse_functions_headers dbms_headers
 add_library(clickhouse_functions_obj OBJECT ${clickhouse_functions_headers} ${clickhouse_functions_sources})
 if (OMIT_HEAVY_DEBUG_SYMBOLS)
     target_compile_options(clickhouse_functions_obj PRIVATE "-g0")
-    set_source_files_properties(${DBMS_FUNCTIONS} PROPERTIES COMPILE_FLAGS "-g0")
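+    # set_source_files_properties only applies within the current directory's scope by
+    # default, and DBMS_FUNCTIONS lists sources owned by the parent directory, so
+    # DIRECTORY .. is needed for the -g0 flag to actually reach those files.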
+    set_source_files_properties(${DBMS_FUNCTIONS} DIRECTORY .. PROPERTIES COMPILE_FLAGS "-g0")
 endif()
 
 list (APPEND OBJECT_LIBS $<TARGET_OBJECTS:clickhouse_functions_obj>)
diff --git a/src/Functions/FunctionsHashing.h b/src/Functions/FunctionsHashing.h
index 3d8a11319c4..27717ea3611 100644
--- a/src/Functions/FunctionsHashing.h
+++ b/src/Functions/FunctionsHashing.h
@@ -49,6 +49,8 @@
 #include
 #include
 
+#include
+
 namespace DB
 {
 
@@ -75,17 +77,29 @@ namespace impl
         ColumnPtr key0;
         ColumnPtr key1;
         bool is_const;
+        const ColumnArray::Offsets * offsets{};
 
         size_t size() const
         {
             assert(key0 && key1);
             assert(key0->size() == key1->size());
+            assert(offsets == nullptr || offsets->size() == key0->size());
+            if (offsets != nullptr)
+                return offsets->back();
             return key0->size();
         }
+
         SipHashKey getKey(size_t i) const
         {
             if (is_const)
                 i = 0;
+            if (offsets != nullptr)
+            {
+                const auto *const begin = offsets->begin();
+                const auto * upper = std::upper_bound(begin, offsets->end(), i);
+                if (upper == offsets->end())
+                    throw Exception(ErrorCodes::LOGICAL_ERROR, "offset {} not found in function SipHashKeyColumns::getKey", i);
+                i = upper - begin;
+            }
             const auto & key0data = assert_cast<const ColumnUInt64 &>(*key0).getData();
             const auto & key1data = assert_cast<const ColumnUInt64 &>(*key1).getData();
             return {key0data[i], key1data[i]};
@@ -1112,7 +1126,15 @@ private:
                 typename ColumnVector<ToType>::Container vec_temp(nested_size);
                 bool nested_is_first = true;
-                executeForArgument(key_cols, nested_type, nested_column, vec_temp, nested_is_first);
+
+                if constexpr (Keyed)
+                {
+                    KeyColumnsType key_cols_tmp{key_cols};
+                    key_cols_tmp.offsets = &offsets;
+                    executeForArgument(key_cols_tmp, nested_type, nested_column, vec_temp, nested_is_first);
+                }
+                else
+                    executeForArgument(key_cols, nested_type, nested_column, vec_temp, nested_is_first);
 
                 const size_t size = offsets.size();
diff --git a/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp b/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp
index 667e63729ca..8bd436f218c 100644
--- a/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp
+++ b/src/IO/AzureBlobStorage/copyAzureBlobStorageFile.cpp
@@ -30,6 +30,7 @@ namespace ErrorCodes
 {
     extern const int INVALID_CONFIG_PARAMETER;
     extern const int AZURE_BLOB_STORAGE_ERROR;
+    extern const int LOGICAL_ERROR;
 }
 
 namespace
 {
@@ -94,11 +95,56 @@ namespace
 
         void calculatePartSize()
         {
-            auto max_upload_part_size = settings->max_upload_part_size;
-            if (!max_upload_part_size)
-                throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be 0");
+            if (!total_size)
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Chosen multipart upload for an empty file. This must not happen");
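+
+            /// The adjustment below picks the smallest part size in
+            /// [min_upload_part_size, max_upload_part_size] that keeps the part count
+            /// within max_blocks_in_multipart_upload. E.g. (illustrative numbers only):
+            /// total_size = 1000 MiB with min_upload_part_size = 16 MiB gives
+            /// num_parts = 63 (62 full parts plus an 8 MiB tail), well within the block
+            /// limit, so no adjustment is needed.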
This must not happen"); + + auto max_part_number = settings->max_blocks_in_multipart_upload; + const auto min_upload_part_size = settings->min_upload_part_size; + const auto max_upload_part_size = settings->max_upload_part_size; + + if (!max_part_number) + throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_blocks_in_multipart_upload must not be 0"); + else if (!min_upload_part_size) + throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "min_upload_part_size must not be 0"); + else if (max_upload_part_size < min_upload_part_size) + throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be less than min_upload_part_size"); + + size_t part_size = min_upload_part_size; + auto num_parts = (total_size + part_size - 1) / part_size; + + if (num_parts > max_part_number) + { + part_size = (total_size + max_part_number - 1) / max_part_number; + num_parts = (total_size + part_size - 1) / part_size; + } + + if (part_size > max_upload_part_size) + { + part_size = max_upload_part_size; + num_parts = (total_size + part_size - 1) / part_size; + } + + String error; + if (num_parts < 1) + error = "Number of parts is zero"; + else if (num_parts > max_part_number) + error = fmt::format("Number of parts exceeds {}/{}", num_parts, max_part_number); + else if (part_size < min_upload_part_size) + error = fmt::format("Size of a part is less than {}/{}", part_size, min_upload_part_size); + else if (part_size > max_upload_part_size) + error = fmt::format("Size of a part exceeds {}/{}", part_size, max_upload_part_size); + + if (!error.empty()) + { + throw Exception( + ErrorCodes::INVALID_CONFIG_PARAMETER, + "{} while writing {} bytes to Azure. Check max_part_number = {}, " + "min_upload_part_size = {}, max_upload_part_size = {}", + error, total_size, max_part_number, min_upload_part_size, max_upload_part_size); + } + /// We've calculated the size of a normal part (the final part can be smaller). - normal_part_size = max_upload_part_size; + normal_part_size = part_size; } public: @@ -219,21 +265,22 @@ namespace auto block_blob_client = client->GetBlockBlobClient(dest_blob); auto read_buffer = std::make_unique(create_read_buffer(), task.part_offset, task.part_size); - while (!read_buffer->eof()) - { - auto size = read_buffer->available(); - if (size > 0) - { - auto block_id = getRandomASCIIString(64); - Azure::Core::IO::MemoryBodyStream memory(reinterpret_cast(read_buffer->position()), size); - block_blob_client.StageBlock(block_id, memory); - task.block_ids.emplace_back(block_id); - read_buffer->ignore(size); - LOG_TRACE(log, "Writing part. Container: {}, Blob: {}, block_id: {}", dest_container_for_logging, dest_blob, block_id); - } - } - std::lock_guard lock(bg_tasks_mutex); /// Protect bg_tasks from race - LOG_TRACE(log, "Writing part finished. Container: {}, Blob: {}, Parts: {}", dest_container_for_logging, dest_blob, bg_tasks.size()); + + /// task.part_size is already normalized according to min_upload_part_size and max_upload_part_size. + size_t size_to_stage = task.part_size; + + PODArray memory; + memory.resize(size_to_stage); + WriteBufferFromVector> wb(memory); + + copyData(*read_buffer, wb, size_to_stage); + Azure::Core::IO::MemoryBodyStream stream(reinterpret_cast(memory.data()), size_to_stage); + + const auto & block_id = task.block_ids.emplace_back(getRandomASCIIString(64)); + block_blob_client.StageBlock(block_id, stream); + + LOG_TRACE(log, "Writing part. 
Container: {}, Blob: {}, block_id: {}, size: {}", + dest_container_for_logging, dest_blob, block_id, size_to_stage); } @@ -300,21 +347,32 @@ void copyAzureBlobStorageFile( if (size < settings->max_single_part_copy_size) { + LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy blob sync {} -> {}", src_blob, dest_blob); block_blob_client_dest.CopyFromUri(source_uri); } else { Azure::Storage::Blobs::StartBlobCopyOperation operation = block_blob_client_dest.StartCopyFromUri(source_uri); - // Wait for the operation to finish, checking for status every 100 second. auto copy_response = operation.PollUntilDone(std::chrono::milliseconds(100)); auto properties_model = copy_response.Value; - if (properties_model.CopySource.HasValue()) - { - throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Copy failed"); - } + auto copy_status = properties_model.CopyStatus; + auto copy_status_description = properties_model.CopyStatusDescription; + + if (copy_status.HasValue() && copy_status.Value() == Azure::Storage::Blobs::Models::CopyStatus::Success) + { + LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy of {} to {} finished", properties_model.CopySource.Value(), dest_blob); + } + else + { + if (copy_status.HasValue()) + throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Copy from {} to {} failed with status {} description {} (operation is done {})", + src_blob, dest_blob, copy_status.Value().ToString(), copy_status_description.Value(), operation.IsDone()); + else + throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Copy from {} to {} didn't complete with success status (operation is done {})", src_blob, dest_blob, operation.IsDone()); + } } } else @@ -322,8 +380,8 @@ void copyAzureBlobStorageFile( LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob); auto create_read_buffer = [&] { - return std::make_unique(src_client, src_blob, read_settings, settings->max_single_read_retries, - settings->max_single_download_retries); + return std::make_unique( + src_client, src_blob, read_settings, settings->max_single_read_retries, settings->max_single_download_retries); }; UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyAzureBlobStorageFile")}; diff --git a/src/IO/ReadHelpers.cpp b/src/IO/ReadHelpers.cpp index b428b1c7d8a..c771fced73a 100644 --- a/src/IO/ReadHelpers.cpp +++ b/src/IO/ReadHelpers.cpp @@ -352,7 +352,6 @@ static ReturnType parseComplexEscapeSequence(Vector & s, ReadBuffer & buf) { return error("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE); } - s.push_back(unhex2(hex_code)); } else if (char_after_backslash == 'N') @@ -608,13 +607,20 @@ static ReturnType parseJSONEscapeSequence(Vector & s, ReadBuffer & buf, bool kee } -template +template void readEscapedStringIntoImpl(Vector & s, ReadBuffer & buf) { while (!buf.eof()) { - char * next_pos = find_first_symbols<'\t', '\n', '\\'>(buf.position(), buf.buffer().end()); - + char * next_pos; + if constexpr (support_crlf) + { + next_pos = find_first_symbols<'\t', '\n', '\\','\r'>(buf.position(), buf.buffer().end()); + } + else + { + next_pos = find_first_symbols<'\t', '\n', '\\'>(buf.position(), buf.buffer().end()); + } appendToStringOrVector(s, buf, next_pos); buf.position() = next_pos; @@ -641,25 +647,46 @@ void readEscapedStringIntoImpl(Vector & s, ReadBuffer & buf) } } } + + if constexpr (support_crlf) + { + if (*buf.position() == '\r') + { + 
+        if constexpr (support_crlf)
+        {
+            if (*buf.position() == '\r')
+            {
+                ++buf.position();
+                if (!buf.eof() && *buf.position() != '\n')
+                {
+                    s.push_back('\r');
+                    continue;
+                }
+                return;
+            }
+        }
     }
 }
 
-template <typename Vector>
+template <typename Vector, bool support_crlf>
 void readEscapedStringInto(Vector & s, ReadBuffer & buf)
 {
-    readEscapedStringIntoImpl<Vector, true>(s, buf);
+    readEscapedStringIntoImpl<Vector, true, support_crlf>(s, buf);
 }
 
 void readEscapedString(String & s, ReadBuffer & buf)
 {
     s.clear();
-    readEscapedStringInto(s, buf);
+    readEscapedStringInto<String, false>(s, buf);
 }
 
-template void readEscapedStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
-template void readEscapedStringInto<NullOutput>(NullOutput & s, ReadBuffer & buf);
+void readEscapedStringCRLF(String & s, ReadBuffer & buf)
+{
+    s.clear();
+    readEscapedStringInto<String, true>(s, buf);
+}
 
+template void readEscapedStringInto<PaddedPODArray<UInt8>, false>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
+template void readEscapedStringInto<NullOutput, false>(NullOutput & s, ReadBuffer & buf);
+template void readEscapedStringInto<PaddedPODArray<UInt8>, true>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
+template void readEscapedStringInto<NullOutput, true>(NullOutput & s, ReadBuffer & buf);
 
 /** If enable_sql_style_quoting == true,
   *  strings like 'abc''def' will be parsed as abc'def.
@@ -2069,7 +2096,14 @@ bool tryReadJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON &
 void readTSVField(String & s, ReadBuffer & buf)
 {
     s.clear();
-    readEscapedStringIntoImpl<String, false>(s, buf);
+    readEscapedStringIntoImpl<String, false, false>(s, buf);
 }
 
+void readTSVFieldCRLF(String & s, ReadBuffer & buf)
+{
+    s.clear();
+    readEscapedStringIntoImpl<String, false, true>(s, buf);
+}
+
+
 }
diff --git a/src/IO/ReadHelpers.h b/src/IO/ReadHelpers.h
index 63bfae513e7..ffba4fafb5c 100644
--- a/src/IO/ReadHelpers.h
+++ b/src/IO/ReadHelpers.h
@@ -583,6 +583,8 @@ void readString(String & s, ReadBuffer & buf);
 
 void readEscapedString(String & s, ReadBuffer & buf);
 
+void readEscapedStringCRLF(String & s, ReadBuffer & buf);
+
 void readQuotedString(String & s, ReadBuffer & buf);
 void readQuotedStringWithSQLStyle(String & s, ReadBuffer & buf);
 
@@ -645,7 +647,7 @@ void readStringInto(Vector & s, ReadBuffer & buf);
 template <typename Vector>
 void readNullTerminated(Vector & s, ReadBuffer & buf);
 
-template <typename Vector>
+template <typename Vector, bool support_crlf>
 void readEscapedStringInto(Vector & s, ReadBuffer & buf);
 
 template <typename Vector>
@@ -1901,6 +1903,7 @@ void readJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON & se
 bool tryReadJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON & settings);
 
 void readTSVField(String & s, ReadBuffer & buf);
+void readTSVFieldCRLF(String & s, ReadBuffer & buf);
 
 /** Parse the escape sequence, which can be simple (one character after backslash) or more complex (multiple characters).
   * It is assumed that the cursor is located on the `\` symbol
diff --git a/src/IO/S3/copyS3File.cpp b/src/IO/S3/copyS3File.cpp
index 549d0a569c6..cff6fa5ad21 100644
--- a/src/IO/S3/copyS3File.cpp
+++ b/src/IO/S3/copyS3File.cpp
@@ -316,23 +316,23 @@ namespace
             num_parts = (total_size + part_size - 1) / part_size;
         }
 
-        if (num_parts < 1 || num_parts > max_part_number || part_size < min_upload_part_size || part_size > max_upload_part_size)
-        {
-            String msg;
-            if (num_parts < 1)
-                msg = "Number of parts is zero";
-            else if (num_parts > max_part_number)
-                msg = fmt::format("Number of parts exceeds {}", num_parts, max_part_number);
-            else if (part_size < min_upload_part_size)
-                msg = fmt::format("Size of a part is less than {}", part_size, min_upload_part_size);
-            else
-                msg = fmt::format("Size of a part exceeds {}", part_size, max_upload_part_size);
+        String error;
+        if (num_parts < 1)
+            error = "Number of parts is zero";
+        else if (num_parts > max_part_number)
+            error = fmt::format("Number of parts exceeds {}/{}", num_parts, max_part_number);
+        else if (part_size < min_upload_part_size)
+            error = fmt::format("Size of a part is less than {}/{}", part_size, min_upload_part_size);
+        else if (part_size > max_upload_part_size)
+            error = fmt::format("Size of a part exceeds {}/{}", part_size, max_upload_part_size);
 
+        if (!error.empty())
+        {
             throw Exception(
                 ErrorCodes::INVALID_CONFIG_PARAMETER,
                 "{} while writing {} bytes to S3. Check max_part_number = {}, "
                 "min_upload_part_size = {}, max_upload_part_size = {}",
-                msg, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
+                error, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
         }
 
         /// We've calculated the size of a normal part (the final part can be smaller).
diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp
index 6607df8d9af..9ca521a4ab3 100644
--- a/src/Interpreters/ThreadStatusExt.cpp
+++ b/src/Interpreters/ThreadStatusExt.cpp
@@ -462,8 +462,8 @@ void ThreadStatus::initGlobalProfiler([[maybe_unused]] UInt64 global_profiler_re
 {
 #if !defined(SANITIZER) && !defined(__APPLE__)
     /// profilers are useless without trace collector
-    auto global_context_ptr = global_context.lock();
-    if (!global_context_ptr || !global_context_ptr->hasTraceCollector())
+    auto context = Context::getGlobalContextInstance();
+    if (!context->hasTraceCollector())
         return;
 
     try
diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp
index f1f72a4ea4a..1fa86018d5b 100644
--- a/src/Interpreters/executeQuery.cpp
+++ b/src/Interpreters/executeQuery.cpp
@@ -103,7 +103,6 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
     extern const int NOT_IMPLEMENTED;
    extern const int QUERY_WAS_CANCELLED;
-    extern const int INCORRECT_DATA;
     extern const int SYNTAX_ERROR;
     extern const int SUPPORT_IS_DISABLED;
     extern const int INCORRECT_QUERY;
@@ -1256,34 +1255,6 @@ static std::tuple executeQueryImpl(
             }
         }
     }
-    // Here we check if our our projections contain force_optimize_projection_name
-    if (!settings.force_optimize_projection_name.value.empty())
-    {
-        bool found = false;
-        std::set projections;
-        {
-            const auto & access_info = context->getQueryAccessInfo();
-            std::lock_guard lock(access_info.mutex);
-            projections = access_info.projections;
-        }
-
-        for (const auto & projection : projections)
-        {
-            // projection value has structure like: <db_name>.<table_name>.<projection_name>
-            // We need to get only the projection name
-            size_t last_dot_pos = projection.find_last_of('.');
-            std::string projection_name = (last_dot_pos != std::string::npos) ? projection.substr(last_dot_pos + 1) : projection;
-            if (settings.force_optimize_projection_name.value == projection_name)
-            {
-                found = true;
-                break;
-            }
-        }
-
-        if (!found)
-            throw Exception(ErrorCodes::INCORRECT_DATA, "Projection {} is specified in setting force_optimize_projection_name but not used",
-                            settings.force_optimize_projection_name.value);
-    }
 
     if (process_list_entry)
     {
@@ -1421,7 +1392,16 @@ void executeQuery(
     const char * begin;
     const char * end;
 
-    istr.nextIfAtEnd();
+    try
+    {
+        istr.nextIfAtEnd();
+    }
+    catch (...)
+    {
+        /// If buffer contains invalid data and we failed to decompress, we still want to have some information about the query in the log.
+        logQuery("", context, /* internal = */ false, QueryProcessingStage::Complete);
+        throw;
+    }
 
     size_t max_query_size = context->getSettingsRef().max_query_size;
diff --git a/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp b/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp
index 5382527fcdc..4d67bc1a4e9 100644
--- a/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp
+++ b/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp
@@ -135,7 +135,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
 
                 /// If the key is not found, skip the value.
                 NullOutput sink;
-                readEscapedStringInto(sink, *in);
+                readEscapedStringInto<NullOutput, false>(sink, *in);
             }
             else
             {
diff --git a/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp b/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp
index 09f8fa92e5f..6d4dcba9e60 100644
--- a/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp
+++ b/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp
@@ -10,6 +10,8 @@
 #include
 #include
 #include
+#include
+#include "Formats/FormatSettings.h"
 
 namespace DB
 {
@@ -28,7 +30,8 @@ static void checkForCarriageReturn(ReadBuffer & in)
         throw Exception(ErrorCodes::INCORRECT_DATA, "\nYou have carriage return (\\r, 0x0D, ASCII 13) at end of first row."
             "\nIt's like your input data has DOS/Windows style line separators, that are illegal in TabSeparated format."
             " You must transform your file to Unix format."
-            "\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r.");
+            "\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r"
+            "\nor else enable setting 'input_format_tsv_crlf_end_of_line'");
 }
 
 TabSeparatedRowInputFormat::TabSeparatedRowInputFormat(
@@ -92,7 +95,12 @@ void TabSeparatedFormatReader::skipRowEndDelimiter()
     if (buf->eof())
         return;
 
-    if (unlikely(first_row))
+    if (format_settings.tsv.crlf_end_of_line_input)
+    {
+        if (*buf->position() == '\r')
+            ++buf->position();
+    }
+    else if (unlikely(first_row))
     {
         checkForCarriageReturn(*buf);
         first_row = false;
@@ -105,14 +113,15 @@ template <bool read_string>
 String TabSeparatedFormatReader::readFieldIntoString()
 {
     String field;
+    bool support_crlf = format_settings.tsv.crlf_end_of_line_input;
     if (is_raw)
         readString(field, *buf);
     else
     {
         if constexpr (read_string)
-            readEscapedString(field, *buf);
+            support_crlf ? readEscapedStringCRLF(field, *buf) : readEscapedString(field, *buf);
         else
-            readTSVField(field, *buf);
+            support_crlf ? readTSVFieldCRLF(field, *buf) : readTSVField(field, *buf);
     }
     return field;
 }
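+/// All read and skip paths above and below consult the same crlf_end_of_line_input
+/// switch, so enabling input_format_tsv_crlf_end_of_line changes field, header and
+/// row-end handling consistently.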
@@ -123,7 +132,7 @@ void TabSeparatedFormatReader::skipField()
     if (is_raw)
         readStringInto(out, *buf);
     else
-        readEscapedStringInto(out, *buf);
+        format_settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<NullOutput, true>(out, *buf) : readEscapedStringInto<NullOutput, false>(out, *buf);
 }
 
 void TabSeparatedFormatReader::skipHeaderRow()
@@ -155,7 +164,7 @@ bool TabSeparatedFormatReader::readField(IColumn & column, const DataTypePtr & t
     const SerializationPtr & serialization, bool is_last_file_column, const String & /*column_name*/)
 {
     const bool at_delimiter = !is_last_file_column && !buf->eof() && *buf->position() == '\t';
-    const bool at_last_column_line_end = is_last_file_column && (buf->eof() || *buf->position() == '\n');
+    const bool at_last_column_line_end = is_last_file_column && (buf->eof() || *buf->position() == '\n' || (format_settings.tsv.crlf_end_of_line_input && *buf->position() == '\r'));
 
     if (format_settings.tsv.empty_as_default && (at_delimiter || at_last_column_line_end))
     {
@@ -220,7 +229,10 @@ bool TabSeparatedFormatReader::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
 
     try
     {
-        assertChar('\n', *buf);
+        if (!format_settings.tsv.crlf_end_of_line_input)
+            assertChar('\n', *buf);
+        else
+            assertChar('\r', *buf);
     }
     catch (const DB::Exception &)
     {
@@ -233,7 +245,10 @@ bool TabSeparatedFormatReader::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
         else if (*buf->position() == '\r')
         {
             out << "ERROR: Carriage return found where line feed is expected."
-                   " It's like your file has DOS/Windows style line separators, that is illegal in TabSeparated format.\n";
+                   " It's like your file has DOS/Windows style line separators.\n"
+                   "You must transform your file to Unix format.\n"
+                   "But if you really need carriage return at end of string value of last column, you need to escape it as \\r\n"
+                   "or else enable setting 'input_format_tsv_crlf_end_of_line'";
         }
         else
         {
@@ -348,7 +363,7 @@ void TabSeparatedFormatReader::skipRow()
 
 bool TabSeparatedFormatReader::checkForEndOfRow()
 {
-    return buf->eof() || *buf->position() == '\n';
+    return buf->eof() || *buf->position() == '\n' || (format_settings.tsv.crlf_end_of_line_input && *buf->position() == '\r');
 }
 
 TabSeparatedSchemaReader::TabSeparatedSchemaReader(
diff --git a/src/Processors/QueryPlan/Optimizations/Optimizations.h b/src/Processors/QueryPlan/Optimizations/Optimizations.h
index 18f1496d26a..b33a373a970 100644
--- a/src/Processors/QueryPlan/Optimizations/Optimizations.h
+++ b/src/Processors/QueryPlan/Optimizations/Optimizations.h
@@ -111,8 +111,11 @@ void optimizePrimaryKeyCondition(const Stack & stack);
 void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes);
 void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
 void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
-bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections);
-bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes);
+
+/// Returns the name of used projection or nullopt if no projection is used.
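+/// The caller collects these names to validate the force_optimize_projection_name setting.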
+std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections);
+std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes);
+
 bool addPlansForSets(QueryPlan & plan, QueryPlan::Node & node, QueryPlan::Nodes & nodes);
 
 /// Enable memory bound merging of aggregation states for remote queries
diff --git a/src/Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.cpp b/src/Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.cpp
index 80923159ddc..2738de1ff5f 100644
--- a/src/Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.cpp
+++ b/src/Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.cpp
@@ -46,7 +46,7 @@ QueryPlanOptimizationSettings QueryPlanOptimizationSettings::fromSettings(const
 
     settings.optimize_projection = from.optimize_use_projections;
     settings.force_use_projection = settings.optimize_projection && from.force_optimize_projection;
-    settings.force_projection_name = from.force_optimize_projection_name;
+    settings.force_projection_name = settings.optimize_projection ? from.force_optimize_projection_name.value : "";
     settings.optimize_use_implicit_projections = settings.optimize_projection && from.optimize_use_implicit_projections;
 
     return settings;
diff --git a/src/Processors/QueryPlan/Optimizations/optimizeTree.cpp b/src/Processors/QueryPlan/Optimizations/optimizeTree.cpp
index 915e664ea8f..df9e095af30 100644
--- a/src/Processors/QueryPlan/Optimizations/optimizeTree.cpp
+++ b/src/Processors/QueryPlan/Optimizations/optimizeTree.cpp
@@ -12,6 +12,7 @@ namespace DB
 
 namespace ErrorCodes
 {
+    extern const int INCORRECT_DATA;
     extern const int TOO_MANY_QUERY_PLAN_OPTIMIZATIONS;
     extern const int PROJECTION_NOT_USED;
 }
@@ -106,7 +107,7 @@ void optimizeTreeFirstPass(const QueryPlanOptimizationSettings & settings, Query
 void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes)
 {
     const size_t max_optimizations_to_apply = optimization_settings.max_optimizations_to_apply;
-    size_t num_applied_projection = 0;
+    std::unordered_set<String> applied_projection_names;
     bool has_reading_from_mt = false;
 
     Stack stack;
@@ -159,9 +160,11 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
 
         /// Projection optimization relies on PK optimization
         if (optimization_settings.optimize_projection)
-            num_applied_projection
-                += optimizeUseAggregateProjections(*frame.node, nodes, optimization_settings.optimize_use_implicit_projections);
-
+        {
+            auto applied_projection = optimizeUseAggregateProjections(*frame.node, nodes, optimization_settings.optimize_use_implicit_projections);
+            if (applied_projection)
+                applied_projection_names.insert(*applied_projection);
+        }
 
         if (optimization_settings.aggregation_in_order)
             optimizeAggregationInOrder(*frame.node, nodes);
@@ -180,11 +183,11 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
             if (optimization_settings.optimize_projection)
             {
                 /// Projection optimization relies on PK optimization
-                if (optimizeUseNormalProjections(stack, nodes))
+                if (auto applied_projection = optimizeUseNormalProjections(stack, nodes))
                 {
-                    ++num_applied_projection;
+                    applied_projection_names.insert(*applied_projection);
 
-                    if (max_optimizations_to_apply && max_optimizations_to_apply < num_applied_projection)
+                    if (max_optimizations_to_apply && max_optimizations_to_apply < applied_projection_names.size())
                        throw Exception(ErrorCodes::TOO_MANY_QUERY_PLAN_OPTIMIZATIONS,
                                         "Too many projection optimizations applied to query plan. Current limit {}",
                                         max_optimizations_to_apply);
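+
+                    /// applied_projection_names is a set of distinct names, so applying the
+                    /// same projection on several plan branches trips the limit above only once.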
@@ -201,10 +204,16 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
         stack.pop_back();
     }
 
-    if (optimization_settings.force_use_projection && has_reading_from_mt && num_applied_projection == 0)
+    if (optimization_settings.force_use_projection && has_reading_from_mt && applied_projection_names.empty())
         throw Exception(
             ErrorCodes::PROJECTION_NOT_USED,
             "No projection is used when optimize_use_projections = 1 and force_optimize_projection = 1");
+
+    if (!optimization_settings.force_projection_name.empty() && has_reading_from_mt && !applied_projection_names.contains(optimization_settings.force_projection_name))
+        throw Exception(
+            ErrorCodes::INCORRECT_DATA,
+            "Projection {} is specified in setting force_optimize_projection_name but not used",
+            optimization_settings.force_projection_name);
 }
 
 void optimizeTreeThirdPass(QueryPlan & plan, QueryPlan::Node & root, QueryPlan::Nodes & nodes)
diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp
index 30ff9970790..4017670ad14 100644
--- a/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp
+++ b/src/Processors/QueryPlan/Optimizations/optimizeUseAggregateProjection.cpp
@@ -552,28 +552,28 @@ static QueryPlan::Node * findReadingStep(QueryPlan::Node & node)
     return nullptr;
 }
 
-bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections)
+std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections)
 {
     if (node.children.size() != 1)
-        return false;
+        return {};
 
     auto * aggregating = typeid_cast<AggregatingStep *>(node.step.get());
     if (!aggregating)
-        return false;
+        return {};
 
     if (!aggregating->canUseProjection())
-        return false;
+        return {};
 
     QueryPlan::Node * reading_node = findReadingStep(*node.children.front());
     if (!reading_node)
-        return false;
+        return {};
 
     auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get());
     if (!reading)
-        return false;
+        return {};
 
     if (!canUseProjectionForReadingStep(reading))
-        return false;
+        return {};
 
     std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks = getMaxAddedBlocks(reading);
 
@@ -597,7 +597,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
     if (ordinary_reading_marks == 0)
     {
         reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
-        return false;
+        return {};
     }
 
     const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;
@@ -631,15 +631,14 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
         if (!best_candidate)
         {
             reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
-            return false;
+            return {};
         }
     }
     else
     {
-        return false;
+        return {};
     }
 
-    Context::QualifiedProjectionName projection_name;
     chassert(best_candidate != nullptr);
 
     QueryPlanStepPtr projection_reading;
@@ -654,12 +653,6 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
         Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(candidates.minmax_projection->block)));
         projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
         has_ordinary_parts = false;
-
-        projection_name = Context::QualifiedProjectionName
-        {
-            .storage_id = reading->getMergeTreeData().getStorageID(),
-            .projection_name = candidates.minmax_projection->candidate.projection->name,
-        };
     }
     else
     {
@@ -691,12 +684,6 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
 
         projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
     }
 
-    projection_name = Context::QualifiedProjectionName
-    {
-        .storage_id = reading->getMergeTreeData().getStorageID(),
-        .projection_name = best_candidate->projection->name,
-    };
-
     has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr;
     if (has_ordinary_parts)
         reading->setAnalyzedResult(std::move(best_candidate->merge_tree_ordinary_select_result_ptr));
@@ -746,7 +733,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
         node.children.push_back(&expr_or_filter_node);
     }
 
-    return true;
+    return best_candidate->projection->name;
 }
 
 }
diff --git a/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp b/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp
index 13c6c6b0821..728aaaa6fc4 100644
--- a/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp
+++ b/src/Processors/QueryPlan/Optimizations/optimizeUseNormalProjection.cpp
@@ -73,16 +73,16 @@ static bool hasAllRequiredColumns(const ProjectionDescription * projection, cons
 }
 
 
-bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
+std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
 {
     const auto & frame = stack.back();
 
     auto * reading = typeid_cast<ReadFromMergeTree *>(frame.node->step.get());
     if (!reading)
-        return false;
+        return {};
 
     if (!canUseProjectionForReadingStep(reading))
-        return false;
+        return {};
 
     auto iter = stack.rbegin();
     while (std::next(iter) != stack.rend())
@@ -96,7 +96,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
 
     /// Dangling query plan node. This might be generated by StorageMerge.
     if (iter->node->step.get() == reading)
-        return false;
+        return {};
 
     const auto metadata = reading->getStorageMetadata();
     const auto & projections = metadata->projections;
@@ -107,7 +107,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
             normal_projections.push_back(&projection);
 
     if (normal_projections.empty())
-        return false;
+        return {};
 
     ContextPtr context = reading->getContext();
     auto it = std::find_if(normal_projections.begin(), normal_projections.end(), [&](const auto * projection)
@@ -126,7 +126,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
     {
         auto & child = iter->node->children[iter->next_child - 1];
         if (!query.build(*child))
-            return false;
+            return {};
 
         if (query.dag)
             query.dag->removeUnusedActions();
@@ -146,7 +146,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
     if (ordinary_reading_marks == 0)
     {
         reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
-        return false;
+        return {};
     }
 
     const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;
@@ -185,7 +185,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
     if (!best_candidate)
     {
         reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
-        return false;
+        return {};
     }
 
     auto storage_snapshot = reading->getStorageSnapshot();
@@ -283,8 +283,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
 
     /// Here we remove last steps from stack to be able to optimize again.
     /// In theory, read-in-order can be applied to projection.
     stack.resize(iter.base() - stack.begin());
-
-    return true;
+    return best_candidate->projection->name;
 }
 
 }
diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp
index a677c537622..d1db4cb3951 100644
--- a/src/Server/HTTPHandler.cpp
+++ b/src/Server/HTTPHandler.cpp
@@ -707,11 +707,11 @@ void HTTPHandler::processQuery(
     /// The data can also be compressed using incompatible internal algorithm. This is indicated by
     /// 'decompress' query parameter.
     std::unique_ptr<ReadBuffer> in_post_maybe_compressed;
-    bool in_post_compressed = false;
+    bool is_in_post_compressed = false;
     if (params.getParsed<bool>("decompress", false))
     {
-        in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post);
-        in_post_compressed = true;
+        in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post, /* allow_different_codecs_ = */ false, /* external_data_ = */ true);
+        is_in_post_compressed = true;
     }
     else
         in_post_maybe_compressed = std::move(in_post);
@@ -845,7 +845,7 @@ void HTTPHandler::processQuery(
 
     /// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
     /// checksums of client data compressed with internal algorithm are not checked.
-    if (in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress)
+    if (is_in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress)
         static_cast<CompressedReadBuffer &>(*in_post_maybe_compressed).disableChecksumming();
 
     /// Add CORS header if 'add_http_cors_header' setting is turned on send * in Access-Control-Allow-Origin
diff --git a/src/Storages/StorageAzureBlob.cpp b/src/Storages/StorageAzureBlob.cpp
index e98eaf1e8f2..365f93cc324 100644
--- a/src/Storages/StorageAzureBlob.cpp
+++ b/src/Storages/StorageAzureBlob.cpp
@@ -254,6 +254,10 @@ AzureObjectStorage::SettingsPtr StorageAzureBlob::createSettings(const ContextPt
     auto settings_ptr = std::make_unique<AzureObjectStorageSettings>();
     settings_ptr->max_single_part_upload_size = context_settings.azure_max_single_part_upload_size;
     settings_ptr->max_single_read_retries = context_settings.azure_max_single_read_retries;
+    settings_ptr->strict_upload_part_size = context_settings.azure_strict_upload_part_size;
+    settings_ptr->max_upload_part_size = context_settings.azure_max_upload_part_size;
+    settings_ptr->max_blocks_in_multipart_upload = context_settings.azure_max_blocks_in_multipart_upload;
+    settings_ptr->min_upload_part_size = context_settings.azure_min_upload_part_size;
     settings_ptr->list_object_keys_size = static_cast<int32_t>(context_settings.azure_list_object_keys_size);
 
     return settings_ptr;
diff --git a/src/Storages/StorageStripeLog.cpp b/src/Storages/StorageStripeLog.cpp
index 48389dccf48..f0c5103d657 100644
--- a/src/Storages/StorageStripeLog.cpp
+++ b/src/Storages/StorageStripeLog.cpp
@@ -5,6 +5,7 @@
 #include
 #include
 
+#include
 #include
 #include
 
@@ -53,8 +54,13 @@ namespace ErrorCodes
     extern const int TIMEOUT_EXCEEDED;
     extern const int CANNOT_RESTORE_TABLE;
     extern const int NOT_IMPLEMENTED;
+    extern const int FAULT_INJECTED;
 }
 
+namespace FailPoints
+{
+    extern const char stripe_log_sink_write_fallpoint[];
+}
 
 /// NOTE: The lock `StorageStripeLog::rwlock` is NOT kept locked while reading,
 /// because we read ranges of data that do not change.
@@ -234,6 +240,11 @@ public:
         /// Save the new indices.
         storage.saveIndices(lock);
 
+        // An exception may occur while saving the file sizes, e.g. S3::TooManyRequests.
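+        // The failpoint below fires between saveIndices() and saveFileSizes(), so the data
+        // file has grown while sizes.json still records the old size; FileChecker::repair()
+        // is then expected to truncate the unaccounted bytes when the table is loaded again.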
+        fiu_do_on(FailPoints::stripe_log_sink_write_fallpoint,
+        {
+            throw Exception(ErrorCodes::FAULT_INJECTED, "Injecting fault for inserting into StripeLog table");
+        });
         /// Save the new file sizes.
         storage.saveFileSizes(lock);
diff --git a/tests/integration/test_azure_blob_storage_native_copy/test.py b/tests/integration/test_azure_blob_storage_native_copy/test.py
index 4f543e4c8b2..77d400240b1 100644
--- a/tests/integration/test_azure_blob_storage_native_copy/test.py
+++ b/tests/integration/test_azure_blob_storage_native_copy/test.py
@@ -110,6 +110,11 @@ def cluster():
         main_configs=[path],
         with_azurite=True,
     )
+    cluster.add_instance(
+        "node3",
+        main_configs=[path],
+        with_azurite=True,
+    )
     cluster.start()
 
     yield cluster
@@ -216,3 +221,37 @@ def test_backup_restore_on_merge_tree_different_container(cluster):
 
     azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket")
     azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket_restored")
+
+
+def test_backup_restore_on_merge_tree_native_copy_async(cluster):
+    node3 = cluster.instances["node3"]
+    azure_query(
+        node3,
+        f"CREATE TABLE test_simple_merge_tree_async(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='policy_azure_cache'",
+    )
+    azure_query(node3, f"INSERT INTO test_simple_merge_tree_async VALUES (1, 'a')")
+
+    backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_merge_tree_async_backup')"
+    print("BACKUP DEST", backup_destination)
+    azure_query(
+        node3,
+        f"BACKUP TABLE test_simple_merge_tree_async TO {backup_destination}",
+        settings={"azure_max_single_part_copy_size": 0},
+    )
+
+    assert node3.contains_in_log("using native copy")
+
+    azure_query(
+        node3,
+        f"RESTORE TABLE test_simple_merge_tree_async AS test_simple_merge_tree_async_restored FROM {backup_destination};",
+        settings={"azure_max_single_part_copy_size": 0},
+    )
+    assert (
+        azure_query(node3, f"SELECT * from test_simple_merge_tree_async_restored")
+        == "1\ta\n"
+    )
+
+    assert node3.contains_in_log("using native copy")
+
+    azure_query(node3, f"DROP TABLE test_simple_merge_tree_async")
+    azure_query(node3, f"DROP TABLE test_simple_merge_tree_async_restored")
diff --git a/tests/integration/test_backup_restore_azure_blob_storage/test.py b/tests/integration/test_backup_restore_azure_blob_storage/test.py
index 1a1458cb68e..78b186e3227 100644
--- a/tests/integration/test_backup_restore_azure_blob_storage/test.py
+++ b/tests/integration/test_backup_restore_azure_blob_storage/test.py
@@ -281,7 +281,10 @@ def test_backup_restore_on_merge_tree(cluster):
     node = cluster.instances["node"]
     azure_query(
         node,
-        f"CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='blob_storage_policy'",
+        f"""
+        DROP TABLE IF EXISTS test_simple_merge_tree;
+        CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='blob_storage_policy'
+        """,
     )
     azure_query(node, f"INSERT INTO test_simple_merge_tree VALUES (1, 'a')")
 
@@ -299,3 +302,85 @@ def test_backup_restore_on_merge_tree(cluster):
     )
     azure_query(node, f"DROP TABLE test_simple_merge_tree")
     azure_query(node, f"DROP TABLE test_simple_merge_tree_restored")
+
+
+def test_backup_restore_correct_block_ids(cluster):
+    node = cluster.instances["node"]
+    azure_query(
+        node,
+        f"""
+        DROP TABLE IF EXISTS test_simple_merge_tree;
+        CREATE TABLE test_simple_merge_tree(key UInt64, data String)
+        Engine = MergeTree()
+        ORDER BY tuple()
+        SETTINGS storage_policy='blob_storage_policy'""",
+    )
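+    # 1000 rows of roughly 100 bytes each make the staged block sizes predictable
+    # for both parameter sets exercised below.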
+    data_query = "SELECT number, repeat('a', 100) FROM numbers(1000)"
+    azure_query(
+        node,
+        f"INSERT INTO test_simple_merge_tree {data_query}",
+    )
+
+    for min_upload_size, max_upload_size, max_blocks, expected_block_size in [
+        (42, 100, 1000, 42),
+        (42, 52, 86, 52),
+    ]:
+        data_path = f"test_backup_correct_block_ids_{max_blocks}"
+
+        backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', '{data_path}')"
+        azure_query(
+            node,
+            f"""
+            SET azure_min_upload_part_size = {min_upload_size};
+            SET azure_max_upload_part_size = {max_upload_size};
+            SET azure_max_blocks_in_multipart_upload = {max_blocks};
+            BACKUP TABLE test_simple_merge_tree TO {backup_destination} SETTINGS allow_azure_native_copy = 0;
+            """,
+        )
+
+        port = cluster.azurite_port
+        connection_string = (
+            f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
+            f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
+            f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
+        )
+        container_name = "cont"
+        blob_service_client = BlobServiceClient.from_connection_string(
+            connection_string
+        )
+        container_client = blob_service_client.get_container_client(container_name)
+        blobs = container_client.list_blobs()
+
+        data_blob = (
+            f"{data_path}/data/default/test_simple_merge_tree/all_1_1_0/data.bin"
+        )
+        found = False
+        for blob in blobs:
+            if data_blob == blob.get("name"):
+                found = True
+                break
+        assert found
+
+        blob_client = blob_service_client.get_blob_client(
+            blob=data_blob, container=container_name
+        )
+
+        blocks_num = len(blob_client.get_block_list()[0])
+        assert blocks_num > 50
+
+        count = 0
+        for block in blob_client.get_block_list()[0]:
+            count += 1
+            if count < blocks_num:
+                assert block.get("size") == expected_block_size
+            else:
+                assert block.get("size") < expected_block_size
+
+        azure_query(
+            node,
+            f"RESTORE TABLE test_simple_merge_tree AS test_simple_merge_tree_restored_{max_blocks} FROM {backup_destination};",
+        )
+        assert azure_query(
+            node,
+            f"SELECT * from test_simple_merge_tree_restored_{max_blocks} ORDER BY key",
+        ) == node.query(data_query)
diff --git a/tests/queries/0_stateless/00694_max_block_size_zero.reference b/tests/integration/test_lazy_database/__init__.py
similarity index 100%
rename from tests/queries/0_stateless/00694_max_block_size_zero.reference
rename to tests/integration/test_lazy_database/__init__.py
diff --git a/tests/integration/test_log_family_s3/configs/minio.xml b/tests/integration/test_lazy_database/configs/storage_policy.xml
similarity index 100%
rename from tests/integration/test_log_family_s3/configs/minio.xml
rename to tests/integration/test_lazy_database/configs/storage_policy.xml
diff --git a/tests/integration/test_lazy_database/test.py b/tests/integration/test_lazy_database/test.py
new file mode 100644
index 00000000000..6890aa87374
--- /dev/null
+++ b/tests/integration/test_lazy_database/test.py
@@ -0,0 +1,88 @@
+import logging
+import time
+import pytest
+import os
+from helpers.cluster import ClickHouseCluster
+
+
+@pytest.fixture(scope="module")
+def cluster():
+    try:
+        cluster = ClickHouseCluster(__file__)
+        cluster.add_instance(
+            "node",
+            main_configs=["configs/storage_policy.xml"],
+            with_minio=True,
+        )
+        logging.info("Starting cluster...")
+        cluster.start()
+        logging.info("Cluster started")
+
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
+def assert_objects_count(cluster, objects_count, path="data/"):
+    minio = cluster.minio_client
+    s3_objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
+    if objects_count != len(s3_objects):
+        for s3_object in s3_objects:
+            object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)
+            logging.info("Existing S3 object: %s", str(object_meta))
+    assert objects_count == len(s3_objects)
+
+
+def list_of_files_on_ch_disk(node, disk, path):
+    disk_path = node.query(
+        f"SELECT path FROM system.disks WHERE name='{disk}'"
+    ).splitlines()[0]
+    return node.exec_in_container(
+        ["bash", "-c", f"ls {os.path.join(disk_path, path)}"], user="root"
+    )
+
+
+@pytest.mark.parametrize(
+    "engine",
+    [
+        pytest.param("Log"),
+    ],
+)
+@pytest.mark.parametrize(
+    "disk,check_s3",
+    [
+        pytest.param("default", False),
+        pytest.param("s3", True),
+    ],
+)
+@pytest.mark.parametrize(
+    "delay",
+    [
+        pytest.param(0),
+        pytest.param(4),
+    ],
+)
+def test_drop_table(cluster, engine, disk, check_s3, delay):
+    node = cluster.instances["node"]
+
+    node.query("DROP DATABASE IF EXISTS lazy")
+    node.query("CREATE DATABASE lazy ENGINE=Lazy(2)")
+    node.query(
+        "CREATE TABLE lazy.table (id UInt64) ENGINE={} SETTINGS disk = '{}'".format(
+            engine,
+            disk,
+        )
+    )
+
+    node.query("INSERT INTO lazy.table SELECT number FROM numbers(10)")
+    assert node.query("SELECT count(*) FROM lazy.table") == "10\n"
+    if delay:
+        time.sleep(delay)
+    node.query("DROP TABLE lazy.table SYNC")
+
+    if check_s3:
+        # There mustn't be any orphaned data
+        assert_objects_count(cluster, 0)
+
+    # Local data must be removed
+    assert list_of_files_on_ch_disk(node, disk, "data/lazy/") == ""
diff --git a/tests/integration/test_log_family_s3/configs/storage_configuration.xml b/tests/integration/test_log_family_s3/configs/storage_configuration.xml
new file mode 100644
index 00000000000..d479a59b197
--- /dev/null
+++ b/tests/integration/test_log_family_s3/configs/storage_configuration.xml
@@ -0,0 +1,34 @@
+<clickhouse>
+    <storage_configuration>
+        <disks>
+            <s3>
+                <type>s3</type>
+                <endpoint>http://minio1:9001/root/data/</endpoint>
+                <access_key_id>minio</access_key_id>
+                <secret_access_key>minio123</secret_access_key>
+            </s3>
+            <s3_no_retries>
+                <type>s3</type>
+                <endpoint>http://minio1:9001/root/data/</endpoint>
+                <access_key_id>minio</access_key_id>
+                <secret_access_key>minio123</secret_access_key>
+
+                <skip_access_check>true</skip_access_check>
+
+                <retry_attempts>1</retry_attempts>
+                <s3_retry_attempts>0</s3_retry_attempts>
+                <s3_max_single_read_retries>1</s3_max_single_read_retries>
+                <request_timeout_ms>20000</request_timeout_ms>
+            </s3_no_retries>
+        </disks>
+        <policies>
+            <s3_no_retries>
+                <volumes>
+                    <main>
+                        <disk>s3_no_retries</disk>
+                    </main>
+                </volumes>
+            </s3_no_retries>
+        </policies>
+    </storage_configuration>
+</clickhouse>
diff --git a/tests/integration/test_log_family_s3/test.py b/tests/integration/test_log_family_s3/test.py
index bed379d098b..ed84bdf48e6 100644
--- a/tests/integration/test_log_family_s3/test.py
+++ b/tests/integration/test_log_family_s3/test.py
@@ -11,7 +11,7 @@ def cluster():
         cluster = ClickHouseCluster(__file__)
         cluster.add_instance(
             "node",
-            main_configs=["configs/minio.xml", "configs/ssl.xml"],
+            main_configs=["configs/storage_configuration.xml", "configs/ssl.xml"],
             with_minio=True,
         )
         logging.info("Starting cluster...")
@@ -84,3 +84,39 @@ def test_log_family_s3(cluster, log_engine, files_overhead, files_overhead_per_i
             assert_objects_count(cluster, 0)
     finally:
         node.query("DROP TABLE s3_test")
+
+
+# Imitate the case when an error (e.g. S3::TooManyRequests) occurs while inserting
+# into the table. In that case the data file can be updated while the size file is not,
+# so the insert has to be undone by truncating the data file.
+# See FileChecker::repair().
+def test_stripe_log_truncate(cluster):
+    node = cluster.instances["node"]
+
+    node.query(
+        """
+        CREATE TABLE stripe_table (
+            a int
+        ) ENGINE = StripeLog()
+        SETTINGS storage_policy='s3_no_retries'
+        """
+    )
+
+    node.query("SYSTEM ENABLE FAILPOINT stripe_log_sink_write_fallpoint")
+    node.query(
+        """
+        INSERT INTO stripe_table SELECT number FROM numbers(10)
+        """,
+        ignore_error=True,
+    )
+    node.query("SYSTEM DISABLE FAILPOINT stripe_log_sink_write_fallpoint")
+    assert node.query("SELECT count(*) FROM stripe_table") == "0\n"
+    node.query("INSERT INTO stripe_table SELECT number FROM numbers(10)")
+    assert node.query("SELECT count(*) FROM stripe_table") == "10\n"
+
+    # Make sure that everything is okay with the table after restart.
+    node.query("DETACH TABLE stripe_table")
+    node.query("ATTACH TABLE stripe_table")
+
+    assert node.query("DROP TABLE stripe_table") == ""
diff --git a/tests/integration/test_modify_engine_on_restart/configs/config.d/clusters.xml b/tests/integration/test_modify_engine_on_restart/configs/config.d/clusters.xml
index d3a9d4fb8f0..c8bbb7f3530 100644
--- a/tests/integration/test_modify_engine_on_restart/configs/config.d/clusters.xml
+++ b/tests/integration/test_modify_engine_on_restart/configs/config.d/clusters.xml
@@ -19,4 +19,4 @@
         <shard>01</shard>
 
-</clickhouse>
\ No newline at end of file
+</clickhouse>
diff --git a/tests/integration/test_modify_engine_on_restart/configs/config.d/clusters_unusual.xml b/tests/integration/test_modify_engine_on_restart/configs/config.d/clusters_zk_path.xml
similarity index 80%
rename from tests/integration/test_modify_engine_on_restart/configs/config.d/clusters_unusual.xml
rename to tests/integration/test_modify_engine_on_restart/configs/config.d/clusters_zk_path.xml
index 812291335b8..ba13cd87031 100644
--- a/tests/integration/test_modify_engine_on_restart/configs/config.d/clusters_unusual.xml
+++ b/tests/integration/test_modify_engine_on_restart/configs/config.d/clusters_zk_path.xml
@@ -15,6 +15,6 @@
         <shard>01</shard>
 
-    <default_replica_path>/lol/kek/'/{uuid}</default_replica_path>
+    <default_replica_path>/clickhouse/'/{database}/{table}/{uuid}</default_replica_path>
 
diff --git a/tests/integration/test_modify_engine_on_restart/test_unusual_path.py b/tests/integration/test_modify_engine_on_restart/test_unusual_path.py
index e82f48e8b34..20d2c29257b 100644
--- a/tests/integration/test_modify_engine_on_restart/test_unusual_path.py
+++ b/tests/integration/test_modify_engine_on_restart/test_unusual_path.py
@@ -6,7 +6,7 @@ cluster = ClickHouseCluster(__file__)
 ch1 = cluster.add_instance(
     "ch1",
     main_configs=[
-        "configs/config.d/clusters_unusual.xml",
+        "configs/config.d/clusters_zk_path.xml",
         "configs/config.d/distributed_ddl.xml",
     ],
     with_zookeeper=True,
@@ -63,7 +63,7 @@ def check_tables():
         )
         .strip()
         .startswith(
-            "ReplicatedReplacingMergeTree(\\'/lol/kek/\\\\\\'/{uuid}\\', \\'{replica}\\', D)"
+            "ReplicatedReplacingMergeTree(\\'/clickhouse/\\\\\\'/{database}/{table}/{uuid}\\', \\'{replica}\\', D)"
        )
     )
     assert (
@@ -73,7 +73,7 @@ def check_tables():
         )
         .strip()
         .startswith(
-            "ReplicatedVersionedCollapsingMergeTree(\\'/lol/kek/\\\\\\'/{uuid}\\', \\'{replica}\\', Sign, Version)"
+            "ReplicatedVersionedCollapsingMergeTree(\\'/clickhouse/\\\\\\'/{database}/{table}/{uuid}\\', \\'{replica}\\', Sign, Version)"
         )
     )
diff --git a/tests/integration/test_modify_engine_on_restart/test_zk_path.py b/tests/integration/test_modify_engine_on_restart/test_zk_path.py
new file mode 100644
index 00000000000..dd633ad0810
--- /dev/null
+++ b/tests/integration/test_modify_engine_on_restart/test_zk_path.py
@@ -0,0 +1,69 @@
+import pytest
+from test_modify_engine_on_restart.common import (
+    get_table_path,
+    set_convert_flags,
+)
+from helpers.cluster import ClickHouseCluster
+
+cluster = ClickHouseCluster(__file__)
+ch1 = cluster.add_instance(
+    "ch1",
+    main_configs=[
+        "configs/config.d/clusters_zk_path.xml",
+        "configs/config.d/distributed_ddl.xml",
+    ],
+    with_zookeeper=True,
+    macros={"replica": "node1"},
+    stay_alive=True,
+)
+
+database_name = "modify_engine_zk_path"
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+        yield cluster
+
+    finally:
+        cluster.shutdown()
+
+
+def q(node, query):
+    return node.query(database=database_name, sql=query)
+
+
+def test_modify_engine_fails_if_zk_path_exists(started_cluster):
+    ch1.query("CREATE DATABASE " + database_name)
+
+    q(
+        ch1,
+        "CREATE TABLE already_exists_1 ( A Int64, D Date, S String ) ENGINE MergeTree() PARTITION BY toYYYYMM(D) ORDER BY A;",
+    )
+    uuid = q(
+        ch1,
+        f"SELECT uuid FROM system.tables WHERE table = 'already_exists_1' and database = '{database_name}'",
+    ).strip("'[]\n")
+
+    q(
+        ch1,
+        f"CREATE TABLE already_exists_2 ( A Int64, D Date, S String ) ENGINE ReplicatedMergeTree('/clickhouse/\\'/{database_name}/already_exists_1/{uuid}', 'r2') PARTITION BY toYYYYMM(D) ORDER BY A;",
+    )
+
+    set_convert_flags(ch1, database_name, ["already_exists_1"])
+
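+    # The restart below must fail: after conversion already_exists_1 would claim the
+    # same ZooKeeper path that already_exists_2 registered above.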
+    table_data_path = get_table_path(ch1, "already_exists_1", database_name)
+
+    ch1.stop_clickhouse()
+    ch1.start_clickhouse(retry_start=False, expected_to_fail=True)
+
+    # Check that we can cancel the conversion
+    ch1.exec_in_container(
+        [
+            "bash",
+            "-c",
+            f"rm {table_data_path}convert_to_replicated",
+        ]
+    )
+    ch1.start_clickhouse()
diff --git a/tests/queries/0_stateless/00694_max_block_size_zero.sql b/tests/queries/0_stateless/00694_max_block_size_zero.sql
deleted file mode 100644
index ba5b513bb5d..00000000000
--- a/tests/queries/0_stateless/00694_max_block_size_zero.sql
+++ /dev/null
@@ -1,4 +0,0 @@
-SET send_logs_level = 'fatal';
-
-SET max_block_size = 0;
-SELECT number FROM system.numbers; -- { serverError 12 }
diff --git a/tests/queries/0_stateless/02420_final_setting_analyzer.reference b/tests/queries/0_stateless/02420_final_setting_analyzer.reference
index dd9fed65f13..780a6e5de68 100644
--- a/tests/queries/0_stateless/02420_final_setting_analyzer.reference
+++ b/tests/queries/0_stateless/02420_final_setting_analyzer.reference
@@ -132,3 +132,7 @@ SELECT * FROM merge_table ORDER BY id, val;
 2 a
 2 b
 3 c
+select sum(number) from numbers(10) settings final=1;
+45
+select sum(number) from remote('127.0.0.{1,2}', numbers(10)) settings final=1;
+90
diff --git a/tests/queries/0_stateless/02420_final_setting_analyzer.sql b/tests/queries/0_stateless/02420_final_setting_analyzer.sql
index 14c832cfaf5..cbdec017602 100644
--- a/tests/queries/0_stateless/02420_final_setting_analyzer.sql
+++ b/tests/queries/0_stateless/02420_final_setting_analyzer.sql
@@ -102,3 +102,6 @@ insert into table_to_merge_c values (3,'c');
 -- expected output:
 -- 1 c, 2 a, 2 b, 3 c
 SELECT * FROM merge_table ORDER BY id, val;
+
+select sum(number) from numbers(10) settings final=1;
+select sum(number) from remote('127.0.0.{1,2}', numbers(10)) settings final=1;
diff --git a/tests/queries/0_stateless/02477_single_value_data_string_regression.sql b/tests/queries/0_stateless/02477_single_value_data_string_regression.sql
index 0f11a06f3fc..8499786f47a 100644
--- a/tests/queries/0_stateless/02477_single_value_data_string_regression.sql
+++ b/tests/queries/0_stateless/02477_single_value_data_string_regression.sql
@@ -103,11 +103,11 @@ SELECT '2^30-1', maxMerge(x) from (select CAST(unhex('ffffff3f') || randomString
 SELECT '1M without 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || 'x', 'AggregateFunction(max, String)') as x);
 SELECT '1M with 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || '\0', 'AggregateFunction(max, String)') as x);
 
-SELECT 'fuzz1', finalizeAggregation(CAST(unhex('3000000\0303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); -- { serverError CORRUPTED_DATA }
+SELECT 'fuzz1', finalizeAggregation(CAST(unhex('3000000\0303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); -- { serverError INCORRECT_DATA }
 SELECT 'fuzz2', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '01' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x);
-SELECT 'fuzz3', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
-SELECT 'fuzz4', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
-SELECT 'fuzz5', finalizeAggregation(CAST(unhex('0100000000000000000FFFFFFFF0'), 'AggregateFunction(argMax, UInt64, String)')); -- { serverError CORRUPTED_DATA }
+SELECT 'fuzz3', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError INCORRECT_DATA }
+SELECT 'fuzz4', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError INCORRECT_DATA }
+SELECT 'fuzz5', finalizeAggregation(CAST(unhex('0100000000000000000FFFFFFFF0'), 'AggregateFunction(argMax, UInt64, String)')); -- { serverError INCORRECT_DATA }
 
 drop table if exists aggr;
diff --git a/tests/queries/0_stateless/02534_keyed_siphash.reference b/tests/queries/0_stateless/02534_keyed_siphash.reference
index e3fae07333a..3f478218ff1 100644
--- a/tests/queries/0_stateless/02534_keyed_siphash.reference
+++ b/tests/queries/0_stateless/02534_keyed_siphash.reference
@@ -236,3 +236,6 @@ Check asan bug
 0
 Check bug found fuzzing
 9042C6691B1A75F0EA3314B6F55728BB
+Check bug 2 found fuzzing
+608E1FF030C9E206185B112C2A25F1A7
+ABB65AE97711A2E053E324ED88B1D08B
diff --git a/tests/queries/0_stateless/02534_keyed_siphash.sql b/tests/queries/0_stateless/02534_keyed_siphash.sql
index 112ae15bf46..fb707109c83 100644
--- a/tests/queries/0_stateless/02534_keyed_siphash.sql
+++ b/tests/queries/0_stateless/02534_keyed_siphash.sql
@@ -338,3 +338,10 @@ SELECT sipHash128((toUInt64(9223372036854775806), 1)) = sipHash128(1) GROUP BY s
 SELECT 'Check bug found fuzzing';
 SELECT [(255, 1048575)], sipHash128ReferenceKeyed((toUInt64(2147483646), toUInt64(9223372036854775807)), ([(NULL, 100), (NULL, NULL), (1024, 10)], toUInt64(2), toUInt64(1024)), ''), hex(sipHash128ReferenceKeyed((-9223372036854775807, 1.), '-1', NULL)), ('', toUInt64(65535), [(9223372036854775807, 9223372036854775806)], toUInt64(65536)), arrayJoin((NULL, 65537, 255), [(NULL, NULL)]) GROUP BY tupleElement((NULL, NULL, NULL, -1), toUInt64(2), 2) = NULL; -- { serverError NOT_IMPLEMENTED }
 SELECT hex(sipHash128ReferenceKeyed((0::UInt64, 0::UInt64), ([1, 1])));
+
+SELECT 'Check bug 2 found fuzzing';
+DROP TABLE IF EXISTS sipHashKeyed_keys;
+CREATE TABLE sipHashKeyed_keys (`a` Map(String, String)) ENGINE = Memory;
+INSERT INTO sipHashKeyed_keys FORMAT VALUES ({'a':'b', 'c':'d'}), ({'e':'f', 'g':'h'});
+SELECT hex(sipHash128ReferenceKeyed((0::UInt64, materialize(0::UInt64)), a)) FROM sipHashKeyed_keys ORDER BY a;
+DROP TABLE sipHashKeyed_keys;
diff --git a/tests/queries/0_stateless/02906_force_optimize_projection_name.reference b/tests/queries/0_stateless/02906_force_optimize_projection_name.reference
index 9daeafb9864..679eff3f0b4 100644
--- a/tests/queries/0_stateless/02906_force_optimize_projection_name.reference
+++ b/tests/queries/0_stateless/02906_force_optimize_projection_name.reference
@@ -1 +1,3 @@
 test
+1
+0
diff --git a/tests/queries/0_stateless/02906_force_optimize_projection_name.sql b/tests/queries/0_stateless/02906_force_optimize_projection_name.sql
index 952ef8178b7..6b9d7f74f9f 100644
--- a/tests/queries/0_stateless/02906_force_optimize_projection_name.sql
+++ b/tests/queries/0_stateless/02906_force_optimize_projection_name.sql
@@ -1,3 +1,5 @@
+DROP TABLE IF EXISTS test;
+
 CREATE TABLE test
 (
     `id` UInt64,
@@ -18,3 +20,16 @@ SELECT name FROM test GROUP BY name SETTINGS force_optimize_projection_name='pro
 
 SELECT name FROM test GROUP BY name SETTINGS force_optimize_projection_name='non_existing_projection'; -- { serverError 117 }
 
 SELECT name FROM test SETTINGS force_optimize_projection_name='projection_name'; -- { serverError 117 }
+
+INSERT INTO test SELECT number, 'test' FROM numbers(1, 100) SETTINGS force_optimize_projection_name='projection_name';
+SELECT 1 SETTINGS force_optimize_projection_name='projection_name';
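+
+-- The INSERT and the constant SELECT above must succeed: the name check now runs during
+-- query plan optimization, which only happens for queries that actually read from a table.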
diff --git a/tests/queries/0_stateless/02906_force_optimize_projection_name.reference b/tests/queries/0_stateless/02906_force_optimize_projection_name.reference
index 9daeafb9864..679eff3f0b4 100644
--- a/tests/queries/0_stateless/02906_force_optimize_projection_name.reference
+++ b/tests/queries/0_stateless/02906_force_optimize_projection_name.reference
@@ -1 +1,3 @@
 test
+1
+0
diff --git a/tests/queries/0_stateless/02906_force_optimize_projection_name.sql b/tests/queries/0_stateless/02906_force_optimize_projection_name.sql
index 952ef8178b7..6b9d7f74f9f 100644
--- a/tests/queries/0_stateless/02906_force_optimize_projection_name.sql
+++ b/tests/queries/0_stateless/02906_force_optimize_projection_name.sql
@@ -1,3 +1,5 @@
+DROP TABLE IF EXISTS test;
+
 CREATE TABLE test
 (
     `id` UInt64,
@@ -18,3 +20,16 @@ SELECT name FROM test GROUP BY name SETTINGS force_optimize_projection_name='pro
 SELECT name FROM test GROUP BY name SETTINGS force_optimize_projection_name='non_existing_projection'; -- { serverError 117 }
 SELECT name FROM test SETTINGS force_optimize_projection_name='projection_name'; -- { serverError 117 }
+
+INSERT INTO test SELECT number, 'test' FROM numbers(1, 100) SETTINGS force_optimize_projection_name='projection_name';
+SELECT 1 SETTINGS force_optimize_projection_name='projection_name';
+
+SYSTEM FLUSH LOGS;
+
+SELECT read_rows FROM system.query_log
+WHERE current_database = currentDatabase()
+    AND query LIKE '%SELECT name FROM test%'
+    AND Settings['force_optimize_projection_name'] = 'projection_name'
+    AND type = 'ExceptionBeforeStart';
+
+DROP TABLE test;
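Note: `force_optimize_projection_name` makes a SELECT fail unless it can be served by the named projection, while leaving INSERTs and constant queries untouched; the added assertions above check exactly that pass-through. A minimal sketch of the behavior, with illustrative table and projection names:

```sql
CREATE TABLE t
(
    id UInt64,
    name String,
    PROJECTION p (SELECT name, sum(id) GROUP BY name)
)
ENGINE = MergeTree
ORDER BY id;

-- Served by projection p: accepted.
SELECT name FROM t GROUP BY name SETTINGS force_optimize_projection_name = 'p';

-- A plain scan cannot use p: rejected (error 117, INCORRECT_DATA).
SELECT name FROM t SETTINGS force_optimize_projection_name = 'p'; -- { serverError 117 }

-- Not a qualifying SELECT: the setting is ignored rather than raising an error.
INSERT INTO t SELECT number, 'x' FROM numbers(10) SETTINGS force_optimize_projection_name = 'p';
```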
"$CUR_DIR"/../shell_config.sh + +# Data preparation step +USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}') +UNIX_ENDINGS="${CLICKHOUSE_TEST_UNIQUE_NAME}_data_without_crlf.tsv" +DOS_ENDINGS="${CLICKHOUSE_TEST_UNIQUE_NAME}_data_with_crlf.tsv" +DATA_FILE_UNIX_ENDINGS="${USER_FILES_PATH:?}/${UNIX_ENDINGS}" +DATA_FILE_DOS_ENDINGS="${USER_FILES_PATH:?}/${DOS_ENDINGS}" + +touch $DATA_FILE_UNIX_ENDINGS +touch $DATA_FILE_DOS_ENDINGS + +echo -ne "Akiba_Hebrew_Academy\t2017-08-01\t241\nAegithina_tiphia\t2018-02-01\t34\n1971-72_Utah_Stars_season\t2016-10-01\t1\n" > $DATA_FILE_UNIX_ENDINGS +echo -ne "Akiba_Hebrew_Academy\t2017-08-01\t241\r\nAegithina_tiphia\t2018-02-01\t34\r\n1971-72_Utah_Stars_season\t2016-10-01\t1\r\n" > $DATA_FILE_DOS_ENDINGS + +echo -e "<-- Read UNIX endings -->\n" +$CLICKHOUSE_CLIENT --query "SELECT * FROM file(${UNIX_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32');" +$CLICKHOUSE_CLIENT --multiquery --query "SELECT * FROM file(${DOS_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32'); --{serverError 117}" + +echo -e "\n<-- Read DOS endings with setting input_format_tsv_crlf_end_of_line=1 -->\n" +$CLICKHOUSE_CLIENT --query "SELECT * FROM file(${DOS_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32') SETTINGS input_format_tsv_crlf_end_of_line = 1;" + +# Test teardown +rm $DATA_FILE_UNIX_ENDINGS +rm $DATA_FILE_DOS_ENDINGS diff --git a/tests/queries/0_stateless/03144_compress_stdout.reference b/tests/queries/0_stateless/03144_compress_stdout.reference new file mode 100644 index 00000000000..6f51dfc24e1 --- /dev/null +++ b/tests/queries/0_stateless/03144_compress_stdout.reference @@ -0,0 +1,2 @@ +Hello, World! From client. +Hello, World! From local. diff --git a/tests/queries/0_stateless/03144_compress_stdout.sh b/tests/queries/0_stateless/03144_compress_stdout.sh new file mode 100755 index 00000000000..569754303a7 --- /dev/null +++ b/tests/queries/0_stateless/03144_compress_stdout.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +set -e + +[ -e "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_client.gz ] && rm "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_client.gz + +${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World! From client.')" > ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client.gz +gunzip ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client.gz +cat ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client + +rm -f "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client" + +[ -e "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_local.gz ] && rm "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_local.gz + +${CLICKHOUSE_LOCAL} --query "SELECT * FROM (SELECT 'Hello, World! 
From local.')" > ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local.gz +gunzip ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local.gz +cat ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local + +rm -f "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local" diff --git a/tests/queries/0_stateless/03149_numbers_max_block_size_zero.reference b/tests/queries/0_stateless/03149_numbers_max_block_size_zero.reference new file mode 100644 index 00000000000..d86bac9de59 --- /dev/null +++ b/tests/queries/0_stateless/03149_numbers_max_block_size_zero.reference @@ -0,0 +1 @@ +OK diff --git a/tests/queries/0_stateless/03149_numbers_max_block_size_zero.sh b/tests/queries/0_stateless/03149_numbers_max_block_size_zero.sh new file mode 100755 index 00000000000..6f70a0d2536 --- /dev/null +++ b/tests/queries/0_stateless/03149_numbers_max_block_size_zero.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -q "SELECT count(*) FROM numbers(10) AS a, numbers(11) AS b, numbers(12) AS c SETTINGS max_block_size = 0" 2>&1 | grep -q "Sanity check: 'max_block_size' cannot be 0. Set to default value" && echo "OK" || echo "FAIL" diff --git a/utils/check-style/check-large-objects.sh b/utils/check-style/check-large-objects.sh index e2266e89556..2122cca911e 100755 --- a/utils/check-style/check-large-objects.sh +++ b/utils/check-style/check-large-objects.sh @@ -7,8 +7,6 @@ export LC_ALL=C # The "total" should be printed without localization TU_EXCLUDES=( AggregateFunctionUniq Aggregator - # FIXME: Exclude for now - FunctionsConversion ) if find $1 -name '*.o' | xargs wc -c | grep --regexp='\.o$' | sort -rn | awk '{ if ($1 > 50000000) print }' \