Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-27 01:51:59 +00:00)

Commit e78f3389b7: Merge branch 'master' into fix-creating-set-for-storage-fuzz
@ -2,21 +2,23 @@

#include <base/strong_typedef.h>
#include <base/extended_types.h>
#include <Common/formatIPv6.h>
#include <Common/memcmpSmall.h>

namespace DB
{

using IPv4 = StrongTypedef<UInt32, struct IPv4Tag>;
struct IPv4 : StrongTypedef<UInt32, struct IPv4Tag>
{
    using StrongTypedef::StrongTypedef;
    using StrongTypedef::operator=;
    constexpr explicit IPv4(UInt64 value): StrongTypedef(static_cast<UnderlyingType>(value)) {}
};
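The switch above from a `using` alias to a struct deriving from StrongTypedef is what makes the extra UInt64 constructor possible; a plain alias cannot add members. A minimal standalone sketch of the pattern (`StrongTypedefDemo` is a hypothetical stand-in, not the real `StrongTypedef`):

#include <cstdint>

template <typename T, typename Tag>
struct StrongTypedefDemo
{
    T value{};
    constexpr StrongTypedefDemo() = default;
    constexpr explicit StrongTypedefDemo(T v) : value(v) {}
};

struct IPv4Demo : StrongTypedefDemo<uint32_t, struct IPv4DemoTag>
{
    using StrongTypedefDemo::StrongTypedefDemo;
    // The member a type alias could never have: a constructor that narrows from a 64-bit value.
    constexpr explicit IPv4Demo(uint64_t v) : StrongTypedefDemo(static_cast<uint32_t>(v)) {}
};

static_assert(IPv4Demo(uint64_t{0x100000001}).value == 1);   // high bits truncated as intended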

struct IPv6 : StrongTypedef<UInt128, struct IPv6Tag>
{
    constexpr IPv6() = default;
    constexpr explicit IPv6(const UInt128 & x) : StrongTypedef(x) {}
    constexpr explicit IPv6(UInt128 && x) : StrongTypedef(std::move(x)) {}

    IPv6 & operator=(const UInt128 & rhs) { StrongTypedef::operator=(rhs); return *this; }
    IPv6 & operator=(UInt128 && rhs) { StrongTypedef::operator=(std::move(rhs)); return *this; }
    using StrongTypedef::StrongTypedef;
    using StrongTypedef::operator=;

    bool operator<(const IPv6 & rhs) const
    {

@ -54,12 +56,22 @@ namespace DB

namespace std
{

/// For historical reasons we hash IPv6 as a FixedString(16)
template <>
struct hash<DB::IPv6>
{
    size_t operator()(const DB::IPv6 & x) const
    {
        return std::hash<DB::IPv6::UnderlyingType>()(x.toUnderType());
        return std::hash<std::string_view>{}(std::string_view(reinterpret_cast<const char*>(&x.toUnderType()), IPV6_BINARY_LENGTH));
    }
};

template <>
struct hash<DB::IPv4>
{
    size_t operator()(const DB::IPv4 & x) const
    {
        return std::hash<DB::IPv4::UnderlyingType>()(x.toUnderType());
    }
};
}
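The new `std::hash<DB::IPv6>` body above hashes the 16 raw bytes of the address so that it matches hashing a `FixedString(16)`, instead of hashing the integer under-type. A standalone sketch of that idea (`IPv6Like` is a hypothetical stand-in for `DB::IPv6`, assuming IPV6_BINARY_LENGTH == 16):

#include <functional>
#include <string_view>

struct IPv6Like { unsigned char bytes[16]; };   // 16-byte binary address

// Hash the bytes exactly as a FixedString(16) would be hashed: same bytes,
// same hash, independent of the layout of any integer under-type.
inline size_t hashAsFixedString16(const IPv6Like & ip)
{
    return std::hash<std::string_view>{}(
        std::string_view(reinterpret_cast<const char *>(ip.bytes), sizeof(ip.bytes)));
}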

@ -89,7 +89,7 @@ RUN arch=${TARGETARCH:-amd64} \
    && dpkg -i /tmp/nfpm.deb \
    && rm /tmp/nfpm.deb

ARG GO_VERSION=1.19.5
ARG GO_VERSION=1.19.10
# We need go for clickhouse-diagnostics
RUN arch=${TARGETARCH:-amd64} \
    && curl -Lo /tmp/go.tgz "https://go.dev/dl/go${GO_VERSION}.linux-${arch}.tar.gz" \
@ -1,4 +1,4 @@

FROM ubuntu:22.04
FROM ubuntu:20.04

# see https://github.com/moby/moby/issues/4032#issuecomment-192327844
ARG DEBIAN_FRONTEND=noninteractive

@ -11,17 +11,18 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
    && apt-get update \
    && apt-get upgrade -yq \
    && apt-get install --yes --no-install-recommends \
        apt-transport-https \
        ca-certificates \
        dirmngr \
        gnupg2 \
        wget \
        locales \
        tzdata \
    && apt-get clean
        wget \
    && apt-get clean \
    && rm -rf \
        /var/lib/apt/lists/* \
        /var/cache/debconf \
        /tmp/*

ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.5.3.24"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"

@ -43,7 +44,8 @@ ARG single_binary_location_url=""

ARG TARGETARCH

RUN arch=${TARGETARCH:-amd64} \
# install from a web location with deb packages
RUN arch="${TARGETARCH:-amd64}" \
    && if [ -n "${deb_location_url}" ]; then \
        echo "installing from custom url with deb packages: ${deb_location_url}" \
        rm -rf /tmp/clickhouse_debs \

@ -54,38 +56,54 @@ RUN arch=${TARGETARCH:-amd64} \
        || exit 1 \
        ; done \
        && dpkg -i /tmp/clickhouse_debs/*.deb ; \
    elif [ -n "${single_binary_location_url}" ]; then \
    fi

# install from a single binary
RUN if [ -n "${single_binary_location_url}" ]; then \
        echo "installing from single binary url: ${single_binary_location_url}" \
        && rm -rf /tmp/clickhouse_binary \
        && mkdir -p /tmp/clickhouse_binary \
        && wget --progress=bar:force:noscroll "${single_binary_location_url}" -O /tmp/clickhouse_binary/clickhouse \
        && chmod +x /tmp/clickhouse_binary/clickhouse \
        && /tmp/clickhouse_binary/clickhouse install --user "clickhouse" --group "clickhouse" ; \
    else \
        mkdir -p /etc/apt/sources.list.d \
        && apt-key adv --keyserver keyserver.ubuntu.com --recv 8919F6BD2B48D754 \
        && echo ${REPOSITORY} > /etc/apt/sources.list.d/clickhouse.list \
    fi

# A fallback to installation from ClickHouse repository
RUN if ! clickhouse local -q "SELECT ''" > /dev/null; then \
        apt-get update \
        && apt-get install --yes --no-install-recommends \
            apt-transport-https \
            ca-certificates \
            dirmngr \
            gnupg2 \
        && mkdir -p /etc/apt/sources.list.d \
        && GNUPGHOME=$(mktemp -d) \
        && GNUPGHOME="$GNUPGHOME" gpg --no-default-keyring \
            --keyring /usr/share/keyrings/clickhouse-keyring.gpg \
            --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 8919F6BD2B48D754 \
        && rm -r "$GNUPGHOME" \
        && chmod +r /usr/share/keyrings/clickhouse-keyring.gpg \
        && echo "${REPOSITORY}" > /etc/apt/sources.list.d/clickhouse.list \
        && echo "installing from repository: ${REPOSITORY}" \
        && apt-get update \
        && apt-get --yes -o "Dpkg::Options::=--force-confdef" -o "Dpkg::Options::=--force-confold" upgrade \
        && for package in ${PACKAGES}; do \
            packages="${packages} ${package}=${VERSION}" \
        ; done \
        && apt-get install --allow-unauthenticated --yes --no-install-recommends ${packages} || exit 1 \
    ; fi \
    && clickhouse-local -q 'SELECT * FROM system.build_options' \
    && rm -rf \
        /var/lib/apt/lists/* \
        /var/cache/debconf \
        /tmp/* \
    && mkdir -p /var/lib/clickhouse /var/log/clickhouse-server /etc/clickhouse-server /etc/clickhouse-client \
    && chmod ugo+Xrw -R /var/lib/clickhouse /var/log/clickhouse-server /etc/clickhouse-server /etc/clickhouse-client

RUN apt-get autoremove --purge -yq libksba8 && \
    apt-get autoremove -yq
    && apt-get autoremove --purge -yq libksba8 \
    && apt-get autoremove -yq \
    ; fi

# post install
# we need to allow "others" access to clickhouse folder, because docker container
# can be started with arbitrary uid (openshift usecase)
RUN clickhouse-local -q 'SELECT * FROM system.build_options' \
    && mkdir -p /var/lib/clickhouse /var/log/clickhouse-server /etc/clickhouse-server /etc/clickhouse-client \
    && chmod ugo+Xrw -R /var/lib/clickhouse /var/log/clickhouse-server /etc/clickhouse-server /etc/clickhouse-client

RUN locale-gen en_US.UTF-8
ENV LANG en_US.UTF-8
@ -20,7 +20,6 @@ For more information and documentation see https://clickhouse.com/.

- The amd64 image requires support for [SSE3 instructions](https://en.wikipedia.org/wiki/SSE3). Virtually all x86 CPUs after 2005 support SSE3.
- The arm64 image requires support for the [ARMv8.2-A architecture](https://en.wikipedia.org/wiki/AArch64#ARMv8.2-A). Most ARM CPUs after 2017 support ARMv8.2-A. A notable exception is the Raspberry Pi 4 from 2019, whose CPU only supports ARMv8.0-A.
- Since ClickHouse 23.3 the Ubuntu image uses `ubuntu:22.04` as its base image, so it requires docker version >= `20.10.10`, or running with `docker run --privileged`. Alternatively, try the ClickHouse Alpine image.

## How to use this image
@ -2454,18 +2454,22 @@ In this format, all input data is read to a single value. It is possible to pars

The result is output in binary format without delimiters and escaping. If more than one value is output, the format is ambiguous, and it will be impossible to read the data back.

Below is a comparison of the formats `RawBLOB` and [TabSeparatedRaw](#tabseparatedraw).

`RawBLOB`:
- data is output in binary format, no escaping;
- there are no delimiters between values;
- no newline at the end of each value.

[TabSeparatedRaw] (#tabseparatedraw):
`TabSeparatedRaw`:
- data is output without escaping;
- the rows contain values separated by tabs;
- there is a line feed after the last value in every row.

The following is a comparison of the `RawBLOB` and [RowBinary](#rowbinary) formats.

`RawBLOB`:
- String fields are output without being prefixed by length.

`RowBinary`:
- String fields are represented as length in varint format (unsigned [LEB128](https://en.wikipedia.org/wiki/LEB128)), followed by the bytes of the string.
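To make the contrast concrete, here is a minimal C++ sketch of the two layouts (illustrative code, not ClickHouse source; this local `writeVarUInt` helper merely mimics the framing, it is not the ClickHouse function of the same name):

#include <cstdint>
#include <string>

// Append x to out as an unsigned LEB128 varint (7 bits per byte, high bit = "more follows").
void writeVarUInt(uint64_t x, std::string & out)
{
    do
    {
        uint8_t byte = x & 0x7F;
        x >>= 7;
        if (x)
            byte |= 0x80;
        out.push_back(static_cast<char>(byte));
    } while (x);
}

// RowBinary layout: varint length prefix, then the raw bytes.
std::string asRowBinary(const std::string & s)
{
    std::string out;
    writeVarUInt(s.size(), out);
    return out + s;
}

// RawBLOB layout: the raw bytes only - readable back only for a single value.
std::string asRawBLOB(const std::string & s)
{
    return s;
}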

@ -97,6 +97,10 @@ Result:

If you apply this combinator, the aggregate function does not return the resulting value (such as the number of unique values for the [uniq](../../sql-reference/aggregate-functions/reference/uniq.md#agg_function-uniq) function), but an intermediate state of the aggregation (for `uniq`, this is the hash table for calculating the number of unique values). This is an `AggregateFunction(...)` that can be used for further processing or stored in a table to finish aggregating later.

:::note
Note that -MapState is not an invariant for the same data, because the order of data in the intermediate state can change. This does not, however, affect the ingestion of this data.
:::

To work with these states, use:

- [AggregatingMergeTree](../../engines/table-engines/mergetree-family/aggregatingmergetree.md) table engine.
@ -21,6 +21,9 @@ Expressions from `ON` clause and columns from `USING` clause are called “join

## Related Content

- Blog: [ClickHouse: A Blazingly Fast DBMS with Full SQL Join Support - Part 1](https://clickhouse.com/blog/clickhouse-fully-supports-joins)
- Blog: [ClickHouse: A Blazingly Fast DBMS with Full SQL Join Support - Under the Hood - Part 2](https://clickhouse.com/blog/clickhouse-fully-supports-joins-hash-joins-part2)
- Blog: [ClickHouse: A Blazingly Fast DBMS with Full SQL Join Support - Under the Hood - Part 3](https://clickhouse.com/blog/clickhouse-fully-supports-joins-full-sort-partial-merge-part3)
- Blog: [ClickHouse: A Blazingly Fast DBMS with Full SQL Join Support - Under the Hood - Part 4](https://clickhouse.com/blog/clickhouse-fully-supports-joins-direct-join-part4)

## Supported Types of JOIN
@ -66,6 +66,10 @@ WITH anySimpleState(number) AS c SELECT toTypeName(c), c FROM numbers(1);

When this combinator is applied, the aggregate function returns not the final value (for example, for the [uniq](reference/uniq.md#agg_function-uniq) function, the number of unique values), but an intermediate aggregation state (for `uniq`, the hash table used to calculate the number of unique values). It has the type `AggregateFunction(...)` and can be used for further processing or stored in a table to finish aggregating later.

:::note
The intermediate state of -MapState is not an invariant for the same source data, since the order of the data may vary. This does not, however, affect the loading of such data.
:::

To work with these intermediate states, use:

- The [AggregatingMergeTree](../../engines/table-engines/mergetree-family/aggregatingmergetree.md) table engine.
@ -25,6 +25,7 @@ IAggregateFunction * createWithNumericOrTimeType(const IDataType & argument_type
    WhichDataType which(argument_type);
    if (which.idx == TypeIndex::Date) return new AggregateFunctionTemplate<UInt16, Data>(std::forward<TArgs>(args)...);
    if (which.idx == TypeIndex::DateTime) return new AggregateFunctionTemplate<UInt32, Data>(std::forward<TArgs>(args)...);
    if (which.idx == TypeIndex::IPv4) return new AggregateFunctionTemplate<IPv4, Data>(std::forward<TArgs>(args)...);
    return createWithNumericType<AggregateFunctionTemplate, Data, TArgs...>(argument_type, std::forward<TArgs>(args)...);
}

@ -4,6 +4,7 @@

#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeIPv4andIPv6.h>

namespace DB

@ -39,12 +40,22 @@ public:
    static DataTypePtr createResultType() { return std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>()); }
};

template <typename HasLimit>
class AggregateFunctionGroupUniqArrayIPv4 : public AggregateFunctionGroupUniqArray<DataTypeIPv4::FieldType, HasLimit>
{
public:
    explicit AggregateFunctionGroupUniqArrayIPv4(const DataTypePtr & argument_type, const Array & parameters_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
        : AggregateFunctionGroupUniqArray<DataTypeIPv4::FieldType, HasLimit>(argument_type, parameters_, createResultType(), max_elems_) {}
    static DataTypePtr createResultType() { return std::make_shared<DataTypeArray>(std::make_shared<DataTypeIPv4>()); }
};

template <typename HasLimit, typename ... TArgs>
IAggregateFunction * createWithExtraTypes(const DataTypePtr & argument_type, TArgs && ... args)
{
    WhichDataType which(argument_type);
    if (which.idx == TypeIndex::Date) return new AggregateFunctionGroupUniqArrayDate<HasLimit>(argument_type, std::forward<TArgs>(args)...);
    else if (which.idx == TypeIndex::DateTime) return new AggregateFunctionGroupUniqArrayDateTime<HasLimit>(argument_type, std::forward<TArgs>(args)...);
    else if (which.idx == TypeIndex::IPv4) return new AggregateFunctionGroupUniqArrayIPv4<HasLimit>(argument_type, std::forward<TArgs>(args)...);
    else
    {
        /// Check that we can use plain version of AggregateFunctionGroupUniqArrayGeneric
@ -100,6 +100,10 @@ public:
            return std::make_shared<AggregateFunctionMap<UInt256>>(nested_function, arguments);
        case TypeIndex::UUID:
            return std::make_shared<AggregateFunctionMap<UUID>>(nested_function, arguments);
        case TypeIndex::IPv4:
            return std::make_shared<AggregateFunctionMap<IPv4>>(nested_function, arguments);
        case TypeIndex::IPv6:
            return std::make_shared<AggregateFunctionMap<IPv6>>(nested_function, arguments);
        case TypeIndex::FixedString:
        case TypeIndex::String:
            return std::make_shared<AggregateFunctionMap<String>>(nested_function, arguments);

@ -19,7 +19,9 @@

#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include "DataTypes/Serializations/ISerialization.h"
#include <base/IPv4andIPv6.h>
#include "base/types.h"
#include <Common/formatIPv6.h>
#include <Common/Arena.h>
#include "AggregateFunctions/AggregateFunctionFactory.h"

@ -69,6 +71,31 @@ struct AggregateFunctionMapCombinatorData<String>
    }
};

/// Specialization for IPv6 - for historical reasons it should be stored as FixedString(16)
template <>
struct AggregateFunctionMapCombinatorData<IPv6>
{
    struct IPv6Hash
    {
        using hash_type = std::hash<IPv6>;
        using is_transparent = void;

        size_t operator()(const IPv6 & ip) const { return hash_type{}(ip); }
    };

    using SearchType = IPv6;
    std::unordered_map<IPv6, AggregateDataPtr, IPv6Hash, std::equal_to<>> merged_maps;

    static void writeKey(const IPv6 & key, WriteBuffer & buf)
    {
        writeIPv6Binary(key, buf);
    }
    static void readKey(IPv6 & key, ReadBuffer & buf)
    {
        readIPv6Binary(key, buf);
    }
};
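The `is_transparent` marker on `IPv6Hash`, combined with `std::equal_to<>`, enables heterogeneous lookup in the `std::unordered_map` above. A minimal standalone sketch of the same pattern with strings (C++20 is needed for transparent `find` on unordered containers; names are illustrative):

#include <functional>
#include <string>
#include <string_view>
#include <unordered_map>

struct TransparentStringHash
{
    using hash_type = std::hash<std::string_view>;
    using is_transparent = void;                 // opt in to heterogeneous lookup
    size_t operator()(std::string_view s) const { return hash_type{}(s); }
};

int main()
{
    std::unordered_map<std::string, int, TransparentStringHash, std::equal_to<>> m;
    m["::1"] = 1;
    // find() accepts a string_view directly; no temporary std::string is built.
    return m.find(std::string_view("::1")) != m.end() ? 0 : 1;
}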

template <typename KeyType>
class AggregateFunctionMap final
    : public IAggregateFunctionDataHelper<AggregateFunctionMapCombinatorData<KeyType>, AggregateFunctionMap<KeyType>>

@ -147,6 +174,8 @@ public:
    StringRef key_ref;
    if (key_type->getTypeId() == TypeIndex::FixedString)
        key_ref = assert_cast<const ColumnFixedString &>(key_column).getDataAt(offset + i);
    else if (key_type->getTypeId() == TypeIndex::IPv6)
        key_ref = assert_cast<const ColumnIPv6 &>(key_column).getDataAt(offset + i);
    else
        key_ref = assert_cast<const ColumnString &>(key_column).getDataAt(offset + i);

@ -5,6 +5,7 @@

#include <Common/FieldVisitorConvertToNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeIPv4andIPv6.h>

static inline constexpr UInt64 TOP_K_MAX_SIZE = 0xFFFFFF;

@ -60,6 +61,22 @@ public:
    {}
};

template <bool is_weighted>
class AggregateFunctionTopKIPv4 : public AggregateFunctionTopK<DataTypeIPv4::FieldType, is_weighted>
{
public:
    using AggregateFunctionTopK<DataTypeIPv4::FieldType, is_weighted>::AggregateFunctionTopK;

    AggregateFunctionTopKIPv4(UInt64 threshold_, UInt64 load_factor, const DataTypes & argument_types_, const Array & params)
        : AggregateFunctionTopK<DataTypeIPv4::FieldType, is_weighted>(
            threshold_,
            load_factor,
            argument_types_,
            params,
            std::make_shared<DataTypeArray>(std::make_shared<DataTypeIPv4>()))
    {}
};

template <bool is_weighted>
IAggregateFunction * createWithExtraTypes(const DataTypes & argument_types, UInt64 threshold, UInt64 load_factor, const Array & params)

@ -72,6 +89,8 @@ IAggregateFunction * createWithExtraTypes(const DataTypes & argument_types, UInt
        return new AggregateFunctionTopKDate<is_weighted>(threshold, load_factor, argument_types, params);
    if (which.idx == TypeIndex::DateTime)
        return new AggregateFunctionTopKDateTime<is_weighted>(threshold, load_factor, argument_types, params);
    if (which.idx == TypeIndex::IPv4)
        return new AggregateFunctionTopKIPv4<is_weighted>(threshold, load_factor, argument_types, params);

    /// Check that we can use plain version of AggregateFunctionTopKGeneric
    if (argument_types[0]->isValueUnambiguouslyRepresentedInContiguousMemoryRegion())

@ -8,6 +8,7 @@

#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeIPv4andIPv6.h>

#include <Core/Settings.h>

@ -60,6 +61,10 @@ createAggregateFunctionUniq(const std::string & name, const DataTypes & argument
        return std::make_shared<AggregateFunctionUniq<String, Data>>(argument_types);
    else if (which.isUUID())
        return std::make_shared<AggregateFunctionUniq<DataTypeUUID::FieldType, Data>>(argument_types);
    else if (which.isIPv4())
        return std::make_shared<AggregateFunctionUniq<DataTypeIPv4::FieldType, Data>>(argument_types);
    else if (which.isIPv6())
        return std::make_shared<AggregateFunctionUniq<DataTypeIPv6::FieldType, Data>>(argument_types);
    else if (which.isTuple())
    {
        if (use_exact_hash_function)

@ -109,6 +114,10 @@ createAggregateFunctionUniq(const std::string & name, const DataTypes & argument
        return std::make_shared<AggregateFunctionUniq<String, Data<String, is_able_to_parallelize_merge>>>(argument_types);
    else if (which.isUUID())
        return std::make_shared<AggregateFunctionUniq<DataTypeUUID::FieldType, Data<DataTypeUUID::FieldType, is_able_to_parallelize_merge>>>(argument_types);
    else if (which.isIPv4())
        return std::make_shared<AggregateFunctionUniq<DataTypeIPv4::FieldType, Data<DataTypeIPv4::FieldType, is_able_to_parallelize_merge>>>(argument_types);
    else if (which.isIPv6())
        return std::make_shared<AggregateFunctionUniq<DataTypeIPv6::FieldType, Data<DataTypeIPv6::FieldType, is_able_to_parallelize_merge>>>(argument_types);
    else if (which.isTuple())
    {
        if (use_exact_hash_function)

@ -101,6 +101,18 @@ struct AggregateFunctionUniqHLL12Data<UUID, false>
    static String getName() { return "uniqHLL12"; }
};

template <>
struct AggregateFunctionUniqHLL12Data<IPv6, false>
{
    using Set = HyperLogLogWithSmallSetOptimization<UInt64, 16, 12>;
    Set set;

    constexpr static bool is_able_to_parallelize_merge = false;
    constexpr static bool is_variadic = false;

    static String getName() { return "uniqHLL12"; }
};

template <bool is_exact_, bool argument_is_tuple_, bool is_able_to_parallelize_merge_>
struct AggregateFunctionUniqHLL12DataForVariadic
{

@ -155,6 +167,25 @@ struct AggregateFunctionUniqExactData<String, is_able_to_parallelize_merge_>
    static String getName() { return "uniqExact"; }
};

/// For historical reasons IPv6 is treated as FixedString(16)
template <bool is_able_to_parallelize_merge_>
struct AggregateFunctionUniqExactData<IPv6, is_able_to_parallelize_merge_>
{
    using Key = UInt128;

    /// When creating, the hash table must be small.
    using SingleLevelSet = HashSet<Key, UInt128TrivialHash, HashTableGrower<3>, HashTableAllocatorWithStackMemory<sizeof(Key) * (1 << 3)>>;
    using TwoLevelSet = TwoLevelHashSet<Key, UInt128TrivialHash>;
    using Set = UniqExactSet<SingleLevelSet, TwoLevelSet>;

    Set set;

    constexpr static bool is_able_to_parallelize_merge = is_able_to_parallelize_merge_;
    constexpr static bool is_variadic = false;

    static String getName() { return "uniqExact"; }
};

template <bool is_exact_, bool argument_is_tuple_, bool is_able_to_parallelize_merge_>
struct AggregateFunctionUniqExactDataForVariadic : AggregateFunctionUniqExactData<String, is_able_to_parallelize_merge_>
{

@ -248,27 +279,22 @@ struct Adder
        AggregateFunctionUniqUniquesHashSetData> || std::is_same_v<Data, AggregateFunctionUniqHLL12Data<T, Data::is_able_to_parallelize_merge>>)
    {
        const auto & column = *columns[0];
        if constexpr (!std::is_same_v<T, String>)
        if constexpr (std::is_same_v<T, String> || std::is_same_v<T, IPv6>)
        {
            StringRef value = column.getDataAt(row_num);
            data.set.insert(CityHash_v1_0_2::CityHash64(value.data, value.size));
        }
        else
        {
            using ValueType = typename decltype(data.set)::value_type;
            const auto & value = assert_cast<const ColumnVector<T> &>(column).getElement(row_num);
            data.set.insert(static_cast<ValueType>(AggregateFunctionUniqTraits<T>::hash(value)));
        }
        else
        {
            StringRef value = column.getDataAt(row_num);
            data.set.insert(CityHash_v1_0_2::CityHash64(value.data, value.size));
        }
    }
    else if constexpr (std::is_same_v<Data, AggregateFunctionUniqExactData<T, Data::is_able_to_parallelize_merge>>)
    {
        const auto & column = *columns[0];
        if constexpr (!std::is_same_v<T, String>)
        {
            data.set.template insert<const T &, use_single_level_hash_table>(
                assert_cast<const ColumnVector<T> &>(column).getData()[row_num]);
        }
        else
        if constexpr (std::is_same_v<T, String> || std::is_same_v<T, IPv6>)
        {
            StringRef value = column.getDataAt(row_num);

@ -279,6 +305,11 @@ struct Adder

            data.set.template insert<const UInt128 &, use_single_level_hash_table>(key);
        }
        else
        {
            data.set.template insert<const T &, use_single_level_hash_table>(
                assert_cast<const ColumnVector<T> &>(column).getData()[row_num]);
        }
    }
#if USE_DATASKETCHES
    else if constexpr (std::is_same_v<Data, AggregateFunctionUniqThetaData>)

@ -8,6 +8,7 @@

#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeIPv4andIPv6.h>

#include <functional>

@ -60,6 +61,10 @@ namespace
        return std::make_shared<typename WithK<K, HashValueType>::template AggregateFunction<String>>(argument_types, params);
    else if (which.isUUID())
        return std::make_shared<typename WithK<K, HashValueType>::template AggregateFunction<DataTypeUUID::FieldType>>(argument_types, params);
    else if (which.isIPv4())
        return std::make_shared<typename WithK<K, HashValueType>::template AggregateFunction<DataTypeIPv4::FieldType>>(argument_types, params);
    else if (which.isIPv6())
        return std::make_shared<typename WithK<K, HashValueType>::template AggregateFunction<DataTypeIPv6::FieldType>>(argument_types, params);
    else if (which.isTuple())
    {
        if (use_exact_hash_function)

@ -119,6 +119,10 @@ struct AggregateFunctionUniqCombinedData<String, K, HashValueType> : public Aggr
{
};

template <UInt8 K, typename HashValueType>
struct AggregateFunctionUniqCombinedData<IPv6, K, HashValueType> : public AggregateFunctionUniqCombinedDataWithKey<UInt64 /*always*/, K>
{
};

template <typename T, UInt8 K, typename HashValueType>
class AggregateFunctionUniqCombined final

@ -141,16 +145,16 @@ public:
    void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        if constexpr (!std::is_same_v<T, String>)
        {
            const auto & value = assert_cast<const ColumnVector<T> &>(*columns[0]).getElement(row_num);
            this->data(place).set.insert(detail::AggregateFunctionUniqCombinedTraits<T, HashValueType>::hash(value));
        }
        else
        if constexpr (std::is_same_v<T, String> || std::is_same_v<T, IPv6>)
        {
            StringRef value = columns[0]->getDataAt(row_num);
            this->data(place).set.insert(CityHash_v1_0_2::CityHash64(value.data, value.size));
        }
        else
        {
            const auto & value = assert_cast<const ColumnVector<T> &>(*columns[0]).getElement(row_num);
            this->data(place).set.insert(detail::AggregateFunctionUniqCombinedTraits<T, HashValueType>::hash(value));
        }
    }

    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override

@ -1175,16 +1175,12 @@ ProfileInfo Connection::receiveProfileInfo() const

ParallelReadRequest Connection::receiveParallelReadRequest() const
{
    ParallelReadRequest request;
    request.deserialize(*in);
    return request;
    return ParallelReadRequest::deserialize(*in);
}

InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnounecement() const
{
    InitialAllRangesAnnouncement announcement;
    announcement.deserialize(*in);
    return announcement;
    return InitialAllRangesAnnouncement::deserialize(*in);
}
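Both receive functions above now delegate to static `deserialize` factories. A hedged sketch of the refactor pattern (illustrative names, not the ClickHouse API): instead of default-constructing an object and then mutating it, the factory returns a fully initialized value, which lets the type drop its default constructor entirely:

#include <cstdint>
#include <istream>

struct Request
{
    uint64_t replica_num;   // every field is set by the constructor

    explicit Request(uint64_t replica_num_) : replica_num(replica_num_) {}

    // Factory instead of a mutating deserialize(in): no half-initialized state can exist.
    static Request deserialize(std::istream & in)
    {
        uint64_t n = 0;
        in.read(reinterpret_cast<char *>(&n), sizeof(n));
        return Request(n);
    }
};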

@ -16,6 +16,10 @@

#include <boost/noncopyable.hpp>

#include <optional>
#include <vector>
#include <memory>
#include <string>

namespace DB
{

@ -34,9 +38,9 @@ struct Packet
    ProfileInfo profile_info;
    std::vector<UUID> part_uuids;

    InitialAllRangesAnnouncement announcement;
    ParallelReadRequest request;
    ParallelReadResponse response;
    /// The part of parallel replicas protocol
    std::optional<InitialAllRangesAnnouncement> announcement;
    std::optional<ParallelReadRequest> request;

    std::string server_timezone;

@ -9,6 +9,7 @@

#include <Common/logger_useful.h>
#include "libaccel_config.h"
#include <Common/MemorySanitizer.h>
#include <base/scope_guard.h>

namespace DB
{

@ -34,6 +35,7 @@ DeflateQplJobHWPool::DeflateQplJobHWPool()
    // Loop over all configured work queue sizes to get the maximum job number.
    accfg_ctx * ctx_ptr = nullptr;
    auto ctx_status = accfg_new(&ctx_ptr);
    SCOPE_EXIT({ accfg_unref(ctx_ptr); });
    if (ctx_status == 0)
    {
        auto * dev_ptr = accfg_device_get_first(ctx_ptr);

@ -27,7 +27,7 @@ namespace DB

using UUID = StrongTypedef<UInt128, struct UUIDTag>;

using IPv4 = StrongTypedef<UInt32, struct IPv4Tag>;
struct IPv4;

struct IPv6;

@ -69,7 +69,7 @@ void DataTypeMap::assertKeyType() const
    if (!checkKeyType(key_type))
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
            "Type of Map key must be a type, that can be represented by integer "
            "or String or FixedString (possibly LowCardinality) or UUID,"
            "or String or FixedString (possibly LowCardinality) or UUID or IPv6,"
            " but {} given", key_type->getName());
}

@ -120,6 +120,7 @@ bool DataTypeMap::checkKeyType(DataTypePtr key_type)
    else if (!key_type->isValueRepresentedByInteger()
        && !isStringOrFixedString(*key_type)
        && !WhichDataType(key_type).isNothing()
        && !WhichDataType(key_type).isIPv6()
        && !WhichDataType(key_type).isUUID())
    {
        return false;

@ -63,6 +63,7 @@ namespace ErrorCodes
    extern const int INCORRECT_DATA;
    extern const int TOO_LARGE_STRING_SIZE;
    extern const int TOO_LARGE_ARRAY_SIZE;
    extern const int SIZE_OF_FIXED_STRING_DOESNT_MATCH;
}

/// Helper functions for formatted input.

@ -138,6 +139,19 @@ inline void readStringBinary(std::string & s, ReadBuffer & buf, size_t max_strin
    buf.readStrict(s.data(), size);
}

/// For historical reasons we store IPv6 as a String
inline void readIPv6Binary(IPv6 & ip, ReadBuffer & buf)
{
    size_t size = 0;
    readVarUInt(size, buf);

    if (size != IPV6_BINARY_LENGTH)
        throw Exception(ErrorCodes::SIZE_OF_FIXED_STRING_DOESNT_MATCH,
            "Size of the string {} doesn't match size of binary IPv6 {}", size, IPV6_BINARY_LENGTH);

    buf.readStrict(reinterpret_cast<char*>(&ip.toUnderType()), size);
}
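As the comment says, IPv6 keys use the String wire format: a varint length (always 16) followed by the 16 address bytes. A standalone sketch of that framing, assuming IPV6_BINARY_LENGTH == 16 (illustrative helpers, not ClickHouse code):

#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

constexpr size_t kIPv6BinaryLength = 16;

std::vector<uint8_t> frameIPv6(const std::array<uint8_t, kIPv6BinaryLength> & ip)
{
    std::vector<uint8_t> out;
    out.push_back(kIPv6BinaryLength);   // varint(16) fits in a single byte
    out.insert(out.end(), ip.begin(), ip.end());
    return out;
}

std::array<uint8_t, kIPv6BinaryLength> unframeIPv6(const std::vector<uint8_t> & buf)
{
    if (buf.size() != 1 + kIPv6BinaryLength || buf[0] != kIPv6BinaryLength)
        throw std::runtime_error("size of the string doesn't match binary IPv6");
    std::array<uint8_t, kIPv6BinaryLength> ip{};
    std::copy(buf.begin() + 1, buf.end(), ip.begin());
    return ip;
}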

template <typename T>
void readVectorBinary(std::vector<T> & v, ReadBuffer & buf)
{
@ -10,6 +10,7 @@

#include <pcg-random/pcg_random.hpp>

#include "Common/formatIPv6.h"
#include <Common/DateLUT.h>
#include <Common/LocalDate.h>
#include <Common/LocalDateTime.h>

@ -105,6 +106,13 @@ inline void writeStringBinary(const std::string & s, WriteBuffer & buf)
    buf.write(s.data(), s.size());
}

/// For historical reasons we store IPv6 as a String
inline void writeIPv6Binary(const IPv6 & ip, WriteBuffer & buf)
{
    writeVarUInt(IPV6_BINARY_LENGTH, buf);
    buf.write(reinterpret_cast<const char *>(&ip.toUnderType()), IPV6_BINARY_LENGTH);
}

inline void writeStringBinary(StringRef s, WriteBuffer & buf)
{
    writeVarUInt(s.size, buf);

@ -216,8 +216,24 @@ void DatabaseCatalog::shutdownImpl()

    /// We still hold "databases" (instead of std::move) for Buffer tables to flush data correctly.

    /// Delay shutdown of temporary and system databases. They will be shut down last,
    /// because some databases might use them until their own shutdown is called, and calling shutdown
    /// on the temporary database means clearing its set of tables, which would lead to unnecessary errors like "table not found".
    std::vector<DatabasePtr> databases_with_delayed_shutdown;
    for (auto & database : current_databases)
    {
        if (database.first == TEMPORARY_DATABASE || database.first == SYSTEM_DATABASE)
        {
            databases_with_delayed_shutdown.push_back(database.second);
            continue;
        }
        database.second->shutdown();
    }

    for (auto & database : databases_with_delayed_shutdown)
    {
        database->shutdown();
    }

    {
        std::lock_guard lock(tables_marked_dropped_mutex);

@ -434,11 +434,13 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
    switch (packet.type)
    {
        case Protocol::Server::MergeTreeReadTaskRequest:
            processMergeTreeReadTaskRequest(packet.request);
            chassert(packet.request.has_value());
            processMergeTreeReadTaskRequest(packet.request.value());
            return ReadResult(ReadResult::Type::ParallelReplicasToken);

        case Protocol::Server::MergeTreeAllRangesAnnounecement:
            processMergeTreeInitialReadAnnounecement(packet.announcement);
            chassert(packet.announcement.has_value());
            processMergeTreeInitialReadAnnounecement(packet.announcement.value());
            return ReadResult(ReadResult::Type::ParallelReplicasToken);

        case Protocol::Server::ReadTaskRequest:
@ -6,6 +6,7 @@

#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/logger_useful.h>
#include <Storages/MergeTree/RequestResponse.h>

namespace ProfileEvents

@ -433,8 +434,12 @@ MergeTreeReadTaskPtr MergeTreeReadPoolParallelReplicas::getTask(size_t thread)
    if (buffered_ranges.empty())
    {
        auto result = extension.callback(ParallelReadRequest{
            .replica_num = extension.number_of_current_replica, .min_number_of_marks = min_marks_for_concurrent_read * threads});
        auto result = extension.callback(ParallelReadRequest(
            CoordinationMode::Default,
            extension.number_of_current_replica,
            min_marks_for_concurrent_read * threads,
            /// For Default coordination mode we don't need to pass part names.
            RangesInDataPartsDescription{}));

        if (!result || result->finish)
        {

@ -529,12 +534,12 @@ MarkRanges MergeTreeInOrderReadPoolParallelReplicas::getNewTask(RangesInDataPart
    if (no_more_tasks)
        return {};

    auto response = extension.callback(ParallelReadRequest{
        .mode = mode,
        .replica_num = extension.number_of_current_replica,
        .min_number_of_marks = min_marks_for_concurrent_read * request.size(),
        .description = request,
    });
    auto response = extension.callback(ParallelReadRequest(
        mode,
        extension.number_of_current_replica,
        min_marks_for_concurrent_read * request.size(),
        request
    ));

    if (!response || response->description.empty() || response->finish)
    {

@ -193,10 +193,11 @@ public:
            predict_block_size_bytes, column_names, virtual_column_names, prewhere_info,
            actions_settings, reader_settings, per_part_params);

        extension.all_callback({
            .description = parts_ranges.getDescriptions(),
            .replica_num = extension.number_of_current_replica
        });
        extension.all_callback(InitialAllRangesAnnouncement(
            CoordinationMode::Default,
            parts_ranges.getDescriptions(),
            extension.number_of_current_replica
        ));
    }

    ~MergeTreeReadPoolParallelReplicas() override;

@ -253,10 +254,11 @@ public:
        for (const auto & part : parts_ranges)
            buffered_tasks.push_back({part.data_part->info, MarkRanges{}});

        extension.all_callback({
            .description = parts_ranges.getDescriptions(),
            .replica_num = extension.number_of_current_replica
        });
        extension.all_callback(InitialAllRangesAnnouncement(
            mode,
            parts_ranges.getDescriptions(),
            extension.number_of_current_replica
        ));
    }

    MarkRanges getNewTask(RangesInDataPartDescription description);

@ -102,7 +102,6 @@ public:
    explicit DefaultCoordinator(size_t replicas_count_)
        : ParallelReplicasReadingCoordinator::ImplInterface(replicas_count_)
        , announcements(replicas_count_)
        , reading_state(replicas_count_)
    {
    }

@ -119,7 +118,6 @@ public:
    PartitionToBlockRanges partitions;

    size_t sent_initial_requests{0};
    std::vector<InitialAllRangesAnnouncement> announcements;

    Parts all_parts_to_read;
    /// Contains only parts which we haven't started to read from

@ -51,7 +51,7 @@ String ParallelReadRequest::describe() const
    return result;
}

void ParallelReadRequest::deserialize(ReadBuffer & in)
ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in)
{
    UInt64 version;
    readIntBinary(version, in);

@ -60,12 +60,24 @@ void ParallelReadRequest::deserialize(ReadBuffer & in)
            "from replicas differ. Got: {}, supported version: {}",
            version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION);

    CoordinationMode mode;
    size_t replica_num;
    size_t min_number_of_marks;
    RangesInDataPartsDescription description;

    uint8_t mode_candidate;
    readIntBinary(mode_candidate, in);
    mode = validateAndGet(mode_candidate);
    readIntBinary(replica_num, in);
    readIntBinary(min_number_of_marks, in);
    description.deserialize(in);

    return ParallelReadRequest(
        mode,
        replica_num,
        min_number_of_marks,
        std::move(description)
    );
}

void ParallelReadRequest::merge(ParallelReadRequest & other)

@ -125,7 +137,7 @@ String InitialAllRangesAnnouncement::describe()
    return result;
}

void InitialAllRangesAnnouncement::deserialize(ReadBuffer & in)
InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffer & in)
{
    UInt64 version;
    readIntBinary(version, in);

@ -134,11 +146,21 @@ void InitialAllRangesAnnouncement::deserialize(ReadBuffer & in)
            "from replicas differ. Got: {}, supported version: {}",
            version, DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION);

    CoordinationMode mode;
    RangesInDataPartsDescription description;
    size_t replica_num;

    uint8_t mode_candidate;
    readIntBinary(mode_candidate, in);
    mode = validateAndGet(mode_candidate);
    description.deserialize(in);
    readIntBinary(replica_num, in);

    return InitialAllRangesAnnouncement {
        mode,
        description,
        replica_num
    };
}

}
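Both deserializers above decode a raw wire byte into a `CoordinationMode` via `validateAndGet`, which rejects unknown values. A minimal sketch of that decode-and-validate pattern (hypothetical names and values, not the ClickHouse implementation; the mode names follow the comments in the header below):

#include <cstdint>
#include <stdexcept>

enum class Mode : uint8_t { Default = 0, InOrder = 1, ReverseOrder = 2 };

Mode validateAndGetMode(uint8_t candidate)
{
    // Reject anything outside the known enumerators before casting.
    if (candidate > static_cast<uint8_t>(Mode::ReverseOrder))
        throw std::runtime_error("unknown coordination mode");
    return static_cast<Mode>(candidate);
}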

@ -40,21 +40,40 @@ struct PartBlockRange
    }
};

/// ParallelReadRequest is used by remote replicas during parallel read
/// to signal an initiator that they need more marks to read.
struct ParallelReadRequest
{
    /// No default constructor, you must initialize all fields at once.

    ParallelReadRequest(
        CoordinationMode mode_,
        size_t replica_num_,
        size_t min_number_of_marks_,
        RangesInDataPartsDescription description_)
        : mode(mode_)
        , replica_num(replica_num_)
        , min_number_of_marks(min_number_of_marks_)
        , description(std::move(description_))
    {}

    CoordinationMode mode;
    size_t replica_num;
    size_t min_number_of_marks;

    /// Extension for ordered mode
    /// Extension for Ordered (InOrder or ReverseOrder) mode
    /// Contains only data part names without mark ranges.
    RangesInDataPartsDescription description;

    void serialize(WriteBuffer & out) const;
    String describe() const;
    void deserialize(ReadBuffer & in);
    static ParallelReadRequest deserialize(ReadBuffer & in);
    void merge(ParallelReadRequest & other);
};

/// ParallelReadResponse is used by an initiator to tell
/// remote replicas about what to read during parallel reading.
/// Additionally contains information whether there are more available
/// marks to read (whether it is the last packet or not).
struct ParallelReadResponse
{
    bool finish{false};

@ -66,15 +85,30 @@ struct ParallelReadResponse
};

/// The set of parts (their names) along with ranges to read which is sent back
/// to the initiator by remote replicas during parallel reading.
/// Additionally contains an identifier (replica_num) plus
/// the reading algorithm chosen (Default, InOrder or ReverseOrder).
struct InitialAllRangesAnnouncement
{
    /// No default constructor, you must initialize all fields at once.

    InitialAllRangesAnnouncement(
        CoordinationMode mode_,
        RangesInDataPartsDescription description_,
        size_t replica_num_)
        : mode(mode_)
        , description(description_)
        , replica_num(replica_num_)
    {}

    CoordinationMode mode;
    RangesInDataPartsDescription description;
    size_t replica_num;

    void serialize(WriteBuffer & out) const;
    String describe();
    void deserialize(ReadBuffer & in);
    static InitialAllRangesAnnouncement deserialize(ReadBuffer & in);
};

@ -40,7 +40,7 @@
    <operation_timeout_ms>10000</operation_timeout_ms>
    <session_timeout_ms>30000</session_timeout_ms>
    <heart_beat_interval_ms>1000</heart_beat_interval_ms>
    <election_timeout_lower_bound_ms>4000</election_timeout_lower_bound_ms>
    <election_timeout_lower_bound_ms>2000</election_timeout_lower_bound_ms>
    <election_timeout_upper_bound_ms>5000</election_timeout_upper_bound_ms>
    <raft_logs_level>information</raft_logs_level>
    <force_sync>false</force_sync>

@ -846,7 +846,7 @@ def test_start_stop_moves(start_cluster, name, engine):
    node1.query("SYSTEM START MOVES {}".format(name))

    # wait some time until the background backoff finishes
    retry = 30
    retry = 60
    i = 0
    while not sum(1 for x in used_disks if x == "jbod1") <= 2 and i < retry:
        time.sleep(1)
File diff suppressed because one or more lines are too long
@ -0,0 +1,170 @@
-- Tags: no-parallel, no-fasttest

{# this test checks backward compatibility of aggregate function States against IPv4, IPv6 types #}

{% set ip4_generator = "select num::UInt32::IPv4 ip from (select arrayJoin(range(999999999, number)) as num from numbers(999999999,50)) order by ip" %}
{% set ip6_generator = "SELECT toIPv6(IPv6NumToString(toFixedString(reinterpretAsFixedString(num)||reinterpretAsFixedString(num), 16))) AS ip FROM (select arrayJoin(range(1010011101, number)) as num from numbers(1010011101,50)) order by ip" %}
{% set ip_generators = {'ip4': ip4_generator, 'ip6': ip6_generator} %}

{% set agg_func_list = [ "min", "max", "first_value", "last_value", "topK", "groupArray", "groupUniqArray", "uniq", "uniqExact", "uniqCombined", "uniqCombined64", "uniqHLL12", "uniqTheta" ] %}

{% for generator_name, ip_generator in ip_generators.items() %}

select '----- hash / State / {{ generator_name }} -----';
select
{% for func in agg_func_list -%}
    cityHash64(hex( {{ func }}State(ip) )) AS {{ func }}State{{ "," if not loop.last }}
{% endfor -%}
from ( {{ ip_generator }} ) format Vertical;

{% endfor -%}

{% for generator_name, ip_generator in ip_generators.items() %}

select '----- finalizeAggregation / State / {{ generator_name }} -----';
select
{% for func in agg_func_list -%}
    finalizeAggregation( {{ func }}State(ip) ) AS {{ func }}{{ "," if not loop.last }}
{% endfor -%}
from ( {{ ip_generator }} ) format Vertical;

{% endfor -%}

{% for generator_name, ip_generator in ip_generators.items() %}

select '----- hash / IfState / {{ generator_name }} -----';
select
{% for func in agg_func_list -%}
    cityHash64(hex( {{ func }}IfState(ip, 1) )) AS {{ func }}IfState{{ "," if not loop.last }}
{% endfor -%}
from ( {{ ip_generator }} ) format Vertical;

{% endfor -%}

{% for generator_name, ip_generator in ip_generators.items() %}

select '----- finalizeAggregation / IfState / {{ generator_name }} -----';
select
{% for func in agg_func_list -%}
    finalizeAggregation( {{ func }}IfState(ip, 1) ) AS {{ func }}{{ "," if not loop.last }}
{% endfor -%}
from ( {{ ip_generator }} ) format Vertical;

{% endfor -%}

{% set agg_func_list = [ "argMin", "argMax" ] %}

{% for generator_name, ip_generator in ip_generators.items() %}

select '----- Arg / hash / State / {{ generator_name }} -----';
select
{% for func in agg_func_list -%}
    cityHash64(hex( {{ func }}State(ip, ip) )) AS {{ func }}State{{ "," if not loop.last }}
{% endfor -%}
from ( {{ ip_generator }} ) format Vertical;

{% endfor -%}

{% for generator_name, ip_generator in ip_generators.items() %}

select '----- Arg / finalizeAggregation / State / {{ generator_name }} -----';
select
{% for func in agg_func_list -%}
    finalizeAggregation( {{ func }}State(ip, ip) ) AS {{ func }}State{{ "," if not loop.last }}
{% endfor -%}
from ( {{ ip_generator }} ) format Vertical;

{% endfor -%}

{# test functions with a non-deterministic result against 1 row, to make them deterministic #}
{% set ip4_generator = "select number::UInt32::IPv4 ip from numbers(999999999,1) order by ip" %}
{% set ip6_generator = "SELECT toIPv6(IPv6NumToString(toFixedString(reinterpretAsFixedString(number)||reinterpretAsFixedString(number), 16))) AS ip FROM numbers(1010011101, 1) order by ip" %}

{% set ip_generators = {'ip4': ip4_generator, 'ip6': ip6_generator} %}

{% set agg_func_list = [ "any", "anyHeavy", "anyLast" ] %}

{% for generator_name, ip_generator in ip_generators.items() %}

select '----- hash / State / {{ generator_name }} -----';
select
{% for func in agg_func_list -%}
    cityHash64(hex( {{ func }}State(ip) )) AS {{ func }}State{{ "," if not loop.last }}
{% endfor -%}
from ( {{ ip_generator }} ) format Vertical;

{% endfor -%}

{% for generator_name, ip_generator in ip_generators.items() %}

select '----- finalizeAggregation / State / {{ generator_name }} -----';
select
{% for func in agg_func_list -%}
    finalizeAggregation( {{ func }}State(ip) ) AS {{ func }}{{ "," if not loop.last }}
{% endfor -%}
from ( {{ ip_generator }} ) format Vertical;

{% endfor -%}

{% set agg_func_list = [ "sumMap", "minMap", "maxMap" ] %}

{% for generator_name, ip_generator in ip_generators.items() %}

select '----- Map/Map hash / State / {{ generator_name }} -----';
select
{% for func in agg_func_list -%}
    cityHash64(hex( {{ func }}State(map(ip, 1::Int64)) )) AS {{ func }}State{{ "," if not loop.last }}
{% endfor -%}
from ( {{ ip_generator }} ) format Vertical;

{% endfor -%}

{% for generator_name, ip_generator in ip_generators.items() %}

select '----- Map/Map finalizeAggregation / State / {{ generator_name }} -----';
select
{% for func in agg_func_list -%}
    finalizeAggregation( {{ func }}State(map(ip, 1::Int64)) ) AS {{ func }}{{ "," if not loop.last }}
{% endfor -%}
from ( {{ ip_generator }} ) format Vertical;

{% endfor -%}

{% for generator_name, ip_generator in ip_generators.items() %}

select '----- Map/Array hash / State / {{ generator_name }} -----';
select
{% for func in agg_func_list -%}
    cityHash64(hex( {{ func }}State([ip], [1::Int64]) )) AS {{ func }}State{{ "," if not loop.last }}
{% endfor -%}
from ( {{ ip_generator }} ) format Vertical;

{% endfor -%}

{% for generator_name, ip_generator in ip_generators.items() %}

select '----- Map/Array finalizeAggregation / State / {{ generator_name }} -----';
select
{% for func in agg_func_list -%}
    finalizeAggregation( {{ func }}State([ip], [1::Int64]) ) AS {{ func }}{{ "," if not loop.last }}
{% endfor -%}
from ( {{ ip_generator }} ) format Vertical;

{% endfor -%}
@ -469,6 +469,7 @@ MSan
MVCC
MacBook
MacOS
MapState
MarkCacheBytes
MarkCacheFiles
MarksLoaderThreads