Merge remote-tracking branch 'blessed/master' into ifunction_column_size

This commit is contained in:
Raúl Marín 2024-05-22 17:37:43 +02:00
commit 72604ab940
97 changed files with 1288 additions and 252 deletions

View File

@ -68,8 +68,9 @@ if (ENABLE_CHECK_HEAVY_BUILDS)
set (RLIMIT_AS 20000000000)
endif()
# For some files currently building RISCV64 might be too slow. TODO: Improve compilation times per file
if (ARCH_RISCV64)
# For some files currently building RISCV64/LOONGARCH64 might be too slow.
# TODO: Improve compilation times per file
if (ARCH_RISCV64 OR ARCH_LOONGARCH64)
set (RLIMIT_CPU 1800)
endif()

View File

@ -197,6 +197,7 @@ SELECT * FROM nestedt FORMAT TSV
- [input_format_tsv_enum_as_number](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_enum_as_number) - treat inserted enum values in TSV formats as enum indices. Default value - `false`.
- [input_format_tsv_use_best_effort_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_use_best_effort_in_schema_inference) - use some tweaks and heuristics to infer schema in TSV format. If disabled, all fields will be inferred as Strings. Default value - `true`.
- [output_format_tsv_crlf_end_of_line](/docs/en/operations/settings/settings-formats.md/#output_format_tsv_crlf_end_of_line) - if it is set true, end of line in TSV output format will be `\r\n` instead of `\n`. Default value - `false`.
- [input_format_tsv_crlf_end_of_line](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_crlf_end_of_line) - if it is set true, end of line in TSV input format will be `\r\n` instead of `\n`. Default value - `false`.
- [input_format_tsv_skip_first_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_first_lines) - skip specified number of lines at the beginning of data. Default value - `0`.
- [input_format_tsv_detect_header](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_detect_header) - automatically detect header with names and types in TSV format. Default value - `true`.
- [input_format_tsv_skip_trailing_empty_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_trailing_empty_lines) - skip trailing empty lines at the end of data. Default value - `false`.

View File

@ -831,7 +831,13 @@ Default value: `0`.
### output_format_tsv_crlf_end_of_line {#output_format_tsv_crlf_end_of_line}
Use DOC/Windows-style line separator (CRLF) in TSV instead of Unix style (LF).
Use DOS/Windows-style line separator (CRLF) in TSV instead of Unix style (LF).
Disabled by default.
### input_format_tsv_crlf_end_of_line {#input_format_tsv_crlf_end_of_line}
Use DOS/Windows-style line separator (CRLF) for TSV input files instead of Unix style (LF).
Disabled by default.

View File

@ -998,17 +998,170 @@ SELECT version()
Returns the build ID generated by a compiler for the running ClickHouse server binary.
If executed in the context of a distributed table, this function generates a normal column with values relevant to each shard. Otherwise it produces a constant value.
## blockNumber()
## blockNumber
Returns the sequence number of the data block where the row is located.
Returns a monotonically increasing sequence number of the [block](../../development/architecture.md#block) containing the row.
The returned block number is updated on a best-effort basis, i.e. it may not be fully accurate.
## rowNumberInBlock() {#rowNumberInBlock}
**Syntax**
```sql
blockNumber()
```
**Returned value**
- Sequence number of the data block where the row is located. [UInt64](../data-types/int-uint.md).
**Example**
Query:
```sql
SELECT blockNumber()
FROM
(
SELECT *
FROM system.numbers
LIMIT 10
) SETTINGS max_block_size = 2
```
Result:
```response
┌─blockNumber()─┐
│ 7 │
│ 7 │
└───────────────┘
┌─blockNumber()─┐
│ 8 │
│ 8 │
└───────────────┘
┌─blockNumber()─┐
│ 9 │
│ 9 │
└───────────────┘
┌─blockNumber()─┐
│ 10 │
│ 10 │
└───────────────┘
┌─blockNumber()─┐
│ 11 │
│ 11 │
└───────────────┘
```
## rowNumberInBlock {#rowNumberInBlock}
Returns for each [block](../../development/architecture.md#block) processed by `rowNumberInBlock` the number of the current row.
The returned number starts for each block at 0.
**Syntax**
```sql
rowNumberInBlock()
```
**Returned value**
- Ordinal number of the row in the data block starting from 0. [UInt64](../data-types/int-uint.md).
**Example**
Query:
```sql
SELECT rowNumberInBlock()
FROM
(
SELECT *
FROM system.numbers_mt
LIMIT 10
) SETTINGS max_block_size = 2
```
Result:
```response
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
┌─rowNumberInBlock()─┐
│ 0 │
│ 1 │
└────────────────────┘
```
## rowNumberInAllBlocks
Returns a unique row number for each row processed by `rowNumberInAllBlocks`. The returned numbers start at 0.
**Syntax**
```sql
rowNumberInAllBlocks()
```
**Returned value**
- Ordinal number of the row in the data block starting from 0. [UInt64](../data-types/int-uint.md).
**Example**
Query:
```sql
SELECT rowNumberInAllBlocks()
FROM
(
SELECT *
FROM system.numbers_mt
LIMIT 10
)
SETTINGS max_block_size = 2
```
Result:
```response
┌─rowNumberInAllBlocks()─┐
│ 0 │
│ 1 │
└────────────────────────┘
┌─rowNumberInAllBlocks()─┐
│ 4 │
│ 5 │
└────────────────────────┘
┌─rowNumberInAllBlocks()─┐
│ 2 │
│ 3 │
└────────────────────────┘
┌─rowNumberInAllBlocks()─┐
│ 6 │
│ 7 │
└────────────────────────┘
┌─rowNumberInAllBlocks()─┐
│ 8 │
│ 9 │
└────────────────────────┘
```
Returns the ordinal number of the row in the data block. Different data blocks are always recalculated.
## rowNumberInAllBlocks()
Returns the ordinal number of the row in the data block. This function only considers the affected data blocks.
## neighbor

View File

@ -119,6 +119,7 @@ Hello\nworld
Hello\
world
```
`\n\r` (CRLF) поддерживается с помощью настройки `input_format_tsv_crlf_end_of_line`.
Второй вариант поддерживается, так как его использует MySQL при записи tab-separated дампа.

View File

@ -1178,7 +1178,7 @@ void Client::processConfig()
pager = config().getString("pager", "");
setDefaultFormatsFromConfiguration();
setDefaultFormatsAndCompressionFromConfiguration();
global_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
global_context->setQueryKindInitial();

View File

@ -607,7 +607,7 @@ void LocalServer::processConfig()
if (config().has("macros"))
global_context->setMacros(std::make_unique<Macros>(config(), "macros", log));
setDefaultFormatsFromConfiguration();
setDefaultFormatsAndCompressionFromConfiguration();
/// Sets external authenticators config (LDAP, Kerberos).
global_context->setExternalAuthenticatorsConfig(config());

View File

@ -14,7 +14,7 @@ struct Settings;
namespace ErrorCodes
{
extern const int CORRUPTED_DATA;
extern const int INCORRECT_DATA;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int LOGICAL_ERROR;
}
@ -198,7 +198,7 @@ public:
this->data(place).value().read(buf, *serialization_val, arena);
if (unlikely(this->data(place).value().has() != this->data(place).result().has()))
throw Exception(
ErrorCodes::CORRUPTED_DATA,
ErrorCodes::INCORRECT_DATA,
"Invalid state of the aggregate function {}: has_value ({}) != has_result ({})",
getName(),
this->data(place).value().has(),

View File

@ -42,7 +42,7 @@ private:
return;
const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
bool is_final_supported = storage && storage->supportsFinal();
bool is_final_supported = storage && !storage->isRemote() && storage->supportsFinal();
if (!is_final_supported)
return;

View File

@ -192,7 +192,7 @@ void QueryTreePassManager::run(QueryTreeNodePtr query_tree_node)
void QueryTreePassManager::runOnlyResolve(QueryTreeNodePtr query_tree_node)
{
// Run only QueryAnalysisPass and GroupingFunctionsResolvePass passes.
run(query_tree_node, 2);
run(query_tree_node, 3);
}
void QueryTreePassManager::run(QueryTreeNodePtr query_tree_node, size_t up_to_pass_index)
@ -249,6 +249,7 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
{
manager.addPass(std::make_unique<QueryAnalysisPass>(only_analyze));
manager.addPass(std::make_unique<GroupingFunctionsResolvePass>());
manager.addPass(std::make_unique<AutoFinalOnQueryPass>());
manager.addPass(std::make_unique<RemoveUnusedProjectionColumnsPass>());
manager.addPass(std::make_unique<FunctionToSubcolumnsPass>());
@ -294,7 +295,6 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
manager.addPass(std::make_unique<LogicalExpressionOptimizerPass>());
manager.addPass(std::make_unique<AutoFinalOnQueryPass>());
manager.addPass(std::make_unique<CrossToInnerJoinPass>());
manager.addPass(std::make_unique<ShardNumColumnToFunctionPass>());

View File

@ -21,6 +21,7 @@
#include <Common/StringUtils.h>
#include <Common/filesystemHelpers.h>
#include <Common/NetException.h>
#include <Common/tryGetFileNameByFileDescriptor.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Formats/FormatFactory.h>
@ -643,6 +644,9 @@ try
bool extras_into_stdout = need_render_progress || logs_into_stdout;
bool select_only_into_file = select_into_file && !select_into_file_and_stdout;
if (!out_file_buf && default_output_compression_method != CompressionMethod::None)
out_file_buf = wrapWriteBufferWithCompressionMethod(out_buf, default_output_compression_method, 3, 0);
/// It is not clear how to write progress and logs
/// intermixed with data with parallel formatting.
/// It may increase code complexity significantly.
@ -735,7 +739,7 @@ bool ClientBase::isRegularFile(int fd)
return fstat(fd, &file_stat) == 0 && S_ISREG(file_stat.st_mode);
}
void ClientBase::setDefaultFormatsFromConfiguration()
void ClientBase::setDefaultFormatsAndCompressionFromConfiguration()
{
if (config().has("output-format"))
{
@ -759,6 +763,10 @@ void ClientBase::setDefaultFormatsFromConfiguration()
default_output_format = *format_from_file_name;
else
default_output_format = "TSV";
std::optional<String> file_name = tryGetFileNameFromFileDescriptor(STDOUT_FILENO);
if (file_name)
default_output_compression_method = chooseCompressionMethod(*file_name, "");
}
else if (is_interactive)
{

View File

@ -190,7 +190,7 @@ protected:
/// Adjust some settings after command line options and config had been processed.
void adjustSettings();
void setDefaultFormatsFromConfiguration();
void setDefaultFormatsAndCompressionFromConfiguration();
void initTTYBuffer(ProgressOption progress);
@ -224,6 +224,7 @@ protected:
String pager;
String default_output_format; /// Query results output format.
CompressionMethod default_output_compression_method = CompressionMethod::None;
String default_input_format; /// Tables' format for clickhouse-local.
bool select_into_file = false; /// If writing result INTO OUTFILE. It affects progress rendering.

View File

@ -30,6 +30,7 @@ namespace ErrorCodes
extern const int ASYNC_LOAD_CYCLE;
extern const int ASYNC_LOAD_FAILED;
extern const int ASYNC_LOAD_CANCELED;
extern const int ASYNC_LOAD_WAIT_FAILED;
extern const int LOGICAL_ERROR;
}
@ -433,7 +434,7 @@ void AsyncLoader::wait(const LoadJobPtr & job, bool no_throw)
std::unique_lock job_lock{job->mutex};
wait(job_lock, job);
if (!no_throw && job->load_exception)
std::rethrow_exception(job->load_exception);
throw Exception(ErrorCodes::ASYNC_LOAD_WAIT_FAILED, "Waited job failed: {}", getExceptionMessage(job->load_exception, /* with_stacktrace = */ false));
}
void AsyncLoader::remove(const LoadJobSet & jobs)

View File

@ -600,6 +600,7 @@
M(719, QUERY_CACHE_USED_WITH_SYSTEM_TABLE) \
M(720, USER_EXPIRED) \
M(721, DEPRECATED_FUNCTION) \
M(722, ASYNC_LOAD_WAIT_FAILED) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@ -40,6 +40,7 @@ static struct InitFiu
REGULAR(use_delayed_remote_source) \
REGULAR(cluster_discovery_faults) \
REGULAR(replicated_sends_failpoint) \
REGULAR(stripe_log_sink_write_fallpoint)\
ONCE(smt_commit_merge_mutate_zk_fail_after_op) \
ONCE(smt_commit_merge_mutate_zk_fail_before_op) \
ONCE(smt_commit_write_zk_fail_after_op) \
@ -58,6 +59,7 @@ static struct InitFiu
ONCE(execute_query_calling_empty_set_result_func_on_exception) \
ONCE(receive_timeout_on_table_status_response)
namespace FailPoints
{
#define M(NAME) extern const char(NAME)[] = #NAME "";

View File

@ -35,6 +35,7 @@ namespace DB::ErrorCodes
extern const int ASYNC_LOAD_CYCLE;
extern const int ASYNC_LOAD_FAILED;
extern const int ASYNC_LOAD_CANCELED;
extern const int ASYNC_LOAD_WAIT_FAILED;
}
struct Initializer {
@ -262,7 +263,8 @@ TEST(AsyncLoader, CancelPendingJob)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
}
@ -288,7 +290,8 @@ TEST(AsyncLoader, CancelPendingTask)
}
catch (Exception & e)
{
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
try
@ -298,7 +301,8 @@ TEST(AsyncLoader, CancelPendingTask)
}
catch (Exception & e)
{
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
}
@ -325,7 +329,8 @@ TEST(AsyncLoader, CancelPendingDependency)
}
catch (Exception & e)
{
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
try
@ -335,7 +340,8 @@ TEST(AsyncLoader, CancelPendingDependency)
}
catch (Exception & e)
{
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
}
@ -451,8 +457,9 @@ TEST(AsyncLoader, JobFailure)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_FAILED);
ASSERT_TRUE(e.message().find(error_message) != String::npos);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains(error_message));
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_FAILED"));
}
}
@ -489,8 +496,9 @@ TEST(AsyncLoader, ScheduleJobWithFailedDependencies)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_TRUE(e.message().find(error_message) != String::npos);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
ASSERT_TRUE(e.message().contains(error_message));
}
try
{
@ -499,8 +507,9 @@ TEST(AsyncLoader, ScheduleJobWithFailedDependencies)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_TRUE(e.message().find(error_message) != String::npos);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
ASSERT_TRUE(e.message().contains(error_message));
}
}
@ -531,7 +540,8 @@ TEST(AsyncLoader, ScheduleJobWithCanceledDependencies)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
try
{
@ -540,7 +550,8 @@ TEST(AsyncLoader, ScheduleJobWithCanceledDependencies)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
}

View File

@ -0,0 +1,33 @@
#include <Common/tryGetFileNameByFileDescriptor.h>
#ifdef OS_LINUX
# include <unistd.h>
#elif defined(OS_DARWIN)
# include <fcntl.h>
#endif
#include <fmt/format.h>
namespace DB
{
std::optional<String> tryGetFileNameFromFileDescriptor(int fd)
{
#ifdef OS_LINUX
std::string proc_path = fmt::format("/proc/self/fd/{}", fd);
char file_path[PATH_MAX] = {'\0'};
if (readlink(proc_path.c_str(), file_path, sizeof(file_path) - 1) != -1)
return file_path;
return std::nullopt;
#elif defined(OS_DARWIN)
char file_path[PATH_MAX] = {'\0'};
if (fcntl(fd, F_GETPATH, file_path) != -1)
return file_path;
return std::nullopt;
#else
(void)fd;
return std::nullopt;
#endif
}
}

View File

@ -0,0 +1,10 @@
#pragma once
#include <optional>
#include <base/types.h>
namespace DB
{
/// Supports only Linux/MacOS. On other platforms, returns nullopt.
std::optional<String> tryGetFileNameFromFileDescriptor(int fd);
}

View File

@ -80,6 +80,7 @@ class IColumn;
M(UInt64, connections_with_failover_max_tries, 3, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to S3 (some implementations does not supports variable size parts).", 0) \
M(UInt64, azure_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to Azure blob storage.", 0) \
M(UInt64, azure_max_blocks_in_multipart_upload, 50000, "Maximum number of blocks in multipart upload for Azure.", 0) \
M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, azure_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to Azure blob storage.", 0) \
@ -1078,6 +1079,7 @@ class IColumn;
M(Bool, input_format_csv_skip_trailing_empty_lines, false, "Skip trailing empty lines in CSV format", 0) \
M(Bool, input_format_tsv_skip_trailing_empty_lines, false, "Skip trailing empty lines in TSV format", 0) \
M(Bool, input_format_custom_skip_trailing_empty_lines, false, "Skip trailing empty lines in CustomSeparated format", 0) \
M(Bool, input_format_tsv_crlf_end_of_line, false, "If it is set true, file function will read TSV format with \\r\\n instead of \\n.", 0) \
\
M(Bool, input_format_native_allow_types_conversion, true, "Allow data types conversion in Native input format", 0) \
\

View File

@ -87,12 +87,14 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{
{"24.5", {{"allow_deprecated_functions", true, false, "Allow usage of deprecated functions"},
{"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},
{"input_format_tsv_crlf_end_of_line", false, false, "Enables reading of CRLF line endings with TSV formats"},
{"output_format_parquet_use_custom_encoder", false, true, "Enable custom Parquet encoder."},
{"cross_join_min_rows_to_compress", 0, 10000000, "A new setting."},
{"cross_join_min_bytes_to_compress", 0, 1_GiB, "A new setting."},
{"http_max_chunk_size", 0, 0, "Internal limitation"},
{"prefer_external_sort_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging."},
{"input_format_force_null_for_omitted_fields", false, false, "Disable type-defaults for omitted fields when needed"},
{"azure_max_blocks_in_multipart_upload", 50000, 50000, "Maximum number of blocks in multipart upload for Azure."},
}},
{"24.4", {{"input_format_json_throw_on_bad_escape_sequence", true, true, "Allow to save JSON strings with bad escape sequences"},
{"max_parsing_threads", 0, 0, "Add a separate setting to control number of threads in parallel parsing from files"},

View File

@ -92,7 +92,7 @@ void applySettingsQuirks(Settings & settings, LoggerPtr log)
void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log)
{
auto getCurrentValue = [&current_settings](const std::string_view name) -> Field
auto get_current_value = [&current_settings](const std::string_view name) -> Field
{
Field current_value;
bool has_current_value = current_settings.tryGet(name, current_value);
@ -100,7 +100,7 @@ void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log)
return current_value;
};
UInt64 max_threads = getCurrentValue("max_threads").get<UInt64>();
UInt64 max_threads = get_current_value("max_threads").get<UInt64>();
UInt64 max_threads_max_value = 256 * getNumberOfPhysicalCPUCores();
if (max_threads > max_threads_max_value)
{
@ -109,7 +109,7 @@ void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log)
current_settings.set("max_threads", max_threads_max_value);
}
constexpr UInt64 max_sane_block_rows_size = 4294967296; // 2^32
static constexpr UInt64 max_sane_block_rows_size = 4294967296; // 2^32
std::unordered_set<String> block_rows_settings{
"max_block_size",
"max_insert_block_size",
@ -120,13 +120,21 @@ void doSettingsSanityCheckClamp(Settings & current_settings, LoggerPtr log)
"input_format_parquet_max_block_size"};
for (auto const & setting : block_rows_settings)
{
auto block_size = getCurrentValue(setting).get<UInt64>();
if (block_size > max_sane_block_rows_size)
if (auto block_size = get_current_value(setting).get<UInt64>();
block_size > max_sane_block_rows_size)
{
if (log)
LOG_WARNING(log, "Sanity check: '{}' value is too high ({}). Reduced to {}", setting, block_size, max_sane_block_rows_size);
current_settings.set(setting, max_sane_block_rows_size);
}
}
if (auto max_block_size = get_current_value("max_block_size").get<UInt64>(); max_block_size == 0)
{
if (log)
LOG_WARNING(log, "Sanity check: 'max_block_size' cannot be 0. Set to default value {}", DEFAULT_BLOCK_SIZE);
current_settings.set("max_block_size", DEFAULT_BLOCK_SIZE);
}
}
}

View File

@ -146,10 +146,10 @@ void SerializationAggregateFunction::serializeTextEscaped(const IColumn & column
}
void SerializationAggregateFunction::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
void SerializationAggregateFunction::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String s;
readEscapedString(s, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(s, istr) : readEscapedString(s, istr);
deserializeFromString(function, column, s, version);
}

View File

@ -242,8 +242,10 @@ void SerializationBool::deserializeTextEscaped(IColumn & column, ReadBuffer & is
{
if (istr.eof())
throw Exception(ErrorCodes::CANNOT_PARSE_BOOL, "Expected boolean value but get EOF.");
deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'; });
if (settings.tsv.crlf_end_of_line_input)
deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n' || *buf.position() == '\r'; });
else
deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'; });
}
bool SerializationBool::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const

View File

@ -75,7 +75,7 @@ void SerializationCustomSimpleText::serializeTextEscaped(const IColumn & column,
void SerializationCustomSimpleText::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String str;
readEscapedString(str, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(str, istr) : readEscapedString(str, istr);
deserializeFromString(*this, column, str, settings);
}

View File

@ -29,7 +29,7 @@ void SerializationEnum<Type>::deserializeTextEscaped(IColumn & column, ReadBuffe
{
/// NOTE It would be nice to do without creating a temporary object - at least extract std::string out.
std::string field_name;
readEscapedString(field_name, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field_name, istr) : readEscapedString(field_name, istr);
assert_cast<ColumnType &>(column).getData().push_back(ref_enum_values.getValue(StringRef(field_name), true));
}
}

View File

@ -10,8 +10,10 @@
#include <IO/WriteHelpers.h>
#include <IO/VarInt.h>
#include "Common/PODArray.h"
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include "base/types.h"
namespace DB
{
@ -183,14 +185,17 @@ static inline bool tryRead(const SerializationFixedString & self, IColumn & colu
}
void SerializationFixedString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
void SerializationFixedString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
read(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto(data, istr); });
read(*this, column, [&istr, &settings](ColumnFixedString::Chars & data)
{
settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<ColumnFixedString::Chars,true>(data, istr) : readEscapedStringInto<ColumnFixedString::Chars,false>(data, istr);
});
}
bool SerializationFixedString::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
return tryRead(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto(data, istr); return true; });
return tryRead(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto<PaddedPODArray<UInt8>,false>(data, istr); return true; });
}

View File

@ -286,7 +286,7 @@ bool SerializationNullable::tryDeserializeNullRaw(DB::ReadBuffer & istr, const D
}
template<typename ReturnType, bool escaped>
ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const SerializationPtr & nested_serialization, bool & is_null)
ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const SerializationPtr & nested_serialization, bool & is_null)
{
static constexpr bool throw_exception = std::is_same_v<ReturnType, void>;
@ -319,10 +319,10 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
/// Check if we have enough data in buffer to check if it's a null.
if (istr.available() > null_representation.size())
{
auto check_for_null = [&null_representation](ReadBuffer & buf)
auto check_for_null = [&null_representation, &settings](ReadBuffer & buf)
{
auto * pos = buf.position();
if (checkString(null_representation, buf) && (*buf.position() == '\t' || *buf.position() == '\n'))
if (checkString(null_representation, buf) && (*buf.position() == '\t' || *buf.position() == '\n' || (settings.tsv.crlf_end_of_line_input && *buf.position() == '\r')))
return true;
buf.position() = pos;
return false;
@ -334,14 +334,14 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
/// Use PeekableReadBuffer to make a checkpoint before checking null
/// representation and rollback if check was failed.
PeekableReadBuffer peekable_buf(istr, true);
auto check_for_null = [&null_representation](ReadBuffer & buf_)
auto check_for_null = [&null_representation, &settings](ReadBuffer & buf_)
{
auto & buf = assert_cast<PeekableReadBuffer &>(buf_);
buf.setCheckpoint();
SCOPE_EXIT(buf.dropCheckpoint());
if (checkString(null_representation, buf) && (buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'))
return true;
if (checkString(null_representation, buf) && (buf.eof() || *buf.position() == '\t' || *buf.position() == '\n' || (settings.tsv.crlf_end_of_line_input && *buf.position() == '\r')))
return true;
buf.rollbackToCheckpoint();
return false;
};
@ -371,7 +371,10 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
if (null_representation.find('\t') != std::string::npos || null_representation.find('\n') != std::string::npos)
throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "TSV custom null representation "
"containing '\\t' or '\\n' may not work correctly for large input.");
"containing '\\t' or '\\n' may not work correctly for large input.");
if (settings.tsv.crlf_end_of_line_input && null_representation.find('\r') != std::string::npos)
throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "TSV custom null representation "
"containing '\\r' may not work correctly for large input.");
WriteBufferFromOwnString parsed_value;
if constexpr (escaped)

View File

@ -104,9 +104,9 @@ void SerializationObject<Parser>::deserializeWholeText(IColumn & column, ReadBuf
}
template <typename Parser>
void SerializationObject<Parser>::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
void SerializationObject<Parser>::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
deserializeTextImpl(column, [&](String & s) { readEscapedString(s, istr); });
deserializeTextImpl(column, [&](String & s) { settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(s, istr) : readEscapedString(s, istr); });
}
template <typename Parser>

View File

@ -147,7 +147,6 @@ void SerializationString::serializeBinaryBulk(const IColumn & column, WriteBuffe
}
}
template <int UNROLL_TIMES>
static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars & data, ColumnString::Offsets & offsets, ReadBuffer & istr, size_t limit)
{
@ -324,14 +323,17 @@ bool SerializationString::tryDeserializeWholeText(IColumn & column, ReadBuffer &
return read<bool>(column, [&](ColumnString::Chars & data) { readStringUntilEOFInto(data, istr); return true; });
}
void SerializationString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
void SerializationString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
read<void>(column, [&](ColumnString::Chars & data) { readEscapedStringInto(data, istr); });
read<void>(column, [&](ColumnString::Chars & data)
{
settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<PaddedPODArray<UInt8>,true>(data, istr) : readEscapedStringInto<PaddedPODArray<UInt8>,false>(data, istr);
});
}
bool SerializationString::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
return read<bool>(column, [&](ColumnString::Chars & data) { readEscapedStringInto(data, istr); return true; });
return read<bool>(column, [&](ColumnString::Chars & data) { readEscapedStringInto<PaddedPODArray<UInt8>,true>(data, istr); return true; });
}
void SerializationString::serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const

View File

@ -599,14 +599,14 @@ void SerializationVariant::serializeTextEscaped(const IColumn & column, size_t r
bool SerializationVariant::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String field;
readEscapedString(field, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field, istr) : readEscapedString(field, istr);
return tryDeserializeTextEscapedImpl(column, field, settings);
}
void SerializationVariant::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String field;
readEscapedString(field, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field, istr) : readEscapedString(field, istr);
if (!tryDeserializeTextEscapedImpl(column, field, settings))
throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot parse escaped value of type {} here: {}", variant_name, field);
}

View File

@ -5,6 +5,7 @@
#include <span>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseOrdinary.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@ -326,31 +327,36 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
StoragePtr table = detachTable(local_context, table_name);
/// This is possible for Lazy database.
if (!table)
return;
bool renamed = false;
try
{
fs::rename(table_metadata_path, table_metadata_path_drop);
renamed = true;
table->drop();
table->is_dropped = true;
fs::path table_data_dir(local_context->getPath() + table_data_path_relative);
if (fs::exists(table_data_dir))
(void)fs::remove_all(table_data_dir);
// The table might be not loaded for Lazy database engine.
if (table)
{
table->drop();
table->is_dropped = true;
}
}
catch (...)
{
LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true));
attachTable(local_context, table_name, table, table_data_path_relative);
if (table)
attachTable(local_context, table_name, table, table_data_path_relative);
if (renamed)
fs::rename(table_metadata_path_drop, table_metadata_path);
throw;
}
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
if (disk->isReadOnly() || !disk->exists(table_data_path_relative))
continue;
LOG_INFO(log, "Removing data directory from disk {} with path {} for dropped table {} ", disk_name, table_data_path_relative, table_name);
disk->removeRecursive(table_data_path_relative);
}
(void)fs::remove(table_metadata_path_drop);
}

View File

@ -76,6 +76,20 @@ static void setReplicatedEngine(ASTCreateQuery * create_query, ContextPtr contex
String replica_path = server_settings.default_replica_path;
String replica_name = server_settings.default_replica_name;
/// Check that replica path doesn't exist
Macros::MacroExpansionInfo info;
StorageID table_id = StorageID(create_query->getDatabase(), create_query->getTable(), create_query->uuid);
info.table_id = table_id;
info.expand_special_macros_only = false;
String zookeeper_path = context->getMacros()->expand(replica_path, info);
if (context->getZooKeeper()->exists(zookeeper_path))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Found existing ZooKeeper path {} while trying to convert table {} to replicated. Table will not be converted.",
zookeeper_path, backQuote(table_id.getFullTableName())
);
auto args = std::make_shared<ASTExpressionList>();
args->children.push_back(std::make_shared<ASTLiteral>(replica_path));
args->children.push_back(std::make_shared<ASTLiteral>(replica_name));

View File

@ -721,11 +721,10 @@ public:
if (!block.checkCheckSum())
{
std::string calculated_check_sum = std::to_string(block.calculateCheckSum());
std::string check_sum = std::to_string(block.getCheckSum());
std::string expected_check_sum = std::to_string(block.getCheckSum());
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Cache data corrupted. Checksum validation failed. Calculated {} in block {}",
calculated_check_sum,
check_sum);
"Cache data corrupted. Checksum validation failed. Calculated {} expected in block {}, in file {}",
calculated_check_sum, expected_check_sum, file_path);
}
func(blocks_to_fetch[block_to_fetch_index], block.getBlockData());

View File

@ -244,6 +244,13 @@ public:
return delegate_transaction->writeFile(wrapped_path, buf_size, mode, settings);
}
/// Truncate file to the target size.
void truncateFile(const std::string & src_path, size_t target_size) override
{
auto wrapped_path = wrappedPath(src_path);
delegate_transaction->truncateFile(wrapped_path, target_size);
}
private:

View File

@ -2,10 +2,16 @@
#include <Disks/IDiskTransaction.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/// Fake disk transaction implementation.
/// Just execute all operations immediately, commit is noop operation.
/// No support for atomicity and rollback.
@ -134,6 +140,11 @@ public:
disk.createHardLink(src_path, dst_path);
}
void truncateFile(const std::string & /* src_path */, size_t /* target_size */) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Operation `truncateFile` is not implemented");
}
private:
IDisk & disk;
};

View File

@ -128,6 +128,9 @@ public:
/// Create hardlink from `src_path` to `dst_path`.
virtual void createHardLink(const std::string & src_path, const std::string & dst_path) = 0;
/// Truncate file to the target size.
virtual void truncateFile(const std::string & src_path, size_t target_size) = 0;
};
using DiskTransactionPtr = std::shared_ptr<IDiskTransaction>;

View File

@ -257,6 +257,7 @@ std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Po
settings->max_upload_part_size = config.getUInt64(config_prefix + ".max_upload_part_size", context->getSettings().azure_max_upload_part_size);
settings->max_single_part_copy_size = config.getUInt64(config_prefix + ".max_single_part_copy_size", context->getSettings().azure_max_single_part_copy_size);
settings->use_native_copy = config.getBool(config_prefix + ".use_native_copy", false);
settings->max_blocks_in_multipart_upload = config.getUInt64(config_prefix + ".max_blocks_in_multipart_upload", 50000);
settings->max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".max_unexpected_write_error_retries", context->getSettings().azure_max_unexpected_write_error_retries);
settings->max_inflight_parts_for_one_file = config.getUInt64(config_prefix + ".max_inflight_parts_for_one_file", context->getSettings().azure_max_inflight_parts_for_one_file);
settings->strict_upload_part_size = config.getUInt64(config_prefix + ".strict_upload_part_size", context->getSettings().azure_strict_upload_part_size);

View File

@ -63,6 +63,7 @@ struct AzureObjectStorageSettings
bool use_native_copy = false;
size_t max_unexpected_write_error_retries = 4;
size_t max_inflight_parts_for_one_file = 20;
size_t max_blocks_in_multipart_upload = 50000;
size_t strict_upload_part_size = 0;
size_t upload_part_size_multiply_factor = 2;
size_t upload_part_size_multiply_parts_count_threshold = 500;

View File

@ -133,6 +133,14 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
transaction->commit();
}
void DiskObjectStorage::truncateFile(const String & path, size_t size)
{
LOG_TEST(log, "Truncate file operation {} to size : {}", path, size);
auto transaction = createObjectStorageTransaction();
transaction->truncateFile(path, size);
transaction->commit();
}
void DiskObjectStorage::copyFile( /// NOLINT
const String & from_file_path,
IDisk & to_disk,

View File

@ -84,6 +84,8 @@ public:
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void truncateFile(const String & path, size_t size) override;
MetadataStoragePtr getMetadataStorage() override { return metadata_storage; }
UInt32 getRefCount(const String & path) const override;

View File

@ -15,6 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT;
extern const int LOGICAL_ERROR;
}
void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
@ -207,6 +208,18 @@ void DiskObjectStorageMetadata::addObject(ObjectStorageKey key, size_t size)
keys_with_meta.emplace_back(std::move(key), ObjectMetadata{size, {}, {}});
}
ObjectKeyWithMetadata DiskObjectStorageMetadata::popLastObject()
{
if (keys_with_meta.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't pop last object from metadata {}. Metadata already empty", metadata_file_path);
ObjectKeyWithMetadata object = std::move(keys_with_meta.back());
keys_with_meta.pop_back();
total_size -= object.metadata.size_bytes;
return object;
}
bool DiskObjectStorageMetadata::getWriteFullObjectKeySetting()
{
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD

View File

@ -52,6 +52,7 @@ public:
void addObject(ObjectStorageKey key, size_t size);
ObjectKeyWithMetadata popLastObject();
void deserialize(ReadBuffer & buf);
void deserializeFromString(const std::string & data);

View File

@ -559,6 +559,54 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
}
};
struct TruncateFileObjectStorageOperation final : public IDiskObjectStorageOperation
{
std::string path;
size_t size;
TruncateFileOperationOutcomePtr truncate_outcome;
TruncateFileObjectStorageOperation(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
const std::string & path_,
size_t size_)
: IDiskObjectStorageOperation(object_storage_, metadata_storage_)
, path(path_)
, size(size_)
{}
std::string getInfoForLog() const override
{
return fmt::format("TruncateFileObjectStorageOperation (path: {}, size: {})", path, size);
}
void execute(MetadataTransactionPtr tx) override
{
if (metadata_storage.exists(path))
{
if (!metadata_storage.isFile(path))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is not a file", path);
truncate_outcome = tx->truncateFile(path, size);
}
}
void undo() override
{
}
void finalize() override
{
if (!truncate_outcome)
return;
if (!truncate_outcome->objects_to_remove.empty())
object_storage.removeObjectsIfExist(truncate_outcome->objects_to_remove);
}
};
}
void DiskObjectStorageTransaction::createDirectory(const std::string & path)
@ -598,6 +646,13 @@ void DiskObjectStorageTransaction::moveFile(const String & from_path, const Stri
}));
}
void DiskObjectStorageTransaction::truncateFile(const String & path, size_t size)
{
operations_to_execute.emplace_back(
std::make_unique<TruncateFileObjectStorageOperation>(object_storage, metadata_storage, path, size)
);
}
void DiskObjectStorageTransaction::replaceFile(const std::string & from_path, const std::string & to_path)
{
auto operation = std::make_unique<ReplaceFileObjectStorageOperation>(object_storage, metadata_storage, from_path, to_path);

View File

@ -92,6 +92,8 @@ public:
void createFile(const String & path) override;
void truncateFile(const String & path, size_t size) override;
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override;
/// writeFile is a difficult function for transactions.

View File

@ -31,7 +31,15 @@ struct UnlinkMetadataFileOperationOutcome
UInt32 num_hardlinks = std::numeric_limits<UInt32>::max();
};
struct TruncateFileOperationOutcome
{
StoredObjects objects_to_remove;
};
using UnlinkMetadataFileOperationOutcomePtr = std::shared_ptr<UnlinkMetadataFileOperationOutcome>;
using TruncateFileOperationOutcomePtr = std::shared_ptr<TruncateFileOperationOutcome>;
/// Tries to provide some "transactions" interface, which allow
/// to execute (commit) operations simultaneously. We don't provide
@ -143,6 +151,11 @@ public:
return nullptr;
}
virtual TruncateFileOperationOutcomePtr truncateFile(const std::string & /* path */, size_t /* size */)
{
throwNotImplemented();
}
virtual ~IMetadataTransaction() = default;
protected:

View File

@ -259,4 +259,12 @@ UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromDiskTransaction::unlink
return result;
}
TruncateFileOperationOutcomePtr MetadataStorageFromDiskTransaction::truncateFile(const std::string & path, size_t target_size)
{
auto operation = std::make_unique<TruncateMetadataFileOperation>(path, target_size, metadata_storage, *metadata_storage.getDisk());
auto result = operation->outcome;
addOperation(std::move(operation));
return result;
}
}

View File

@ -129,6 +129,8 @@ public:
UnlinkMetadataFileOperationOutcomePtr unlinkMetadata(const std::string & path) override;
TruncateFileOperationOutcomePtr truncateFile(const std::string & src_path, size_t target_size) override;
};

View File

@ -4,9 +4,12 @@
#include <Common/getRandomASCIIString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <optional>
#include <ranges>
#include <filesystem>
#include <utility>
namespace fs = std::filesystem;
@ -14,6 +17,11 @@ namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static std::string getTempFileName(const std::string & dir)
{
return fs::path(dir) / getRandomASCIIString(32);
@ -341,6 +349,35 @@ void UnlinkMetadataFileOperation::undo(std::unique_lock<SharedMutex> & lock)
outcome->num_hardlinks++;
}
void TruncateMetadataFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
if (metadata_storage.exists(path))
{
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
while (metadata->getTotalSizeBytes() > target_size)
{
auto object_key_with_metadata = metadata->popLastObject();
outcome->objects_to_remove.emplace_back(object_key_with_metadata.key.serialize(), path, object_key_with_metadata.metadata.size_bytes);
}
if (metadata->getTotalSizeBytes() != target_size)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} can't be truncated to size {}", path, target_size);
}
LOG_TEST(getLogger("TruncateMetadataFileOperation"), "Going to remove {} blobs.", outcome->objects_to_remove.size());
write_operation = std::make_unique<WriteFileOperation>(path, disk, metadata->serializeToString());
write_operation->execute(metadata_lock);
}
}
void TruncateMetadataFileOperation::undo(std::unique_lock<SharedMutex> & lock)
{
if (write_operation)
write_operation->undo(lock);
}
void SetReadonlyFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);

View File

@ -282,4 +282,34 @@ private:
std::unique_ptr<WriteFileOperation> write_operation;
};
struct TruncateMetadataFileOperation final : public IMetadataOperation
{
const TruncateFileOperationOutcomePtr outcome = std::make_shared<TruncateFileOperationOutcome>();
TruncateMetadataFileOperation(
const std::string & path_,
size_t target_size_,
const MetadataStorageFromDisk & metadata_storage_,
IDisk & disk_)
: path(path_)
, target_size(target_size_)
, metadata_storage(metadata_storage_)
, disk(disk_)
{
}
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo(std::unique_lock<SharedMutex> & lock) override;
private:
std::string path;
size_t target_size;
const MetadataStorageFromDisk & metadata_storage;
IDisk & disk;
std::unique_ptr<WriteFileOperation> write_operation;
};
}

View File

@ -306,7 +306,6 @@ void registerAzureObjectStorage(ObjectStorageFactory & factory)
bool /* skip_access_check */) -> ObjectStoragePtr
{
AzureBlobStorageEndpoint endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
std::string endpoint_string = endpoint.getEndpoint();
return createObjectStorage<AzureObjectStorage>(
ObjectStorageType::Azure, config, config_prefix, name,

View File

@ -76,7 +76,7 @@ void skipFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule esca
/// Empty field, just skip spaces
break;
case FormatSettings::EscapingRule::Escaped:
readEscapedStringInto(out, buf);
readEscapedStringInto<NullOutput,false>(out, buf);
break;
case FormatSettings::EscapingRule::Quoted:
readQuotedFieldInto(out, buf);

View File

@ -1,6 +1,7 @@
#include <Formats/FormatFactory.h>
#include <algorithm>
#include <unistd.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
@ -15,7 +16,7 @@
#include <Poco/URI.h>
#include <Common/Exception.h>
#include <Common/KnownObjectNames.h>
#include <unistd.h>
#include <Common/tryGetFileNameByFileDescriptor.h>
#include <boost/algorithm/string/case_conv.hpp>
@ -202,6 +203,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.tsv.try_detect_header = settings.input_format_tsv_detect_header;
format_settings.tsv.skip_trailing_empty_lines = settings.input_format_tsv_skip_trailing_empty_lines;
format_settings.tsv.allow_variable_number_of_columns = settings.input_format_tsv_allow_variable_number_of_columns;
format_settings.tsv.crlf_end_of_line_input = settings.input_format_tsv_crlf_end_of_line;
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
format_settings.values.allow_data_after_semicolon = settings.input_format_values_allow_data_after_semicolon;
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
@ -693,21 +695,12 @@ String FormatFactory::getFormatFromFileName(String file_name)
std::optional<String> FormatFactory::tryGetFormatFromFileDescriptor(int fd)
{
#ifdef OS_LINUX
std::string proc_path = fmt::format("/proc/self/fd/{}", fd);
char file_path[PATH_MAX] = {'\0'};
if (readlink(proc_path.c_str(), file_path, sizeof(file_path) - 1) != -1)
return tryGetFormatFromFileName(file_path);
std::optional<String> file_name = tryGetFileNameFromFileDescriptor(fd);
if (file_name)
return tryGetFormatFromFileName(*file_name);
return std::nullopt;
#elif defined(OS_DARWIN)
char file_path[PATH_MAX] = {'\0'};
if (fcntl(fd, F_GETPATH, file_path) != -1)
return tryGetFormatFromFileName(file_path);
return std::nullopt;
#else
(void)fd;
return std::nullopt;
#endif
}
String FormatFactory::getFormatFromFileDescriptor(int fd)

View File

@ -361,6 +361,7 @@ struct FormatSettings
bool try_detect_header = true;
bool skip_trailing_empty_lines = false;
bool allow_variable_number_of_columns = false;
bool crlf_end_of_line_input = false;
} tsv{};
struct

View File

@ -31,7 +31,7 @@ extract_into_parent_list(clickhouse_functions_headers dbms_headers
add_library(clickhouse_functions_obj OBJECT ${clickhouse_functions_headers} ${clickhouse_functions_sources})
if (OMIT_HEAVY_DEBUG_SYMBOLS)
target_compile_options(clickhouse_functions_obj PRIVATE "-g0")
set_source_files_properties(${DBMS_FUNCTIONS} PROPERTIES COMPILE_FLAGS "-g0")
set_source_files_properties(${DBMS_FUNCTIONS} DIRECTORY .. PROPERTIES COMPILE_FLAGS "-g0")
endif()
list (APPEND OBJECT_LIBS $<TARGET_OBJECTS:clickhouse_functions_obj>)

View File

@ -49,6 +49,8 @@
#include <base/bit_cast.h>
#include <base/unaligned.h>
#include <algorithm>
namespace DB
{
@ -75,17 +77,29 @@ namespace impl
ColumnPtr key0;
ColumnPtr key1;
bool is_const;
const ColumnArray::Offsets * offsets{};
size_t size() const
{
assert(key0 && key1);
assert(key0->size() == key1->size());
assert(offsets == nullptr || offsets->size() == key0->size());
if (offsets != nullptr)
return offsets->back();
return key0->size();
}
SipHashKey getKey(size_t i) const
{
if (is_const)
i = 0;
if (offsets != nullptr)
{
const auto *const begin = offsets->begin();
const auto * upper = std::upper_bound(begin, offsets->end(), i);
if (upper == offsets->end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "offset {} not found in function SipHashKeyColumns::getKey", i);
i = upper - begin;
}
const auto & key0data = assert_cast<const ColumnUInt64 &>(*key0).getData();
const auto & key1data = assert_cast<const ColumnUInt64 &>(*key1).getData();
return {key0data[i], key1data[i]};
@ -1112,7 +1126,15 @@ private:
typename ColumnVector<ToType>::Container vec_temp(nested_size);
bool nested_is_first = true;
executeForArgument(key_cols, nested_type, nested_column, vec_temp, nested_is_first);
if constexpr (Keyed)
{
KeyColumnsType key_cols_tmp{key_cols};
key_cols_tmp.offsets = &offsets;
executeForArgument(key_cols_tmp, nested_type, nested_column, vec_temp, nested_is_first);
}
else
executeForArgument(key_cols, nested_type, nested_column, vec_temp, nested_is_first);
const size_t size = offsets.size();

View File

@ -30,6 +30,7 @@ namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
extern const int AZURE_BLOB_STORAGE_ERROR;
extern const int LOGICAL_ERROR;
}
namespace
@ -94,11 +95,56 @@ namespace
void calculatePartSize()
{
auto max_upload_part_size = settings->max_upload_part_size;
if (!max_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be 0");
if (!total_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chosen multipart upload for an empty file. This must not happen");
auto max_part_number = settings->max_blocks_in_multipart_upload;
const auto min_upload_part_size = settings->min_upload_part_size;
const auto max_upload_part_size = settings->max_upload_part_size;
if (!max_part_number)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_blocks_in_multipart_upload must not be 0");
else if (!min_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "min_upload_part_size must not be 0");
else if (max_upload_part_size < min_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be less than min_upload_part_size");
size_t part_size = min_upload_part_size;
auto num_parts = (total_size + part_size - 1) / part_size;
if (num_parts > max_part_number)
{
part_size = (total_size + max_part_number - 1) / max_part_number;
num_parts = (total_size + part_size - 1) / part_size;
}
if (part_size > max_upload_part_size)
{
part_size = max_upload_part_size;
num_parts = (total_size + part_size - 1) / part_size;
}
String error;
if (num_parts < 1)
error = "Number of parts is zero";
else if (num_parts > max_part_number)
error = fmt::format("Number of parts exceeds {}/{}", num_parts, max_part_number);
else if (part_size < min_upload_part_size)
error = fmt::format("Size of a part is less than {}/{}", part_size, min_upload_part_size);
else if (part_size > max_upload_part_size)
error = fmt::format("Size of a part exceeds {}/{}", part_size, max_upload_part_size);
if (!error.empty())
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"{} while writing {} bytes to Azure. Check max_part_number = {}, "
"min_upload_part_size = {}, max_upload_part_size = {}",
error, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
}
/// We've calculated the size of a normal part (the final part can be smaller).
normal_part_size = max_upload_part_size;
normal_part_size = part_size;
}
public:
@ -219,21 +265,22 @@ namespace
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), task.part_offset, task.part_size);
while (!read_buffer->eof())
{
auto size = read_buffer->available();
if (size > 0)
{
auto block_id = getRandomASCIIString(64);
Azure::Core::IO::MemoryBodyStream memory(reinterpret_cast<const uint8_t *>(read_buffer->position()), size);
block_blob_client.StageBlock(block_id, memory);
task.block_ids.emplace_back(block_id);
read_buffer->ignore(size);
LOG_TRACE(log, "Writing part. Container: {}, Blob: {}, block_id: {}", dest_container_for_logging, dest_blob, block_id);
}
}
std::lock_guard lock(bg_tasks_mutex); /// Protect bg_tasks from race
LOG_TRACE(log, "Writing part finished. Container: {}, Blob: {}, Parts: {}", dest_container_for_logging, dest_blob, bg_tasks.size());
/// task.part_size is already normalized according to min_upload_part_size and max_upload_part_size.
size_t size_to_stage = task.part_size;
PODArray<char> memory;
memory.resize(size_to_stage);
WriteBufferFromVector<PODArray<char>> wb(memory);
copyData(*read_buffer, wb, size_to_stage);
Azure::Core::IO::MemoryBodyStream stream(reinterpret_cast<const uint8_t *>(memory.data()), size_to_stage);
const auto & block_id = task.block_ids.emplace_back(getRandomASCIIString(64));
block_blob_client.StageBlock(block_id, stream);
LOG_TRACE(log, "Writing part. Container: {}, Blob: {}, block_id: {}, size: {}",
dest_container_for_logging, dest_blob, block_id, size_to_stage);
}
@ -300,21 +347,32 @@ void copyAzureBlobStorageFile(
if (size < settings->max_single_part_copy_size)
{
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy blob sync {} -> {}", src_blob, dest_blob);
block_blob_client_dest.CopyFromUri(source_uri);
}
else
{
Azure::Storage::Blobs::StartBlobCopyOperation operation = block_blob_client_dest.StartCopyFromUri(source_uri);
// Wait for the operation to finish, checking for status every 100 second.
auto copy_response = operation.PollUntilDone(std::chrono::milliseconds(100));
auto properties_model = copy_response.Value;
if (properties_model.CopySource.HasValue())
{
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Copy failed");
}
auto copy_status = properties_model.CopyStatus;
auto copy_status_description = properties_model.CopyStatusDescription;
if (copy_status.HasValue() && copy_status.Value() == Azure::Storage::Blobs::Models::CopyStatus::Success)
{
LOG_TRACE(getLogger("copyAzureBlobStorageFile"), "Copy of {} to {} finished", properties_model.CopySource.Value(), dest_blob);
}
else
{
if (copy_status.HasValue())
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Copy from {} to {} failed with status {} description {} (operation is done {})",
src_blob, dest_blob, copy_status.Value().ToString(), copy_status_description.Value(), operation.IsDone());
else
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Copy from {} to {} didn't complete with success status (operation is done {})", src_blob, dest_blob, operation.IsDone());
}
}
}
else
@ -322,8 +380,8 @@ void copyAzureBlobStorageFile(
LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob);
auto create_read_buffer = [&]
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(src_client, src_blob, read_settings, settings->max_single_read_retries,
settings->max_single_download_retries);
return std::make_unique<ReadBufferFromAzureBlobStorage>(
src_client, src_blob, read_settings, settings->max_single_read_retries, settings->max_single_download_retries);
};
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyAzureBlobStorageFile")};

View File

@ -352,7 +352,6 @@ static ReturnType parseComplexEscapeSequence(Vector & s, ReadBuffer & buf)
{
return error("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
}
s.push_back(unhex2(hex_code));
}
else if (char_after_backslash == 'N')
@ -608,13 +607,20 @@ static ReturnType parseJSONEscapeSequence(Vector & s, ReadBuffer & buf, bool kee
}
template <typename Vector, bool parse_complex_escape_sequence>
template <typename Vector, bool parse_complex_escape_sequence, bool support_crlf>
void readEscapedStringIntoImpl(Vector & s, ReadBuffer & buf)
{
while (!buf.eof())
{
char * next_pos = find_first_symbols<'\t', '\n', '\\'>(buf.position(), buf.buffer().end());
char * next_pos;
if constexpr (support_crlf)
{
next_pos = find_first_symbols<'\t', '\n', '\\','\r'>(buf.position(), buf.buffer().end());
}
else
{
next_pos = find_first_symbols<'\t', '\n', '\\'>(buf.position(), buf.buffer().end());
}
appendToStringOrVector(s, buf, next_pos);
buf.position() = next_pos;
@ -641,25 +647,46 @@ void readEscapedStringIntoImpl(Vector & s, ReadBuffer & buf)
}
}
}
if constexpr (support_crlf)
{
if (*buf.position() == '\r')
{
++buf.position();
if (!buf.eof() && *buf.position() != '\n')
{
s.push_back('\r');
continue;
}
return;
}
}
}
}
template <typename Vector>
template <typename Vector, bool support_crlf>
void readEscapedStringInto(Vector & s, ReadBuffer & buf)
{
readEscapedStringIntoImpl<Vector, true>(s, buf);
readEscapedStringIntoImpl<Vector, true, support_crlf>(s, buf);
}
void readEscapedString(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringInto(s, buf);
readEscapedStringInto<String,false>(s, buf);
}
template void readEscapedStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readEscapedStringInto<NullOutput>(NullOutput & s, ReadBuffer & buf);
void readEscapedStringCRLF(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringInto<String,true>(s, buf);
}
template void readEscapedStringInto<PaddedPODArray<UInt8>,false>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readEscapedStringInto<NullOutput,false>(NullOutput & s, ReadBuffer & buf);
template void readEscapedStringInto<PaddedPODArray<UInt8>,true>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readEscapedStringInto<NullOutput,true>(NullOutput & s, ReadBuffer & buf);
/** If enable_sql_style_quoting == true,
* strings like 'abc''def' will be parsed as abc'def.
@ -2069,7 +2096,14 @@ bool tryReadJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON &
void readTSVField(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringIntoImpl<String, false>(s, buf);
readEscapedStringIntoImpl<String, false, false>(s, buf);
}
void readTSVFieldCRLF(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringIntoImpl<String, false, true>(s, buf);
}
}

View File

@ -583,6 +583,8 @@ void readString(String & s, ReadBuffer & buf);
void readEscapedString(String & s, ReadBuffer & buf);
void readEscapedStringCRLF(String & s, ReadBuffer & buf);
void readQuotedString(String & s, ReadBuffer & buf);
void readQuotedStringWithSQLStyle(String & s, ReadBuffer & buf);
@ -645,7 +647,7 @@ void readStringInto(Vector & s, ReadBuffer & buf);
template <typename Vector>
void readNullTerminated(Vector & s, ReadBuffer & buf);
template <typename Vector>
template <typename Vector, bool support_crlf>
void readEscapedStringInto(Vector & s, ReadBuffer & buf);
template <bool enable_sql_style_quoting, typename Vector>
@ -1901,6 +1903,7 @@ void readJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON & se
bool tryReadJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON & settings);
void readTSVField(String & s, ReadBuffer & buf);
void readTSVFieldCRLF(String & s, ReadBuffer & buf);
/** Parse the escape sequence, which can be simple (one character after backslash) or more complex (multiple characters).
* It is assumed that the cursor is located on the `\` symbol

View File

@ -316,23 +316,23 @@ namespace
num_parts = (total_size + part_size - 1) / part_size;
}
if (num_parts < 1 || num_parts > max_part_number || part_size < min_upload_part_size || part_size > max_upload_part_size)
{
String msg;
if (num_parts < 1)
msg = "Number of parts is zero";
else if (num_parts > max_part_number)
msg = fmt::format("Number of parts exceeds {}", num_parts, max_part_number);
else if (part_size < min_upload_part_size)
msg = fmt::format("Size of a part is less than {}", part_size, min_upload_part_size);
else
msg = fmt::format("Size of a part exceeds {}", part_size, max_upload_part_size);
String error;
if (num_parts < 1)
error = "Number of parts is zero";
else if (num_parts > max_part_number)
error = fmt::format("Number of parts exceeds {}/{}", num_parts, max_part_number);
else if (part_size < min_upload_part_size)
error = fmt::format("Size of a part is less than {}/{}", part_size, min_upload_part_size);
else if (part_size > max_upload_part_size)
error = fmt::format("Size of a part exceeds {}/{}", part_size, max_upload_part_size);
if (!error.empty())
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"{} while writing {} bytes to S3. Check max_part_number = {}, "
"min_upload_part_size = {}, max_upload_part_size = {}",
msg, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
error, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
}
/// We've calculated the size of a normal part (the final part can be smaller).

View File

@ -462,8 +462,8 @@ void ThreadStatus::initGlobalProfiler([[maybe_unused]] UInt64 global_profiler_re
{
#if !defined(SANITIZER) && !defined(__APPLE__)
/// profilers are useless without trace collector
auto global_context_ptr = global_context.lock();
if (!global_context_ptr || !global_context_ptr->hasTraceCollector())
auto context = Context::getGlobalContextInstance();
if (!context->hasTraceCollector())
return;
try

View File

@ -103,7 +103,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int QUERY_WAS_CANCELLED;
extern const int INCORRECT_DATA;
extern const int SYNTAX_ERROR;
extern const int SUPPORT_IS_DISABLED;
extern const int INCORRECT_QUERY;
@ -1256,34 +1255,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
}
}
// Here we check if our our projections contain force_optimize_projection_name
if (!settings.force_optimize_projection_name.value.empty())
{
bool found = false;
std::set<std::string> projections;
{
const auto & access_info = context->getQueryAccessInfo();
std::lock_guard lock(access_info.mutex);
projections = access_info.projections;
}
for (const auto &projection : projections)
{
// projection value has structure like: <db_name>.<table_name>.<projection_name>
// We need to get only the projection name
size_t last_dot_pos = projection.find_last_of('.');
std::string projection_name = (last_dot_pos != std::string::npos) ? projection.substr(last_dot_pos + 1) : projection;
if (settings.force_optimize_projection_name.value == projection_name)
{
found = true;
break;
}
}
if (!found)
throw Exception(ErrorCodes::INCORRECT_DATA, "Projection {} is specified in setting force_optimize_projection_name but not used",
settings.force_optimize_projection_name.value);
}
if (process_list_entry)
{
@ -1421,7 +1392,16 @@ void executeQuery(
const char * begin;
const char * end;
istr.nextIfAtEnd();
try
{
istr.nextIfAtEnd();
}
catch (...)
{
/// If buffer contains invalid data and we failed to decompress, we still want to have some information about the query in the log.
logQuery("<cannot parse>", context, /* internal = */ false, QueryProcessingStage::Complete);
throw;
}
size_t max_query_size = context->getSettingsRef().max_query_size;

View File

@ -135,7 +135,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
/// If the key is not found, skip the value.
NullOutput sink;
readEscapedStringInto(sink, *in);
readEscapedStringInto<NullOutput,false>(sink, *in);
}
else
{

View File

@ -10,6 +10,8 @@
#include <Formats/verbosePrintString.h>
#include <Formats/EscapingRuleUtils.h>
#include <Processors/Formats/Impl/TabSeparatedRowInputFormat.h>
#include <boost/range/adaptor/map.hpp>
#include "Formats/FormatSettings.h"
namespace DB
{
@ -28,7 +30,8 @@ static void checkForCarriageReturn(ReadBuffer & in)
throw Exception(ErrorCodes::INCORRECT_DATA, "\nYou have carriage return (\\r, 0x0D, ASCII 13) at end of first row."
"\nIt's like your input data has DOS/Windows style line separators, that are illegal in TabSeparated format."
" You must transform your file to Unix format."
"\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r.");
"\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r"
"\nor else enable setting 'input_format_tsv_crlf_end_of_line'");
}
TabSeparatedRowInputFormat::TabSeparatedRowInputFormat(
@ -92,7 +95,12 @@ void TabSeparatedFormatReader::skipRowEndDelimiter()
if (buf->eof())
return;
if (unlikely(first_row))
if (format_settings.tsv.crlf_end_of_line_input)
{
if (*buf->position() == '\r')
++buf->position();
}
else if (unlikely(first_row))
{
checkForCarriageReturn(*buf);
first_row = false;
@ -105,14 +113,15 @@ template <bool read_string>
String TabSeparatedFormatReader::readFieldIntoString()
{
String field;
bool support_crlf = format_settings.tsv.crlf_end_of_line_input;
if (is_raw)
readString(field, *buf);
else
{
if constexpr (read_string)
readEscapedString(field, *buf);
support_crlf ? readEscapedStringCRLF(field, *buf) : readEscapedString(field, *buf);
else
readTSVField(field, *buf);
support_crlf ? readTSVFieldCRLF(field, *buf) : readTSVField(field, *buf);
}
return field;
}
@ -123,7 +132,7 @@ void TabSeparatedFormatReader::skipField()
if (is_raw)
readStringInto(out, *buf);
else
readEscapedStringInto(out, *buf);
format_settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<NullOutput,true>(out, *buf) : readEscapedStringInto<NullOutput,false>(out, *buf);
}
void TabSeparatedFormatReader::skipHeaderRow()
@ -155,7 +164,7 @@ bool TabSeparatedFormatReader::readField(IColumn & column, const DataTypePtr & t
const SerializationPtr & serialization, bool is_last_file_column, const String & /*column_name*/)
{
const bool at_delimiter = !is_last_file_column && !buf->eof() && *buf->position() == '\t';
const bool at_last_column_line_end = is_last_file_column && (buf->eof() || *buf->position() == '\n');
const bool at_last_column_line_end = is_last_file_column && (buf->eof() || *buf->position() == '\n' || (format_settings.tsv.crlf_end_of_line_input && *buf->position() == '\r'));
if (format_settings.tsv.empty_as_default && (at_delimiter || at_last_column_line_end))
{
@ -220,7 +229,10 @@ bool TabSeparatedFormatReader::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
try
{
assertChar('\n', *buf);
if (!format_settings.tsv.crlf_end_of_line_input)
assertChar('\n', *buf);
else
assertChar('\r', *buf);
}
catch (const DB::Exception &)
{
@ -233,7 +245,10 @@ bool TabSeparatedFormatReader::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
else if (*buf->position() == '\r')
{
out << "ERROR: Carriage return found where line feed is expected."
" It's like your file has DOS/Windows style line separators, that is illegal in TabSeparated format.\n";
" It's like your file has DOS/Windows style line separators. \n"
"You must transform your file to Unix format. \n"
"But if you really need carriage return at end of string value of last column, you need to escape it as \\r \n"
"or else enable setting 'input_format_tsv_crlf_end_of_line'";
}
else
{
@ -348,7 +363,7 @@ void TabSeparatedFormatReader::skipRow()
bool TabSeparatedFormatReader::checkForEndOfRow()
{
return buf->eof() || *buf->position() == '\n';
return buf->eof() || *buf->position() == '\n' || (format_settings.tsv.crlf_end_of_line_input && *buf->position() == '\r');
}
TabSeparatedSchemaReader::TabSeparatedSchemaReader(

View File

@ -111,8 +111,11 @@ void optimizePrimaryKeyCondition(const Stack & stack);
void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes);
void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections);
bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes);
/// Returns the name of used projection or nullopt if no projection is used.
std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections);
std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes);
bool addPlansForSets(QueryPlan & plan, QueryPlan::Node & node, QueryPlan::Nodes & nodes);
/// Enable memory bound merging of aggregation states for remote queries

View File

@ -46,7 +46,7 @@ QueryPlanOptimizationSettings QueryPlanOptimizationSettings::fromSettings(const
settings.optimize_projection = from.optimize_use_projections;
settings.force_use_projection = settings.optimize_projection && from.force_optimize_projection;
settings.force_projection_name = from.force_optimize_projection_name;
settings.force_projection_name = settings.optimize_projection ? from.force_optimize_projection_name.value : "";
settings.optimize_use_implicit_projections = settings.optimize_projection && from.optimize_use_implicit_projections;
return settings;

View File

@ -12,6 +12,7 @@ namespace DB
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int TOO_MANY_QUERY_PLAN_OPTIMIZATIONS;
extern const int PROJECTION_NOT_USED;
}
@ -106,7 +107,7 @@ void optimizeTreeFirstPass(const QueryPlanOptimizationSettings & settings, Query
void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes)
{
const size_t max_optimizations_to_apply = optimization_settings.max_optimizations_to_apply;
size_t num_applied_projection = 0;
std::unordered_set<String> applied_projection_names;
bool has_reading_from_mt = false;
Stack stack;
@ -159,9 +160,11 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
/// Projection optimization relies on PK optimization
if (optimization_settings.optimize_projection)
num_applied_projection
+= optimizeUseAggregateProjections(*frame.node, nodes, optimization_settings.optimize_use_implicit_projections);
{
auto applied_projection = optimizeUseAggregateProjections(*frame.node, nodes, optimization_settings.optimize_use_implicit_projections);
if (applied_projection)
applied_projection_names.insert(*applied_projection);
}
if (optimization_settings.aggregation_in_order)
optimizeAggregationInOrder(*frame.node, nodes);
@ -180,11 +183,11 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
if (optimization_settings.optimize_projection)
{
/// Projection optimization relies on PK optimization
if (optimizeUseNormalProjections(stack, nodes))
if (auto applied_projection = optimizeUseNormalProjections(stack, nodes))
{
++num_applied_projection;
applied_projection_names.insert(*applied_projection);
if (max_optimizations_to_apply && max_optimizations_to_apply < num_applied_projection)
if (max_optimizations_to_apply && max_optimizations_to_apply < applied_projection_names.size())
throw Exception(ErrorCodes::TOO_MANY_QUERY_PLAN_OPTIMIZATIONS,
"Too many projection optimizations applied to query plan. Current limit {}",
max_optimizations_to_apply);
@ -201,10 +204,16 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
stack.pop_back();
}
if (optimization_settings.force_use_projection && has_reading_from_mt && num_applied_projection == 0)
if (optimization_settings.force_use_projection && has_reading_from_mt && applied_projection_names.empty())
throw Exception(
ErrorCodes::PROJECTION_NOT_USED,
"No projection is used when optimize_use_projections = 1 and force_optimize_projection = 1");
if (!optimization_settings.force_projection_name.empty() && has_reading_from_mt && !applied_projection_names.contains(optimization_settings.force_projection_name))
throw Exception(
ErrorCodes::INCORRECT_DATA,
"Projection {} is specified in setting force_optimize_projection_name but not used",
optimization_settings.force_projection_name);
}
void optimizeTreeThirdPass(QueryPlan & plan, QueryPlan::Node & root, QueryPlan::Nodes & nodes)

View File

@ -552,28 +552,28 @@ static QueryPlan::Node * findReadingStep(QueryPlan::Node & node)
return nullptr;
}
bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections)
std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections)
{
if (node.children.size() != 1)
return false;
return {};
auto * aggregating = typeid_cast<AggregatingStep *>(node.step.get());
if (!aggregating)
return false;
return {};
if (!aggregating->canUseProjection())
return false;
return {};
QueryPlan::Node * reading_node = findReadingStep(*node.children.front());
if (!reading_node)
return false;
return {};
auto * reading = typeid_cast<ReadFromMergeTree *>(reading_node->step.get());
if (!reading)
return false;
return {};
if (!canUseProjectionForReadingStep(reading))
return false;
return {};
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks = getMaxAddedBlocks(reading);
@ -597,7 +597,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
if (ordinary_reading_marks == 0)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
return {};
}
const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;
@ -631,15 +631,14 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
if (!best_candidate)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
return {};
}
}
else
{
return false;
return {};
}
Context::QualifiedProjectionName projection_name;
chassert(best_candidate != nullptr);
QueryPlanStepPtr projection_reading;
@ -654,12 +653,6 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(candidates.minmax_projection->block)));
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
has_ordinary_parts = false;
projection_name = Context::QualifiedProjectionName
{
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = candidates.minmax_projection->candidate.projection->name,
};
}
else
{
@ -691,12 +684,6 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
projection_reading = std::make_unique<ReadFromPreparedSource>(std::move(pipe));
}
projection_name = Context::QualifiedProjectionName
{
.storage_id = reading->getMergeTreeData().getStorageID(),
.projection_name = best_candidate->projection->name,
};
has_ordinary_parts = best_candidate->merge_tree_ordinary_select_result_ptr != nullptr;
if (has_ordinary_parts)
reading->setAnalyzedResult(std::move(best_candidate->merge_tree_ordinary_select_result_ptr));
@ -746,7 +733,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
node.children.push_back(&expr_or_filter_node);
}
return true;
return best_candidate->projection->name;
}
}

View File

@ -73,16 +73,16 @@ static bool hasAllRequiredColumns(const ProjectionDescription * projection, cons
}
bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
{
const auto & frame = stack.back();
auto * reading = typeid_cast<ReadFromMergeTree *>(frame.node->step.get());
if (!reading)
return false;
return {};
if (!canUseProjectionForReadingStep(reading))
return false;
return {};
auto iter = stack.rbegin();
while (std::next(iter) != stack.rend())
@ -96,7 +96,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
/// Dangling query plan node. This might be generated by StorageMerge.
if (iter->node->step.get() == reading)
return false;
return {};
const auto metadata = reading->getStorageMetadata();
const auto & projections = metadata->projections;
@ -107,7 +107,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
normal_projections.push_back(&projection);
if (normal_projections.empty())
return false;
return {};
ContextPtr context = reading->getContext();
auto it = std::find_if(normal_projections.begin(), normal_projections.end(), [&](const auto * projection)
@ -126,7 +126,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
{
auto & child = iter->node->children[iter->next_child - 1];
if (!query.build(*child))
return false;
return {};
if (query.dag)
query.dag->removeUnusedActions();
@ -146,7 +146,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (ordinary_reading_marks == 0)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
return {};
}
const auto & parts_with_ranges = ordinary_reading_select_result->parts_with_ranges;
@ -185,7 +185,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
if (!best_candidate)
{
reading->setAnalyzedResult(std::move(ordinary_reading_select_result));
return false;
return {};
}
auto storage_snapshot = reading->getStorageSnapshot();
@ -283,8 +283,7 @@ bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes)
/// Here we remove last steps from stack to be able to optimize again.
/// In theory, read-in-order can be applied to projection.
stack.resize(iter.base() - stack.begin());
return true;
return best_candidate->projection->name;
}
}

View File

@ -707,11 +707,11 @@ void HTTPHandler::processQuery(
/// The data can also be compressed using incompatible internal algorithm. This is indicated by
/// 'decompress' query parameter.
std::unique_ptr<ReadBuffer> in_post_maybe_compressed;
bool in_post_compressed = false;
bool is_in_post_compressed = false;
if (params.getParsed<bool>("decompress", false))
{
in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post);
in_post_compressed = true;
in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post, /* allow_different_codecs_ = */ false, /* external_data_ = */ true);
is_in_post_compressed = true;
}
else
in_post_maybe_compressed = std::move(in_post);
@ -845,7 +845,7 @@ void HTTPHandler::processQuery(
/// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
/// checksums of client data compressed with internal algorithm are not checked.
if (in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress)
if (is_in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress)
static_cast<CompressedReadBuffer &>(*in_post_maybe_compressed).disableChecksumming();
/// Add CORS header if 'add_http_cors_header' setting is turned on send * in Access-Control-Allow-Origin

View File

@ -254,6 +254,10 @@ AzureObjectStorage::SettingsPtr StorageAzureBlob::createSettings(const ContextPt
auto settings_ptr = std::make_unique<AzureObjectStorageSettings>();
settings_ptr->max_single_part_upload_size = context_settings.azure_max_single_part_upload_size;
settings_ptr->max_single_read_retries = context_settings.azure_max_single_read_retries;
settings_ptr->strict_upload_part_size = context_settings.azure_strict_upload_part_size;
settings_ptr->max_upload_part_size = context_settings.azure_max_upload_part_size;
settings_ptr->max_blocks_in_multipart_upload = context_settings.azure_max_blocks_in_multipart_upload;
settings_ptr->min_upload_part_size = context_settings.azure_min_upload_part_size;
settings_ptr->list_object_keys_size = static_cast<int32_t>(context_settings.azure_list_object_keys_size);
return settings_ptr;

View File

@ -5,6 +5,7 @@
#include <Common/escapeForFileName.h>
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Compression/CompressedReadBuffer.h>
@ -53,8 +54,13 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
extern const int CANNOT_RESTORE_TABLE;
extern const int NOT_IMPLEMENTED;
extern const int FAULT_INJECTED;
}
namespace FailPoints
{
extern const char stripe_log_sink_write_fallpoint[];
}
/// NOTE: The lock `StorageStripeLog::rwlock` is NOT kept locked while reading,
/// because we read ranges of data that do not change.
@ -234,6 +240,11 @@ public:
/// Save the new indices.
storage.saveIndices(lock);
// While executing save file sizes the exception might occurs. S3::TooManyRequests for example.
fiu_do_on(FailPoints::stripe_log_sink_write_fallpoint,
{
throw Exception(ErrorCodes::FAULT_INJECTED, "Injecting fault for inserting into StipeLog table");
});
/// Save the new file sizes.
storage.saveFileSizes(lock);

View File

@ -110,6 +110,11 @@ def cluster():
main_configs=[path],
with_azurite=True,
)
cluster.add_instance(
"node3",
main_configs=[path],
with_azurite=True,
)
cluster.start()
yield cluster
@ -216,3 +221,37 @@ def test_backup_restore_on_merge_tree_different_container(cluster):
azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket")
azure_query(node2, f"DROP TABLE test_simple_merge_tree_different_bucket_restored")
def test_backup_restore_on_merge_tree_native_copy_async(cluster):
node3 = cluster.instances["node3"]
azure_query(
node3,
f"CREATE TABLE test_simple_merge_tree_async(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='policy_azure_cache'",
)
azure_query(node3, f"INSERT INTO test_simple_merge_tree_async VALUES (1, 'a')")
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', 'test_simple_merge_tree_async_backup')"
print("BACKUP DEST", backup_destination)
azure_query(
node3,
f"BACKUP TABLE test_simple_merge_tree_async TO {backup_destination}",
settings={"azure_max_single_part_copy_size": 0},
)
assert node3.contains_in_log("using native copy")
azure_query(
node3,
f"RESTORE TABLE test_simple_merge_tree_async AS test_simple_merge_tree_async_restored FROM {backup_destination};",
settings={"azure_max_single_part_copy_size": 0},
)
assert (
azure_query(node3, f"SELECT * from test_simple_merge_tree_async_restored")
== "1\ta\n"
)
assert node3.contains_in_log("using native copy")
azure_query(node3, f"DROP TABLE test_simple_merge_tree_async")
azure_query(node3, f"DROP TABLE test_simple_merge_tree_async_restored")

View File

@ -281,7 +281,10 @@ def test_backup_restore_on_merge_tree(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='blob_storage_policy'",
f"""
DROP TABLE IF EXISTS test_simple_merge_tree;
CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='blob_storage_policy'
""",
)
azure_query(node, f"INSERT INTO test_simple_merge_tree VALUES (1, 'a')")
@ -299,3 +302,85 @@ def test_backup_restore_on_merge_tree(cluster):
)
azure_query(node, f"DROP TABLE test_simple_merge_tree")
azure_query(node, f"DROP TABLE test_simple_merge_tree_restored")
def test_backup_restore_correct_block_ids(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"""
DROP TABLE IF EXISTS test_simple_merge_tree;
CREATE TABLE test_simple_merge_tree(key UInt64, data String)
Engine = MergeTree()
ORDER BY tuple()
SETTINGS storage_policy='blob_storage_policy'""",
)
data_query = "SELECT number, repeat('a', 100) FROM numbers(1000)"
azure_query(
node,
f"INSERT INTO test_simple_merge_tree {data_query}",
)
for min_upload_size, max_upload_size, max_blocks, expected_block_size in [
(42, 100, 1000, 42),
(42, 52, 86, 52),
]:
data_path = f"test_backup_correct_block_ids_{max_blocks}"
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', '{data_path}')"
azure_query(
node,
f"""
SET azure_min_upload_part_size = {min_upload_size};
SET azure_max_upload_part_size = {max_upload_size};
SET azure_max_blocks_in_multipart_upload = {max_blocks};
BACKUP TABLE test_simple_merge_tree TO {backup_destination} SETTINGS allow_azure_native_copy = 0;
""",
)
port = cluster.azurite_port
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
container_name = "cont"
blob_service_client = BlobServiceClient.from_connection_string(
connection_string
)
container_client = blob_service_client.get_container_client(container_name)
blobs = container_client.list_blobs()
data_blob = (
f"{data_path}/data/default/test_simple_merge_tree/all_1_1_0/data.bin"
)
found = False
for blob in blobs:
if data_blob == blob.get("name"):
found = True
break
assert found
blob_client = blob_service_client.get_blob_client(
blob=data_blob, container=container_name
)
blocks_num = len(blob_client.get_block_list()[0])
assert blocks_num > 50
count = 0
for block in blob_client.get_block_list()[0]:
count += 1
if count < blocks_num:
assert block.get("size") == expected_block_size
else:
assert block.get("size") < expected_block_size
azure_query(
node,
f"RESTORE TABLE test_simple_merge_tree AS test_simple_merge_tree_restored_{max_blocks} FROM {backup_destination};",
)
assert azure_query(
node,
f"SELECT * from test_simple_merge_tree_restored_{max_blocks} ORDER BY key",
) == node.query(data_query)

View File

@ -0,0 +1,88 @@
import logging
import time
import pytest
import os
from helpers.cluster import ClickHouseCluster
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=["configs/storage_policy.xml"],
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def assert_objects_count(cluster, objects_count, path="data/"):
minio = cluster.minio_client
s3_objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
if objects_count != len(s3_objects):
for s3_object in s3_objects:
object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)
logging.info("Existing S3 object: %s", str(object_meta))
assert objects_count == len(s3_objects)
def list_of_files_on_ch_disk(node, disk, path):
disk_path = node.query(
f"SELECT path FROM system.disks WHERE name='{disk}'"
).splitlines()[0]
return node.exec_in_container(
["bash", "-c", f"ls {os.path.join(disk_path, path)}"], user="root"
)
@pytest.mark.parametrize(
"engine",
[
pytest.param("Log"),
],
)
@pytest.mark.parametrize(
"disk,check_s3",
[
pytest.param("default", False),
pytest.param("s3", True),
],
)
@pytest.mark.parametrize(
"delay",
[
pytest.param(0),
pytest.param(4),
],
)
def test_drop_table(cluster, engine, disk, check_s3, delay):
node = cluster.instances["node"]
node.query("DROP DATABASE IF EXISTS lazy")
node.query("CREATE DATABASE lazy ENGINE=Lazy(2)")
node.query(
"CREATE TABLE lazy.table (id UInt64) ENGINE={} SETTINGS disk = '{}'".format(
engine,
disk,
)
)
node.query("INSERT INTO lazy.table SELECT number FROM numbers(10)")
assert node.query("SELECT count(*) FROM lazy.table") == "10\n"
if delay:
time.sleep(delay)
node.query("DROP TABLE lazy.table SYNC")
if check_s3:
# There mustn't be any orphaned data
assert_objects_count(cluster, 0)
# Local data must be removed
assert list_of_files_on_ch_disk(node, disk, "data/lazy/") == ""

View File

@ -0,0 +1,34 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
<s3_no_retries>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<!-- ClickHouse starts earlier than custom S3 endpoint. Skip access check to avoid fail on start-up -->
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
<retry_attempts>1</retry_attempts>
<s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
<s3_max_single_read_retries>1</s3_max_single_read_retries>
<connect_timeout_ms>20000</connect_timeout_ms>
</s3_no_retries>
</disks>
<policies>
<s3_no_retries>
<volumes>
<main>
<disk>s3_no_retries</disk>
</main>
</volumes>
</s3_no_retries>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -11,7 +11,7 @@ def cluster():
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=["configs/minio.xml", "configs/ssl.xml"],
main_configs=["configs/storage_configuration.xml", "configs/ssl.xml"],
with_minio=True,
)
logging.info("Starting cluster...")
@ -84,3 +84,39 @@ def test_log_family_s3(cluster, log_engine, files_overhead, files_overhead_per_i
assert_objects_count(cluster, 0)
finally:
node.query("DROP TABLE s3_test")
# Imitate case when error occurs while inserting into table.
# For examle S3::TooManyRequests.
# In that case we can update data file, but not the size file.
# So due to exception we should do truncate of the data file to undo the insert query.
# See FileChecker::repair().
def test_stripe_log_truncate(cluster):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE stripe_table (
a int
) ENGINE = StripeLog()
SETTINGS storage_policy='s3_no_retries'
"""
)
node.query("SYSTEM ENABLE FAILPOINT stripe_log_sink_write_fallpoint")
node.query(
"""
INSERT INTO stripe_table SELECT number FROM numbers(10)
""",
ignore_error=True,
)
node.query("SYSTEM DISABLE FAILPOINT stripe_log_sink_write_fallpoint")
node.query("SELECT count(*) FROM stripe_table") == "0\n"
node.query("INSERT INTO stripe_table SELECT number FROM numbers(10)")
node.query("SELECT count(*) FROM stripe_table") == "10\n"
# Make sure that everything is okey with the table after restart.
node.query("DETACH TABLE stripe_table")
node.query("ATTACH TABLE stripe_table")
assert node.query("DROP TABLE stripe_table") == ""

View File

@ -15,6 +15,6 @@
<shard>01</shard>
</macros>
<default_replica_path>/lol/kek/'/{uuid}</default_replica_path>
<default_replica_path>/clickhouse/'/{database}/{table}/{uuid}</default_replica_path>
</clickhouse>

View File

@ -6,7 +6,7 @@ cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance(
"ch1",
main_configs=[
"configs/config.d/clusters_unusual.xml",
"configs/config.d/clusters_zk_path.xml",
"configs/config.d/distributed_ddl.xml",
],
with_zookeeper=True,
@ -63,7 +63,7 @@ def check_tables():
)
.strip()
.startswith(
"ReplicatedReplacingMergeTree(\\'/lol/kek/\\\\\\'/{uuid}\\', \\'{replica}\\', D)"
"ReplicatedReplacingMergeTree(\\'/clickhouse/\\\\\\'/{database}/{table}/{uuid}\\', \\'{replica}\\', D)"
)
)
assert (
@ -73,7 +73,7 @@ def check_tables():
)
.strip()
.startswith(
"ReplicatedVersionedCollapsingMergeTree(\\'/lol/kek/\\\\\\'/{uuid}\\', \\'{replica}\\', Sign, Version)"
"ReplicatedVersionedCollapsingMergeTree(\\'/clickhouse/\\\\\\'/{database}/{table}/{uuid}\\', \\'{replica}\\', Sign, Version)"
)
)

View File

@ -0,0 +1,69 @@
import pytest
from test_modify_engine_on_restart.common import (
get_table_path,
set_convert_flags,
)
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance(
"ch1",
main_configs=[
"configs/config.d/clusters_zk_path.xml",
"configs/config.d/distributed_ddl.xml",
],
with_zookeeper=True,
macros={"replica": "node1"},
stay_alive=True,
)
database_name = "modify_engine_zk_path"
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def q(node, query):
return node.query(database=database_name, sql=query)
def test_modify_engine_fails_if_zk_path_exists(started_cluster):
ch1.query("CREATE DATABASE " + database_name)
q(
ch1,
"CREATE TABLE already_exists_1 ( A Int64, D Date, S String ) ENGINE MergeTree() PARTITION BY toYYYYMM(D) ORDER BY A;",
)
uuid = q(
ch1,
f"SELECT uuid FROM system.tables WHERE table = 'already_exists_1' and database = '{database_name}'",
).strip("'[]\n")
q(
ch1,
f"CREATE TABLE already_exists_2 ( A Int64, D Date, S String ) ENGINE ReplicatedMergeTree('/clickhouse/\\'/{database_name}/already_exists_1/{uuid}', 'r2') PARTITION BY toYYYYMM(D) ORDER BY A;",
)
set_convert_flags(ch1, database_name, ["already_exists_1"])
table_data_path = get_table_path(ch1, "already_exists_1", database_name)
ch1.stop_clickhouse()
ch1.start_clickhouse(retry_start=False, expected_to_fail=True)
# Check if we can cancel convertation
ch1.exec_in_container(
[
"bash",
"-c",
f"rm {table_data_path}convert_to_replicated",
]
)
ch1.start_clickhouse()

View File

@ -1,4 +0,0 @@
SET send_logs_level = 'fatal';
SET max_block_size = 0;
SELECT number FROM system.numbers; -- { serverError 12 }

View File

@ -132,3 +132,7 @@ SELECT * FROM merge_table ORDER BY id, val;
2 a
2 b
3 c
select sum(number) from numbers(10) settings final=1;
45
select sum(number) from remote('127.0.0.{1,2}', numbers(10)) settings final=1;
90

View File

@ -102,3 +102,6 @@ insert into table_to_merge_c values (3,'c');
-- expected output:
-- 1 c, 2 a, 2 b, 3 c
SELECT * FROM merge_table ORDER BY id, val;
select sum(number) from numbers(10) settings final=1;
select sum(number) from remote('127.0.0.{1,2}', numbers(10)) settings final=1;

View File

@ -103,11 +103,11 @@ SELECT '2^30-1', maxMerge(x) from (select CAST(unhex('ffffff3f') || randomString
SELECT '1M without 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || 'x', 'AggregateFunction(max, String)') as x);
SELECT '1M with 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || '\0', 'AggregateFunction(max, String)') as x);
SELECT 'fuzz1', finalizeAggregation(CAST(unhex('3000000\0303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz1', finalizeAggregation(CAST(unhex('3000000\0303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); -- { serverError INCORRECT_DATA }
SELECT 'fuzz2', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '01' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x);
SELECT 'fuzz3', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz4', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz5', finalizeAggregation(CAST(unhex('0100000000000000000FFFFFFFF0'), 'AggregateFunction(argMax, UInt64, String)')); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz3', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError INCORRECT_DATA }
SELECT 'fuzz4', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError INCORRECT_DATA }
SELECT 'fuzz5', finalizeAggregation(CAST(unhex('0100000000000000000FFFFFFFF0'), 'AggregateFunction(argMax, UInt64, String)')); -- { serverError INCORRECT_DATA }
drop table if exists aggr;

View File

@ -236,3 +236,6 @@ Check asan bug
0
Check bug found fuzzing
9042C6691B1A75F0EA3314B6F55728BB
Check bug 2 found fuzzing
608E1FF030C9E206185B112C2A25F1A7
ABB65AE97711A2E053E324ED88B1D08B

View File

@ -338,3 +338,10 @@ SELECT sipHash128((toUInt64(9223372036854775806), 1)) = sipHash128(1) GROUP BY s
SELECT 'Check bug found fuzzing';
SELECT [(255, 1048575)], sipHash128ReferenceKeyed((toUInt64(2147483646), toUInt64(9223372036854775807)), ([(NULL, 100), (NULL, NULL), (1024, 10)], toUInt64(2), toUInt64(1024)), ''), hex(sipHash128ReferenceKeyed((-9223372036854775807, 1.), '-1', NULL)), ('', toUInt64(65535), [(9223372036854775807, 9223372036854775806)], toUInt64(65536)), arrayJoin((NULL, 65537, 255), [(NULL, NULL)]) GROUP BY tupleElement((NULL, NULL, NULL, -1), toUInt64(2), 2) = NULL; -- { serverError NOT_IMPLEMENTED }
SELECT hex(sipHash128ReferenceKeyed((0::UInt64, 0::UInt64), ([1, 1])));
SELECT 'Check bug 2 found fuzzing';
DROP TABLE IF EXISTS sipHashKeyed_keys;
CREATE TABLE sipHashKeyed_keys (`a` Map(String, String)) ENGINE = Memory;
INSERT INTO sipHashKeyed_keys FORMAT VALUES ({'a':'b', 'c':'d'}), ({'e':'f', 'g':'h'});
SELECT hex(sipHash128ReferenceKeyed((0::UInt64, materialize(0::UInt64)), a)) FROM sipHashKeyed_keys ORDER BY a;
DROP TABLE sipHashKeyed_keys;

View File

@ -1,3 +1,5 @@
DROP TABLE IF EXISTS test;
CREATE TABLE test
(
`id` UInt64,
@ -18,3 +20,16 @@ SELECT name FROM test GROUP BY name SETTINGS force_optimize_projection_name='pro
SELECT name FROM test GROUP BY name SETTINGS force_optimize_projection_name='non_existing_projection'; -- { serverError 117 }
SELECT name FROM test SETTINGS force_optimize_projection_name='projection_name'; -- { serverError 117 }
INSERT INTO test SELECT number, 'test' FROM numbers(1, 100) SETTINGS force_optimize_projection_name='projection_name';
SELECT 1 SETTINGS force_optimize_projection_name='projection_name';
SYSTEM FLUSH LOGS;
SELECT read_rows FROM system.query_log
WHERE current_database = currentDatabase()
AND query LIKE '%SELECT name FROM test%'
AND Settings['force_optimize_projection_name'] = 'projection_name'
AND type = 'ExceptionBeforeStart';
DROP TABLE test;

View File

@ -0,0 +1,11 @@
<-- Read UNIX endings -->
Akiba_Hebrew_Academy 2017-08-01 241
Aegithina_tiphia 2018-02-01 34
1971-72_Utah_Stars_season 2016-10-01 1
<-- Read DOS endings with setting input_format_tsv_crlf_end_of_line=1 -->
Akiba_Hebrew_Academy 2017-08-01 241
Aegithina_tiphia 2018-02-01 34
1971-72_Utah_Stars_season 2016-10-01 1

View File

@ -0,0 +1,29 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Data preparation step
USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
UNIX_ENDINGS="${CLICKHOUSE_TEST_UNIQUE_NAME}_data_without_crlf.tsv"
DOS_ENDINGS="${CLICKHOUSE_TEST_UNIQUE_NAME}_data_with_crlf.tsv"
DATA_FILE_UNIX_ENDINGS="${USER_FILES_PATH:?}/${UNIX_ENDINGS}"
DATA_FILE_DOS_ENDINGS="${USER_FILES_PATH:?}/${DOS_ENDINGS}"
touch $DATA_FILE_UNIX_ENDINGS
touch $DATA_FILE_DOS_ENDINGS
echo -ne "Akiba_Hebrew_Academy\t2017-08-01\t241\nAegithina_tiphia\t2018-02-01\t34\n1971-72_Utah_Stars_season\t2016-10-01\t1\n" > $DATA_FILE_UNIX_ENDINGS
echo -ne "Akiba_Hebrew_Academy\t2017-08-01\t241\r\nAegithina_tiphia\t2018-02-01\t34\r\n1971-72_Utah_Stars_season\t2016-10-01\t1\r\n" > $DATA_FILE_DOS_ENDINGS
echo -e "<-- Read UNIX endings -->\n"
$CLICKHOUSE_CLIENT --query "SELECT * FROM file(${UNIX_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32');"
$CLICKHOUSE_CLIENT --multiquery --query "SELECT * FROM file(${DOS_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32'); --{serverError 117}"
echo -e "\n<-- Read DOS endings with setting input_format_tsv_crlf_end_of_line=1 -->\n"
$CLICKHOUSE_CLIENT --query "SELECT * FROM file(${DOS_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32') SETTINGS input_format_tsv_crlf_end_of_line = 1;"
# Test teardown
rm $DATA_FILE_UNIX_ENDINGS
rm $DATA_FILE_DOS_ENDINGS

View File

@ -0,0 +1,2 @@
Hello, World! From client.
Hello, World! From local.

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
[ -e "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_client.gz ] && rm "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_client.gz
${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World! From client.')" > ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client.gz
gunzip ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client.gz
cat ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client
rm -f "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client"
[ -e "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_local.gz ] && rm "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_local.gz
${CLICKHOUSE_LOCAL} --query "SELECT * FROM (SELECT 'Hello, World! From local.')" > ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local.gz
gunzip ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local.gz
cat ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local
rm -f "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local"

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "SELECT count(*) FROM numbers(10) AS a, numbers(11) AS b, numbers(12) AS c SETTINGS max_block_size = 0" 2>&1 | grep -q "Sanity check: 'max_block_size' cannot be 0. Set to default value" && echo "OK" || echo "FAIL"

View File

@ -7,8 +7,6 @@ export LC_ALL=C # The "total" should be printed without localization
TU_EXCLUDES=(
AggregateFunctionUniq
Aggregator
# FIXME: Exclude for now
FunctionsConversion
)
if find $1 -name '*.o' | xargs wc -c | grep --regexp='\.o$' | sort -rn | awk '{ if ($1 > 50000000) print }' \