Merge branch 'master' into s3queue_production_ready

Alexey Milovidov 2023-12-06 12:25:29 +01:00
commit f8de7f6bc2
78 changed files with 399 additions and 1281 deletions

.gitmodules vendored

@@ -354,9 +354,3 @@
[submodule "contrib/aklomp-base64"]
path = contrib/aklomp-base64
url = https://github.com/aklomp/base64.git
[submodule "contrib/pocketfft"]
path = contrib/pocketfft
url = https://github.com/mreineck/pocketfft.git
[submodule "contrib/sqids-cpp"]
path = contrib/sqids-cpp
url = https://github.com/sqids/sqids-cpp.git


@@ -44,7 +44,6 @@ else ()
endif ()
add_contrib (miniselect-cmake miniselect)
add_contrib (pdqsort-cmake pdqsort)
add_contrib (pocketfft-cmake pocketfft)
add_contrib (crc32-vpmsum-cmake crc32-vpmsum)
add_contrib (sparsehash-c11-cmake sparsehash-c11)
add_contrib (abseil-cpp-cmake abseil-cpp)
@@ -156,7 +155,6 @@ add_contrib (nuraft-cmake NuRaft)
add_contrib (fast_float-cmake fast_float)
add_contrib (datasketches-cpp-cmake datasketches-cpp)
add_contrib (incbin-cmake incbin)
add_contrib (sqids-cpp-cmake sqids-cpp)
option(ENABLE_NLP "Enable NLP functions support" ${ENABLE_LIBRARIES})
if (ENABLE_NLP)

contrib/pocketfft vendored

@@ -1 +0,0 @@
Subproject commit 9efd4da52cf8d28d14531d14e43ad9d913807546


@@ -1,10 +0,0 @@
option (ENABLE_POCKETFFT "Enable pocketfft" ${ENABLE_LIBRARIES})
if (NOT ENABLE_POCKETFFT)
message(STATUS "Not using pocketfft")
return()
endif()
add_library(_pocketfft INTERFACE)
target_include_directories(_pocketfft INTERFACE ${ClickHouse_SOURCE_DIR}/contrib/pocketfft)
add_library(ch_contrib::pocketfft ALIAS _pocketfft)

contrib/sqids-cpp vendored

@@ -1 +0,0 @@
Subproject commit 3756e537d4d48cc0dd4176801fe19f99601439b0


@@ -1,14 +0,0 @@
option(ENABLE_SQIDS "Enable sqids support" ${ENABLE_LIBRARIES})
if ((NOT ENABLE_SQIDS))
message (STATUS "Not using sqids")
return()
endif()
set (SQIDS_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/sqids-cpp")
set (SQIDS_INCLUDE_DIR "${SQIDS_SOURCE_DIR}/include")
add_library(_sqids INTERFACE)
target_include_directories(_sqids SYSTEM INTERFACE ${SQIDS_INCLUDE_DIR})
add_library(ch_contrib::sqids ALIAS _sqids)
target_compile_definitions(_sqids INTERFACE ENABLE_SQIDS)


@@ -93,7 +93,7 @@ While ClickHouse can work over NFS, it is not the best idea.
## Linux Kernel {#linux-kernel}
Dont use an outdated Linux kernel.
Don't use an outdated Linux kernel.
## Network {#network}


@@ -1,48 +0,0 @@
---
toc_priority: 112
---
# groupArraySorted {#groupArraySorted}
Returns an array with the first N items in ascending order.
``` sql
groupArraySorted(N)(column)
```
**Arguments**
- `N` – The number of elements to return.
If the parameter is omitted, it defaults to the size of the input.
- `column` – The value (Integer, String, Float and other Generic types).
**Example**
Gets the first 10 numbers:
``` sql
SELECT groupArraySorted(10)(number) FROM numbers(100)
```
``` text
┌─groupArraySorted(10)(number)─┐
│ [0,1,2,3,4,5,6,7,8,9] │
└──────────────────────────────┘
```
Gets the String representations of all numbers in the column:
``` sql
SELECT groupArraySorted(str) FROM (SELECT toString(number) as str FROM numbers(5));
```
``` text
┌─groupArraySorted(str)────────┐
│ ['0','1','2','3','4'] │
└──────────────────────────────┘
```


@@ -54,7 +54,6 @@ ClickHouse-specific aggregate functions:
- [groupArrayMovingAvg](/docs/en/sql-reference/aggregate-functions/reference/grouparraymovingavg.md)
- [groupArrayMovingSum](/docs/en/sql-reference/aggregate-functions/reference/grouparraymovingsum.md)
- [groupArraySample](./grouparraysample.md)
- [groupArraySorted](/docs/en/sql-reference/aggregate-functions/reference/grouparraysorted.md)
- [groupBitAnd](/docs/en/sql-reference/aggregate-functions/reference/groupbitand.md)
- [groupBitOr](/docs/en/sql-reference/aggregate-functions/reference/groupbitor.md)
- [groupBitXor](/docs/en/sql-reference/aggregate-functions/reference/groupbitxor.md)


@@ -1776,34 +1776,3 @@ Result:
│ (('queries','database','analytical'),('oriented','processing','DBMS')) │
└────────────────────────────────────────────────────────────────────────┘
```
## sqid
Transforms numbers into a YouTube-like short URL hash called a [Sqid](https://sqids.org/).
To use this function, set setting `allow_experimental_hash_functions = 1`.
**Syntax**
```sql
sqid(number1,...)
```
**Arguments**
- A variable number of UInt8, UInt16, UInt32 or UInt64 numbers.
**Returned Value**
A hash id [String](/docs/en/sql-reference/data-types/string.md).
**Example**
```sql
SELECT sqid(1, 2, 3, 4, 5);
```
```response
┌─sqid(1, 2, 3, 4, 5)─┐
│ gXHfJ1C6dN │
└─────────────────────┘
```


@@ -1,47 +0,0 @@
---
slug: /en/sql-reference/functions/time-series-functions
sidebar_position: 172
sidebar_label: Time Series
---
# Time Series Functions
The functions below are used for time series analysis.
## seriesPeriodDetectFFT
Finds the period of the given time series data using FFT (Fast Fourier transform, https://en.wikipedia.org/wiki/Fast_Fourier_transform).
**Syntax**
``` sql
seriesPeriodDetectFFT(series);
```
**Arguments**
- `series` - An array of numeric values
**Returned value**
- A real value equal to the period of the time series
Type: [Float64](../../sql-reference/data-types/float.md).
**Examples**
Query:
``` sql
SELECT seriesPeriodDetectFFT([1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6, 1, 4, 6]) AS print_0;
```
Result:
``` text
┌───────────print_0──────┐
│ 3 │
└────────────────────────┘
```
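A second, hedged example grounded in the removed implementation shown later in this diff: for a flat series every non-DC FFT component is zero, and the `max_mag == 0` branch returns a period of 0.
``` sql
-- Flat input: all non-DC frequency components are zero,
-- so the result should be 0 per the max_mag == 0 branch.
SELECT seriesPeriodDetectFFT([3, 3, 3, 3]) AS print_0;
```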


@@ -456,7 +456,7 @@ Closes the socket and gracefully terminates the existing connections to the serv
However, if the corresponding protocol settings were not specified in the clickhouse-server configuration, this command will have no effect.
```sql
SYSTEM STOP LISTEN [ON CLUSTER cluster_name] [QUERIES ALL | QUERIES DEFAULT | QUERIES CUSTOM | TCP | TCP_WITH_PROXY | TCP_SECURE | HTTP | HTTPS | MYSQL | GRPC | POSTGRESQL | PROMETHEUS | CUSTOM 'protocol']
SYSTEM STOP LISTEN [ON CLUSTER cluster_name] [QUERIES ALL | QUERIES DEFAULT | QUERIES CUSTOM | TCP | TCP WITH PROXY | TCP SECURE | HTTP | HTTPS | MYSQL | GRPC | POSTGRESQL | PROMETHEUS | CUSTOM 'protocol']
```
- If `CUSTOM 'protocol'` modifier is specified, the custom protocol with the specified name defined in the protocols section of the server configuration will be stopped.
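As an illustration, a minimal sketch of that modifier (the protocol name `mysql_alt` is hypothetical, not taken from this diff):
```sql
-- Stop only the custom protocol named 'mysql_alt', defined in the
-- protocols section of the server configuration (hypothetical name).
SYSTEM STOP LISTEN CUSTOM 'mysql_alt'
```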
@@ -471,5 +471,5 @@ Allows new connections to be established on the specified protocols.
However, if the server on the specified port and protocol was not stopped using the SYSTEM STOP LISTEN command, this command will have no effect.
```sql
SYSTEM START LISTEN [ON CLUSTER cluster_name] [QUERIES ALL | QUERIES DEFAULT | QUERIES CUSTOM | TCP | TCP_WITH_PROXY | TCP_SECURE | HTTP | HTTPS | MYSQL | GRPC | POSTGRESQL | PROMETHEUS | CUSTOM 'protocol']
SYSTEM START LISTEN [ON CLUSTER cluster_name] [QUERIES ALL | QUERIES DEFAULT | QUERIES CUSTOM | TCP | TCP WITH PROXY | TCP SECURE | HTTP | HTTPS | MYSQL | GRPC | POSTGRESQL | PROMETHEUS | CUSTOM 'protocol']
```


@@ -36,7 +36,7 @@ public:
void execute(
const std::vector<String> & command_arguments,
DB::ContextMutablePtr & global_context,
std::shared_ptr<DiskSelector> & disk_selector,
Poco::Util::LayeredConfiguration & config) override
{
if (command_arguments.size() != 2)
@@ -51,8 +51,8 @@ public:
const String & path_from = command_arguments[0];
const String & path_to = command_arguments[1];
DiskPtr disk_from = global_context->getDisk(disk_name_from);
DiskPtr disk_to = global_context->getDisk(disk_name_to);
DiskPtr disk_from = disk_selector->get(disk_name_from);
DiskPtr disk_to = disk_selector->get(disk_name_to);
String relative_path_from = validatePathAndGetAsRelative(path_from);
String relative_path_to = validatePathAndGetAsRelative(path_to);


@@ -27,7 +27,7 @@ public:
void execute(
const std::vector<String> & command_arguments,
DB::ContextMutablePtr & global_context,
std::shared_ptr<DiskSelector> & disk_selector,
Poco::Util::LayeredConfiguration & config) override
{
if (command_arguments.size() != 2)
@@ -41,7 +41,7 @@ public:
const String & path_from = command_arguments[0];
const String & path_to = command_arguments[1];
DiskPtr disk = global_context->getDisk(disk_name);
DiskPtr disk = disk_selector->get(disk_name);
String relative_path_from = validatePathAndGetAsRelative(path_from);
String relative_path_to = validatePathAndGetAsRelative(path_to);


@@ -33,7 +33,7 @@ public:
void execute(
const std::vector<String> & command_arguments,
DB::ContextMutablePtr & global_context,
std::shared_ptr<DiskSelector> & disk_selector,
Poco::Util::LayeredConfiguration & config) override
{
if (command_arguments.size() != 1)
@@ -46,7 +46,7 @@ public:
const String & path = command_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
DiskPtr disk = disk_selector->get(disk_name);
String relative_path = validatePathAndGetAsRelative(path);


@@ -26,8 +26,8 @@ public:
void execute(
const std::vector<String> & command_arguments,
DB::ContextMutablePtr & global_context,
Poco::Util::LayeredConfiguration &) override
std::shared_ptr<DiskSelector> &,
Poco::Util::LayeredConfiguration & config) override
{
if (!command_arguments.empty())
{
@@ -35,8 +35,29 @@ public:
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Bad Arguments");
}
for (const auto & [disk_name, _] : global_context->getDisksMap())
std::cout << disk_name << '\n';
constexpr auto config_prefix = "storage_configuration.disks";
constexpr auto default_disk_name = "default";
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
bool has_default_disk = false;
/// For the output to be ordered
std::set<String> disks;
for (const auto & disk_name : keys)
{
if (disk_name == default_disk_name)
has_default_disk = true;
disks.insert(disk_name);
}
if (!has_default_disk)
disks.insert(default_disk_name);
for (const auto & disk : disks)
std::cout << disk << '\n';
}
};
}


@@ -34,7 +34,7 @@ public:
void execute(
const std::vector<String> & command_arguments,
DB::ContextMutablePtr & global_context,
std::shared_ptr<DiskSelector> & disk_selector,
Poco::Util::LayeredConfiguration & config) override
{
if (command_arguments.size() != 1)
@@ -47,7 +47,7 @@ public:
const String & path = command_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
DiskPtr disk = disk_selector->get(disk_name);
String relative_path = validatePathAndGetAsRelative(path);
bool recursive = config.getBool("recursive", false);


@@ -26,7 +26,7 @@ public:
void execute(
const std::vector<String> & command_arguments,
DB::ContextMutablePtr & global_context,
std::shared_ptr<DiskSelector> & disk_selector,
Poco::Util::LayeredConfiguration & config) override
{
if (command_arguments.size() != 2)
@@ -40,7 +40,7 @@ public:
const String & path_from = command_arguments[0];
const String & path_to = command_arguments[1];
DiskPtr disk = global_context->getDisk(disk_name);
DiskPtr disk = disk_selector->get(disk_name);
String relative_path_from = validatePathAndGetAsRelative(path_from);
String relative_path_to = validatePathAndGetAsRelative(path_to);


@@ -36,7 +36,7 @@ public:
void execute(
const std::vector<String> & command_arguments,
DB::ContextMutablePtr & global_context,
std::shared_ptr<DiskSelector> & disk_selector,
Poco::Util::LayeredConfiguration & config) override
{
if (command_arguments.size() != 1)
@@ -47,7 +47,7 @@ public:
String disk_name = config.getString("disk", "default");
DiskPtr disk = global_context->getDisk(disk_name);
DiskPtr disk = disk_selector->get(disk_name);
String relative_path = validatePathAndGetAsRelative(command_arguments[0]);


@@ -26,7 +26,7 @@ public:
void execute(
const std::vector<String> & command_arguments,
DB::ContextMutablePtr & global_context,
std::shared_ptr<DiskSelector> & disk_selector,
Poco::Util::LayeredConfiguration & config) override
{
if (command_arguments.size() != 1)
@@ -39,7 +39,7 @@ public:
const String & path = command_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
DiskPtr disk = disk_selector->get(disk_name);
String relative_path = validatePathAndGetAsRelative(path);


@@ -37,7 +37,7 @@ public:
void execute(
const std::vector<String> & command_arguments,
DB::ContextMutablePtr & global_context,
std::shared_ptr<DiskSelector> & disk_selector,
Poco::Util::LayeredConfiguration & config) override
{
if (command_arguments.size() != 1)
@@ -50,7 +50,7 @@ public:
const String & path = command_arguments[0];
DiskPtr disk = global_context->getDisk(disk_name);
DiskPtr disk = disk_selector->get(disk_name);
String relative_path = validatePathAndGetAsRelative(path);


@@ -209,7 +209,35 @@ int DisksApp::main(const std::vector<String> & /*args*/)
po::parsed_options parsed = parser.run();
args = po::collect_unrecognized(parsed.options, po::collect_unrecognized_mode::include_positional);
}
command->execute(args, global_context, config());
std::unordered_set<std::string> disks
{
config().getString("disk", "default"),
config().getString("disk-from", config().getString("disk", "default")),
config().getString("disk-to", config().getString("disk", "default")),
};
auto validator = [&disks](
const Poco::Util::AbstractConfiguration & config,
const std::string & disk_config_prefix,
const std::string & disk_name)
{
if (!disks.contains(disk_name))
return false;
const auto disk_type = config.getString(disk_config_prefix + ".type", "local");
if (disk_type == "cache")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk type 'cache' of disk {} is not supported by clickhouse-disks", disk_name);
return true;
};
constexpr auto config_prefix = "storage_configuration.disks";
auto disk_selector = std::make_shared<DiskSelector>();
disk_selector->initialize(config(), config_prefix, global_context, validator);
command->execute(args, disk_selector, config());
return Application::EXIT_OK;
}


@@ -1,6 +1,7 @@
#pragma once
#include <Disks/IDisk.h>
#include <Disks/DiskSelector.h>
#include <boost/program_options.hpp>
@@ -25,7 +26,7 @@ public:
virtual void execute(
const std::vector<String> & command_arguments,
DB::ContextMutablePtr & global_context,
std::shared_ptr<DiskSelector> & disk_selector,
Poco::Util::LayeredConfiguration & config) = 0;
const std::optional<ProgramOptionsDescription> & getCommandOptions() const { return command_option_description; }


@@ -1,82 +0,0 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionGroupArraySorted.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Common/Exception.h>
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
namespace
{
template <template <typename> class AggregateFunctionTemplate, typename ... TArgs>
AggregateFunctionPtr createWithNumericOrTimeType(const IDataType & argument_type, TArgs && ... args)
{
WhichDataType which(argument_type);
if (which.idx == TypeIndex::Date) return std::make_shared<AggregateFunctionTemplate<UInt16>>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::DateTime) return std::make_shared<AggregateFunctionTemplate<UInt32>>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::IPv4) return std::make_shared<AggregateFunctionTemplate<IPv4>>(std::forward<TArgs>(args)...);
return AggregateFunctionPtr(createWithNumericType<AggregateFunctionTemplate, TArgs...>(argument_type, std::forward<TArgs>(args)...));
}
template <typename ... TArgs>
inline AggregateFunctionPtr createAggregateFunctionGroupArraySortedImpl(const DataTypePtr & argument_type, const Array & parameters, TArgs ... args)
{
if (auto res = createWithNumericOrTimeType<GroupArraySortedNumericImpl>(*argument_type, argument_type, parameters, std::forward<TArgs>(args)...))
return AggregateFunctionPtr(res);
WhichDataType which(argument_type);
return std::make_shared<GroupArraySortedGeneralImpl<GroupArraySortedNodeGeneral>>(argument_type, parameters, std::forward<TArgs>(args)...);
}
AggregateFunctionPtr createAggregateFunctionGroupArraySorted(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertUnary(name, argument_types);
UInt64 max_elems = std::numeric_limits<UInt64>::max();
if (parameters.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter for aggregate function {} should have limit argument", name);
}
else if (parameters.size() == 1)
{
auto type = parameters[0].getType();
if (type != Field::Types::Int64 && type != Field::Types::UInt64)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter for aggregate function {} should be positive number", name);
if ((type == Field::Types::Int64 && parameters[0].get<Int64>() < 0) ||
(type == Field::Types::UInt64 && parameters[0].get<UInt64>() == 0))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter for aggregate function {} should be positive number", name);
max_elems = parameters[0].get<UInt64>();
}
else
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Function {} does not support this number of arguments", name);
return createAggregateFunctionGroupArraySortedImpl(argument_types[0], parameters, max_elems);
}
}
void registerAggregateFunctionGroupArraySorted(AggregateFunctionFactory & factory)
{
AggregateFunctionProperties properties = { .returns_default_when_only_null = false, .is_order_dependent = false };
factory.registerFunction("groupArraySorted", { createAggregateFunctionGroupArraySorted, properties });
}
}


@@ -1,355 +0,0 @@
#pragma once
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Functions/array/arraySort.h>
#include <Common/Exception.h>
#include <Common/ArenaAllocator.h>
#include <Common/assert_cast.h>
#include <Columns/ColumnConst.h>
#include <DataTypes/IDataType.h>
#include <base/sort.h>
#include <Columns/IColumn.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Common/RadixSort.h>
#include <algorithm>
#include <type_traits>
#include <utility>
#define AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ELEMENT_SIZE 0xFFFFFF
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int TOO_LARGE_ARRAY_SIZE;
}
template <typename T>
struct GroupArraySortedData;
template <typename T>
struct GroupArraySortedData
{
/// For easy serialization.
static_assert(std::has_unique_object_representations_v<T> || std::is_floating_point_v<T>);
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
using Array = PODArray<T, 32, Allocator>;
Array value;
};
template <typename T>
class GroupArraySortedNumericImpl final
: public IAggregateFunctionDataHelper<GroupArraySortedData<T>, GroupArraySortedNumericImpl<T>>
{
using Data = GroupArraySortedData<T>;
UInt64 max_elems;
SerializationPtr serialization;
public:
explicit GroupArraySortedNumericImpl(
const DataTypePtr & data_type_, const Array & parameters_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<GroupArraySortedData<T>, GroupArraySortedNumericImpl<T>>(
{data_type_}, parameters_, std::make_shared<DataTypeArray>(data_type_))
, max_elems(max_elems_)
, serialization(data_type_->getDefaultSerialization())
{
}
String getName() const override { return "groupArraySorted"; }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
const auto & row_value = assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
auto & cur_elems = this->data(place);
cur_elems.value.push_back(row_value, arena);
/// To optimize, we sort (2 * max_size) elements of input array over and over again
/// and after each loop we delete the last half of sorted array
if (cur_elems.value.size() >= max_elems * 2)
{
RadixSort<RadixSortNumTraits<T>>::executeLSD(cur_elems.value.data(), cur_elems.value.size());
cur_elems.value.resize(max_elems, arena);
}
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & cur_elems = this->data(place);
auto & rhs_elems = this->data(rhs);
if (rhs_elems.value.empty())
return;
if (rhs_elems.value.size())
cur_elems.value.insertByOffsets(rhs_elems.value, 0, rhs_elems.value.size(), arena);
RadixSort<RadixSortNumTraits<T>>::executeLSD(cur_elems.value.data(), cur_elems.value.size());
size_t elems_size = cur_elems.value.size() < max_elems ? cur_elems.value.size() : max_elems;
cur_elems.value.resize(elems_size, arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
auto & value = this->data(place).value;
size_t size = value.size();
writeVarUInt(size, buf);
for (const auto & elem : value)
writeBinaryLittleEndian(elem, buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
{
size_t size = 0;
readVarUInt(size, buf);
if (unlikely(size > max_elems))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large array size, it should not exceed {}", max_elems);
auto & value = this->data(place).value;
value.resize(size, arena);
for (auto & element : value)
readBinaryLittleEndian(element, buf);
}
static void checkArraySize(size_t elems, size_t max_elems)
{
if (unlikely(elems > max_elems))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
"Too large array size {} (maximum: {})", elems, max_elems);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
auto& value = this->data(place).value;
RadixSort<RadixSortNumTraits<T>>::executeLSD(value.data(), value.size());
size_t elems_size = value.size() < max_elems ? value.size() : max_elems;
value.resize(elems_size, arena);
size_t size = value.size();
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
offsets_to.push_back(offsets_to.back() + size);
if (size)
{
typename ColumnVector<T>::Container & data_to = assert_cast<ColumnVector<T> &>(arr_to.getData()).getData();
data_to.insert(this->data(place).value.begin(), this->data(place).value.end());
RadixSort<RadixSortNumTraits<T>>::executeLSD(value.data(), value.size());
value.resize(elems_size, arena);
}
}
bool allocatesMemoryInArena() const override { return true; }
};
template <typename Node, bool has_sampler>
struct GroupArraySortedGeneralData;
template <typename Node>
struct GroupArraySortedGeneralData<Node, false>
{
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(Node *), 4096>;
using Array = PODArray<Field, 32, Allocator>;
Array value;
};
template <typename Node>
struct GroupArraySortedNodeBase
{
UInt64 size; // size of payload
/// Returns pointer to actual payload
char * data() { return reinterpret_cast<char *>(this) + sizeof(Node); }
const char * data() const { return reinterpret_cast<const char *>(this) + sizeof(Node); }
};
struct GroupArraySortedNodeString : public GroupArraySortedNodeBase<GroupArraySortedNodeString>
{
using Node = GroupArraySortedNodeString;
};
struct GroupArraySortedNodeGeneral : public GroupArraySortedNodeBase<GroupArraySortedNodeGeneral>
{
using Node = GroupArraySortedNodeGeneral;
};
/// Implementation of groupArraySorted for Generic data via Array
template <typename Node>
class GroupArraySortedGeneralImpl final
: public IAggregateFunctionDataHelper<GroupArraySortedGeneralData<Node, false>, GroupArraySortedGeneralImpl<Node>>
{
using Data = GroupArraySortedGeneralData<Node, false>;
static Data & data(AggregateDataPtr __restrict place) { return *reinterpret_cast<Data *>(place); }
static const Data & data(ConstAggregateDataPtr __restrict place) { return *reinterpret_cast<const Data *>(place); }
DataTypePtr & data_type;
UInt64 max_elems;
SerializationPtr serialization;
public:
GroupArraySortedGeneralImpl(const DataTypePtr & data_type_, const Array & parameters_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<GroupArraySortedGeneralData<Node, false>, GroupArraySortedGeneralImpl<Node>>(
{data_type_}, parameters_, std::make_shared<DataTypeArray>(data_type_))
, data_type(this->argument_types[0])
, max_elems(max_elems_)
, serialization(data_type->getDefaultSerialization())
{
}
String getName() const override { return "groupArraySorted"; }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
auto & cur_elems = data(place);
cur_elems.value.push_back(columns[0][0][row_num], arena);
/// To optimize, we sort (2 * max_size) elements of input array over and over again and
/// after each loop we delete the last half of sorted array
if (cur_elems.value.size() >= max_elems * 2)
{
std::sort(cur_elems.value.begin(), cur_elems.value.begin() + (max_elems * 2));
cur_elems.value.erase(cur_elems.value.begin() + max_elems, cur_elems.value.begin() + (max_elems * 2));
}
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & cur_elems = data(place);
auto & rhs_elems = data(rhs);
if (rhs_elems.value.empty())
return;
UInt64 new_elems = rhs_elems.value.size();
for (UInt64 i = 0; i < new_elems; ++i)
cur_elems.value.push_back(rhs_elems.value[i], arena);
checkArraySize(cur_elems.value.size(), AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ELEMENT_SIZE);
if (!cur_elems.value.empty())
{
std::sort(cur_elems.value.begin(), cur_elems.value.end());
if (cur_elems.value.size() > max_elems)
cur_elems.value.resize(max_elems, arena);
}
}
static void checkArraySize(size_t elems, size_t max_elems)
{
if (unlikely(elems > max_elems))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
"Too large array size {} (maximum: {})", elems, max_elems);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
auto & value = data(place).value;
size_t size = value.size();
checkArraySize(size, AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ELEMENT_SIZE);
writeVarUInt(size, buf);
for (const Field & elem : value)
{
if (elem.isNull())
{
writeBinary(false, buf);
}
else
{
writeBinary(true, buf);
serialization->serializeBinary(elem, buf, {});
}
}
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
{
size_t size = 0;
readVarUInt(size, buf);
if (unlikely(size > max_elems))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large array size, it should not exceed {}", max_elems);
checkArraySize(size, AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ELEMENT_SIZE);
auto & value = data(place).value;
value.resize(size, arena);
for (Field & elem : value)
{
UInt8 is_null = 0;
readBinary(is_null, buf);
if (!is_null)
serialization->deserializeBinary(elem, buf, {});
}
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
auto & column_array = assert_cast<ColumnArray &>(to);
auto & value = data(place).value;
if (!value.empty())
{
std::sort(value.begin(), value.end());
if (value.size() > max_elems)
value.resize_exact(max_elems, arena);
}
auto & offsets = column_array.getOffsets();
offsets.push_back(offsets.back() + value.size());
auto & column_data = column_array.getData();
if (std::is_same_v<Node, GroupArraySortedNodeString>)
{
auto & string_offsets = assert_cast<ColumnString &>(column_data).getOffsets();
string_offsets.reserve(string_offsets.size() + value.size());
}
for (const Field& field : value)
column_data.insert(field);
}
bool allocatesMemoryInArena() const override { return true; }
};
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ARRAY_SIZE
}


@@ -15,7 +15,6 @@ void registerAggregateFunctionCount(AggregateFunctionFactory &);
void registerAggregateFunctionDeltaSum(AggregateFunctionFactory &);
void registerAggregateFunctionDeltaSumTimestamp(AggregateFunctionFactory &);
void registerAggregateFunctionGroupArray(AggregateFunctionFactory &);
void registerAggregateFunctionGroupArraySorted(AggregateFunctionFactory & factory);
void registerAggregateFunctionGroupUniqArray(AggregateFunctionFactory &);
void registerAggregateFunctionGroupArrayInsertAt(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantile(AggregateFunctionFactory &);
@@ -112,7 +111,6 @@ void registerAggregateFunctions()
registerAggregateFunctionDeltaSum(factory);
registerAggregateFunctionDeltaSumTimestamp(factory);
registerAggregateFunctionGroupArray(factory);
registerAggregateFunctionGroupArraySorted(factory);
registerAggregateFunctionGroupUniqArray(factory);
registerAggregateFunctionGroupArrayInsertAt(factory);
registerAggregateFunctionsQuantile(factory);


@@ -436,10 +436,6 @@ dbms_target_link_libraries(PRIVATE ch_contrib::zstd)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::zstd)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::xz)
if (TARGET ch_contrib::pocketfft)
target_link_libraries(clickhouse_common_io PUBLIC ch_contrib::pocketfft)
endif ()
if (TARGET ch_contrib::icu)
dbms_target_link_libraries (PRIVATE ch_contrib::icu)
endif ()


@@ -1,6 +1,6 @@
#pragma once
#include <time.h>
#include <ctime>
#include <cstdlib>
#include <climits>
#include <random>
@@ -180,6 +180,7 @@ PoolWithFailoverBase<TNestedPool>::getShuffledPools(
shuffled_pools.reserve(nested_pools.size());
for (size_t i = 0; i < nested_pools.size(); ++i)
shuffled_pools.push_back(ShuffledPool{nested_pools[i].get(), &pool_states[i], i, /* error_count = */ 0, /* slowdown_count = */ 0});
::sort(
shuffled_pools.begin(), shuffled_pools.end(),
[](const ShuffledPool & lhs, const ShuffledPool & rhs)


@@ -27,7 +27,6 @@
#cmakedefine01 USE_H3
#cmakedefine01 USE_S2_GEOMETRY
#cmakedefine01 USE_FASTOPS
#cmakedefine01 USE_SQIDS
#cmakedefine01 USE_NLP
#cmakedefine01 USE_VECTORSCAN
#cmakedefine01 USE_LIBURING
@@ -62,7 +61,6 @@
#cmakedefine01 FIU_ENABLE
#cmakedefine01 USE_BCRYPT
#cmakedefine01 USE_LIBARCHIVE
#cmakedefine01 USE_POCKETFFT
/// This is needed for .incbin in assembly. For some reason, include paths don't work there in presence of LTO.
/// That's why we use absolute paths.


@@ -69,7 +69,7 @@ void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config,
namespace
{
bool diskValidator(const Poco::Util::AbstractConfiguration & config, const std::string & disk_config_prefix)
bool diskValidator(const Poco::Util::AbstractConfiguration & config, const std::string & disk_config_prefix, const std::string &)
{
const auto disk_type = config.getString(disk_config_prefix + ".type", "local");


@@ -44,9 +44,9 @@ void DiskSelector::initialize(const Poco::Util::AbstractConfiguration & config,
if (disk_name == default_disk_name)
has_default_disk = true;
auto disk_config_prefix = config_prefix + "." + disk_name;
const auto disk_config_prefix = config_prefix + "." + disk_name;
if (disk_validator && !disk_validator(config, disk_config_prefix))
if (disk_validator && !disk_validator(config, disk_config_prefix, disk_name))
continue;
disks.emplace(disk_name, factory.create(disk_name, config, disk_config_prefix, context, disks));


@@ -23,7 +23,7 @@ public:
DiskSelector() = default;
DiskSelector(const DiskSelector & from) = default;
using DiskValidator = std::function<bool(const Poco::Util::AbstractConfiguration & config, const String & disk_config_prefix)>;
using DiskValidator = std::function<bool(const Poco::Util::AbstractConfiguration & config, const String & disk_config_prefix, const String & disk_name)>;
void initialize(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, ContextPtr context, DiskValidator disk_validator = {});
DiskSelectorPtr updateFromConfig(


@@ -79,10 +79,6 @@ if (ENABLE_NLP)
list (APPEND PRIVATE_LIBS ch_contrib::cld2)
endif()
if (TARGET ch_contrib::sqids)
list (APPEND PRIVATE_LIBS ch_contrib::sqids)
endif()
if (TARGET ch_contrib::h3)
list (APPEND PRIVATE_LIBS ch_contrib::h3)
endif()
@@ -99,10 +95,6 @@ if (TARGET ch_contrib::rapidjson)
list (APPEND PRIVATE_LIBS ch_contrib::rapidjson)
endif()
if (TARGET ch_contrib::pocketfft)
list (APPEND PRIVATE_LIBS ch_contrib::pocketfft)
endif()
if (TARGET ch_contrib::crc32-vpmsum)
list (APPEND PUBLIC_LIBS ch_contrib::crc32-vpmsum)
endif()


@@ -1,97 +0,0 @@
#include "config.h"
#ifdef ENABLE_SQIDS
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <sqids/sqids.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int SUPPORT_IS_DISABLED;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
// sqid(number1, ...)
class FunctionSqid : public IFunction
{
public:
static constexpr auto name = "sqid";
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 0; }
bool isVariadic() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
static FunctionPtr create(ContextPtr context)
{
if (!context->getSettingsRef().allow_experimental_hash_functions)
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Hashing function '{}' is experimental. Set `allow_experimental_hash_functions` setting to enable it",
name);
return std::make_shared<FunctionSqid>();
}
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.empty())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires at least one argument.", getName());
for (size_t i = 0; i < arguments.size(); ++i)
{
if (!checkDataTypes<
DataTypeUInt8,
DataTypeUInt16,
DataTypeUInt32,
DataTypeUInt64>(arguments[i].get()))
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Argument {} for function {} must have datatype UInt*, given type: {}.",
i, getName(), arguments[i]->getName());
}
return std::make_shared<DataTypeString>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
size_t num_args = arguments.size();
auto col_res = ColumnString::create();
sqidscxx::Sqids<> sqids;
std::vector<UInt64> numbers(num_args);
for (size_t i = 0; i < input_rows_count; ++i)
{
for (size_t j = 0; j < num_args; ++j)
{
const ColumnWithTypeAndName & arg = arguments[j];
ColumnPtr current_column = arg.column;
numbers[j] = current_column->getUInt(i);
}
auto id = sqids.encode(numbers);
col_res->insert(id);
}
return col_res;
}
};
REGISTER_FUNCTION(Sqid)
{
factory.registerFunction<FunctionSqid>();
}
}
#endif


@@ -102,8 +102,12 @@ namespace
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong JSON string to merge. Expected JSON object");
};
const auto * first_string = typeid_cast<const ColumnString *>(arguments[0].column.get());
if (!first_string)
const bool is_first_const = isColumnConst(*arguments[0].column);
const auto * first_column_arg_string = is_first_const
? checkAndGetColumnConstData<ColumnString>(arguments[0].column.get())
: checkAndGetColumn<ColumnString>(arguments[0].column.get());
if (!first_column_arg_string)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Arguments of function {} must be strings", getName());
std::vector<rapidjson::Document> merged_jsons;
@@ -112,19 +116,29 @@ namespace
for (size_t i = 0; i < input_rows_count; ++i)
{
auto & merged_json = merged_jsons.emplace_back(rapidjson::Type::kObjectType, &allocator);
parse_json_document(*first_string, merged_json, i);
if (is_first_const)
parse_json_document(*first_column_arg_string, merged_json, 0);
else
parse_json_document(*first_column_arg_string, merged_json, i);
}
for (size_t col_idx = 1; col_idx < arguments.size(); ++col_idx)
{
const auto * column_string = typeid_cast<const ColumnString *>(arguments[col_idx].column.get());
if (!column_string)
const bool is_const = isColumnConst(*arguments[col_idx].column);
const auto * column_arg_string = is_const
? checkAndGetColumnConstData<ColumnString>(arguments[col_idx].column.get())
: checkAndGetColumn<ColumnString>(arguments[col_idx].column.get());
if (!column_arg_string)
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Arguments of function {} must be strings", getName());
for (size_t i = 0; i < input_rows_count; ++i)
{
rapidjson::Document document(&allocator);
parse_json_document(*column_string, document, i);
if (is_const)
parse_json_document(*column_arg_string, document, 0);
else
parse_json_document(*column_arg_string, document, i);
merge_objects(merged_jsons[i], document);
}
}
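The hunk above lets the function accept constant string arguments as well as full columns. A hedged SQL sketch of a call this enables (assuming the surrounding function is `jsonMergePatch`; the name is not visible in this hunk):
```sql
-- Constant first argument, non-constant second argument: both paths
-- now go through checkAndGetColumnConstData / checkAndGetColumn.
SELECT jsonMergePatch('{"a": 1}', materialize('{"b": 2}'));
```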


@@ -1,164 +0,0 @@
#include "config.h"
#if USE_POCKETFFT
# ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wshadow"
# pragma clang diagnostic ignored "-Wextra-semi-stmt"
# pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
# endif
# include <pocketfft_hdronly.h>
# ifdef __clang__
# pragma clang diagnostic pop
# endif
# include <cmath>
# include <Columns/ColumnArray.h>
# include <Columns/ColumnsNumber.h>
# include <DataTypes/DataTypeArray.h>
# include <DataTypes/DataTypesNumber.h>
# include <Functions/FunctionFactory.h>
# include <Functions/FunctionHelpers.h>
# include <Functions/IFunction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int ILLEGAL_COLUMN;
}
/* Detect period in time series data using FFT.
* FFT - Fast Fourier transform (https://en.wikipedia.org/wiki/Fast_Fourier_transform)
* 1. Convert time series data to frequency domain using FFT.
* 2. Remove the 0th (the DC component) and the n/2th (the Nyquist frequency).
* 3. Find the peak value (highest) for dominant frequency component.
* 4. Inverse of the dominant frequency component is the period.
*/
class FunctionSeriesPeriodDetectFFT : public IFunction
{
public:
static constexpr auto name = "seriesPeriodDetectFFT";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionSeriesPeriodDetectFFT>(); }
std::string getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
FunctionArgumentDescriptors args{{"time_series", &isArray<IDataType>, nullptr, "Array"}};
validateFunctionArgumentTypes(*this, arguments, args);
return std::make_shared<DataTypeFloat64>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override
{
ColumnPtr array_ptr = arguments[0].column;
const ColumnArray * array = checkAndGetColumn<ColumnArray>(array_ptr.get());
const IColumn & src_data = array->getData();
auto res = ColumnFloat64::create(1);
auto & res_data = res->getData();
Float64 period;
if (executeNumber<UInt8>(src_data, period) || executeNumber<UInt16>(src_data, period) || executeNumber<UInt32>(src_data, period)
|| executeNumber<UInt64>(src_data, period) || executeNumber<Int8>(src_data, period) || executeNumber<Int16>(src_data, period)
|| executeNumber<Int32>(src_data, period) || executeNumber<Int64>(src_data, period) || executeNumber<Float32>(src_data, period)
|| executeNumber<Float64>(src_data, period))
{
res_data[0] = period;
return res;
}
else
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of first argument of function {}",
arguments[0].column->getName(),
getName());
}
template <typename T>
bool executeNumber(const IColumn & src_data, Float64 & period) const
{
const ColumnVector<T> * src_data_concrete = checkAndGetColumn<ColumnVector<T>>(&src_data);
if (!src_data_concrete)
return false;
const PaddedPODArray<T> & src_vec = src_data_concrete->getData();
size_t len = src_vec.size();
if (len < 4)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "At least four data points are needed for function {}", getName());
std::vector<Float64> src(src_vec.begin(), src_vec.end());
std::vector<std::complex<double>> out((len / 2) + 1);
pocketfft::shape_t shape{len};
pocketfft::shape_t axes;
axes.reserve(shape.size());
for (size_t i = 0; i < shape.size(); ++i)
axes.push_back(i);
pocketfft::stride_t stride_src{sizeof(double)};
pocketfft::stride_t stride_out{sizeof(std::complex<double>)};
pocketfft::r2c(shape, stride_src, stride_out, axes, pocketfft::FORWARD, src.data(), out.data(), static_cast<double>(1));
size_t spec_len = (len - 1) / 2; // removing the Nyquist element when len is even
double max_mag = 0;
size_t idx = 1;
for (size_t i = 1; i < spec_len; ++i)
{
double magnitude = sqrt(out[i].real() * out[i].real() + out[i].imag() * out[i].imag());
if (magnitude > max_mag)
{
max_mag = magnitude;
idx = i;
}
}
// In case all FFT values are zero, it means the input signal is flat.
// It implies the period of the series should be 0.
if (max_mag == 0)
{
period = 0;
return true;
}
std::vector<double> xfreq(spec_len);
double step = 0.5 / (spec_len - 1);
for (size_t i = 0; i < spec_len; ++i)
xfreq[i] = i * step;
auto freq = xfreq[idx];
period = std::round(1 / freq);
return true;
}
};
REGISTER_FUNCTION(SeriesPeriodDetectFFT)
{
factory.registerFunction<FunctionSeriesPeriodDetectFFT>(FunctionDocumentation{
.description = R"(
Detects period in time series data using FFT.)",
.categories{"Time series analysis"}});
}
}
#endif


@@ -488,8 +488,14 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown element in config: {}", replica_key);
}
addShard(settings, std::move(replica_addresses), /* treat_local_as_remote = */ false, current_shard_num,
std::move(insert_paths), weight, internal_replication);
addShard(
settings,
replica_addresses,
/* treat_local_as_remote = */ false,
current_shard_num,
weight,
std::move(insert_paths),
internal_replication);
}
else
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown element in config: {}", key);
@@ -525,7 +531,7 @@ Cluster::Cluster(
addresses_with_failover.emplace_back(current);
addShard(settings, std::move(current), params.treat_local_as_remote, current_shard_num, /* insert_paths= */ {}, /* weight= */ 1);
addShard(settings, std::move(current), params.treat_local_as_remote, current_shard_num, /* weight= */ 1);
++current_shard_num;
}
@@ -553,15 +559,21 @@ Cluster::Cluster(
addresses_with_failover.emplace_back(current);
addShard(settings, std::move(current), params.treat_local_as_remote, current_shard_num, /* insert_paths= */ {}, /* weight= */ 1);
addShard(settings, std::move(current), params.treat_local_as_remote, current_shard_num, /* weight= */ 1);
++current_shard_num;
}
initMisc();
}
void Cluster::addShard(const Settings & settings, Addresses && addresses, bool treat_local_as_remote, UInt32 current_shard_num,
ShardInfoInsertPathForInternalReplication && insert_paths, UInt32 weight, bool internal_replication)
void Cluster::addShard(
const Settings & settings,
Addresses addresses,
bool treat_local_as_remote,
UInt32 current_shard_num,
UInt32 weight,
ShardInfoInsertPathForInternalReplication insert_paths,
bool internal_replication)
{
Addresses shard_local_addresses;
@@ -572,19 +584,28 @@ void Cluster::addShard(const Settings & settings, Addresses && addresses, bool t
{
auto replica_pool = ConnectionPoolFactory::instance().get(
static_cast<unsigned>(settings.distributed_connections_pool_size),
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password, replica.quota_key,
replica.cluster, replica.cluster_secret,
"server", replica.compression,
replica.secure, replica.priority);
replica.host_name,
replica.port,
replica.default_database,
replica.user,
replica.password,
replica.quota_key,
replica.cluster,
replica.cluster_secret,
"server",
replica.compression,
replica.secure,
replica.priority);
all_replicas_pools.emplace_back(replica_pool);
if (replica.is_local && !treat_local_as_remote)
shard_local_addresses.push_back(replica);
}
ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
all_replicas_pools, settings.load_balancing,
settings.distributed_replica_error_half_life.totalSeconds(), settings.distributed_replica_error_cap);
all_replicas_pools,
settings.load_balancing,
settings.distributed_replica_error_half_life.totalSeconds(),
settings.distributed_replica_error_cap);
if (weight)
slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());


@@ -291,8 +291,14 @@ private:
struct ReplicasAsShardsTag {};
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);
void addShard(const Settings & settings, Addresses && addresses, bool treat_local_as_remote, UInt32 current_shard_num,
ShardInfoInsertPathForInternalReplication && insert_paths = {}, UInt32 weight = 1, bool internal_replication = false);
void addShard(
const Settings & settings,
Addresses addresses,
bool treat_local_as_remote,
UInt32 current_shard_num,
UInt32 weight = 1,
ShardInfoInsertPathForInternalReplication insert_paths = {},
bool internal_replication = false);
/// Inter-server secret
String secret;


@@ -80,15 +80,6 @@ public:
UInt32 shard_count,
bool parallel_replicas_enabled);
struct ShardPlans
{
/// If a shard has local replicas this won't be nullptr
std::unique_ptr<QueryPlan> local_plan;
/// Contains several steps to read from all remote replicas
std::unique_ptr<QueryPlan> remote_plan;
};
const Block header;
const ColumnsDescriptionByShardNum objects_by_shard;
const StorageSnapshotPtr storage_snapshot;


@@ -185,8 +185,11 @@ void executeQuery(
QueryProcessingStage::Enum processed_stage,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory, Poco::Logger * log,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
SelectStreamFactory & stream_factory,
Poco::Logger * log,
const ASTPtr & query_ast,
ContextPtr context,
const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,
@@ -253,9 +256,15 @@
const auto & addresses = cluster->getShardsAddresses().at(i);
bool parallel_replicas_enabled = addresses.size() > 1 && context->canUseParallelReplicas();
stream_factory.createForShard(shard_info,
query_ast_for_shard, main_table, table_func_ptr,
new_context, plans, remote_shards, static_cast<UInt32>(shards),
stream_factory.createForShard(
shard_info,
query_ast_for_shard,
main_table,
table_func_ptr,
new_context,
plans,
remote_shards,
static_cast<UInt32>(shards),
parallel_replicas_enabled);
}


@@ -54,8 +54,11 @@ void executeQuery(
QueryProcessingStage::Enum processed_stage,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory, Poco::Logger * log,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
SelectStreamFactory & stream_factory,
Poco::Logger * log,
const ASTPtr & query_ast,
ContextPtr context,
const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster,


@@ -43,6 +43,14 @@ public:
const bool profile_processors;
const bool trace_processors;
/// There is a performance optimization that schedules a task to the current thread, avoiding global task queue.
/// Optimization decreases contention on global task queue but may cause starvation.
/// See 01104_distributed_numbers_test.sql
/// This constant tells us that we should skip the optimization
/// if it was applied more than `max_scheduled_local_tasks` in a row.
constexpr static size_t max_scheduled_local_tasks = 128;
size_t num_scheduled_local_tasks = 0;
void wait(std::atomic_bool & finished);
void wakeUp();


@@ -53,6 +53,17 @@ void ExecutorTasks::tryGetTask(ExecutionThreadContext & context)
{
std::unique_lock lock(mutex);
#if defined(OS_LINUX)
if (num_threads == 1)
{
if (auto res = async_task_queue.tryGetReadyTask(lock))
{
context.setTask(static_cast<ExecutingGraph::Node *>(res.data));
return;
}
}
#endif
/// Try get async task assigned to this thread or any other task from queue.
if (auto * async_task = context.tryPopAsyncTask())
{
@@ -109,11 +120,15 @@ void ExecutorTasks::pushTasks(Queue & queue, Queue & async_queue, ExecutionThrea
context.setTask(nullptr);
/// Take local task from queue if has one.
if (!queue.empty() && !context.hasAsyncTasks())
if (!queue.empty() && !context.hasAsyncTasks()
&& context.num_scheduled_local_tasks < context.max_scheduled_local_tasks)
{
++context.num_scheduled_local_tasks;
context.setTask(queue.front());
queue.pop();
}
else
context.num_scheduled_local_tasks = 0;
if (!queue.empty() || !async_queue.empty())
{


@@ -65,7 +65,7 @@ static std::string dumpTasks(const std::unordered_map<std::uintptr_t, PollingQue
return res.str();
}
PollingQueue::TaskData PollingQueue::wait(std::unique_lock<std::mutex> & lock)
PollingQueue::TaskData PollingQueue::getTask(std::unique_lock<std::mutex> & lock, int timeout)
{
if (is_finished)
return {};
@@ -74,10 +74,13 @@ PollingQueue::TaskData PollingQueue::wait(std::unique_lock<std::mutex> & lock)
epoll_event event;
event.data.ptr = nullptr;
epoll.getManyReady(1, &event, -1);
size_t num_events = epoll.getManyReady(1, &event, timeout);
lock.lock();
if (num_events == 0)
return {};
if (event.data.ptr == pipe_fd)
return {};


@@ -31,6 +31,8 @@ private:
std::atomic_bool is_finished = false;
std::unordered_map<std::uintptr_t, TaskData> tasks;
TaskData getTask(std::unique_lock<std::mutex> & lock, int timeout);
public:
PollingQueue();
~PollingQueue();
@@ -44,7 +46,12 @@ public:
/// Wait for any descriptor. If no descriptors in queue, blocks.
/// Returns ptr which was inserted into queue or nullptr if finished was called.
/// Lock is unlocked during waiting.
TaskData wait(std::unique_lock<std::mutex> & lock);
TaskData wait(std::unique_lock<std::mutex> & lock) { return getTask(lock, -1); }
/// Get any ready descriptor.
/// Returns nullptr if no descriptor is ready or if finished was called.
/// Does not block.
TaskData tryGetReadyTask(std::unique_lock<std::mutex> & lock) { return getTask(lock, 0); }
/// Interrupt waiting.
void finish();


@@ -11,9 +11,9 @@ class ServerType
public:
enum Type
{
TCP,
TCP_WITH_PROXY,
TCP_SECURE,
TCP,
HTTP,
HTTPS,
MYSQL,


@@ -10,8 +10,6 @@
#include <Parsers/parseQuery.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/ReplaceAliasByExpressionVisitor.h>
#include <Core/Defines.h>
#include "Common/Exception.h"
@@ -24,11 +22,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace
{
using ReplaceAliasToExprVisitor = InDepthNodeVisitor<ReplaceAliasByExpressionMatcher, true>;
}
IndexDescription::IndexDescription(const IndexDescription & other)
: definition_ast(other.definition_ast ? other.definition_ast->clone() : nullptr)
, expression_list_ast(other.expression_list_ast ? other.expression_list_ast->clone() : nullptr)
@@ -101,10 +94,6 @@ IndexDescription IndexDescription::getIndexFromAST(const ASTPtr & definition_ast
if (index_definition->expr)
{
expr_list = extractKeyExpressionList(index_definition->expr->clone());
ReplaceAliasToExprVisitor::Data data{columns};
ReplaceAliasToExprVisitor{data}.visit(expr_list);
result.expression_list_ast = expr_list->clone();
}
else


@@ -194,6 +194,7 @@ namespace ErrorCodes
extern const int SERIALIZATION_ERROR;
extern const int TOO_MANY_MUTATIONS;
extern const int CANNOT_SCHEDULE_TASK;
extern const int LIMIT_EXCEEDED;
}
static void checkSuspiciousIndices(const ASTFunction * index_function)
@@ -649,6 +650,10 @@ void MergeTreeData::checkProperties(
if (projections_names.find(projection.name) != projections_names.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Projection with name {} already exists", backQuote(projection.name));
const auto settings = getSettings();
if (projections_names.size() >= settings->max_projections)
throw Exception(ErrorCodes::LIMIT_EXCEEDED, "Maximum limit of {} projection(s) exceeded", settings->max_projections);
/// We cannot alter a projection so far. So here we do not try to find a projection in old metadata.
bool is_aggregate = projection.type == ProjectionDescription::Type::Aggregate;
checkProperties(*projection.metadata, *projection.metadata, attach, is_aggregate, true /* allow_nullable_key */, local_context);


@@ -95,8 +95,8 @@ struct Settings;
M(UInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
M(UInt64, replicated_deduplication_window_for_async_inserts, 10000, "How many last hash values of async_insert blocks should be kept in ZooKeeper (old blocks will be deleted).", 0) \
M(UInt64, replicated_deduplication_window_seconds_for_async_inserts, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window_for_async_inserts\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
M(Milliseconds, async_block_ids_cache_min_update_interval_ms, 100, "minimum interval between updates of async_block_ids_cache", 0) \
M(Bool, use_async_block_ids_cache, false, "use in-memory cache to filter duplicated async inserts based on block ids", 0) \
M(Milliseconds, async_block_ids_cache_min_update_interval_ms, 100, "Minimum interval between updates of async_block_ids_cache", 0) \
M(Bool, use_async_block_ids_cache, true, "Use in-memory cache to filter duplicated async inserts based on block ids", 0) \
M(UInt64, max_replicated_logs_to_keep, 1000, "How many records may be in log, if there is an inactive replica. An inactive replica becomes lost when this number is exceeded.", 0) \
M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \
M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If time passed after replication log entry creation exceeds this threshold and sum size of parts is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
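Since `use_async_block_ids_cache` now defaults to `true`, a table that needs the old behavior can override it; a minimal sketch (the table name `t` is hypothetical):
```sql
-- Opt a single table out of the async-insert deduplication cache.
ALTER TABLE t MODIFY SETTING use_async_block_ids_cache = 0;
```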
@@ -186,6 +186,9 @@ struct Settings;
M(String, primary_key_compression_codec, "ZSTD(3)", "Compression encoding used by primary, primary key is small enough and cached, so the default compression is ZSTD(3).", 0) \
M(UInt64, marks_compress_block_size, 65536, "Mark compress block size, the actual size of the block to compress.", 0) \
M(UInt64, primary_key_compress_block_size, 65536, "Primary compress block size, the actual size of the block to compress.", 0) \
\
/** Projection settings. */ \
M(UInt64, max_projections, 25, "The maximum number of merge tree projections.", 0) \
#define MAKE_OBSOLETE_MERGE_TREE_SETTING(M, TYPE, NAME, DEFAULT) \
M(TYPE, NAME, DEFAULT, "Obsolete setting, does nothing.", BaseSettingsHelpers::Flags::OBSOLETE)
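A hedged sketch of overriding the new `max_projections` MergeTree setting per table (table and projection names are hypothetical):
```sql
-- Allow up to 30 projections on this table instead of the default 25.
CREATE TABLE t
(
    key UInt64,
    value String,
    PROJECTION p_by_value (SELECT value, key ORDER BY value)
)
ENGINE = MergeTree
ORDER BY key
SETTINGS max_projections = 30;
```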


@@ -1,32 +0,0 @@
#include <Storages/ReplaceAliasByExpressionVisitor.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/ColumnsDescription.h>
#include <Common/typeid_cast.h>
namespace DB
{
void ReplaceAliasByExpressionMatcher::visit(ASTPtr & ast, Data & data)
{
if (auto * identifier = ast->as<ASTIdentifier>())
{
visit(*identifier, ast, data);
}
}
void ReplaceAliasByExpressionMatcher::visit(const ASTIdentifier & column, ASTPtr & ast, Data & data)
{
const auto & column_name = column.name();
if (data.columns.hasAlias(column_name))
{
/// Alias expr is saved in default expr.
if (auto col_default = data.columns.getDefault(column_name))
{
ast = col_default->expression->clone();
}
}
}
}


@@ -1,40 +0,0 @@
#pragma once
#include <Parsers/IAST.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
class ASTFunction;
class ColumnsDescription;
class ASTIdentifier;
/* The Visitor is used to replace an ALIAS by its EXPRESSION when we refer to an ALIAS
* column in an index definition.
*
* For example, if we have the following CREATE statement:
* CREATE TABLE t
* (
*     col UInt8,
*     col_alias ALIAS col + 1,
*     INDEX idx (col_alias) TYPE minmax
* ) ENGINE = MergeTree ORDER BY col;
* we need to call the visitor to replace `col_alias` with `col + 1` when building the index
* description from the index definition AST.
*/
class ReplaceAliasByExpressionMatcher
{
public:
struct Data
{
const ColumnsDescription & columns;
};
static void visit(ASTPtr & ast, Data &);
static void visit(const ASTIdentifier &, ASTPtr & ast, Data &);
static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }
};
}
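For context, a hedged SQL sketch (mirroring the example in the comment above; not part of this diff) of the rewrite the visitor enables:

-- The minmax index is defined on an ALIAS column; internally the index
-- condition on `col_alias` is rewritten to `col + 1`.
CREATE TABLE t_alias_idx_sketch
(
    col UInt8,
    col_alias ALIAS col + 1,
    INDEX idx (col_alias) TYPE minmax GRANULARITY 1
) ENGINE = MergeTree ORDER BY col;
-- EXPLAIN indexes = 1 SELECT * FROM t_alias_idx_sketch WHERE col_alias > 10;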

View File

@ -908,12 +908,20 @@ void StorageDistributed::read(
}
ClusterProxy::executeQuery(
query_plan, header, processed_stage,
main_table, remote_table_function_ptr,
select_stream_factory, log, modified_query_ast,
local_context, query_info,
sharding_key_expr, sharding_key_column_name,
query_info.cluster, additional_shard_filter_generator);
query_plan,
header,
processed_stage,
main_table,
remote_table_function_ptr,
select_stream_factory,
log,
modified_query_ast,
local_context,
query_info,
sharding_key_expr,
sharding_key_column_name,
query_info.cluster,
additional_shard_filter_generator);
/// This is a bug; it is possible only when there are no shards to query, and this is handled earlier.
if (!query_plan.isInitialized())

View File

@ -128,9 +128,6 @@ endif()
if (TARGET ch_contrib::fastops)
set(USE_FASTOPS 1)
endif()
if (TARGET ch_contrib::sqids)
set(USE_SQIDS 1)
endif()
if (TARGET ch_contrib::vectorscan)
set(USE_VECTORSCAN 1)
endif()
@ -169,8 +166,5 @@ endif()
if (TARGET ch_contrib::libarchive)
set(USE_LIBARCHIVE 1)
endif()
if (TARGET ch_contrib::pocketfft)
set(USE_POCKETFFT 1)
endif()
set(SOURCE_DIR ${PROJECT_SOURCE_DIR})

View File

@ -36,6 +36,18 @@ def process_result(file_path: Path) -> Tuple[bool, TestResults]:
test_results = [] # type: TestResults
state, report_url, description = post_commit_status_from_file(file_path)
prefix = file_path.parent.name
if description.strip() in [
"Invalid check_status.tsv",
"Not found test_results.tsv",
"Empty test_results.tsv",
]:
status = (
f'Check failed (<a href="{report_url}">Report</a>)'
if report_url != "null"
else "Check failed"
)
return False, [TestResult(f"{prefix}: {description}", status)]
is_ok = state == "success"
if is_ok and report_url == "null":
return is_ok, test_results

View File

@ -0,0 +1,10 @@
<clickhouse>
<openSSL>
<client>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
</clickhouse>

View File

@ -5,6 +5,14 @@
<tcp_port>9000</tcp_port>
<http_port>8123</http_port>
<mysql_port>9004</mysql_port>
<tcp_port_secure>9440</tcp_port_secure>
<openSSL>
<server>
<certificateFile>/etc/clickhouse-server/config.d/server.crt</certificateFile>
<privateKeyFile>/etc/clickhouse-server/config.d/server.key</privateKeyFile>
</server>
</openSSL>
<!-- Custom protocols -->
<protocols>

View File

@ -0,0 +1,18 @@
-----BEGIN CERTIFICATE-----
MIIC+zCCAeOgAwIBAgIJAIhI9ozZJ+TWMA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNV
BAMMCWxvY2FsaG9zdDAeFw0xOTA0MjIwNDMyNTJaFw0yMDA0MjEwNDMyNTJaMBQx
EjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
ggEBAK+wVUEdqF2uXvN0MJBgnAHyXi6JTi4p/F6igsrCjSNjJWzHH0vQmK8ujfcF
CkifW88i+W5eHctuEtQqNHK+t9x9YiZtXrj6m/XkOXs20mYgENSmbbbHbriTPnZB
zZrq6UqMlwIHNNAa+I3NMORQxVRaI0ybXnGVO5elr70xHpk03xL0JWKHpEqYp4db
2aBQgF6y3Ww4khxjIYqpUYXWXGFnVIRU7FKVEAM1xyKqvQzXjQ5sVM/wyHknveEF
3b/X4ggN+KNl5KOc0cWDh1/XaatJAPaUUPqZcq76tynLbP64Xm3dxHcj+gtRkO67
ef6MSg6l63m3XQP6Qb+MIkd06OsCAwEAAaNQME4wHQYDVR0OBBYEFDmODTO8QLDN
ykR3x0LIOnjNhrKhMB8GA1UdIwQYMBaAFDmODTO8QLDNykR3x0LIOnjNhrKhMAwG
A1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAAwaiJc7uqEpnH3aukbftDwX
m8GfEnj1HVdgg+9GGNq+9rvUYBF6gdPmjRCX9dO0cclLFx8jc2org0rTSq9WoOhX
E6qL4Eqrmc5SE3Y9jZM0h6GRD4oXK014FmtZ3T6ddZU3dQLj3BS2r1XrvmubTvGN
ZuTJNY8nx8Hh6H5XINmsEjUF9E5hog+PwCE03xt2adIdYL+gsbxASeNYyeUFpZv5
zcXR3VoakBWnAaOVgCHq2qh96QAnL7ZKzFkGf/MdwV10KU3dmb+ICbQUUdf9Gc17
aaDCIRws312F433FdXBkGs2UkB7ZZme9dfn6O1QbeTNvex2VLMqYx/CTkfFbOQA=
-----END CERTIFICATE-----

View File

@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCvsFVBHahdrl7z
dDCQYJwB8l4uiU4uKfxeooLKwo0jYyVsxx9L0JivLo33BQpIn1vPIvluXh3LbhLU
KjRyvrfcfWImbV64+pv15Dl7NtJmIBDUpm22x264kz52Qc2a6ulKjJcCBzTQGviN
zTDkUMVUWiNMm15xlTuXpa+9MR6ZNN8S9CVih6RKmKeHW9mgUIBest1sOJIcYyGK
qVGF1lxhZ1SEVOxSlRADNcciqr0M140ObFTP8Mh5J73hBd2/1+IIDfijZeSjnNHF
g4df12mrSQD2lFD6mXKu+rcpy2z+uF5t3cR3I/oLUZDuu3n+jEoOpet5t10D+kG/
jCJHdOjrAgMBAAECggEARF66zrxb6RkSmmt8+rKeA6PuQu3sHsr4C1vyyjUr97l9
tvdGlpp20LWtSZQMjHZ3pARYTTsTHTeY3DgQcRcHNicVKx8k3ZepWeeW9vw+pL+V
zSt3RsoVrH6gsCSrfr4sS3aqzX9AbjwQvh48CJ3mLQ1m70kHV+xbZIh1+4pB/hyP
1wKyUE18ZkOptXvO/TtoHzLQCecpkXtWzmry1Eh2isvXA+NMrAtLibGsyM1mtm7i
5ozevzHabvvCDBEe+KgZdONgVhhhvm2eOd+/s4w3rw4ETud4fI/ZAJyWXhiIKFnA
VJbElWruSAoVBW7p2bsF5PbmVzvo8vXL+VylxYD+AQKBgQDhLoRKTVhNkn/QjKxq
sdOh+QZra0LzjVpAmkQzu7wZMSHEz9qePQciDQQrYKrmRF1vNcIRCVUTqWYheJ/1
lKRrCGa0ab6k96zkWMqLHD5u+UeJV7r1dJIx08ME9kNJ+x/XtB8klRIji16NiQUS
qc6p8z0M2AnbJzsRfWZRH8FeYwKBgQDHu8dzdtVGI7MtxfPOE/bfajiopDg8BdTC
pdug2T8XofRHRq7Q+0vYjTAZFT/slib91Pk6VvvPdo9VBZiL4omv4dAq6mOOdX/c
U14mJe1X5GCrr8ExZ8BfNJ3t/6sV1fcxyJwAw7iBguqxA2JqdM/wFk10K8XqvzVn
CD6O9yGt2QKBgFX1BMi8N538809vs41S7l9hCQNOQZNo/O+2M5yv6ECRkbtoQKKw
1x03bMUGNJaLuELweXE5Z8GGo5bZTe5X3F+DKHlr+DtO1C+ieUaa9HY2MAmMdLCn
2/qrREGLo+oEs4YKmuzC/taUp/ZNPKOAMISNdluFyFVg51pozPrgrVbTAoGBAKkE
LBl3O67o0t0vH8sJdeVFG8EJhlS0koBMnfgVHqC++dm+5HwPyvTrNQJkyv1HaqNt
r6FArkG3ED9gRuBIyT6+lctbIPgSUip9mbQqcBfqOCvQxGksZMur2ODncz09HLtS
CUFUXjOqNzOnq4ZuZu/Bz7U4vXiSaXxQq6+LTUKxAoGAFZU/qrI06XxnrE9A1X0W
l7DSkpZaDcu11NrZ473yONih/xOZNh4SSBpX8a7F6Pmh9BdtGqphML8NFPvQKcfP
b9H2iid2tc292uyrUEb5uTMmv61zoTwtitqLzO0+tS6PT3fXobX+eyeEWKzPBljL
HFtxG5CCXpkdnWRmaJnhTzA=
-----END PRIVATE KEY-----

View File

@ -1,15 +1,23 @@
#!/usr/bin/env python3
import os
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import Client
from helpers.client import Client, QueryRuntimeException
import requests
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__)
main_node = cluster.add_instance(
"main_node",
main_configs=["configs/cluster.xml", "configs/protocols.xml"],
main_configs=[
"configs/cluster.xml",
"configs/protocols.xml",
"configs/server.crt",
"configs/server.key",
],
with_zookeeper=True,
)
backup_node = cluster.add_instance(
@ -36,11 +44,27 @@ def http_works(port=8123):
return False
def tcp_secure_works(port=9440):
client = Client(
main_node.ip_address,
port,
command=cluster.client_bin_path,
secure=True,
config=f"{SCRIPT_DIR}/configs/client.xml",
)
try:
client.query(QUERY)
except QueryRuntimeException:
return False
return True
def assert_everything_works():
custom_client = Client(main_node.ip_address, 9001, command=cluster.client_bin_path)
main_node.query(QUERY)
main_node.query(MYSQL_QUERY)
custom_client.query(QUERY)
assert tcp_secure_works()
assert http_works()
assert http_works(8124)
@ -68,6 +92,12 @@ def test_default_protocols(started_cluster):
assert "Connections to mysql failed" in main_node.query_and_get_error(MYSQL_QUERY)
main_node.query("SYSTEM START LISTEN MYSQL")
# TCP Secure
assert_everything_works()
main_node.query("SYSTEM STOP LISTEN TCP SECURE")
assert not tcp_secure_works()
main_node.query("SYSTEM START LISTEN TCP SECURE")
assert_everything_works()

View File

@ -1,31 +0,0 @@
<test>
<settings>
<max_memory_usage>30000000000</max_memory_usage>
</settings>
<substitutions>
<substitution>
<name>millions</name>
<values>
<value>50</value>
<value>100</value>
</values>
</substitution>
<substitution>
<name>window</name>
<values>
<value>10</value>
<value>1000</value>
<value>10000</value>
</values>
</substitution>
</substitutions>
<create_query>create table sorted_{millions}m engine MergeTree order by k as select number % 100 k, rand() v from numbers_mt(1000000 * {millions})</create_query>
<create_query>optimize table sorted_{millions}m final</create_query>
<query>select k, groupArraySorted({window})(v) from sorted_{millions}m group by k format Null</query>
<query>select k % 10 kk, groupArraySorted({window})(v) from sorted_{millions}m group by kk format Null</query>
<drop_query>drop table if exists sorted_{millions}m</drop_query>
</test>

View File

@ -2,3 +2,6 @@
100
100
100
300
100
100

View File

@ -1,9 +1,25 @@
-- Tags: distributed
SELECT *
FROM
(
SELECT *
FROM system.numbers
WHERE number = 100
UNION ALL
SELECT *
FROM system.numbers
WHERE number = 100
)
LIMIT 2
SETTINGS max_threads = 1 FORMAT Null;
DROP TABLE IF EXISTS d_numbers;
CREATE TABLE d_numbers (number UInt32) ENGINE = Distributed(test_cluster_two_shards, system, numbers, rand());
SELECT '100' AS number FROM d_numbers AS n WHERE n.number = 100 LIMIT 2;
SELECT '100' AS number FROM d_numbers AS n WHERE n.number = 100 LIMIT 2 SETTINGS max_threads = 1, prefer_localhost_replica=1;
SELECT sum(number) FROM (select * from remote('127.0.0.{1,1,1}', system.numbers) AS n WHERE n.number = 100 LIMIT 3) SETTINGS max_threads = 2, prefer_localhost_replica=1;
SET distributed_product_mode = 'local';

View File

@ -2,7 +2,7 @@
-- Please help shorten this list down to zero elements.
SELECT name FROM system.functions WHERE NOT is_aggregate AND origin = 'System' AND alias_to = '' AND length(description) < 10
AND name NOT IN (
'MD4', 'MD5', 'SHA1', 'SHA224', 'SHA256', 'SHA384', 'SHA512', 'halfMD5', 'sqid',
'MD4', 'MD5', 'SHA1', 'SHA224', 'SHA256', 'SHA384', 'SHA512', 'halfMD5',
'aes_decrypt_mysql', 'aes_encrypt_mysql', 'decrypt', 'encrypt',
'base64Decode', 'base64Encode', 'tryBase64Decode',
'convertCharset',

View File

@ -1,12 +0,0 @@
-- Tags: no-fasttest
SELECT seriesPeriodDetectFFT([139, 87, 110, 68, 54, 50, 51, 53, 133, 86, 141, 97, 156, 94, 149, 95, 140, 77, 61, 50, 54, 47, 133, 72, 152, 94, 148, 105, 162, 101, 160, 87, 63, 53, 55, 54, 151, 103, 189, 108, 183, 113, 175, 113, 178, 90, 71, 62, 62, 65, 165, 109, 181, 115, 182, 121, 178, 114, 170]);
SELECT seriesPeriodDetectFFT([10,20,30,10,20,30,10,20,30, 10,20,30,10,20,30,10,20,30,10,20,30]);
SELECT seriesPeriodDetectFFT([10.1, 20.45, 40.34, 10.1, 20.45, 40.34,10.1, 20.45, 40.34,10.1, 20.45, 40.34,10.1, 20.45, 40.34,10.1, 20.45, 40.34,10.1, 20.45, 40.34, 10.1, 20.45, 40.34]);
SELECT seriesPeriodDetectFFT([10.1, 10, 400, 10.1, 10, 400, 10.1, 10, 400,10.1, 10, 400,10.1, 10, 400,10.1, 10, 400,10.1, 10, 400,10.1, 10, 400]);
SELECT seriesPeriodDetectFFT([2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]);
SELECT seriesPeriodDetectFFT([1,2,3]); -- { serverError BAD_ARGUMENTS }
SELECT seriesPeriodDetectFFT(); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
SELECT seriesPeriodDetectFFT([]); -- { serverError ILLEGAL_COLUMN }
SELECT seriesPeriodDetectFFT([NULL, NULL, NULL]); -- { serverError ILLEGAL_COLUMN }
SELECT seriesPeriodDetectFFT([10,20,30,10,202,30,NULL]); -- { serverError ILLEGAL_COLUMN }

View File

@ -1,12 +0,0 @@
[0,1,2,3,4]
[0,1,2,3,4]
[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99]
['0','1','10','11','12','13','14','15','16','17','18','19','2','20','21','22','23','24','25','26','27','28','29','3','4','5','6','7','8','9']
[0,0,1,1,2,2,3,3,4,4]
[[1,2,3,4],[2,3,4,5],[3,4,5,6]]
[(2,1),(15,25),(30,60),(100,200)]
[0.2,2.2,6.6,12.5]
['AAA','Aaa','aaa','abc','bbc']
1000000
1000000
[0,1]

View File

@ -1,41 +0,0 @@
SELECT groupArraySorted(5)(number) FROM numbers(100);
SELECT groupArraySorted(10)(number) FROM numbers(5);
SELECT groupArraySorted(100)(number) FROM numbers(1000);
SELECT groupArraySorted(30)(str) FROM (SELECT toString(number) as str FROM numbers(30));
SELECT groupArraySorted(10)(toInt64(number/2)) FROM numbers(100);
DROP TABLE IF EXISTS test;
CREATE TABLE test (a Array(UInt64)) engine=MergeTree ORDER BY a;
INSERT INTO test VALUES ([3,4,5,6]), ([1,2,3,4]), ([2,3,4,5]);
SELECT groupArraySorted(3)(a) FROM test;
DROP TABLE test;
CREATE TABLE IF NOT EXISTS test (id Int32, data Tuple(Int32, Int32)) ENGINE = MergeTree() ORDER BY id;
INSERT INTO test (id, data) VALUES (1, (100, 200)), (2, (15, 25)), (3, (2, 1)), (4, (30, 60));
SELECT groupArraySorted(4)(data) FROM test;
DROP TABLE test;
CREATE TABLE IF NOT EXISTS test (id Int32, data Decimal32(2)) ENGINE = MergeTree() ORDER BY id;
INSERT INTO test (id, data) VALUES (1, 12.5), (2, 0.2), (3, 6.6), (4, 2.2);
SELECT groupArraySorted(4)(data) FROM test;
DROP TABLE test;
CREATE TABLE IF NOT EXISTS test (id Int32, data FixedString(3)) ENGINE = MergeTree() ORDER BY id;
INSERT INTO test (id, data) VALUES (1, 'AAA'), (2, 'bbc'), (3, 'abc'), (4, 'aaa'), (5, 'Aaa');
SELECT groupArraySorted(5)(data) FROM test;
DROP TABLE test;
CREATE TABLE test (id Decimal(76, 53), str String) ENGINE = MergeTree ORDER BY id;
INSERT INTO test SELECT number, 'test' FROM numbers(1000000);
SELECT count(id) FROM test;
SELECT count(concat(toString(id), 'a')) FROM test;
DROP TABLE test;
CREATE TABLE test (id UInt64, agg AggregateFunction(groupArraySorted(2), UInt64)) engine=MergeTree ORDER BY id;
INSERT INTO test SELECT 1, groupArraySortedState(2)(number) FROM numbers(10);
SELECT groupArraySortedMerge(2)(agg) FROM test;
DROP TABLE test;

View File

@ -1,55 +0,0 @@
Expression ((Projection + Before ORDER BY))
Filter (WHERE)
ReadFromMergeTree (02911_support_alias_column_in_indices.test1)
Indexes:
PrimaryKey
Keys:
c
Condition: (plus(c, 1) in [11, +Inf))
Parts: 1/2
Granules: 1/2
Skip
Name: i
Description: minmax GRANULARITY 1
Parts: 1/1
Granules: 1/1
Expression ((Project names + Projection))
Filter ((WHERE + Change column names to column identifiers))
ReadFromMergeTree (02911_support_alias_column_in_indices.test1)
Indexes:
PrimaryKey
Keys:
c
Condition: (_CAST(plus(c, 1), \'UInt64\') in [11, +Inf))
Parts: 1/2
Granules: 1/2
Skip
Name: i
Description: minmax GRANULARITY 1
Parts: 1/1
Granules: 1/1
Expression ((Projection + Before ORDER BY))
Filter (WHERE)
ReadFromMergeTree (02911_support_alias_column_in_indices.test2)
Indexes:
PrimaryKey
Keys:
c
Condition: (plus(plus(c, 1), 1) in [16, +Inf))
Parts: 1/2
Granules: 1/2
Skip
Name: i
Description: minmax GRANULARITY 1
Parts: 1/1
Granules: 1/1
Expression ((Project names + Projection))
Filter ((WHERE + Change column names to column identifiers))
ReadFromMergeTree (02911_support_alias_column_in_indices.test2)
Indexes:
PrimaryKey
Keys:
c
Condition: (_CAST(plus(_CAST(plus(c, 1), \'UInt64\'), 1), \'UInt64\') in [16, +Inf))
Parts: 1/2
Granules: 1/2

View File

@ -1,34 +0,0 @@
-- Tags: no-parallel
drop database if exists 02911_support_alias_column_in_indices;
create database 02911_support_alias_column_in_indices;
use 02911_support_alias_column_in_indices;
create table test1
(
c UInt32,
a alias c + 1,
index i (a) type minmax
) engine = MergeTree order by c;
insert into test1 select * from numbers(10);
insert into test1 select * from numbers(11, 20);
explain indexes = 1 select * from test1 where a > 10 settings allow_experimental_analyzer = 0;
explain indexes = 1 select * from test1 where a > 10 settings allow_experimental_analyzer = 1;
create table test2
(
c UInt32,
a1 alias c + 1,
a2 alias a1 + 1,
index i (a2) type minmax
) engine = MergeTree order by c;
insert into test2 select * from numbers(10);
insert into test2 select * from numbers(11, 20);
explain indexes = 1 select * from test2 where a2 > 15 settings allow_experimental_analyzer = 0;
explain indexes = 1 select * from test2 where a2 > 15 settings allow_experimental_analyzer = 1; -- buggy, analyzer does not pick up index i
drop database 02911_support_alias_column_in_indices;

View File

@ -0,0 +1 @@
{"id":1,"foo":["bar"]} {"id":1,"foo":["bar","baz"]}

View File

@ -0,0 +1,3 @@
-- Tags: no-fasttest
select '{"id":1,"foo":["bar"]}' as a, jsonMergePatch(a,toJSONString(map('foo',arrayPushBack(arrayMap(x->JSONExtractString(x),JSONExtractArrayRaw(a, 'foo')),'baz')))) as b;

View File

@ -1,13 +0,0 @@
-- negative tests
-- const UInt*
Uk
XMbT
86Rf07
Td1EnWQo
XMbT
-- non-const UInt*
Uk
XMbT
86Rf07
Td1EnWQo
XMbT

View File

@ -1,22 +0,0 @@
-- Tags: no-fasttest
SET allow_experimental_hash_functions = 1;
SET allow_suspicious_low_cardinality_types = 1;
SELECT '-- negative tests';
SELECT sqid(); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
SELECT sqid('1'); -- { serverError ILLEGAL_TYPE_OF_ARGUMENT }
SELECT '-- const UInt*';
SELECT sqid(1);
SELECT sqid(1, 2);
SELECT sqid(1, 2, 3);
SELECT sqid(1::UInt8, 2::UInt16, 3::UInt32, 4::UInt64);
SELECT sqid(toNullable(1), toLowCardinality(2));
SELECT '-- non-const UInt*';
SELECT sqid(materialize(1));
SELECT sqid(materialize(1), materialize(2));
SELECT sqid(materialize(1), materialize(2), materialize(3));
SELECT sqid(materialize(1::UInt8), materialize(2::UInt16), materialize(3::UInt32), materialize(4::UInt64));
SELECT sqid(toNullable(materialize(1)), toLowCardinality(materialize(2)));

View File

@ -0,0 +1,39 @@
DROP TABLE IF EXISTS test_max_mt_projections_alter;
CREATE TABLE test_max_mt_projections_alter (c1 UInt32, c2 UInt32, c3 UInt32)
ENGINE = MergeTree ORDER BY c1
SETTINGS max_projections = 3;
ALTER TABLE test_max_mt_projections_alter ADD PROJECTION p1 (SELECT c2 ORDER BY c2);
ALTER TABLE test_max_mt_projections_alter ADD PROJECTION p2 (SELECT c3 ORDER BY c3);
ALTER TABLE test_max_mt_projections_alter ADD PROJECTION p3 (SELECT c1, c2 ORDER BY c1, c2);
ALTER TABLE test_max_mt_projections_alter
ADD PROJECTION p4 (SELECT c2, c3 ORDER BY c2, c3); -- { serverError LIMIT_EXCEEDED }
ALTER TABLE test_max_mt_projections_alter DROP PROJECTION p3;
ALTER TABLE test_max_mt_projections_alter ADD PROJECTION p4 (SELECT c2, c3 ORDER BY c2, c3);
DROP TABLE IF EXISTS test_max_mt_projections_alter;
DROP TABLE IF EXISTS test_max_mt_projections_create;
CREATE TABLE test_max_mt_projections_create (c1 UInt32, c2 UInt32,
PROJECTION p1 (SELECT c1, c2 ORDER BY c2),
PROJECTION p2 (SELECT c2 ORDER BY c2))
ENGINE = MergeTree ORDER BY c1
SETTINGS max_projections = 1; -- { serverError LIMIT_EXCEEDED }
CREATE TABLE test_max_mt_projections_create (c1 UInt32, c2 UInt32,
PROJECTION p (SELECT c1, c2 ORDER BY c2))
ENGINE = MergeTree ORDER BY c1
SETTINGS max_projections = 0; -- { serverError LIMIT_EXCEEDED }
CREATE TABLE test_max_mt_projections_create (c1 UInt32, c2 UInt32,
PROJECTION p (SELECT c1, c2 ORDER BY c2))
ENGINE = MergeTree ORDER BY c1
SETTINGS max_projections = 1;
ALTER TABLE test_max_mt_projections_create
ADD PROJECTION p2 (SELECT c2 ORDER BY c2); -- { serverError LIMIT_EXCEEDED }
DROP TABLE IF EXISTS test_max_mt_projections_create;

View File

@ -1572,7 +1572,6 @@ groupArrayLast
groupArrayMovingAvg
groupArrayMovingSum
groupArraySample
groupArraySorted
groupBitAnd
groupBitOr
groupBitXor
@ -1587,7 +1586,6 @@ grouparraylast
grouparraymovingavg
grouparraymovingsum
grouparraysample
grouparraysorted
groupbitand
groupbitmap
groupbitmapand
@ -1791,7 +1789,6 @@ logTrace
logagent
loghouse
london
lookups
lowcardinality
lowerUTF
lowercased
@ -2234,7 +2231,6 @@ seektable
sequenceCount
sequenceMatch
sequenceNextNode
seriesPeriodDetectFFT
serverTimeZone
serverTimezone
serverUUID
@ -2278,7 +2274,6 @@ splitByRegexp
splitByString
splitByWhitespace
splitby
sqid
sql
sqlalchemy
sqlinsert