Merge branch 'master' into addParts

KitKatKKK, 2023-02-10 16:48:04 +08:00 (committed by GitHub)
commit ea300ce13b
63 changed files with 879 additions and 483 deletions


@@ -33,6 +33,41 @@
#include <base/extended_types.h>
template <typename T>
inline int digits10(T x)
{
if (x < 10ULL)
return 1;
if (x < 100ULL)
return 2;
if (x < 1000ULL)
return 3;
if (x < 1000000000000ULL)
{
if (x < 100000000ULL)
{
if (x < 1000000ULL)
{
if (x < 10000ULL)
return 4;
else
return 5 + (x >= 100000ULL);
}
return 7 + (x >= 10000000ULL);
}
if (x < 10000000000ULL)
return 9 + (x >= 1000000000ULL);
return 11 + (x >= 100000000000ULL);
}
return 12 + digits10(x / 1000000000000ULL);
}
namespace impl
{
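The hunk above promotes digits10 to file scope (the hunk that follows removes the old copy that lived beside writeUIntText), so other code can reuse it. A standalone sketch of its branch boundaries, assuming plain uint64_t rather than ClickHouse's extended integer types:

```cpp
#include <cassert>
#include <cstdint>

// Same branch structure as the relocated function, specialized to uint64_t.
inline int digits10(uint64_t x)
{
    if (x < 10ULL) return 1;
    if (x < 100ULL) return 2;
    if (x < 1000ULL) return 3;
    if (x < 1000000000000ULL)
    {
        if (x < 100000000ULL)
        {
            if (x < 1000000ULL)
            {
                if (x < 10000ULL) return 4;
                return 5 + (x >= 100000ULL);
            }
            return 7 + (x >= 10000000ULL);
        }
        if (x < 10000000000ULL) return 9 + (x >= 1000000000ULL);
        return 11 + (x >= 100000000000ULL);
    }
    return 12 + digits10(x / 1000000000000ULL);  // recursive tail for values >= 10^12
}

int main()
{
    assert(digits10(0) == 1 && digits10(9) == 1 && digits10(10) == 2);
    assert(digits10(999999999999ULL) == 12);
    assert(digits10(1000000000000ULL) == 13);
    assert(digits10(UINT64_MAX) == 20);  // 18446744073709551615 has 20 digits
}
```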
@@ -312,39 +347,6 @@ namespace convert
}
}
template <typename T>
static inline int digits10(T x)
{
if (x < 10ULL)
return 1;
if (x < 100ULL)
return 2;
if (x < 1000ULL)
return 3;
if (x < 1000000000000ULL)
{
if (x < 100000000ULL)
{
if (x < 1000000ULL)
{
if (x < 10000ULL)
return 4;
else
return 5 + (x >= 100000ULL);
}
return 7 + (x >= 10000000ULL);
}
if (x < 10000000000ULL)
return 9 + (x >= 1000000000ULL);
return 11 + (x >= 100000000000ULL);
}
return 12 + digits10(x / 1000000000000ULL);
}
template <typename T>
static inline char * writeUIntText(T x, char * p)

contrib/libxml2 vendored (2 changes)

@@ -1 +1 @@
Subproject commit 7846b0a677f8d3ce72486125fa281e92ac9970e8
Subproject commit f507d167f1755b7eaea09fb1a44d29aab828b6d1


@@ -24,7 +24,6 @@ set(SRCS
"${LIBXML2_SOURCE_DIR}/xinclude.c"
"${LIBXML2_SOURCE_DIR}/nanohttp.c"
"${LIBXML2_SOURCE_DIR}/nanoftp.c"
"${LIBXML2_SOURCE_DIR}/DOCBparser.c"
"${LIBXML2_SOURCE_DIR}/catalog.c"
"${LIBXML2_SOURCE_DIR}/globals.c"
"${LIBXML2_SOURCE_DIR}/threads.c"
@@ -36,7 +35,6 @@ set(SRCS
"${LIBXML2_SOURCE_DIR}/xmlschemastypes.c"
"${LIBXML2_SOURCE_DIR}/xmlunicode.c"
"${LIBXML2_SOURCE_DIR}/triostr.c"
#"${LIBXML2_SOURCE_DIR}/trio.c"
"${LIBXML2_SOURCE_DIR}/xmlreader.c"
"${LIBXML2_SOURCE_DIR}/relaxng.c"
"${LIBXML2_SOURCE_DIR}/dict.c"


@@ -1,6 +1,6 @@
/*
* Summary: compile-time version informations
* Description: compile-time version informations for the XML library
* Summary: compile-time version information
* Description: compile-time version information for the XML library
*
* Copy: See Copyright for the status of this software.
*
@@ -29,28 +29,28 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
*
* the version string like "1.2.3"
*/
#define LIBXML_DOTTED_VERSION "2.9.8"
#define LIBXML_DOTTED_VERSION "2.10.3"
/**
* LIBXML_VERSION:
*
* the version number: 1.2.3 value is 10203
*/
#define LIBXML_VERSION 20908
#define LIBXML_VERSION 21003
/**
* LIBXML_VERSION_STRING:
*
* the version number string, 1.2.3 value is "10203"
*/
#define LIBXML_VERSION_STRING "20908"
#define LIBXML_VERSION_STRING "21003"
/**
* LIBXML_VERSION_EXTRA:
*
* extra version information, used to show a CVS compilation
* extra version information, used to show a git commit description
*/
#define LIBXML_VERSION_EXTRA "-GITv2.9.9-rc2-1-g6fc04d71"
#define LIBXML_VERSION_EXTRA ""
/**
* LIBXML_TEST_VERSION:
@@ -58,7 +58,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
* Macro to check that the libxml version in use is compatible with
* the version the software has been compiled against
*/
#define LIBXML_TEST_VERSION xmlCheckVersion(20908);
#define LIBXML_TEST_VERSION xmlCheckVersion(21003);
#ifndef VMS
#if 0
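How the bumped constants are consumed, as a minimal sketch (assuming the libxml2 headers are installed and the binary links against libxml2): the macro bakes the compile-time version into the application, and xmlCheckVersion() compares it with the runtime library at startup.

```cpp
#include <libxml/xmlversion.h>

int main()
{
    // Expands to `xmlCheckVersion(21003);` after this bump; libxml2 reports
    // an error if the loaded library is incompatible with these headers.
    LIBXML_TEST_VERSION
    return 0;
}
```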
@@ -90,7 +90,9 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
*
* Whether the thread support is configured in
*/
#define LIBXML_THREAD_ENABLED 1
#if 1
#define LIBXML_THREAD_ENABLED
#endif
/**
* LIBXML_THREAD_ALLOC_ENABLED:
@@ -169,7 +171,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
*
* Whether the FTP support is configured in
*/
#if 1
#if 0
#define LIBXML_FTP_ENABLED
#endif
@@ -205,7 +207,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
*
* Whether the deprecated APIs are compiled in for compatibility
*/
#if 1
#if 0
#define LIBXML_LEGACY_ENABLED
#endif
@@ -227,15 +229,6 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
#define LIBXML_CATALOG_ENABLED
#endif
/**
* LIBXML_DOCB_ENABLED:
*
* Whether the SGML Docbook support is configured in
*/
#if 1
#define LIBXML_DOCB_ENABLED
#endif
/**
* LIBXML_XPATH_ENABLED:
*
@@ -254,6 +247,15 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
#define LIBXML_XPTR_ENABLED
#endif
/**
* LIBXML_XPTR_LOCS_ENABLED:
*
* Whether support for XPointer locations is configured in
*/
#if 0
#define LIBXML_XPTR_LOCS_ENABLED
#endif
/**
* LIBXML_XINCLUDE_ENABLED:
*
@@ -268,7 +270,7 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
*
* Whether iconv support is available
*/
#if 0
#if 1
#define LIBXML_ICONV_ENABLED
#endif
@@ -348,8 +350,10 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
* LIBXML_EXPR_ENABLED:
*
* Whether the formal expressions interfaces are compiled in
*
* This code is unused and disabled unconditionally for now.
*/
#if 1
#if 0
#define LIBXML_EXPR_ENABLED
#endif
@@ -452,6 +456,15 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
# define LIBXML_ATTR_FORMAT(fmt,args)
#endif
#ifndef XML_DEPRECATED
# ifdef IN_LIBXML
# define XML_DEPRECATED
# else
/* Available since at least GCC 3.1 */
# define XML_DEPRECATED __attribute__((deprecated))
# endif
#endif
#else /* ! __GNUC__ */
/**
* ATTRIBUTE_UNUSED:
@@ -471,6 +484,15 @@ XMLPUBFUN void XMLCALL xmlCheckVersion(int version);
* Macro used to indicate to GCC the parameter are printf like
*/
#define LIBXML_ATTR_FORMAT(fmt,args)
/**
* XML_DEPRECATED:
*
* Macro used to indicate that a function, variable, type or struct member
* is deprecated.
*/
#ifndef XML_DEPRECATED
#define XML_DEPRECATED
#endif
#endif /* __GNUC__ */
#ifdef __cplusplus
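A minimal sketch of the XML_DEPRECATED pattern added above (GCC/Clang attribute; the function name is illustrative): external callers get a deprecation warning, while the library itself, compiled with IN_LIBXML, keeps the macro empty so internal uses of old entry points stay warning-free.

```cpp
#ifdef IN_LIBXML
#  define XML_DEPRECATED
#else
#  define XML_DEPRECATED __attribute__((deprecated))
#endif

XML_DEPRECATED int legacyEntryPoint() { return 0; }

int main()
{
    // Warns under -Wdeprecated-declarations unless IN_LIBXML was defined.
    return legacyEntryPoint();
}
```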


@@ -16,7 +16,6 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
gnupg \
locales \
wget \
yasm \
tzdata \
&& apt-get clean


@@ -22,3 +22,8 @@ List of supported integrations:
- [PostgreSQL](../../../engines/table-engines/integrations/postgresql.md)
- [SQLite](../../../engines/table-engines/integrations/sqlite.md)
- [Hive](../../../engines/table-engines/integrations/hive.md)
- [ExternalDistributed](../../../engines/table-engines/integrations/ExternalDistributed.md)
- [MaterializedPostgreSQL](../../../engines/table-engines/integrations/materialized-postgresql.md)
- [NATS](../../../engines/table-engines/integrations/nats.md)
- [DeltaLake](../../../engines/table-engines/integrations/deltalake.md)
- [Hudi](../../../engines/table-engines/integrations/hudi.md)


@@ -683,6 +683,11 @@ Example:
## JSONColumns {#jsoncolumns}
:::tip
The output of the JSONColumns* formats provides the ClickHouse field name and then the content of each row of the table for that field;
visually, the data is rotated 90 degrees to the left.
:::
In this format, all data is represented as a single JSON Object.
Note that the JSONColumns output format buffers all data in memory in order to output it as a single block, which can lead to high memory consumption.
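To make the "rotated 90 degrees" remark concrete, an illustrative standalone program (not ClickHouse code) prints the same two-row table first row-oriented, then column-oriented in the JSONColumns shape of one value array per column:

```cpp
#include <iostream>

int main()
{
    // Row-oriented (JSONEachRow-like): one object per row.
    std::cout << R"({"name":"Alice","age":30})" << "\n"
              << R"({"name":"Bob","age":25})" << "\n\n";
    // Column-oriented (JSONColumns-like): a single object, one array per column.
    std::cout << R"({"name":["Alice","Bob"],"age":[30,25]})" << "\n";
}
```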


@@ -3310,6 +3310,15 @@ SELECT
FROM fuse_tbl
```
## optimize_rewrite_aggregate_function_with_if
Rewrites aggregate functions that take an `if` expression as an argument, when the rewrite is logically equivalent.
For example, `avg(if(cond, col, null))` can be rewritten to `avgOrNullIf(cond, col)`. It may improve performance.
:::note
Supported only with experimental analyzer (`allow_experimental_analyzer = 1`).
:::
## allow_experimental_database_replicated {#allow_experimental_database_replicated}
Allows creating databases with the [Replicated](../../engines/database-engines/replicated.md) engine.


@@ -26,7 +26,6 @@
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/UseSSL.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Interpreters/Context.h>


@@ -1,7 +1,7 @@
#include "ClusterCopierApp.h"
#include <Common/StatusFile.h>
#include <Common/TerminalSize.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Formats/registerFormats.h>
#include <Common/scope_guard_safe.h>
#include <unistd.h>


@@ -1,6 +1,6 @@
#include "LibraryBridgeHelper.h"
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
{
@@ -12,7 +12,7 @@ LibraryBridgeHelper::LibraryBridgeHelper(ContextPtr context_)
, http_timeout(context_->getGlobalContext()->getSettingsRef().http_receive_timeout.value)
, bridge_host(config.getString("library_bridge.host", DEFAULT_HOST))
, bridge_port(config.getUInt("library_bridge.port", DEFAULT_PORT))
, http_timeouts(ConnectionTimeouts::getHTTPTimeouts(context_))
, http_timeouts(ConnectionTimeouts::getHTTPTimeouts(context_->getSettingsRef(), {context_->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}))
{
}


@@ -12,7 +12,7 @@
#include <Common/BridgeProtocolVersion.h>
#include <Common/ShellCommand.h>
#include <Common/logger_useful.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <base/range.h>
#include <BridgeHelper/IBridgeHelper.h>
@@ -98,7 +98,7 @@ protected:
{
try
{
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, getHTTPTimeouts(), credentials);
return checkString(PING_OK_ANSWER, buf);
}
catch (...)
@@ -161,6 +161,10 @@ private:
Poco::Net::HTTPBasicCredentials credentials{};
ConnectionTimeouts getHTTPTimeouts()
{
return ConnectionTimeouts::getHTTPTimeouts(getContext()->getSettingsRef(), {getContext()->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0});
}
protected:
using URLParams = std::vector<std::pair<std::string, std::string>>;
@@ -195,7 +199,7 @@ protected:
uri.addQueryParameter("connection_string", getConnectionString());
uri.addQueryParameter("use_connection_pooling", toString(use_connection_pooling));
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, getHTTPTimeouts(), credentials);
bool res;
readBoolText(res, buf);
@@ -217,7 +221,7 @@ protected:
uri.addQueryParameter("connection_string", getConnectionString());
uri.addQueryParameter("use_connection_pooling", toString(use_connection_pooling));
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, getHTTPTimeouts(), credentials);
std::string character;
readStringBinary(character, buf);


@@ -342,14 +342,13 @@ set_source_files_properties(
PROPERTIES COMPILE_FLAGS "-mwaitpkg")
endif ()
target_link_libraries(common PUBLIC ch_contrib::re2_st)
target_link_libraries(common PUBLIC ch_contrib::re2)
target_link_libraries(clickhouse_common_io
PUBLIC
boost::program_options
boost::system
ch_contrib::cityhash
ch_contrib::re2
ch_contrib::re2_st
ch_contrib::zlib
pcg_random
Poco::Foundation


@@ -173,14 +173,12 @@ void registerCodecDeflateQpl(CompressionCodecFactory & factory);
/// Keeper use only general-purpose codecs, so we don't need these special codecs
/// in standalone build
#ifndef KEEPER_STANDALONE_BUILD
void registerCodecDelta(CompressionCodecFactory & factory);
void registerCodecT64(CompressionCodecFactory & factory);
void registerCodecDoubleDelta(CompressionCodecFactory & factory);
void registerCodecGorilla(CompressionCodecFactory & factory);
void registerCodecEncrypted(CompressionCodecFactory & factory);
void registerCodecFPC(CompressionCodecFactory & factory);
#endif
CompressionCodecFactory::CompressionCodecFactory()


@@ -52,6 +52,8 @@
/// the number is unmotivated
#define DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT 15
#define DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT 10
#define DBMS_DEFAULT_PATH "/var/lib/clickhouse/"
/// Actually, there may be multiple acquisitions of different locks for a given table within one query.


@@ -54,7 +54,6 @@ namespace
return applyVisitor(FieldVisitorConvertToNumber<T>(), f);
}
#ifndef KEEPER_STANDALONE_BUILD
Map stringToMap(const String & str)
{
/// Allow empty string as an empty map
@@ -71,7 +70,7 @@ namespace
return (*column)[0].safeGet<Map>();
}
Map fieldToMap(const Field & f)
[[maybe_unused]] Map fieldToMap(const Field & f)
{
if (f.getType() == Field::Types::String)
{
@@ -82,7 +81,6 @@ namespace
return f.safeGet<const Map &>();
}
#endif
}
@@ -327,6 +325,13 @@ void SettingFieldString::readBinary(ReadBuffer & in)
*this = std::move(str);
}
/// Unbeautiful workaround for clickhouse-keeper standalone build ("-DBUILD_STANDALONE_KEEPER=1").
/// In this build, we don't build and link library dbms (to which SettingsField.cpp belongs) but
/// only build SettingsField.cpp. Further dependencies, e.g. DataTypeString and DataTypeMap below,
/// require building of further files for clickhouse-keeper. To keep dependencies slim, we don't do
/// that. The linker does not complain only because clickhouse-keeper does not call any of the
/// functions below. A cleaner alternative would be more modular libraries, e.g. one for data types,
/// which could then be linked by both the server and the keeper.
#ifndef KEEPER_STANDALONE_BUILD
SettingFieldMap::SettingFieldMap(const Field & f) : value(fieldToMap(f)) {}


@@ -239,8 +239,6 @@ struct SettingFieldString
void readBinary(ReadBuffer & in);
};
#ifndef KEEPER_STANDALONE_BUILD
struct SettingFieldMap
{
public:
@@ -264,8 +262,6 @@ public:
void readBinary(ReadBuffer & in);
};
#endif
struct SettingFieldChar
{
public:


@@ -100,7 +100,7 @@ inline UInt32 getDecimalScale(const DataTypeDecimal<T> & data_type)
template <typename FromDataType, typename ToDataType, typename ReturnType = void>
requires (IsDataTypeDecimal<FromDataType> && IsDataTypeDecimal<ToDataType>)
inline ReturnType convertDecimalsImpl(const typename FromDataType::FieldType & value, UInt32 scale_from, UInt32 scale_to, typename ToDataType::FieldType& result)
inline ReturnType convertDecimalsImpl(const typename FromDataType::FieldType & value, UInt32 scale_from, UInt32 scale_to, typename ToDataType::FieldType & result)
{
using FromFieldType = typename FromDataType::FieldType;
using ToFieldType = typename ToDataType::FieldType;
@@ -121,8 +121,14 @@ inline ReturnType convertDecimalsImpl(const typename FromDataType::FieldType & v
return ReturnType(false);
}
}
else if (scale_to == scale_from)
{
converted_value = value.value;
}
else
{
converted_value = value.value / DecimalUtils::scaleMultiplier<MaxNativeType>(scale_from - scale_to);
}
if constexpr (sizeof(FromFieldType) > sizeof(ToFieldType))
{
@@ -155,7 +161,7 @@ inline typename ToDataType::FieldType convertDecimals(const typename FromDataTyp
template <typename FromDataType, typename ToDataType>
requires (IsDataTypeDecimal<FromDataType> && IsDataTypeDecimal<ToDataType>)
inline bool tryConvertDecimals(const typename FromDataType::FieldType & value, UInt32 scale_from, UInt32 scale_to, typename ToDataType::FieldType& result)
inline bool tryConvertDecimals(const typename FromDataType::FieldType & value, UInt32 scale_from, UInt32 scale_to, typename ToDataType::FieldType & result)
{
return convertDecimalsImpl<FromDataType, ToDataType, bool>(value, scale_from, scale_to, result);
}
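The new scale_to == scale_from branch copies the value verbatim instead of routing a no-op conversion through a divide by 10^0. A standalone sketch of the rescaling arithmetic, with plain int64_t standing in for the decimal field types:

```cpp
#include <cassert>
#include <cstdint>

int64_t pow10(uint32_t n) { int64_t r = 1; while (n--) r *= 10; return r; }

int64_t convertScale(int64_t value, uint32_t scale_from, uint32_t scale_to)
{
    if (scale_to > scale_from)
        return value * pow10(scale_to - scale_from);  // 1.23 (123, scale 2) -> 1.230 (1230, scale 3)
    if (scale_to == scale_from)
        return value;                                 // the added fast path: copy verbatim
    return value / pow10(scale_from - scale_to);      // truncating rescale
}

int main()
{
    assert(convertScale(123, 2, 3) == 1230);
    assert(convertScale(123, 2, 2) == 123);
    assert(convertScale(1239, 3, 2) == 123);
}
```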


@@ -1,7 +1,6 @@
#include "HTTPDictionarySource.h"
#include <Formats/formatBlock.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h>
@@ -39,7 +38,7 @@ HTTPDictionarySource::HTTPDictionarySource(
, configuration(configuration_)
, sample_block(sample_block_)
, context(context_)
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context))
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}))
{
credentials.setUsername(credentials_.getUsername());
credentials.setPassword(credentials_.getPassword());
@@ -52,7 +51,7 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
, configuration(other.configuration)
, sample_block(other.sample_block)
, context(Context::createCopy(other.context))
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context))
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}))
{
credentials.setUsername(other.credentials.getUsername());
credentials.setPassword(other.credentials.getPassword());


@@ -4,7 +4,7 @@
#include <DataTypes/DataTypeString.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Context.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Util/AbstractConfiguration.h>
@@ -76,7 +76,7 @@ XDBCDictionarySource::XDBCDictionarySource(
, load_all_query(query_builder.composeLoadAllQuery())
, bridge_helper(bridge_)
, bridge_url(bridge_helper->getMainURI())
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context_))
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context_->getSettingsRef(), {context_->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}))
{
auto url_params = bridge_helper->getURLParams(max_block_size);
for (const auto & [name, value] : url_params)


@@ -149,13 +149,13 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
if (buffer_size % min_alignment)
{
existing_memory = nullptr; /// Cannot reuse existing memory is it has unaligned size.
existing_memory = nullptr; /// Cannot reuse existing memory as it has unaligned size.
buffer_size = align_up(buffer_size);
}
if (reinterpret_cast<uintptr_t>(existing_memory) % min_alignment)
{
existing_memory = nullptr; /// Cannot reuse existing memory is it has unaligned offset.
existing_memory = nullptr; /// Cannot reuse existing memory as it has unaligned offset.
}
/// Attempt to open a file with O_DIRECT


@@ -40,11 +40,15 @@ void WebObjectStorage::initialize(const String & uri_path) const
try
{
Poco::Net::HTTPBasicCredentials credentials{};
ReadWriteBufferFromHTTP metadata_buf(
Poco::URI(fs::path(uri_path) / ".index"),
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
ConnectionTimeouts::getHTTPTimeouts(getContext()),
ConnectionTimeouts::getHTTPTimeouts(
getContext()->getSettingsRef(),
{getContext()->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}),
credentials,
/* max_redirects= */ 0,
/* buffer_size_= */ DBMS_DEFAULT_BUFFER_SIZE,


@@ -377,7 +377,7 @@ struct ToDateTransform32Or64
static NO_SANITIZE_UNDEFINED ToType execute(const FromType & from, const DateLUTImpl & time_zone)
{
// since converting to Date, no need in values outside of default LUT range.
return (from < DATE_LUT_MAX_DAY_NUM)
return (from <= DATE_LUT_MAX_DAY_NUM)
? from
: time_zone.toDayNum(std::min(time_t(from), time_t(0xFFFFFFFF)));
}
@@ -394,7 +394,7 @@ struct ToDateTransform32Or64Signed
/// The function should be monotonic (better for query optimizations), so we saturate instead of overflow.
if (from < 0)
return 0;
return (from < DATE_LUT_MAX_DAY_NUM)
return (from <= DATE_LUT_MAX_DAY_NUM)
? static_cast<ToType>(from)
: time_zone.toDayNum(std::min(time_t(from), time_t(0xFFFFFFFF)));
}
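Changing `<` to `<=` fixes an off-by-one: a value equal to DATE_LUT_MAX_DAY_NUM is still a valid day number and should pass through rather than be re-interpreted as a unix timestamp. A sketch of the saturating conversion with stand-in constants (the real DATE_LUT_MAX_DAY_NUM and DateLUTImpl differ):

```cpp
#include <algorithm>
#include <cstdint>

constexpr uint64_t MAX_DAY_NUM = 65535;  // stand-in for DATE_LUT_MAX_DAY_NUM
uint32_t toDayNum(uint64_t t) { return static_cast<uint32_t>(t / 86400); }  // stand-in for the LUT

uint32_t toDate(uint64_t from)
{
    return (from <= MAX_DAY_NUM)                               // `<=`, not `<`
        ? static_cast<uint32_t>(from)                          // already a day number
        : toDayNum(std::min<uint64_t>(from, 0xFFFFFFFFULL));   // timestamp, saturated
}

int main()
{
    return toDate(MAX_DAY_NUM) == MAX_DAY_NUM ? 0 : 1;  // boundary value passes through
}
```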


@@ -2,9 +2,10 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include <base/extended_types.h>
#include <base/itoa.h>
namespace DB
@@ -83,7 +84,7 @@ private:
template <typename T, typename ColVecType>
static void execute(const ColVecType & col, ColumnUInt8 & result_column, size_t rows_count)
{
using NativeT = NativeType<T>;
using NativeT = make_unsigned_t<NativeType<T>>;
const auto & src_data = col.getData();
auto & dst_data = result_column.getData();
@@ -92,50 +93,22 @@ private:
for (size_t i = 0; i < rows_count; ++i)
{
if constexpr (is_decimal<T>)
dst_data[i] = digits<NativeT>(src_data[i].value);
else
dst_data[i] = digits<NativeT>(src_data[i]);
}
}
template <typename T>
static UInt32 digits(T value)
{
static_assert(!is_decimal<T>);
using DivT = std::conditional_t<is_signed_v<T>, Int32, UInt32>;
UInt32 res = 0;
T tmp;
if constexpr (sizeof(T) > sizeof(Int32))
{
static constexpr const DivT e9 = 1000000000;
tmp = value / e9;
while (tmp != 0)
{
value = tmp;
tmp /= e9;
res += 9;
auto value = src_data[i].value;
if (unlikely(value < 0))
dst_data[i] = digits10<NativeT>(-static_cast<NativeT>(value));
else
dst_data[i] = digits10<NativeT>(value);
}
else
{
auto value = src_data[i];
if (unlikely(value < 0))
dst_data[i] = digits10<NativeT>(-static_cast<NativeT>(value));
else
dst_data[i] = digits10<NativeT>(value);
}
}
static constexpr const DivT e3 = 1000;
tmp = value / e3;
while (tmp != 0)
{
value = tmp;
tmp /= e3;
res += 3;
}
while (value != 0)
{
value /= 10;
++res;
}
return res;
}
};
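The rewrite funnels both the decimal and the integer branch into the shared digits10, mapping negative inputs to their magnitude first. The cast-before-negate order matters: negating the most negative signed value overflows in signed arithmetic, but is well defined after conversion to the unsigned type. A standalone check:

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

int main()
{
    int64_t v = std::numeric_limits<int64_t>::min();
    // Convert first, then negate: unsigned negation is modular, so no overflow.
    uint64_t magnitude = -static_cast<uint64_t>(v);
    assert(magnitude == 9223372036854775808ULL);  // 2^63
}
```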


@@ -0,0 +1,126 @@
#include <IO/ConnectionTimeouts.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
namespace DB
{
ConnectionTimeouts::ConnectionTimeouts(
Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_)
: connection_timeout(connection_timeout_)
, send_timeout(send_timeout_)
, receive_timeout(receive_timeout_)
, tcp_keep_alive_timeout(0)
, http_keep_alive_timeout(0)
, secure_connection_timeout(connection_timeout)
, hedged_connection_timeout(receive_timeout_)
, receive_data_timeout(receive_timeout_)
{
}
ConnectionTimeouts::ConnectionTimeouts(
Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_,
Poco::Timespan tcp_keep_alive_timeout_)
: connection_timeout(connection_timeout_)
, send_timeout(send_timeout_)
, receive_timeout(receive_timeout_)
, tcp_keep_alive_timeout(tcp_keep_alive_timeout_)
, http_keep_alive_timeout(0)
, secure_connection_timeout(connection_timeout)
, hedged_connection_timeout(receive_timeout_)
, receive_data_timeout(receive_timeout_)
{
}
ConnectionTimeouts::ConnectionTimeouts(
Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_,
Poco::Timespan tcp_keep_alive_timeout_,
Poco::Timespan http_keep_alive_timeout_)
: connection_timeout(connection_timeout_)
, send_timeout(send_timeout_)
, receive_timeout(receive_timeout_)
, tcp_keep_alive_timeout(tcp_keep_alive_timeout_)
, http_keep_alive_timeout(http_keep_alive_timeout_)
, secure_connection_timeout(connection_timeout)
, hedged_connection_timeout(receive_timeout_)
, receive_data_timeout(receive_timeout_)
{
}
ConnectionTimeouts::ConnectionTimeouts(
Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_,
Poco::Timespan tcp_keep_alive_timeout_,
Poco::Timespan http_keep_alive_timeout_,
Poco::Timespan secure_connection_timeout_,
Poco::Timespan receive_hello_timeout_,
Poco::Timespan receive_data_timeout_)
: connection_timeout(connection_timeout_)
, send_timeout(send_timeout_)
, receive_timeout(receive_timeout_)
, tcp_keep_alive_timeout(tcp_keep_alive_timeout_)
, http_keep_alive_timeout(http_keep_alive_timeout_)
, secure_connection_timeout(secure_connection_timeout_)
, hedged_connection_timeout(receive_hello_timeout_)
, receive_data_timeout(receive_data_timeout_)
{
}
Poco::Timespan ConnectionTimeouts::saturate(Poco::Timespan timespan, Poco::Timespan limit)
{
if (limit.totalMicroseconds() == 0)
return timespan;
else
return (timespan > limit) ? limit : timespan;
}
ConnectionTimeouts ConnectionTimeouts::getSaturated(Poco::Timespan limit) const
{
return ConnectionTimeouts(saturate(connection_timeout, limit),
saturate(send_timeout, limit),
saturate(receive_timeout, limit),
saturate(tcp_keep_alive_timeout, limit),
saturate(http_keep_alive_timeout, limit),
saturate(secure_connection_timeout, limit),
saturate(hedged_connection_timeout, limit),
saturate(receive_data_timeout, limit));
}
/// Timeouts for the case when we have just single attempt to connect.
ConnectionTimeouts ConnectionTimeouts::getTCPTimeoutsWithoutFailover(const Settings & settings)
{
return ConnectionTimeouts(settings.connect_timeout, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout);
}
/// Timeouts for the case when we will try many addresses in a loop.
ConnectionTimeouts ConnectionTimeouts::getTCPTimeoutsWithFailover(const Settings & settings)
{
return ConnectionTimeouts(
settings.connect_timeout_with_failover_ms,
settings.send_timeout,
settings.receive_timeout,
settings.tcp_keep_alive_timeout,
0,
settings.connect_timeout_with_failover_secure_ms,
settings.hedged_connection_timeout_ms,
settings.receive_data_timeout_ms);
}
ConnectionTimeouts ConnectionTimeouts::getHTTPTimeouts(const Settings & settings, Poco::Timespan http_keep_alive_timeout)
{
return ConnectionTimeouts(
settings.http_connection_timeout,
settings.http_send_timeout,
settings.http_receive_timeout,
settings.tcp_keep_alive_timeout,
http_keep_alive_timeout);
}
}
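With this file the constructors and factories move out of the header, and getHTTPTimeouts no longer takes a ContextPtr: callers pass the settings plus an explicit keep-alive timespan read from config, as the LibraryBridgeHelper and dictionary-source hunks above show. A simplified standalone analogue of the new shape (stand-in types, not ClickHouse's):

```cpp
struct Settings { int http_connection_timeout, http_send_timeout, http_receive_timeout, tcp_keep_alive_timeout; };
struct Timeouts { int connection, send, receive, tcp_keep_alive, http_keep_alive; };

Timeouts getHTTPTimeouts(const Settings & s, int http_keep_alive_timeout)
{
    return {s.http_connection_timeout, s.http_send_timeout, s.http_receive_timeout,
            s.tcp_keep_alive_timeout, http_keep_alive_timeout};
}

int main()
{
    Settings settings{1, 2, 3, 4};
    // Callers now read "keep_alive_timeout" from config themselves
    // (with DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT = 10 as the fallback) instead of
    // handing the whole context to the factory.
    Timeouts t = getHTTPTimeouts(settings, 10);
    return t.http_keep_alive == 10 ? 0 : 1;
}
```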


@@ -30,47 +30,18 @@ struct ConnectionTimeouts
ConnectionTimeouts(Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_)
: connection_timeout(connection_timeout_),
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(0),
http_keep_alive_timeout(0),
secure_connection_timeout(connection_timeout),
hedged_connection_timeout(receive_timeout_),
receive_data_timeout(receive_timeout_)
{
}
Poco::Timespan receive_timeout_);
ConnectionTimeouts(Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_,
Poco::Timespan tcp_keep_alive_timeout_)
: connection_timeout(connection_timeout_),
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(tcp_keep_alive_timeout_),
http_keep_alive_timeout(0),
secure_connection_timeout(connection_timeout),
hedged_connection_timeout(receive_timeout_),
receive_data_timeout(receive_timeout_)
{
}
Poco::Timespan tcp_keep_alive_timeout_);
ConnectionTimeouts(Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_,
Poco::Timespan tcp_keep_alive_timeout_,
Poco::Timespan http_keep_alive_timeout_)
: connection_timeout(connection_timeout_),
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(tcp_keep_alive_timeout_),
http_keep_alive_timeout(http_keep_alive_timeout_),
secure_connection_timeout(connection_timeout),
hedged_connection_timeout(receive_timeout_),
receive_data_timeout(receive_timeout_)
{
}
Poco::Timespan http_keep_alive_timeout_);
ConnectionTimeouts(Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
@@ -79,43 +50,17 @@ struct ConnectionTimeouts
Poco::Timespan http_keep_alive_timeout_,
Poco::Timespan secure_connection_timeout_,
Poco::Timespan receive_hello_timeout_,
Poco::Timespan receive_data_timeout_)
: connection_timeout(connection_timeout_),
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(tcp_keep_alive_timeout_),
http_keep_alive_timeout(http_keep_alive_timeout_),
secure_connection_timeout(secure_connection_timeout_),
hedged_connection_timeout(receive_hello_timeout_),
receive_data_timeout(receive_data_timeout_)
{
}
Poco::Timespan receive_data_timeout_);
static Poco::Timespan saturate(Poco::Timespan timespan, Poco::Timespan limit)
{
if (limit.totalMicroseconds() == 0)
return timespan;
else
return (timespan > limit) ? limit : timespan;
}
ConnectionTimeouts getSaturated(Poco::Timespan limit) const
{
return ConnectionTimeouts(saturate(connection_timeout, limit),
saturate(send_timeout, limit),
saturate(receive_timeout, limit),
saturate(tcp_keep_alive_timeout, limit),
saturate(http_keep_alive_timeout, limit),
saturate(secure_connection_timeout, limit),
saturate(hedged_connection_timeout, limit),
saturate(receive_data_timeout, limit));
}
static Poco::Timespan saturate(Poco::Timespan timespan, Poco::Timespan limit);
ConnectionTimeouts getSaturated(Poco::Timespan limit) const;
/// Timeouts for the case when we have just single attempt to connect.
static ConnectionTimeouts getTCPTimeoutsWithoutFailover(const Settings & settings);
/// Timeouts for the case when we will try many addresses in a loop.
static ConnectionTimeouts getTCPTimeoutsWithFailover(const Settings & settings);
static ConnectionTimeouts getHTTPTimeouts(ContextPtr context);
static ConnectionTimeouts getHTTPTimeouts(const Settings & settings, Poco::Timespan http_keep_alive_timeout);
};
}
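The saturate rule that moved into the .cpp: a zero limit means no limit, otherwise clamp to it. The same logic in a self-contained sketch, with std::chrono standing in for Poco::Timespan:

```cpp
#include <cassert>
#include <chrono>

using us = std::chrono::microseconds;

us saturate(us timespan, us limit)
{
    if (limit.count() == 0)
        return timespan;  // zero limit disables clamping
    return timespan > limit ? limit : timespan;
}

int main()
{
    assert(saturate(us(500), us(100)) == us(100));
    assert(saturate(us(500), us(0)) == us(500));
}
```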


@@ -1,38 +0,0 @@
#pragma once
#include <IO/ConnectionTimeouts.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
namespace DB
{
/// Timeouts for the case when we have just single attempt to connect.
inline ConnectionTimeouts ConnectionTimeouts::getTCPTimeoutsWithoutFailover(const Settings & settings)
{
return ConnectionTimeouts(settings.connect_timeout, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout);
}
/// Timeouts for the case when we will try many addresses in a loop.
inline ConnectionTimeouts ConnectionTimeouts::getTCPTimeoutsWithFailover(const Settings & settings)
{
return ConnectionTimeouts(
settings.connect_timeout_with_failover_ms,
settings.send_timeout,
settings.receive_timeout,
settings.tcp_keep_alive_timeout,
0,
settings.connect_timeout_with_failover_secure_ms,
settings.hedged_connection_timeout_ms,
settings.receive_data_timeout_ms);
}
inline ConnectionTimeouts ConnectionTimeouts::getHTTPTimeouts(ContextPtr context)
{
const auto & settings = context->getSettingsRef();
const auto & config = context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout, settings.tcp_keep_alive_timeout, http_keep_alive_timeout);
}
}


@@ -7,7 +7,7 @@
#include <Common/ProfileEvents.h>
#include <Common/checkStackSize.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>


@@ -886,8 +886,8 @@ void InterpreterSystemQuery::syncReplica()
if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
{
LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for it to become empty");
if (!storage_replicated->waitForShrinkingQueueSize(0, getContext()->getSettingsRef().receive_timeout.totalMilliseconds()))
LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for current last entry to be processed");
if (!storage_replicated->waitForProcessingQueue(getContext()->getSettingsRef().receive_timeout.totalMilliseconds()))
{
LOG_ERROR(log, "SYNC REPLICA {}: Timed out!", table_id.getNameForLogs());
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "SYNC REPLICA {}: command timed out. " \


@@ -117,7 +117,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
data->rethrowExceptionIfHas();
bool is_execution_finished
= !data->executor->checkTimeLimitSoft() || lazy_format ? lazy_format->isFinished() : data->is_finished.load();
= !data->executor->checkTimeLimitSoft() || (lazy_format ? lazy_format->isFinished() : data->is_finished.load());
if (is_execution_finished)
{
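The hunk above is a pure operator-precedence fix: `a || b ? c : d` parses as `(a || b) ? c : d`, so when the time limit fired while lazy_format was null, the old expression evaluated the ternary (potentially calling through the null pointer) instead of short-circuiting to true. A minimal demonstration with plain bools:

```cpp
#include <cassert>

int main()
{
    bool time_limit_exceeded = true;  // models !checkTimeLimitSoft()
    bool lazy = false;                // models a null lazy_format
    bool lazy_finished = false;
    bool data_finished = false;

    // Unparenthesized reading: the || folds into the ternary's condition.
    bool old_result = (time_limit_exceeded || lazy) ? lazy_finished : data_finished;
    // Intended reading, as fixed above.
    bool fixed_result = time_limit_exceeded || (lazy ? lazy_finished : data_finished);

    assert(old_result == false && fixed_result == true);
}
```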


@@ -12,7 +12,6 @@
#include <Processors/Transforms/ReadFromMergeTreeDependencyTransform.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <IO/ConnectionTimeoutsContext.h>
#include "Common/logger_useful.h"
#include <Common/checkStackSize.h>
#include <Core/QueryProcessingStage.h>


@@ -21,7 +21,7 @@
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Client/MultiplexedConnections.h>
#include <Client/HedgedConnections.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>


@@ -20,7 +20,6 @@
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CheckingCompressedReadBuffer.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/Operators.h>
#include <Disks/IDisk.h>
#include <boost/algorithm/string/find_iterator.hpp>


@@ -13,7 +13,7 @@
#include <Compression/CompressedWriteBuffer.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Formats/NativeWriter.h>
#include <Processors/Sinks/RemoteSink.h>
#include <Processors/Executors/PushingPipelineExecutor.h>


@@ -8,7 +8,7 @@
#include <Client/Connection.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeString.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Context.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/SelectQueryOptions.h>


@@ -9,7 +9,7 @@ namespace ErrorCodes
}
bool DropPartsRanges::isAffectedByDropRange(const std::string & new_part_name, std::string & postpone_reason) const
bool DropPartsRanges::isAffectedByDropPart(const std::string & new_part_name, std::string & postpone_reason) const
{
if (new_part_name.empty())
return false;
@@ -19,7 +19,9 @@ bool DropPartsRanges::isAffectedByDropRange(const std::string & new_part_name, s
{
if (!drop_range.isDisjoint(entry_info))
{
postpone_reason = fmt::format("Has DROP RANGE affecting entry {} producing part {}. Will postpone it's execution.", drop_range.getPartNameForLogs(), new_part_name);
postpone_reason = fmt::format("Has DROP_PART affecting entry {} producing part {}. "
"Will postpone its execution.",
drop_range.getPartNameForLogs(), new_part_name);
return true;
}
}
@@ -27,23 +29,24 @@ bool DropPartsRanges::isAffectedByDropRange(const std::string & new_part_name, s
return false;
}
bool DropPartsRanges::isAffectedByDropRange(const ReplicatedMergeTreeLogEntry & entry, std::string & postpone_reason) const
bool DropPartsRanges::isAffectedByDropPart(const ReplicatedMergeTreeLogEntry & entry, std::string & postpone_reason) const
{
return isAffectedByDropRange(entry.new_part_name, postpone_reason);
return isAffectedByDropPart(entry.new_part_name, postpone_reason);
}
void DropPartsRanges::addDropRange(const ReplicatedMergeTreeLogEntryPtr & entry)
void DropPartsRanges::addDropPart(const ReplicatedMergeTreeLogEntryPtr & entry)
{
if (entry->type != ReplicatedMergeTreeLogEntry::DROP_RANGE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to add entry of type {} to drop ranges, expected DROP_RANGE", entry->typeToString());
if (entry->type != ReplicatedMergeTreeLogEntry::DROP_PART)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to add entry of type {} to drop ranges, expected DROP_PART",
entry->typeToString());
MergeTreePartInfo entry_info = MergeTreePartInfo::fromPartName(*entry->getDropRange(format_version), format_version);
drop_ranges.emplace(entry->znode_name, entry_info);
}
void DropPartsRanges::removeDropRange(const ReplicatedMergeTreeLogEntryPtr & entry)
void DropPartsRanges::removeDropPart(const ReplicatedMergeTreeLogEntryPtr & entry)
{
if (entry->type != ReplicatedMergeTreeLogEntry::DROP_RANGE)
if (entry->type != ReplicatedMergeTreeLogEntry::DROP_PART)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Trying to remove entry of type {} from drop ranges, expected DROP_PART",
entry->typeToString());
@@ -53,7 +56,7 @@ void DropPartsRanges::removeDropRange(const ReplicatedMergeTreeLogEntryPtr & ent
drop_ranges.erase(it);
}
bool DropPartsRanges::hasDropRange(const MergeTreePartInfo & new_drop_range_info, MergeTreePartInfo * out_drop_range_info) const
bool DropPartsRanges::hasDropPart(const MergeTreePartInfo & new_drop_range_info, MergeTreePartInfo * out_drop_range_info) const
{
for (const auto & [_, drop_range] : drop_ranges)
{


@@ -23,20 +23,20 @@ public:
: format_version(format_version_)
{}
/// Entry is affected by DROP_RANGE and must be postponed
bool isAffectedByDropRange(const ReplicatedMergeTreeLogEntry & entry, std::string & postpone_reason) const;
/// Entry is affected by DROP_PART and must be postponed
bool isAffectedByDropPart(const ReplicatedMergeTreeLogEntry & entry, std::string & postpone_reason) const;
/// Part is affected by DROP_RANGE and must be postponed
bool isAffectedByDropRange(const std::string & new_part_name, std::string & postpone_reason) const;
/// Part is affected by DROP_PART and must be postponed
bool isAffectedByDropPart(const std::string & new_part_name, std::string & postpone_reason) const;
/// Already has equal DROP_RANGE. Don't need to assign new one
bool hasDropRange(const MergeTreePartInfo & new_drop_range_info, MergeTreePartInfo * out_drop_range_info = nullptr) const;
bool hasDropPart(const MergeTreePartInfo & new_drop_range_info, MergeTreePartInfo * out_drop_range_info = nullptr) const;
/// Add DROP_RANGE to map
void addDropRange(const ReplicatedMergeTreeLogEntryPtr & entry);
void addDropPart(const ReplicatedMergeTreeLogEntryPtr & entry);
/// Remove DROP_RANGE from map
void removeDropRange(const ReplicatedMergeTreeLogEntryPtr & entry);
void removeDropPart(const ReplicatedMergeTreeLogEntryPtr & entry);
};


@@ -211,7 +211,8 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
{
LOG_DEBUG(log, "Log entry was already created will check the existing one.");
sync_source_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_source_log_entry_str, sync_source_log_entry_stat);
sync_source_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_source_log_entry_str, sync_source_log_entry_stat,
storage.format_version);
}
else
{
@@ -267,7 +268,9 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
{
LOG_DEBUG(log, "Log entry was already created will check the existing one.");
sync_destination_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_destination_log_entry_str, sync_destination_log_entry_stat);
sync_destination_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_destination_log_entry_str,
sync_destination_log_entry_stat,
storage.format_version);
}
else
{
@@ -330,7 +333,8 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
{
LOG_DEBUG(log, "Log entry was already created will check the existing one.");
fetch_log_entry = *ReplicatedMergeTreeLogEntry::parse(fetch_log_entry_str, fetch_log_entry_stat);
fetch_log_entry = *ReplicatedMergeTreeLogEntry::parse(fetch_log_entry_str, fetch_log_entry_stat,
storage.format_version);
}
else
{
@@ -397,11 +401,14 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
{
LOG_DEBUG(log, "Log entry was already created will check the existing one.");
attach_rollback_log_entry = *ReplicatedMergeTreeLogEntry::parse(attach_rollback_log_entry_str, attach_rollback_log_entry_stat);
attach_rollback_log_entry = *ReplicatedMergeTreeLogEntry::parse(attach_rollback_log_entry_str,
attach_rollback_log_entry_stat,
storage.format_version);
}
else
{
const auto attach_log_entry = ReplicatedMergeTreeLogEntry::parse(attach_log_entry_str, attach_log_entry_stat);
const auto attach_log_entry = ReplicatedMergeTreeLogEntry::parse(attach_log_entry_str, attach_log_entry_stat,
storage.format_version);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version));
@@ -495,7 +502,7 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
Coordination::Stat stat;
String log_entry_str = zk->get(attach_log_entry_barrier_path, &stat);
log_entry = *ReplicatedMergeTreeLogEntry::parse(log_entry_str, stat);
log_entry = *ReplicatedMergeTreeLogEntry::parse(log_entry_str, stat, storage.format_version);
}
Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 1);
@@ -542,7 +549,8 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
{
LOG_DEBUG(log, "Log entry was already created will check the existing one.");
source_drop_log_entry = *ReplicatedMergeTreeLogEntry::parse(source_drop_log_entry_str, source_drop_log_entry_stat);
source_drop_log_entry = *ReplicatedMergeTreeLogEntry::parse(source_drop_log_entry_str, source_drop_log_entry_stat,
storage.format_version);
}
else
{


@@ -99,6 +99,7 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
break;
case DROP_RANGE:
case DROP_PART:
if (detach)
out << "detach\n";
else
@@ -180,7 +181,7 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
out << "quorum: " << quorum << '\n';
}
void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in, MergeTreeDataFormatVersion partition_format_version)
{
UInt8 format_version = 0;
String type_str;
@@ -278,6 +279,9 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
type = DROP_RANGE;
detach = type_str == "detach";
in >> new_part_name;
auto drop_range_info = MergeTreePartInfo::fromPartName(new_part_name, partition_format_version);
if (!drop_range_info.isFakeDropRangePart())
type = DROP_PART;
}
else if (type_str == "clear_column") /// NOTE: Deprecated.
{
@@ -426,11 +430,12 @@ String ReplicatedMergeTreeLogEntryData::toString() const
return out.str();
}
ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String & s, const Coordination::Stat & stat)
ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String & s, const Coordination::Stat & stat,
MergeTreeDataFormatVersion format_version)
{
ReadBufferFromString in(s);
Ptr res = std::make_shared<ReplicatedMergeTreeLogEntry>();
res->readText(in);
res->readText(in, format_version);
assertEOF(in);
if (!res->create_time)
@@ -444,6 +449,9 @@ std::optional<String> ReplicatedMergeTreeLogEntryData::getDropRange(MergeTreeDat
if (type == DROP_RANGE)
return new_part_name;
if (type == DROP_PART)
return new_part_name;
if (type == REPLACE_RANGE)
{
auto drop_range_info = MergeTreePartInfo::fromPartName(replace_range_entry->drop_range_part_name, format_version);
@@ -457,14 +465,9 @@ std::optional<String> ReplicatedMergeTreeLogEntryData::getDropRange(MergeTreeDat
return {};
}
bool ReplicatedMergeTreeLogEntryData::isDropPart(MergeTreeDataFormatVersion format_version) const
bool ReplicatedMergeTreeLogEntryData::isDropPart(MergeTreeDataFormatVersion) const
{
if (type == DROP_RANGE)
{
auto drop_range_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
return !drop_range_info.isFakeDropRangePart();
}
return false;
return type == DROP_PART;
}
Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormatVersion format_version) const
@@ -475,30 +478,7 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat
/// DROP_RANGE does not add a real part, but we must disable merges in that range
if (type == DROP_RANGE)
{
auto drop_range_part_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
/// It's DROP PART and we don't want to add it into virtual parts
/// because it can lead to intersecting parts on stale replicas and this
/// problem is fundamental. So we have very weak guarantees for DROP
/// PART. If any concurrent merge will be assigned then DROP PART will
/// delete nothing and part will be successfully merged into bigger part.
///
/// dropPart used in the following cases:
/// 1) Remove empty parts after TTL.
/// 2) Remove parts after move between shards.
/// 3) User queries: ALTER TABLE DROP PART 'part_name'.
///
/// In the first case merge of empty part is even better than DROP. In
/// the second case part UUIDs used to forbid merges for moding parts so
/// there is no problem with concurrent merges. The third case is quite
/// rare and we give very weak guarantee: there will be no active part
/// with this name, but possibly it was merged to some other part.
if (!drop_range_part_info.isFakeDropRangePart())
return {};
return {new_part_name};
}
if (type == REPLACE_RANGE)
{
@@ -509,6 +489,25 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat
return res;
}
/// It's DROP PART and we don't want to add it into virtual parts
/// because it can lead to intersecting parts on stale replicas and this
/// problem is fundamental. So we have very weak guarantees for DROP
/// PART. If any concurrent merge will be assigned then DROP PART will
/// delete nothing and part will be successfully merged into bigger part.
///
/// dropPart is used in the following cases:
/// 1) Remove empty parts after TTL.
/// 2) Remove parts after move between shards.
/// 3) User queries: ALTER TABLE DROP PART 'part_name'.
///
/// In the first case merge of empty part is even better than DROP. In
/// the second case part UUIDs are used to forbid merges for moving parts so
/// there is no problem with concurrent merges. The third case is quite
/// rare and we give very weak guarantee: there will be no active part
/// with this name, but possibly it was merged to some other part.
if (type == DROP_PART)
return {};
/// Doesn't produce any part.
if (type == SYNC_PINNED_PART_UUIDS)
return {};


@@ -45,6 +45,7 @@ struct ReplicatedMergeTreeLogEntryData
ALTER_METADATA, /// Apply alter modification according to global /metadata and /columns paths
SYNC_PINNED_PART_UUIDS, /// Synchronization point for ensuring that all replicas have up to date in-memory state.
CLONE_PART_FROM_SHARD, /// Clone part from another shard.
DROP_PART, /// NOTE: Virtual (has the same (de)serialization format as DROP_RANGE). Deletes the specified part.
};
static String typeToString(Type type)
@@ -62,6 +63,7 @@ struct ReplicatedMergeTreeLogEntryData
case ReplicatedMergeTreeLogEntryData::ALTER_METADATA: return "ALTER_METADATA";
case ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS: return "SYNC_PINNED_PART_UUIDS";
case ReplicatedMergeTreeLogEntryData::CLONE_PART_FROM_SHARD: return "CLONE_PART_FROM_SHARD";
case ReplicatedMergeTreeLogEntryData::DROP_PART: return "DROP_PART";
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown log entry type: {}", DB::toString<int>(type));
}
@@ -73,7 +75,7 @@ struct ReplicatedMergeTreeLogEntryData
}
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
void readText(ReadBuffer & in, MergeTreeDataFormatVersion partition_format_version);
String toString() const;
String znode_name;
@@ -183,7 +185,7 @@ struct ReplicatedMergeTreeLogEntry : public ReplicatedMergeTreeLogEntryData, std
std::condition_variable execution_complete; /// Awake when currently_executing becomes false.
static Ptr parse(const String & s, const Coordination::Stat & stat);
static Ptr parse(const String & s, const Coordination::Stat & stat, MergeTreeDataFormatVersion format_version);
};
using ReplicatedMergeTreeLogEntryPtr = std::shared_ptr<ReplicatedMergeTreeLogEntry>;
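DROP_PART is introduced as a virtual type: it is never serialized under its own name (the note above says it shares DROP_RANGE's format), and readText reclassifies a parsed DROP_RANGE whose part name is not a whole-range tombstone. A simplified sketch of that decision with stand-in types (the real check is MergeTreePartInfo::isFakeDropRangePart; the marker level here is hypothetical):

```cpp
#include <cassert>

constexpr unsigned FAKE_DROP_RANGE_LEVEL = 999999999;  // hypothetical tombstone marker

struct PartInfo
{
    unsigned level = 0;
    bool isFakeDropRangePart() const { return level == FAKE_DROP_RANGE_LEVEL; }
};

enum Type { DROP_RANGE, DROP_PART };

Type classify(const PartInfo & info)
{
    // Whole-range tombstone stays DROP_RANGE; a single concrete part becomes DROP_PART.
    return info.isFakeDropRangePart() ? DROP_RANGE : DROP_PART;
}

int main()
{
    assert(classify({FAKE_DROP_RANGE_LEVEL}) == DROP_RANGE);
    assert(classify({1}) == DROP_PART);
}
```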


@@ -29,7 +29,7 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree &
, format_version(storage.format_version)
, current_parts(format_version)
, virtual_parts(format_version)
, drop_ranges(format_version)
, drop_parts(format_version)
{
zookeeper_path = storage.zookeeper_path;
replica_path = storage.replica_path;
@@ -95,10 +95,26 @@ bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr &
return !virtual_part_name.empty() && virtual_part_name != data_part->name;
}
bool ReplicatedMergeTreeQueue::hasDropRange(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info) const
bool ReplicatedMergeTreeQueue::isGoingToBeDropped(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info) const
{
std::lock_guard lock(state_mutex);
return drop_ranges.hasDropRange(part_info, out_drop_range_info);
return isGoingToBeDroppedImpl(part_info, out_drop_range_info);
}
bool ReplicatedMergeTreeQueue::isGoingToBeDroppedImpl(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info) const
{
String covering_virtual = virtual_parts.getContainingPart(part_info);
if (!covering_virtual.empty())
{
auto covering_virtual_info = MergeTreePartInfo::fromPartName(covering_virtual, format_version);
if (covering_virtual_info.isFakeDropRangePart())
{
if (out_drop_range_info)
*out_drop_range_info = covering_virtual_info;
return true;
}
}
return drop_parts.hasDropPart(part_info, out_drop_range_info);
}
bool ReplicatedMergeTreeQueue::checkPartInQueueAndGetSourceParts(const String & part_name, Strings & source_parts) const
@@ -165,7 +181,7 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
for (size_t i = 0; i < children_num; ++i)
{
auto res = results[i];
LogEntryPtr entry = LogEntry::parse(res.data, res.stat);
LogEntryPtr entry = LogEntry::parse(res.data, res.stat, format_version);
entry->znode_name = children[i];
std::lock_guard lock(state_mutex);
@@ -221,41 +237,35 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
for (const String & virtual_part_name : entry_virtual_parts)
{
virtual_parts.add(virtual_part_name, nullptr);
/// Don't add drop range parts to mutations
/// they don't produce any useful parts
if (entry->type == LogEntry::DROP_RANGE)
continue;
/// Note: DROP_PART does not have virtual parts
auto part_info = MergeTreePartInfo::fromPartName(virtual_part_name, format_version);
if (entry->type == LogEntry::REPLACE_RANGE && part_info.isFakeDropRangePart())
if (part_info.isFakeDropRangePart())
continue;
addPartToMutations(virtual_part_name, part_info);
}
/// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted
if (entry->type != LogEntry::DROP_RANGE)
if (entry->type == LogEntry::DROP_PART)
{
queue.push_back(entry);
}
else
{
drop_ranges.addDropRange(entry);
/// DROP PART remove parts, so we remove it from virtual parts to
/// preserve invariant virtual_parts = current_parts + queue.
/// Also remove it from parts_to_do to avoid intersecting parts in parts_to_do
/// if fast replica will execute DROP PART and assign a merge that contains dropped blocks.
if (entry->isDropPart(format_version))
{
String drop_part_name = *entry->getDropRange(format_version);
virtual_parts.removePartAndCoveredParts(drop_part_name);
removeCoveredPartsFromMutations(drop_part_name, /*remove_part = */ true, /*remove_covered_parts = */ true);
}
queue.push_front(entry);
drop_parts.addDropPart(entry);
String drop_part_name = *entry->getDropRange(format_version);
virtual_parts.removePartAndCoveredParts(drop_part_name);
removeCoveredPartsFromMutations(drop_part_name, /*remove_part = */ true, /*remove_covered_parts = */ true);
}
/// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted
if (entry->getDropRange(format_version))
queue.push_front(entry);
else
queue.push_back(entry);
if (entry->type == LogEntry::GET_PART || entry->type == LogEntry::ATTACH_PART)
{
inserts_by_time.insert(entry);
@@ -350,36 +360,26 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
if (auto drop_range_part_name = entry->getDropRange(format_version))
{
MergeTreePartInfo drop_range_info = MergeTreePartInfo::fromPartName(*drop_range_part_name, format_version);
/// DROP PART doesn't have virtual parts so remove from current
/// parts all covered parts.
if (entry->isDropPart(format_version))
if (entry->type == LogEntry::DROP_PART)
{
LOG_TEST(log, "Removing drop part from current and virtual parts {}", *drop_range_part_name);
/// DROP PART doesn't have virtual parts so remove from current
/// parts all covered parts.
LOG_TEST(log, "Removing DROP_PART from current parts {}", *drop_range_part_name);
current_parts.removePartAndCoveredParts(*drop_range_part_name);
drop_parts.removeDropPart(entry);
}
else
{
LOG_TEST(log, "Removing drop range from current and virtual parts {}", *drop_range_part_name);
LOG_TEST(log, "Removing DROP_RANGE from current and virtual parts {}", *drop_range_part_name);
current_parts.remove(*drop_range_part_name);
virtual_parts.remove(*drop_range_part_name);
}
/// During inserting to queue (insertUnlocked()) we remove part for
/// DROP_RANGE only for DROP PART but not for DROP PARTITION.
virtual_parts.remove(*drop_range_part_name);
/// NOTE: we don't need to remove part/covered parts from mutations (removeCoveredPartsFromMutations()) here because:
/// - for DROP PART we have this during inserting to queue (see insertUnlocked())
/// - for DROP PARTITION we have this in the loop above (when we adding parts to current_parts)
}
if (entry->type == LogEntry::DROP_RANGE)
{
drop_ranges.removeDropRange(entry);
}
if (entry->type == LogEntry::ALTER_METADATA)
{
LOG_TRACE(log, "Finishing metadata alter with version {}", entry->alter_version);
@@ -388,9 +388,9 @@
}
else
{
if (entry->type == LogEntry::DROP_RANGE)
if (entry->type == LogEntry::DROP_PART)
{
drop_ranges.removeDropRange(entry);
drop_parts.removeDropPart(entry);
}
LOG_TEST(log, "Removing unsuccessful entry {} virtual parts [{}]", entry->znode_name, fmt::join(entry_virtual_parts, ", "));
@@ -544,8 +544,7 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep
if (!found && need_remove_from_zk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find {} in the memory queue. It is a bug. Entry: {}",
entry->znode_name, entry->toString());
notifySubscribers(queue_size);
notifySubscribers(queue_size, entry->znode_name);
if (!need_remove_from_zk)
return;
@@ -664,7 +663,7 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
{
auto res = get_results[i];
copied_entries.emplace_back(LogEntry::parse(res.data, res.stat));
copied_entries.emplace_back(LogEntry::parse(res.data, res.stat, format_version));
ops.emplace_back(zkutil::makeCreateRequest(
fs::path(replica_path) / "queue/queue-", res.data, zkutil::CreateMode::PersistentSequential));
@@ -782,6 +781,7 @@ QueueRepresentation getQueueRepresentation(const std::list<ReplicatedMergeTreeLo
break;
}
case LogEntryType::DROP_RANGE:
case LogEntryType::DROP_PART:
{
result[key].dropped_parts.push_back(entry->new_part_name);
break;
@@ -1082,7 +1082,7 @@ bool ReplicatedMergeTreeQueue::checkReplaceRangeCanBeRemoved(const MergeTreePart
return false;
assert(entry_ptr->replace_range_entry);
if (current.type != LogEntry::REPLACE_RANGE && current.type != LogEntry::DROP_RANGE)
if (current.type != LogEntry::REPLACE_RANGE && current.type != LogEntry::DROP_RANGE && current.type != LogEntry::DROP_PART)
return false;
if (entry_ptr->replace_range_entry == current.replace_range_entry) /// same partition, don't want to drop ourselves
@@ -1260,14 +1260,15 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa
/// We have found `part_name` on some replica and are going to fetch it instead of covered `entry->new_part_name`.
std::unique_lock lock(state_mutex);
if (virtual_parts.getContainingPart(part_name).empty())
String covering_part = virtual_parts.getContainingPart(part_name);
if (covering_part.empty())
{
/// We should not fetch any parts that absent in our `virtual_parts` set,
/// because we do not know about such parts according to our replication queue (we know about them from some side-channel).
/// Otherwise, it may break invariants in replication queue reordering, for example:
/// 1. Our queue contains GET_PART all_2_2_0, log contains DROP_RANGE all_2_2_0 and MERGE_PARTS all_1_3_1
/// 2. We execute GET_PART all_2_2_0, but fetch all_1_3_1 instead
/// (drop_ranges.isAffectedByDropRange(...) is false-negative, because DROP_RANGE all_2_2_0 is not pulled yet).
/// (drop_parts.isAffectedByDropPart(...) is false-negative, because DROP_RANGE all_2_2_0 is not pulled yet).
/// It actually means, that MERGE_PARTS all_1_3_1 is executed too, but it's not even pulled yet.
/// 3. Then we pull log, trying to execute DROP_RANGE all_2_2_0
/// and reveal that it was incorrectly reordered with MERGE_PARTS all_1_3_1 (drop range intersects merged part).
@@ -1276,8 +1277,8 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa
}
/// FIXME get rid of actual_part_name.
/// If new covering part jumps over DROP_RANGE we should execute drop range first
if (drop_ranges.isAffectedByDropRange(part_name, reject_reason))
/// If the new covering part jumps over a non-disjoint DROP_PART, we should execute the DROP_PART first to avoid intersection
if (drop_parts.isAffectedByDropPart(part_name, reject_reason))
return false;
std::vector<LogEntryPtr> covered_entries_to_wait;
@ -1307,8 +1308,26 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
return false;
}
if (entry.type != LogEntry::DROP_RANGE && drop_ranges.isAffectedByDropRange(entry, out_postpone_reason))
return false;
if (entry.type != LogEntry::DROP_RANGE && entry.type != LogEntry::DROP_PART)
{
/// Do not touch any entries that are not disjoint with some DROP_PART to avoid intersecting parts
if (drop_parts.isAffectedByDropPart(entry, out_postpone_reason))
return false;
}
/// Optimization: it does not really make sense to generate parts that are going to be dropped anyway
if (!entry.new_part_name.empty())
{
auto new_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
MergeTreePartInfo drop_info;
if (entry.type != LogEntry::DROP_PART && !new_part_info.isFakeDropRangePart() && isGoingToBeDroppedImpl(new_part_info, &drop_info))
{
out_postpone_reason = fmt::format(
"Not executing {} because it produces part {} that is going to be dropped by {}",
entry.znode_name, entry.new_part_name, drop_info.getPartNameForLogs());
return false;
}
}
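The optimization above postpones any entry whose resulting part is already covered by a pending drop. A minimal self-contained sketch of such a coverage check, using an illustrative PartInfo type rather than the real MergeTreePartInfo (which lives in src/Storages/MergeTree/MergeTreePartInfo.h and has more fields):

#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

/// Illustrative stand-in for MergeTreePartInfo.
struct PartInfo
{
    std::string partition_id;
    int64_t min_block = 0;
    int64_t max_block = 0;

    /// A drop range covers a part when they share a partition and the part's
    /// block range lies entirely inside the drop range's block range.
    bool contains(const PartInfo & other) const
    {
        return partition_id == other.partition_id
            && min_block <= other.min_block
            && other.max_block <= max_block;
    }
};

/// Sketch of the "is going to be dropped" check: scan the pending DROP_RANGE
/// and DROP_PART infos and report the covering one, if any.
std::optional<PartInfo> isGoingToBeDropped(const PartInfo & part, const std::vector<PartInfo> & pending_drops)
{
    for (const auto & drop : pending_drops)
        if (drop.contains(part))
            return drop;
    return std::nullopt;
}

int main()
{
    std::vector<PartInfo> pending_drops{{"all", 0, 100}};
    if (auto drop = isGoingToBeDropped({"all", 2, 2}, pending_drops))
        std::cout << "covered by " << drop->partition_id << '_' << drop->min_block
                  << '_' << drop->max_block << '\n';
}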
/// Check that fetches pool is not overloaded
if ((entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
@ -1461,58 +1480,55 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
}
}
if (entry.type == LogEntry::DROP_RANGE || entry.type == LogEntry::REPLACE_RANGE)
/// DROP_RANGE, DROP_PART and REPLACE_RANGE entries remove other entries, which produce parts in the range.
/// If such part-producing operations are currently executing, then DROP/REPLACE RANGE waits for them to finish.
/// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait for each other.
/// But it should not happen if ranges are disjoint.
/// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting.
if (auto drop_range = entry.getDropRange(format_version))
{
/// DROP_RANGE and REPLACE_RANGE entries remove other entries, which produce parts in the range.
/// If such part producing operations are currently executing, then DROP/REPLACE RANGE wait them to finish.
/// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait each other.
/// But it should not happen if ranges are disjoint.
/// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting.
if (auto drop_range = entry.getDropRange(format_version))
auto drop_range_info = MergeTreePartInfo::fromPartName(*drop_range, format_version);
for (const auto & info : currently_executing_drop_replace_ranges)
{
auto drop_range_info = MergeTreePartInfo::fromPartName(*drop_range, format_version);
for (const auto & info : currently_executing_drop_replace_ranges)
{
if (drop_range_info.isDisjoint(info))
continue;
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry with not disjoint range {} is currently executing.";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name,
entry.typeToString(),
entry.new_part_name,
info.getPartNameForLogs());
return false;
}
if (drop_range_info.isDisjoint(info))
continue;
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry with not disjoint range {} is currently executing.";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name,
entry.typeToString(),
entry.new_part_name,
info.getPartNameForLogs());
return false;
}
}
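The deadlock-avoidance reasoning above hinges on range disjointness. A short sketch of the interval logic, again with an illustrative type rather than the real MergeTreePartInfo::isDisjoint:

#include <cstdint>
#include <iostream>
#include <string>

struct PartInfo
{
    std::string partition_id;
    int64_t min_block = 0;
    int64_t max_block = 0;

    /// Two parts are disjoint when they belong to different partitions
    /// or their block ranges do not overlap.
    bool isDisjoint(const PartInfo & rhs) const
    {
        return partition_id != rhs.partition_id
            || max_block < rhs.min_block
            || rhs.max_block < min_block;
    }
};

int main()
{
    PartInfo drop{"all", 2, 2};    /// e.g. a drop range for all_2_2_0
    PartInfo merged{"all", 1, 3};  /// e.g. the MERGE_PARTS result all_1_3_1
    std::cout << drop.isDisjoint(merged) << '\n'; /// prints 0: the ranges intersect
}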
if (entry.isDropPart(format_version))
if (entry.type == LogEntry::DROP_PART)
{
/// We should avoid reordering of REPLACE_RANGE and DROP_PART,
/// because if replace_range_entry->new_part_names contains drop_range_entry->new_part_name
/// and we execute DROP PART before REPLACE_RANGE, then DROP PART will be a no-op
/// (because the part is not created yet, so there is nothing to drop;
/// DROP_RANGE does not cover all parts of REPLACE_RANGE, so removePartProducingOpsInRange(...) will not remove anything either)
/// and the part will never be removed. Replicas may diverge due to such reordering.
/// We don't need to do anything for other entry types, because removePartProducingOpsInRange(...) will remove them as expected.
auto drop_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
for (const auto & replace_entry : queue)
{
/// We should avoid reordering of REPLACE_RANGE and DROP PART (DROP_RANGE),
/// because if replace_range_entry->new_part_names contains drop_range_entry->new_part_name
/// and we execute DROP PART before REPLACE_RANGE, then DROP PART will be no-op
/// (because part is not created yet, so there is nothing to drop;
/// DROP_RANGE does not cover all parts of REPLACE_RANGE, so removePartProducingOpsInRange(...) will not remove anything too)
/// and part will never be removed. Replicas may diverge due to such reordering.
/// We don't need to do anything for other entry types, because removePartProducingOpsInRange(...) will remove them as expected.
if (replace_entry->type != LogEntry::REPLACE_RANGE)
continue;
auto drop_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
for (const auto & replace_entry : queue)
for (const auto & new_part_name : replace_entry->replace_range_entry->new_part_names)
{
if (replace_entry->type != LogEntry::REPLACE_RANGE)
continue;
for (const auto & new_part_name : replace_entry->replace_range_entry->new_part_names)
auto new_part_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
if (!new_part_info.isDisjoint(drop_part_info))
{
auto new_part_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
if (!new_part_info.isDisjoint(drop_part_info))
{
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because it probably depends on {} (REPLACE_RANGE).";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(),
entry.new_part_name, replace_entry->znode_name);
return false;
}
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because it probably depends on {} (REPLACE_RANGE).";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(),
entry.new_part_name, replace_entry->znode_name);
return false;
}
}
}
@ -2450,9 +2466,10 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & z
}
}
bool ReplicatedMergeTreeMergePredicate::hasDropRange(const MergeTreePartInfo & new_drop_range_info) const
bool ReplicatedMergeTreeMergePredicate::isGoingToBeDropped(const MergeTreePartInfo & new_drop_range_info,
MergeTreePartInfo * out_drop_range_info) const
{
return queue.hasDropRange(new_drop_range_info);
return queue.isGoingToBeDropped(new_drop_range_info, out_drop_range_info);
}
String ReplicatedMergeTreeMergePredicate::getCoveringVirtualPart(const String & part_name) const
@ -2466,12 +2483,17 @@ ReplicatedMergeTreeQueue::SubscriberHandler
ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback)
{
std::lock_guard lock(state_mutex);
std::unordered_set<String> result;
result.reserve(queue.size());
for (const auto & entry : queue)
result.insert(entry->znode_name);
std::lock_guard lock_subscribers(subscribers_mutex);
auto it = subscribers.emplace(subscribers.end(), std::move(callback));
/// Atomically notify about current size
(*it)(queue.size());
/// Notify queue size & log entry ids to avoid waiting for removed entries
(*it)(result.size(), result, std::nullopt);
return SubscriberHandler(it, *this);
}
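The subscription handshake changes here: instead of receiving only the queue size, a new subscriber now gets the full set of entry ids up front, and each later notification carries the id of a removed entry. A self-contained sketch of that protocol, mirroring the callback logic visible further down in this diff (plain std types instead of ClickHouse's String):

#include <functional>
#include <iostream>
#include <optional>
#include <string>
#include <unordered_set>
#include <utility>

using Callback = std::function<void(
    size_t /* queue_size */,
    std::unordered_set<std::string> /* wait_for_ids */,
    std::optional<std::string> /* removed_log_entry_id */)>;

int main()
{
    std::unordered_set<std::string> wait_for_ids;
    bool set_ids_to_wait = true;

    Callback callback = [&](size_t new_queue_size,
                            std::unordered_set<std::string> log_entry_ids,
                            std::optional<std::string> removed_log_entry_id)
    {
        if (set_ids_to_wait)
        {
            wait_for_ids = std::move(log_entry_ids); /// snapshot taken on subscription
            set_ids_to_wait = false;
        }
        if (removed_log_entry_id)
            wait_for_ids.erase(*removed_log_entry_id);
        if (wait_for_ids.empty() || new_queue_size == 0)
            std::cout << "queue fully processed\n";
    };

    callback(2, {"queue-0000000001", "queue-0000000002"}, std::nullopt); /// initial snapshot
    callback(1, {}, "queue-0000000001");
    callback(0, {}, "queue-0000000002"); /// prints "queue fully processed"
}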
@ -2482,16 +2504,16 @@ ReplicatedMergeTreeQueue::SubscriberHandler::~SubscriberHandler()
queue.subscribers.erase(it);
}
void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size)
void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size, std::optional<String> removed_log_entry_id)
{
std::lock_guard lock_subscribers(subscribers_mutex);
for (auto & subscriber_callback : subscribers)
subscriber_callback(new_queue_size);
subscriber_callback(new_queue_size, {}, removed_log_entry_id);
}
ReplicatedMergeTreeQueue::~ReplicatedMergeTreeQueue()
{
notifySubscribers(0);
notifySubscribers(0, std::nullopt);
}
String padIndex(Int64 index)

View File

@ -105,8 +105,9 @@ private:
ActiveDataPartSet virtual_parts;
/// Dropped ranges inserted into queue
DropPartsRanges drop_ranges;
/// We do not add DROP_PARTs to virtual_parts because they can intersect,
/// so we store them separately in this structure.
DropPartsRanges drop_parts;
/// A set of mutations loaded from ZooKeeper.
/// mutations_by_partition is an index partition ID -> block ID -> mutation into this set.
@ -163,7 +164,7 @@ private:
/// A subscriber callback is called when a queue entry is deleted
mutable std::mutex subscribers_mutex;
using SubscriberCallBack = std::function<void(size_t /* queue_size */)>;
using SubscriberCallBack = std::function<void(size_t /* queue_size */, std::unordered_set<String> /*wait_for_ids*/, std::optional<String> /* removed_log_entry_id */)>;
using Subscribers = std::list<SubscriberCallBack>;
using SubscriberIterator = Subscribers::iterator;
@ -180,8 +181,8 @@ private:
Subscribers subscribers;
/// Notify subscribers about queue change
void notifySubscribers(size_t new_queue_size);
/// Notify subscribers about queue change (new queue size and entry that was removed)
void notifySubscribers(size_t new_queue_size, std::optional<String> removed_log_entry_id);
/// Check that entry_ptr is REPLACE_RANGE entry and can be removed from queue because current entry covers it
bool checkReplaceRangeCanBeRemoved(
@ -405,8 +406,9 @@ public:
/// Checks that part is already in virtual parts
bool isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const;
/// Returns true if part_info is covered by some DROP_RANGE
bool hasDropRange(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info = nullptr) const;
/// Returns true if part_info is covered by some DROP_RANGE or DROP_PART
bool isGoingToBeDropped(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info = nullptr) const;
bool isGoingToBeDroppedImpl(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info) const;
/// Check that the part is produced by some entry in the queue and get the source parts for it.
/// If there are several entries, return the largest source_parts set. This is rarely possible.
@ -524,7 +526,7 @@ public:
int32_t getVersion() const { return merges_version; }
/// Returns true if there's a drop range covering new_drop_range_info
bool hasDropRange(const MergeTreePartInfo & new_drop_range_info) const;
bool isGoingToBeDropped(const MergeTreePartInfo & new_drop_range_info, MergeTreePartInfo * out_drop_range_info = nullptr) const;
/// Returns virtual part covering part_name (if any) or empty string
String getCoveringVirtualPart(const String & part_name) const;

View File

@ -81,7 +81,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <memory>
#include <filesystem>

View File

@ -69,7 +69,6 @@
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
@ -233,6 +232,11 @@ zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperAndAssertNotReadonl
return res;
}
static ConnectionTimeouts getHTTPTimeouts(ContextPtr context)
{
return ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0});
}
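Note: this local getHTTPTimeouts helper replaces the removed <IO/ConnectionTimeoutsContext.h> overload that derived everything from the Context. Callers now pass the settings plus a keep-alive Poco::Timespan built from the keep_alive_timeout config key (falling back to DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT) explicitly; the same helper is duplicated in StorageURL.cpp below, while the XDBC code inlines the equivalent expression.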
static MergeTreePartInfo makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(const String & partition_id)
{
/// NOTE We don't have special log entry type for MOVE PARTITION/ATTACH PARTITION FROM,
@ -1551,7 +1555,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo
bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
{
if (entry.type == LogEntry::DROP_RANGE)
if (entry.type == LogEntry::DROP_RANGE || entry.type == LogEntry::DROP_PART)
{
executeDropRange(entry);
return true;
@ -2356,7 +2360,6 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
return true;
}
void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entry)
{
auto zookeeper = getZooKeeper();
@ -2385,7 +2388,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
auto metadata_snapshot = getInMemoryMetadataPtr();
String source_replica_path = entry.source_shard + "/replicas/" + replica;
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(getContext());
auto timeouts = getHTTPTimeouts(getContext());
auto credentials = getContext()->getInterserverCredentials();
String interserver_scheme = getContext()->getInterserverScheme();
@ -2522,7 +2525,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
info.stat = std::move(res.stat);
try
{
info.parsed_entry = LogEntry::parse(info.data, info.stat);
info.parsed_entry = LogEntry::parse(info.data, info.stat, format_version);
}
catch (...)
{
@ -2535,7 +2538,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
info.parsed_entry->znode_name = source_queue_names[i];
if (info.parsed_entry->type == LogEntry::DROP_RANGE)
if (info.parsed_entry->type == LogEntry::DROP_RANGE || info.parsed_entry->type == LogEntry::DROP_PART)
{
drop_range_set.add(info.parsed_entry->new_part_name);
}
@ -2637,7 +2640,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
String covering_drop_range = drop_range_set.getContainingPart(part_name);
if (!covering_drop_range.empty())
{
LOG_TRACE(log, "{} {}: it's covered by DROP_RANGE {}", log_msg_context, part_name, covering_drop_range);
LOG_TRACE(log, "{} {}: it's covered by drop range {}", log_msg_context, part_name, covering_drop_range);
return true;
}
@ -3555,9 +3558,9 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
/// Because of version check this method will never create FETCH if drop part exists
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, PartitionIdsHint{broken_part_info.partition_id});
if (merge_pred.hasDropRange(broken_part_info))
if (merge_pred.isGoingToBeDropped(broken_part_info))
{
LOG_INFO(log, "Broken part {} is covered by DROP RANGE, don't need to fetch it", part_name);
LOG_INFO(log, "Broken part {} is covered by drop range, don't need to fetch it", part_name);
return;
}
/// Check that our version of the log (and queue) is the freshest. Otherwise don't create a new fetch entry.
@ -3614,7 +3617,7 @@ void StorageReplicatedMergeTree::stopBeingLeader()
ConnectionTimeouts StorageReplicatedMergeTree::getFetchPartHTTPTimeouts(ContextPtr local_context)
{
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(local_context);
auto timeouts = getHTTPTimeouts(local_context);
auto settings = getSettings();
if (settings->replicated_fetches_http_connection_timeout.changed)
@ -4261,7 +4264,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
std::function<MutableDataPartPtr()> get_part;
ReplicatedMergeTreeAddress address(zookeeper->get(fs::path(source_replica_path) / "host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(getContext());
auto timeouts = getHTTPTimeouts(getContext());
auto credentials = getContext()->getInterserverCredentials();
String interserver_scheme = getContext()->getInterserverScheme();
@ -5819,7 +5822,7 @@ bool StorageReplicatedMergeTree::tryWaitForReplicaToProcessLogEntry(
String log_entry_str;
Coordination::Stat log_entry_stat;
bool exists = getZooKeeper()->tryGet(fs::path(table_zookeeper_path) / "log" / log_entry_name, log_entry_str, &log_entry_stat);
ReplicatedMergeTreeLogEntryData log_entry = *ReplicatedMergeTreeLogEntry::parse(log_entry_str, log_entry_stat);
ReplicatedMergeTreeLogEntryData log_entry = *ReplicatedMergeTreeLogEntry::parse(log_entry_str, log_entry_stat, format_version);
if (exists && entry.log_entry_id == log_entry.log_entry_id)
{
LOG_DEBUG(log, "Found log entry with id `{}` in the log", entry.log_entry_id);
@ -5886,7 +5889,7 @@ bool StorageReplicatedMergeTree::tryWaitForReplicaToProcessLogEntry(
/// Check if the id matches rather than just contents. This entry
/// might have been written by different ClickHouse versions and
/// it is hard to guarantee the same text representation.
ReplicatedMergeTreeLogEntryData queue_entry = *ReplicatedMergeTreeLogEntry::parse(queue_entry_str, queue_entry_stat);
ReplicatedMergeTreeLogEntryData queue_entry = *ReplicatedMergeTreeLogEntry::parse(queue_entry_str, queue_entry_stat, format_version);
if (entry.log_entry_id == queue_entry.log_entry_id)
{
queue_entry_to_wait_for = entry_name;
@ -7549,26 +7552,37 @@ void StorageReplicatedMergeTree::onActionLockRemove(StorageActionBlockType actio
background_moves_assignee.trigger();
}
bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UInt64 max_wait_milliseconds)
bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_milliseconds)
{
Stopwatch watch;
/// Let's fetch new log entries first
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
/// This is significant, because the execution of this task could be delayed at BackgroundPool.
/// And we force it to be executed.
background_operations_assignee.trigger();
Poco::Event target_size_event;
auto callback = [&target_size_event, queue_size] (size_t new_queue_size)
std::unordered_set<String> wait_for_ids;
bool set_ids_to_wait = true;
Poco::Event target_entry_event;
auto callback = [&target_entry_event, &wait_for_ids, &set_ids_to_wait](size_t new_queue_size, std::unordered_set<String> log_entry_ids, std::optional<String> removed_log_entry_id)
{
if (new_queue_size <= queue_size)
target_size_event.set();
if (set_ids_to_wait)
{
wait_for_ids = log_entry_ids;
set_ids_to_wait = false;
}
if (removed_log_entry_id.has_value())
wait_for_ids.erase(removed_log_entry_id.value());
if (wait_for_ids.empty() || new_queue_size == 0)
target_entry_event.set();
};
const auto handler = queue.addSubscriber(std::move(callback));
while (!target_size_event.tryWait(50))
while (!target_entry_event.tryWait(50))
{
if (max_wait_milliseconds && watch.elapsedMilliseconds() > max_wait_milliseconds)
return false;
@ -7576,14 +7590,13 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI
if (partial_shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Shutdown is called for table");
}
return true;
}
bool StorageReplicatedMergeTree::dropPartImpl(
zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop)
{
LOG_TRACE(log, "Will try to insert a log entry to DROP_RANGE for part {}", part_name);
LOG_TRACE(log, "Will try to insert a log entry to DROP_PART for part {}", part_name);
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
@ -7600,7 +7613,7 @@ bool StorageReplicatedMergeTree::dropPartImpl(
return false;
}
if (merge_pred.hasDropRange(part->info))
if (merge_pred.isGoingToBeDropped(part->info))
{
if (throw_if_noop)
throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, "Already has DROP RANGE for part {} in queue.", part_name);
@ -7645,12 +7658,12 @@ bool StorageReplicatedMergeTree::dropPartImpl(
size_t clear_block_ops_size = ops.size();
/// If `part_name` is the result of a recent merge and the source parts are still available, then
/// DROP_RANGE with detach will move this part together with source parts to `detached/` dir.
entry.type = LogEntry::DROP_RANGE;
/// DROP_PART with detach will move this part together with source parts to `detached/` dir.
entry.type = LogEntry::DROP_PART;
entry.source_replica = replica_name;
/// We don't set fake drop level (999999999) for the single part DROP_RANGE.
/// We don't set fake drop level (999999999) for the single part drop range.
/// First of all we don't guarantee anything other than the part will not be
/// active after DROP PART, but covering part (without data of dropped part) can exist.
/// active after DROP_PART, but covering part (without data of dropped part) can exist.
/// If we add a part with level 999999999, then we can break the invariant in virtual_parts of
/// the queue.
entry.new_part_name = getPartNamePossiblyFake(format_version, part->info);
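For context, a sketch of the part naming touched on above, assuming the conventional <partition>_<min_block>_<max_block>_<level> scheme: level 999999999 marks a fake drop range, while a single-part DROP_PART keeps the part's real name. The partName helper below is hypothetical; the real logic is getPartNamePossiblyFake.

#include <cstdint>
#include <iostream>
#include <string>

/// Hypothetical helper for illustration only.
std::string partName(const std::string & partition, int64_t min_block, int64_t max_block, int64_t level)
{
    return partition + '_' + std::to_string(min_block) + '_'
        + std::to_string(max_block) + '_' + std::to_string(level);
}

int main()
{
    std::cout << partName("all", 2, 2, 0) << '\n';           /// single-part drop: all_2_2_0
    std::cout << partName("all", 0, 100, 999999999) << '\n'; /// fake drop range: all_0_100_999999999
}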
@ -7811,12 +7824,11 @@ StorageReplicatedMergeTree::LogEntryPtr StorageReplicatedMergeTree::dropAllParts
void StorageReplicatedMergeTree::enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds)
{
MergeTreePartInfo covering_drop_range;
/// NOTE This check is just an optimization, it's not reliable for two reasons:
/// (1) drop entry could be removed concurrently and (2) it does not take REPLACE_RANGE into account.
/// NOTE This check is just an optimization: it's not reliable, because the drop entry could be removed concurrently.
/// See also ReplicatedMergeTreePartCheckThread::cancelRemovedPartsCheck
if (queue.hasDropRange(MergeTreePartInfo::fromPartName(part_name, format_version), &covering_drop_range))
if (queue.isGoingToBeDropped(MergeTreePartInfo::fromPartName(part_name, format_version), &covering_drop_range))
{
LOG_WARNING(log, "Do not enqueue part {} for check because it's covered by DROP_RANGE {} and going to be removed",
LOG_WARNING(log, "Do not enqueue part {} for check because it's covered by drop range {} and going to be removed",
part_name, covering_drop_range.getPartNameForLogs());
return;
}
@ -8679,9 +8691,12 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
LOG_WARNING(log, "Will not create empty part instead of lost {}, because there's no covering part in replication queue", lost_part_name);
return false;
}
if (pred.hasDropRange(MergeTreePartInfo::fromPartName(covering_virtual, format_version)))
MergeTreePartInfo drop_info;
if (pred.isGoingToBeDropped(MergeTreePartInfo::fromPartName(lost_part_name, format_version), &drop_info))
{
LOG_WARNING(log, "Will not create empty part instead of lost {}, because it's covered by DROP_RANGE", lost_part_name);
LOG_WARNING(log, "Will not create empty part instead of lost {}, "
"because it's going to be removed (by range {})",
lost_part_name, drop_info.getPartNameForLogs());
return false;
}

View File

@ -178,9 +178,9 @@ public:
void onActionLockRemove(StorageActionBlockType action_type) override;
/// Wait when replication queue size becomes less or equal than queue_size
/// Wait until all entries currently in the replication queue are processed or the queue size becomes 0.
/// If the timeout is exceeded, returns false.
bool waitForShrinkingQueueSize(size_t queue_size = 0, UInt64 max_wait_milliseconds = 0);
bool waitForProcessingQueue(UInt64 max_wait_milliseconds = 0);
/// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK.
void getStatus(ReplicatedTableStatus & res, bool with_zk_fields = true);

View File

@ -8,7 +8,7 @@
#include "Client/Connection.h"
#include "Core/QueryProcessingStage.h"
#include <DataTypes/DataTypeString.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>

View File

@ -13,7 +13,6 @@
#include <Parsers/ASTIdentifier.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/IOThreadPool.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/WriteBufferFromHTTP.h>
@ -82,6 +81,10 @@ static bool urlWithGlobs(const String & uri)
return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) || uri.find('|') != std::string::npos;
}
static ConnectionTimeouts getHTTPTimeouts(ContextPtr context)
{
return ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0});
}
IStorageURLBase::IStorageURLBase(
const String & uri_,
@ -632,7 +635,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
{},
Poco::Net::HTTPRequest::HTTP_GET,
{},
ConnectionTimeouts::getHTTPTimeouts(context),
getHTTPTimeouts(context),
compression_method,
credentials,
headers,
@ -716,7 +719,7 @@ Pipe IStorageURLBase::read(
local_context,
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
getHTTPTimeouts(local_context),
compression_method,
download_threads,
headers,
@ -740,7 +743,7 @@ Pipe IStorageURLBase::read(
local_context,
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
getHTTPTimeouts(local_context),
compression_method,
max_download_threads,
headers,
@ -775,6 +778,7 @@ Pipe StorageURLWithFailover::read(
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
uri_info->uri_list_to_read.emplace_back(uri_options);
auto pipe = Pipe(std::make_shared<StorageURLSource>(
uri_info,
getReadMethod(),
@ -786,7 +790,7 @@ Pipe StorageURLWithFailover::read(
local_context,
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
getHTTPTimeouts(local_context),
compression_method,
local_context->getSettingsRef().max_download_threads,
headers,
@ -815,7 +819,7 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
format_settings,
metadata_snapshot->getSampleBlock(),
context,
ConnectionTimeouts::getHTTPTimeouts(context),
getHTTPTimeouts(context),
compression_method,
http_method);
}
@ -827,7 +831,7 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
format_settings,
metadata_snapshot->getSampleBlock(),
context,
ConnectionTimeouts::getHTTPTimeouts(context),
getHTTPTimeouts(context),
compression_method,
http_method);
}
@ -896,7 +900,7 @@ std::optional<time_t> IStorageURLBase::getLastModificationTime(
Poco::URI(url),
Poco::Net::HTTPRequest::HTTP_GET,
{},
ConnectionTimeouts::getHTTPTimeouts(context),
getHTTPTimeouts(context),
credentials,
settings.max_http_get_redirects,
settings.max_read_buffer_size,

View File

@ -5,7 +5,7 @@
#include <Storages/checkAndGetLiteralArgument.h>
#include <Formats/FormatFactory.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
@ -130,13 +130,16 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /* query */, const StorageMet
request_uri.addQueryParameter("format_name", format_name);
request_uri.addQueryParameter("sample_block", metadata_snapshot->getSampleBlock().getNamesAndTypesList().toString());
return std::make_shared<StorageURLSink>(
request_uri.toString(),
format_name,
getFormatSettings(local_context),
metadata_snapshot->getSampleBlock(),
local_context,
ConnectionTimeouts::getHTTPTimeouts(local_context),
ConnectionTimeouts::getHTTPTimeouts(
local_context->getSettingsRef(),
{local_context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}),
compression_method);
}

View File

@ -2,7 +2,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
@ -76,7 +76,14 @@ ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr contex
columns_info_uri.addQueryParameter("external_table_functions_use_nulls", toString(use_nulls));
Poco::Net::HTTPBasicCredentials credentials{};
ReadWriteBufferFromHTTP buf(columns_info_uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context), credentials);
ReadWriteBufferFromHTTP buf(
columns_info_uri,
Poco::Net::HTTPRequest::HTTP_POST,
{},
ConnectionTimeouts::getHTTPTimeouts(
context->getSettingsRef(),
{context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}),
credentials);
std::string columns_info;
readStringBinary(columns_info, buf);

View File

@ -56,6 +56,13 @@ class Reviews:
logging.info("There aren't reviews for PR #%s", self.pr.number)
return False
logging.info(
"The following users have reviewed the PR:\n %s",
"\n ".join(
f"{user.login}: {review.state}" for user, review in self.reviews.items()
),
)
filtered_reviews = {
user: review
for user, review in self.reviews.items()
@ -125,7 +132,11 @@ class Reviews:
return False
return True
logging.info("The PR #%s is not approved", self.pr.number)
logging.info(
"The PR #%s is not approved by any of %s team member",
self.pr.number,
TEAM_NAME,
)
return False

View File

@ -91,7 +91,7 @@ class HTTPError(Exception):
# Helpers to execute queries via HTTP interface.
def clickhouse_execute_http(
base_args, query, timeout=30, settings=None, default_format=None
base_args, query, timeout=30, settings=None, default_format=None, max_http_retries=5
):
if args.secure:
client = http.client.HTTPSConnection(
@ -120,7 +120,7 @@ def clickhouse_execute_http(
if default_format is not None:
params["default_format"] = default_format
for i in range(MAX_RETRIES):
for i in range(max_http_retries):
try:
client.request(
"POST",
@ -130,7 +130,7 @@ def clickhouse_execute_http(
data = res.read()
break
except Exception as ex:
if i == MAX_RETRIES - 1:
if i == max_http_retries - 1:
raise ex
sleep(i + 1)
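# Note: retries back off linearly: sleep(i + 1) waits 1s after the first failure, 2s after the second, and so on.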
@ -140,13 +140,12 @@ def clickhouse_execute_http(
return data
def clickhouse_execute(base_args, query, timeout=30, settings=None):
return clickhouse_execute_http(base_args, query, timeout, settings).strip()
def clickhouse_execute(base_args, query, timeout=30, settings=None, max_http_retries=5):
return clickhouse_execute_http(base_args, query, timeout, settings, max_http_retries=max_http_retries).strip()
def clickhouse_execute_json(base_args, query, timeout=60, settings=None):
data = clickhouse_execute_http(base_args, query, timeout, settings, "JSONEachRow")
def clickhouse_execute_json(base_args, query, timeout=60, settings=None, max_http_retries=5):
data = clickhouse_execute_http(base_args, query, timeout, settings, "JSONEachRow", max_http_retries=max_http_retries)
if not data:
return None
rows = []
@ -641,7 +640,7 @@ class TestCase:
clickhouse_execute(
args,
"CREATE DATABASE " + database + get_db_engine(testcase_args, database),
"CREATE DATABASE IF NOT EXISTS " + database + get_db_engine(testcase_args, database),
settings=get_create_database_settings(args, testcase_args),
)
@ -1139,7 +1138,7 @@ class TestCase:
seconds_left = max(
args.timeout - (datetime.now() - start_time).total_seconds(), 20
)
drop_database_query = "DROP DATABASE " + database
drop_database_query = "DROP DATABASE IF EXISTS " + database
if args.replicated_database:
drop_database_query += " ON CLUSTER test_cluster_database_replicated"
@ -1670,7 +1669,7 @@ def check_server_started(args):
retry_count = args.server_check_retries
while retry_count > 0:
try:
clickhouse_execute(args, "SELECT 1")
clickhouse_execute(args, "SELECT 1", max_http_retries=1)
print(" OK")
sys.stdout.flush()
return True

View File

@ -167,7 +167,7 @@ def test_smoke():
def test_smoke_parallel():
threads = []
for _ in range(100):
for _ in range(50):
threads.append(SafeThread(target=execute_smoke_query))
for thread in threads:
thread.start()
@ -178,7 +178,7 @@ def test_smoke_parallel():
def test_smoke_parallel_dict_reload():
threads = []
for _ in range(100):
for _ in range(90):
threads.append(SafeThread(target=execute_reload_dictionary_slow_dict_3))
for thread in threads:
thread.start()

View File

@ -251,7 +251,7 @@ def test_restore_another_bucket_path(cluster, db_atomic, zero_copy):
node_another_bucket = cluster.instances["node_another_bucket"]
create_restore_file(node_another_bucket, bucket="root")
node_another_bucket.restart_clickhouse()
node_another_bucket.restart_clickhouse(stop_start_wait_sec=120)
create_table(
node_another_bucket, "test", schema, attach=True, db_atomic=db_atomic, uuid=uuid
)

View File

@ -0,0 +1,6 @@
<test>
<query>SELECT count() FROM test.hits WHERE NOT ignore(countDigits(RegionID))</query>
<query>SELECT count() FROM test.hits WHERE NOT ignore(countDigits(AdvEngineID))</query>
<query>SELECT count() FROM test.hits WHERE NOT ignore(countDigits(ClientIP))</query>
<query>SELECT count() FROM test.hits WHERE NOT ignore(countDigits(WatchID))</query>
</test>

View File

@ -0,0 +1,4 @@
<test>
<query>SELECT count() FROM numbers(20000000) WHERE NOT ignore(toDateTime64(rand(), 0))</query>
<query>SELECT count() FROM numbers(20000000) WHERE NOT ignore(toDateTime64(rand(), 3))</query>
</test>

View File

@ -0,0 +1,15 @@
runtime messages 0.001
runtime exceptions 0.05
messages shorter than 10 10
messages shorter than 16 40
exceptions shorter than 30 125
noisy messages 0.3
noisy Trace messages 0.16
noisy Debug messages 0.09
noisy Info messages 0.05
noisy Warning messages 0.01
noisy Error messages 0.02
no Fatal messages 0
number of too noisy messages 3
number of noisy messages 10
incorrect patterns 15

View File

@ -0,0 +1,64 @@
-- Tags: no-parallel, no-fasttest
-- no-parallel because we want to run this test when most of the other tests already passed
-- If this test fails, see the "Top patterns of log messages" diagnostics in the end of run.log
system flush logs;
drop table if exists logs;
create view logs as select * from system.text_log where now() - toIntervalMinute(120) < event_time;
-- Check that we don't have too many messages formatted with fmt::runtime or strings concatenation.
-- The 0.001 threshold should always be enough; the observed value was about 0.00025
select 'runtime messages', max2(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0), 0.001) from logs;
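-- Note: each check computes max2(measured, threshold), so while the measured value stays within the
-- threshold the query returns the threshold itself and matches the .reference file; exceeding it fails the test.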
-- Check the same for exceptions. The value was 0.03
select 'runtime exceptions', max2(coalesce(sum(length(message_format_string) = 0) / countOrNull(), 0), 0.05) from logs where message like '%DB::Exception%';
-- Check that we don't have too many short meaningless message patterns.
select 'messages shorter than 10', max2(countDistinctOrDefault(message_format_string), 10) from logs where length(message_format_string) < 10;
-- Same as above. Feel free to update the threshold or remove this query if really necessary
select 'messages shorter than 16', max2(countDistinctOrDefault(message_format_string), 40) from logs where length(message_format_string) < 16;
-- Same as above, but exceptions must be more informative. Feel free to update the threshold or remove this query if really necessary
select 'exceptions shorter than 30', max2(countDistinctOrDefault(message_format_string), 125) from logs where length(message_format_string) < 30 and message ilike '%DB::Exception%';
-- Avoid too noisy messages: the most frequent message must account for less than 30% of the total. We should gradually reduce this threshold.
select 'noisy messages', max2((select count() from logs group by message_format_string order by count() desc limit 1) / (select count() from logs), 0.30);
-- Same as above, but excluding Test level (actually finds top 1 Trace message)
with ('Access granted: {}{}', '{} -> {}') as frequent_in_tests
select 'noisy Trace messages', max2((select count() from logs where level!='Test' and message_format_string not in frequent_in_tests
group by message_format_string order by count() desc limit 1) / (select count() from logs), 0.16);
-- Same as above for Debug
select 'noisy Debug messages', max2((select count() from logs where level <= 'Debug' group by message_format_string order by count() desc limit 1) / (select count() from logs), 0.09);
-- Same as above for Info
select 'noisy Info messages', max2((select count() from logs where level <= 'Information' group by message_format_string order by count() desc limit 1) / (select count() from logs), 0.05);
-- Same as above for Warning
with ('Not enabled four letter command {}') as frequent_in_tests
select 'noisy Warning messages', max2((select countOrDefault() from logs where level = 'Warning' and message_format_string not in frequent_in_tests
group by message_format_string order by count() desc limit 1) / (select count() from logs), 0.01);
-- Same as above for Error
select 'noisy Error messages', max2((select countOrDefault() from logs where level = 'Error' group by message_format_string order by count() desc limit 1) / (select count() from logs), 0.02);
select 'no Fatal messages', count() from logs where level = 'Fatal';
-- Avoid too noisy messages: limit the number of messages with high frequency
select 'number of too noisy messages', max2(count(), 3) from (select count() / (select count() from logs) as freq, message_format_string from logs group by message_format_string having freq > 0.10);
select 'number of noisy messages', max2(count(), 10) from (select count() / (select count() from logs) as freq, message_format_string from logs group by message_format_string having freq > 0.05);
-- Each message matches its pattern (returns 0 rows)
-- FIXME maybe we should make it stricter ('Code:%Exception: '||s||'%'), but it's not easy because of addMessage
select 'incorrect patterns', max2(countDistinct(message_format_string), 15) from (
select message_format_string, any(message) as any_message from logs
where message not like (replaceRegexpAll(message_format_string, '{[:.0-9dfx]*}', '%') as s)
and message not like ('%Exception: '||s||'%') group by message_format_string
) where any_message not like '%Poco::Exception%';
drop table logs;

View File

@ -1,4 +1,4 @@
-- Tags: no-parallel
-- Tags: no-parallel, no-random-merge-tree-settings
set allow_deprecated_syntax_for_merge_tree=1;
set optimize_on_insert = 0;

View File

@ -1,7 +1,7 @@
0 2 2 0 2 3 0 2 4
1 2 2 1 2 3 1 2 4
2 3 4
10 10 19 19 39 39
2 2 2 2 2 2 2 2 2 2 2 2
0 0 0 0 0 0 0 0 0 0 0 0
1 1 1 1 1 1 1 1 1 1 1 1
3 3 3 5 5 5 10 10 10 19 19 20
2 3 4 5 6 7

View File

@ -0,0 +1,36 @@
2149-06-06 65535
2149-06-06 toUInt16(65535)
2149-06-06 toInt32(65535)
2149-06-06 toUInt32(65535)
2149-06-06 toDate(65535)
2149-06-06 CAST(65535 as UInt16)
2149-06-06 CAST(65535 as Int32)
2149-06-06 CAST(65535 as UInt32)
2149-06-06 CAST(65535 as Date)
2149-06-05 65534
2149-06-05 toUInt16(65534)
2149-06-05 toInt32(65534)
2149-06-05 toUInt32(65534)
2149-06-05 toDate(65534)
2149-06-05 CAST(65534 as UInt16)
2149-06-05 CAST(65534 as Int32)
2149-06-05 CAST(65534 as UInt32)
2149-06-05 CAST(65534 as Date)
1970-01-01 0
1970-01-01 toUInt16(0)
1970-01-01 toInt32(0)
1970-01-01 toUInt32(0)
1970-01-01 toDate(0)
1970-01-01 CAST(0 as UInt16)
1970-01-01 CAST(0 as Int32)
1970-01-01 CAST(0 as UInt32)
1970-01-01 CAST(0 as Date)
1 65536
1 toInt32(65536)
1 toUInt32(65536)
1 toDate(65536)
1 CAST(65536 as Int32)
1 CAST(65536 as UInt32)
1 CAST(65536 as Date)
1970-01-01 toUInt16(65536)
1970-01-01 CAST(65536 as UInt16)

View File

@ -0,0 +1,99 @@
DROP TABLE IF EXISTS 02540_date;
CREATE TABLE 02540_date (txt String, x Date) engine=Memory;
-- Date: Supported range of values: [1970-01-01, 2149-06-06].
-- ^----closed interval---^
INSERT INTO 02540_date VALUES('65535', 65535);
INSERT INTO 02540_date VALUES('toUInt16(65535)', toUInt16(65535)); -- #43370 weird one -> used to be 1970-01-01
INSERT INTO 02540_date VALUES('toInt32(65535)', toInt32(65535));
INSERT INTO 02540_date VALUES('toUInt32(65535)', toUInt32(65535));
INSERT INTO 02540_date VALUES('toDate(65535)', toDate(65535));
INSERT INTO 02540_date VALUES('CAST(65535 as UInt16)', CAST(65535 as UInt16));
INSERT INTO 02540_date VALUES('CAST(65535 as Int32)', CAST(65535 as Int32));
INSERT INTO 02540_date VALUES('CAST(65535 as UInt32)', CAST(65535 as UInt32));
INSERT INTO 02540_date VALUES('CAST(65535 as Date)', CAST(65535 as Date));
INSERT INTO 02540_date VALUES('65534', 65534);
INSERT INTO 02540_date VALUES('toUInt16(65534)', toUInt16(65534));
INSERT INTO 02540_date VALUES('toInt32(65534)', toInt32(65534));
INSERT INTO 02540_date VALUES('toUInt32(65534)', toUInt32(65534));
INSERT INTO 02540_date VALUES('toDate(65534)', toDate(65534));
INSERT INTO 02540_date VALUES('CAST(65534 as UInt16)', CAST(65534 as UInt16));
INSERT INTO 02540_date VALUES('CAST(65534 as Int32)', CAST(65534 as Int32));
INSERT INTO 02540_date VALUES('CAST(65534 as UInt32)', CAST(65534 as UInt32));
INSERT INTO 02540_date VALUES('CAST(65534 as Date)', CAST(65534 as Date));
INSERT INTO 02540_date VALUES('0', 0);
INSERT INTO 02540_date VALUES('toUInt16(0)', toUInt16(0));
INSERT INTO 02540_date VALUES('toInt32(0)', toInt32(0));
INSERT INTO 02540_date VALUES('toUInt32(0)', toUInt32(0));
INSERT INTO 02540_date VALUES('toDate(0)', toDate(0));
INSERT INTO 02540_date VALUES('CAST(0 as UInt16)', CAST(0 as UInt16));
INSERT INTO 02540_date VALUES('CAST(0 as Int32)', CAST(0 as Int32));
INSERT INTO 02540_date VALUES('CAST(0 as UInt32)', CAST(0 as UInt32));
INSERT INTO 02540_date VALUES('CAST(0 as Date)', CAST(0 as Date));
-- 65536 will be done using the TZ settings (comments in #45914)
-- We can expect either 1970-01-01 or 1970-01-02
-- time_zone.toDayNum(std::min(time_t(from), time_t(0xFFFFFFFF)))
INSERT INTO 02540_date VALUES('65536', 65536);
INSERT INTO 02540_date VALUES('toUInt16(65536)', toUInt16(65536)); -- Narrowing conversion 65536 ==> 0
INSERT INTO 02540_date VALUES('toInt32(65536)', toInt32(65536));
INSERT INTO 02540_date VALUES('toUInt32(65536)', toUInt32(65536));
INSERT INTO 02540_date VALUES('toDate(65536)', toDate(65536));
INSERT INTO 02540_date VALUES('CAST(65536 as UInt16)', CAST(65536 as UInt16)); -- Narrowing conversion 65536 ==> 0
INSERT INTO 02540_date VALUES('CAST(65536 as Int32)', CAST(65536 as Int32));
INSERT INTO 02540_date VALUES('CAST(65536 as UInt32)', CAST(65536 as UInt32));
INSERT INTO 02540_date VALUES('CAST(65536 as Date)', CAST(65536 as Date));
SELECT x, txt FROM 02540_date WHERE txt == '65535';
SELECT x, txt FROM 02540_date WHERE txt == 'toUInt16(65535)';
SELECT x, txt FROM 02540_date WHERE txt == 'toInt32(65535)';
SELECT x, txt FROM 02540_date WHERE txt == 'toUInt32(65535)';
SELECT x, txt FROM 02540_date WHERE txt == 'toDate(65535)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65535 as UInt16)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65535 as Int32)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65535 as UInt32)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65535 as Date)';
SELECT x, txt FROM 02540_date WHERE txt == '65534';
SELECT x, txt FROM 02540_date WHERE txt == 'toUInt16(65534)';
SELECT x, txt FROM 02540_date WHERE txt == 'toInt32(65534)';
SELECT x, txt FROM 02540_date WHERE txt == 'toUInt32(65534)';
SELECT x, txt FROM 02540_date WHERE txt == 'toDate(65534)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65534 as UInt16)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65534 as Int32)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65534 as UInt32)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65534 as Date)';
SELECT x, txt FROM 02540_date WHERE txt == '0';
SELECT x, txt FROM 02540_date WHERE txt == 'toUInt16(0)';
SELECT x, txt FROM 02540_date WHERE txt == 'toInt32(0)';
SELECT x, txt FROM 02540_date WHERE txt == 'toUInt32(0)';
SELECT x, txt FROM 02540_date WHERE txt == 'toDate(0)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(0 as UInt16)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(0 as Int32)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(0 as UInt32)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(0 as Date)';
SELECT (x == CAST(65536 as Date)), txt FROM 02540_date WHERE txt == '65536';
SELECT (x == CAST(65536 as Date)), txt FROM 02540_date WHERE txt == 'toInt32(65536)';
SELECT (x == CAST(65536 as Date)), txt FROM 02540_date WHERE txt == 'toUInt32(65536)';
SELECT (x == CAST(65536 as Date)), txt FROM 02540_date WHERE txt == 'toDate(65536)';
SELECT (x == CAST(65536 as Date)), txt FROM 02540_date WHERE txt == 'CAST(65536 as Int32)';
SELECT (x == CAST(65536 as Date)), txt FROM 02540_date WHERE txt == 'CAST(65536 as UInt32)';
SELECT (x == CAST(65536 as Date)), txt FROM 02540_date WHERE txt == 'CAST(65536 as Date)';
SELECT x, txt FROM 02540_date WHERE txt == 'toUInt16(65536)';
SELECT x, txt FROM 02540_date WHERE txt == 'CAST(65536 as UInt16)';

View File

@ -0,0 +1,19 @@
1
1
1
5
5
20
19
39
78
39
77
8
9
10
11
8
9
10
11

View File

@ -0,0 +1,22 @@
SELECT countDigits(0);
SELECT countDigits(1);
SELECT countDigits(-1);
SELECT countDigits(12345);
SELECT countDigits(-12345);
SELECT countDigits(0xFFFFFFFFFFFFFFFF);
SELECT countDigits(CAST(0x8000000000000000 AS Int64));
SELECT countDigits(CAST(-1 AS UInt128));
SELECT countDigits(CAST(-1 AS UInt256));
SELECT countDigits(CAST(CAST(-1 AS UInt128) DIV 2 + 1 AS Int128));
SELECT countDigits(CAST(CAST(-1 AS UInt256) DIV 2 + 1 AS Int256));
SELECT countDigits(-123.45678::Decimal32(5));
SELECT countDigits(-123.456789::Decimal64(6));
SELECT countDigits(-123.4567890::Decimal128(7));
SELECT countDigits(-123.45678901::Decimal256(8));
-- This behavior can be surprising but is actually reasonable: countDigits counts the digits of the
-- underlying scaled integer, so -123.456::Decimal32(5) is stored as -12345600 and gives 8:
SELECT countDigits(-123.456::Decimal32(5));
SELECT countDigits(-123.4567::Decimal64(6));
SELECT countDigits(-123.45678::Decimal128(7));
SELECT countDigits(-123.456789::Decimal256(8));