Merge branch 'master' into gcmaster-parquet

ZhiHong Zhang 2024-05-22 23:31:00 +08:00 committed by GitHub
commit 3d7befef4f
77 changed files with 904 additions and 151 deletions

View File

@ -68,8 +68,9 @@ if (ENABLE_CHECK_HEAVY_BUILDS)
set (RLIMIT_AS 20000000000)
endif()
# For some files currently building RISCV64 might be too slow. TODO: Improve compilation times per file
if (ARCH_RISCV64)
# For some files currently building RISCV64/LOONGARCH64 might be too slow.
# TODO: Improve compilation times per file
if (ARCH_RISCV64 OR ARCH_LOONGARCH64)
set (RLIMIT_CPU 1800)
endif()

View File

@ -197,6 +197,7 @@ SELECT * FROM nestedt FORMAT TSV
- [input_format_tsv_enum_as_number](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_enum_as_number) - treat inserted enum values in TSV formats as enum indices. Default value - `false`.
- [input_format_tsv_use_best_effort_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_use_best_effort_in_schema_inference) - use some tweaks and heuristics to infer schema in TSV format. If disabled, all fields will be inferred as Strings. Default value - `true`.
- [output_format_tsv_crlf_end_of_line](/docs/en/operations/settings/settings-formats.md/#output_format_tsv_crlf_end_of_line) - if it is set to true, the end of line in TSV output format will be `\r\n` instead of `\n`. Default value - `false`.
- [input_format_tsv_crlf_end_of_line](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_crlf_end_of_line) - if it is set to true, the end of line in TSV input format will be `\r\n` instead of `\n`. Default value - `false`.
- [input_format_tsv_skip_first_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_first_lines) - skip specified number of lines at the beginning of data. Default value - `0`.
- [input_format_tsv_detect_header](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_detect_header) - automatically detect header with names and types in TSV format. Default value - `true`.
- [input_format_tsv_skip_trailing_empty_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_trailing_empty_lines) - skip trailing empty lines at the end of data. Default value - `false`.

View File

@ -831,7 +831,13 @@ Default value: `0`.
### output_format_tsv_crlf_end_of_line {#output_format_tsv_crlf_end_of_line}
Use DOC/Windows-style line separator (CRLF) in TSV instead of Unix style (LF).
Use DOS/Windows-style line separator (CRLF) in TSV instead of Unix style (LF).
Disabled by default.
### input_format_tsv_crlf_end_of_line {#input_format_tsv_crlf_end_of_line}
Use DOS/Windows-style line separator (CRLF) for TSV input files instead of Unix style (LF).
Disabled by default.

View File

@ -119,6 +119,7 @@ Hello\nworld
Hello\
world
```
`\r\n` (CRLF) is supported via the `input_format_tsv_crlf_end_of_line` setting.
The second variant is supported because MySQL uses it when writing tab-separated dumps.

View File

@ -1178,7 +1178,7 @@ void Client::processConfig()
pager = config().getString("pager", "");
setDefaultFormatsFromConfiguration();
setDefaultFormatsAndCompressionFromConfiguration();
global_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
global_context->setQueryKindInitial();

View File

@ -607,7 +607,7 @@ void LocalServer::processConfig()
if (config().has("macros"))
global_context->setMacros(std::make_unique<Macros>(config(), "macros", log));
setDefaultFormatsFromConfiguration();
setDefaultFormatsAndCompressionFromConfiguration();
/// Sets external authenticators config (LDAP, Kerberos).
global_context->setExternalAuthenticatorsConfig(config());

View File

@ -14,7 +14,7 @@ struct Settings;
namespace ErrorCodes
{
extern const int CORRUPTED_DATA;
extern const int INCORRECT_DATA;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int LOGICAL_ERROR;
}
@ -198,7 +198,7 @@ public:
this->data(place).value().read(buf, *serialization_val, arena);
if (unlikely(this->data(place).value().has() != this->data(place).result().has()))
throw Exception(
ErrorCodes::CORRUPTED_DATA,
ErrorCodes::INCORRECT_DATA,
"Invalid state of the aggregate function {}: has_value ({}) != has_result ({})",
getName(),
this->data(place).value().has(),

View File

@ -42,7 +42,7 @@ private:
return;
const auto & storage = table_node ? table_node->getStorage() : table_function_node->getStorage();
bool is_final_supported = storage && storage->supportsFinal();
bool is_final_supported = storage && !storage->isRemote() && storage->supportsFinal();
if (!is_final_supported)
return;

View File

@ -192,7 +192,7 @@ void QueryTreePassManager::run(QueryTreeNodePtr query_tree_node)
void QueryTreePassManager::runOnlyResolve(QueryTreeNodePtr query_tree_node)
{
// Run only QueryAnalysisPass and GroupingFunctionsResolvePass passes.
run(query_tree_node, 2);
run(query_tree_node, 3);
}
void QueryTreePassManager::run(QueryTreeNodePtr query_tree_node, size_t up_to_pass_index)
@ -249,6 +249,7 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
{
manager.addPass(std::make_unique<QueryAnalysisPass>(only_analyze));
manager.addPass(std::make_unique<GroupingFunctionsResolvePass>());
manager.addPass(std::make_unique<AutoFinalOnQueryPass>());
manager.addPass(std::make_unique<RemoveUnusedProjectionColumnsPass>());
manager.addPass(std::make_unique<FunctionToSubcolumnsPass>());
@ -294,7 +295,6 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze)
manager.addPass(std::make_unique<LogicalExpressionOptimizerPass>());
manager.addPass(std::make_unique<AutoFinalOnQueryPass>());
manager.addPass(std::make_unique<CrossToInnerJoinPass>());
manager.addPass(std::make_unique<ShardNumColumnToFunctionPass>());

View File

@ -21,6 +21,7 @@
#include <Common/StringUtils.h>
#include <Common/filesystemHelpers.h>
#include <Common/NetException.h>
#include <Common/tryGetFileNameByFileDescriptor.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Formats/FormatFactory.h>
@ -643,6 +644,9 @@ try
bool extras_into_stdout = need_render_progress || logs_into_stdout;
bool select_only_into_file = select_into_file && !select_into_file_and_stdout;
if (!out_file_buf && default_output_compression_method != CompressionMethod::None)
out_file_buf = wrapWriteBufferWithCompressionMethod(out_buf, default_output_compression_method, 3, 0);
/// It is not clear how to write progress and logs
/// intermixed with data with parallel formatting.
/// It may increase code complexity significantly.
@ -735,7 +739,7 @@ bool ClientBase::isRegularFile(int fd)
return fstat(fd, &file_stat) == 0 && S_ISREG(file_stat.st_mode);
}
void ClientBase::setDefaultFormatsFromConfiguration()
void ClientBase::setDefaultFormatsAndCompressionFromConfiguration()
{
if (config().has("output-format"))
{
@ -759,6 +763,10 @@ void ClientBase::setDefaultFormatsFromConfiguration()
default_output_format = *format_from_file_name;
else
default_output_format = "TSV";
std::optional<String> file_name = tryGetFileNameFromFileDescriptor(STDOUT_FILENO);
if (file_name)
default_output_compression_method = chooseCompressionMethod(*file_name, "");
}
else if (is_interactive)
{

View File

@ -190,7 +190,7 @@ protected:
/// Adjust some settings after command line options and config had been processed.
void adjustSettings();
void setDefaultFormatsFromConfiguration();
void setDefaultFormatsAndCompressionFromConfiguration();
void initTTYBuffer(ProgressOption progress);
@ -224,6 +224,7 @@ protected:
String pager;
String default_output_format; /// Query results output format.
CompressionMethod default_output_compression_method = CompressionMethod::None;
String default_input_format; /// Tables' format for clickhouse-local.
bool select_into_file = false; /// If writing result INTO OUTFILE. It affects progress rendering.

View File

@ -30,6 +30,7 @@ namespace ErrorCodes
extern const int ASYNC_LOAD_CYCLE;
extern const int ASYNC_LOAD_FAILED;
extern const int ASYNC_LOAD_CANCELED;
extern const int ASYNC_LOAD_WAIT_FAILED;
extern const int LOGICAL_ERROR;
}
@ -433,7 +434,7 @@ void AsyncLoader::wait(const LoadJobPtr & job, bool no_throw)
std::unique_lock job_lock{job->mutex};
wait(job_lock, job);
if (!no_throw && job->load_exception)
std::rethrow_exception(job->load_exception);
throw Exception(ErrorCodes::ASYNC_LOAD_WAIT_FAILED, "Waited job failed: {}", getExceptionMessage(job->load_exception, /* with_stacktrace = */ false));
}
void AsyncLoader::remove(const LoadJobSet & jobs)

View File

@ -600,7 +600,8 @@
M(719, QUERY_CACHE_USED_WITH_SYSTEM_TABLE) \
M(720, USER_EXPIRED) \
M(721, DEPRECATED_FUNCTION) \
M(722, PARQUET_EXCEPTION) \
M(722, ASYNC_LOAD_WAIT_FAILED) \
M(723, PARQUET_EXCEPTION) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@ -40,6 +40,7 @@ static struct InitFiu
REGULAR(use_delayed_remote_source) \
REGULAR(cluster_discovery_faults) \
REGULAR(replicated_sends_failpoint) \
REGULAR(stripe_log_sink_write_fallpoint)\
ONCE(smt_commit_merge_mutate_zk_fail_after_op) \
ONCE(smt_commit_merge_mutate_zk_fail_before_op) \
ONCE(smt_commit_write_zk_fail_after_op) \
@ -58,6 +59,7 @@ static struct InitFiu
ONCE(execute_query_calling_empty_set_result_func_on_exception) \
ONCE(receive_timeout_on_table_status_response)
namespace FailPoints
{
#define M(NAME) extern const char(NAME)[] = #NAME "";

View File

@ -35,6 +35,7 @@ namespace DB::ErrorCodes
extern const int ASYNC_LOAD_CYCLE;
extern const int ASYNC_LOAD_FAILED;
extern const int ASYNC_LOAD_CANCELED;
extern const int ASYNC_LOAD_WAIT_FAILED;
}
struct Initializer {
@ -262,7 +263,8 @@ TEST(AsyncLoader, CancelPendingJob)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
}
@ -288,7 +290,8 @@ TEST(AsyncLoader, CancelPendingTask)
}
catch (Exception & e)
{
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
try
@ -298,7 +301,8 @@ TEST(AsyncLoader, CancelPendingTask)
}
catch (Exception & e)
{
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
}
@ -325,7 +329,8 @@ TEST(AsyncLoader, CancelPendingDependency)
}
catch (Exception & e)
{
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
try
@ -335,7 +340,8 @@ TEST(AsyncLoader, CancelPendingDependency)
}
catch (Exception & e)
{
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
}
@ -451,8 +457,9 @@ TEST(AsyncLoader, JobFailure)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_FAILED);
ASSERT_TRUE(e.message().find(error_message) != String::npos);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains(error_message));
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_FAILED"));
}
}
@ -489,8 +496,9 @@ TEST(AsyncLoader, ScheduleJobWithFailedDependencies)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_TRUE(e.message().find(error_message) != String::npos);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
ASSERT_TRUE(e.message().contains(error_message));
}
try
{
@ -499,8 +507,9 @@ TEST(AsyncLoader, ScheduleJobWithFailedDependencies)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_TRUE(e.message().find(error_message) != String::npos);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
ASSERT_TRUE(e.message().contains(error_message));
}
}
@ -531,7 +540,8 @@ TEST(AsyncLoader, ScheduleJobWithCanceledDependencies)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
try
{
@ -540,7 +550,8 @@ TEST(AsyncLoader, ScheduleJobWithCanceledDependencies)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
}

View File

@ -0,0 +1,33 @@
#include <Common/tryGetFileNameByFileDescriptor.h>
#ifdef OS_LINUX
# include <unistd.h>
#elif defined(OS_DARWIN)
# include <fcntl.h>
#endif
#include <fmt/format.h>
namespace DB
{
std::optional<String> tryGetFileNameFromFileDescriptor(int fd)
{
#ifdef OS_LINUX
std::string proc_path = fmt::format("/proc/self/fd/{}", fd);
char file_path[PATH_MAX] = {'\0'};
if (readlink(proc_path.c_str(), file_path, sizeof(file_path) - 1) != -1)
return file_path;
return std::nullopt;
#elif defined(OS_DARWIN)
char file_path[PATH_MAX] = {'\0'};
if (fcntl(fd, F_GETPATH, file_path) != -1)
return file_path;
return std::nullopt;
#else
(void)fd;
return std::nullopt;
#endif
}
}

View File

@ -0,0 +1,10 @@
#pragma once
#include <optional>
#include <base/types.h>
namespace DB
{
/// Supports only Linux/MacOS. On other platforms, returns nullopt.
std::optional<String> tryGetFileNameFromFileDescriptor(int fd);
}
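
This helper centralizes the fd-to-path lookup that FormatFactory previously did inline (see the FormatFactory.cpp hunk further down) and that ClientBase now also needs when picking a default output compression method. A minimal usage sketch; `guessStdoutDefaults` itself is hypothetical, but `tryGetFileNameFromFileDescriptor`, `FormatFactory::tryGetFormatFromFileName` and `chooseCompressionMethod` are the calls this PR actually combines:

```cpp
#include <optional>
#include <utility>
#include <unistd.h>
#include <Common/tryGetFileNameByFileDescriptor.h>
#include <Formats/FormatFactory.h>
#include <IO/CompressionMethod.h>

namespace DB
{
/// Hypothetical helper (not part of this PR): derive default output format and
/// compression from whatever stdout is redirected to, combining the same calls
/// that ClientBase and FormatFactory use in the hunks above and below.
std::pair<std::optional<String>, CompressionMethod> guessStdoutDefaults()
{
    std::optional<String> format;
    CompressionMethod compression = CompressionMethod::None;
    if (auto file_name = tryGetFileNameFromFileDescriptor(STDOUT_FILENO))
    {
        /// e.g. "result.csv.gz" -> format CSV (by extension), compression Gzip.
        format = FormatFactory::instance().tryGetFormatFromFileName(*file_name);
        compression = chooseCompressionMethod(*file_name, /* hint = */ "");
    }
    return {format, compression};
}
}
```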

View File

@ -80,6 +80,7 @@ class IColumn;
M(UInt64, connections_with_failover_max_tries, 3, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to S3 (some implementations does not supports variable size parts).", 0) \
M(UInt64, azure_strict_upload_part_size, 0, "The exact size of part to upload during multipart upload to Azure blob storage.", 0) \
M(UInt64, azure_max_blocks_in_multipart_upload, 50000, "Maximum number of blocks in multipart upload for Azure.", 0) \
M(UInt64, s3_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_upload_part_size, 5ull*1024*1024*1024, "The maximum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, azure_min_upload_part_size, 16*1024*1024, "The minimum size of part to upload during multipart upload to Azure blob storage.", 0) \
@ -1079,6 +1080,7 @@ class IColumn;
M(Bool, input_format_csv_skip_trailing_empty_lines, false, "Skip trailing empty lines in CSV format", 0) \
M(Bool, input_format_tsv_skip_trailing_empty_lines, false, "Skip trailing empty lines in TSV format", 0) \
M(Bool, input_format_custom_skip_trailing_empty_lines, false, "Skip trailing empty lines in CustomSeparated format", 0) \
M(Bool, input_format_tsv_crlf_end_of_line, false, "If it is set true, file function will read TSV format with \\r\\n instead of \\n.", 0) \
\
M(Bool, input_format_native_allow_types_conversion, true, "Allow data types conversion in Native input format", 0) \
\

View File

@ -87,6 +87,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{
{"24.5", {{"allow_deprecated_functions", true, false, "Allow usage of deprecated functions"},
{"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},
{"input_format_tsv_crlf_end_of_line", false, false, "Enables reading of CRLF line endings with TSV formats"},
{"output_format_parquet_use_custom_encoder", false, true, "Enable custom Parquet encoder."},
{"cross_join_min_rows_to_compress", 0, 10000000, "A new setting."},
{"cross_join_min_bytes_to_compress", 0, 1_GiB, "A new setting."},
@ -94,6 +95,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"prefer_external_sort_block_bytes", 0, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging."},
{"input_format_parquet_use_native_reader", false, false, "When reading Parquet files, to use native reader instead of arrow reader."},
{"input_format_force_null_for_omitted_fields", false, false, "Disable type-defaults for omitted fields when needed"},
{"azure_max_blocks_in_multipart_upload", 50000, 50000, "Maximum number of blocks in multipart upload for Azure."},
}},
{"24.4", {{"input_format_json_throw_on_bad_escape_sequence", true, true, "Allow to save JSON strings with bad escape sequences"},
{"max_parsing_threads", 0, 0, "Add a separate setting to control number of threads in parallel parsing from files"},

View File

@ -146,10 +146,10 @@ void SerializationAggregateFunction::serializeTextEscaped(const IColumn & column
}
void SerializationAggregateFunction::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
void SerializationAggregateFunction::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String s;
readEscapedString(s, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(s, istr) : readEscapedString(s, istr);
deserializeFromString(function, column, s, version);
}

View File

@ -242,8 +242,10 @@ void SerializationBool::deserializeTextEscaped(IColumn & column, ReadBuffer & is
{
if (istr.eof())
throw Exception(ErrorCodes::CANNOT_PARSE_BOOL, "Expected boolean value but get EOF.");
deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'; });
if (settings.tsv.crlf_end_of_line_input)
deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n' || *buf.position() == '\r'; });
else
deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'; });
}
bool SerializationBool::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const

View File

@ -75,7 +75,7 @@ void SerializationCustomSimpleText::serializeTextEscaped(const IColumn & column,
void SerializationCustomSimpleText::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String str;
readEscapedString(str, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(str, istr) : readEscapedString(str, istr);
deserializeFromString(*this, column, str, settings);
}

View File

@ -29,7 +29,7 @@ void SerializationEnum<Type>::deserializeTextEscaped(IColumn & column, ReadBuffe
{
/// NOTE It would be nice to do without creating a temporary object - at least extract std::string out.
std::string field_name;
readEscapedString(field_name, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field_name, istr) : readEscapedString(field_name, istr);
assert_cast<ColumnType &>(column).getData().push_back(ref_enum_values.getValue(StringRef(field_name), true));
}
}

View File

@ -10,8 +10,10 @@
#include <IO/WriteHelpers.h>
#include <IO/VarInt.h>
#include "Common/PODArray.h"
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include "base/types.h"
namespace DB
{
@ -183,14 +185,17 @@ static inline bool tryRead(const SerializationFixedString & self, IColumn & colu
}
void SerializationFixedString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
void SerializationFixedString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
read(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto(data, istr); });
read(*this, column, [&istr, &settings](ColumnFixedString::Chars & data)
{
settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<ColumnFixedString::Chars,true>(data, istr) : readEscapedStringInto<ColumnFixedString::Chars,false>(data, istr);
});
}
bool SerializationFixedString::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
return tryRead(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto(data, istr); return true; });
return tryRead(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto<PaddedPODArray<UInt8>,false>(data, istr); return true; });
}

View File

@ -286,7 +286,7 @@ bool SerializationNullable::tryDeserializeNullRaw(DB::ReadBuffer & istr, const D
}
template<typename ReturnType, bool escaped>
ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const SerializationPtr & nested_serialization, bool & is_null)
{
static constexpr bool throw_exception = std::is_same_v<ReturnType, void>;
@ -319,10 +319,10 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
/// Check if we have enough data in buffer to check if it's a null.
if (istr.available() > null_representation.size())
{
auto check_for_null = [&null_representation](ReadBuffer & buf)
auto check_for_null = [&null_representation, &settings](ReadBuffer & buf)
{
auto * pos = buf.position();
if (checkString(null_representation, buf) && (*buf.position() == '\t' || *buf.position() == '\n'))
if (checkString(null_representation, buf) && (*buf.position() == '\t' || *buf.position() == '\n' || (settings.tsv.crlf_end_of_line_input && *buf.position() == '\r')))
return true;
buf.position() = pos;
return false;
@ -334,14 +334,14 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
/// Use PeekableReadBuffer to make a checkpoint before checking null
/// representation and rollback if check was failed.
PeekableReadBuffer peekable_buf(istr, true);
auto check_for_null = [&null_representation](ReadBuffer & buf_)
auto check_for_null = [&null_representation, &settings](ReadBuffer & buf_)
{
auto & buf = assert_cast<PeekableReadBuffer &>(buf_);
buf.setCheckpoint();
SCOPE_EXIT(buf.dropCheckpoint());
if (checkString(null_representation, buf) && (buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'))
return true;
if (checkString(null_representation, buf) && (buf.eof() || *buf.position() == '\t' || *buf.position() == '\n' || (settings.tsv.crlf_end_of_line_input && *buf.position() == '\r')))
return true;
buf.rollbackToCheckpoint();
return false;
};
@ -371,7 +371,10 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
if (null_representation.find('\t') != std::string::npos || null_representation.find('\n') != std::string::npos)
throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "TSV custom null representation "
"containing '\\t' or '\\n' may not work correctly for large input.");
"containing '\\t' or '\\n' may not work correctly for large input.");
if (settings.tsv.crlf_end_of_line_input && null_representation.find('\r') != std::string::npos)
throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "TSV custom null representation "
"containing '\\r' may not work correctly for large input.");
WriteBufferFromOwnString parsed_value;
if constexpr (escaped)

View File

@ -104,9 +104,9 @@ void SerializationObject<Parser>::deserializeWholeText(IColumn & column, ReadBuf
}
template <typename Parser>
void SerializationObject<Parser>::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
void SerializationObject<Parser>::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
deserializeTextImpl(column, [&](String & s) { readEscapedString(s, istr); });
deserializeTextImpl(column, [&](String & s) { settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(s, istr) : readEscapedString(s, istr); });
}
template <typename Parser>

View File

@ -147,7 +147,6 @@ void SerializationString::serializeBinaryBulk(const IColumn & column, WriteBuffe
}
}
template <int UNROLL_TIMES>
static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars & data, ColumnString::Offsets & offsets, ReadBuffer & istr, size_t limit)
{
@ -324,14 +323,17 @@ bool SerializationString::tryDeserializeWholeText(IColumn & column, ReadBuffer &
return read<bool>(column, [&](ColumnString::Chars & data) { readStringUntilEOFInto(data, istr); return true; });
}
void SerializationString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
void SerializationString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
read<void>(column, [&](ColumnString::Chars & data) { readEscapedStringInto(data, istr); });
read<void>(column, [&](ColumnString::Chars & data)
{
settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<PaddedPODArray<UInt8>,true>(data, istr) : readEscapedStringInto<PaddedPODArray<UInt8>,false>(data, istr);
});
}
bool SerializationString::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
return read<bool>(column, [&](ColumnString::Chars & data) { readEscapedStringInto(data, istr); return true; });
return read<bool>(column, [&](ColumnString::Chars & data) { readEscapedStringInto<PaddedPODArray<UInt8>,true>(data, istr); return true; });
}
void SerializationString::serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const

View File

@ -599,14 +599,14 @@ void SerializationVariant::serializeTextEscaped(const IColumn & column, size_t r
bool SerializationVariant::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String field;
readEscapedString(field, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field, istr) : readEscapedString(field, istr);
return tryDeserializeTextEscapedImpl(column, field, settings);
}
void SerializationVariant::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String field;
readEscapedString(field, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field, istr) : readEscapedString(field, istr);
if (!tryDeserializeTextEscapedImpl(column, field, settings))
throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot parse escaped value of type {} here: {}", variant_name, field);
}

View File

@ -5,6 +5,7 @@
#include <span>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseOrdinary.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@ -326,31 +327,36 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
StoragePtr table = detachTable(local_context, table_name);
/// This is possible for Lazy database.
if (!table)
return;
bool renamed = false;
try
{
fs::rename(table_metadata_path, table_metadata_path_drop);
renamed = true;
table->drop();
table->is_dropped = true;
fs::path table_data_dir(local_context->getPath() + table_data_path_relative);
if (fs::exists(table_data_dir))
(void)fs::remove_all(table_data_dir);
// The table might be not loaded for Lazy database engine.
if (table)
{
table->drop();
table->is_dropped = true;
}
}
catch (...)
{
LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true));
attachTable(local_context, table_name, table, table_data_path_relative);
if (table)
attachTable(local_context, table_name, table, table_data_path_relative);
if (renamed)
fs::rename(table_metadata_path_drop, table_metadata_path);
throw;
}
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
if (disk->isReadOnly() || !disk->exists(table_data_path_relative))
continue;
LOG_INFO(log, "Removing data directory from disk {} with path {} for dropped table {} ", disk_name, table_data_path_relative, table_name);
disk->removeRecursive(table_data_path_relative);
}
(void)fs::remove(table_metadata_path_drop);
}

View File

@ -721,11 +721,10 @@ public:
if (!block.checkCheckSum())
{
std::string calculated_check_sum = std::to_string(block.calculateCheckSum());
std::string check_sum = std::to_string(block.getCheckSum());
std::string expected_check_sum = std::to_string(block.getCheckSum());
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Cache data corrupted. Checksum validation failed. Calculated {} in block {}",
calculated_check_sum,
check_sum);
"Cache data corrupted. Checksum validation failed. Calculated {} expected in block {}, in file {}",
calculated_check_sum, expected_check_sum, file_path);
}
func(blocks_to_fetch[block_to_fetch_index], block.getBlockData());

View File

@ -244,6 +244,13 @@ public:
return delegate_transaction->writeFile(wrapped_path, buf_size, mode, settings);
}
/// Truncate file to the target size.
void truncateFile(const std::string & src_path, size_t target_size) override
{
auto wrapped_path = wrappedPath(src_path);
delegate_transaction->truncateFile(wrapped_path, target_size);
}
private:

View File

@ -2,10 +2,16 @@
#include <Disks/IDiskTransaction.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/// Fake disk transaction implementation.
/// Just execute all operations immediately, commit is noop operation.
/// No support for atomicity and rollback.
@ -134,6 +140,11 @@ public:
disk.createHardLink(src_path, dst_path);
}
void truncateFile(const std::string & /* src_path */, size_t /* target_size */) override
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Operation `truncateFile` is not implemented");
}
private:
IDisk & disk;
};

View File

@ -128,6 +128,9 @@ public:
/// Create hardlink from `src_path` to `dst_path`.
virtual void createHardLink(const std::string & src_path, const std::string & dst_path) = 0;
/// Truncate file to the target size.
virtual void truncateFile(const std::string & src_path, size_t target_size) = 0;
};
using DiskTransactionPtr = std::shared_ptr<IDiskTransaction>;

View File

@ -257,6 +257,7 @@ std::unique_ptr<AzureObjectStorageSettings> getAzureBlobStorageSettings(const Po
settings->max_upload_part_size = config.getUInt64(config_prefix + ".max_upload_part_size", context->getSettings().azure_max_upload_part_size);
settings->max_single_part_copy_size = config.getUInt64(config_prefix + ".max_single_part_copy_size", context->getSettings().azure_max_single_part_copy_size);
settings->use_native_copy = config.getBool(config_prefix + ".use_native_copy", false);
settings->max_blocks_in_multipart_upload = config.getUInt64(config_prefix + ".max_blocks_in_multipart_upload", 50000);
settings->max_unexpected_write_error_retries = config.getUInt64(config_prefix + ".max_unexpected_write_error_retries", context->getSettings().azure_max_unexpected_write_error_retries);
settings->max_inflight_parts_for_one_file = config.getUInt64(config_prefix + ".max_inflight_parts_for_one_file", context->getSettings().azure_max_inflight_parts_for_one_file);
settings->strict_upload_part_size = config.getUInt64(config_prefix + ".strict_upload_part_size", context->getSettings().azure_strict_upload_part_size);

View File

@ -63,6 +63,7 @@ struct AzureObjectStorageSettings
bool use_native_copy = false;
size_t max_unexpected_write_error_retries = 4;
size_t max_inflight_parts_for_one_file = 20;
size_t max_blocks_in_multipart_upload = 50000;
size_t strict_upload_part_size = 0;
size_t upload_part_size_multiply_factor = 2;
size_t upload_part_size_multiply_parts_count_threshold = 500;

View File

@ -133,6 +133,14 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
transaction->commit();
}
void DiskObjectStorage::truncateFile(const String & path, size_t size)
{
LOG_TEST(log, "Truncate file operation {} to size : {}", path, size);
auto transaction = createObjectStorageTransaction();
transaction->truncateFile(path, size);
transaction->commit();
}
void DiskObjectStorage::copyFile( /// NOLINT
const String & from_file_path,
IDisk & to_disk,

View File

@ -84,6 +84,8 @@ public:
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void truncateFile(const String & path, size_t size) override;
MetadataStoragePtr getMetadataStorage() override { return metadata_storage; }
UInt32 getRefCount(const String & path) const override;

View File

@ -15,6 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT;
extern const int LOGICAL_ERROR;
}
void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
@ -207,6 +208,18 @@ void DiskObjectStorageMetadata::addObject(ObjectStorageKey key, size_t size)
keys_with_meta.emplace_back(std::move(key), ObjectMetadata{size, {}, {}});
}
ObjectKeyWithMetadata DiskObjectStorageMetadata::popLastObject()
{
if (keys_with_meta.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't pop last object from metadata {}. Metadata already empty", metadata_file_path);
ObjectKeyWithMetadata object = std::move(keys_with_meta.back());
keys_with_meta.pop_back();
total_size -= object.metadata.size_bytes;
return object;
}
bool DiskObjectStorageMetadata::getWriteFullObjectKeySetting()
{
#ifndef CLICKHOUSE_KEEPER_STANDALONE_BUILD

View File

@ -52,6 +52,7 @@ public:
void addObject(ObjectStorageKey key, size_t size);
ObjectKeyWithMetadata popLastObject();
void deserialize(ReadBuffer & buf);
void deserializeFromString(const std::string & data);

View File

@ -559,6 +559,54 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
}
};
struct TruncateFileObjectStorageOperation final : public IDiskObjectStorageOperation
{
std::string path;
size_t size;
TruncateFileOperationOutcomePtr truncate_outcome;
TruncateFileObjectStorageOperation(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
const std::string & path_,
size_t size_)
: IDiskObjectStorageOperation(object_storage_, metadata_storage_)
, path(path_)
, size(size_)
{}
std::string getInfoForLog() const override
{
return fmt::format("TruncateFileObjectStorageOperation (path: {}, size: {})", path, size);
}
void execute(MetadataTransactionPtr tx) override
{
if (metadata_storage.exists(path))
{
if (!metadata_storage.isFile(path))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is not a file", path);
truncate_outcome = tx->truncateFile(path, size);
}
}
void undo() override
{
}
void finalize() override
{
if (!truncate_outcome)
return;
if (!truncate_outcome->objects_to_remove.empty())
object_storage.removeObjectsIfExist(truncate_outcome->objects_to_remove);
}
};
}
void DiskObjectStorageTransaction::createDirectory(const std::string & path)
@ -598,6 +646,13 @@ void DiskObjectStorageTransaction::moveFile(const String & from_path, const Stri
}));
}
void DiskObjectStorageTransaction::truncateFile(const String & path, size_t size)
{
operations_to_execute.emplace_back(
std::make_unique<TruncateFileObjectStorageOperation>(object_storage, metadata_storage, path, size)
);
}
void DiskObjectStorageTransaction::replaceFile(const std::string & from_path, const std::string & to_path)
{
auto operation = std::make_unique<ReplaceFileObjectStorageOperation>(object_storage, metadata_storage, from_path, to_path);

View File

@ -92,6 +92,8 @@ public:
void createFile(const String & path) override;
void truncateFile(const String & path, size_t size) override;
void copyFile(const std::string & from_file_path, const std::string & to_file_path, const ReadSettings & read_settings, const WriteSettings &) override;
/// writeFile is a difficult function for transactions.

View File

@ -31,7 +31,15 @@ struct UnlinkMetadataFileOperationOutcome
UInt32 num_hardlinks = std::numeric_limits<UInt32>::max();
};
struct TruncateFileOperationOutcome
{
StoredObjects objects_to_remove;
};
using UnlinkMetadataFileOperationOutcomePtr = std::shared_ptr<UnlinkMetadataFileOperationOutcome>;
using TruncateFileOperationOutcomePtr = std::shared_ptr<TruncateFileOperationOutcome>;
/// Tries to provide some "transactions" interface, which allow
/// to execute (commit) operations simultaneously. We don't provide
@ -143,6 +151,11 @@ public:
return nullptr;
}
virtual TruncateFileOperationOutcomePtr truncateFile(const std::string & /* path */, size_t /* size */)
{
throwNotImplemented();
}
virtual ~IMetadataTransaction() = default;
protected:

View File

@ -259,4 +259,12 @@ UnlinkMetadataFileOperationOutcomePtr MetadataStorageFromDiskTransaction::unlink
return result;
}
TruncateFileOperationOutcomePtr MetadataStorageFromDiskTransaction::truncateFile(const std::string & path, size_t target_size)
{
auto operation = std::make_unique<TruncateMetadataFileOperation>(path, target_size, metadata_storage, *metadata_storage.getDisk());
auto result = operation->outcome;
addOperation(std::move(operation));
return result;
}
}

View File

@ -129,6 +129,8 @@ public:
UnlinkMetadataFileOperationOutcomePtr unlinkMetadata(const std::string & path) override;
TruncateFileOperationOutcomePtr truncateFile(const std::string & src_path, size_t target_size) override;
};

View File

@ -4,9 +4,12 @@
#include <Common/getRandomASCIIString.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <optional>
#include <ranges>
#include <filesystem>
#include <utility>
namespace fs = std::filesystem;
@ -14,6 +17,11 @@ namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static std::string getTempFileName(const std::string & dir)
{
return fs::path(dir) / getRandomASCIIString(32);
@ -341,6 +349,35 @@ void UnlinkMetadataFileOperation::undo(std::unique_lock<SharedMutex> & lock)
outcome->num_hardlinks++;
}
void TruncateMetadataFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
if (metadata_storage.exists(path))
{
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
while (metadata->getTotalSizeBytes() > target_size)
{
auto object_key_with_metadata = metadata->popLastObject();
outcome->objects_to_remove.emplace_back(object_key_with_metadata.key.serialize(), path, object_key_with_metadata.metadata.size_bytes);
}
if (metadata->getTotalSizeBytes() != target_size)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "File {} can't be truncated to size {}", path, target_size);
}
LOG_TEST(getLogger("TruncateMetadataFileOperation"), "Going to remove {} blobs.", outcome->objects_to_remove.size());
write_operation = std::make_unique<WriteFileOperation>(path, disk, metadata->serializeToString());
write_operation->execute(metadata_lock);
}
}
void TruncateMetadataFileOperation::undo(std::unique_lock<SharedMutex> & lock)
{
if (write_operation)
write_operation->undo(lock);
}
void SetReadonlyFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);

View File

@ -282,4 +282,34 @@ private:
std::unique_ptr<WriteFileOperation> write_operation;
};
struct TruncateMetadataFileOperation final : public IMetadataOperation
{
const TruncateFileOperationOutcomePtr outcome = std::make_shared<TruncateFileOperationOutcome>();
TruncateMetadataFileOperation(
const std::string & path_,
size_t target_size_,
const MetadataStorageFromDisk & metadata_storage_,
IDisk & disk_)
: path(path_)
, target_size(target_size_)
, metadata_storage(metadata_storage_)
, disk(disk_)
{
}
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo(std::unique_lock<SharedMutex> & lock) override;
private:
std::string path;
size_t target_size;
const MetadataStorageFromDisk & metadata_storage;
IDisk & disk;
std::unique_ptr<WriteFileOperation> write_operation;
};
}
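
Truncation therefore only succeeds when the target size lands exactly on a blob boundary; otherwise `execute()` throws `LOGICAL_ERROR`, as the .cpp hunk above shows. A minimal standalone sketch of that pop-from-the-tail rule, with made-up blob sizes:

```cpp
#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <vector>

// Illustrative only: the same "pop whole blobs from the tail" rule that
// TruncateMetadataFileOperation::execute applies to DiskObjectStorageMetadata.
int main()
{
    std::vector<uint64_t> blob_sizes{4 << 20, 4 << 20, 2 << 20}; // three blobs, 10 MiB total
    uint64_t total = 10 << 20;
    uint64_t target = 8 << 20;

    std::vector<uint64_t> removed;
    while (total > target)
    {
        removed.push_back(blob_sizes.back()); // becomes objects_to_remove in the real operation
        total -= blob_sizes.back();
        blob_sizes.pop_back();
    }

    if (total != target) // truncation must land exactly on a blob boundary
        throw std::runtime_error("can't truncate to this size");

    std::cout << "blobs to remove: " << removed.size() << ", remaining size: " << total << '\n';
}
```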

View File

@ -76,7 +76,7 @@ void skipFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule esca
/// Empty field, just skip spaces
break;
case FormatSettings::EscapingRule::Escaped:
readEscapedStringInto(out, buf);
readEscapedStringInto<NullOutput,false>(out, buf);
break;
case FormatSettings::EscapingRule::Quoted:
readQuotedFieldInto(out, buf);

View File

@ -1,6 +1,7 @@
#include <Formats/FormatFactory.h>
#include <algorithm>
#include <unistd.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
@ -15,7 +16,7 @@
#include <Poco/URI.h>
#include <Common/Exception.h>
#include <Common/KnownObjectNames.h>
#include <unistd.h>
#include <Common/tryGetFileNameByFileDescriptor.h>
#include <boost/algorithm/string/case_conv.hpp>
@ -203,6 +204,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.tsv.try_detect_header = settings.input_format_tsv_detect_header;
format_settings.tsv.skip_trailing_empty_lines = settings.input_format_tsv_skip_trailing_empty_lines;
format_settings.tsv.allow_variable_number_of_columns = settings.input_format_tsv_allow_variable_number_of_columns;
format_settings.tsv.crlf_end_of_line_input = settings.input_format_tsv_crlf_end_of_line;
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
format_settings.values.allow_data_after_semicolon = settings.input_format_values_allow_data_after_semicolon;
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
@ -694,21 +696,12 @@ String FormatFactory::getFormatFromFileName(String file_name)
std::optional<String> FormatFactory::tryGetFormatFromFileDescriptor(int fd)
{
#ifdef OS_LINUX
std::string proc_path = fmt::format("/proc/self/fd/{}", fd);
char file_path[PATH_MAX] = {'\0'};
if (readlink(proc_path.c_str(), file_path, sizeof(file_path) - 1) != -1)
return tryGetFormatFromFileName(file_path);
std::optional<String> file_name = tryGetFileNameFromFileDescriptor(fd);
if (file_name)
return tryGetFormatFromFileName(*file_name);
return std::nullopt;
#elif defined(OS_DARWIN)
char file_path[PATH_MAX] = {'\0'};
if (fcntl(fd, F_GETPATH, file_path) != -1)
return tryGetFormatFromFileName(file_path);
return std::nullopt;
#else
(void)fd;
return std::nullopt;
#endif
}
String FormatFactory::getFormatFromFileDescriptor(int fd)

View File

@ -362,6 +362,7 @@ struct FormatSettings
bool try_detect_header = true;
bool skip_trailing_empty_lines = false;
bool allow_variable_number_of_columns = false;
bool crlf_end_of_line_input = false;
} tsv{};
struct

View File

@ -31,7 +31,7 @@ extract_into_parent_list(clickhouse_functions_headers dbms_headers
add_library(clickhouse_functions_obj OBJECT ${clickhouse_functions_headers} ${clickhouse_functions_sources})
if (OMIT_HEAVY_DEBUG_SYMBOLS)
target_compile_options(clickhouse_functions_obj PRIVATE "-g0")
set_source_files_properties(${DBMS_FUNCTIONS} PROPERTIES COMPILE_FLAGS "-g0")
set_source_files_properties(${DBMS_FUNCTIONS} DIRECTORY .. PROPERTIES COMPILE_FLAGS "-g0")
endif()
list (APPEND OBJECT_LIBS $<TARGET_OBJECTS:clickhouse_functions_obj>)

View File

@ -49,6 +49,8 @@
#include <base/bit_cast.h>
#include <base/unaligned.h>
#include <algorithm>
namespace DB
{
@ -75,17 +77,29 @@ namespace impl
ColumnPtr key0;
ColumnPtr key1;
bool is_const;
const ColumnArray::Offsets * offsets{};
size_t size() const
{
assert(key0 && key1);
assert(key0->size() == key1->size());
assert(offsets == nullptr || offsets->size() == key0->size());
if (offsets != nullptr)
return offsets->back();
return key0->size();
}
SipHashKey getKey(size_t i) const
{
if (is_const)
i = 0;
if (offsets != nullptr)
{
const auto *const begin = offsets->begin();
const auto * upper = std::upper_bound(begin, offsets->end(), i);
if (upper == offsets->end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "offset {} not found in function SipHashKeyColumns::getKey", i);
i = upper - begin;
}
const auto & key0data = assert_cast<const ColumnUInt64 &>(*key0).getData();
const auto & key1data = assert_cast<const ColumnUInt64 &>(*key1).getData();
return {key0data[i], key1data[i]};
@ -1112,7 +1126,15 @@ private:
typename ColumnVector<ToType>::Container vec_temp(nested_size);
bool nested_is_first = true;
executeForArgument(key_cols, nested_type, nested_column, vec_temp, nested_is_first);
if constexpr (Keyed)
{
KeyColumnsType key_cols_tmp{key_cols};
key_cols_tmp.offsets = &offsets;
executeForArgument(key_cols_tmp, nested_type, nested_column, vec_temp, nested_is_first);
}
else
executeForArgument(key_cols, nested_type, nested_column, vec_temp, nested_is_first);
const size_t size = offsets.size();
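
With keyed SipHash over arrays, each element must be hashed with the key from its own row, which is what the new `offsets` member enables. A small standalone sketch of the row lookup that `getKey()` performs (the offsets values here are made up):

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Illustrative only: ClickHouse array offsets are cumulative end positions, e.g. the
// rows {a,b}, {c,d,e}, {f} are stored flat with offsets {2, 5, 6}. To find which row
// (and hence which per-row key) a flat element index belongs to, take the first offset
// strictly greater than the index, exactly as getKey() does above.
int main()
{
    std::vector<size_t> offsets{2, 5, 6};
    for (size_t i = 0; i < offsets.back(); ++i)
    {
        auto upper = std::upper_bound(offsets.begin(), offsets.end(), i);
        size_t row = upper - offsets.begin();
        std::cout << "element " << i << " -> row " << row << '\n';
    }
    // elements 0,1 -> row 0; elements 2,3,4 -> row 1; element 5 -> row 2
}
```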

View File

@ -30,6 +30,7 @@ namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
extern const int AZURE_BLOB_STORAGE_ERROR;
extern const int LOGICAL_ERROR;
}
namespace
@ -94,11 +95,56 @@ namespace
void calculatePartSize()
{
auto max_upload_part_size = settings->max_upload_part_size;
if (!max_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be 0");
if (!total_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chosen multipart upload for an empty file. This must not happen");
auto max_part_number = settings->max_blocks_in_multipart_upload;
const auto min_upload_part_size = settings->min_upload_part_size;
const auto max_upload_part_size = settings->max_upload_part_size;
if (!max_part_number)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_blocks_in_multipart_upload must not be 0");
else if (!min_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "min_upload_part_size must not be 0");
else if (max_upload_part_size < min_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be less than min_upload_part_size");
size_t part_size = min_upload_part_size;
auto num_parts = (total_size + part_size - 1) / part_size;
if (num_parts > max_part_number)
{
part_size = (total_size + max_part_number - 1) / max_part_number;
num_parts = (total_size + part_size - 1) / part_size;
}
if (part_size > max_upload_part_size)
{
part_size = max_upload_part_size;
num_parts = (total_size + part_size - 1) / part_size;
}
String error;
if (num_parts < 1)
error = "Number of parts is zero";
else if (num_parts > max_part_number)
error = fmt::format("Number of parts exceeds {}/{}", num_parts, max_part_number);
else if (part_size < min_upload_part_size)
error = fmt::format("Size of a part is less than {}/{}", part_size, min_upload_part_size);
else if (part_size > max_upload_part_size)
error = fmt::format("Size of a part exceeds {}/{}", part_size, max_upload_part_size);
if (!error.empty())
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"{} while writing {} bytes to Azure. Check max_part_number = {}, "
"min_upload_part_size = {}, max_upload_part_size = {}",
error, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
}
/// We've calculated the size of a normal part (the final part can be smaller).
normal_part_size = max_upload_part_size;
normal_part_size = part_size;
}
public:
@ -219,21 +265,22 @@ namespace
auto block_blob_client = client->GetBlockBlobClient(dest_blob);
auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), task.part_offset, task.part_size);
while (!read_buffer->eof())
{
auto size = read_buffer->available();
if (size > 0)
{
auto block_id = getRandomASCIIString(64);
Azure::Core::IO::MemoryBodyStream memory(reinterpret_cast<const uint8_t *>(read_buffer->position()), size);
block_blob_client.StageBlock(block_id, memory);
task.block_ids.emplace_back(block_id);
read_buffer->ignore(size);
LOG_TRACE(log, "Writing part. Container: {}, Blob: {}, block_id: {}", dest_container_for_logging, dest_blob, block_id);
}
}
std::lock_guard lock(bg_tasks_mutex); /// Protect bg_tasks from race
LOG_TRACE(log, "Writing part finished. Container: {}, Blob: {}, Parts: {}", dest_container_for_logging, dest_blob, bg_tasks.size());
/// task.part_size is already normalized according to min_upload_part_size and max_upload_part_size.
size_t size_to_stage = task.part_size;
PODArray<char> memory;
memory.resize(size_to_stage);
WriteBufferFromVector<PODArray<char>> wb(memory);
copyData(*read_buffer, wb, size_to_stage);
Azure::Core::IO::MemoryBodyStream stream(reinterpret_cast<const uint8_t *>(memory.data()), size_to_stage);
const auto & block_id = task.block_ids.emplace_back(getRandomASCIIString(64));
block_blob_client.StageBlock(block_id, stream);
LOG_TRACE(log, "Writing part. Container: {}, Blob: {}, block_id: {}, size: {}",
dest_container_for_logging, dest_blob, block_id, size_to_stage);
}
@ -333,8 +380,8 @@ void copyAzureBlobStorageFile(
LOG_TRACE(&Poco::Logger::get("copyAzureBlobStorageFile"), "Reading from Container: {}, Blob: {}", src_container_for_logging, src_blob);
auto create_read_buffer = [&]
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(src_client, src_blob, read_settings, settings->max_single_read_retries,
settings->max_single_download_retries);
return std::make_unique<ReadBufferFromAzureBlobStorage>(
src_client, src_blob, read_settings, settings->max_single_read_retries, settings->max_single_download_retries);
};
UploadHelper helper{create_read_buffer, dest_client, offset, size, dest_container_for_logging, dest_blob, settings, schedule, &Poco::Logger::get("copyAzureBlobStorageFile")};
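
The new `calculatePartSize()` above mirrors the S3 logic whose error reporting is reworked further down: start from `min_upload_part_size` and enlarge the parts only when the block-count limit would otherwise be exceeded. A standalone worked example with made-up sizes, using the defaults introduced in this PR (16 MiB minimum part, at most 50000 blocks):

```cpp
#include <cstdint>
#include <iostream>

// Illustrative only: the same arithmetic as calculatePartSize() above. A 1 TiB upload
// would need 65536 parts of 16 MiB, so the part size is bumped to ~21 MiB to stay
// within the 50000-block limit.
int main()
{
    const uint64_t MiB = 1024 * 1024;
    uint64_t total_size = 1024ULL * 1024 * MiB;        // 1 TiB
    uint64_t min_upload_part_size = 16 * MiB;
    uint64_t max_upload_part_size = 5ULL * 1024 * MiB; // 5 GiB
    uint64_t max_part_number = 50000;

    uint64_t part_size = min_upload_part_size;
    uint64_t num_parts = (total_size + part_size - 1) / part_size;        // 65536 > 50000
    if (num_parts > max_part_number)
    {
        part_size = (total_size + max_part_number - 1) / max_part_number; // ~21 MiB
        num_parts = (total_size + part_size - 1) / part_size;             // 50000
    }
    if (part_size > max_upload_part_size)
    {
        part_size = max_upload_part_size;
        num_parts = (total_size + part_size - 1) / part_size;
    }
    std::cout << "part_size=" << part_size << " num_parts=" << num_parts << '\n';
}
```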

View File

@ -352,7 +352,6 @@ static ReturnType parseComplexEscapeSequence(Vector & s, ReadBuffer & buf)
{
return error("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
}
s.push_back(unhex2(hex_code));
}
else if (char_after_backslash == 'N')
@ -608,13 +607,20 @@ static ReturnType parseJSONEscapeSequence(Vector & s, ReadBuffer & buf, bool kee
}
template <typename Vector, bool parse_complex_escape_sequence>
template <typename Vector, bool parse_complex_escape_sequence, bool support_crlf>
void readEscapedStringIntoImpl(Vector & s, ReadBuffer & buf)
{
while (!buf.eof())
{
char * next_pos = find_first_symbols<'\t', '\n', '\\'>(buf.position(), buf.buffer().end());
char * next_pos;
if constexpr (support_crlf)
{
next_pos = find_first_symbols<'\t', '\n', '\\','\r'>(buf.position(), buf.buffer().end());
}
else
{
next_pos = find_first_symbols<'\t', '\n', '\\'>(buf.position(), buf.buffer().end());
}
appendToStringOrVector(s, buf, next_pos);
buf.position() = next_pos;
@ -641,25 +647,46 @@ void readEscapedStringIntoImpl(Vector & s, ReadBuffer & buf)
}
}
}
if constexpr (support_crlf)
{
if (*buf.position() == '\r')
{
++buf.position();
if (!buf.eof() && *buf.position() != '\n')
{
s.push_back('\r');
continue;
}
return;
}
}
}
}
template <typename Vector>
template <typename Vector, bool support_crlf>
void readEscapedStringInto(Vector & s, ReadBuffer & buf)
{
readEscapedStringIntoImpl<Vector, true>(s, buf);
readEscapedStringIntoImpl<Vector, true, support_crlf>(s, buf);
}
void readEscapedString(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringInto(s, buf);
readEscapedStringInto<String,false>(s, buf);
}
template void readEscapedStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readEscapedStringInto<NullOutput>(NullOutput & s, ReadBuffer & buf);
void readEscapedStringCRLF(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringInto<String,true>(s, buf);
}
template void readEscapedStringInto<PaddedPODArray<UInt8>,false>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readEscapedStringInto<NullOutput,false>(NullOutput & s, ReadBuffer & buf);
template void readEscapedStringInto<PaddedPODArray<UInt8>,true>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readEscapedStringInto<NullOutput,true>(NullOutput & s, ReadBuffer & buf);
/** If enable_sql_style_quoting == true,
* strings like 'abc''def' will be parsed as abc'def.
@ -2069,7 +2096,14 @@ bool tryReadJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON &
void readTSVField(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringIntoImpl<String, false>(s, buf);
readEscapedStringIntoImpl<String, false, false>(s, buf);
}
void readTSVFieldCRLF(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringIntoImpl<String, false, true>(s, buf);
}
}
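
The `support_crlf` path has one subtlety worth calling out: only a `\r` that is immediately followed by `\n` (or by end of input) terminates the field; a stray `\r` inside a value is kept. A simplified standalone sketch of just that rule (escape-sequence handling omitted; the helper name is made up):

```cpp
#include <cstddef>
#include <iostream>
#include <string>
#include <string_view>

// Illustrative only: the CRLF termination rule of readEscapedStringIntoImpl<..., true>.
static std::string readTsvFieldCRLF(std::string_view in, size_t & pos)
{
    std::string field;
    while (pos < in.size())
    {
        char c = in[pos];
        if (c == '\t' || c == '\n')
            break;
        if (c == '\r')
        {
            if (pos + 1 < in.size() && in[pos + 1] != '\n')
            {
                field.push_back('\r'); // lone \r stays in the value
                ++pos;
                continue;
            }
            ++pos; // stop right before the '\n'; row-end handling consumes it
            break;
        }
        field.push_back(c);
        ++pos;
    }
    return field;
}

int main()
{
    std::string_view data = "a\rb\r\nnext";
    size_t pos = 0;
    std::string field = readTsvFieldCRLF(data, pos); // field is "a\rb", pos points at '\n'
    std::cout << field.size() << '\n';
}
```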

View File

@ -583,6 +583,8 @@ void readString(String & s, ReadBuffer & buf);
void readEscapedString(String & s, ReadBuffer & buf);
void readEscapedStringCRLF(String & s, ReadBuffer & buf);
void readQuotedString(String & s, ReadBuffer & buf);
void readQuotedStringWithSQLStyle(String & s, ReadBuffer & buf);
@ -645,7 +647,7 @@ void readStringInto(Vector & s, ReadBuffer & buf);
template <typename Vector>
void readNullTerminated(Vector & s, ReadBuffer & buf);
template <typename Vector>
template <typename Vector, bool support_crlf>
void readEscapedStringInto(Vector & s, ReadBuffer & buf);
template <bool enable_sql_style_quoting, typename Vector>
@ -1901,6 +1903,7 @@ void readJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON & se
bool tryReadJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON & settings);
void readTSVField(String & s, ReadBuffer & buf);
void readTSVFieldCRLF(String & s, ReadBuffer & buf);
/** Parse the escape sequence, which can be simple (one character after backslash) or more complex (multiple characters).
* It is assumed that the cursor is located on the `\` symbol

View File

@ -316,23 +316,23 @@ namespace
num_parts = (total_size + part_size - 1) / part_size;
}
if (num_parts < 1 || num_parts > max_part_number || part_size < min_upload_part_size || part_size > max_upload_part_size)
{
String msg;
if (num_parts < 1)
msg = "Number of parts is zero";
else if (num_parts > max_part_number)
msg = fmt::format("Number of parts exceeds {}", num_parts, max_part_number);
else if (part_size < min_upload_part_size)
msg = fmt::format("Size of a part is less than {}", part_size, min_upload_part_size);
else
msg = fmt::format("Size of a part exceeds {}", part_size, max_upload_part_size);
String error;
if (num_parts < 1)
error = "Number of parts is zero";
else if (num_parts > max_part_number)
error = fmt::format("Number of parts exceeds {}/{}", num_parts, max_part_number);
else if (part_size < min_upload_part_size)
error = fmt::format("Size of a part is less than {}/{}", part_size, min_upload_part_size);
else if (part_size > max_upload_part_size)
error = fmt::format("Size of a part exceeds {}/{}", part_size, max_upload_part_size);
if (!error.empty())
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"{} while writing {} bytes to S3. Check max_part_number = {}, "
"min_upload_part_size = {}, max_upload_part_size = {}",
msg, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
error, total_size, max_part_number, min_upload_part_size, max_upload_part_size);
}
/// We've calculated the size of a normal part (the final part can be smaller).

View File

@ -1392,7 +1392,16 @@ void executeQuery(
const char * begin;
const char * end;
istr.nextIfAtEnd();
try
{
istr.nextIfAtEnd();
}
catch (...)
{
/// If buffer contains invalid data and we failed to decompress, we still want to have some information about the query in the log.
logQuery("<cannot parse>", context, /* internal = */ false, QueryProcessingStage::Complete);
throw;
}
size_t max_query_size = context->getSettingsRef().max_query_size;

View File

@ -135,7 +135,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
/// If the key is not found, skip the value.
NullOutput sink;
readEscapedStringInto(sink, *in);
readEscapedStringInto<NullOutput,false>(sink, *in);
}
else
{

View File

@ -10,6 +10,8 @@
#include <Formats/verbosePrintString.h>
#include <Formats/EscapingRuleUtils.h>
#include <Processors/Formats/Impl/TabSeparatedRowInputFormat.h>
#include <boost/range/adaptor/map.hpp>
#include "Formats/FormatSettings.h"
namespace DB
{
@ -28,7 +30,8 @@ static void checkForCarriageReturn(ReadBuffer & in)
throw Exception(ErrorCodes::INCORRECT_DATA, "\nYou have carriage return (\\r, 0x0D, ASCII 13) at end of first row."
"\nIt's like your input data has DOS/Windows style line separators, that are illegal in TabSeparated format."
" You must transform your file to Unix format."
"\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r.");
"\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r"
"\nor else enable setting 'input_format_tsv_crlf_end_of_line'");
}
TabSeparatedRowInputFormat::TabSeparatedRowInputFormat(
@ -92,7 +95,12 @@ void TabSeparatedFormatReader::skipRowEndDelimiter()
if (buf->eof())
return;
if (unlikely(first_row))
if (format_settings.tsv.crlf_end_of_line_input)
{
if (*buf->position() == '\r')
++buf->position();
}
else if (unlikely(first_row))
{
checkForCarriageReturn(*buf);
first_row = false;
@ -105,14 +113,15 @@ template <bool read_string>
String TabSeparatedFormatReader::readFieldIntoString()
{
String field;
bool support_crlf = format_settings.tsv.crlf_end_of_line_input;
if (is_raw)
readString(field, *buf);
else
{
if constexpr (read_string)
readEscapedString(field, *buf);
support_crlf ? readEscapedStringCRLF(field, *buf) : readEscapedString(field, *buf);
else
readTSVField(field, *buf);
support_crlf ? readTSVFieldCRLF(field, *buf) : readTSVField(field, *buf);
}
return field;
}
@ -123,7 +132,7 @@ void TabSeparatedFormatReader::skipField()
if (is_raw)
readStringInto(out, *buf);
else
readEscapedStringInto(out, *buf);
format_settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<NullOutput,true>(out, *buf) : readEscapedStringInto<NullOutput,false>(out, *buf);
}
void TabSeparatedFormatReader::skipHeaderRow()
@ -155,7 +164,7 @@ bool TabSeparatedFormatReader::readField(IColumn & column, const DataTypePtr & t
const SerializationPtr & serialization, bool is_last_file_column, const String & /*column_name*/)
{
const bool at_delimiter = !is_last_file_column && !buf->eof() && *buf->position() == '\t';
const bool at_last_column_line_end = is_last_file_column && (buf->eof() || *buf->position() == '\n');
const bool at_last_column_line_end = is_last_file_column && (buf->eof() || *buf->position() == '\n' || (format_settings.tsv.crlf_end_of_line_input && *buf->position() == '\r'));
if (format_settings.tsv.empty_as_default && (at_delimiter || at_last_column_line_end))
{
@ -220,7 +229,10 @@ bool TabSeparatedFormatReader::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
try
{
assertChar('\n', *buf);
if (!format_settings.tsv.crlf_end_of_line_input)
assertChar('\n', *buf);
else
assertChar('\r', *buf);
}
catch (const DB::Exception &)
{
@ -233,7 +245,10 @@ bool TabSeparatedFormatReader::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
else if (*buf->position() == '\r')
{
out << "ERROR: Carriage return found where line feed is expected."
" It's like your file has DOS/Windows style line separators, that is illegal in TabSeparated format.\n";
" It's like your file has DOS/Windows style line separators. \n"
"You must transform your file to Unix format. \n"
"But if you really need carriage return at end of string value of last column, you need to escape it as \\r \n"
"or else enable setting 'input_format_tsv_crlf_end_of_line'";
}
else
{
@ -348,7 +363,7 @@ void TabSeparatedFormatReader::skipRow()
bool TabSeparatedFormatReader::checkForEndOfRow()
{
return buf->eof() || *buf->position() == '\n';
return buf->eof() || *buf->position() == '\n' || (format_settings.tsv.crlf_end_of_line_input && *buf->position() == '\r');
}
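Taken together, the changes above make '\r' an accepted row terminator only when input_format_tsv_crlf_end_of_line is enabled. A minimal sketch of that behaviour with an illustrative cursor type (not the real ReadBuffer / TabSeparatedFormatReader API):

/// Illustrative cursor over the input buffer.
struct Cursor
{
    const char * pos;
    const char * end;
    bool eof() const { return pos == end; }
};

/// End of row is reached at EOF, at '\n', or - only with the new setting - at '\r'.
static bool atEndOfRow(const Cursor & c, bool crlf_enabled)
{
    return c.eof() || *c.pos == '\n' || (crlf_enabled && *c.pos == '\r');
}

/// With the setting enabled the '\r' of a CRLF pair is swallowed first,
/// so the rest of the reader keeps seeing a plain '\n' row terminator.
static void skipRowEndDelimiter(Cursor & c, bool crlf_enabled)
{
    if (c.eof())
        return;
    if (crlf_enabled && *c.pos == '\r')
        ++c.pos;
    if (!c.eof() && *c.pos == '\n')
        ++c.pos;
}

In the actual reader the same check is threaded through skipField, readField, parseRowEndWithDiagnosticInfo and checkForEndOfRow, as the hunks above show.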
TabSeparatedSchemaReader::TabSeparatedSchemaReader(

View File

@ -707,11 +707,11 @@ void HTTPHandler::processQuery(
/// The data can also be compressed using incompatible internal algorithm. This is indicated by
/// 'decompress' query parameter.
std::unique_ptr<ReadBuffer> in_post_maybe_compressed;
bool in_post_compressed = false;
bool is_in_post_compressed = false;
if (params.getParsed<bool>("decompress", false))
{
in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post);
in_post_compressed = true;
in_post_maybe_compressed = std::make_unique<CompressedReadBuffer>(*in_post, /* allow_different_codecs_ = */ false, /* external_data_ = */ true);
is_in_post_compressed = true;
}
else
in_post_maybe_compressed = std::move(in_post);
@ -845,7 +845,7 @@ void HTTPHandler::processQuery(
/// If 'http_native_compression_disable_checksumming_on_decompress' setting is turned on,
/// checksums of client data compressed with internal algorithm are not checked.
if (in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress)
if (is_in_post_compressed && settings.http_native_compression_disable_checksumming_on_decompress)
static_cast<CompressedReadBuffer &>(*in_post_maybe_compressed).disableChecksumming();
/// Add CORS header if 'add_http_cors_header' setting is turned on send * in Access-Control-Allow-Origin

View File

@ -254,6 +254,10 @@ AzureObjectStorage::SettingsPtr StorageAzureBlob::createSettings(const ContextPt
auto settings_ptr = std::make_unique<AzureObjectStorageSettings>();
settings_ptr->max_single_part_upload_size = context_settings.azure_max_single_part_upload_size;
settings_ptr->max_single_read_retries = context_settings.azure_max_single_read_retries;
settings_ptr->strict_upload_part_size = context_settings.azure_strict_upload_part_size;
settings_ptr->max_upload_part_size = context_settings.azure_max_upload_part_size;
settings_ptr->max_blocks_in_multipart_upload = context_settings.azure_max_blocks_in_multipart_upload;
settings_ptr->min_upload_part_size = context_settings.azure_min_upload_part_size;
settings_ptr->list_object_keys_size = static_cast<int32_t>(context_settings.azure_list_object_keys_size);
return settings_ptr;

View File

@ -5,6 +5,7 @@
#include <Common/escapeForFileName.h>
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Compression/CompressedReadBuffer.h>
@ -53,8 +54,13 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
extern const int CANNOT_RESTORE_TABLE;
extern const int NOT_IMPLEMENTED;
extern const int FAULT_INJECTED;
}
namespace FailPoints
{
extern const char stripe_log_sink_write_fallpoint[];
}
/// NOTE: The lock `StorageStripeLog::rwlock` is NOT kept locked while reading,
/// because we read ranges of data that do not change.
@ -234,6 +240,11 @@ public:
/// Save the new indices.
storage.saveIndices(lock);
// While the file sizes are being saved an exception might occur, S3::TooManyRequests for example.
fiu_do_on(FailPoints::stripe_log_sink_write_fallpoint,
{
throw Exception(ErrorCodes::FAULT_INJECTED, "Injecting fault for inserting into StripeLog table");
});
/// Save the new file sizes.
storage.saveFileSizes(lock);
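The failpoint is just a hook that tests can switch on to force a throw at this exact spot. A self-contained sketch of the idea using a plain atomic flag instead of ClickHouse's FailPoint machinery; all names below are illustrative:

#include <atomic>
#include <stdexcept>

/// Illustrative stand-in for a named failpoint.
static std::atomic<bool> inject_write_fault{false};

void finalizeWrite(/* storage, lock, ... */)
{
    /// ... indices have been saved at this point ...

    /// Tests flip the flag to simulate e.g. S3::TooManyRequests right here,
    /// after the data file was appended but before the sizes file is updated.
    if (inject_write_fault.load())
        throw std::runtime_error("Injected fault before saving file sizes");

    /// ... save the new file sizes ...
}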

View File

@ -281,7 +281,10 @@ def test_backup_restore_on_merge_tree(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='blob_storage_policy'",
f"""
DROP TABLE IF EXISTS test_simple_merge_tree;
CREATE TABLE test_simple_merge_tree(key UInt64, data String) Engine = MergeTree() ORDER BY tuple() SETTINGS storage_policy='blob_storage_policy'
""",
)
azure_query(node, f"INSERT INTO test_simple_merge_tree VALUES (1, 'a')")
@ -299,3 +302,85 @@ def test_backup_restore_on_merge_tree(cluster):
)
azure_query(node, f"DROP TABLE test_simple_merge_tree")
azure_query(node, f"DROP TABLE test_simple_merge_tree_restored")
def test_backup_restore_correct_block_ids(cluster):
node = cluster.instances["node"]
azure_query(
node,
f"""
DROP TABLE IF EXISTS test_simple_merge_tree;
CREATE TABLE test_simple_merge_tree(key UInt64, data String)
Engine = MergeTree()
ORDER BY tuple()
SETTINGS storage_policy='blob_storage_policy'""",
)
data_query = "SELECT number, repeat('a', 100) FROM numbers(1000)"
azure_query(
node,
f"INSERT INTO test_simple_merge_tree {data_query}",
)
for min_upload_size, max_upload_size, max_blocks, expected_block_size in [
(42, 100, 1000, 42),
(42, 52, 86, 52),
]:
data_path = f"test_backup_correct_block_ids_{max_blocks}"
backup_destination = f"AzureBlobStorage('{cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', '{data_path}')"
azure_query(
node,
f"""
SET azure_min_upload_part_size = {min_upload_size};
SET azure_max_upload_part_size = {max_upload_size};
SET azure_max_blocks_in_multipart_upload = {max_blocks};
BACKUP TABLE test_simple_merge_tree TO {backup_destination} SETTINGS allow_azure_native_copy = 0;
""",
)
port = cluster.azurite_port
connection_string = (
f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;"
f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;"
)
container_name = "cont"
blob_service_client = BlobServiceClient.from_connection_string(
connection_string
)
container_client = blob_service_client.get_container_client(container_name)
blobs = container_client.list_blobs()
data_blob = (
f"{data_path}/data/default/test_simple_merge_tree/all_1_1_0/data.bin"
)
found = False
for blob in blobs:
if data_blob == blob.get("name"):
found = True
break
assert found
blob_client = blob_service_client.get_blob_client(
blob=data_blob, container=container_name
)
blocks_num = len(blob_client.get_block_list()[0])
assert blocks_num > 50
count = 0
for block in blob_client.get_block_list()[0]:
count += 1
if count < blocks_num:
assert block.get("size") == expected_block_size
else:
assert block.get("size") < expected_block_size
azure_query(
node,
f"RESTORE TABLE test_simple_merge_tree AS test_simple_merge_tree_restored_{max_blocks} FROM {backup_destination};",
)
assert azure_query(
node,
f"SELECT * from test_simple_merge_tree_restored_{max_blocks} ORDER BY key",
) == node.query(data_query)

View File

@ -0,0 +1,88 @@
import logging
import time
import pytest
import os
from helpers.cluster import ClickHouseCluster
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=["configs/storage_policy.xml"],
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def assert_objects_count(cluster, objects_count, path="data/"):
minio = cluster.minio_client
s3_objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
if objects_count != len(s3_objects):
for s3_object in s3_objects:
object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)
logging.info("Existing S3 object: %s", str(object_meta))
assert objects_count == len(s3_objects)
def list_of_files_on_ch_disk(node, disk, path):
disk_path = node.query(
f"SELECT path FROM system.disks WHERE name='{disk}'"
).splitlines()[0]
return node.exec_in_container(
["bash", "-c", f"ls {os.path.join(disk_path, path)}"], user="root"
)
@pytest.mark.parametrize(
"engine",
[
pytest.param("Log"),
],
)
@pytest.mark.parametrize(
"disk,check_s3",
[
pytest.param("default", False),
pytest.param("s3", True),
],
)
@pytest.mark.parametrize(
"delay",
[
pytest.param(0),
pytest.param(4),
],
)
def test_drop_table(cluster, engine, disk, check_s3, delay):
node = cluster.instances["node"]
node.query("DROP DATABASE IF EXISTS lazy")
node.query("CREATE DATABASE lazy ENGINE=Lazy(2)")
node.query(
"CREATE TABLE lazy.table (id UInt64) ENGINE={} SETTINGS disk = '{}'".format(
engine,
disk,
)
)
node.query("INSERT INTO lazy.table SELECT number FROM numbers(10)")
assert node.query("SELECT count(*) FROM lazy.table") == "10\n"
if delay:
time.sleep(delay)
node.query("DROP TABLE lazy.table SYNC")
if check_s3:
# There mustn't be any orphaned data
assert_objects_count(cluster, 0)
# Local data must be removed
assert list_of_files_on_ch_disk(node, disk, "data/lazy/") == ""

View File

@ -0,0 +1,34 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
<s3_no_retries>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<!-- ClickHouse starts earlier than custom S3 endpoint. Skip access check to avoid fail on start-up -->
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
<retry_attempts>1</retry_attempts>
<s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
<s3_max_single_read_retries>1</s3_max_single_read_retries>
<connect_timeout_ms>20000</connect_timeout_ms>
</s3_no_retries>
</disks>
<policies>
<s3_no_retries>
<volumes>
<main>
<disk>s3_no_retries</disk>
</main>
</volumes>
</s3_no_retries>
</policies>
</storage_configuration>
</clickhouse>

View File

@ -11,7 +11,7 @@ def cluster():
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=["configs/minio.xml", "configs/ssl.xml"],
main_configs=["configs/storage_configuration.xml", "configs/ssl.xml"],
with_minio=True,
)
logging.info("Starting cluster...")
@ -84,3 +84,39 @@ def test_log_family_s3(cluster, log_engine, files_overhead, files_overhead_per_i
assert_objects_count(cluster, 0)
finally:
node.query("DROP TABLE s3_test")
# Imitate the case when an error occurs while inserting into the table,
# for example S3::TooManyRequests.
# In that case the data file may get updated, but not the sizes file.
# The data file therefore has to be truncated to undo the insert query.
# See FileChecker::repair().
def test_stripe_log_truncate(cluster):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE stripe_table (
a int
) ENGINE = StripeLog()
SETTINGS storage_policy='s3_no_retries'
"""
)
node.query("SYSTEM ENABLE FAILPOINT stripe_log_sink_write_fallpoint")
node.query(
"""
INSERT INTO stripe_table SELECT number FROM numbers(10)
""",
ignore_error=True,
)
node.query("SYSTEM DISABLE FAILPOINT stripe_log_sink_write_fallpoint")
node.query("SELECT count(*) FROM stripe_table") == "0\n"
node.query("INSERT INTO stripe_table SELECT number FROM numbers(10)")
node.query("SELECT count(*) FROM stripe_table") == "10\n"
# Make sure that everything is okay with the table after it is detached and attached again.
node.query("DETACH TABLE stripe_table")
node.query("ATTACH TABLE stripe_table")
assert node.query("DROP TABLE stripe_table") == ""

View File

@ -132,3 +132,7 @@ SELECT * FROM merge_table ORDER BY id, val;
2 a
2 b
3 c
select sum(number) from numbers(10) settings final=1;
45
select sum(number) from remote('127.0.0.{1,2}', numbers(10)) settings final=1;
90

View File

@ -102,3 +102,6 @@ insert into table_to_merge_c values (3,'c');
-- expected output:
-- 1 c, 2 a, 2 b, 3 c
SELECT * FROM merge_table ORDER BY id, val;
select sum(number) from numbers(10) settings final=1;
select sum(number) from remote('127.0.0.{1,2}', numbers(10)) settings final=1;

View File

@ -103,11 +103,11 @@ SELECT '2^30-1', maxMerge(x) from (select CAST(unhex('ffffff3f') || randomString
SELECT '1M without 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || 'x', 'AggregateFunction(max, String)') as x);
SELECT '1M with 0', length(maxMerge(x)) from (select CAST(unhex('00001000') || randomString(0x00100000 - 1) || '\0', 'AggregateFunction(max, String)') as x);
SELECT 'fuzz1', finalizeAggregation(CAST(unhex('3000000\0303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz1', finalizeAggregation(CAST(unhex('3000000\0303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353600010000000000000000'), 'AggregateFunction(argMax, String, UInt64)')); -- { serverError INCORRECT_DATA }
SELECT 'fuzz2', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '01' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x);
SELECT 'fuzz3', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz4', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz5', finalizeAggregation(CAST(unhex('0100000000000000000FFFFFFFF0'), 'AggregateFunction(argMax, UInt64, String)')); -- { serverError CORRUPTED_DATA }
SELECT 'fuzz3', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00' || 'ffffffffffffffff'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError INCORRECT_DATA }
SELECT 'fuzz4', finalizeAggregation(CAST(unhex('04000000' || '30313233' || '00'), 'AggregateFunction(argMax, String, UInt64)')) as x, length(x); -- { serverError INCORRECT_DATA }
SELECT 'fuzz5', finalizeAggregation(CAST(unhex('0100000000000000000FFFFFFFF0'), 'AggregateFunction(argMax, UInt64, String)')); -- { serverError INCORRECT_DATA }
drop table if exists aggr;

View File

@ -236,3 +236,6 @@ Check asan bug
0
Check bug found fuzzing
9042C6691B1A75F0EA3314B6F55728BB
Check bug 2 found fuzzing
608E1FF030C9E206185B112C2A25F1A7
ABB65AE97711A2E053E324ED88B1D08B

View File

@ -338,3 +338,10 @@ SELECT sipHash128((toUInt64(9223372036854775806), 1)) = sipHash128(1) GROUP BY s
SELECT 'Check bug found fuzzing';
SELECT [(255, 1048575)], sipHash128ReferenceKeyed((toUInt64(2147483646), toUInt64(9223372036854775807)), ([(NULL, 100), (NULL, NULL), (1024, 10)], toUInt64(2), toUInt64(1024)), ''), hex(sipHash128ReferenceKeyed((-9223372036854775807, 1.), '-1', NULL)), ('', toUInt64(65535), [(9223372036854775807, 9223372036854775806)], toUInt64(65536)), arrayJoin((NULL, 65537, 255), [(NULL, NULL)]) GROUP BY tupleElement((NULL, NULL, NULL, -1), toUInt64(2), 2) = NULL; -- { serverError NOT_IMPLEMENTED }
SELECT hex(sipHash128ReferenceKeyed((0::UInt64, 0::UInt64), ([1, 1])));
SELECT 'Check bug 2 found fuzzing';
DROP TABLE IF EXISTS sipHashKeyed_keys;
CREATE TABLE sipHashKeyed_keys (`a` Map(String, String)) ENGINE = Memory;
INSERT INTO sipHashKeyed_keys FORMAT VALUES ({'a':'b', 'c':'d'}), ({'e':'f', 'g':'h'});
SELECT hex(sipHash128ReferenceKeyed((0::UInt64, materialize(0::UInt64)), a)) FROM sipHashKeyed_keys ORDER BY a;
DROP TABLE sipHashKeyed_keys;

View File

@ -0,0 +1,11 @@
<-- Read UNIX endings -->
Akiba_Hebrew_Academy 2017-08-01 241
Aegithina_tiphia 2018-02-01 34
1971-72_Utah_Stars_season 2016-10-01 1
<-- Read DOS endings with setting input_format_tsv_crlf_end_of_line=1 -->
Akiba_Hebrew_Academy 2017-08-01 241
Aegithina_tiphia 2018-02-01 34
1971-72_Utah_Stars_season 2016-10-01 1

View File

@ -0,0 +1,29 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Data preparation step
USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
UNIX_ENDINGS="${CLICKHOUSE_TEST_UNIQUE_NAME}_data_without_crlf.tsv"
DOS_ENDINGS="${CLICKHOUSE_TEST_UNIQUE_NAME}_data_with_crlf.tsv"
DATA_FILE_UNIX_ENDINGS="${USER_FILES_PATH:?}/${UNIX_ENDINGS}"
DATA_FILE_DOS_ENDINGS="${USER_FILES_PATH:?}/${DOS_ENDINGS}"
touch $DATA_FILE_UNIX_ENDINGS
touch $DATA_FILE_DOS_ENDINGS
echo -ne "Akiba_Hebrew_Academy\t2017-08-01\t241\nAegithina_tiphia\t2018-02-01\t34\n1971-72_Utah_Stars_season\t2016-10-01\t1\n" > $DATA_FILE_UNIX_ENDINGS
echo -ne "Akiba_Hebrew_Academy\t2017-08-01\t241\r\nAegithina_tiphia\t2018-02-01\t34\r\n1971-72_Utah_Stars_season\t2016-10-01\t1\r\n" > $DATA_FILE_DOS_ENDINGS
echo -e "<-- Read UNIX endings -->\n"
$CLICKHOUSE_CLIENT --query "SELECT * FROM file(${UNIX_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32');"
$CLICKHOUSE_CLIENT --multiquery --query "SELECT * FROM file(${DOS_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32'); --{serverError 117}"
echo -e "\n<-- Read DOS endings with setting input_format_tsv_crlf_end_of_line=1 -->\n"
$CLICKHOUSE_CLIENT --query "SELECT * FROM file(${DOS_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32') SETTINGS input_format_tsv_crlf_end_of_line = 1;"
# Test teardown
rm $DATA_FILE_UNIX_ENDINGS
rm $DATA_FILE_DOS_ENDINGS

View File

@ -0,0 +1,2 @@
Hello, World! From client.
Hello, World! From local.

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
[ -e "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_client.gz ] && rm "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_client.gz
${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World! From client.')" > ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client.gz
gunzip ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client.gz
cat ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client
rm -f "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client"
[ -e "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_local.gz ] && rm "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_local.gz
${CLICKHOUSE_LOCAL} --query "SELECT * FROM (SELECT 'Hello, World! From local.')" > ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local.gz
gunzip ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local.gz
cat ${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local
rm -f "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local"

View File

@ -7,8 +7,6 @@ export LC_ALL=C # The "total" should be printed without localization
TU_EXCLUDES=(
AggregateFunctionUniq
Aggregator
# FIXME: Exclude for now
FunctionsConversion
)
if find $1 -name '*.o' | xargs wc -c | grep --regexp='\.o$' | sort -rn | awk '{ if ($1 > 50000000) print }' \