Merge branch 'master' into rs/bump-zstd

Robert Schulze 2023-04-14 09:53:47 +02:00 committed by GitHub
commit caf8690f58
46 changed files with 611 additions and 267 deletions

View File

@ -118,9 +118,11 @@ jobs:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
SonarCloud:
# TODO: Remove if: whenever SonarCloud supports c++23
if: ${{ false }}
runs-on: [self-hosted, builder]
env:
SONAR_SCANNER_VERSION: 4.7.0.2747
SONAR_SCANNER_VERSION: 4.8.0.2856
SONAR_SERVER_URL: "https://sonarcloud.io"
BUILD_WRAPPER_OUT_DIR: build_wrapper_output_directory # Directory where build-wrapper output will be placed
CC: clang-15
@ -173,4 +175,4 @@ jobs:
--define sonar.cfamily.build-wrapper-output="${{ env.BUILD_WRAPPER_OUT_DIR }}" \
--define sonar.projectKey="ClickHouse_ClickHouse" \
--define sonar.organization="clickhouse-java" \
--define sonar.exclusions="**/*.java,**/*.ts,**/*.js,**/*.css,**/*.sql"
--define sonar.exclusions="**/*.java,**/*.ts,**/*.js,**/*.css,**/*.sql" \

View File

@ -6,7 +6,7 @@ sidebar_label: Strings
# Functions for Working with Strings
:::note
:::note
Functions for [searching](../../sql-reference/functions/string-search-functions.md) and [replacing](../../sql-reference/functions/string-replace-functions.md) in strings are described separately.
:::
@ -1193,6 +1193,42 @@ Result:
```
## concatWithSeparatorAssumeInjective
Same as concatWithSeparator, except that you need to ensure that concatWithSeparator(sep, expr1, expr2, expr3...) → result is injective. This property is used for optimization of GROUP BY.
A function is called injective if it always returns a different result for different argument values. In other words: different arguments never yield an identical result.
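As an illustrative sketch (the table `sales` and its columns are hypothetical), when the concatenation is injective, grouping by it is equivalent to grouping by its arguments, which the GROUP BY optimization can exploit:
``` sql
SELECT concatWithSeparatorAssumeInjective('_', region, city) AS key, count()
FROM sales
GROUP BY key;
```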
## soundex
Returns the [Soundex code](https://en.wikipedia.org/wiki/Soundex) of a string.
**Syntax**
``` sql
soundex(val)
```
**Arguments**
- `val` - Input value. [String](../data-types/string.md)
**Returned value**
- The Soundex code of the input value. [String](../data-types/string.md)
**Example**
Query:
``` sql
select soundex('aksel');
```
Result:
``` text
┌─soundex('aksel')─┐
│ A240 │
└──────────────────┘
```

View File

@ -168,3 +168,15 @@ SELECT format('{} {}', 'Hello', 'World')
## trimBoth(s) {#trimboths}
Returns a string with whitespace removed from both sides.
## soundex(s)
Returns the Soundex value of a string. The return type is String. Example:
``` sql
select soundex('aksel');
┌─soundex('aksel')─┐
│ A240 │
└──────────────────┘
```

View File

@ -1192,12 +1192,12 @@ try
{
Settings::checkNoSettingNamesAtTopLevel(*config, config_path);
ServerSettings server_settings;
server_settings.loadSettingsFromConfig(*config);
ServerSettings server_settings_;
server_settings_.loadSettingsFromConfig(*config);
size_t max_server_memory_usage = server_settings.max_server_memory_usage;
size_t max_server_memory_usage = server_settings_.max_server_memory_usage;
double max_server_memory_usage_to_ram_ratio = server_settings.max_server_memory_usage_to_ram_ratio;
double max_server_memory_usage_to_ram_ratio = server_settings_.max_server_memory_usage_to_ram_ratio;
size_t default_max_server_memory_usage = static_cast<size_t>(memory_amount * max_server_memory_usage_to_ram_ratio);
if (max_server_memory_usage == 0)
@ -1225,7 +1225,7 @@ try
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
total_memory_tracker.setAllowUseJemallocMemory(server_settings.allow_use_jemalloc_memory);
total_memory_tracker.setAllowUseJemallocMemory(server_settings_.allow_use_jemalloc_memory);
auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
total_memory_tracker.setOvercommitTracker(global_overcommit_tracker);
@ -1243,23 +1243,23 @@ try
global_context->setRemoteHostFilter(*config);
global_context->setMaxTableSizeToDrop(server_settings.max_table_size_to_drop);
global_context->setMaxPartitionSizeToDrop(server_settings.max_partition_size_to_drop);
global_context->setMaxTableSizeToDrop(server_settings_.max_table_size_to_drop);
global_context->setMaxPartitionSizeToDrop(server_settings_.max_partition_size_to_drop);
ConcurrencyControl::SlotCount concurrent_threads_soft_limit = ConcurrencyControl::Unlimited;
if (server_settings.concurrent_threads_soft_limit_num > 0 && server_settings.concurrent_threads_soft_limit_num < concurrent_threads_soft_limit)
concurrent_threads_soft_limit = server_settings.concurrent_threads_soft_limit_num;
if (server_settings.concurrent_threads_soft_limit_ratio_to_cores > 0)
if (server_settings_.concurrent_threads_soft_limit_num > 0 && server_settings_.concurrent_threads_soft_limit_num < concurrent_threads_soft_limit)
concurrent_threads_soft_limit = server_settings_.concurrent_threads_soft_limit_num;
if (server_settings_.concurrent_threads_soft_limit_ratio_to_cores > 0)
{
auto value = server_settings.concurrent_threads_soft_limit_ratio_to_cores * std::thread::hardware_concurrency();
auto value = server_settings_.concurrent_threads_soft_limit_ratio_to_cores * std::thread::hardware_concurrency();
if (value > 0 && value < concurrent_threads_soft_limit)
concurrent_threads_soft_limit = value;
}
ConcurrencyControl::instance().setMaxConcurrency(concurrent_threads_soft_limit);
global_context->getProcessList().setMaxSize(server_settings.max_concurrent_queries);
global_context->getProcessList().setMaxInsertQueriesAmount(server_settings.max_concurrent_insert_queries);
global_context->getProcessList().setMaxSelectQueriesAmount(server_settings.max_concurrent_select_queries);
global_context->getProcessList().setMaxSize(server_settings_.max_concurrent_queries);
global_context->getProcessList().setMaxInsertQueriesAmount(server_settings_.max_concurrent_insert_queries);
global_context->getProcessList().setMaxSelectQueriesAmount(server_settings_.max_concurrent_select_queries);
if (config->has("keeper_server"))
global_context->updateKeeperConfiguration(*config);
@ -1270,34 +1270,34 @@ try
/// This is done for backward compatibility.
if (global_context->areBackgroundExecutorsInitialized())
{
auto new_pool_size = server_settings.background_pool_size;
auto new_ratio = server_settings.background_merges_mutations_concurrency_ratio;
auto new_pool_size = server_settings_.background_pool_size;
auto new_ratio = server_settings_.background_merges_mutations_concurrency_ratio;
global_context->getMergeMutateExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, static_cast<size_t>(new_pool_size * new_ratio));
global_context->getMergeMutateExecutor()->updateSchedulingPolicy(server_settings.background_merges_mutations_scheduling_policy.toString());
global_context->getMergeMutateExecutor()->updateSchedulingPolicy(server_settings_.background_merges_mutations_scheduling_policy.toString());
}
if (global_context->areBackgroundExecutorsInitialized())
{
auto new_pool_size = server_settings.background_move_pool_size;
auto new_pool_size = server_settings_.background_move_pool_size;
global_context->getMovesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}
if (global_context->areBackgroundExecutorsInitialized())
{
auto new_pool_size = server_settings.background_fetches_pool_size;
auto new_pool_size = server_settings_.background_fetches_pool_size;
global_context->getFetchesExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}
if (global_context->areBackgroundExecutorsInitialized())
{
auto new_pool_size = server_settings.background_common_pool_size;
auto new_pool_size = server_settings_.background_common_pool_size;
global_context->getCommonExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size);
}
global_context->getBufferFlushSchedulePool().increaseThreadsCount(server_settings.background_buffer_flush_schedule_pool_size);
global_context->getSchedulePool().increaseThreadsCount(server_settings.background_schedule_pool_size);
global_context->getMessageBrokerSchedulePool().increaseThreadsCount(server_settings.background_message_broker_schedule_pool_size);
global_context->getDistributedSchedulePool().increaseThreadsCount(server_settings.background_distributed_schedule_pool_size);
global_context->getBufferFlushSchedulePool().increaseThreadsCount(server_settings_.background_buffer_flush_schedule_pool_size);
global_context->getSchedulePool().increaseThreadsCount(server_settings_.background_schedule_pool_size);
global_context->getMessageBrokerSchedulePool().increaseThreadsCount(server_settings_.background_message_broker_schedule_pool_size);
global_context->getDistributedSchedulePool().increaseThreadsCount(server_settings_.background_distributed_schedule_pool_size);
if (config->has("resources"))
{

View File

@ -97,8 +97,8 @@ public:
void write(WriteBuffer & wb) const
{
writeBinary(key, wb);
writeVarUIntOverflow(count, wb);
writeVarUIntOverflow(error, wb);
writeVarUInt(count, wb);
writeVarUInt(error, wb);
}
void read(ReadBuffer & rb)

View File

@ -177,7 +177,7 @@ void TablesLoader::removeUnresolvableDependencies()
}
void TablesLoader::loadTablesInTopologicalOrder(ThreadPool & pool)
void TablesLoader::loadTablesInTopologicalOrder(ThreadPool & pool_)
{
/// Compatibility setting which should be enabled by default on attach
/// Otherwise server will be unable to start for some old-format of IPv6/IPv4 types of columns
@ -189,12 +189,12 @@ void TablesLoader::loadTablesInTopologicalOrder(ThreadPool & pool)
for (size_t level = 0; level != tables_to_load.size(); ++level)
{
startLoadingTables(pool, load_context, tables_to_load[level], level);
pool.wait();
startLoadingTables(pool_, load_context, tables_to_load[level], level);
pool_.wait();
}
}
void TablesLoader::startLoadingTables(ThreadPool & pool, ContextMutablePtr load_context, const std::vector<StorageID> & tables_to_load, size_t level)
void TablesLoader::startLoadingTables(ThreadPool & pool_, ContextMutablePtr load_context, const std::vector<StorageID> & tables_to_load, size_t level)
{
size_t total_tables = metadata.parsed_tables.size();
@ -202,7 +202,7 @@ void TablesLoader::startLoadingTables(ThreadPool & pool, ContextMutablePtr load_
for (const auto & table_id : tables_to_load)
{
pool.scheduleOrThrowOnError([this, load_context, total_tables, table_name = table_id.getQualifiedName()]()
pool_.scheduleOrThrowOnError([this, load_context, total_tables, table_name = table_id.getQualifiedName()]()
{
const auto & path_and_query = metadata.parsed_tables[table_name];
databases[table_name.database]->loadTableFromMetadata(load_context, path_and_query.path, table_name, path_and_query.ast, strictness_mode);

View File

@ -16,9 +16,6 @@
#include <Interpreters/IExternalLoadable.h>
/// Clang mistakenly warns about the names in enum class.
#pragma clang diagnostic ignored "-Wshadow"
namespace DB
{
using TypeIndexUnderlying = magic_enum::underlying_type_t<TypeIndex>;

View File

@ -139,13 +139,13 @@ private:
void threadWorker(size_t shard)
{
Block block;
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder_;
auto & shard_queue = *shards_queues[shard];
while (shard_queue.pop(block))
{
Stopwatch watch;
dictionary.blockToAttributes(block, arena_holder, shard);
dictionary.blockToAttributes(block, arena_holder_, shard);
UInt64 elapsed_ms = watch.elapsedMilliseconds();
if (elapsed_ms > 1'000)
LOG_TRACE(dictionary.log, "Block processing for shard #{} is slow {}ms (rows {}).", shard, elapsed_ms, block.rows());

View File

@ -1227,7 +1227,7 @@ Pipe RangeHashedDictionary<dictionary_key_type>::read(const Names & column_names
DictionarySourceCoordinator::ReadColumnsFunc read_keys_func = [dictionary_copy = dictionary](
const Strings & attribute_names,
const DataTypes & result_types,
const Columns & key_columns,
const Columns & key_columns_,
const DataTypes,
const Columns &)
{
@ -1238,15 +1238,15 @@ Pipe RangeHashedDictionary<dictionary_key_type>::read(const Names & column_names
Columns result;
result.reserve(attribute_names_size);
const ColumnPtr & key_column = key_columns.back();
const ColumnPtr & key_column = key_columns_.back();
const auto * key_to_index_column = typeid_cast<const ColumnUInt64 *>(key_column.get());
if (!key_to_index_column)
const auto * key_to_index_column_ = typeid_cast<const ColumnUInt64 *>(key_column.get());
if (!key_to_index_column_)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Dictionary {} read expect indexes column with type UInt64",
range_dictionary_ptr->getFullName());
const auto & data = key_to_index_column->getData();
const auto & data = key_to_index_column_->getData();
for (size_t i = 0; i < attribute_names_size; ++i)
{

View File

@ -198,22 +198,22 @@ void RegExpTreeDictionary::initRegexNodes(Block & block)
Array keys = (*keys_column)[i].safeGet<Array>();
Array values = (*values_column)[i].safeGet<Array>();
size_t keys_size = keys.size();
for (size_t i = 0; i < keys_size; i++)
for (size_t j = 0; j < keys_size; j++)
{
const String & name = keys[i].safeGet<String>();
const String & value = values[i].safeGet<String>();
if (structure.hasAttribute(name))
const String & name_ = keys[j].safeGet<String>();
const String & value = values[j].safeGet<String>();
if (structure.hasAttribute(name_))
{
const auto & attr = structure.getAttribute(name);
const auto & attr = structure.getAttribute(name_);
auto string_pieces = createStringPieces(value, num_captures, regex, logger);
if (!string_pieces.empty())
{
node->attributes[name] = RegexTreeNode::AttributeValue{.field = values[i], .pieces = std::move(string_pieces)};
node->attributes[name_] = RegexTreeNode::AttributeValue{.field = values[j], .pieces = std::move(string_pieces)};
}
else
{
Field field = parseStringToField(values[i].safeGet<String>(), attr.type);
node->attributes[name] = RegexTreeNode::AttributeValue{.field = std::move(field)};
Field field = parseStringToField(values[j].safeGet<String>(), attr.type);
node->attributes[name_] = RegexTreeNode::AttributeValue{.field = std::move(field)};
}
}
}
@ -424,23 +424,23 @@ bool RegExpTreeDictionary::setAttributes(
return attributes_to_set.size() == attributes.size();
visited_nodes.emplace(id);
const auto & node_attributes = regex_nodes.at(id)->attributes;
for (const auto & [name, value] : node_attributes)
for (const auto & [name_, value] : node_attributes)
{
if (!attributes.contains(name) || attributes_to_set.contains(name))
if (!attributes.contains(name_) || attributes_to_set.contains(name_))
continue;
if (value.containsBackRefs())
{
auto [updated_str, use_default] = processBackRefs(data, regex_nodes.at(id)->searcher, value.pieces);
if (use_default)
{
DefaultValueProvider default_value(attributes.at(name).null_value, defaults.at(name));
attributes_to_set[name] = default_value.getDefaultValue(key_index);
DefaultValueProvider default_value(attributes.at(name_).null_value, defaults.at(name_));
attributes_to_set[name_] = default_value.getDefaultValue(key_index);
}
else
attributes_to_set[name] = parseStringToField(updated_str, attributes.at(name).type);
attributes_to_set[name_] = parseStringToField(updated_str, attributes.at(name_).type);
}
else
attributes_to_set[name] = value.field;
attributes_to_set[name_] = value.field;
}
auto parent_id = regex_nodes.at(id)->parent_id;
@ -541,11 +541,11 @@ std::unordered_map<String, ColumnPtr> RegExpTreeDictionary::match(
std::unordered_map<String, MutableColumnPtr> columns;
/// initialize columns
for (const auto & [name, attr] : attributes)
for (const auto & [name_, attr] : attributes)
{
auto col_ptr = attr.type->createColumn();
col_ptr->reserve(keys_offsets.size());
columns[name] = std::move(col_ptr);
columns[name_] = std::move(col_ptr);
}
UInt64 offset = 0;
@ -628,25 +628,25 @@ std::unordered_map<String, ColumnPtr> RegExpTreeDictionary::match(
break;
}
for (const auto & [name, attr] : attributes)
for (const auto & [name_, attr] : attributes)
{
if (attributes_to_set.contains(name))
if (attributes_to_set.contains(name_))
continue;
DefaultValueProvider default_value(attr.null_value, defaults.at(name));
columns[name]->insert(default_value.getDefaultValue(key_idx));
DefaultValueProvider default_value(attr.null_value, defaults.at(name_));
columns[name_]->insert(default_value.getDefaultValue(key_idx));
}
/// insert to columns
for (const auto & [name, value] : attributes_to_set)
columns[name]->insert(value);
for (const auto & [name_, value] : attributes_to_set)
columns[name_]->insert(value);
offset = key_offset;
}
std::unordered_map<String, ColumnPtr> result;
for (auto & [name, mutable_ptr] : columns)
result.emplace(name, std::move(mutable_ptr));
for (auto & [name_, mutable_ptr] : columns)
result.emplace(name_, std::move(mutable_ptr));
return result;
}
@ -684,8 +684,8 @@ Columns RegExpTreeDictionary::getColumns(
defaults);
Columns result;
for (const String & name : attribute_names)
result.push_back(columns_map.at(name));
for (const String & name_ : attribute_names)
result.push_back(columns_map.at(name_));
return result;
}

View File

@ -229,23 +229,23 @@ void parseMatchNode(UInt64 parent_id, UInt64 & id, const YAML::Node & node, Resu
{
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Yaml match rule must contain key {}", key_name);
}
for (const auto & [key, node] : match)
for (const auto & [key, node_] : match)
{
if (key == key_name)
{
if (!node.IsScalar())
if (!node_.IsScalar())
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "`{}` should be a String", key_name);
attributes_to_insert.reg_exp = node.as<String>();
attributes_to_insert.reg_exp = node_.as<String>();
}
else if (structure.hasAttribute(key))
{
attributes_to_insert.keys.push_back(key);
attributes_to_insert.values.push_back(node.as<String>());
attributes_to_insert.values.push_back(node_.as<String>());
}
else if (node.IsSequence())
else if (node_.IsSequence())
{
parseMatchList(attributes_to_insert.id, id, node, result, key_name, structure);
parseMatchList(attributes_to_insert.id, id, node_, result, key_name, structure);
}
/// unknown attributes.
}

View File

@ -617,6 +617,10 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegmentPtr & file_segment)
continue_predownload = false;
}
}
else
{
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
}
if (!continue_predownload)
{
@ -636,8 +640,8 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegmentPtr & file_segment)
/// TODO: allow seek more than once with seek avoiding.
bytes_to_predownload = 0;
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
chassert(file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
LOG_TEST(log, "Bypassing cache because for {}", file_segment->getInfoForLog());
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;

View File

@ -68,8 +68,7 @@ public:
String timezone;
if (arguments.size() == 2)
{
if (arguments[1].column)
timezone = extractTimeZoneNameFromColumn(*arguments[1].column);
timezone = extractTimeZoneNameFromColumn(arguments[1].column.get(), arguments[1].name);
if (timezone.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,

View File

@ -1068,11 +1068,11 @@ public:
FunctionDictGetDescendantsExecutable(
String name_,
size_t level_,
DictionaryHierarchicalParentToChildIndexPtr hierarchical_parent_to_child_index,
DictionaryHierarchicalParentToChildIndexPtr hierarchical_parent_to_child_index_,
std::shared_ptr<FunctionDictHelper> dictionary_helper_)
: name(std::move(name_))
, level(level_)
, hierarchical_parent_to_child_index(std::move(hierarchical_parent_to_child_index))
, hierarchical_parent_to_child_index(std::move(hierarchical_parent_to_child_index_))
, dictionary_helper(std::move(dictionary_helper_))
{}
@ -1110,13 +1110,13 @@ public:
const DataTypes & argument_types_,
const DataTypePtr & result_type_,
size_t level_,
DictionaryHierarchicalParentToChildIndexPtr hierarchical_parent_to_child_index,
DictionaryHierarchicalParentToChildIndexPtr hierarchical_parent_to_child_index_,
std::shared_ptr<FunctionDictHelper> helper_)
: name(std::move(name_))
, argument_types(argument_types_)
, result_type(result_type_)
, level(level_)
, hierarchical_parent_to_child_index(std::move(hierarchical_parent_to_child_index))
, hierarchical_parent_to_child_index(std::move(hierarchical_parent_to_child_index_))
, helper(std::move(helper_))
{}

View File

@ -245,7 +245,8 @@ private:
{
if (additional_argument_index < arguments.size())
{
time_zone = extractTimeZoneNameFromColumn(*arguments[additional_argument_index].column);
time_zone = extractTimeZoneNameFromColumn(arguments[additional_argument_index].column.get(),
arguments[additional_argument_index].name);
++additional_argument_index;
}
}

View File

@ -17,14 +17,14 @@ namespace ErrorCodes
}
std::string extractTimeZoneNameFromColumn(const IColumn & column)
std::string extractTimeZoneNameFromColumn(const IColumn * column, const String & column_name)
{
const ColumnConst * time_zone_column = checkAndGetColumnConst<ColumnString>(&column);
const ColumnConst * time_zone_column = checkAndGetColumnConst<ColumnString>(column);
if (!time_zone_column)
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of time zone argument of function, must be constant string",
column.getName());
"Illegal column {} of time zone argument of function, must be a constant string",
column_name);
return time_zone_column->getValue<String>();
}
@ -33,9 +33,9 @@ std::string extractTimeZoneNameFromColumn(const IColumn & column)
std::string extractTimeZoneNameFromFunctionArguments(const ColumnsWithTypeAndName & arguments, size_t time_zone_arg_num, size_t datetime_arg_num)
{
/// Explicit time zone may be passed in last argument.
if (arguments.size() == time_zone_arg_num + 1 && arguments[time_zone_arg_num].column)
if (arguments.size() == time_zone_arg_num + 1)
{
return extractTimeZoneNameFromColumn(*arguments[time_zone_arg_num].column);
return extractTimeZoneNameFromColumn(arguments[time_zone_arg_num].column.get(), arguments[time_zone_arg_num].name);
}
else
{
@ -57,7 +57,7 @@ const DateLUTImpl & extractTimeZoneFromFunctionArguments(const ColumnsWithTypeAn
{
if (arguments.size() == time_zone_arg_num + 1)
{
std::string time_zone = extractTimeZoneNameFromColumn(*arguments[time_zone_arg_num].column);
std::string time_zone = extractTimeZoneNameFromColumn(arguments[time_zone_arg_num].column.get(), arguments[time_zone_arg_num].name);
if (time_zone.empty())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Provided time zone must be non-empty and be a valid time zone");
return DateLUT::instance(time_zone);

View File

@ -12,7 +12,7 @@ namespace DB
class Block;
std::string extractTimeZoneNameFromColumn(const IColumn & column);
std::string extractTimeZoneNameFromColumn(const IColumn * column, const String & column_name);
/// Determine working timezone either from optional argument with time zone name or from time zone in DateTime type of argument.
/// Returns empty string if default time zone should be used.
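/// For illustration (behaviour exercised by the tests added in this commit): in `toTimeZone(dt, 'Asia/Kolkata')`
/// the time zone argument is a constant string column, so extractTimeZoneNameFromColumn returns "Asia/Kolkata";
/// a non-constant column, e.g. `toTimeZone(dt, materialize('Asia/Kolkata'))`, throws ILLEGAL_COLUMN.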

src/Functions/soundex.cpp Normal file
View File

@ -0,0 +1,119 @@
#include <cctype>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringToString.h>
#include <Common/StringUtils/StringUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
}
/** Soundex algorithm, https://en.wikipedia.org/wiki/Soundex
* Implemented similarly to most SQL dialects:
* 1. Save the first letter. Map all occurrences of a, e, i, o, u, y, h, w to zero (0)
* 2. Replace all consonants (including the first letter) with digits as follows:
* - b, f, p, v 1
* - c, g, j, k, q, s, x, z 2
* - d, t 3
* - l 4
* - m, n 5
* - r 6
* 3. Replace all adjacent same digits with one digit, and then remove all the zero (0) digits
* 4. If the saved letter's digit is the same as the resulting first digit, remove the digit (keep the letter).
* 5. Append 3 zeros if result contains less than 3 digits. Remove all except first letter and 3 digits after it.
*/
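/// Worked example, consistent with the expected results in the tests added in this commit:
/// soundex('Jonston'): keep 'J' (code 2); 'o' maps to 0 and is dropped, 'n' -> 5, 's' -> 2, 't' -> 3,
/// which already gives 4 characters, so the remaining letters are ignored: result "J523".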
struct SoundexImpl
{
static constexpr auto length = 4z;
static constexpr auto soundex_map = "01230120022455012623010202";
static void calculate(const char * value, size_t value_length, char * out)
{
const char * cur = value;
const char * const end = value + value_length;
char * const out_end = out + length;
while (cur < end && !isAlphaASCII(*cur))
++cur;
char prev_code = '0';
if (cur < end)
{
*out = toUpperIfAlphaASCII(*cur);
++out;
prev_code = soundex_map[toUpperIfAlphaASCII(*cur) - 'A'];
++cur;
}
while (cur < end && !isAlphaASCII(*cur))
++cur;
while (cur < end && out < out_end)
{
char current_code = soundex_map[toUpperIfAlphaASCII(*cur) - 'A'];
if ((current_code != '0') && (current_code != prev_code))
{
*out = current_code;
++out;
}
prev_code = current_code;
++cur;
while (cur < end && !isAlphaASCII(*cur))
++cur;
}
while (out < out_end)
{
*out = '0';
++out;
}
}
static void vector(
const ColumnString::Chars & data,
const ColumnString::Offsets & offsets,
ColumnString::Chars & res_data,
ColumnString::Offsets & res_offsets)
{
const size_t size = offsets.size();
res_data.resize(size * (length + 1));
res_offsets.resize(size);
size_t prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
const char * value = reinterpret_cast<const char *>(&data[prev_offset]);
const size_t value_length = offsets[i] - prev_offset - 1;
const size_t out_index = i * (length + 1);
calculate(value, value_length, reinterpret_cast<char *>(&res_data[out_index]));
res_data[out_index + length] = '\0';
res_offsets[i] = (out_index + length + 1);
prev_offset = offsets[i];
}
}
[[noreturn]] static void vectorFixed(const ColumnString::Chars &, size_t, ColumnString::Chars &)
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Column of type FixedString is not supported by soundex function");
}
};
struct NameSoundex
{
static constexpr auto name = "soundex";
};
REGISTER_FUNCTION(Soundex)
{
factory.registerFunction<FunctionStringToString<SoundexImpl, NameSoundex>>(
Documentation{"Returns Soundex code of a string."}, FunctionFactory::CaseInsensitive);
}
}

View File

@ -8,7 +8,6 @@
#include <IO/WriteHelpers.h>
#include <Common/assert_cast.h>
namespace DB
{
namespace ErrorCodes
@ -100,6 +99,7 @@ public:
"Should be DateTime or DateTime64", arguments[0].type->getName(), getName());
String time_zone_name = extractTimeZoneNameFromFunctionArguments(arguments, 1, 0);
if (which_type.isDateTime())
return std::make_shared<DataTypeDateTime>(time_zone_name);

View File

@ -28,13 +28,13 @@ void ProgressValues::read(ReadBuffer & in, UInt64 server_revision)
void ProgressValues::write(WriteBuffer & out, UInt64 client_revision) const
{
writeVarUIntOverflow(read_rows, out);
writeVarUIntOverflow(read_bytes, out);
writeVarUIntOverflow(total_rows_to_read, out);
writeVarUInt(read_rows, out);
writeVarUInt(read_bytes, out);
writeVarUInt(total_rows_to_read, out);
if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO)
{
writeVarUIntOverflow(written_rows, out);
writeVarUIntOverflow(written_bytes, out);
writeVarUInt(written_rows, out);
writeVarUInt(written_bytes, out);
}
if (client_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_SERVER_QUERY_TIME_IN_PROGRESS)
{

View File

@ -12,6 +12,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int BAD_ARGUMENTS;
}
@ -27,14 +28,6 @@ char * writeVarUInt(UInt64 x, char * ostr);
/// same limitation, others support the full 1<<64 range, e.g. clickhouse-driver (Python))
constexpr UInt64 VAR_UINT_MAX = (1ULL<<63) - 1;
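/// Byte-level sketch of the base128 layout (assuming the least-significant-7-bits-first encoding implemented below):
/// 300 = 0b1'0010'1100 is written as 0xAC 0x02 -- each byte carries 7 payload bits and the high bit
/// marks that another byte follows.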
/// Write UInt64 in variable length format (base128), limit the value to VAR_UINT_MAX if it exceeds VAR_UINT_MAX (to bypass sanity check)
template <typename ...Args>
auto writeVarUIntOverflow(UInt64 x, Args && ... args)
{
return writeVarUInt(std::min(x, VAR_UINT_MAX), std::forward<Args>(args)...);
}
/// Read UInt64, written in variable length format (base128)
void readVarUInt(UInt64 & x, std::istream & istr);
void readVarUInt(UInt64 & x, ReadBuffer & istr);
@ -198,10 +191,20 @@ inline const char * readVarUInt(UInt64 & x, const char * istr, size_t size)
return istr;
}
[[noreturn]] inline void throwValueTooLargeForVarIntEncodingException(UInt64 x)
{
/// Under practical circumstances, we should virtually never end up here but AST Fuzzer manages to create superlarge input integers
/// which trigger this exception. Intentionally not throwing LOGICAL_ERROR or calling abort() or [ch]assert(false), so AST Fuzzer
/// can swallow the exception and continue to run.
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Value {} is too large for VarInt encoding", x);
}
inline void writeVarUInt(UInt64 x, WriteBuffer & ostr)
{
chassert(x <= VAR_UINT_MAX);
#ifndef NDEBUG
if (x > VAR_UINT_MAX)
throwValueTooLargeForVarIntEncodingException(x);
#endif
for (size_t i = 0; i < 9; ++i)
{
uint8_t byte = x & 0x7F;
@ -221,7 +224,10 @@ inline void writeVarUInt(UInt64 x, WriteBuffer & ostr)
inline void writeVarUInt(UInt64 x, std::ostream & ostr)
{
chassert(x <= VAR_UINT_MAX);
#ifndef NDEBUG
if (x > VAR_UINT_MAX)
throwValueTooLargeForVarIntEncodingException(x);
#endif
for (size_t i = 0; i < 9; ++i)
{
uint8_t byte = x & 0x7F;
@ -239,7 +245,10 @@ inline void writeVarUInt(UInt64 x, std::ostream & ostr)
inline char * writeVarUInt(UInt64 x, char * ostr)
{
chassert(x <= VAR_UINT_MAX);
#ifndef NDEBUG
if (x > VAR_UINT_MAX)
throwValueTooLargeForVarIntEncodingException(x);
#endif
for (size_t i = 0; i < 9; ++i)
{
uint8_t byte = x & 0x7F;

View File

@ -22,28 +22,28 @@ int main(int argc, char ** argv)
std::cout << std::hex << std::showbase << "Input: " << x << std::endl;
Poco::HexBinaryEncoder hex(std::cout);
std::cout << "writeVarUIntOverflow(std::ostream): 0x";
DB::writeVarUIntOverflow(x, hex);
std::cout << "writeVarUInt(std::ostream): 0x";
DB::writeVarUInt(x, hex);
std::cout << std::endl;
std::string s;
{
DB::WriteBufferFromString wb(s);
DB::writeVarUIntOverflow(x, wb);
DB::writeVarUInt(x, wb);
wb.next();
}
std::cout << "writeVarUIntOverflow(WriteBuffer): 0x";
std::cout << "writeVarUInt(WriteBuffer): 0x";
hex << s;
std::cout << std::endl;
s.clear();
s.resize(9);
s.resize(DB::writeVarUIntOverflow(x, s.data()) - s.data());
s.resize(DB::writeVarUInt(x, s.data()) - s.data());
std::cout << "writeVarUIntOverflow(char *): 0x";
std::cout << "writeVarUInt(char *): 0x";
hex << s;
std::cout << std::endl;
@ -52,7 +52,7 @@ int main(int argc, char ** argv)
DB::ReadBufferFromString rb(s);
DB::readVarUInt(y, rb);
std::cerr << "Input: " << x << ", readVarUInt(writeVarUIntOverflow()): " << y << std::endl;
std::cerr << "Input: " << x << ", readVarUInt(writeVarUInt()): " << y << std::endl;
return 0;
}

View File

@ -342,10 +342,15 @@ void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num)
}
}
static void appendElementsToLogSafe(
namespace
{
using TimePoint = std::chrono::time_point<std::chrono::system_clock>;
void appendElementsToLogSafe(
AsynchronousInsertLog & log,
std::vector<AsynchronousInsertLogElement> elements,
std::chrono::time_point<std::chrono::system_clock> flush_time,
TimePoint flush_time,
const String & flush_query_id,
const String & flush_exception)
try
@ -367,6 +372,8 @@ catch (...)
tryLogCurrentException("AsynchronousInsertQueue", "Failed to add elements to AsynchronousInsertLog");
}
}
// static
void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context)
try
@ -473,8 +480,26 @@ try
format->addBuffer(std::move(last_buffer));
auto insert_query_id = insert_context->getCurrentQueryId();
auto finish_entries = [&]
{
for (const auto & entry : data->entries)
{
if (!entry->isFinished())
entry->finish();
}
if (!log_elements.empty())
{
auto flush_time = std::chrono::system_clock::now();
appendElementsToLogSafe(*insert_log, std::move(log_elements), flush_time, insert_query_id, "");
}
};
if (total_rows == 0)
{
finish_entries();
return;
}
try
{
@ -502,17 +527,7 @@ try
throw;
}
for (const auto & entry : data->entries)
{
if (!entry->isFinished())
entry->finish();
}
if (!log_elements.empty())
{
auto flush_time = std::chrono::system_clock::now();
appendElementsToLogSafe(*insert_log, std::move(log_elements), flush_time, insert_query_id, "");
}
finish_entries();
}
catch (const Exception & e)
{

View File

@ -1525,9 +1525,9 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression, const
asterisk = true;
}
else if (auto * function = (*expression)->as<ASTFunction>())
else if (auto * func = (*expression)->as<ASTFunction>())
{
if (use_structure_from_insertion_table_in_table_functions == 2 && findIdentifier(function))
if (use_structure_from_insertion_table_in_table_functions == 2 && findIdentifier(func))
{
use_columns_from_insert_query = false;
break;
@ -2074,9 +2074,9 @@ BackupsWorker & Context::getBackupsWorker() const
const bool allow_concurrent_restores = this->getConfigRef().getBool("backups.allow_concurrent_restores", true);
const auto & config = getConfigRef();
const auto & settings = getSettingsRef();
UInt64 backup_threads = config.getUInt64("backup_threads", settings.backup_threads);
UInt64 restore_threads = config.getUInt64("restore_threads", settings.restore_threads);
const auto & settings_ = getSettingsRef();
UInt64 backup_threads = config.getUInt64("backup_threads", settings_.backup_threads);
UInt64 restore_threads = config.getUInt64("restore_threads", settings_.restore_threads);
if (!shared->backups_worker)
shared->backups_worker.emplace(backup_threads, restore_threads, allow_concurrent_backups, allow_concurrent_restores);
@ -4285,10 +4285,10 @@ ReadSettings Context::getReadSettings() const
ReadSettings Context::getBackupReadSettings() const
{
ReadSettings settings = getReadSettings();
settings.remote_throttler = getBackupsThrottler();
settings.local_throttler = getBackupsThrottler();
return settings;
ReadSettings settings_ = getReadSettings();
settings_.remote_throttler = getBackupsThrottler();
settings_.local_throttler = getBackupsThrottler();
return settings_;
}
WriteSettings Context::getWriteSettings() const
@ -4317,14 +4317,14 @@ std::shared_ptr<AsyncReadCounters> Context::getAsyncReadCounters() const
Context::ParallelReplicasMode Context::getParallelReplicasMode() const
{
const auto & settings = getSettingsRef();
const auto & settings_ = getSettingsRef();
using enum Context::ParallelReplicasMode;
if (!settings.parallel_replicas_custom_key.value.empty())
if (!settings_.parallel_replicas_custom_key.value.empty())
return CUSTOM_KEY;
if (settings.allow_experimental_parallel_reading_from_replicas
&& !settings.use_hedged_requests)
if (settings_.allow_experimental_parallel_reading_from_replicas
&& !settings_.use_hedged_requests)
return READ_TASKS;
return SAMPLE_KEY;
@ -4332,17 +4332,17 @@ Context::ParallelReplicasMode Context::getParallelReplicasMode() const
bool Context::canUseParallelReplicasOnInitiator() const
{
const auto & settings = getSettingsRef();
const auto & settings_ = getSettingsRef();
return getParallelReplicasMode() == ParallelReplicasMode::READ_TASKS
&& settings.max_parallel_replicas > 1
&& settings_.max_parallel_replicas > 1
&& !getClientInfo().collaborate_with_initiator;
}
bool Context::canUseParallelReplicasOnFollower() const
{
const auto & settings = getSettingsRef();
const auto & settings_ = getSettingsRef();
return getParallelReplicasMode() == ParallelReplicasMode::READ_TASKS
&& settings.max_parallel_replicas > 1
&& settings_.max_parallel_replicas > 1
&& getClientInfo().collaborate_with_initiator;
}

View File

@ -1866,8 +1866,8 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
{
ExpressionActionsChain::Step & step = *chain.steps.at(prewhere_step_num);
auto required_columns = prewhere_info->prewhere_actions->getRequiredColumnsNames();
NameSet required_source_columns(required_columns.begin(), required_columns.end());
auto required_columns_ = prewhere_info->prewhere_actions->getRequiredColumnsNames();
NameSet required_source_columns(required_columns_.begin(), required_columns_.end());
/// Add required columns to required output in order not to remove them after prewhere execution.
/// TODO: add sampling and final execution to common chain.
for (const auto & column : additional_required_columns_after_prewhere)

View File

@ -825,11 +825,11 @@ void InterpreterSystemQuery::dropDatabaseReplica(ASTSystemQuery & query)
if (query.replica.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Replica name is empty");
auto check_not_local_replica = [](const DatabaseReplicated * replicated, const ASTSystemQuery & query)
auto check_not_local_replica = [](const DatabaseReplicated * replicated, const ASTSystemQuery & query_)
{
if (!query.replica_zk_path.empty() && fs::path(replicated->getZooKeeperPath()) != fs::path(query.replica_zk_path))
if (!query_.replica_zk_path.empty() && fs::path(replicated->getZooKeeperPath()) != fs::path(query_.replica_zk_path))
return;
if (replicated->getFullReplicaName() != query.replica)
if (replicated->getFullReplicaName() != query_.replica)
return;
throw Exception(ErrorCodes::TABLE_WAS_NOT_DROPPED, "There is a local database {}, which has the same path in ZooKeeper "

View File

@ -301,7 +301,7 @@ void JoinedTables::rewriteDistributedInAndJoins(ASTPtr & query)
}
}
std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & select_query)
std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & select_query_)
{
if (tables_with_columns.size() < 2)
return {};
@ -310,7 +310,7 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
MultiEnum<JoinAlgorithm> join_algorithm = settings.join_algorithm;
auto table_join = std::make_shared<TableJoin>(settings, context->getGlobalTemporaryVolume());
const ASTTablesInSelectQueryElement * ast_join = select_query.join();
const ASTTablesInSelectQueryElement * ast_join = select_query_.join();
const auto & table_to_join = ast_join->table_expression->as<ASTTableExpression &>();
/// TODO This syntax does not support specifying a database name.
@ -352,16 +352,16 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
if (!table_join->isSpecialStorage() &&
settings.enable_optimize_predicate_expression)
replaceJoinedTable(select_query);
replaceJoinedTable(select_query_);
return table_join;
}
void JoinedTables::reset(const ASTSelectQuery & select_query)
void JoinedTables::reset(const ASTSelectQuery & select_query_)
{
table_expressions = getTableExpressions(select_query);
left_table_expression = extractTableExpression(select_query, 0);
left_db_and_table = getDatabaseAndTable(select_query, 0);
table_expressions = getTableExpressions(select_query_);
left_table_expression = extractTableExpression(select_query_, 0);
left_db_and_table = getDatabaseAndTable(select_query_, 0);
}
}

View File

@ -15,23 +15,18 @@ ColumnPtr tryGetColumnFromBlock(const Block & block, const NameAndTypePair & req
if (!elem)
return nullptr;
DataTypePtr elem_type;
ColumnPtr elem_column;
auto elem_type = elem->type;
auto elem_column = elem->column->decompress();
if (requested_column.isSubcolumn())
{
auto subcolumn_name = requested_column.getSubcolumnName();
elem_type = elem->type->tryGetSubcolumnType(subcolumn_name);
elem_column = elem->type->tryGetSubcolumn(subcolumn_name, elem->column);
elem_column = elem_type->tryGetSubcolumn(subcolumn_name, elem_column);
elem_type = elem_type->tryGetSubcolumnType(subcolumn_name);
if (!elem_type || !elem_column)
return nullptr;
}
else
{
elem_type = elem->type;
elem_column = elem->column;
}
return castColumn({elem_column, elem_type, ""}, requested_column.type);
}

View File

@ -125,21 +125,19 @@ bool ParserUnionList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return true;
}
static bool parseOperator(IParser::Pos & pos, const char * op, Expected & expected)
static bool parseOperator(IParser::Pos & pos, std::string_view op, Expected & expected)
{
if (isWordCharASCII(*op))
if (!op.empty() && isWordCharASCII(op.front()))
{
return ParserKeyword(op).ignore(pos, expected);
}
else
else if (op.length() == pos->size() && 0 == memcmp(op.data(), pos->begin, pos->size()))
{
if (strlen(op) == pos->size() && 0 == memcmp(op, pos->begin, pos->size()))
{
++pos;
return true;
}
return false;
++pos;
return true;
}
return false;
}
enum class SubqueryFunctionType
@ -778,14 +776,13 @@ protected:
int state = 0;
};
struct ParserExpressionImpl
{
static std::vector<std::pair<const char *, Operator>> operators_table;
static std::vector<std::pair<const char *, Operator>> unary_operators_table;
static const char * overlapping_operators_to_skip[];
static const std::vector<std::pair<std::string_view, Operator>> operators_table;
static const std::vector<std::pair<std::string_view, Operator>> unary_operators_table;
static const std::array<std::string_view, 1> overlapping_operators_to_skip;
static Operator finish_between_operator;
static const Operator finish_between_operator;
ParserCompoundIdentifier identifier_parser{false, true};
ParserNumber number_parser;
@ -813,7 +810,6 @@ struct ParserExpressionImpl
Action tryParseOperator(Layers & layers, IParser::Pos & pos, Expected & expected);
};
class ExpressionLayer : public Layer
{
public:
@ -2253,7 +2249,6 @@ bool ParseTimestampOperatorExpression(IParser::Pos & pos, ASTPtr & node, Expecte
return true;
}
bool ParserExpression::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto start = std::make_unique<ExpressionLayer>(false, allow_trailing_commas);
@ -2290,57 +2285,58 @@ bool ParserFunction::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
}
std::vector<std::pair<const char *, Operator>> ParserExpressionImpl::operators_table({
{"->", Operator("lambda", 1, 2, OperatorType::Lambda)},
{"?", Operator("", 2, 0, OperatorType::StartIf)},
{":", Operator("if", 3, 3, OperatorType::FinishIf)},
{"OR", Operator("or", 3, 2, OperatorType::Mergeable)},
{"AND", Operator("and", 4, 2, OperatorType::Mergeable)},
{"BETWEEN", Operator("", 6, 0, OperatorType::StartBetween)},
{"NOT BETWEEN", Operator("", 6, 0, OperatorType::StartNotBetween)},
{"==", Operator("equals", 8, 2, OperatorType::Comparison)},
{"!=", Operator("notEquals", 8, 2, OperatorType::Comparison)},
{"<>", Operator("notEquals", 8, 2, OperatorType::Comparison)},
{"<=", Operator("lessOrEquals", 8, 2, OperatorType::Comparison)},
{">=", Operator("greaterOrEquals", 8, 2, OperatorType::Comparison)},
{"<", Operator("less", 8, 2, OperatorType::Comparison)},
{">", Operator("greater", 8, 2, OperatorType::Comparison)},
{"=", Operator("equals", 8, 2, OperatorType::Comparison)},
{"LIKE", Operator("like", 8, 2)},
{"ILIKE", Operator("ilike", 8, 2)},
{"NOT LIKE", Operator("notLike", 8, 2)},
{"NOT ILIKE", Operator("notILike", 8, 2)},
{"REGEXP", Operator("match", 8, 2)},
{"IN", Operator("in", 8, 2)},
{"NOT IN", Operator("notIn", 8, 2)},
{"GLOBAL IN", Operator("globalIn", 8, 2)},
{"GLOBAL NOT IN", Operator("globalNotIn", 8, 2)},
{"||", Operator("concat", 9, 2, OperatorType::Mergeable)},
{"+", Operator("plus", 10, 2)},
{"-", Operator("minus", 10, 2)},
{"*", Operator("multiply", 11, 2)},
{"/", Operator("divide", 11, 2)},
{"%", Operator("modulo", 11, 2)},
{"MOD", Operator("modulo", 11, 2)},
{"DIV", Operator("intDiv", 11, 2)},
{".", Operator("tupleElement", 13, 2, OperatorType::TupleElement)},
{"[", Operator("arrayElement", 13, 2, OperatorType::ArrayElement)},
{"::", Operator("CAST", 13, 2, OperatorType::Cast)},
{"IS NULL", Operator("isNull", 13, 1, OperatorType::IsNull)},
{"IS NOT NULL", Operator("isNotNull", 13, 1, OperatorType::IsNull)},
});
std::vector<std::pair<const char *, Operator>> ParserExpressionImpl::unary_operators_table({
{"NOT", Operator("not", 5, 1)},
{"-", Operator("negate", 12, 1)}
});
Operator ParserExpressionImpl::finish_between_operator = Operator("", 7, 0, OperatorType::FinishBetween);
const char * ParserExpressionImpl::overlapping_operators_to_skip[] =
const std::vector<std::pair<std::string_view, Operator>> ParserExpressionImpl::operators_table
{
"IN PARTITION",
nullptr
{"->", Operator("lambda", 1, 2, OperatorType::Lambda)},
{"?", Operator("", 2, 0, OperatorType::StartIf)},
{":", Operator("if", 3, 3, OperatorType::FinishIf)},
{"OR", Operator("or", 3, 2, OperatorType::Mergeable)},
{"AND", Operator("and", 4, 2, OperatorType::Mergeable)},
{"BETWEEN", Operator("", 6, 0, OperatorType::StartBetween)},
{"NOT BETWEEN", Operator("", 6, 0, OperatorType::StartNotBetween)},
{"==", Operator("equals", 8, 2, OperatorType::Comparison)},
{"!=", Operator("notEquals", 8, 2, OperatorType::Comparison)},
{"<>", Operator("notEquals", 8, 2, OperatorType::Comparison)},
{"<=", Operator("lessOrEquals", 8, 2, OperatorType::Comparison)},
{">=", Operator("greaterOrEquals", 8, 2, OperatorType::Comparison)},
{"<", Operator("less", 8, 2, OperatorType::Comparison)},
{">", Operator("greater", 8, 2, OperatorType::Comparison)},
{"=", Operator("equals", 8, 2, OperatorType::Comparison)},
{"LIKE", Operator("like", 8, 2)},
{"ILIKE", Operator("ilike", 8, 2)},
{"NOT LIKE", Operator("notLike", 8, 2)},
{"NOT ILIKE", Operator("notILike", 8, 2)},
{"REGEXP", Operator("match", 8, 2)},
{"IN", Operator("in", 8, 2)},
{"NOT IN", Operator("notIn", 8, 2)},
{"GLOBAL IN", Operator("globalIn", 8, 2)},
{"GLOBAL NOT IN", Operator("globalNotIn", 8, 2)},
{"||", Operator("concat", 9, 2, OperatorType::Mergeable)},
{"+", Operator("plus", 10, 2)},
{"-", Operator("minus", 10, 2)},
{"*", Operator("multiply", 11, 2)},
{"/", Operator("divide", 11, 2)},
{"%", Operator("modulo", 11, 2)},
{"MOD", Operator("modulo", 11, 2)},
{"DIV", Operator("intDiv", 11, 2)},
{".", Operator("tupleElement", 13, 2, OperatorType::TupleElement)},
{"[", Operator("arrayElement", 13, 2, OperatorType::ArrayElement)},
{"::", Operator("CAST", 13, 2, OperatorType::Cast)},
{"IS NULL", Operator("isNull", 13, 1, OperatorType::IsNull)},
{"IS NOT NULL", Operator("isNotNull", 13, 1, OperatorType::IsNull)},
};
const std::vector<std::pair<std::string_view, Operator>> ParserExpressionImpl::unary_operators_table
{
{"NOT", Operator("not", 5, 1)},
{"-", Operator("negate", 12, 1)}
};
const Operator ParserExpressionImpl::finish_between_operator("", 7, 0, OperatorType::FinishBetween);
const std::array<std::string_view, 1> ParserExpressionImpl::overlapping_operators_to_skip
{
"IN PARTITION"
};
bool ParserExpressionImpl::parse(std::unique_ptr<Layer> start, IParser::Pos & pos, ASTPtr & node, Expected & expected)
@ -2585,8 +2581,8 @@ Action ParserExpressionImpl::tryParseOperator(Layers & layers, IParser::Pos & po
///
/// 'IN PARTITION' here is not an 'IN' operator, so we should stop parsing immediately
Expected stub;
for (const char ** it = overlapping_operators_to_skip; *it; ++it)
if (ParserKeyword{*it}.checkWithoutMoving(pos, stub))
for (const auto & it : overlapping_operators_to_skip)
if (ParserKeyword{it}.checkWithoutMoving(pos, stub))
return Action::NONE;
/// Try to find operators from 'operators_table'

View File

@ -607,8 +607,8 @@ std::shared_ptr<DirectKeyValueJoin> tryDirectJoin(const std::shared_ptr<TableJoi
for (const auto & right_table_expression_column : right_table_expression_header)
{
const auto * table_column_name = right_table_expression_data.getColumnNameOrNull(right_table_expression_column.name);
if (!table_column_name)
const auto * table_column_name_ = right_table_expression_data.getColumnNameOrNull(right_table_expression_column.name);
if (!table_column_name_)
return {};
auto right_table_expression_column_with_storage_column_name = right_table_expression_column;

View File

@ -647,9 +647,9 @@ QueryPipelineBuilder StorageLiveView::completeQuery(Pipes pipes)
}
else
{
auto inner_blocks_query = getInnerBlocksQuery();
auto inner_blocks_query_ = getInnerBlocksQuery();
block_context->addExternalTable(getBlocksTableName(), std::move(blocks_storage_table_holder));
InterpreterSelectQuery interpreter(inner_blocks_query,
InterpreterSelectQuery interpreter(inner_blocks_query_,
block_context,
StoragePtr(),
nullptr,

View File

@ -179,8 +179,6 @@ void ReplicatedMergeTreeAttachThread::runImpl()
/// don't allow to reinitialize them, delete each of them immediately.
storage.clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"});
storage.clearOldWriteAheadLogs();
if (storage.getSettings()->merge_tree_enable_clear_old_broken_detached)
storage.clearOldBrokenPartsFromDetachedDirectory();
storage.createNewZooKeeperNodes();
storage.syncPinnedPartUUIDs();

View File

@ -79,7 +79,7 @@ protected:
if (block_size == 0)
return {};
UInt64 curr = state->counter.fetch_add(block_size, std::memory_order_acquire);
UInt64 curr = state->counter.fetch_add(block_size, std::memory_order_relaxed);
if (curr >= max_counter)
return {};

View File

@ -725,12 +725,12 @@ ASTPtr StorageWindowView::getSourceTableSelectQuery()
if (!is_time_column_func_now)
{
auto query = select_query->clone();
auto query_ = select_query->clone();
DropTableIdentifierMatcher::Data drop_table_identifier_data;
DropTableIdentifierMatcher::Visitor(drop_table_identifier_data).visit(query);
DropTableIdentifierMatcher::Visitor(drop_table_identifier_data).visit(query_);
WindowFunctionMatcher::Data query_info_data;
WindowFunctionMatcher::Visitor(query_info_data).visit(query);
WindowFunctionMatcher::Visitor(query_info_data).visit(query_);
auto order_by = std::make_shared<ASTExpressionList>();
auto order_by_elem = std::make_shared<ASTOrderByElement>();
@ -749,12 +749,12 @@ ASTPtr StorageWindowView::getSourceTableSelectQuery()
return select_with_union_query;
}
ASTPtr StorageWindowView::getInnerTableCreateQuery(const ASTPtr & inner_query, const StorageID & inner_table_id)
ASTPtr StorageWindowView::getInnerTableCreateQuery(const ASTPtr & inner_query, const StorageID & inner_table_id_)
{
/// We will create a query to create an internal table.
auto inner_create_query = std::make_shared<ASTCreateQuery>();
inner_create_query->setDatabase(inner_table_id.getDatabaseName());
inner_create_query->setTable(inner_table_id.getTableName());
inner_create_query->setDatabase(inner_table_id_.getDatabaseName());
inner_create_query->setTable(inner_table_id_.getTableName());
Aliases aliases;
QueryAliasesVisitor(aliases).visit(inner_query);
@ -1208,7 +1208,7 @@ StorageWindowView::StorageWindowView(
if (has_inner_target_table)
{
/// create inner target table
auto create_context = Context::createCopy(context_);
auto create_context_ = Context::createCopy(context_);
auto target_create_query = std::make_shared<ASTCreateQuery>();
target_create_query->setDatabase(table_id_.database_name);
target_create_query->setTable(generateTargetTableName(table_id_));
@ -1219,9 +1219,9 @@ StorageWindowView::StorageWindowView(
target_create_query->set(target_create_query->columns_list, new_columns_list);
target_create_query->set(target_create_query->storage, query.storage->ptr());
InterpreterCreateQuery create_interpreter(target_create_query, create_context);
create_interpreter.setInternal(true);
create_interpreter.execute();
InterpreterCreateQuery create_interpreter_(target_create_query, create_context_);
create_interpreter_.setInternal(true);
create_interpreter_.execute();
}
}

View File

@ -37,6 +37,7 @@
<disk>s3</disk>
<path>/jbod1/</path>
<max_size>1000000000</max_size>
<max_file_segment_size>1Gi</max_file_segment_size>
</s3_with_cache_and_jbod>
<s3_r>
<type>s3</type>

View File

@ -101,6 +101,13 @@ def run_s3_mocks(cluster):
)
def list_objects(cluster, path="data/"):
minio = cluster.minio_client
objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
logging.info(f"list_objects ({len(objects)}): {[x.object_name for x in objects]}")
return objects
def wait_for_delete_s3_objects(cluster, expected, timeout=30):
minio = cluster.minio_client
while timeout > 0:
@ -130,9 +137,7 @@ def drop_table(cluster, node_name):
wait_for_delete_s3_objects(cluster, 0)
finally:
# Remove extra objects to prevent tests cascade failing
for obj in list(
minio.list_objects(cluster.minio_bucket, "data/", recursive=True)
):
for obj in list_objects(cluster, "data/"):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@ -297,7 +302,7 @@ def test_attach_detach_partition(cluster, node_name):
)
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
@ -306,14 +311,14 @@ def test_attach_detach_partition(cluster, node_name):
wait_for_delete_inactive_parts(node, "s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(4096)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test ATTACH PARTITION '2020-01-03'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
@ -322,7 +327,7 @@ def test_attach_detach_partition(cluster, node_name):
wait_for_delete_inactive_parts(node, "s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(4096)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 1
)
@ -331,7 +336,7 @@ def test_attach_detach_partition(cluster, node_name):
wait_for_delete_inactive_parts(node, "s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/")))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 1
)
node.query(
@ -340,7 +345,7 @@ def test_attach_detach_partition(cluster, node_name):
)
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 0
)
@ -674,13 +679,22 @@ def test_cache_with_full_disk_space(cluster, node_name):
node = cluster.instances[node_name]
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
node.query(
"CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_with_cache_and_jbod';"
"CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY value SETTINGS storage_policy='s3_with_cache_and_jbod';"
)
node.query(
"INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 500000"
"INSERT INTO s3_test SELECT number, toString(number) FROM numbers(100000000)"
)
node.query(
"SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value FORMAT Null"
out = node.exec_in_container(
[
"/usr/bin/clickhouse",
"benchmark",
"--iterations",
"10",
"--max_threads",
"100",
"--query",
"SELECT count() FROM s3_test WHERE key < 40000000 or key > 80000000 SETTINGS max_read_buffer_size='44Ki'",
]
)
assert node.contains_in_log(
"Insert into cache is skipped due to insufficient disk space"

View File

@ -20,3 +20,17 @@ SELECT toTimeZone(toDateTime('2017-11-05 08:07:47', 'Asia/Istanbul'), 'Asia/Kolk
SELECT toString(toDateTime('2017-11-05 08:07:47', 'Asia/Istanbul'));
SELECT toString(toTimeZone(toDateTime('2017-11-05 08:07:47', 'Asia/Istanbul'), 'Asia/Kolkata'));
SELECT toString(toDateTime('2017-11-05 08:07:47', 'Asia/Istanbul'), 'Asia/Kolkata');
SELECT toTimeZone(dt, tz) FROM (
SELECT toDateTime('2017-11-05 08:07:47', 'Asia/Istanbul') AS dt, arrayJoin(['Asia/Kolkata', 'UTC']) AS tz
); -- { serverError ILLEGAL_COLUMN }
SELECT materialize('Asia/Kolkata') t, toTimeZone(toDateTime('2017-11-05 08:07:47', 'Asia/Istanbul'), t); -- { serverError ILLEGAL_COLUMN }
CREATE TEMPORARY TABLE tmp AS SELECT arrayJoin(['Europe/Istanbul', 'Asia/Istanbul']);
SELECT toTimeZone(now(), (*,).1) FROM tmp; -- { serverError ILLEGAL_COLUMN }
SELECT now((*,).1) FROM tmp; -- { serverError ILLEGAL_COLUMN }
SELECT now64(1, (*,).1) FROM tmp; -- { serverError ILLEGAL_COLUMN }
SELECT toStartOfInterval(now(), INTERVAL 3 HOUR, (*,).1) FROM tmp; -- { serverError ILLEGAL_COLUMN }
SELECT snowflakeToDateTime(toInt64(123), (*,).1) FROM tmp; -- { serverError ILLEGAL_COLUMN }
SELECT toUnixTimestamp(now(), (*,).1) FROM tmp; -- { serverError ILLEGAL_COLUMN }
SELECT toDateTimeOrDefault('2023-04-12 16:43:32', (*,).1, now()) FROM tmp; -- { serverError ILLEGAL_COLUMN }

View File

@ -0,0 +1,2 @@
1 foo ['0','1','2','3','4'] {'k1':'v1'}
2 bar ['0','1','2','3','4'] {'k2':'v2'}

View File

@ -0,0 +1,11 @@
DROP TABLE IF EXISTS t_memory_compressed;
CREATE TABLE t_memory_compressed (id UInt64, s String, arr Array(LowCardinality(String)), m Map(String, String))
ENGINE = Memory SETTINGS compress = 1;
INSERT INTO t_memory_compressed VALUES (1, 'foo', range(5), map('k1', 'v1'));
INSERT INTO t_memory_compressed VALUES (2, 'bar', range(5), map('k2', 'v2'));
SELECT * FROM t_memory_compressed ORDER BY id;
DROP TABLE t_memory_compressed;

View File

@ -0,0 +1,27 @@
0000
0000
J523
A000
F634
F634
J525
J525
J523
M235
M235
S530
S530
---
0000
0000
J523
A000
F634
F634
J525
J525
J523
M235
M235
S530
S530

View File

@ -0,0 +1,28 @@
SELECT soundex('');
SELECT soundex('12345');
SELECT soundex('341Jons54326ton');
SELECT soundex('A2222222');
SELECT soundex('Fairdale');
SELECT soundex('Faredale');
SELECT soundex('Jon1s2o3n');
SELECT soundex('Jonson');
SELECT soundex('Jonston');
SELECT soundex('M\acDonald22321');
SELECT soundex('MacDonald');
SELECT soundex('S3344mith0000');
SELECT soundex('Smith');
SELECT '---';
-- same input strings but in a table
DROP TABLE IF EXISTS tab;
CREATE TABLE tab (col String) Engine=MergeTree ORDER BY col;
INSERT INTO tab VALUES ('') ('12345') ('341Jons54326ton') ('A2222222') ('Fairdale') ('Faredale') ('Jon1s2o3n') ('Jonson') ('Jonston') ('M\acDonald22321') ('MacDonald') ('S3344mith0000') ('Smith');
SELECT soundex(col) FROM tab;
DROP TABLE tab;
-- negative tests
SELECT soundex(toFixedString('Smith', 5)); -- { serverError ILLEGAL_COLUMN }
SELECT soundex(5); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }

View File

@ -0,0 +1,12 @@
---
tab idx bloom_filter
---
Expression ((Projection + Before ORDER BY))
Filter (WHERE)
ReadFromMergeTree (default.tab)
Indexes:
Skip
Name: idx
Description: bloom_filter GRANULARITY 1
Parts: 1/1
Granules: 1/1

View File

@ -0,0 +1,23 @@
DROP TABLE IF EXISTS tab;
CREATE TABLE tab
(
foo Array(LowCardinality(String)),
INDEX idx foo TYPE bloom_filter
)
ENGINE = MergeTree
PRIMARY KEY tuple();
INSERT INTO tab VALUES (['a', 'b']);
SELECT '---';
SELECT table, name, type
FROM system.data_skipping_indices
WHERE database = currentDatabase() AND table = 'tab';
SELECT '---';
EXPLAIN indexes = 1 SELECT * FROM tab WHERE has(foo, 'b');
DROP TABLE tab;

View File

@ -0,0 +1,2 @@
0
Ok 0

View File

@ -0,0 +1,18 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
url="${CLICKHOUSE_URL}&async_insert=1&wait_for_async_insert=1"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_async_insert_empty_data"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE t_async_insert_empty_data (id UInt32) ENGINE = Memory"
echo -n '' | ${CLICKHOUSE_CURL} -sS "$url&query=INSERT%20INTO%20t_async_insert_empty_data%20FORMAT%20JSONEachRow" --data-binary @-
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "SELECT count() FROM t_async_insert_empty_data"
${CLICKHOUSE_CLIENT} -q "SELECT status, bytes FROM system.asynchronous_insert_log WHERE database = '$CLICKHOUSE_DATABASE' AND table = 't_async_insert_empty_data'"
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS t_async_insert_empty_data"

View File

@ -31,6 +31,8 @@ SLACK_URL_DEFAULT = DRY_RUN_MARK
FLAKY_ALERT_PROBABILITY = 0.20
MAX_TESTS_TO_REPORT = 4
# Slack has a stupid limitation on message size: it splits long messages into multiple ones, breaking formatting
MESSAGE_LENGTH_LIMIT = 4000
@ -64,6 +66,7 @@ WHERE 1
AND test_status LIKE 'F%')
AND test_context_raw NOT LIKE '%CannotSendRequest%' and test_context_raw NOT LIKE '%Server does not respond to health check%'
GROUP BY test_name
ORDER BY (count_prev_periods + count) DESC
"""
# Returns total number of failed checks during the last 24 hours
@ -155,11 +158,17 @@ def format_failed_tests_list(failed_tests, failure_type):
else:
res = "There are {} new {} tests:\n".format(len(failed_tests), failure_type)
for name, report in failed_tests:
for name, report in failed_tests[:MAX_TESTS_TO_REPORT]:
cidb_url = get_play_url(ALL_RECENT_FAILURES_QUERY.format(name))
res += "- *{}* - <{}|Report> - <{}|CI DB> \n".format(
name, report, cidb_url
)
if MAX_TESTS_TO_REPORT < len(failed_tests):
res += "- and {} other tests... :this-is-fine-fire:".format(
len(failed_tests) - MAX_TESTS_TO_REPORT
)
return res
@ -199,7 +208,7 @@ def get_too_many_failures_message_impl(failures_count):
if curr_failures < MAX_FAILURES:
return None
if prev_failures < MAX_FAILURES:
return "*CI is broken: there are {} failures during the last 24 hours*".format(
return ":alert: *CI is broken: there are {} failures during the last 24 hours*".format(
curr_failures
)
if curr_failures < prev_failures: