Merge branch 'ClickHouse:master' into fix_wrong_request_protocol_proxy

commit 13f02db8e4
Arthur Passos authored on 2024-05-23 12:23:47 +02:00, committed by GitHub
99 changed files with 1074 additions and 371 deletions


@ -42,25 +42,25 @@ At a minimum, the following information should be added (but add more as needed)
> Information about CI checks: https://clickhouse.com/docs/en/development/continuous-integration/
<details>
<summary>Modify your CI run</summary>
<summary>CI Settings</summary>
**NOTE:** If you merge the PR with modified CI, you **MUST KNOW** what you are doing
**NOTE:** Checked options will be applied if set before CI RunConfig/PrepareRunConfig step
#### Include tests (required builds will be added automatically):
- [ ] <!---ci_include_fast--> Fast test
#### Run these jobs only (required builds will be added automatically):
- [ ] <!---ci_include_integration--> Integration Tests
- [ ] <!---ci_include_stateless--> Stateless tests
- [ ] <!---ci_include_stateful--> Stateful tests
- [ ] <!---ci_include_unit--> Unit tests
- [ ] <!---ci_include_performance--> Performance tests
- [ ] <!---ci_include_aarch64--> All with aarch64
- [ ] <!---ci_include_asan--> All with ASAN
- [ ] <!---ci_include_tsan--> All with TSAN
- [ ] <!---ci_include_analyzer--> All with Analyzer
- [ ] <!---ci_include_azure --> All with Azure
- [ ] <!---ci_include_KEYWORD--> Add your option here
#### Exclude tests:
#### Deny these jobs:
- [ ] <!---ci_exclude_fast--> Fast test
- [ ] <!---ci_exclude_integration--> Integration Tests
- [ ] <!---ci_exclude_stateless--> Stateless tests
@ -72,7 +72,6 @@ At a minimum, the following information should be added (but add more as needed)
- [ ] <!---ci_exclude_ubsan--> All with UBSAN
- [ ] <!---ci_exclude_coverage--> All with Coverage
- [ ] <!---ci_exclude_aarch64--> All with Aarch64
- [ ] <!---ci_exclude_KEYWORD--> Add your option here
#### Extra options:
- [ ] <!---do_not_test--> do not test (only style check)


@ -22,6 +22,9 @@ jobs:
clear-repository: true # to ensure correct digests
fetch-depth: 0 # to get version
filter: tree:0
- name: Cancel PR workflow
run: |
python3 "$GITHUB_WORKSPACE/tests/ci/ci.py" --cancel-previous-run
- name: Python unit tests
run: |
cd "$GITHUB_WORKSPACE/tests/ci"


@ -11,6 +11,7 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
aspell \
curl \
git \
gh \
file \
libxml2-utils \
moreutils \


@ -197,6 +197,7 @@ SELECT * FROM nestedt FORMAT TSV
- [input_format_tsv_enum_as_number](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_enum_as_number) - treat inserted enum values in TSV formats as enum indices. Default value - `false`.
- [input_format_tsv_use_best_effort_in_schema_inference](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_use_best_effort_in_schema_inference) - use some tweaks and heuristics to infer schema in TSV format. If disabled, all fields will be inferred as Strings. Default value - `true`.
- [output_format_tsv_crlf_end_of_line](/docs/en/operations/settings/settings-formats.md/#output_format_tsv_crlf_end_of_line) - if set to true, the end of line in the TSV output format will be `\r\n` instead of `\n`. Default value - `false`.
- [input_format_tsv_crlf_end_of_line](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_crlf_end_of_line) - if set to true, the end of line in the TSV input format will be `\r\n` instead of `\n`. Default value - `false`.
- [input_format_tsv_skip_first_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_first_lines) - skip specified number of lines at the beginning of data. Default value - `0`.
- [input_format_tsv_detect_header](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_detect_header) - automatically detect header with names and types in TSV format. Default value - `true`.
- [input_format_tsv_skip_trailing_empty_lines](/docs/en/operations/settings/settings-formats.md/#input_format_tsv_skip_trailing_empty_lines) - skip trailing empty lines at the end of data. Default value - `false`.


@ -561,6 +561,25 @@ Default value: 5000
<max_table_num_to_warn>400</max_table_num_to_warn>
```
## max\_view\_num\_to\_warn {#max-view-num-to-warn}
If the number of attached views exceeds the specified value, the ClickHouse server will add warning messages to the `system.warnings` table.
Default value: 10000
**Example**
``` xml
<max_view_num_to_warn>400</max_view_num_to_warn>
```
## max\_dictionary\_num\_to\_warn {#max-dictionary-num-to-warn}
If the number of attached dictionaries exceeds the specified value, the ClickHouse server will add warning messages to the `system.warnings` table.
Default value: 1000
**Example**
``` xml
<max_dictionary_num_to_warn>400</max_dictionary_num_to_warn>
```
## max\_part\_num\_to\_warn {#max-part-num-to-warn}
If the number of active parts exceeds the specified value, the ClickHouse server will add warning messages to the `system.warnings` table.


@ -831,7 +831,13 @@ Default value: `0`.
### output_format_tsv_crlf_end_of_line {#output_format_tsv_crlf_end_of_line}
Use DOC/Windows-style line separator (CRLF) in TSV instead of Unix style (LF).
Use DOS/Windows-style line separator (CRLF) in TSV instead of Unix style (LF).
Disabled by default.
### input_format_tsv_crlf_end_of_line {#input_format_tsv_crlf_end_of_line}
Use DOS/Windows-style line separator (CRLF) for TSV input files instead of Unix style (LF).
Disabled by default.


@ -119,6 +119,7 @@ Hello\nworld
Hello\
world
```
`\r\n` (CRLF) is supported with the help of the `input_format_tsv_crlf_end_of_line` setting.
The second variant is supported because MySQL uses it when writing tab-separated dumps.


@ -1178,7 +1178,7 @@ void Client::processConfig()
pager = config().getString("pager", "");
setDefaultFormatsFromConfiguration();
setDefaultFormatsAndCompressionFromConfiguration();
global_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
global_context->setQueryKindInitial();


@ -182,6 +182,11 @@ std::string Keeper::getDefaultConfigFileName() const
return "keeper_config.xml";
}
bool Keeper::allowTextLog() const
{
return false;
}
void Keeper::handleCustomArguments(const std::string & arg, [[maybe_unused]] const std::string & value) // NOLINT
{
if (arg == "force-recovery")


@ -65,6 +65,8 @@ protected:
std::string getDefaultConfigFileName() const override;
bool allowTextLog() const override;
private:
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure = false) const;


@ -607,7 +607,7 @@ void LocalServer::processConfig()
if (config().has("macros"))
global_context->setMacros(std::make_unique<Macros>(config(), "macros", log));
setDefaultFormatsFromConfiguration();
setDefaultFormatsAndCompressionFromConfiguration();
/// Sets external authenticators config (LDAP, Kerberos).
global_context->setExternalAuthenticatorsConfig(config());


@ -1476,6 +1476,8 @@ try
global_context->setMaxTableSizeToDrop(new_server_settings.max_table_size_to_drop);
global_context->setMaxPartitionSizeToDrop(new_server_settings.max_partition_size_to_drop);
global_context->setMaxTableNumToWarn(new_server_settings.max_table_num_to_warn);
global_context->setMaxViewNumToWarn(new_server_settings.max_view_num_to_warn);
global_context->setMaxDictionaryNumToWarn(new_server_settings.max_dictionary_num_to_warn);
global_context->setMaxDatabaseNumToWarn(new_server_settings.max_database_num_to_warn);
global_context->setMaxPartNumToWarn(new_server_settings.max_part_num_to_warn);


@ -4617,6 +4617,36 @@ QueryAnalyzer::QueryTreeNodesWithNames QueryAnalyzer::resolveUnqualifiedMatcher(
std::unordered_set<std::string> table_expression_column_names_to_skip;
QueryTreeNodesWithNames result;
if (matcher_node_typed.getMatcherType() == MatcherNodeType::COLUMNS_LIST)
{
auto identifiers = matcher_node_typed.getColumnsIdentifiers();
result.reserve(identifiers.size());
for (const auto & identifier : identifiers)
{
auto resolve_result = tryResolveIdentifier(IdentifierLookup{identifier, IdentifierLookupContext::EXPRESSION}, scope);
if (!resolve_result.isResolved())
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Unknown identifier '{}' inside COLUMNS matcher. In scope {}",
identifier.getFullName(), scope.dump());
// TODO: Introduce IdentifierLookupContext::COLUMN and get rid of this check
auto * resolved_column = resolve_result.resolved_identifier->as<ColumnNode>();
if (!resolved_column)
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER,
"Identifier '{}' inside COLUMNS matcher must resolve into a column, but got {}. In scope {}",
identifier.getFullName(),
resolve_result.resolved_identifier->getNodeTypeName(),
scope.scope_node->formatASTForErrorMessage());
result.emplace_back(resolve_result.resolved_identifier, resolved_column->getColumnName());
}
return result;
}
result.resize(matcher_node_typed.getColumnsIdentifiers().size());
for (auto & table_expression : table_expressions_stack)
{
bool table_expression_in_resolve_process = nearest_query_scope->table_expressions_in_resolve_process.contains(table_expression.get());
@ -4784,8 +4814,6 @@ QueryAnalyzer::QueryTreeNodesWithNames QueryAnalyzer::resolveUnqualifiedMatcher(
table_expressions_column_nodes_with_names_stack.push_back(std::move(matched_column_nodes_with_names));
}
QueryTreeNodesWithNames result;
for (auto & table_expression_column_nodes_with_names : table_expressions_column_nodes_with_names_stack)
{
for (auto && table_expression_column_node_with_name : table_expression_column_nodes_with_names)


@ -21,6 +21,7 @@
#include <Common/StringUtils.h>
#include <Common/filesystemHelpers.h>
#include <Common/NetException.h>
#include <Common/tryGetFileNameByFileDescriptor.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Formats/FormatFactory.h>
@ -643,6 +644,9 @@ try
bool extras_into_stdout = need_render_progress || logs_into_stdout;
bool select_only_into_file = select_into_file && !select_into_file_and_stdout;
if (!out_file_buf && default_output_compression_method != CompressionMethod::None)
out_file_buf = wrapWriteBufferWithCompressionMethod(out_buf, default_output_compression_method, 3, 0);
/// It is not clear how to write progress and logs
/// intermixed with data with parallel formatting.
/// It may increase code complexity significantly.
@ -735,7 +739,7 @@ bool ClientBase::isRegularFile(int fd)
return fstat(fd, &file_stat) == 0 && S_ISREG(file_stat.st_mode);
}
void ClientBase::setDefaultFormatsFromConfiguration()
void ClientBase::setDefaultFormatsAndCompressionFromConfiguration()
{
if (config().has("output-format"))
{
@ -759,6 +763,10 @@ void ClientBase::setDefaultFormatsFromConfiguration()
default_output_format = *format_from_file_name;
else
default_output_format = "TSV";
std::optional<String> file_name = tryGetFileNameFromFileDescriptor(STDOUT_FILENO);
if (file_name)
default_output_compression_method = chooseCompressionMethod(*file_name, "");
}
else if (is_interactive)
{


@ -190,7 +190,7 @@ protected:
/// Adjust some settings after command line options and config had been processed.
void adjustSettings();
void setDefaultFormatsFromConfiguration();
void setDefaultFormatsAndCompressionFromConfiguration();
void initTTYBuffer(ProgressOption progress);
@ -224,6 +224,7 @@ protected:
String pager;
String default_output_format; /// Query results output format.
CompressionMethod default_output_compression_method = CompressionMethod::None;
String default_input_format; /// Tables' format for clickhouse-local.
bool select_into_file = false; /// If writing result INTO OUTFILE. It affects progress rendering.


@ -30,6 +30,7 @@ namespace ErrorCodes
extern const int ASYNC_LOAD_CYCLE;
extern const int ASYNC_LOAD_FAILED;
extern const int ASYNC_LOAD_CANCELED;
extern const int ASYNC_LOAD_WAIT_FAILED;
extern const int LOGICAL_ERROR;
}
@ -433,7 +434,7 @@ void AsyncLoader::wait(const LoadJobPtr & job, bool no_throw)
std::unique_lock job_lock{job->mutex};
wait(job_lock, job);
if (!no_throw && job->load_exception)
std::rethrow_exception(job->load_exception);
throw Exception(ErrorCodes::ASYNC_LOAD_WAIT_FAILED, "Waited job failed: {}", getExceptionMessage(job->load_exception, /* with_stacktrace = */ false));
}
void AsyncLoader::remove(const LoadJobSet & jobs)
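The change above no longer rethrows the stored job exception as-is; the waiter now throws a dedicated ASYNC_LOAD_WAIT_FAILED error that embeds the original message (the unit tests later in this commit assert exactly that: the new error code plus the original error name in the text). A minimal standalone sketch of that wrapping pattern, with illustrative names only:

```cpp
// Sketch of the wrap-and-rethrow pattern: keep the failure cause as an
// std::exception_ptr, and when a waiter asks for the result, throw a new
// "wait failed" error that carries the original message in its text.
#include <cstdio>
#include <exception>
#include <stdexcept>
#include <string>

static std::string describe(std::exception_ptr ep)
{
    try { std::rethrow_exception(ep); }
    catch (const std::exception & e) { return e.what(); }
    catch (...) { return "unknown error"; }
}

struct ToyJob
{
    std::exception_ptr load_exception; // set by the loader thread on failure

    void wait() const
    {
        if (load_exception)
            throw std::runtime_error("Waited job failed: " + describe(load_exception));
    }
};

int main()
{
    ToyJob job;
    try { throw std::runtime_error("ASYNC_LOAD_CANCELED: dependency failed"); } // illustrative cause
    catch (...) { job.load_exception = std::current_exception(); }

    try { job.wait(); }
    catch (const std::exception & e) { std::printf("%s\n", e.what()); }
    return 0;
}
```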


@ -224,6 +224,8 @@
M(PartsActive, "Active data part, used by current and upcoming SELECTs.") \
M(AttachedDatabase, "Active database, used by current and upcoming SELECTs.") \
M(AttachedTable, "Active table, used by current and upcoming SELECTs.") \
M(AttachedView, "Active view, used by current and upcoming SELECTs.") \
M(AttachedDictionary, "Active dictionary, used by current and upcoming SELECTs.") \
M(PartsOutdated, "Not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes.") \
M(PartsDeleting, "Not active data part with identity refcounter, it is deleting right now by a cleaner.") \
M(PartsDeleteOnDestroy, "Part was moved to another disk and should be deleted in own destructor.") \


@ -600,6 +600,7 @@
M(719, QUERY_CACHE_USED_WITH_SYSTEM_TABLE) \
M(720, USER_EXPIRED) \
M(721, DEPRECATED_FUNCTION) \
M(722, ASYNC_LOAD_WAIT_FAILED) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \


@ -35,6 +35,7 @@ namespace DB::ErrorCodes
extern const int ASYNC_LOAD_CYCLE;
extern const int ASYNC_LOAD_FAILED;
extern const int ASYNC_LOAD_CANCELED;
extern const int ASYNC_LOAD_WAIT_FAILED;
}
struct Initializer {
@ -262,7 +263,8 @@ TEST(AsyncLoader, CancelPendingJob)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
}
@ -288,7 +290,8 @@ TEST(AsyncLoader, CancelPendingTask)
}
catch (Exception & e)
{
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
try
@ -298,7 +301,8 @@ TEST(AsyncLoader, CancelPendingTask)
}
catch (Exception & e)
{
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
}
@ -325,7 +329,8 @@ TEST(AsyncLoader, CancelPendingDependency)
}
catch (Exception & e)
{
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
try
@ -335,7 +340,8 @@ TEST(AsyncLoader, CancelPendingDependency)
}
catch (Exception & e)
{
ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
}
@ -451,8 +457,9 @@ TEST(AsyncLoader, JobFailure)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_FAILED);
ASSERT_TRUE(e.message().find(error_message) != String::npos);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains(error_message));
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_FAILED"));
}
}
@ -489,8 +496,9 @@ TEST(AsyncLoader, ScheduleJobWithFailedDependencies)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_TRUE(e.message().find(error_message) != String::npos);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
ASSERT_TRUE(e.message().contains(error_message));
}
try
{
@ -499,8 +507,9 @@ TEST(AsyncLoader, ScheduleJobWithFailedDependencies)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_TRUE(e.message().find(error_message) != String::npos);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
ASSERT_TRUE(e.message().contains(error_message));
}
}
@ -531,7 +540,8 @@ TEST(AsyncLoader, ScheduleJobWithCanceledDependencies)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
try
{
@ -540,7 +550,8 @@ TEST(AsyncLoader, ScheduleJobWithCanceledDependencies)
}
catch (Exception & e)
{
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_WAIT_FAILED);
ASSERT_TRUE(e.message().contains("ASYNC_LOAD_CANCELED"));
}
}


@ -0,0 +1,33 @@
#include <Common/tryGetFileNameByFileDescriptor.h>
#ifdef OS_LINUX
# include <unistd.h>
#elif defined(OS_DARWIN)
# include <fcntl.h>
#endif
#include <fmt/format.h>
namespace DB
{
std::optional<String> tryGetFileNameFromFileDescriptor(int fd)
{
#ifdef OS_LINUX
std::string proc_path = fmt::format("/proc/self/fd/{}", fd);
char file_path[PATH_MAX] = {'\0'};
if (readlink(proc_path.c_str(), file_path, sizeof(file_path) - 1) != -1)
return file_path;
return std::nullopt;
#elif defined(OS_DARWIN)
char file_path[PATH_MAX] = {'\0'};
if (fcntl(fd, F_GETPATH, file_path) != -1)
return file_path;
return std::nullopt;
#else
(void)fd;
return std::nullopt;
#endif
}
}


@ -0,0 +1,10 @@
#pragma once
#include <optional>
#include <base/types.h>
namespace DB
{
/// Supports only Linux/MacOS. On other platforms, returns nullopt.
std::optional<String> tryGetFileNameFromFileDescriptor(int fd);
}
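For context, here is a minimal standalone usage sketch of the fd-to-path technique the helper above relies on (readlink over /proc/self/fd, Linux-only for brevity), combined with a toy extension-to-compression lookup similar in spirit to how the client code in this commit derives the default output compression for stdout. All names and paths below are illustrative and not part of the commit:

```cpp
// Resolve the path behind a file descriptor and pick a compression method
// from its extension. Toy code: not the ClickHouse helper or its callers.
#include <cstdio>
#include <fcntl.h>
#include <limits.h>
#include <optional>
#include <string>
#include <unistd.h>

static std::optional<std::string> fileNameOfFd(int fd)
{
    char buf[PATH_MAX] = {'\0'};
    std::string proc = "/proc/self/fd/" + std::to_string(fd);
    if (readlink(proc.c_str(), buf, sizeof(buf) - 1) != -1)
        return buf;
    return std::nullopt;
}

static const char * compressionFromExtension(const std::string & name)
{
    auto has_suffix = [&](const std::string & suffix)
    {
        return name.size() >= suffix.size()
            && name.compare(name.size() - suffix.size(), suffix.size(), suffix) == 0;
    };
    if (has_suffix(".gz"))  return "gzip";
    if (has_suffix(".zst")) return "zstd";
    return "none";
}

int main()
{
    int fd = open("/tmp/result.tsv.gz", O_CREAT | O_WRONLY, 0644); // hypothetical path
    if (fd < 0)
        return 1;
    if (auto name = fileNameOfFd(fd))
        std::printf("fd %d -> %s, compression: %s\n", fd, name->c_str(), compressionFromExtension(*name));
    close(fd);
    return 0;
}
```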


@ -97,6 +97,8 @@ namespace DB
M(UInt64, max_table_size_to_drop, 50000000000lu, "If size of a table is greater than this value (in bytes) than table could not be dropped with any DROP query.", 0) \
M(UInt64, max_partition_size_to_drop, 50000000000lu, "Same as max_table_size_to_drop, but for the partitions.", 0) \
M(UInt64, max_table_num_to_warn, 5000lu, "If number of tables is greater than this value, server will create a warning that will displayed to user.", 0) \
M(UInt64, max_view_num_to_warn, 10000lu, "If number of views is greater than this value, server will create a warning that will displayed to user.", 0) \
M(UInt64, max_dictionary_num_to_warn, 1000lu, "If number of dictionaries is greater than this value, server will create a warning that will displayed to user.", 0) \
M(UInt64, max_database_num_to_warn, 1000lu, "If number of databases is greater than this value, server will create a warning that will displayed to user.", 0) \
M(UInt64, max_part_num_to_warn, 100000lu, "If number of databases is greater than this value, server will create a warning that will displayed to user.", 0) \
M(UInt64, concurrent_threads_soft_limit_num, 0, "Sets how many concurrent thread can be allocated before applying CPU pressure. Zero means unlimited.", 0) \


@ -1079,6 +1079,7 @@ class IColumn;
M(Bool, input_format_csv_skip_trailing_empty_lines, false, "Skip trailing empty lines in CSV format", 0) \
M(Bool, input_format_tsv_skip_trailing_empty_lines, false, "Skip trailing empty lines in TSV format", 0) \
M(Bool, input_format_custom_skip_trailing_empty_lines, false, "Skip trailing empty lines in CustomSeparated format", 0) \
M(Bool, input_format_tsv_crlf_end_of_line, false, "If it is set true, file function will read TSV format with \\r\\n instead of \\n.", 0) \
\
M(Bool, input_format_native_allow_types_conversion, true, "Allow data types conversion in Native input format", 0) \
\


@ -87,6 +87,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{
{"24.5", {{"allow_deprecated_functions", true, false, "Allow usage of deprecated functions"},
{"allow_experimental_join_condition", false, false, "Support join with inequal conditions which involve columns from both left and right table. e.g. t1.y < t2.y."},
{"input_format_tsv_crlf_end_of_line", false, false, "Enables reading of CRLF line endings with TSV formats"},
{"output_format_parquet_use_custom_encoder", false, true, "Enable custom Parquet encoder."},
{"cross_join_min_rows_to_compress", 0, 10000000, "A new setting."},
{"cross_join_min_bytes_to_compress", 0, 1_GiB, "A new setting."},


@ -146,10 +146,10 @@ void SerializationAggregateFunction::serializeTextEscaped(const IColumn & column
}
void SerializationAggregateFunction::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
void SerializationAggregateFunction::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String s;
readEscapedString(s, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(s, istr) : readEscapedString(s, istr);
deserializeFromString(function, column, s, version);
}


@ -242,8 +242,10 @@ void SerializationBool::deserializeTextEscaped(IColumn & column, ReadBuffer & is
{
if (istr.eof())
throw Exception(ErrorCodes::CANNOT_PARSE_BOOL, "Expected boolean value but get EOF.");
deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'; });
if (settings.tsv.crlf_end_of_line_input)
deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n' || *buf.position() == '\r'; });
else
deserializeImpl(column, istr, settings, [](ReadBuffer & buf){ return buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'; });
}
bool SerializationBool::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const


@ -75,7 +75,7 @@ void SerializationCustomSimpleText::serializeTextEscaped(const IColumn & column,
void SerializationCustomSimpleText::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String str;
readEscapedString(str, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(str, istr) : readEscapedString(str, istr);
deserializeFromString(*this, column, str, settings);
}


@ -29,7 +29,7 @@ void SerializationEnum<Type>::deserializeTextEscaped(IColumn & column, ReadBuffe
{
/// NOTE It would be nice to do without creating a temporary object - at least extract std::string out.
std::string field_name;
readEscapedString(field_name, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field_name, istr) : readEscapedString(field_name, istr);
assert_cast<ColumnType &>(column).getData().push_back(ref_enum_values.getValue(StringRef(field_name), true));
}
}


@ -10,8 +10,10 @@
#include <IO/WriteHelpers.h>
#include <IO/VarInt.h>
#include "Common/PODArray.h"
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include "base/types.h"
namespace DB
{
@ -183,14 +185,17 @@ static inline bool tryRead(const SerializationFixedString & self, IColumn & colu
}
void SerializationFixedString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
void SerializationFixedString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
read(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto(data, istr); });
read(*this, column, [&istr, &settings](ColumnFixedString::Chars & data)
{
settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<ColumnFixedString::Chars,true>(data, istr) : readEscapedStringInto<ColumnFixedString::Chars,false>(data, istr);
});
}
bool SerializationFixedString::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
return tryRead(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto(data, istr); return true; });
return tryRead(*this, column, [&istr](ColumnFixedString::Chars & data) { readEscapedStringInto<PaddedPODArray<UInt8>,false>(data, istr); return true; });
}


@ -286,7 +286,7 @@ bool SerializationNullable::tryDeserializeNullRaw(DB::ReadBuffer & istr, const D
}
template<typename ReturnType, bool escaped>
ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const SerializationPtr & nested_serialization, bool & is_null)
ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const SerializationPtr & nested_serialization, bool & is_null)
{
static constexpr bool throw_exception = std::is_same_v<ReturnType, void>;
@ -319,10 +319,10 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
/// Check if we have enough data in buffer to check if it's a null.
if (istr.available() > null_representation.size())
{
auto check_for_null = [&null_representation](ReadBuffer & buf)
auto check_for_null = [&null_representation, &settings](ReadBuffer & buf)
{
auto * pos = buf.position();
if (checkString(null_representation, buf) && (*buf.position() == '\t' || *buf.position() == '\n'))
if (checkString(null_representation, buf) && (*buf.position() == '\t' || *buf.position() == '\n' || (settings.tsv.crlf_end_of_line_input && *buf.position() == '\r')))
return true;
buf.position() = pos;
return false;
@ -334,14 +334,14 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
/// Use PeekableReadBuffer to make a checkpoint before checking null
/// representation and rollback if check was failed.
PeekableReadBuffer peekable_buf(istr, true);
auto check_for_null = [&null_representation](ReadBuffer & buf_)
auto check_for_null = [&null_representation, &settings](ReadBuffer & buf_)
{
auto & buf = assert_cast<PeekableReadBuffer &>(buf_);
buf.setCheckpoint();
SCOPE_EXIT(buf.dropCheckpoint());
if (checkString(null_representation, buf) && (buf.eof() || *buf.position() == '\t' || *buf.position() == '\n'))
return true;
if (checkString(null_representation, buf) && (buf.eof() || *buf.position() == '\t' || *buf.position() == '\n' || (settings.tsv.crlf_end_of_line_input && *buf.position() == '\r')))
return true;
buf.rollbackToCheckpoint();
return false;
};
@ -371,7 +371,10 @@ ReturnType deserializeTextEscapedAndRawImpl(IColumn & column, ReadBuffer & istr,
if (null_representation.find('\t') != std::string::npos || null_representation.find('\n') != std::string::npos)
throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "TSV custom null representation "
"containing '\\t' or '\\n' may not work correctly for large input.");
"containing '\\t' or '\\n' may not work correctly for large input.");
if (settings.tsv.crlf_end_of_line_input && null_representation.find('\r') != std::string::npos)
throw DB::Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "TSV custom null representation "
"containing '\\r' may not work correctly for large input.");
WriteBufferFromOwnString parsed_value;
if constexpr (escaped)


@ -104,9 +104,9 @@ void SerializationObject<Parser>::deserializeWholeText(IColumn & column, ReadBuf
}
template <typename Parser>
void SerializationObject<Parser>::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
void SerializationObject<Parser>::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
deserializeTextImpl(column, [&](String & s) { readEscapedString(s, istr); });
deserializeTextImpl(column, [&](String & s) { settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(s, istr) : readEscapedString(s, istr); });
}
template <typename Parser>


@ -147,7 +147,6 @@ void SerializationString::serializeBinaryBulk(const IColumn & column, WriteBuffe
}
}
template <int UNROLL_TIMES>
static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars & data, ColumnString::Offsets & offsets, ReadBuffer & istr, size_t limit)
{
@ -324,14 +323,17 @@ bool SerializationString::tryDeserializeWholeText(IColumn & column, ReadBuffer &
return read<bool>(column, [&](ColumnString::Chars & data) { readStringUntilEOFInto(data, istr); return true; });
}
void SerializationString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
void SerializationString::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
read<void>(column, [&](ColumnString::Chars & data) { readEscapedStringInto(data, istr); });
read<void>(column, [&](ColumnString::Chars & data)
{
settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<PaddedPODArray<UInt8>,true>(data, istr) : readEscapedStringInto<PaddedPODArray<UInt8>,false>(data, istr);
});
}
bool SerializationString::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
return read<bool>(column, [&](ColumnString::Chars & data) { readEscapedStringInto(data, istr); return true; });
return read<bool>(column, [&](ColumnString::Chars & data) { readEscapedStringInto<PaddedPODArray<UInt8>,true>(data, istr); return true; });
}
void SerializationString::serializeTextQuoted(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const


@ -599,14 +599,14 @@ void SerializationVariant::serializeTextEscaped(const IColumn & column, size_t r
bool SerializationVariant::tryDeserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String field;
readEscapedString(field, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field, istr) : readEscapedString(field, istr);
return tryDeserializeTextEscapedImpl(column, field, settings);
}
void SerializationVariant::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
String field;
readEscapedString(field, istr);
settings.tsv.crlf_end_of_line_input ? readEscapedStringCRLF(field, istr) : readEscapedString(field, istr);
if (!tryDeserializeTextEscapedImpl(column, field, settings))
throw Exception(ErrorCodes::INCORRECT_DATA, "Cannot parse escaped value of type {} here: {}", variant_name, field);
}


@ -1,6 +1,14 @@
#include <Databases/DatabaseLazy.h>
#include <base/sort.h>
#include <iomanip>
#include <filesystem>
#include <Common/CurrentMetrics.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
#include <Common/scope_guard_safe.h>
#include <Core/Settings.h>
#include <Databases/DatabaseFactory.h>
#include <Databases/DatabaseLazy.h>
#include <Databases/DatabaseOnDisk.h>
#include <Databases/DatabasesCommon.h>
#include <Interpreters/Context.h>
@ -10,13 +18,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Storages/IStorage.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
#include <Common/scope_guard_safe.h>
#include <base/sort.h>
#include <iomanip>
#include <filesystem>
namespace fs = std::filesystem;


@ -5,6 +5,7 @@
#include <span>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseOrdinary.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@ -326,31 +327,36 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
StoragePtr table = detachTable(local_context, table_name);
/// This is possible for Lazy database.
if (!table)
return;
bool renamed = false;
try
{
fs::rename(table_metadata_path, table_metadata_path_drop);
renamed = true;
table->drop();
table->is_dropped = true;
fs::path table_data_dir(local_context->getPath() + table_data_path_relative);
if (fs::exists(table_data_dir))
(void)fs::remove_all(table_data_dir);
// The table might be not loaded for Lazy database engine.
if (table)
{
table->drop();
table->is_dropped = true;
}
}
catch (...)
{
LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true));
attachTable(local_context, table_name, table, table_data_path_relative);
if (table)
attachTable(local_context, table_name, table, table_data_path_relative);
if (renamed)
fs::rename(table_metadata_path_drop, table_metadata_path);
throw;
}
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
if (disk->isReadOnly() || !disk->exists(table_data_path_relative))
continue;
LOG_INFO(log, "Removing data directory from disk {} with path {} for dropped table {} ", disk_name, table_data_path_relative, table_name);
disk->removeRecursive(table_data_path_relative);
}
(void)fs::remove(table_metadata_path_drop);
}


@ -76,20 +76,6 @@ static void setReplicatedEngine(ASTCreateQuery * create_query, ContextPtr contex
String replica_path = server_settings.default_replica_path;
String replica_name = server_settings.default_replica_name;
/// Check that replica path doesn't exist
Macros::MacroExpansionInfo info;
StorageID table_id = StorageID(create_query->getDatabase(), create_query->getTable(), create_query->uuid);
info.table_id = table_id;
info.expand_special_macros_only = false;
String zookeeper_path = context->getMacros()->expand(replica_path, info);
if (context->getZooKeeper()->exists(zookeeper_path))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Found existing ZooKeeper path {} while trying to convert table {} to replicated. Table will not be converted.",
zookeeper_path, backQuote(table_id.getFullTableName())
);
auto args = std::make_shared<ASTExpressionList>();
args->children.push_back(std::make_shared<ASTLiteral>(replica_path));
args->children.push_back(std::make_shared<ASTLiteral>(replica_name));


@ -1,4 +1,10 @@
#include <Databases/DatabasesCommon.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/RestorerFromBackup.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentMetrics.h>
#include <Common/escapeForFileName.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
@ -8,17 +14,8 @@
#include <Parsers/formatAST.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageFactory.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentMetrics.h>
#include <Common/escapeForFileName.h>
#include <Storages/Utils.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/RestorerFromBackup.h>
namespace CurrentMetrics
{
extern const Metric AttachedTable;
}
namespace DB
@ -263,7 +260,7 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
res = it->second;
tables.erase(it);
res->is_detached = true;
CurrentMetrics::sub(CurrentMetrics::AttachedTable, 1);
CurrentMetrics::sub(getAttachedCounterForStorage(res), 1);
auto table_id = res->getStorageID();
if (table_id.hasUUID())
@ -304,7 +301,7 @@ void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, c
/// It is important to reset is_detached here since in case of RENAME in
/// non-Atomic database the is_detached is set to true before RENAME.
table->is_detached = false;
CurrentMetrics::add(CurrentMetrics::AttachedTable, 1);
CurrentMetrics::add(getAttachedCounterForStorage(table), 1);
}
void DatabaseWithOwnTablesBase::shutdown()


@ -76,7 +76,7 @@ void skipFieldByEscapingRule(ReadBuffer & buf, FormatSettings::EscapingRule esca
/// Empty field, just skip spaces
break;
case FormatSettings::EscapingRule::Escaped:
readEscapedStringInto(out, buf);
readEscapedStringInto<NullOutput,false>(out, buf);
break;
case FormatSettings::EscapingRule::Quoted:
readQuotedFieldInto(out, buf);


@ -1,6 +1,7 @@
#include <Formats/FormatFactory.h>
#include <algorithm>
#include <unistd.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
@ -15,7 +16,7 @@
#include <Poco/URI.h>
#include <Common/Exception.h>
#include <Common/KnownObjectNames.h>
#include <unistd.h>
#include <Common/tryGetFileNameByFileDescriptor.h>
#include <boost/algorithm/string/case_conv.hpp>
@ -202,6 +203,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.tsv.try_detect_header = settings.input_format_tsv_detect_header;
format_settings.tsv.skip_trailing_empty_lines = settings.input_format_tsv_skip_trailing_empty_lines;
format_settings.tsv.allow_variable_number_of_columns = settings.input_format_tsv_allow_variable_number_of_columns;
format_settings.tsv.crlf_end_of_line_input = settings.input_format_tsv_crlf_end_of_line;
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
format_settings.values.allow_data_after_semicolon = settings.input_format_values_allow_data_after_semicolon;
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
@ -693,21 +695,12 @@ String FormatFactory::getFormatFromFileName(String file_name)
std::optional<String> FormatFactory::tryGetFormatFromFileDescriptor(int fd)
{
#ifdef OS_LINUX
std::string proc_path = fmt::format("/proc/self/fd/{}", fd);
char file_path[PATH_MAX] = {'\0'};
if (readlink(proc_path.c_str(), file_path, sizeof(file_path) - 1) != -1)
return tryGetFormatFromFileName(file_path);
std::optional<String> file_name = tryGetFileNameFromFileDescriptor(fd);
if (file_name)
return tryGetFormatFromFileName(*file_name);
return std::nullopt;
#elif defined(OS_DARWIN)
char file_path[PATH_MAX] = {'\0'};
if (fcntl(fd, F_GETPATH, file_path) != -1)
return tryGetFormatFromFileName(file_path);
return std::nullopt;
#else
(void)fd;
return std::nullopt;
#endif
}
String FormatFactory::getFormatFromFileDescriptor(int fd)


@ -361,6 +361,7 @@ struct FormatSettings
bool try_detect_header = true;
bool skip_trailing_empty_lines = false;
bool allow_variable_number_of_columns = false;
bool crlf_end_of_line_input = false;
} tsv{};
struct


@ -49,6 +49,8 @@
#include <base/bit_cast.h>
#include <base/unaligned.h>
#include <algorithm>
namespace DB
{
@ -75,17 +77,29 @@ namespace impl
ColumnPtr key0;
ColumnPtr key1;
bool is_const;
const ColumnArray::Offsets * offsets{};
size_t size() const
{
assert(key0 && key1);
assert(key0->size() == key1->size());
assert(offsets == nullptr || offsets->size() == key0->size());
if (offsets != nullptr)
return offsets->back();
return key0->size();
}
SipHashKey getKey(size_t i) const
{
if (is_const)
i = 0;
if (offsets != nullptr)
{
const auto *const begin = offsets->begin();
const auto * upper = std::upper_bound(begin, offsets->end(), i);
if (upper == offsets->end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "offset {} not found in function SipHashKeyColumns::getKey", i);
i = upper - begin;
}
const auto & key0data = assert_cast<const ColumnUInt64 &>(*key0).getData();
const auto & key1data = assert_cast<const ColumnUInt64 &>(*key1).getData();
return {key0data[i], key1data[i]};
@ -1112,7 +1126,15 @@ private:
typename ColumnVector<ToType>::Container vec_temp(nested_size);
bool nested_is_first = true;
executeForArgument(key_cols, nested_type, nested_column, vec_temp, nested_is_first);
if constexpr (Keyed)
{
KeyColumnsType key_cols_tmp{key_cols};
key_cols_tmp.offsets = &offsets;
executeForArgument(key_cols_tmp, nested_type, nested_column, vec_temp, nested_is_first);
}
else
executeForArgument(key_cols, nested_type, nested_column, vec_temp, nested_is_first);
const size_t size = offsets.size();
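The offsets-aware getKey above maps a flattened array-element index back to the row whose (key0, key1) pair should be used, via std::upper_bound over the cumulative ColumnArray offsets. A minimal standalone sketch of that index-to-row mapping with toy data (illustrative names, not the ClickHouse columns):

```cpp
// Map a flattened element index to its owning row using cumulative offsets,
// mirroring SipHashKeyColumns::getKey above.
#include <algorithm>
#include <cassert>
#include <cstdio>
#include <vector>

// offsets[r] = total number of elements in rows 0..r (cumulative, like ColumnArray::Offsets).
static size_t rowOfElement(const std::vector<size_t> & offsets, size_t element_index)
{
    // The first offset strictly greater than element_index marks the owning row.
    auto upper = std::upper_bound(offsets.begin(), offsets.end(), element_index);
    assert(upper != offsets.end()); // element_index must be < offsets.back()
    return static_cast<size_t>(upper - offsets.begin());
}

int main()
{
    // Three array rows with 2, 0 and 3 elements -> cumulative offsets {2, 2, 5}.
    std::vector<size_t> offsets = {2, 2, 5};
    for (size_t i = 0; i < offsets.back(); ++i)
        std::printf("element %zu belongs to row %zu\n", i, rowOfElement(offsets, i));
    // Prints rows 0, 0, 2, 2, 2 (row 1 is empty, so no element maps to it).
    return 0;
}
```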


@ -352,7 +352,6 @@ static ReturnType parseComplexEscapeSequence(Vector & s, ReadBuffer & buf)
{
return error("Cannot parse escape sequence", ErrorCodes::CANNOT_PARSE_ESCAPE_SEQUENCE);
}
s.push_back(unhex2(hex_code));
}
else if (char_after_backslash == 'N')
@ -608,13 +607,20 @@ static ReturnType parseJSONEscapeSequence(Vector & s, ReadBuffer & buf, bool kee
}
template <typename Vector, bool parse_complex_escape_sequence>
template <typename Vector, bool parse_complex_escape_sequence, bool support_crlf>
void readEscapedStringIntoImpl(Vector & s, ReadBuffer & buf)
{
while (!buf.eof())
{
char * next_pos = find_first_symbols<'\t', '\n', '\\'>(buf.position(), buf.buffer().end());
char * next_pos;
if constexpr (support_crlf)
{
next_pos = find_first_symbols<'\t', '\n', '\\','\r'>(buf.position(), buf.buffer().end());
}
else
{
next_pos = find_first_symbols<'\t', '\n', '\\'>(buf.position(), buf.buffer().end());
}
appendToStringOrVector(s, buf, next_pos);
buf.position() = next_pos;
@ -641,25 +647,46 @@ void readEscapedStringIntoImpl(Vector & s, ReadBuffer & buf)
}
}
}
if constexpr (support_crlf)
{
if (*buf.position() == '\r')
{
++buf.position();
if (!buf.eof() && *buf.position() != '\n')
{
s.push_back('\r');
continue;
}
return;
}
}
}
}
template <typename Vector>
template <typename Vector, bool support_crlf>
void readEscapedStringInto(Vector & s, ReadBuffer & buf)
{
readEscapedStringIntoImpl<Vector, true>(s, buf);
readEscapedStringIntoImpl<Vector, true, support_crlf>(s, buf);
}
void readEscapedString(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringInto(s, buf);
readEscapedStringInto<String,false>(s, buf);
}
template void readEscapedStringInto<PaddedPODArray<UInt8>>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readEscapedStringInto<NullOutput>(NullOutput & s, ReadBuffer & buf);
void readEscapedStringCRLF(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringInto<String,true>(s, buf);
}
template void readEscapedStringInto<PaddedPODArray<UInt8>,false>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readEscapedStringInto<NullOutput,false>(NullOutput & s, ReadBuffer & buf);
template void readEscapedStringInto<PaddedPODArray<UInt8>,true>(PaddedPODArray<UInt8> & s, ReadBuffer & buf);
template void readEscapedStringInto<NullOutput,true>(NullOutput & s, ReadBuffer & buf);
/** If enable_sql_style_quoting == true,
* strings like 'abc''def' will be parsed as abc'def.
@ -2069,7 +2096,14 @@ bool tryReadJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON &
void readTSVField(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringIntoImpl<String, false>(s, buf);
readEscapedStringIntoImpl<String, false, false>(s, buf);
}
void readTSVFieldCRLF(String & s, ReadBuffer & buf)
{
s.clear();
readEscapedStringIntoImpl<String, false, true>(s, buf);
}
}
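The support_crlf branch above ends a field at a "\r\n" sequence, while a lone '\r' in the middle of a value is kept as data. A minimal standalone sketch of that termination rule on a plain string (illustrative only, not the ReadBuffer-based implementation):

```cpp
// Toy CRLF-aware TSV field splitter: '\t' separates fields, '\n' or "\r\n"
// ends the row, and a lone '\r' inside a value is preserved.
#include <cstdio>
#include <string>
#include <vector>

static std::vector<std::string> splitTSVFieldsCRLF(const std::string & line)
{
    std::vector<std::string> fields(1);
    for (size_t i = 0; i < line.size(); ++i)
    {
        char c = line[i];
        if (c == '\t')
            fields.emplace_back();              // field separator
        else if (c == '\n')
            break;                              // LF ends the row
        else if (c == '\r')
        {
            // CR followed by LF (or end of input) ends the row.
            if (i + 1 >= line.size() || line[i + 1] == '\n')
                break;
            fields.back().push_back('\r');      // lone CR stays in the value
        }
        else
            fields.back().push_back(c);
    }
    return fields;
}

int main()
{
    // Expect 3 fields: "a", "b\rc" (lone CR preserved), "d" (row ended by CRLF).
    for (const auto & field : splitTSVFieldsCRLF("a\tb\rc\td\r\n"))
        std::printf("field of %zu bytes\n", field.size());
    return 0;
}
```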


@ -583,6 +583,8 @@ void readString(String & s, ReadBuffer & buf);
void readEscapedString(String & s, ReadBuffer & buf);
void readEscapedStringCRLF(String & s, ReadBuffer & buf);
void readQuotedString(String & s, ReadBuffer & buf);
void readQuotedStringWithSQLStyle(String & s, ReadBuffer & buf);
@ -645,7 +647,7 @@ void readStringInto(Vector & s, ReadBuffer & buf);
template <typename Vector>
void readNullTerminated(Vector & s, ReadBuffer & buf);
template <typename Vector>
template <typename Vector, bool support_crlf>
void readEscapedStringInto(Vector & s, ReadBuffer & buf);
template <bool enable_sql_style_quoting, typename Vector>
@ -1901,6 +1903,7 @@ void readJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON & se
bool tryReadJSONField(String & s, ReadBuffer & buf, const FormatSettings::JSON & settings);
void readTSVField(String & s, ReadBuffer & buf);
void readTSVFieldCRLF(String & s, ReadBuffer & buf);
/** Parse the escape sequence, which can be simple (one character after backslash) or more complex (multiple characters).
* It is assumed that the cursor is located on the `\` symbol


@ -160,6 +160,8 @@ namespace CurrentMetrics
extern const Metric TablesLoaderForegroundThreadsScheduled;
extern const Metric IOWriterThreadsScheduled;
extern const Metric AttachedTable;
extern const Metric AttachedView;
extern const Metric AttachedDictionary;
extern const Metric AttachedDatabase;
extern const Metric PartsActive;
}
@ -359,6 +361,8 @@ struct ContextSharedPart : boost::noncopyable
/// No lock required for format_schema_path modified only during initialization
std::atomic_size_t max_database_num_to_warn = 1000lu;
std::atomic_size_t max_table_num_to_warn = 5000lu;
std::atomic_size_t max_view_num_to_warn = 10000lu;
std::atomic_size_t max_dictionary_num_to_warn = 1000lu;
std::atomic_size_t max_part_num_to_warn = 100000lu;
String format_schema_path; /// Path to a directory that contains schema files used by input formats.
String google_protos_path; /// Path to a directory that contains the proto files for the well-known Protobuf types.
@ -935,6 +939,10 @@ Strings Context::getWarnings() const
common_warnings = shared->warnings;
if (CurrentMetrics::get(CurrentMetrics::AttachedTable) > static_cast<Int64>(shared->max_table_num_to_warn))
common_warnings.emplace_back(fmt::format("The number of attached tables is more than {}", shared->max_table_num_to_warn));
if (CurrentMetrics::get(CurrentMetrics::AttachedView) > static_cast<Int64>(shared->max_view_num_to_warn))
common_warnings.emplace_back(fmt::format("The number of attached views is more than {}", shared->max_view_num_to_warn));
if (CurrentMetrics::get(CurrentMetrics::AttachedDictionary) > static_cast<Int64>(shared->max_dictionary_num_to_warn))
common_warnings.emplace_back(fmt::format("The number of attached dictionaries is more than {}", shared->max_dictionary_num_to_warn));
if (CurrentMetrics::get(CurrentMetrics::AttachedDatabase) > static_cast<Int64>(shared->max_database_num_to_warn))
common_warnings.emplace_back(fmt::format("The number of attached databases is more than {}", shared->max_database_num_to_warn));
if (CurrentMetrics::get(CurrentMetrics::PartsActive) > static_cast<Int64>(shared->max_part_num_to_warn))
@ -3711,6 +3719,18 @@ void Context::setMaxTableNumToWarn(size_t max_table_to_warn)
shared->max_table_num_to_warn= max_table_to_warn;
}
void Context::setMaxViewNumToWarn(size_t max_view_to_warn)
{
SharedLockGuard lock(shared->mutex);
shared->max_view_num_to_warn= max_view_to_warn;
}
void Context::setMaxDictionaryNumToWarn(size_t max_dictionary_to_warn)
{
SharedLockGuard lock(shared->mutex);
shared->max_dictionary_num_to_warn= max_dictionary_to_warn;
}
void Context::setMaxDatabaseNumToWarn(size_t max_database_to_warn)
{
SharedLockGuard lock(shared->mutex);


@ -861,6 +861,8 @@ public:
const HTTPHeaderFilter & getHTTPHeaderFilter() const;
void setMaxTableNumToWarn(size_t max_table_to_warn);
void setMaxViewNumToWarn(size_t max_view_to_warn);
void setMaxDictionaryNumToWarn(size_t max_dictionary_to_warn);
void setMaxDatabaseNumToWarn(size_t max_database_to_warn);
void setMaxPartNumToWarn(size_t max_part_to_warn);
/// The port that the server listens for executing SQL queries.


@ -977,6 +977,13 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
if (as_create.is_ordinary_view)
throw Exception(ErrorCodes::INCORRECT_QUERY, "Cannot CREATE a table AS {}, it is a View", qualified_name);
if (as_create.is_materialized_view && as_create.to_table_id)
throw Exception(
ErrorCodes::INCORRECT_QUERY,
"Cannot CREATE a table AS {}, it is a Materialized View without storage. Use \"AS `{}`\" instead",
qualified_name,
as_create.to_table_id.getQualifiedName());
if (as_create.is_live_view)
throw Exception(ErrorCodes::INCORRECT_QUERY, "Cannot CREATE a table AS {}, it is a Live View", qualified_name);


@ -263,7 +263,7 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
}
}
#ifndef WITHOUT_TEXT_LOG
if (config.has("text_log"))
if (allowTextLog() && config.has("text_log"))
{
String text_log_level_str = config.getString("text_log.level", "trace");
int text_log_level = Poco::Logger::parseLevel(text_log_level_str);


@ -23,6 +23,10 @@ public:
/// Close log files. On next log write files will be reopened.
void closeLogs(Poco::Logger & logger);
virtual ~Loggers() = default;
protected:
virtual bool allowTextLog() const { return true; }
private:
Poco::AutoPtr<Poco::FileChannel> log_file;


@ -107,6 +107,10 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg)
[[maybe_unused]] bool push_result = logs_queue->emplace(std::move(columns));
}
auto text_log_locked = text_log.lock();
if (!text_log_locked)
return;
/// Also log to system.text_log table, if message is not too noisy
auto text_log_max_priority_loaded = text_log_max_priority.load(std::memory_order_relaxed);
if (text_log_max_priority_loaded && msg.getPriority() <= text_log_max_priority_loaded)
@ -146,10 +150,7 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg)
#undef SET_VALUE_IF_EXISTS
std::shared_ptr<SystemLogQueue<TextLogElement>> text_log_locked{};
text_log_locked = text_log.lock();
if (text_log_locked)
text_log_locked->push(std::move(elem));
text_log_locked->push(std::move(elem));
}
#endif
}


@ -1,7 +1,6 @@
#pragma once
#include <atomic>
#include <vector>
#include <map>
#include <Poco/AutoPtr.h>
#include <Poco/Channel.h>


@ -135,7 +135,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex
/// If the key is not found, skip the value.
NullOutput sink;
readEscapedStringInto(sink, *in);
readEscapedStringInto<NullOutput,false>(sink, *in);
}
else
{


@ -10,6 +10,8 @@
#include <Formats/verbosePrintString.h>
#include <Formats/EscapingRuleUtils.h>
#include <Processors/Formats/Impl/TabSeparatedRowInputFormat.h>
#include <boost/range/adaptor/map.hpp>
#include "Formats/FormatSettings.h"
namespace DB
{
@ -28,7 +30,8 @@ static void checkForCarriageReturn(ReadBuffer & in)
throw Exception(ErrorCodes::INCORRECT_DATA, "\nYou have carriage return (\\r, 0x0D, ASCII 13) at end of first row."
"\nIt's like your input data has DOS/Windows style line separators, that are illegal in TabSeparated format."
" You must transform your file to Unix format."
"\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r.");
"\nBut if you really need carriage return at end of string value of last column, you need to escape it as \\r"
"\nor else enable setting 'input_format_tsv_crlf_end_of_line'");
}
TabSeparatedRowInputFormat::TabSeparatedRowInputFormat(
@ -92,7 +95,12 @@ void TabSeparatedFormatReader::skipRowEndDelimiter()
if (buf->eof())
return;
if (unlikely(first_row))
if (format_settings.tsv.crlf_end_of_line_input)
{
if (*buf->position() == '\r')
++buf->position();
}
else if (unlikely(first_row))
{
checkForCarriageReturn(*buf);
first_row = false;
@ -105,14 +113,15 @@ template <bool read_string>
String TabSeparatedFormatReader::readFieldIntoString()
{
String field;
bool support_crlf = format_settings.tsv.crlf_end_of_line_input;
if (is_raw)
readString(field, *buf);
else
{
if constexpr (read_string)
readEscapedString(field, *buf);
support_crlf ? readEscapedStringCRLF(field, *buf) : readEscapedString(field, *buf);
else
readTSVField(field, *buf);
support_crlf ? readTSVFieldCRLF(field, *buf) : readTSVField(field, *buf);
}
return field;
}
@ -123,7 +132,7 @@ void TabSeparatedFormatReader::skipField()
if (is_raw)
readStringInto(out, *buf);
else
readEscapedStringInto(out, *buf);
format_settings.tsv.crlf_end_of_line_input ? readEscapedStringInto<NullOutput,true>(out, *buf) : readEscapedStringInto<NullOutput,false>(out, *buf);
}
void TabSeparatedFormatReader::skipHeaderRow()
@ -155,7 +164,7 @@ bool TabSeparatedFormatReader::readField(IColumn & column, const DataTypePtr & t
const SerializationPtr & serialization, bool is_last_file_column, const String & /*column_name*/)
{
const bool at_delimiter = !is_last_file_column && !buf->eof() && *buf->position() == '\t';
const bool at_last_column_line_end = is_last_file_column && (buf->eof() || *buf->position() == '\n');
const bool at_last_column_line_end = is_last_file_column && (buf->eof() || *buf->position() == '\n' || (format_settings.tsv.crlf_end_of_line_input && *buf->position() == '\r'));
if (format_settings.tsv.empty_as_default && (at_delimiter || at_last_column_line_end))
{
@ -220,7 +229,10 @@ bool TabSeparatedFormatReader::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
try
{
assertChar('\n', *buf);
if (!format_settings.tsv.crlf_end_of_line_input)
assertChar('\n', *buf);
else
assertChar('\r', *buf);
}
catch (const DB::Exception &)
{
@ -233,7 +245,10 @@ bool TabSeparatedFormatReader::parseRowEndWithDiagnosticInfo(WriteBuffer & out)
else if (*buf->position() == '\r')
{
out << "ERROR: Carriage return found where line feed is expected."
" It's like your file has DOS/Windows style line separators, that is illegal in TabSeparated format.\n";
" It's like your file has DOS/Windows style line separators. \n"
"You must transform your file to Unix format. \n"
"But if you really need carriage return at end of string value of last column, you need to escape it as \\r \n"
"or else enable setting 'input_format_tsv_crlf_end_of_line'";
}
else
{
@ -348,7 +363,7 @@ void TabSeparatedFormatReader::skipRow()
bool TabSeparatedFormatReader::checkForEndOfRow()
{
return buf->eof() || *buf->position() == '\n';
return buf->eof() || *buf->position() == '\n' || (format_settings.tsv.crlf_end_of_line_input && *buf->position() == '\r');
}
TabSeparatedSchemaReader::TabSeparatedSchemaReader(


@ -45,8 +45,13 @@ IInflatingTransform::Status IInflatingTransform::prepare()
{
if (input.isFinished())
{
output.finish();
return Status::Finished;
if (is_finished)
{
output.finish();
return Status::Finished;
}
is_finished = true;
return Status::Ready;
}
input.setNeeded();
@ -73,6 +78,14 @@ void IInflatingTransform::work()
generated = true;
can_generate = canGenerate();
}
else if (is_finished)
{
if (can_generate || generated || has_input)
throw Exception(ErrorCodes::LOGICAL_ERROR, "IInflatingTransform cannot finish work because it has generated data or has input data");
current_chunk = getRemaining();
generated = !current_chunk.empty();
}
else
{
if (!has_input)


@ -10,13 +10,14 @@ namespace DB
/// for (chunk : input_chunks)
/// {
/// transform.consume(chunk);
///
/// while (transform.canGenerate())
/// {
/// transformed_chunk = transform.generate();
/// ... (process transformed chunk)
/// }
/// }
/// transformed_chunk = transform.getRemaining();
/// ... (process remaining data)
///
class IInflatingTransform : public IProcessor
{
@ -32,6 +33,7 @@ protected:
virtual void consume(Chunk chunk) = 0;
virtual bool canGenerate() = 0;
virtual Chunk generate() = 0;
virtual Chunk getRemaining() { return {}; }
public:
IInflatingTransform(Block input_header, Block output_header);
@ -41,6 +43,9 @@ public:
InputPort & getInputPort() { return input; }
OutputPort & getOutputPort() { return output; }
/// canGenerate can flush data when input is finished.
bool is_finished = false;
};
}
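The usage comment at the top of this header describes the consume/canGenerate/generate cycle, and this commit adds getRemaining() so buffered data can be flushed once the input is finished. A minimal standalone sketch of that calling protocol with a toy batching class (illustrative names, not the ClickHouse interfaces):

```cpp
// Toy model of the IInflatingTransform protocol: consume() each input chunk,
// drain generate() while canGenerate(), then call getRemaining() once after
// the input is finished to flush whatever is still buffered.
#include <cstdio>
#include <vector>

struct ToyBatcher
{
    size_t batch_size;
    std::vector<int> buffer;               // rows not yet forming a full batch
    std::vector<std::vector<int>> ready;   // full batches waiting to be generated

    void consume(const std::vector<int> & chunk)
    {
        for (int x : chunk)
        {
            buffer.push_back(x);
            if (buffer.size() == batch_size)
            {
                ready.push_back(buffer);
                buffer.clear();
            }
        }
    }
    bool canGenerate() const { return !ready.empty(); }
    std::vector<int> generate()
    {
        auto chunk = ready.front();
        ready.erase(ready.begin());
        return chunk;
    }
    std::vector<int> getRemaining()         // final flush, may be empty
    {
        auto rest = buffer;
        buffer.clear();
        return rest;
    }
};

int main()
{
    ToyBatcher transform{3, {}, {}};
    std::vector<std::vector<int>> inputs = {{1, 2}, {3, 4, 5}, {6, 7}};

    for (const auto & chunk : inputs)
    {
        transform.consume(chunk);
        while (transform.canGenerate())
            std::printf("generated chunk of %zu rows\n", transform.generate().size());
    }
    auto rest = transform.getRemaining();   // the single leftover row {7}
    if (!rest.empty())
        std::printf("remaining chunk of %zu rows\n", rest.size());
    return 0;
}
```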


@ -624,14 +624,11 @@ SplitPartsRangesResult splitPartsRanges(RangesInDataParts ranges_in_data_parts,
}
/// Process parts ranges with undefined value at end mark
bool is_intersecting = part_index_start_to_range.size() > 1;
/// The last parts ranges could be non-intersect only if: (1) there is only one part range left, (2) it belongs to a non-L0 part,
/// and (3) the begin value of this range is larger than the largest end value of all previous ranges. This is too complicated
/// to check, so we just add the last part ranges to the intersecting ranges.
for (const auto & [part_range_index, mark_range] : part_index_start_to_range)
{
if (is_intersecting)
add_intersecting_range(part_range_index.part_index, mark_range);
else
add_non_intersecting_range(part_range_index.part_index, mark_range);
}
add_intersecting_range(part_range_index.part_index, mark_range);
auto && non_intersecting_ranges_in_data_parts = std::move(non_intersecting_ranges_in_data_parts_builder.getCurrentRangesInDataParts());
auto && intersecting_ranges_in_data_parts = std::move(intersecting_ranges_in_data_parts_builder.getCurrentRangesInDataParts());


@ -56,49 +56,34 @@ void SquashingChunksTransform::work()
SimpleSquashingChunksTransform::SimpleSquashingChunksTransform(
const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes)
: ISimpleTransform(header, header, true), squashing(min_block_size_rows, min_block_size_bytes)
: IInflatingTransform(header, header), squashing(min_block_size_rows, min_block_size_bytes)
{
}
void SimpleSquashingChunksTransform::transform(Chunk & chunk)
void SimpleSquashingChunksTransform::consume(Chunk chunk)
{
if (!finished)
{
if (auto block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns())))
chunk.setColumns(block.getColumns(), block.rows());
}
else
{
if (chunk.hasRows())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk expected to be empty, otherwise it will be lost");
auto block = squashing.add({});
chunk.setColumns(block.getColumns(), block.rows());
}
Block current_block = squashing.add(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
squashed_chunk.setColumns(current_block.getColumns(), current_block.rows());
}
IProcessor::Status SimpleSquashingChunksTransform::prepare()
Chunk SimpleSquashingChunksTransform::generate()
{
if (!finished && input.isFinished())
{
if (output.isFinished())
return Status::Finished;
if (squashed_chunk.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't generate chunk in SimpleSquashingChunksTransform");
if (!output.canPush())
return Status::PortFull;
return std::move(squashed_chunk);
}
if (has_output)
{
output.pushData(std::move(output_data));
has_output = false;
return Status::PortFull;
}
bool SimpleSquashingChunksTransform::canGenerate()
{
return !squashed_chunk.empty();
}
finished = true;
/// On the next call to transform() we will return all data buffered in `squashing` (if any)
return Status::Ready;
}
return ISimpleTransform::prepare();
Chunk SimpleSquashingChunksTransform::getRemaining()
{
Block current_block = squashing.add({});
squashed_chunk.setColumns(current_block.getColumns(), current_block.rows());
return std::move(squashed_chunk);
}
}
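
Rebasing `SimpleSquashingChunksTransform` on `IInflatingTransform` means the "flush the tail" case is now expressed through `getRemaining()` instead of a custom `prepare()` override. A minimal Python sketch of the same consume/canGenerate/generate/getRemaining shape, using a plain list of rows and illustrative thresholds (names are stand-ins, not the ClickHouse API); it plugs into the `drive` loop sketched earlier:

```python
import sys

class SimpleSquashing:
    """Accumulates small chunks until a row/byte threshold, then emits one big chunk."""
    def __init__(self, min_rows=5, min_bytes=1024):
        self.min_rows = min_rows
        self.min_bytes = min_bytes
        self.buffer = []          # rows accumulated so far
        self.squashed = None      # chunk ready to be generated, if any

    def consume(self, chunk):
        self.buffer.extend(chunk)
        # sys.getsizeof is a crude stand-in for the block's byte size
        if len(self.buffer) >= self.min_rows or sys.getsizeof(self.buffer) >= self.min_bytes:
            self.squashed, self.buffer = self.buffer, []

    def can_generate(self):
        return self.squashed is not None

    def generate(self):
        chunk, self.squashed = self.squashed, None
        return chunk

    def get_remaining(self):
        # Called once the input is finished: release whatever is still buffered.
        chunk, self.buffer = self.buffer, []
        return chunk or None

# With the drive() sketch from the IInflatingTransform section:
#   drive(SimpleSquashing(min_rows=5), [[1, 2], [3, 4], [5, 6], [7]])
#   -> [[1, 2, 3, 4, 5, 6], [7]]
```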

View File

@ -2,6 +2,7 @@
#include <Interpreters/SquashingTransform.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/IInflatingTransform.h>
#include <Processors/Sinks/SinkToStorage.h>
namespace DB
@ -29,7 +30,7 @@ private:
};
/// Doesn't care about propagating exceptions and thus doesn't throw LOGICAL_ERROR if the following transform closes its input port.
class SimpleSquashingChunksTransform : public ISimpleTransform
class SimpleSquashingChunksTransform : public IInflatingTransform
{
public:
explicit SimpleSquashingChunksTransform(const Block & header, size_t min_block_size_rows, size_t min_block_size_bytes);
@ -37,14 +38,14 @@ public:
String getName() const override { return "SimpleSquashingTransform"; }
protected:
void transform(Chunk &) override;
IProcessor::Status prepare() override;
void consume(Chunk chunk) override;
bool canGenerate() override;
Chunk generate() override;
Chunk getRemaining() override;
private:
SquashingTransform squashing;
/// When consumption is finished we need to release the final chunk regardless of its size.
bool finished = false;
Chunk squashed_chunk;
};
}

View File

@ -25,6 +25,7 @@
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeCustomSimpleAggregateFunction.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeTuple.h>
@ -188,6 +189,7 @@ namespace ErrorCodes
extern const int CANNOT_SCHEDULE_TASK;
extern const int LIMIT_EXCEEDED;
extern const int CANNOT_FORGET_PARTITION;
extern const int DATA_TYPE_CANNOT_BE_USED_IN_KEY;
}
static void checkSuspiciousIndices(const ASTFunction * index_function)
@ -8538,6 +8540,16 @@ void MergeTreeData::unloadPrimaryKeys()
}
}
void MergeTreeData::verifySortingKey(const KeyDescription & sorting_key)
{
/// Aggregate functions are already forbidden, but SimpleAggregateFunction types are not
for (const auto & data_type : sorting_key.data_types)
{
if (dynamic_cast<const DataTypeCustomSimpleAggregateFunction *>(data_type->getCustomName()))
throw Exception(ErrorCodes::DATA_TYPE_CANNOT_BE_USED_IN_KEY, "Column with type {} is not allowed in key expression", data_type->getCustomName()->getName());
}
}
bool updateAlterConversionsMutations(const MutationCommands & commands, std::atomic<ssize_t> & alter_conversions_mutations, bool remove)
{
for (const auto & command : commands)
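
Moving `verifySortingKey` into `MergeTreeData` makes the same check reusable from both table creation and ALTER (see the `registerStorageMergeTree`, `StorageMergeTree::alter` and `StorageReplicatedMergeTree::alter` hunks below). Conceptually it is just a scan over the sorting-key column types that rejects SimpleAggregateFunction. A hedged Python sketch of that shape; type names are plain strings rather than the DataType hierarchy, and the `allow_suspicious_primary_key` flag, which the real code checks at the call sites, is folded into the signature for brevity:

```python
class DataTypeError(Exception):
    pass

def verify_sorting_key(sorting_key_types, allow_suspicious_primary_key=False):
    """Reject SimpleAggregateFunction(...) columns in the key unless explicitly allowed."""
    if allow_suspicious_primary_key:
        return
    for type_name in sorting_key_types:
        if type_name.startswith("SimpleAggregateFunction("):
            raise DataTypeError(
                f"Column with type {type_name} is not allowed in key expression")

verify_sorting_key(["Int32", "String"])                               # fine
verify_sorting_key(["SimpleAggregateFunction(sum, UInt64)"],
                   allow_suspicious_primary_key=True)                 # explicitly allowed
try:
    verify_sorting_key(["Int32", "SimpleAggregateFunction(sum, UInt64)"])
except DataTypeError as e:
    print(e)
```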

View File

@ -736,6 +736,8 @@ public:
const ASTPtr & new_settings,
AlterLockHolder & table_lock_holder);
static void verifySortingKey(const KeyDescription & sorting_key);
/// Should be called if part data is suspected to be corrupted.
/// Has the ability to check all other parts
/// which reside on the same disk of the suspicious part.

View File

@ -14,7 +14,6 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSetQuery.h>
#include <DataTypes/DataTypeCustomSimpleAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Interpreters/Context.h>
@ -32,7 +31,6 @@ namespace ErrorCodes
extern const int UNKNOWN_STORAGE;
extern const int NO_REPLICA_NAME_GIVEN;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int DATA_TYPE_CANNOT_BE_USED_IN_KEY;
}
@ -113,16 +111,6 @@ static ColumnsDescription getColumnsDescriptionFromZookeeper(const String & raw_
return ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_path) / "columns", &columns_stat));
}
static void verifySortingKey(const KeyDescription & sorting_key)
{
/// Aggregate functions already forbidden, but SimpleAggregateFunction are not
for (const auto & data_type : sorting_key.data_types)
{
if (dynamic_cast<const DataTypeCustomSimpleAggregateFunction *>(data_type->getCustomName()))
throw Exception(ErrorCodes::DATA_TYPE_CANNOT_BE_USED_IN_KEY, "Column with type {} is not allowed in key expression", data_type->getCustomName()->getName());
}
}
/// Returns whether a new syntax is used to define a table engine, i.e. MergeTree() PRIMARY KEY ... PARTITION BY ... SETTINGS ...
/// instead of MergeTree(MergeTree(date, [sample_key], primary_key).
static bool isExtendedStorageDef(const ASTCreateQuery & query)
@ -678,8 +666,8 @@ static StoragePtr create(const StorageFactory::Arguments & args)
/// column if sorting key will be changed.
metadata.sorting_key = KeyDescription::getSortingKeyFromAST(
args.storage_def->order_by->ptr(), metadata.columns, context, merging_param_key_arg);
if (!local_settings.allow_suspicious_primary_key)
verifySortingKey(metadata.sorting_key);
if (!local_settings.allow_suspicious_primary_key && args.mode <= LoadingStrictnessLevel::CREATE)
MergeTreeData::verifySortingKey(metadata.sorting_key);
/// If primary key explicitly defined, than get it from AST
if (args.storage_def->primary_key)
@ -792,8 +780,8 @@ static StoragePtr create(const StorageFactory::Arguments & args)
/// column if sorting key will be changed.
metadata.sorting_key
= KeyDescription::getSortingKeyFromAST(engine_args[arg_num], metadata.columns, context, merging_param_key_arg);
if (!local_settings.allow_suspicious_primary_key)
verifySortingKey(metadata.sorting_key);
if (!local_settings.allow_suspicious_primary_key && args.mode <= LoadingStrictnessLevel::CREATE)
MergeTreeData::verifySortingKey(metadata.sorting_key);
/// In old syntax primary_key always equals to sorting key.
metadata.primary_key = KeyDescription::getKeyFromAST(engine_args[arg_num], metadata.columns, context);

View File

@ -155,7 +155,7 @@ std::vector<Chunk> EmbeddedRocksDBBulkSink::squash(Chunk chunk)
return {};
}
std::pair<ColumnString::Ptr, ColumnString::Ptr> EmbeddedRocksDBBulkSink::serializeChunks(const std::vector<Chunk> & input_chunks) const
std::pair<ColumnString::Ptr, ColumnString::Ptr> EmbeddedRocksDBBulkSink::serializeChunks(std::vector<Chunk> && input_chunks) const
{
auto serialized_key_column = ColumnString::create();
auto serialized_value_column = ColumnString::create();
@ -168,7 +168,7 @@ std::pair<ColumnString::Ptr, ColumnString::Ptr> EmbeddedRocksDBBulkSink::seriali
WriteBufferFromVector<ColumnString::Chars> writer_key(serialized_key_data);
WriteBufferFromVector<ColumnString::Chars> writer_value(serialized_value_data);
for (const auto & chunk : input_chunks)
for (auto && chunk : input_chunks)
{
const auto & columns = chunk.getColumns();
auto rows = chunk.getNumRows();
@ -193,13 +193,14 @@ std::pair<ColumnString::Ptr, ColumnString::Ptr> EmbeddedRocksDBBulkSink::seriali
void EmbeddedRocksDBBulkSink::consume(Chunk chunk_)
{
std::vector<Chunk> to_written = squash(std::move(chunk_));
std::vector<Chunk> chunks_to_write = squash(std::move(chunk_));
if (to_written.empty())
if (chunks_to_write.empty())
return;
auto [serialized_key_column, serialized_value_column] = serializeChunks(to_written);
auto [serialized_key_column, serialized_value_column] = serializeChunks(std::move(chunks_to_write));
auto sst_file_path = getTemporarySSTFilePath();
LOG_DEBUG(getLogger("EmbeddedRocksDBBulkSink"), "Writing {} rows to SST file {}", serialized_key_column->size(), sst_file_path);
if (auto status = buildSSTFile(sst_file_path, *serialized_key_column, *serialized_value_column); !status.ok())
throw Exception(ErrorCodes::ROCKSDB_ERROR, "RocksDB write error: {}", status.ToString());
@ -209,6 +210,7 @@ void EmbeddedRocksDBBulkSink::consume(Chunk chunk_)
if (auto status = storage.rocksdb_ptr->IngestExternalFile({sst_file_path}, ingest_options); !status.ok())
throw Exception(ErrorCodes::ROCKSDB_ERROR, "RocksDB write error: {}", status.ToString());
LOG_DEBUG(getLogger("EmbeddedRocksDBBulkSink"), "SST file {} has been ingested", sst_file_path);
if (fs::exists(sst_file_path))
(void)fs::remove(sst_file_path);
}
@ -237,4 +239,5 @@ bool EmbeddedRocksDBBulkSink::isEnoughSize(const Chunk & chunk) const
{
return chunk.getNumRows() >= min_block_size_rows;
}
}
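
The bulk sink's overall flow is: squash incoming chunks until they are large enough, serialize them into key/value pairs, build a temporary SST file, ingest it, and (with this change) remove the temporary file afterwards; `serializeChunks` now also takes the chunk vector by rvalue so the buffered chunks can be moved rather than copied. A hedged Python sketch of that pipeline, where the file format and the ingest callable are stand-ins rather than the RocksDB API:

```python
import os
import tempfile

class BulkSink:
    """Accumulate rows, then write them through a temporary file that is removed after ingest."""
    def __init__(self, ingest, min_block_size_rows=1000):
        self.ingest = ingest                      # callable consuming a finished file (stand-in for IngestExternalFile)
        self.min_block_size_rows = min_block_size_rows
        self.pending = []                         # rows not yet large enough to flush

    def consume(self, rows):
        self.pending.extend(rows)
        if len(self.pending) < self.min_block_size_rows:
            return
        chunk, self.pending = self.pending, []
        serialized = "".join(f"{key}\t{value}\n" for key, value in chunk)
        fd, path = tempfile.mkstemp(suffix=".sst")  # stand-in for the temporary SST file
        try:
            with os.fdopen(fd, "w") as f:
                f.write(serialized)
            self.ingest(path)
        finally:
            if os.path.exists(path):              # mirror the new cleanup in the hunk above
                os.remove(path)

sink = BulkSink(ingest=lambda path: print("ingested", os.path.getsize(path), "bytes"),
                min_block_size_rows=3)
sink.consume([("k1", "v1"), ("k2", "v2")])
sink.consume([("k3", "v3")])   # threshold reached: serialize, ingest, remove the temp file
```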

View File

@ -49,7 +49,7 @@ private:
bool isEnoughSize(const std::vector<Chunk> & input_chunks) const;
bool isEnoughSize(const Chunk & chunk) const;
/// Serialize chunks to rocksdb key-value pairs
std::pair<ColumnString::Ptr, ColumnString::Ptr> serializeChunks(const std::vector<Chunk> & input_chunks) const;
std::pair<ColumnString::Ptr, ColumnString::Ptr> serializeChunks(std::vector<Chunk> && input_chunks) const;
StorageEmbeddedRocksDB & storage;
StorageMetadataPtr metadata_snapshot;

View File

@ -189,6 +189,7 @@ StorageEmbeddedRocksDB::StorageEmbeddedRocksDB(const StorageID & table_id_,
, rocksdb_dir(std::move(rocksdb_dir_))
, ttl(ttl_)
, read_only(read_only_)
, log(getLogger(fmt::format("StorageEmbeddedRocksDB ({})", getStorageID().getNameForLogs())))
{
setInMemoryMetadata(metadata_);
setSettings(std::move(settings_));
@ -316,6 +317,7 @@ void StorageEmbeddedRocksDB::mutate(const MutationCommands & commands, ContextPt
void StorageEmbeddedRocksDB::drop()
{
std::lock_guard lock(rocksdb_ptr_mx);
rocksdb_ptr->Close();
rocksdb_ptr = nullptr;
}
@ -463,18 +465,13 @@ void StorageEmbeddedRocksDB::initDB()
{
rocksdb::DB * db;
if (read_only)
{
status = rocksdb::DB::OpenForReadOnly(merged, rocksdb_dir, &db);
}
else
{
status = rocksdb::DB::Open(merged, rocksdb_dir, &db);
}
if (!status.ok())
{
throw Exception(ErrorCodes::ROCKSDB_ERROR, "Failed to open rocksdb path at: {}: {}",
rocksdb_dir, status.ToString());
}
throw Exception(ErrorCodes::ROCKSDB_ERROR, "Failed to open rocksdb path at: {}: {}", rocksdb_dir, status.ToString());
rocksdb_ptr = std::unique_ptr<rocksdb::DB>(db);
}
}
@ -589,8 +586,12 @@ SinkToStoragePtr StorageEmbeddedRocksDB::write(
const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context, bool /*async_insert*/)
{
if (getSettings().optimize_for_bulk_insert)
{
LOG_DEBUG(log, "Using bulk insert");
return std::make_shared<EmbeddedRocksDBBulkSink>(query_context, *this, metadata_snapshot);
}
LOG_DEBUG(log, "Using regular insert");
return std::make_shared<EmbeddedRocksDBSink>(*this, metadata_snapshot);
}

View File

@ -124,5 +124,7 @@ private:
bool read_only;
void initDB();
LoggerPtr log;
};
}

View File

@ -333,17 +333,21 @@ void StorageMergeTree::alter(
auto table_id = getStorageID();
auto old_storage_settings = getSettings();
const auto & query_settings = local_context->getSettingsRef();
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
auto maybe_mutation_commands = commands.getMutationCommands(new_metadata, local_context->getSettingsRef().materialize_ttl_after_modify, local_context);
auto maybe_mutation_commands = commands.getMutationCommands(new_metadata, query_settings.materialize_ttl_after_modify, local_context);
if (!maybe_mutation_commands.empty())
delayMutationOrThrowIfNeeded(nullptr, local_context);
Int64 mutation_version = -1;
commands.apply(new_metadata, local_context);
if (!query_settings.allow_suspicious_primary_key)
MergeTreeData::verifySortingKey(new_metadata.sorting_key);
/// This alter can be performed at new_metadata level only
if (commands.isSettingsAlter())
{
@ -396,7 +400,7 @@ void StorageMergeTree::alter(
resetObjectColumnsFromActiveParts(parts_lock);
}
if (!maybe_mutation_commands.empty() && local_context->getSettingsRef().alter_sync > 0)
if (!maybe_mutation_commands.empty() && query_settings.alter_sync > 0)
waitForMutation(mutation_version, false);
}

View File

@ -590,6 +590,9 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
LOG_DEBUG(log, "Waiting for {} to apply mutation {}", replica, mutation_id);
zkutil::EventPtr wait_event = std::make_shared<Poco::Event>();
constexpr size_t MAX_RETRIES_ON_FAILED_MUTATION = 30;
size_t retries_on_failed_mutation = 0;
while (!partial_shutdown_called)
{
/// Mutation maybe killed or whole replica was deleted.
@ -637,18 +640,32 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas(
}
}
/// If mutation status is empty, than local replica may just not loaded it into memory.
if (mutation_status && !mutation_status->latest_fail_reason.empty())
{
LOG_DEBUG(log, "Mutation {} is done {} or failed {} (status: '{}')", mutation_id, mutation_status->is_done, !mutation_status->latest_fail_reason.empty(), mutation_status->latest_fail_reason);
break;
}
/// Replica can become inactive, so wait with timeout, if nothing happened -> recheck it
if (!wait_event->tryWait(1000))
{
LOG_TRACE(log, "Failed to wait for mutation '{}', will recheck", mutation_id);
}
/// If the mutation status is empty, then the local replica may simply not have loaded it into memory.
if (mutation_status && !mutation_status->latest_fail_reason.empty())
{
LOG_DEBUG(log, "Mutation {} is done {} or failed {} (status: '{}')", mutation_id, mutation_status->is_done, !mutation_status->latest_fail_reason.empty(), mutation_status->latest_fail_reason);
/// In some cases latest_fail_reason may be retryable and there's a chance it will be cleared after the next attempt
if (++retries_on_failed_mutation <= MAX_RETRIES_ON_FAILED_MUTATION)
continue;
if (mutation_status->is_done)
{
LOG_DEBUG(log, "Looks like mutation {} is done, rechecking", mutation_id);
continue;
}
/// It's still possible that latest_fail_reason will be cleared just before queue.getIncompleteMutationsStatus(...) below,
/// but it's unlikely. Anyway, rethrow the exception here to avoid exiting with is_done=false
checkMutationStatus(mutation_status, {mutation_id});
throw Exception(ErrorCodes::LOGICAL_ERROR, "checkMutationStatus didn't throw when checking status of {}: {}", mutation_id, mutation_status->latest_fail_reason);
}
}
/// This replica inactive, don't check anything
@ -6027,6 +6044,7 @@ void StorageReplicatedMergeTree::alter(
assertNotReadonly();
auto table_id = getStorageID();
const auto & query_settings = query_context->getSettingsRef();
if (commands.isSettingsAlter())
{
@ -6054,6 +6072,13 @@ void StorageReplicatedMergeTree::alter(
return;
}
if (!query_settings.allow_suspicious_primary_key)
{
StorageInMemoryMetadata future_metadata = getInMemoryMetadata();
commands.apply(future_metadata, query_context);
MergeTreeData::verifySortingKey(future_metadata.sorting_key);
}
auto ast_to_str = [](ASTPtr query) -> String
{
@ -6186,7 +6211,7 @@ void StorageReplicatedMergeTree::alter(
auto maybe_mutation_commands = commands.getMutationCommands(
*current_metadata,
query_context->getSettingsRef().materialize_ttl_after_modify,
query_settings.materialize_ttl_after_modify,
query_context);
bool have_mutation = !maybe_mutation_commands.empty();
@ -6309,7 +6334,7 @@ void StorageReplicatedMergeTree::alter(
{
LOG_DEBUG(log, "Metadata changes applied. Will wait for data changes.");
merge_selecting_task->schedule();
waitMutation(*mutation_znode, query_context->getSettingsRef().alter_sync);
waitMutation(*mutation_znode, query_settings.alter_sync);
LOG_DEBUG(log, "Data changes applied.");
}
}
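
The retry loop added to `waitMutationToFinishOnReplicas` above tolerates transient mutation failures: a non-empty `latest_fail_reason` no longer stops the wait immediately; it is re-polled up to `MAX_RETRIES_ON_FAILED_MUTATION` times and only then surfaced as an error. A hedged Python sketch of that bounded-retry polling pattern, simplified relative to the real code (which also rechecks `is_done` before giving up); the status dict and poll function are assumptions for illustration:

```python
import time

MAX_RETRIES_ON_FAILED_MUTATION = 30

def wait_for_mutation(get_status, poll_interval=1.0):
    """get_status() -> dict with 'is_done' (bool) and 'fail_reason' (str, possibly transient).
    Transient failure reasons are tolerated for a bounded number of polls."""
    retries_on_failed_mutation = 0
    while True:
        status = get_status()
        if status["is_done"]:
            return
        if status["fail_reason"]:
            # The reason may be retryable and cleared after the next attempt.
            retries_on_failed_mutation += 1
            if retries_on_failed_mutation > MAX_RETRIES_ON_FAILED_MUTATION:
                raise RuntimeError(f"mutation failed: {status['fail_reason']}")
        time.sleep(poll_interval)

# Example: the third poll reports a transient failure, the fifth succeeds.
answers = iter([
    {"is_done": False, "fail_reason": ""},
    {"is_done": False, "fail_reason": ""},
    {"is_done": False, "fail_reason": "Too many parts"},
    {"is_done": False, "fail_reason": ""},
    {"is_done": True, "fail_reason": ""},
])
wait_for_mutation(lambda: next(answers), poll_interval=0.01)
print("mutation finished")
```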

30
src/Storages/Utils.cpp Normal file
View File

@ -0,0 +1,30 @@
#include <Storages/Utils.h>
#include <Storages/IStorage.h>
namespace CurrentMetrics
{
extern const Metric AttachedTable;
extern const Metric AttachedView;
extern const Metric AttachedDictionary;
}
namespace DB
{
CurrentMetrics::Metric getAttachedCounterForStorage(const StoragePtr & storage)
{
if (storage->isView())
{
return CurrentMetrics::AttachedView;
}
else if (storage->isDictionary())
{
return CurrentMetrics::AttachedDictionary;
}
else
{
return CurrentMetrics::AttachedTable;
}
}
}
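
`getAttachedCounterForStorage` chooses which CurrentMetrics counter an attached object contributes to, which presumably is what feeds the new "attached views/dictionaries" warnings exercised in the test changes further below. A hedged Python sketch of the same dispatch; the metric names mirror the extern declarations above, while the `Storage` class is a stand-in:

```python
from collections import Counter

ATTACHED_TABLE, ATTACHED_VIEW, ATTACHED_DICTIONARY = "AttachedTable", "AttachedView", "AttachedDictionary"

class Storage:
    def __init__(self, is_view=False, is_dictionary=False):
        self.is_view = is_view
        self.is_dictionary = is_dictionary

def attached_counter_for_storage(storage):
    if storage.is_view:
        return ATTACHED_VIEW
    if storage.is_dictionary:
        return ATTACHED_DICTIONARY
    return ATTACHED_TABLE

metrics = Counter()
for s in [Storage(), Storage(is_view=True), Storage(is_dictionary=True), Storage()]:
    metrics[attached_counter_for_storage(s)] += 1
print(metrics)   # Counter({'AttachedTable': 2, 'AttachedView': 1, 'AttachedDictionary': 1})
```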

10
src/Storages/Utils.h Normal file
View File

@ -0,0 +1,10 @@
#pragma once
#include <Common/CurrentMetrics.h>
#include <Storages/IStorage_fwd.h>
namespace DB
{
CurrentMetrics::Metric getAttachedCounterForStorage(const StoragePtr & storage);
}

View File

@ -44,6 +44,7 @@ from env_helper import (
REPORT_PATH,
S3_BUILDS_BUCKET,
TEMP_PATH,
GITHUB_RUN_ID,
)
from get_robot_token import get_best_robot_token
from git_helper import GIT_PREFIX, Git
@ -52,6 +53,7 @@ from github_helper import GitHub
from pr_info import PRInfo
from report import ERROR, SUCCESS, BuildResult, JobReport
from s3_helper import S3Helper
from ci_metadata import CiMetadata
from version_helper import get_version_from_repo
# pylint: disable=too-many-lines
@ -66,12 +68,12 @@ class PendingState:
class CiCache:
"""
CI cache is a bunch of records. Record is a file stored under special location on s3.
The file name has following format
The file name has a format:
<RECORD_TYPE>_[<ATTRIBUTES>]--<JOB_NAME>_<JOB_DIGEST>_<BATCH>_<NUM_BATCHES>.ci
RECORD_TYPE:
SUCCESSFUL - for successfuly finished jobs
SUCCESSFUL - for successful jobs
PENDING - for pending jobs
ATTRIBUTES:
@ -503,7 +505,7 @@ class CiCache:
self, job: str, batch: int, num_batches: int, release_branch: bool
) -> bool:
"""
checks if a given job have already been done successfuly
checks if a given job has already been done successfully
"""
return self.exist(
self.RecordType.SUCCESSFUL, job, batch, num_batches, release_branch
@ -744,7 +746,7 @@ class CiOptions:
# list of specified jobs to run
ci_jobs: Optional[List[str]] = None
# btaches to run for all multi-batch jobs
# batches to run for all multi-batch jobs
job_batches: Optional[List[int]] = None
do_not_test: bool = False
@ -948,7 +950,7 @@ class CiOptions:
jobs_params[job] = {
"batches": list(range(num_batches)),
"num_batches": num_batches,
"run_if_ci_option_include_set": job_config.run_by_ci_option
"run_by_ci_option": job_config.run_by_ci_option
and pr_info.is_pr,
}
@ -963,10 +965,7 @@ class CiOptions:
for job in jobs_to_do[:]:
job_param = jobs_params[job]
if (
job_param["run_if_ci_option_include_set"]
and job not in jobs_to_do_requested
):
if job_param["run_by_ci_option"] and job not in jobs_to_do_requested:
print(
f"Erasing job '{job}' from list because it's not in included set, but will run only by include"
)
@ -991,7 +990,11 @@ def normalize_check_name(check_name: str) -> str:
def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
# FIXME: consider switching to sub_parser for configure, pre, run, post actions
parser.add_argument(
"--cancel-previous-run",
action="store_true",
help="Action that cancels previous running PR workflow if PR added into the Merge Queue",
)
parser.add_argument(
"--configure",
action="store_true",
@ -1000,17 +1003,19 @@ def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
parser.add_argument(
"--update-gh-statuses",
action="store_true",
help="Action that recreate success GH statuses for jobs that finished successfully in past and will be skipped this time",
help="Action that recreate success GH statuses for jobs that finished successfully in past and will be "
"skipped this time",
)
parser.add_argument(
"--pre",
action="store_true",
help="Action that executes prerequesetes for the job provided in --job-name",
help="Action that executes prerequisites for the job provided in --job-name",
)
parser.add_argument(
"--run",
action="store_true",
help="Action that executes run action for specified --job-name. run_command must be configured for a given job name.",
help="Action that executes run action for specified --job-name. run_command must be configured for a given "
"job name.",
)
parser.add_argument(
"--post",
@ -1075,7 +1080,7 @@ def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
"--skip-jobs",
action="store_true",
default=False,
help="skip fetching data about job runs, used in --configure action (for debugging and nigthly ci)",
help="skip fetching data about job runs, used in --configure action (for debugging and nightly ci)",
)
parser.add_argument(
"--force",
@ -1088,7 +1093,8 @@ def parse_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
"--rebuild-all-binaries",
action="store_true",
default=False,
help="[DEPRECATED. to be removed, once no wf use it] will create run config without skipping build jobs in any case, used in --configure action (for release branches)",
help="[DEPRECATED. to be removed, once no wf use it] will create run config without skipping build jobs in "
"any case, used in --configure action (for release branches)",
)
parser.add_argument(
"--commit-message",
@ -1293,7 +1299,7 @@ def _configure_docker_jobs(docker_digest_or_latest: bool) -> Dict:
missing_amd64 = []
missing_aarch64 = []
if not docker_digest_or_latest:
# look for missing arm and amd images only among missing multiarch manifests @missing_multi_dict
# look for missing arm and amd images only among missing multi-arch manifests @missing_multi_dict
# to avoid extra dockerhub api calls
missing_amd64 = list(
check_missing_images_on_dockerhub(missing_multi_dict, "amd64")
@ -1391,7 +1397,7 @@ def _configure_jobs(
):
continue
# fill job randomization buckets (for jobs with configured @random_bucket property))
# fill job randomization buckets (for jobs with configured @random_bucket property)
if job_config.random_bucket:
if not job_config.random_bucket in randomization_buckets:
randomization_buckets[job_config.random_bucket] = set()
@ -1440,8 +1446,7 @@ def _configure_jobs(
jobs_params[job] = {
"batches": batches_to_do,
"num_batches": num_batches,
"run_if_ci_option_include_set": job_config.run_by_ci_option
and pr_info.is_pr,
"run_by_ci_option": job_config.run_by_ci_option and pr_info.is_pr,
}
elif add_to_skip:
# treat job as being skipped only if it's controlled by digest
@ -1485,8 +1490,8 @@ def _configure_jobs(
def _generate_ci_stage_config(jobs_data: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
"""
populates GH Actions' workflow with real jobs
"Builds_1": [{"job_name": NAME, "runner_type": RUNER_TYPE}]
"Tests_1": [{"job_name": NAME, "runner_type": RUNER_TYPE}]
"Builds_1": [{"job_name": NAME, "runner_type": RUNNER_TYPE}]
"Tests_1": [{"job_name": NAME, "runner_type": RUNNER_TYPE}]
...
"""
result = {} # type: Dict[str, Any]
@ -1577,7 +1582,7 @@ def _fetch_commit_tokens(message: str, pr_info: PRInfo) -> List[str]:
for match in matches
if match in CILabels or match.startswith("job_") or match.startswith("batch_")
]
print(f"CI modifyers from commit message: [{res}]")
print(f"CI modifiers from commit message: [{res}]")
res_2 = []
if pr_info.is_pr:
matches = [match[-1] for match in re.findall(pattern, pr_info.body)]
@ -1588,7 +1593,7 @@ def _fetch_commit_tokens(message: str, pr_info: PRInfo) -> List[str]:
or match.startswith("job_")
or match.startswith("batch_")
]
print(f"CI modifyers from PR body: [{res_2}]")
print(f"CI modifiers from PR body: [{res_2}]")
return list(set(res + res_2))
@ -1654,7 +1659,7 @@ def _upload_build_artifacts(
report_url = ci_cache.upload_build_report(build_result)
print(f"Report file has been uploaded to [{report_url}]")
# Upload head master binaries
# Upload master head's binaries
static_bin_name = CI_CONFIG.build_config[build_name].static_binary_name
if pr_info.is_master and static_bin_name:
# Full binary with debug info:
@ -1902,6 +1907,15 @@ def _get_ext_check_name(check_name: str) -> str:
return check_name_with_group
def _cancel_pr_wf(s3: S3Helper, pr_number: int) -> None:
run_id = CiMetadata(s3, pr_number).fetch_meta().run_id
if not run_id:
print(f"ERROR: FIX IT: Run id has not been found PR [{pr_number}]!")
else:
print(f"Canceling PR workflow run_id: [{run_id}], pr: [{pr_number}]")
GitHub.cancel_wf(run_id)
def main() -> int:
logging.basicConfig(level=logging.INFO)
exit_code = 0
@ -1930,6 +1944,12 @@ def main() -> int:
### CONFIGURE action: start
if args.configure:
if CI and pr_info.is_pr:
# store meta on s3 (now we need it only for PRs)
meta = CiMetadata(s3, pr_info.number)
meta.run_id = int(GITHUB_RUN_ID)
meta.push_meta()
ci_options = CiOptions.create_from_pr_message(
args.commit_message or None, update_from_api=True
)
@ -2222,6 +2242,13 @@ def main() -> int:
assert indata, "Run config must be provided via --infile"
_update_gh_statuses_action(indata=indata, s3=s3)
### CANCEL PREVIOUS WORKFLOW RUN
elif args.cancel_previous_run:
assert (
pr_info.is_merge_queue
), "Currently it's supposed to be used in MQ wf to cancel running PR wf if any"
_cancel_pr_wf(s3, pr_info.merged_pr)
### print results
_print_results(result, args.outfile, args.pretty)

View File

@ -52,9 +52,9 @@ class CILabels(metaclass=WithIter):
CI_SET_ARM = "ci_set_arm"
CI_SET_INTEGRATION = "ci_set_integration"
CI_SET_OLD_ANALYZER = "ci_set_old_analyzer"
CI_SET_STATLESS = "ci_set_stateless"
CI_SET_STATELESS = "ci_set_stateless"
CI_SET_STATEFUL = "ci_set_stateful"
CI_SET_STATLESS_ASAN = "ci_set_stateless_asan"
CI_SET_STATELESS_ASAN = "ci_set_stateless_asan"
CI_SET_STATEFUL_ASAN = "ci_set_stateful_asan"
libFuzzer = "libFuzzer"
@ -206,7 +206,7 @@ class DigestConfig:
include_paths: List[Union[str, Path]] = field(default_factory=list)
# file suffixes to exclude from digest
exclude_files: List[str] = field(default_factory=list)
# directories to exlude from digest
# directories to exclude from digest
exclude_dirs: List[Union[str, Path]] = field(default_factory=list)
# docker names to include into digest
docker: List[str] = field(default_factory=list)
@ -217,7 +217,7 @@ class DigestConfig:
@dataclass
class LabelConfig:
"""
configures different CI scenarious per GH label
configures different CI scenarios per GH label
"""
run_jobs: Iterable[str] = frozenset()
@ -231,7 +231,7 @@ class JobConfig:
# configures digest calculation for the job
digest: DigestConfig = field(default_factory=DigestConfig)
# will be triggered for the job if omited in CI workflow yml
# will be triggered for the job if omitted in CI workflow yml
run_command: str = ""
# job timeout, seconds
timeout: Optional[int] = None
@ -242,7 +242,7 @@ class JobConfig:
# to run always regardless of the job digest or/and label
run_always: bool = False
# if the job needs to be run on the release branch, including master (e.g. building packages, docker server).
# NOTE: Subsequent runs on the same branch with the similar digest are still considered skippable.
# NOTE: Subsequent runs on the same branch with the similar digest are still considered skip-able.
required_on_release_branch: bool = False
# job is for pr workflow only
pr_only: bool = False
@ -470,7 +470,7 @@ compatibility_test_common_params = {
"digest": compatibility_check_digest,
"run_command": "compatibility_check.py",
}
statless_test_common_params = {
stateless_test_common_params = {
"digest": stateless_check_digest,
"run_command": 'functional_test_check.py "$CHECK_NAME" $KILL_TIMEOUT',
"timeout": 10800,
@ -665,7 +665,7 @@ class CIConfig:
# crosscompile - no arm required
pass
else:
# switch to aarch64 runnner
# switch to aarch64 runner
result += "-aarch64"
return result
@ -712,7 +712,7 @@ class CIConfig:
break
assert (
res
), f"Error: Experimantal feature... Invlid request or not supported job [{check_name}]"
), f"Error: Experimental feature... Invalid request or not supported job [{check_name}]"
return res
def get_digest_config(self, check_name: str) -> DigestConfig:
@ -815,16 +815,16 @@ class CIConfig:
f"The following names of the build report '{build_report_name}' "
f"are missed in build_config: {missed_names}",
)
# And finally, all of tests' requirements must be in the builds
# And finally, all tests' requirements must be in the builds
for test_name, test_config in self.test_configs.items():
if test_config.required_build not in self.build_config.keys():
logging.error(
"The requierment '%s' for '%s' is not found in builds",
"The requirement '%s' for '%s' is not found in builds",
test_config,
test_name,
)
errors.append(
f"The requierment '{test_config}' for "
f"The requirement '{test_config}' for "
f"'{test_name}' is not found in builds"
)
@ -865,7 +865,7 @@ CI_CONFIG = CIConfig(
JobNames.INTEGRATION_TEST_ASAN_OLD_ANALYZER,
]
),
CILabels.CI_SET_STATLESS: LabelConfig(
CILabels.CI_SET_STATELESS: LabelConfig(
run_jobs=[
JobNames.STYLE_CHECK,
JobNames.FAST_TEST,
@ -873,7 +873,7 @@ CI_CONFIG = CIConfig(
JobNames.STATELESS_TEST_RELEASE,
]
),
CILabels.CI_SET_STATLESS_ASAN: LabelConfig(
CILabels.CI_SET_STATELESS_ASAN: LabelConfig(
run_jobs=[
JobNames.STYLE_CHECK,
JobNames.FAST_TEST,
@ -1180,49 +1180,49 @@ CI_CONFIG = CIConfig(
# End stateful tests for parallel replicas
JobNames.STATELESS_TEST_ASAN: TestConfig(
Build.PACKAGE_ASAN,
job_config=JobConfig(num_batches=4, **statless_test_common_params), # type: ignore
job_config=JobConfig(num_batches=4, **stateless_test_common_params), # type: ignore
),
JobNames.STATELESS_TEST_TSAN: TestConfig(
Build.PACKAGE_TSAN,
job_config=JobConfig(num_batches=5, **statless_test_common_params), # type: ignore
job_config=JobConfig(num_batches=5, **stateless_test_common_params), # type: ignore
),
JobNames.STATELESS_TEST_MSAN: TestConfig(
Build.PACKAGE_MSAN,
job_config=JobConfig(num_batches=6, **statless_test_common_params), # type: ignore
job_config=JobConfig(num_batches=6, **stateless_test_common_params), # type: ignore
),
JobNames.STATELESS_TEST_UBSAN: TestConfig(
Build.PACKAGE_UBSAN,
job_config=JobConfig(num_batches=2, **statless_test_common_params), # type: ignore
job_config=JobConfig(num_batches=2, **stateless_test_common_params), # type: ignore
),
JobNames.STATELESS_TEST_DEBUG: TestConfig(
Build.PACKAGE_DEBUG,
job_config=JobConfig(num_batches=5, **statless_test_common_params), # type: ignore
job_config=JobConfig(num_batches=5, **stateless_test_common_params), # type: ignore
),
JobNames.STATELESS_TEST_RELEASE: TestConfig(
Build.PACKAGE_RELEASE, job_config=JobConfig(**statless_test_common_params) # type: ignore
Build.PACKAGE_RELEASE, job_config=JobConfig(**stateless_test_common_params) # type: ignore
),
JobNames.STATELESS_TEST_RELEASE_COVERAGE: TestConfig(
Build.PACKAGE_RELEASE_COVERAGE,
job_config=JobConfig(num_batches=6, **statless_test_common_params), # type: ignore
job_config=JobConfig(num_batches=6, **stateless_test_common_params), # type: ignore
),
JobNames.STATELESS_TEST_AARCH64: TestConfig(
Build.PACKAGE_AARCH64, job_config=JobConfig(**statless_test_common_params) # type: ignore
Build.PACKAGE_AARCH64, job_config=JobConfig(**stateless_test_common_params) # type: ignore
),
JobNames.STATELESS_TEST_OLD_ANALYZER_S3_REPLICATED_RELEASE: TestConfig(
Build.PACKAGE_RELEASE,
job_config=JobConfig(num_batches=4, **statless_test_common_params), # type: ignore
job_config=JobConfig(num_batches=4, **stateless_test_common_params), # type: ignore
),
JobNames.STATELESS_TEST_S3_DEBUG: TestConfig(
Build.PACKAGE_DEBUG,
job_config=JobConfig(num_batches=6, **statless_test_common_params), # type: ignore
job_config=JobConfig(num_batches=6, **stateless_test_common_params), # type: ignore
),
JobNames.STATELESS_TEST_AZURE_ASAN: TestConfig(
Build.PACKAGE_ASAN,
job_config=JobConfig(num_batches=4, **statless_test_common_params, release_only=True, run_by_ci_option=True), # type: ignore
job_config=JobConfig(num_batches=4, **stateless_test_common_params, release_only=True, run_by_ci_option=True), # type: ignore
),
JobNames.STATELESS_TEST_S3_TSAN: TestConfig(
Build.PACKAGE_TSAN,
job_config=JobConfig(num_batches=5, **statless_test_common_params), # type: ignore
job_config=JobConfig(num_batches=5, **stateless_test_common_params), # type: ignore
),
JobNames.STRESS_TEST_DEBUG: TestConfig(
Build.PACKAGE_DEBUG, job_config=JobConfig(**stress_test_common_params) # type: ignore
@ -1271,8 +1271,7 @@ CI_CONFIG = CIConfig(
),
JobNames.INTEGRATION_TEST_ARM: TestConfig(
Build.PACKAGE_AARCH64,
# add [run_by_label="test arm"] to not run in regular pr workflow by default
job_config=JobConfig(num_batches=6, **integration_test_common_params, run_by_label="test arm"), # type: ignore
job_config=JobConfig(num_batches=6, **integration_test_common_params), # type: ignore
),
JobNames.INTEGRATION_TEST: TestConfig(
Build.PACKAGE_RELEASE,
@ -1326,7 +1325,7 @@ CI_CONFIG = CIConfig(
JobNames.STATELESS_TEST_FLAKY_ASAN: TestConfig(
# replace to non-default
Build.PACKAGE_ASAN,
job_config=JobConfig(pr_only=True, **{**statless_test_common_params, "timeout": 3600}), # type: ignore
job_config=JobConfig(pr_only=True, **{**stateless_test_common_params, "timeout": 3600}), # type: ignore
),
JobNames.JEPSEN_KEEPER: TestConfig(
Build.BINARY_RELEASE,
@ -1486,7 +1485,7 @@ CHECK_DESCRIPTIONS = [
"Checks if new added or modified tests are flaky by running them repeatedly, "
"in parallel, with more randomization. Functional tests are run 100 times "
"with address sanitizer, and additional randomization of thread scheduling. "
"Integrational tests are run up to 10 times. If at least once a new test has "
"Integration tests are run up to 10 times. If at least once a new test has "
"failed, or was too long, this check will be red. We don't allow flaky tests, "
'read <a href="https://clickhouse.com/blog/decorating-a-christmas-tree-with-'
'the-help-of-flaky-tests/">the doc</a>',
@ -1576,7 +1575,7 @@ CHECK_DESCRIPTIONS = [
lambda x: x.startswith("ClickBench"),
),
CheckDescription(
"Falback for unknown",
"Fallback for unknown",
"There's no description for the check yet, please add it to "
"tests/ci/ci_config.py:CHECK_DESCRIPTIONS",
lambda x: True,
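
The renamed CILabels above (`CI_SET_STATELESS`, `CI_SET_STATELESS_ASAN`) are keys into `LabelConfig` entries whose `run_jobs` list tells the configure step which jobs a labelled PR should run. A hedged sketch of that expansion: the stateless set mirrors the hunk above, the asan set and the "no label means the full set" fallback are assumptions, and the job-name strings stand in for the JobNames/Build constants:

```python
label_configs = {
    "ci_set_stateless": ["STYLE_CHECK", "FAST_TEST", "PACKAGE_RELEASE", "STATELESS_TEST_RELEASE"],
    "ci_set_stateless_asan": ["STYLE_CHECK", "FAST_TEST", "PACKAGE_ASAN", "STATELESS_TEST_ASAN"],
}

def jobs_for_labels(pr_labels, all_jobs):
    """Union of run_jobs for every recognized ci_set_* label; no label means the full set."""
    selected = set()
    for label in pr_labels:
        selected.update(label_configs.get(label, []))
    return sorted(selected) if selected else all_jobs

print(jobs_for_labels(["ci_set_stateless"], all_jobs=["<every configured job>"]))
```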

116
tests/ci/ci_metadata.py Normal file
View File

@ -0,0 +1,116 @@
from pathlib import Path
from typing import Optional
from env_helper import (
S3_BUILDS_BUCKET,
TEMP_PATH,
)
from s3_helper import S3Helper
from ci_utils import GHActions
# pylint: disable=too-many-lines
class CiMetadata:
"""
CI Metadata class owns data like workflow run_id for a given pr, etc.
The goal is to keep everything we need to manage workflows on S3 and to rely on the GH API as little as possible
"""
_S3_PREFIX = "CI_meta_v1"
_LOCAL_PATH = Path(TEMP_PATH) / "ci_meta"
_FILE_SUFFIX = ".cimd"
_FILENAME_RUN_ID = "run_id" + _FILE_SUFFIX
def __init__(
self,
s3: S3Helper,
pr_number: Optional[int] = None,
sha: Optional[str] = None,
git_ref: Optional[str] = None,
):
assert pr_number or (sha and git_ref)
self.sha = sha
self.pr_number = pr_number
self.git_ref = git_ref
self.s3 = s3
self.run_id = 0
if self.pr_number:
self.s3_path = f"{self._S3_PREFIX}/PRs/{self.pr_number}/"
else:
self.s3_path = f"{self._S3_PREFIX}/{self.git_ref}/{self.sha}/"
self._updated = False
if not self._LOCAL_PATH.exists():
self._LOCAL_PATH.mkdir(parents=True, exist_ok=True)
def fetch_meta(self):
"""
Fetches meta from s3
"""
# clean up
for file in self._LOCAL_PATH.glob("*" + self._FILE_SUFFIX):
file.unlink()
_ = self.s3.download_files(
bucket=S3_BUILDS_BUCKET,
s3_path=self.s3_path,
file_suffix=self._FILE_SUFFIX,
local_directory=self._LOCAL_PATH,
)
meta_files = Path(self._LOCAL_PATH).rglob("*" + self._FILE_SUFFIX)
for file_name in meta_files:
path_in_str = str(file_name)
with open(path_in_str, "r", encoding="utf-8") as f:
# Read all lines in the file
lines = f.readlines()
assert len(lines) == 1
if file_name.name == self._FILENAME_RUN_ID:
self.run_id = int(lines[0])
self._updated = True
return self
def push_meta(
self,
) -> None:
"""
Uploads meta on s3
"""
assert self.run_id
GHActions.print_in_group(
f"Storing workflow metadata: PR [{self.pr_number}]",
[f"run_id: {self.run_id}"],
)
local_file = self._LOCAL_PATH / self._FILENAME_RUN_ID
with open(local_file, "w", encoding="utf-8") as file:
file.write(f"{self.run_id}\n")
_ = self.s3.upload_file(
bucket=S3_BUILDS_BUCKET,
file_path=local_file,
s3_path=self.s3_path + local_file.name,
)
if __name__ == "__main__":
# TEST:
s3 = S3Helper()
a = CiMetadata(s3, 12345, "deadbeaf", "test_branch")
a.run_id = 111
a.push_meta()
b = CiMetadata(s3, 12345, "deadbeaf", "test_branch")
assert b.fetch_meta().run_id == a.run_id
a = CiMetadata(s3, 0, "deadbeaf", "test_branch")
a.run_id = 112
a.push_meta()
b = CiMetadata(s3, 0, "deadbeaf", "test_branch")
assert b.fetch_meta().run_id == a.run_id
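
For reference, the metadata location differs depending on whether the object is keyed by PR number or by ref/sha. A short sketch that restates the key layout from the class above (the example values are made up):

```python
S3_PREFIX = "CI_meta_v1"
FILENAME_RUN_ID = "run_id" + ".cimd"

def run_id_key(pr_number=None, sha=None, git_ref=None):
    """Where CiMetadata stores/loads the run id, for a PR vs. a ref/sha keyed workflow."""
    assert pr_number or (sha and git_ref)
    if pr_number:
        return f"{S3_PREFIX}/PRs/{pr_number}/{FILENAME_RUN_ID}"
    return f"{S3_PREFIX}/{git_ref}/{sha}/{FILENAME_RUN_ID}"

print(run_id_key(pr_number=12345))                        # CI_meta_v1/PRs/12345/run_id.cimd
print(run_id_key(sha="deadbeaf", git_ref="test_branch"))  # CI_meta_v1/test_branch/deadbeaf/run_id.cimd
```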

View File

@ -9,6 +9,7 @@ from time import sleep
from typing import List, Optional, Tuple, Union
import github
import requests
# explicit reimport
# pylint: disable=useless-import-alias
@ -21,6 +22,9 @@ from github.NamedUser import NamedUser as NamedUser
from github.PullRequest import PullRequest as PullRequest
from github.Repository import Repository as Repository
from env_helper import GITHUB_REPOSITORY
from get_robot_token import get_best_robot_token
# pylint: enable=useless-import-alias
CACHE_PATH = p.join(p.dirname(p.realpath(__file__)), "gh_cache")
@ -260,3 +264,18 @@ class GitHub(github.Github):
def retries(self, value: int) -> None:
assert isinstance(value, int)
self._retries = value
# minimalistic static methods not using pygithub
@staticmethod
def cancel_wf(run_id, strict=False):
token = get_best_robot_token()
headers = {"Authorization": f"token {token}"}
url = f"https://api.github.com/repos/{GITHUB_REPOSITORY}/actions/runs/{run_id}/cancel"
try:
response = requests.post(url, headers=headers, timeout=10)
response.raise_for_status()
print(f"NOTE: Workflow [{run_id}] has been cancelled")
except Exception as ex:
print("ERROR: Got exception executing wf cancel request", ex)
if strict:
raise ex

View File

@ -161,7 +161,7 @@ class TestCIOptions(unittest.TestCase):
"Stateless tests (azure, asan)": {
"batches": list(range(3)),
"num_batches": 3,
"run_if_ci_option_include_set": True,
"run_by_ci_option": True,
}
}
jobs_to_do, jobs_to_skip, job_params = ci_options.apply(
@ -226,10 +226,10 @@ class TestCIOptions(unittest.TestCase):
job_params[job] = {
"batches": list(range(3)),
"num_batches": 3,
"run_if_ci_option_include_set": "azure" in job,
"run_by_ci_option": "azure" in job,
}
else:
job_params[job] = {"run_if_ci_option_include_set": False}
job_params[job] = {"run_by_ci_option": False}
jobs_to_do, jobs_to_skip, job_params = ci_options.apply(
jobs_to_do, jobs_to_skip, job_params, PRInfo()

View File

@ -1,5 +1,7 @@
<clickhouse>
<max_table_num_to_warn>5</max_table_num_to_warn>
<max_view_num_to_warn>5</max_view_num_to_warn>
<max_dictionary_num_to_warn>5</max_dictionary_num_to_warn>
<max_database_num_to_warn>2</max_database_num_to_warn>
<max_part_num_to_warn>10</max_part_num_to_warn>
</clickhouse>

View File

@ -0,0 +1,12 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
</storage_configuration>
</clickhouse>

View File

@ -0,0 +1,88 @@
import logging
import time
import pytest
import os
from helpers.cluster import ClickHouseCluster
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=["configs/storage_policy.xml"],
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def assert_objects_count(cluster, objects_count, path="data/"):
minio = cluster.minio_client
s3_objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
if objects_count != len(s3_objects):
for s3_object in s3_objects:
object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)
logging.info("Existing S3 object: %s", str(object_meta))
assert objects_count == len(s3_objects)
def list_of_files_on_ch_disk(node, disk, path):
disk_path = node.query(
f"SELECT path FROM system.disks WHERE name='{disk}'"
).splitlines()[0]
return node.exec_in_container(
["bash", "-c", f"ls {os.path.join(disk_path, path)}"], user="root"
)
@pytest.mark.parametrize(
"engine",
[
pytest.param("Log"),
],
)
@pytest.mark.parametrize(
"disk,check_s3",
[
pytest.param("default", False),
pytest.param("s3", True),
],
)
@pytest.mark.parametrize(
"delay",
[
pytest.param(0),
pytest.param(4),
],
)
def test_drop_table(cluster, engine, disk, check_s3, delay):
node = cluster.instances["node"]
node.query("DROP DATABASE IF EXISTS lazy")
node.query("CREATE DATABASE lazy ENGINE=Lazy(2)")
node.query(
"CREATE TABLE lazy.table (id UInt64) ENGINE={} SETTINGS disk = '{}'".format(
engine,
disk,
)
)
node.query("INSERT INTO lazy.table SELECT number FROM numbers(10)")
assert node.query("SELECT count(*) FROM lazy.table") == "10\n"
if delay:
time.sleep(delay)
node.query("DROP TABLE lazy.table SYNC")
if check_s3:
# There mustn't be any orphaned data
assert_objects_count(cluster, 0)
# Local data must be removed
assert list_of_files_on_ch_disk(node, disk, "data/lazy/") == ""

View File

@ -19,4 +19,4 @@
<shard>01</shard>
</macros>
</clickhouse>
</clickhouse>

View File

@ -15,6 +15,6 @@
<shard>01</shard>
</macros>
<default_replica_path>/clickhouse/'/{database}/{table}/{uuid}</default_replica_path>
<default_replica_path>/lol/kek/'/{uuid}</default_replica_path>
</clickhouse>

View File

@ -6,7 +6,7 @@ cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance(
"ch1",
main_configs=[
"configs/config.d/clusters_zk_path.xml",
"configs/config.d/clusters_unusual.xml",
"configs/config.d/distributed_ddl.xml",
],
with_zookeeper=True,
@ -63,7 +63,7 @@ def check_tables():
)
.strip()
.startswith(
"ReplicatedReplacingMergeTree(\\'/clickhouse/\\\\\\'/{database}/{table}/{uuid}\\', \\'{replica}\\', D)"
"ReplicatedReplacingMergeTree(\\'/lol/kek/\\\\\\'/{uuid}\\', \\'{replica}\\', D)"
)
)
assert (
@ -73,7 +73,7 @@ def check_tables():
)
.strip()
.startswith(
"ReplicatedVersionedCollapsingMergeTree(\\'/clickhouse/\\\\\\'/{database}/{table}/{uuid}\\', \\'{replica}\\', Sign, Version)"
"ReplicatedVersionedCollapsingMergeTree(\\'/lol/kek/\\\\\\'/{uuid}\\', \\'{replica}\\', Sign, Version)"
)
)

View File

@ -1,69 +0,0 @@
import pytest
from test_modify_engine_on_restart.common import (
get_table_path,
set_convert_flags,
)
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance(
"ch1",
main_configs=[
"configs/config.d/clusters_zk_path.xml",
"configs/config.d/distributed_ddl.xml",
],
with_zookeeper=True,
macros={"replica": "node1"},
stay_alive=True,
)
database_name = "modify_engine_zk_path"
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def q(node, query):
return node.query(database=database_name, sql=query)
def test_modify_engine_fails_if_zk_path_exists(started_cluster):
ch1.query("CREATE DATABASE " + database_name)
q(
ch1,
"CREATE TABLE already_exists_1 ( A Int64, D Date, S String ) ENGINE MergeTree() PARTITION BY toYYYYMM(D) ORDER BY A;",
)
uuid = q(
ch1,
f"SELECT uuid FROM system.tables WHERE table = 'already_exists_1' and database = '{database_name}'",
).strip("'[]\n")
q(
ch1,
f"CREATE TABLE already_exists_2 ( A Int64, D Date, S String ) ENGINE ReplicatedMergeTree('/clickhouse/\\'/{database_name}/already_exists_1/{uuid}', 'r2') PARTITION BY toYYYYMM(D) ORDER BY A;",
)
set_convert_flags(ch1, database_name, ["already_exists_1"])
table_data_path = get_table_path(ch1, "already_exists_1", database_name)
ch1.stop_clickhouse()
ch1.start_clickhouse(retry_start=False, expected_to_fail=True)
# Check if we can cancel convertation
ch1.exec_in_container(
[
"bash",
"-c",
f"rm {table_data_path}convert_to_replicated",
]
)
ch1.start_clickhouse()

View File

@ -5,7 +5,7 @@ import time
import pytest
import logging
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException
@ -18,6 +18,10 @@ from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsReques
from kafka.protocol.group import MemberAssignment
import socket
if is_arm():
# skip due to no arm support for clickhouse/kerberos-kdc docker image
pytestmark = pytest.mark.skip
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"instance",

View File

@ -23,10 +23,6 @@ select t1.* from t1_all t1 join t2_all t2 on t1.a = t2.a ORDER BY t1.a;
SELECT '-';
-- make sure data is fully written when reading from distributed
optimize table t1_local final;
optimize table t2_local final;
set distributed_product_mode = 'global';
select * from t1_all t1 where t1.a in (select t2.a from t2_all t2);
explain syntax select t1.* from t1_all t1 join t2_all t2 on t1.a = t2.a;

View File

@ -61,6 +61,11 @@ CREATE TABLE github_events
)
ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at);
with top_repos as ( select repo_name from github_events where event_type = 'WatchEvent' and toDate(created_at) = today() - 1 group by repo_name order by count() desc limit 100 union distinct select repo_name from github_events where event_type = 'WatchEvent' and toMonday(created_at) = toMonday(today() - interval 1 week) group by repo_name order by count() desc limit 100 union distinct select repo_name from github_events where event_type = 'WatchEvent' and toStartOfMonth(created_at) = toStartOfMonth(today()) - interval 1 month group by repo_name order by count() desc limit 100 union distinct select repo_name from github_events where event_type = 'WatchEvent' and toYear(created_at) = toYear(today()) - 1 group by repo_name order by count() desc limit 100 ), last_day as ( select repo_name, count() as count_last_day, rowNumberInAllBlocks() + 1 as position_last_day from github_events where repo_name in (select repo_name from top_repos) and toDate(created_at) = today() - 1 group by repo_name order by count_last_day desc ), last_week as ( select repo_name, count() as count_last_week, rowNumberInAllBlocks() + 1 as position_last_week from github_events where repo_name in (select repo_name from top_repos) and toMonday(created_at) = toMonday(today()) - interval 1 week group by repo_name order by count_last_week desc ), last_month as ( select repo_name, count() as count_last_month, rowNumberInAllBlocks() + 1 as position_last_month from github_events where repo_name in (select repo_name from top_repos) and toStartOfMonth(created_at) = toStartOfMonth(today()) - interval 1 month group by repo_name order by count_last_month desc ) select d.repo_name, columns(count) from last_day d join last_week w on d.repo_name = w.repo_name join last_month m on d.repo_name = m.repo_name;
with
top_repos as ( select repo_name from github_events where event_type = 'WatchEvent' and toDate(created_at) = today() - 1 group by repo_name order by count() desc limit 100 union distinct select repo_name from github_events where event_type = 'WatchEvent' and toMonday(created_at) = toMonday(today() - interval 1 week) group by repo_name order by count() desc limit 100 union distinct select repo_name from github_events where event_type = 'WatchEvent' and toStartOfMonth(created_at) = toStartOfMonth(today()) - interval 1 month group by repo_name order by count() desc limit 100 union distinct select repo_name from github_events where event_type = 'WatchEvent' and toYear(created_at) = toYear(today()) - 1 group by repo_name order by count() desc limit 100 ),
last_day as ( select repo_name, count() as count_last_day, rowNumberInAllBlocks() + 1 as position_last_day from github_events where repo_name in (select repo_name from top_repos) and toDate(created_at) = today() - 1 group by repo_name order by count_last_day desc ),
last_week as ( select repo_name, count() as count_last_week, rowNumberInAllBlocks() + 1 as position_last_week from github_events where repo_name in (select repo_name from top_repos) and toMonday(created_at) = toMonday(today()) - interval 1 week group by repo_name order by count_last_week desc ),
last_month as ( select repo_name, count() as count_last_month, rowNumberInAllBlocks() + 1 as position_last_month from github_events where repo_name in (select repo_name from top_repos) and toStartOfMonth(created_at) = toStartOfMonth(today()) - interval 1 month group by repo_name order by count_last_month desc )
select d.repo_name, columns('count') from last_day d join last_week w on d.repo_name = w.repo_name join last_month m on d.repo_name = m.repo_name;
DROP TABLE github_events;

View File

@ -236,3 +236,6 @@ Check asan bug
0
Check bug found fuzzing
9042C6691B1A75F0EA3314B6F55728BB
Check bug 2 found fuzzing
608E1FF030C9E206185B112C2A25F1A7
ABB65AE97711A2E053E324ED88B1D08B

View File

@ -338,3 +338,10 @@ SELECT sipHash128((toUInt64(9223372036854775806), 1)) = sipHash128(1) GROUP BY s
SELECT 'Check bug found fuzzing';
SELECT [(255, 1048575)], sipHash128ReferenceKeyed((toUInt64(2147483646), toUInt64(9223372036854775807)), ([(NULL, 100), (NULL, NULL), (1024, 10)], toUInt64(2), toUInt64(1024)), ''), hex(sipHash128ReferenceKeyed((-9223372036854775807, 1.), '-1', NULL)), ('', toUInt64(65535), [(9223372036854775807, 9223372036854775806)], toUInt64(65536)), arrayJoin((NULL, 65537, 255), [(NULL, NULL)]) GROUP BY tupleElement((NULL, NULL, NULL, -1), toUInt64(2), 2) = NULL; -- { serverError NOT_IMPLEMENTED }
SELECT hex(sipHash128ReferenceKeyed((0::UInt64, 0::UInt64), ([1, 1])));
SELECT 'Check bug 2 found fuzzing';
DROP TABLE IF EXISTS sipHashKeyed_keys;
CREATE TABLE sipHashKeyed_keys (`a` Map(String, String)) ENGINE = Memory;
INSERT INTO sipHashKeyed_keys FORMAT VALUES ({'a':'b', 'c':'d'}), ({'e':'f', 'g':'h'});
SELECT hex(sipHash128ReferenceKeyed((0::UInt64, materialize(0::UInt64)), a)) FROM sipHashKeyed_keys ORDER BY a;
DROP TABLE sipHashKeyed_keys;

View File

@ -1,3 +1,5 @@
The number of attached tables is more than 5
The number of attached views is more than 5
The number of attached dictionaries is more than 5
The number of attached databases is more than 2
The number of active parts is more than 10

View File

@ -13,6 +13,39 @@ CREATE TABLE IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_9 (id
CREATE TABLE IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_10 (id Int32, str String) Engine=Memory;
CREATE TABLE IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_11 (id Int32, str String) Engine=Memory;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_1 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_1;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_2 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_2;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_3 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_3;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_4 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_4;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_5 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_5;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_6 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_6;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_7 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_7;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_8 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_8;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_9 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_9;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_10 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_10;
CREATE VIEW IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_view_11 AS SELECT * FROM test_max_num_to_warn_02931.test_max_num_to_warn_11;
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_1 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_1'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_2 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_2'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_3 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_3'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_4 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_4'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_5 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_5'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_6 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_6'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_7 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_7'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_8 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_8'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_9 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_9'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DICTIONARY IF NOT EXISTS test_max_num_to_warn_02931.test_max_num_to_warn_dict_10 (id Int32, str String) PRIMARY KEY id
SOURCE(CLICKHOUSE(DB 'test_max_num_to_warn_02931' TABLE 'test_max_num_to_warn_10'))LAYOUT(FLAT()) LIFETIME(MIN 0 MAX 1000);
CREATE DATABASE IF NOT EXISTS test_max_num_to_warn_1;
CREATE DATABASE IF NOT EXISTS test_max_num_to_warn_2;
CREATE DATABASE IF NOT EXISTS test_max_num_to_warn_3;
@ -37,7 +70,13 @@ INSERT INTO test_max_num_to_warn_02931.test_max_num_to_warn_9 VALUES (1, 'Hello'
INSERT INTO test_max_num_to_warn_02931.test_max_num_to_warn_10 VALUES (1, 'Hello');
INSERT INTO test_max_num_to_warn_02931.test_max_num_to_warn_11 VALUES (1, 'Hello');
SELECT * FROM system.warnings where message in ('The number of attached tables is more than 5', 'The number of attached databases is more than 2', 'The number of active parts is more than 10');
SELECT * FROM system.warnings where message in (
'The number of attached tables is more than 5',
'The number of attached views is more than 5',
'The number of attached dictionaries is more than 5',
'The number of attached databases is more than 2',
'The number of active parts is more than 10'
);
DROP DATABASE IF EXISTS test_max_num_to_warn_02931;
DROP DATABASE IF EXISTS test_max_num_to_warn_1;

View File

@ -0,0 +1,11 @@
<-- Read UNIX endings -->
Akiba_Hebrew_Academy 2017-08-01 241
Aegithina_tiphia 2018-02-01 34
1971-72_Utah_Stars_season 2016-10-01 1
<-- Read DOS endings with setting input_format_tsv_crlf_end_of_line=1 -->
Akiba_Hebrew_Academy 2017-08-01 241
Aegithina_tiphia 2018-02-01 34
1971-72_Utah_Stars_season 2016-10-01 1

View File

@ -0,0 +1,29 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Data preparation step
USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
UNIX_ENDINGS="${CLICKHOUSE_TEST_UNIQUE_NAME}_data_without_crlf.tsv"
DOS_ENDINGS="${CLICKHOUSE_TEST_UNIQUE_NAME}_data_with_crlf.tsv"
DATA_FILE_UNIX_ENDINGS="${USER_FILES_PATH:?}/${UNIX_ENDINGS}"
DATA_FILE_DOS_ENDINGS="${USER_FILES_PATH:?}/${DOS_ENDINGS}"
touch $DATA_FILE_UNIX_ENDINGS
touch $DATA_FILE_DOS_ENDINGS
echo -ne "Akiba_Hebrew_Academy\t2017-08-01\t241\nAegithina_tiphia\t2018-02-01\t34\n1971-72_Utah_Stars_season\t2016-10-01\t1\n" > $DATA_FILE_UNIX_ENDINGS
echo -ne "Akiba_Hebrew_Academy\t2017-08-01\t241\r\nAegithina_tiphia\t2018-02-01\t34\r\n1971-72_Utah_Stars_season\t2016-10-01\t1\r\n" > $DATA_FILE_DOS_ENDINGS
echo -e "<-- Read UNIX endings -->\n"
$CLICKHOUSE_CLIENT --query "SELECT * FROM file(${UNIX_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32');"
$CLICKHOUSE_CLIENT --multiquery --query "SELECT * FROM file(${DOS_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32'); --{serverError 117}"
echo -e "\n<-- Read DOS endings with setting input_format_tsv_crlf_end_of_line=1 -->\n"
$CLICKHOUSE_CLIENT --query "SELECT * FROM file(${DOS_ENDINGS}, 'TabSeparated', 'SearchTerm String, Date Date, Hits UInt32') SETTINGS input_format_tsv_crlf_end_of_line = 1;"
# Test teardown
rm $DATA_FILE_UNIX_ENDINGS
rm $DATA_FILE_DOS_ENDINGS
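
The new `input_format_tsv_crlf_end_of_line` setting exists because a TSV written with DOS line endings leaves a stray `\r` on the last field if the reader only splits on `\n`. A tiny Python illustration of the two files the test above generates, independent of ClickHouse and using only the standard library:

```python
unix_data = "Akiba_Hebrew_Academy\t2017-08-01\t241\n"
dos_data = "Akiba_Hebrew_Academy\t2017-08-01\t241\r\n"

def parse_tsv(data, crlf=False):
    terminator = "\r\n" if crlf else "\n"
    return [line.split("\t") for line in data.split(terminator) if line]

print(parse_tsv(unix_data))            # [['Akiba_Hebrew_Academy', '2017-08-01', '241']]
print(parse_tsv(dos_data))             # last field comes out as '241\r' -- the failure the test expects
print(parse_tsv(dos_data, crlf=True))  # clean again once CRLF handling is enabled
```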

View File

@ -1,6 +1,6 @@
set allow_suspicious_primary_key = 0;
DROP TABLE IF EXISTS data;
drop table if exists data;
create table data (key Int, value AggregateFunction(sum, UInt64)) engine=AggregatingMergeTree() order by (key, value); -- { serverError DATA_TYPE_CANNOT_BE_USED_IN_KEY }
create table data (key Int, value SimpleAggregateFunction(sum, UInt64)) engine=AggregatingMergeTree() order by (key, value); -- { serverError DATA_TYPE_CANNOT_BE_USED_IN_KEY }
@ -12,7 +12,22 @@ create table data (key Int, value AggregateFunction(sum, UInt64)) engine=Aggrega
create table data (key Int, value SimpleAggregateFunction(sum, UInt64)) engine=AggregatingMergeTree() primary key value order by (value, key); -- { serverError DATA_TYPE_CANNOT_BE_USED_IN_KEY }
set allow_suspicious_primary_key = 1;
create table data (key Int, value SimpleAggregateFunction(sum, UInt64)) engine=AggregatingMergeTree() primary key value order by (value, key);
DROP TABLE data;
-- ATTACH should work regardless of allow_suspicious_primary_key
set allow_suspicious_primary_key = 0;
detach table data;
attach table data;
drop table data;
-- ALTER AggregatingMergeTree
create table data (key Int) engine=AggregatingMergeTree() order by (key);
alter table data add column value SimpleAggregateFunction(sum, UInt64), modify order by (key, value); -- { serverError DATA_TYPE_CANNOT_BE_USED_IN_KEY }
alter table data add column value SimpleAggregateFunction(sum, UInt64), modify order by (key, value) settings allow_suspicious_primary_key=1;
drop table data;
-- ALTER ReplicatedAggregatingMergeTree
create table data_rep (key Int) engine=ReplicatedAggregatingMergeTree('/tables/{database}', 'r1') order by (key);
alter table data_rep add column value SimpleAggregateFunction(sum, UInt64), modify order by (key, value); -- { serverError DATA_TYPE_CANNOT_BE_USED_IN_KEY }
alter table data_rep add column value SimpleAggregateFunction(sum, UInt64), modify order by (key, value) settings allow_suspicious_primary_key=1;
drop table data_rep;

View File

@ -0,0 +1,2 @@
Disabled 11338881281426660955 14765404159170880511
Enabled 11338881281426660955 14765404159170880511

View File

@ -0,0 +1,23 @@
-- Tags: no-random-settings, no-random-merge-tree-settings
DROP TABLE IF EXISTS account_test;
CREATE TABLE account_test
(
`id` UInt64,
`row_ver` UInt64,
)
ENGINE = ReplacingMergeTree(row_ver)
ORDER BY id
SETTINGS index_granularity = 16, index_granularity_bytes = 0,
min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0,
min_rows_for_compact_part = 0, min_bytes_for_compact_part = 0;
SYSTEM STOP MERGES account_test;
INSERT INTO account_test VALUES (11338881281426660955,717769962224129342),(12484100559155738267,7950971667203174918),(7603729260199571867,3255798127676911942),(7023543111808724827,911615979861855126),(10293135086416484571,3264379259750736572),(15561193439904316763,8419819469587131454),(17632407413882870235,7252071832370181502),(17009726455991851227,7525297506591593939),(12392078953873778779,8473049173389293961),(15283366022689446555,11692491360262171467),(9087459014730986523,2783662960221838603),(293823584550906267,4847630088179732782),(15693186194430465755,8163804880526285623),(7353080168325584795,17315892478487497859),(5980311238303466523,6943353798059390089),(14242621660019578011,8684624667957352769),(8241843507567433563,15731952080102886438);
INSERT INTO account_test VALUES (11338881281426660955, 14765404159170880511);
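-- The duplicated id lives in two separate parts (merges are stopped); FINAL must return only the row with the highest row_ver, whether the split-ranges optimization is disabled or enabled.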
SELECT 'Disabled', * FROM account_test FINAL WHERE id = 11338881281426660955 SETTINGS split_parts_ranges_into_intersecting_and_non_intersecting_final = 0;
SELECT 'Enabled', * FROM account_test FINAL WHERE id = 11338881281426660955 SETTINGS split_parts_ranges_into_intersecting_and_non_intersecting_final = 1;

View File

@ -0,0 +1,2 @@
Hello, World! From client.
Hello, World! From local.

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
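# Output redirected to a *.gz file is expected to be written gzip-compressed; it is gunzip-ed below before being compared with the reference.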
[ -e "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_client.gz ] && rm "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_client.gz
${CLICKHOUSE_CLIENT} --query "SELECT * FROM (SELECT 'Hello, World! From client.')" > "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client.gz"
gunzip "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client.gz"
cat "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client"
rm -f "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_client"
[ -e "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_local.gz ] && rm "${CLICKHOUSE_TMP}"/test_compression_of_output_file_from_local.gz
${CLICKHOUSE_LOCAL} --query "SELECT * FROM (SELECT 'Hello, World! From local.')" > "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local.gz"
gunzip "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local.gz"
cat "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local"
rm -f "${CLICKHOUSE_TMP}/test_compression_of_output_file_from_local"

View File

@ -0,0 +1 @@
4 3

View File

@ -0,0 +1,13 @@
CREATE TABLE test
(
foo String,
bar String,
)
ENGINE = MergeTree()
ORDER BY (foo, bar);
INSERT INTO test VALUES ('foo', 'bar1');
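-- COLUMNS() with an explicit list of column identifiers; referencing a column that does not exist must fail with UNKNOWN_IDENTIFIER.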
SELECT COLUMNS(bar, foo) APPLY (length) FROM test;
SELECT COLUMNS(bar, foo, xyz) APPLY (length) FROM test; -- { serverError UNKNOWN_IDENTIFIER }

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS base_table;
DROP TABLE IF EXISTS target_table;
DROP TABLE IF EXISTS mv_from_base_to_target;
DROP TABLE IF EXISTS mv_with_storage;
DROP TABLE IF EXISTS other_table_1;
DROP TABLE IF EXISTS other_table_2;
CREATE TABLE base_table (date DateTime, id String, cost Float64) ENGINE = MergeTree() ORDER BY date;
CREATE TABLE target_table (id String, total AggregateFunction(sum, Float64)) ENGINE = MergeTree() ORDER BY id;
CREATE MATERIALIZED VIEW mv_from_base_to_target TO target_table AS SELECT id, sumState(cost) FROM base_table GROUP BY id;
CREATE MATERIALIZED VIEW mv_with_storage ENGINE=MergeTree() ORDER BY id AS SELECT id, sumState(cost) FROM base_table GROUP BY id;
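-- CREATE TABLE AS is expected to work for a materialized view with its own storage, but to fail for a view that only writes TO another table.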
CREATE TABLE other_table_1 AS mv_with_storage;
CREATE TABLE other_table_2 AS mv_from_base_to_target; -- { serverError INCORRECT_QUERY }