Merge branch 'master' into tavplubix-patch-2

Alexander Tokmakov 2023-02-09 22:42:23 +03:00 committed by GitHub
commit 9e5e3e7ca9
123 changed files with 1777 additions and 811 deletions

View File

@ -433,6 +433,11 @@ else()
link_libraries(global-group)
endif ()
option (ENABLE_GWP_ASAN "Enable Gwp-Asan" ON)
if (NOT OS_LINUX AND NOT OS_ANDROID)
set(ENABLE_GWP_ASAN OFF)
endif ()
option(WERROR "Enable -Werror compiler option" ON)
if (WERROR)

View File

@ -127,10 +127,14 @@
/// because SIGABRT is easier to debug than SIGTRAP (the second one makes gdb crazy)
#if !defined(chassert)
#if defined(ABORT_ON_LOGICAL_ERROR)
#define chassert(x) static_cast<bool>(x) ? void(0) : abortOnFailedAssertion(#x)
#define chassert(x) static_cast<bool>(x) ? void(0) : ::DB::abortOnFailedAssertion(#x)
#define UNREACHABLE() abort()
#else
#define chassert(x) ((void)0)
/// Here sizeof() trick is used to suppress unused warning for result,
/// since simple "(void)x" will evaluate the expression, while
/// "sizeof(!(x))" will not.
#define NIL_EXPRESSION(x) (void)sizeof(!(x))
#define chassert(x) NIL_EXPRESSION(x)
#define UNREACHABLE() __builtin_unreachable()
#endif
#endif
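
A minimal standalone sketch (not part of the commit) of why the release-mode expansion above moves from a plain no-op to `(void)sizeof(!(x))`: the operand of `sizeof` is unevaluated, so a variable referenced only inside `chassert()` no longer triggers an unused-variable warning, while any side effects in the asserted expression are still skipped. Macro and function names below are illustrative only.

``` cpp
#include <cstdio>

// Illustrative stand-ins for the two behaviours discussed in the comment above
// (these are not the ClickHouse macros).
#define CHECK_EVALUATING(x)  (void)(x)           // evaluates the expression
#define CHECK_UNEVALUATED(x) (void)sizeof(!(x))  // "uses" the expression without evaluating it

static int calls = 0;
static bool bump() { return ++calls > 0; }       // has an observable side effect

int main()
{
    bool ok = true;              // only referenced inside the no-op check below
    CHECK_UNEVALUATED(ok);       // suppresses -Wunused-variable, evaluates nothing
    CHECK_UNEVALUATED(bump());   // calls == 0: bump() is never invoked
    CHECK_EVALUATING(bump());    // calls == 1: the call happens
    std::printf("calls = %d\n", calls); // prints "calls = 1"
}
```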

View File

@ -114,6 +114,7 @@ endif()
add_contrib (llvm-project-cmake llvm-project)
add_contrib (libfuzzer-cmake llvm-project)
add_contrib (gwpasan-cmake llvm-project)
add_contrib (libxml2-cmake libxml2)
add_contrib (aws-cmake

contrib/capnproto vendored

@ -1 +1 @@
Subproject commit 2e88221d3dde22266bfccf40eaee6ff9b40d113d
Subproject commit e19cd661e49dd9022d3f920b69d843333b896451

View File

@ -0,0 +1,24 @@
if (NOT ENABLE_GWP_ASAN)
message (STATUS "Not using gwp-asan")
return ()
endif ()
set(COMPILER_RT_GWP_ASAN_SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/llvm-project/compiler-rt/lib/gwp_asan")
set(GWP_ASAN_SOURCES
${COMPILER_RT_GWP_ASAN_SRC_DIR}/common.cpp
${COMPILER_RT_GWP_ASAN_SRC_DIR}/crash_handler.cpp
${COMPILER_RT_GWP_ASAN_SRC_DIR}/platform_specific/common_posix.cpp
${COMPILER_RT_GWP_ASAN_SRC_DIR}/platform_specific/guarded_pool_allocator_posix.cpp
${COMPILER_RT_GWP_ASAN_SRC_DIR}/platform_specific/mutex_posix.cpp
${COMPILER_RT_GWP_ASAN_SRC_DIR}/platform_specific/utilities_posix.cpp
${COMPILER_RT_GWP_ASAN_SRC_DIR}/guarded_pool_allocator.cpp
${COMPILER_RT_GWP_ASAN_SRC_DIR}/stack_trace_compressor.cpp
${COMPILER_RT_GWP_ASAN_SRC_DIR}/optional/options_parser.cpp
)
set(GWP_ASAN_HEADERS "${ClickHouse_SOURCE_DIR}/contrib/llvm-project/compiler-rt/lib")
add_library(_gwp_asan ${GWP_ASAN_SOURCES})
target_include_directories (_gwp_asan SYSTEM PUBLIC ${GWP_ASAN_HEADERS})
add_library(ch_contrib::gwp_asan ALIAS _gwp_asan)

View File

@ -105,6 +105,9 @@ set(SRCS
if (ARCH_AMD64)
find_program(YASM_PATH NAMES yasm)
if (NOT YASM_PATH)
message(FATAL_ERROR "Please install the Yasm assembler")
endif ()
add_custom_command(
OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/crc_iscsi_v_pcl.o
COMMAND ${YASM_PATH} -f x64 -f elf64 -X gnu -g dwarf2 -D LINUX -o ${CMAKE_CURRENT_BINARY_DIR}/crc_iscsi_v_pcl.o ${HDFS3_SOURCE_DIR}/common/crc_iscsi_v_pcl.asm

View File

@ -402,7 +402,7 @@ start
# NOTE Hung check is implemented in docker/tests/stress/stress
rg -Fa "No queries hung" /test_output/test_results.tsv | grep -Fa "OK" \
|| echo -e "Hung check failed, possible deadlock found (see hung_check.log)$FAIL$(head_escaped /test_output/hung_check.log | unts)"
|| echo -e "Hung check failed, possible deadlock found (see hung_check.log)$FAIL$(head_escaped /test_output/hung_check.log | unts)" >> /test_output/test_results.tsv
stop
mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.stress.log

View File

@ -17,10 +17,10 @@ Supported platforms:
The following tutorial is based on the Ubuntu Linux system. With appropriate changes, it should also work on any other Linux distribution.
### Install Git, CMake, Python and Ninja {#install-git-cmake-python-and-ninja}
### Install Prerequisites {#install-prerequisites}
``` bash
sudo apt-get install git cmake ccache python3 ninja-build
sudo apt-get install git cmake ccache python3 ninja-build yasm gawk
```
Or cmake3 instead of cmake on older systems.
@ -87,13 +87,15 @@ The build requires the following components:
- Ninja
- C++ compiler: clang-14 or newer
- Linker: lld
- Yasm
- Gawk
If all the components are installed, you may build in the same way as the steps above.
Example for Ubuntu Eoan:
``` bash
sudo apt update
sudo apt install git cmake ninja-build clang++ python
sudo apt install git cmake ninja-build clang++ python yasm gawk
git clone --recursive https://github.com/ClickHouse/ClickHouse.git
mkdir build && cd build
cmake ../ClickHouse
@ -102,7 +104,7 @@ ninja
Example for OpenSUSE Tumbleweed:
``` bash
sudo zypper install git cmake ninja clang-c++ python lld
sudo zypper install git cmake ninja clang-c++ python lld yasm gawk
git clone --recursive https://github.com/ClickHouse/ClickHouse.git
mkdir build && cd build
cmake ../ClickHouse
@ -112,7 +114,7 @@ ninja
Example for Fedora Rawhide:
``` bash
sudo yum update
sudo yum --nogpg install git cmake make clang python3 ccache
sudo yum --nogpg install git cmake make clang python3 ccache yasm gawk
git clone --recursive https://github.com/ClickHouse/ClickHouse.git
mkdir build && cd build
cmake ../ClickHouse

View File

@ -22,3 +22,8 @@ List of supported integrations:
- [PostgreSQL](../../../engines/table-engines/integrations/postgresql.md)
- [SQLite](../../../engines/table-engines/integrations/sqlite.md)
- [Hive](../../../engines/table-engines/integrations/hive.md)
- [ExternalDistributed](../../../engines/table-engines/integrations/ExternalDistributed.md)
- [MaterializedPostgreSQL](../../../engines/table-engines/integrations/materialized-postgresql.md)
- [NATS](../../../engines/table-engines/integrations/nats.md)
- [DeltaLake](../../../engines/table-engines/integrations/deltalake.md)
- [Hudi](../../../engines/table-engines/integrations/hudi.md)

View File

@ -683,6 +683,11 @@ Example:
## JSONColumns {#jsoncolumns}
:::tip
The output of the JSONColumns* formats provides the ClickHouse field name and then the content of each row of the table for that field;
visually, the data is rotated 90 degrees to the left.
:::
In this format, all data is represented as a single JSON Object.
Note that JSONColumns output format buffers all data in memory to output it as a single block and it can lead to high memory consumption.

View File

@ -3310,6 +3310,15 @@ SELECT
FROM fuse_tbl
```
## optimize_rewrite_aggregate_function_with_if
Rewrite aggregate functions with if expression as argument when logically equivalent.
For example, `avg(if(cond, col, null))` can be rewritten to `avgOrNullIf(cond, col)`. It may improve performance.
:::note
Supported only with experimental analyzer (`allow_experimental_analyzer = 1`).
:::
## allow_experimental_database_replicated {#allow_experimental_database_replicated}
Enables to create databases with [Replicated](../../engines/database-engines/replicated.md) engine.

View File

@ -493,3 +493,41 @@ Result:
│ 0 │
└────────────────────────────────────────────────────────────────────┘
```
## reverseDNSQuery
Performs a reverse DNS query to get the PTR records associated with the IP address.
**Syntax**
``` sql
reverseDNSQuery(address)
```
This function performs reverse DNS resolutions on both IPv4 and IPv6.
**Arguments**
- `address` — An IPv4 or IPv6 address. [String](../../sql-reference/data-types/string.md).
**Returned value**
- Associated domains (PTR records).
Type: [Array(String)](../../sql-reference/data-types/array.md).
**Example**
Query:
``` sql
SELECT reverseDNSQuery('192.168.0.2');
```
Result:
``` text
┌─reverseDNSQuery('192.168.0.2')────────────┐
│ ['test2.example.com','test3.example.com'] │
└───────────────────────────────────────────┘
```

View File

@ -26,7 +26,6 @@
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/UseSSL.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Interpreters/Context.h>

View File

@ -1,7 +1,7 @@
#include "ClusterCopierApp.h"
#include <Common/StatusFile.h>
#include <Common/TerminalSize.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Formats/registerFormats.h>
#include <Common/scope_guard_safe.h>
#include <unistd.h>

View File

@ -433,7 +433,6 @@ extern "C"
}
#endif
/// This allows to implement assert to forbid initialization of a class in static constructors.
/// Usage:
///

View File

@ -1,6 +1,6 @@
#include "LibraryBridgeHelper.h"
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
{
@ -12,7 +12,7 @@ LibraryBridgeHelper::LibraryBridgeHelper(ContextPtr context_)
, http_timeout(context_->getGlobalContext()->getSettingsRef().http_receive_timeout.value)
, bridge_host(config.getString("library_bridge.host", DEFAULT_HOST))
, bridge_port(config.getUInt("library_bridge.port", DEFAULT_PORT))
, http_timeouts(ConnectionTimeouts::getHTTPTimeouts(context_))
, http_timeouts(ConnectionTimeouts::getHTTPTimeouts(context_->getSettingsRef(), {context_->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}))
{
}
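
A hedged aside on the brace-initializer used here and at the other updated call sites: `{<keep_alive_seconds>, 0}` builds a `Poco::Timespan` from a (seconds, microseconds) pair, assuming Poco's two-argument `Timespan` constructor. A tiny self-contained check, with illustrative names:

``` cpp
#include <Poco/Timespan.h>
#include <cassert>

int main()
{
    // Value that would normally come from config.getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT).
    unsigned keep_alive_seconds = 10;

    // Same shape as the "{..., 0}" argument above: seconds plus microseconds.
    Poco::Timespan http_keep_alive{static_cast<long>(keep_alive_seconds), 0};

    assert(http_keep_alive.totalSeconds() == 10);
    return 0;
}
```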

View File

@ -12,7 +12,7 @@
#include <Common/BridgeProtocolVersion.h>
#include <Common/ShellCommand.h>
#include <Common/logger_useful.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <base/range.h>
#include <BridgeHelper/IBridgeHelper.h>
@ -98,7 +98,7 @@ protected:
{
try
{
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, getHTTPTimeouts(), credentials);
return checkString(PING_OK_ANSWER, buf);
}
catch (...)
@ -161,6 +161,10 @@ private:
Poco::Net::HTTPBasicCredentials credentials{};
ConnectionTimeouts getHTTPTimeouts()
{
return ConnectionTimeouts::getHTTPTimeouts(getContext()->getSettingsRef(), {getContext()->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0});
}
protected:
using URLParams = std::vector<std::pair<std::string, std::string>>;
@ -195,7 +199,7 @@ protected:
uri.addQueryParameter("connection_string", getConnectionString());
uri.addQueryParameter("use_connection_pooling", toString(use_connection_pooling));
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, getHTTPTimeouts(), credentials);
bool res;
readBoolText(res, buf);
@ -217,7 +221,7 @@ protected:
uri.addQueryParameter("connection_string", getConnectionString());
uri.addQueryParameter("use_connection_pooling", toString(use_connection_pooling));
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, getHTTPTimeouts(), credentials);
std::string character;
readStringBinary(character, buf);

View File

@ -304,6 +304,11 @@ if (TARGET ch_contrib::llvm)
dbms_target_link_libraries (PUBLIC ch_contrib::llvm)
endif ()
if (TARGET ch_contrib::gwp_asan)
target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::gwp_asan)
target_link_libraries (clickhouse_new_delete PRIVATE ch_contrib::gwp_asan)
endif()
# Otherwise it will slow down stack traces printing too much.
set_source_files_properties(
Common/Elf.cpp
@ -337,14 +342,13 @@ set_source_files_properties(
PROPERTIES COMPILE_FLAGS "-mwaitpkg")
endif ()
target_link_libraries(common PUBLIC ch_contrib::re2_st)
target_link_libraries(common PUBLIC ch_contrib::re2)
target_link_libraries(clickhouse_common_io
PUBLIC
boost::program_options
boost::system
ch_contrib::cityhash
ch_contrib::re2
ch_contrib::re2_st
ch_contrib::zlib
pcg_random
Poco::Foundation

View File

@ -1616,14 +1616,28 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
};
const auto * insert = parsed_query->as<ASTInsertQuery>();
if (insert && insert->settings_ast)
if (const auto * select = parsed_query->as<ASTSelectQuery>(); select && select->settings())
apply_query_settings(*select->settings());
else if (const auto * select_with_union = parsed_query->as<ASTSelectWithUnionQuery>())
{
const ASTs & children = select_with_union->list_of_selects->children;
if (!children.empty())
{
// On the client it is enough to apply settings only for the
// last SELECT, since the only thing that is important to apply
// on the client is format settings.
const auto * last_select = children.back()->as<ASTSelectQuery>();
if (last_select && last_select->settings())
{
apply_query_settings(*last_select->settings());
}
}
}
else if (const auto * query_with_output = parsed_query->as<ASTQueryWithOutput>(); query_with_output && query_with_output->settings_ast)
apply_query_settings(*query_with_output->settings_ast);
else if (insert && insert->settings_ast)
apply_query_settings(*insert->settings_ast);
/// FIXME: try to prettify this cast using `as<>()`
const auto * with_output = dynamic_cast<const ASTQueryWithOutput *>(parsed_query.get());
if (with_output && with_output->settings_ast)
apply_query_settings(*with_output->settings_ast);
if (!connection->checkConnected(connection_parameters.timeouts))
connect();

View File

@ -16,9 +16,9 @@ namespace DB
static void callback(void * arg, int status, int, struct hostent * host)
{
auto * ptr_records = static_cast<std::unordered_set<std::string>*>(arg);
if (ptr_records && status == ARES_SUCCESS)
if (status == ARES_SUCCESS)
{
auto * ptr_records = static_cast<std::unordered_set<std::string>*>(arg);
/*
* In some cases (e.g /etc/hosts), hostent::h_name is filled and hostent::h_aliases is empty.
* Thus, we can't rely solely on hostent::h_aliases. More info on:
@ -81,7 +81,12 @@ namespace DB
std::unordered_set<std::string> ptr_records;
resolve(ip, ptr_records);
wait();
if (!wait_and_process())
{
cancel_requests();
throw DB::Exception(DB::ErrorCodes::DNS_ERROR, "Failed to complete reverse DNS query for IP {}", ip);
}
return ptr_records;
}
@ -93,7 +98,12 @@ namespace DB
std::unordered_set<std::string> ptr_records;
resolve_v6(ip, ptr_records);
wait();
if (!wait_and_process())
{
cancel_requests();
throw DB::Exception(DB::ErrorCodes::DNS_ERROR, "Failed to complete reverse DNS query for IP {}", ip);
}
return ptr_records;
}
@ -115,7 +125,7 @@ namespace DB
ares_gethostbyaddr(channel, reinterpret_cast<const void*>(&addr), sizeof(addr), AF_INET6, callback, &response);
}
void CaresPTRResolver::wait()
bool CaresPTRResolver::wait_and_process()
{
int sockets[ARES_GETSOCK_MAXNUM];
pollfd pollfd[ARES_GETSOCK_MAXNUM];
@ -129,6 +139,21 @@ namespace DB
if (!readable_sockets.empty())
{
number_of_fds_ready = poll(readable_sockets.data(), static_cast<nfds_t>(readable_sockets.size()), static_cast<int>(timeout));
bool poll_error = number_of_fds_ready < 0;
bool is_poll_error_an_interrupt = poll_error && errno == EINTR;
/*
* Retry in case of interrupts and return false in case of actual errors.
* */
if (is_poll_error_an_interrupt)
{
continue;
}
else if (poll_error)
{
return false;
}
}
if (number_of_fds_ready > 0)
@ -141,6 +166,13 @@ namespace DB
break;
}
}
return true;
}
void CaresPTRResolver::cancel_requests()
{
ares_cancel(channel);
}
std::span<pollfd> CaresPTRResolver::get_readable_sockets(int * sockets, pollfd * pollfd)
@ -149,7 +181,7 @@ namespace DB
int number_of_sockets_to_poll = 0;
for (int i = 0; i < ARES_GETSOCK_MAXNUM; i++, number_of_sockets_to_poll++)
for (int i = 0; i < ARES_GETSOCK_MAXNUM; i++)
{
pollfd[i].events = 0;
pollfd[i].revents = 0;
@ -157,7 +189,12 @@ namespace DB
if (ARES_GETSOCK_READABLE(sockets_bitmask, i))
{
pollfd[i].fd = sockets[i];
pollfd[i].events = POLLIN;
pollfd[i].events = C_ARES_POLL_EVENTS;
}
if (pollfd[i].events)
{
number_of_sockets_to_poll++;
}
else
{
@ -192,7 +229,7 @@ namespace DB
{
for (auto readable_socket : readable_sockets)
{
auto fd = readable_socket.revents & POLLIN ? readable_socket.fd : ARES_SOCKET_BAD;
auto fd = readable_socket.revents & C_ARES_POLL_EVENTS ? readable_socket.fd : ARES_SOCKET_BAD;
ares_process_fd(channel, fd, ARES_SOCKET_BAD);
}
}
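
The new `wait_and_process()` above retries `poll()` when it is interrupted by a signal and treats any other failure as fatal, after which the caller cancels the outstanding c-ares requests and throws. A simplified sketch of that retry pattern, ignoring the timeout bookkeeping of the real code; the function name is illustrative:

``` cpp
#include <poll.h>
#include <cerrno>
#include <vector>

// Returns false only on a genuine poll() error; EINTR is retried, and a
// timeout (0 ready fds) is reported as success so the caller can decide
// whether to keep waiting.
bool pollOnceRetryingInterrupts(std::vector<pollfd> & fds, int timeout_ms)
{
    while (true)
    {
        int ready = ::poll(fds.data(), static_cast<nfds_t>(fds.size()), timeout_ms);
        if (ready >= 0)
            return true;       // some fds are ready, or the timeout expired
        if (errno == EINTR)
            continue;          // interrupted by a signal: retry the call
        return false;          // real error: caller cancels requests and throws
    }
}
```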

View File

@ -23,6 +23,9 @@ namespace DB
* Allow only DNSPTRProvider to instantiate this class
* */
struct provider_token {};
static constexpr auto C_ARES_POLL_EVENTS = POLLRDNORM | POLLIN;
public:
explicit CaresPTRResolver(provider_token);
~CaresPTRResolver() override;
@ -32,7 +35,9 @@ namespace DB
std::unordered_set<std::string> resolve_v6(const std::string & ip) override;
private:
void wait();
bool wait_and_process();
void cancel_requests();
void resolve(const std::string & ip, std::unordered_set<std::string> & response);

View File

@ -157,6 +157,19 @@ static void deleteAttributesRecursive(Node * root)
}
}
static void mergeAttributes(Element & config_element, Element & with_element)
{
auto * with_element_attributes = with_element.attributes();
for (size_t i = 0; i < with_element_attributes->length(); ++i)
{
auto * attr = with_element_attributes->item(i);
config_element.setAttribute(attr->nodeName(), attr->getNodeValue());
}
with_element_attributes->release();
}
void ConfigProcessor::mergeRecursive(XMLDocumentPtr config, Node * config_root, const Node * with_root)
{
const NodeListPtr with_nodes = with_root->childNodes();
@ -211,6 +224,9 @@ void ConfigProcessor::mergeRecursive(XMLDocumentPtr config, Node * config_root,
}
else
{
Element & config_element = dynamic_cast<Element &>(*config_node);
mergeAttributes(config_element, with_element);
mergeRecursive(config, config_node, with_node);
}
merged = true;

View File

@ -16,6 +16,7 @@
#include <IO/WriteHelpers.h>
#include <Common/Exception.h>
#include <base/defines.h>
#include <base/types.h>
@ -112,11 +113,13 @@ public:
}
catch (...)
{
close(fd);
int err = close(fd);
chassert(!err || errno == EINTR);
throw;
}
close(fd);
int err = close(fd);
chassert(!err || errno == EINTR);
return res;
}
@ -180,11 +183,13 @@ public:
}
catch (...)
{
close(fd);
int err = close(fd);
chassert(!err || errno == EINTR);
throw;
}
close(fd);
int err = close(fd);
chassert(!err || errno == EINTR);
}
private:
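
The `int err = close(fd); chassert(!err || errno == EINTR);` pattern introduced here recurs throughout this commit. A small standalone sketch of the same idea using plain `assert()` so it compiles outside ClickHouse; the helper name is made up for illustration:

``` cpp
#include <unistd.h>
#include <cerrno>
#include <cassert>

// Close a descriptor and, in debug builds, insist that the only tolerated
// failure is EINTR: on Linux the fd is already released in that case and the
// call must not be retried. Anything else (e.g. EBADF) points at a real bug.
inline void closeChecked(int fd)
{
    int err = ::close(fd);
    assert(!err || errno == EINTR);
    (void)err; // keep release (NDEBUG) builds free of unused-variable warnings
}
```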

View File

@ -2,6 +2,7 @@
#include "Epoll.h"
#include <Common/Exception.h>
#include <base/defines.h>
#include <unistd.h>
namespace DB
@ -78,7 +79,10 @@ size_t Epoll::getManyReady(int max_events, epoll_event * events_out, bool blocki
Epoll::~Epoll()
{
if (epoll_fd != -1)
close(epoll_fd);
{
int err = close(epoll_fd);
chassert(!err || errno == EINTR);
}
}
}

View File

@ -3,6 +3,7 @@
#include <Common/EventFD.h>
#include <Common/Exception.h>
#include <base/defines.h>
#include <sys/eventfd.h>
#include <unistd.h>
@ -55,7 +56,10 @@ bool EventFD::write(uint64_t increase) const
EventFD::~EventFD()
{
if (fd != -1)
close(fd);
{
int err = close(fd);
chassert(!err || errno == EINTR);
}
}
}

View File

@ -7,6 +7,7 @@
#include <IO/ReadHelpers.h>
#include <base/find_symbols.h>
#include <base/defines.h>
#include <Common/logger_useful.h>
#include <cassert>
@ -102,7 +103,8 @@ ProcfsMetricsProvider::ProcfsMetricsProvider(pid_t /*tid*/)
thread_stat_fd = ::open(thread_stat, O_RDONLY | O_CLOEXEC);
if (-1 == thread_stat_fd)
{
::close(thread_schedstat_fd);
int err = ::close(thread_schedstat_fd);
chassert(!err || errno == EINTR);
throwWithFailedToOpenFile(thread_stat);
}
thread_io_fd = ::open(thread_io, O_RDONLY | O_CLOEXEC);

View File

@ -6,6 +6,7 @@
#include <Common/StackTrace.h>
#include <Common/thread_local_rng.h>
#include <Common/logger_useful.h>
#include <base/defines.h>
#include <base/phdr_cache.h>
#include <base/errnoToString.h>
@ -186,8 +187,10 @@ void QueryProfilerBase<ProfilerImpl>::tryCleanup()
#if USE_UNWIND
if (timer_id.has_value())
{
if (timer_delete(*timer_id))
int err = timer_delete(*timer_id);
if (err)
LOG_ERROR(log, "Failed to delete query profiler timer {}", errnoToString());
chassert(!err && "Failed to delete query profiler timer");
timer_id.reset();
}

View File

@ -5,9 +5,10 @@
#include <cerrno>
#include <Common/logger_useful.h>
#include <base/errnoToString.h>
#include <Common/ClickHouseRevision.h>
#include <Common/LocalDateTime.h>
#include <base/errnoToString.h>
#include <base/defines.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/LimitReadBuffer.h>
@ -88,7 +89,8 @@ StatusFile::StatusFile(std::string path_, FillFunction fill_)
}
catch (...)
{
close(fd);
int err = close(fd);
chassert(!err || errno == EINTR);
throw;
}
}

View File

@ -1,5 +1,6 @@
#include "TaskStatsInfoGetter.h"
#include <Common/Exception.h>
#include <base/defines.h>
#include <base/types.h>
#include <unistd.h>
@ -280,7 +281,10 @@ TaskStatsInfoGetter::TaskStatsInfoGetter()
catch (...)
{
if (netlink_socket_fd >= 0)
close(netlink_socket_fd);
{
int err = close(netlink_socket_fd);
chassert(!err || errno == EINTR);
}
throw;
}
}
@ -314,7 +318,10 @@ void TaskStatsInfoGetter::getStat(::taskstats & out_stats, pid_t tid) const
TaskStatsInfoGetter::~TaskStatsInfoGetter()
{
if (netlink_socket_fd >= 0)
close(netlink_socket_fd);
{
int err = close(netlink_socket_fd);
chassert(!err || errno == EINTR);
}
}
}

View File

@ -1,6 +1,7 @@
#if defined(OS_LINUX)
#include <Common/TimerDescriptor.h>
#include <Common/Exception.h>
#include <base/defines.h>
#include <sys/timerfd.h>
#include <fcntl.h>
@ -36,7 +37,10 @@ TimerDescriptor::~TimerDescriptor()
{
/// Do not check for result cause cannot throw exception.
if (timer_fd != -1)
close(timer_fd);
{
int err = close(timer_fd);
chassert(!err || errno == EINTR);
}
}
void TimerDescriptor::reset() const

View File

@ -24,6 +24,7 @@
#cmakedefine01 USE_ODBC
#cmakedefine01 USE_REPLXX
#cmakedefine01 USE_JEMALLOC
#cmakedefine01 USE_GWP_ASAN
#cmakedefine01 USE_H3
#cmakedefine01 USE_S2_GEOMETRY
#cmakedefine01 USE_FASTOPS

View File

@ -17,6 +17,12 @@
# include <cstdlib>
#endif
#if USE_GWP_ASAN
# include <gwp_asan/guarded_pool_allocator.h>
static gwp_asan::GuardedPoolAllocator GuardedAlloc;
#endif
namespace Memory
{
@ -29,6 +35,23 @@ template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE void * newImpl(std::size_t size, TAlign... align)
{
#if USE_GWP_ASAN
if (unlikely(GuardedAlloc.shouldSample()))
{
if constexpr (sizeof...(TAlign) == 1)
{
if (void * ptr = GuardedAlloc.allocate(size, alignToSizeT(align...)))
return ptr;
}
else
{
if (void * ptr = GuardedAlloc.allocate(size))
return ptr;
}
}
#endif
void * ptr = nullptr;
if constexpr (sizeof...(TAlign) == 1)
ptr = aligned_alloc(alignToSizeT(align...), size);
@ -44,16 +67,37 @@ inline ALWAYS_INLINE void * newImpl(std::size_t size, TAlign... align)
inline ALWAYS_INLINE void * newNoExept(std::size_t size) noexcept
{
#if USE_GWP_ASAN
if (unlikely(GuardedAlloc.shouldSample()))
{
if (void * ptr = GuardedAlloc.allocate(size))
return ptr;
}
#endif
return malloc(size);
}
inline ALWAYS_INLINE void * newNoExept(std::size_t size, std::align_val_t align) noexcept
{
#if USE_GWP_ASAN
if (unlikely(GuardedAlloc.shouldSample()))
{
if (void * ptr = GuardedAlloc.allocate(size, alignToSizeT(align)))
return ptr;
}
#endif
return aligned_alloc(static_cast<size_t>(align), size);
}
inline ALWAYS_INLINE void deleteImpl(void * ptr) noexcept
{
#if USE_GWP_ASAN
if (unlikely(GuardedAlloc.pointerIsMine(ptr)))
{
GuardedAlloc.deallocate(ptr);
return;
}
#endif
free(ptr);
}
@ -66,6 +110,14 @@ inline ALWAYS_INLINE void deleteSized(void * ptr, std::size_t size, TAlign... al
if (unlikely(ptr == nullptr))
return;
#if USE_GWP_ASAN
if (unlikely(GuardedAlloc.pointerIsMine(ptr)))
{
GuardedAlloc.deallocate(ptr);
return;
}
#endif
if constexpr (sizeof...(TAlign) == 1)
sdallocx(ptr, size, MALLOCX_ALIGN(alignToSizeT(align...)));
else
@ -78,6 +130,13 @@ template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE void deleteSized(void * ptr, std::size_t size [[maybe_unused]], TAlign... /* align */) noexcept
{
#if USE_GWP_ASAN
if (unlikely(GuardedAlloc.pointerIsMine(ptr)))
{
GuardedAlloc.deallocate(ptr);
return;
}
#endif
free(ptr);
}
@ -122,6 +181,16 @@ template <std::same_as<std::align_val_t>... TAlign>
requires DB::OptionalArgument<TAlign...>
inline ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [[maybe_unused]] = 0, TAlign... align [[maybe_unused]]) noexcept
{
#if USE_GWP_ASAN
if (unlikely(GuardedAlloc.pointerIsMine(ptr)))
{
if (!size)
size = GuardedAlloc.getSize(ptr);
CurrentMemoryTracker::free(size);
return;
}
#endif
try
{
#if USE_JEMALLOC

View File

@ -1,4 +1,5 @@
#include <cassert>
#include <iostream>
#include <new>
#include "config.h"
#include <Common/memory.h>
@ -41,6 +42,26 @@ static struct InitializeJemallocZoneAllocatorForOSX
} initializeJemallocZoneAllocatorForOSX;
#endif
#if USE_GWP_ASAN
#include <gwp_asan/optional/options_parser.h>
/// Both clickhouse_new_delete and clickhouse_common_io link gwp_asan, but it should only be initialized once,
/// otherwise it will cause an unexpected deadlock.
static struct InitGwpAsan
{
InitGwpAsan()
{
gwp_asan::options::initOptions();
gwp_asan::options::Options &opts = gwp_asan::options::getOptions();
GuardedAlloc.init(opts);
///std::cerr << "GwpAsan is initialized, the options are { Enabled: " << opts.Enabled
/// << ", MaxSimultaneousAllocations: " << opts.MaxSimultaneousAllocations
/// << ", SampleRate: " << opts.SampleRate << " }\n";
}
} init_gwp_asan;
#endif
/// Replace default new/delete with memory tracking versions.
/// @sa https://en.cppreference.com/w/cpp/memory/new/operator_new

View File

@ -0,0 +1,57 @@
#include <gtest/gtest.h>
#include <thread>
#include <Common/DNSPTRResolverProvider.h>
#include <Common/DNSResolver.h>
#include <Poco/Net/IPAddress.h>
#include <random>
namespace DB
{
TEST(Common, ReverseDNS)
{
auto addresses = std::vector<std::string>({
"8.8.8.8", "2001:4860:4860::8888", // dns.google
"142.250.219.35", // google.com
"157.240.12.35", // facebook
"208.84.244.116", "2600:1419:c400::214:c410", //www.terra.com.br,
"127.0.0.1", "::1"
});
auto func = [&]()
{
// Good random seed, good engine
auto rnd1 = std::mt19937(std::random_device{}());
for (int i = 0; i < 50; ++i)
{
auto & dns_resolver_instance = DNSResolver::instance();
// unfortunately, DNS cache can't be disabled because we might end up causing a DDoS attack
// dns_resolver_instance.setDisableCacheFlag();
auto addr_index = rnd1() % addresses.size();
[[maybe_unused]] auto result = dns_resolver_instance.reverseResolve(Poco::Net::IPAddress{ addresses[addr_index] });
// will not assert either because some of the IP addresses might change in the future and
// this test will become flaky
// ASSERT_TRUE(!result.empty());
}
};
auto number_of_threads = 200u;
std::vector<std::thread> threads;
threads.reserve(number_of_threads);
for (auto i = 0u; i < number_of_threads; i++)
{
threads.emplace_back(func);
}
for (auto & thread : threads)
{
thread.join();
}
}
}

View File

@ -2,6 +2,9 @@
#include <Common/VersionNumber.h>
#include <Poco/Environment.h>
#include <Common/Stopwatch.h>
/// for abortOnFailedAssertion() via chassert() (dependency chain looks odd)
#include <Common/Exception.h>
#include <base/defines.h>
#include <fcntl.h>
#include <sys/wait.h>
@ -105,7 +108,8 @@ static PollPidResult pollPid(pid_t pid, int timeout_in_ms)
if (ready <= 0)
return PollPidResult::FAILED;
close(pid_fd);
int err = close(pid_fd);
chassert(!err || errno == EINTR);
return PollPidResult::RESTART;
}

View File

@ -173,14 +173,12 @@ void registerCodecDeflateQpl(CompressionCodecFactory & factory);
/// Keeper use only general-purpose codecs, so we don't need these special codecs
/// in standalone build
#ifndef KEEPER_STANDALONE_BUILD
void registerCodecDelta(CompressionCodecFactory & factory);
void registerCodecT64(CompressionCodecFactory & factory);
void registerCodecDoubleDelta(CompressionCodecFactory & factory);
void registerCodecGorilla(CompressionCodecFactory & factory);
void registerCodecEncrypted(CompressionCodecFactory & factory);
void registerCodecFPC(CompressionCodecFactory & factory);
#endif
CompressionCodecFactory::CompressionCodecFactory()

View File

@ -1,5 +1,6 @@
#include <cerrno>
#include <base/errnoToString.h>
#include <base/defines.h>
#include <future>
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/KeeperStateMachine.h>
@ -471,13 +472,15 @@ static int bufferFromFile(Poco::Logger * log, const std::string & path, nuraft::
if (chunk == MAP_FAILED)
{
LOG_WARNING(log, "Error mmapping {}, error: {}, errno: {}", path, errnoToString(), errno);
::close(fd);
int err = ::close(fd);
chassert(!err || errno == EINTR);
return errno;
}
data_out = nuraft::buffer::alloc(file_size);
data_out->put_raw(chunk, file_size);
::munmap(chunk, file_size);
::close(fd);
int err = ::close(fd);
chassert(!err || errno == EINTR);
return 0;
}

View File

@ -52,6 +52,8 @@
/// the number is unmotivated
#define DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT 15
#define DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT 10
#define DBMS_DEFAULT_PATH "/var/lib/clickhouse/"
/// Actually, there may be multiple acquisitions of different locks for a given table within one query.

View File

@ -54,7 +54,6 @@ namespace
return applyVisitor(FieldVisitorConvertToNumber<T>(), f);
}
#ifndef KEEPER_STANDALONE_BUILD
Map stringToMap(const String & str)
{
/// Allow empty string as an empty map
@ -71,7 +70,7 @@ namespace
return (*column)[0].safeGet<Map>();
}
Map fieldToMap(const Field & f)
[[maybe_unused]] Map fieldToMap(const Field & f)
{
if (f.getType() == Field::Types::String)
{
@ -82,7 +81,6 @@ namespace
return f.safeGet<const Map &>();
}
#endif
}
@ -327,6 +325,13 @@ void SettingFieldString::readBinary(ReadBuffer & in)
*this = std::move(str);
}
/// Unbeautiful workaround for clickhouse-keeper standalone build ("-DBUILD_STANDALONE_KEEPER=1").
/// In this build, we don't build and link library dbms (to which SettingsField.cpp belongs) but
/// only build SettingsField.cpp. Further dependencies, e.g. DataTypeString and DataTypeMap below,
/// require building of further files for clickhouse-keeper. To keep dependencies slim, we don't do
/// that. The linker does not complain only because clickhouse-keeper does not call any of below
/// functions. A cleaner alternative would be more modular libraries, e.g. one for data types, which
could then be linked by the server and the keeper.
#ifndef KEEPER_STANDALONE_BUILD
SettingFieldMap::SettingFieldMap(const Field & f) : value(fieldToMap(f)) {}

View File

@ -239,8 +239,6 @@ struct SettingFieldString
void readBinary(ReadBuffer & in);
};
#ifndef KEEPER_STANDALONE_BUILD
struct SettingFieldMap
{
public:
@ -264,8 +262,6 @@ public:
void readBinary(ReadBuffer & in);
};
#endif
struct SettingFieldChar
{
public:

View File

@ -6,6 +6,7 @@
#include <Daemon/SentryWriter.h>
#include <Parsers/toOneLineQuery.h>
#include <base/errnoToString.h>
#include <base/defines.h>
#include <sys/stat.h>
#include <sys/types.h>
@ -568,6 +569,7 @@ std::string BaseDaemon::getDefaultConfigFileName() const
void BaseDaemon::closeFDs()
{
/// NOTE: may benefit from close_range() (linux 5.9+)
#if defined(OS_FREEBSD) || defined(OS_DARWIN)
fs::path proc_path{"/dev/fd"};
#else
@ -584,7 +586,13 @@ void BaseDaemon::closeFDs()
for (const auto & fd : fds)
{
if (fd > 2 && fd != signal_pipe.fds_rw[0] && fd != signal_pipe.fds_rw[1])
::close(fd);
{
int err = ::close(fd);
/// NOTE: it is OK to ignore the error here since at least one fd
/// is already closed (the one for proc_path), and there can
/// likely be some tricky cases.
(void)err;
}
}
}
else
@ -597,8 +605,16 @@ void BaseDaemon::closeFDs()
#endif
max_fd = 256; /// bad fallback
for (int fd = 3; fd < max_fd; ++fd)
{
if (fd != signal_pipe.fds_rw[0] && fd != signal_pipe.fds_rw[1])
::close(fd);
{
int err = ::close(fd);
/// NOTE: it is OK to get EBADF here, since we simply iterate
/// over all possible fds without checking whether this process
/// actually has each fd.
(void)err;
}
}
}
}
@ -701,7 +717,10 @@ void BaseDaemon::initialize(Application & self)
if ((fd = creat(stderr_path.c_str(), 0600)) == -1 && errno != EEXIST)
throw Poco::OpenFileException("File " + stderr_path + " (logger.stderr) is not writable");
if (fd != -1)
::close(fd);
{
int err = ::close(fd);
chassert(!err || errno == EINTR);
}
}
if (!freopen(stderr_path.c_str(), "a+", stderr))

View File

@ -183,10 +183,11 @@ namespace
/// joinGet(join_storage_table_name, `value_column`, join_keys)
addQualifiedNameFromArgument(function, 0);
}
else if (function.name == "in" || function.name == "notIn" || function.name == "globalIn" || function.name == "globalNotIn")
else if (functionIsInOrGlobalInOperator(function.name))
{
/// in(x, table_name) - function for evaluating (x IN table_name)
addQualifiedNameFromArgument(function, 1);
/// x IN table_name.
/// We set evaluate=false here because we don't want to evaluate a subquery in "x IN subquery".
addQualifiedNameFromArgument(function, 1, /* evaluate= */ false);
}
else if (function.name == "dictionary")
{
@ -248,7 +249,10 @@ namespace
if (has_local_replicas && !table_function)
{
auto maybe_qualified_name = tryGetQualifiedNameFromArgument(function, 1, /* apply_current_database= */ false);
/// We set `apply_current_database=false` here because if this argument is an identifier without dot,
/// then it's not the name of a table within the current database, it's the name of a database, and
/// the name of a table will be in the following argument.
auto maybe_qualified_name = tryGetQualifiedNameFromArgument(function, 1, /* evaluate= */ true, /* apply_current_database= */ false);
if (!maybe_qualified_name)
return;
auto & qualified_name = *maybe_qualified_name;
@ -271,7 +275,7 @@ namespace
}
/// Gets an argument as a string, evaluates constants if necessary.
std::optional<String> tryGetStringFromArgument(const ASTFunction & function, size_t arg_idx) const
std::optional<String> tryGetStringFromArgument(const ASTFunction & function, size_t arg_idx, bool evaluate = true) const
{
if (!function.arguments)
return {};
@ -281,28 +285,41 @@ namespace
return {};
const auto & arg = args[arg_idx];
ASTPtr evaluated;
try
if (evaluate)
{
evaluated = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
try
{
/// We're just searching for dependencies here, it's not safe to execute subqueries now.
auto evaluated = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
const auto * literal = evaluated->as<ASTLiteral>();
if (!literal || (literal->value.getType() != Field::Types::String))
return {};
return literal->value.safeGet<String>();
}
catch (...)
{
return {};
}
}
catch (...)
else
{
if (const auto * id = arg->as<ASTIdentifier>())
return id->name();
if (const auto * literal = arg->as<ASTLiteral>())
{
if (literal->value.getType() == Field::Types::String)
return literal->value.safeGet<String>();
}
return {};
}
const auto * literal = evaluated->as<ASTLiteral>();
if (!literal || (literal->value.getType() != Field::Types::String))
return {};
return literal->value.safeGet<String>();
}
/// Gets an argument as a qualified table name.
/// Accepts forms db_name.table_name (as an identifier) and 'db_name.table_name' (as a string).
/// The function doesn't replace an empty database name with the current_database (the caller must do that).
std::optional<QualifiedTableName>
tryGetQualifiedNameFromArgument(const ASTFunction & function, size_t arg_idx, bool apply_current_database = true) const
std::optional<QualifiedTableName> tryGetQualifiedNameFromArgument(
const ASTFunction & function, size_t arg_idx, bool evaluate = true, bool apply_current_database = true) const
{
if (!function.arguments)
return {};
@ -326,7 +343,7 @@ namespace
}
else
{
auto qualified_name_as_string = tryGetStringFromArgument(function, arg_idx);
auto qualified_name_as_string = tryGetStringFromArgument(function, arg_idx, evaluate);
if (!qualified_name_as_string)
return {};
@ -345,9 +362,9 @@ namespace
/// Adds a qualified table name from an argument to the collection of dependencies.
/// Accepts forms db_name.table_name (as an identifier) and 'db_name.table_name' (as a string).
void addQualifiedNameFromArgument(const ASTFunction & function, size_t arg_idx)
void addQualifiedNameFromArgument(const ASTFunction & function, size_t arg_idx, bool evaluate = true)
{
if (auto qualified_name = tryGetQualifiedNameFromArgument(function, arg_idx))
if (auto qualified_name = tryGetQualifiedNameFromArgument(function, arg_idx, evaluate))
dependencies.emplace(std::move(qualified_name).value());
}
@ -360,7 +377,7 @@ namespace
return {};
auto table = tryGetStringFromArgument(function, table_arg_idx);
if (!table)
if (!table || table->empty())
return {};
QualifiedTableName qualified_name;

View File

@ -1,7 +1,6 @@
#include "HTTPDictionarySource.h"
#include <Formats/formatBlock.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h>
@ -39,7 +38,7 @@ HTTPDictionarySource::HTTPDictionarySource(
, configuration(configuration_)
, sample_block(sample_block_)
, context(context_)
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context))
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}))
{
credentials.setUsername(credentials_.getUsername());
credentials.setPassword(credentials_.getPassword());
@ -52,7 +51,7 @@ HTTPDictionarySource::HTTPDictionarySource(const HTTPDictionarySource & other)
, configuration(other.configuration)
, sample_block(other.sample_block)
, context(Context::createCopy(other.context))
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context))
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}))
{
credentials.setUsername(other.credentials.getUsername());
credentials.setPassword(other.credentials.getPassword());

View File

@ -12,6 +12,7 @@
#include <absl/container/flat_hash_set.h>
#include <base/unaligned.h>
#include <base/defines.h>
#include <base/sort.h>
#include <Common/randomSeed.h>
#include <Common/Arena.h>
@ -768,7 +769,8 @@ private:
if (this == &rhs)
return *this;
close(fd);
int err = ::close(fd);
chassert(!err || errno == EINTR);
fd = rhs.fd;
rhs.fd = -1;
@ -777,7 +779,10 @@ private:
~FileDescriptor()
{
if (fd != -1)
close(fd);
{
int err = close(fd);
chassert(!err || errno == EINTR);
}
}
int fd = -1;

View File

@ -4,7 +4,7 @@
#include <DataTypes/DataTypeString.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Context.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -76,7 +76,7 @@ XDBCDictionarySource::XDBCDictionarySource(
, load_all_query(query_builder.composeLoadAllQuery())
, bridge_helper(bridge_)
, bridge_url(bridge_helper->getMainURI())
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context_))
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context_->getSettingsRef(), {context_->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}))
{
auto url_params = bridge_helper->getURLParams(max_block_size);
for (const auto & [name, value] : url_params)

View File

@ -149,13 +149,13 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
if (buffer_size % min_alignment)
{
existing_memory = nullptr; /// Cannot reuse existing memory is it has unaligned size.
existing_memory = nullptr; /// Cannot reuse existing memory as it has unaligned size.
buffer_size = align_up(buffer_size);
}
if (reinterpret_cast<uintptr_t>(existing_memory) % min_alignment)
{
existing_memory = nullptr; /// Cannot reuse existing memory is it has unaligned offset.
existing_memory = nullptr; /// Cannot reuse existing memory as it has unaligned offset.
}
/// Attempt to open a file with O_DIRECT

View File

@ -40,11 +40,15 @@ void WebObjectStorage::initialize(const String & uri_path) const
try
{
Poco::Net::HTTPBasicCredentials credentials{};
ReadWriteBufferFromHTTP metadata_buf(
Poco::URI(fs::path(uri_path) / ".index"),
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
ConnectionTimeouts::getHTTPTimeouts(getContext()),
ConnectionTimeouts::getHTTPTimeouts(
getContext()->getSettingsRef(),
{getContext()->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}),
credentials,
/* max_redirects= */ 0,
/* buffer_size_= */ DBMS_DEFAULT_BUFFER_SIZE,

View File

@ -31,43 +31,43 @@ public:
TEST_F(DiskTest, createDirectories)
{
this->disk->createDirectories("test_dir1/");
EXPECT_TRUE(this->disk->isDirectory("test_dir1/"));
disk->createDirectories("test_dir1/");
EXPECT_TRUE(disk->isDirectory("test_dir1/"));
this->disk->createDirectories("test_dir2/nested_dir/");
EXPECT_TRUE(this->disk->isDirectory("test_dir2/nested_dir/"));
disk->createDirectories("test_dir2/nested_dir/");
EXPECT_TRUE(disk->isDirectory("test_dir2/nested_dir/"));
}
TEST_F(DiskTest, writeFile)
{
{
std::unique_ptr<DB::WriteBuffer> out = this->disk->writeFile("test_file");
std::unique_ptr<DB::WriteBuffer> out = disk->writeFile("test_file");
writeString("test data", *out);
}
DB::String data;
{
std::unique_ptr<DB::ReadBuffer> in = this->disk->readFile("test_file");
std::unique_ptr<DB::ReadBuffer> in = disk->readFile("test_file");
readString(data, *in);
}
EXPECT_EQ("test data", data);
EXPECT_EQ(data.size(), this->disk->getFileSize("test_file"));
EXPECT_EQ(data.size(), disk->getFileSize("test_file"));
}
TEST_F(DiskTest, readFile)
{
{
std::unique_ptr<DB::WriteBuffer> out = this->disk->writeFile("test_file");
std::unique_ptr<DB::WriteBuffer> out = disk->writeFile("test_file");
writeString("test data", *out);
}
// Test SEEK_SET
{
String buf(4, '0');
std::unique_ptr<DB::SeekableReadBuffer> in = this->disk->readFile("test_file");
std::unique_ptr<DB::SeekableReadBuffer> in = disk->readFile("test_file");
in->seek(5, SEEK_SET);
@ -77,7 +77,7 @@ TEST_F(DiskTest, readFile)
// Test SEEK_CUR
{
std::unique_ptr<DB::SeekableReadBuffer> in = this->disk->readFile("test_file");
std::unique_ptr<DB::SeekableReadBuffer> in = disk->readFile("test_file");
String buf(4, '0');
in->readStrict(buf.data(), 4);
@ -94,10 +94,10 @@ TEST_F(DiskTest, readFile)
TEST_F(DiskTest, iterateDirectory)
{
this->disk->createDirectories("test_dir/nested_dir/");
disk->createDirectories("test_dir/nested_dir/");
{
auto iter = this->disk->iterateDirectory("");
auto iter = disk->iterateDirectory("");
EXPECT_TRUE(iter->isValid());
EXPECT_EQ("test_dir/", iter->path());
iter->next();
@ -105,7 +105,7 @@ TEST_F(DiskTest, iterateDirectory)
}
{
auto iter = this->disk->iterateDirectory("test_dir/");
auto iter = disk->iterateDirectory("test_dir/");
EXPECT_TRUE(iter->isValid());
EXPECT_EQ("test_dir/nested_dir/", iter->path());
iter->next();

View File

@ -0,0 +1,142 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeArray.h>
#include <Common/DNSResolver.h>
#include <Poco/Net/IPAddress.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
extern const int FUNCTION_NOT_ALLOWED;
}
class ReverseDNSQuery : public IFunction
{
public:
static constexpr auto name = "reverseDNSQuery";
static constexpr auto allow_function_config_name = "allow_reverse_dns_query_function";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<ReverseDNSQuery>();
}
String getName() const override
{
return name;
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & data_type, size_t input_rows_count) const override
{
if (!Context::getGlobalContextInstance()->getConfigRef().getBool(allow_function_config_name, false))
{
throw Exception(ErrorCodes::FUNCTION_NOT_ALLOWED, "Function {} is not allowed because {} is not set", name, allow_function_config_name);
}
if (arguments.empty())
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "Function {} requires at least one argument", name);
}
auto res_type = getReturnTypeImpl({data_type});
if (input_rows_count == 0u)
{
return res_type->createColumnConstWithDefaultValue(input_rows_count);
}
if (!isString(arguments[0].type))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Function {} requires the input column to be of type String", name);
}
auto input_column = arguments[0].column;
auto ip_address = Poco::Net::IPAddress(input_column->getDataAt(0).toString());
auto ptr_records = DNSResolver::instance().reverseResolve(ip_address);
if (ptr_records.empty())
return res_type->createColumnConstWithDefaultValue(input_rows_count);
Array res;
for (const auto & ptr_record : ptr_records)
{
res.push_back(ptr_record);
}
return res_type->createColumnConst(input_rows_count, res);
}
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override
{
return false;
}
size_t getNumberOfArguments() const override
{
return 1u;
}
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>());
}
};
REGISTER_FUNCTION(ReverseDNSQuery)
{
factory.registerFunction<ReverseDNSQuery>(
Documentation(
R"(Performs a reverse DNS query to get the PTR records associated with the IP address.
**Syntax**
``` sql
reverseDNSQuery(address)
```
This function performs reverse DNS resolutions on both IPv4 and IPv6.
**Arguments**
- `address` An IPv4 or IPv6 address. [String](../../sql-reference/data-types/string.md).
**Returned value**
- Associated domains (PTR records).
Type: [Array(String)](../../sql-reference/data-types/array.md).
**Example**
Query:
``` sql
SELECT reverseDNSQuery('192.168.0.2');
```
Result:
``` text
reverseDNSQuery('192.168.0.2')
['test2.example.com','test3.example.com']
```
)")
);
}
}

View File

@ -3,6 +3,7 @@
#include <IO/AsynchronousReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Common/ProfileEvents.h>
#include <base/defines.h>
#include <cerrno>
@ -81,7 +82,8 @@ AsynchronousReadBufferFromFile::~AsynchronousReadBufferFromFile()
if (fd < 0)
return;
::close(fd);
int err = ::close(fd);
chassert(!err || errno == EINTR);
}

View File

@ -0,0 +1,126 @@
#include <IO/ConnectionTimeouts.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
namespace DB
{
ConnectionTimeouts::ConnectionTimeouts(
Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_)
: connection_timeout(connection_timeout_)
, send_timeout(send_timeout_)
, receive_timeout(receive_timeout_)
, tcp_keep_alive_timeout(0)
, http_keep_alive_timeout(0)
, secure_connection_timeout(connection_timeout)
, hedged_connection_timeout(receive_timeout_)
, receive_data_timeout(receive_timeout_)
{
}
ConnectionTimeouts::ConnectionTimeouts(
Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_,
Poco::Timespan tcp_keep_alive_timeout_)
: connection_timeout(connection_timeout_)
, send_timeout(send_timeout_)
, receive_timeout(receive_timeout_)
, tcp_keep_alive_timeout(tcp_keep_alive_timeout_)
, http_keep_alive_timeout(0)
, secure_connection_timeout(connection_timeout)
, hedged_connection_timeout(receive_timeout_)
, receive_data_timeout(receive_timeout_)
{
}
ConnectionTimeouts::ConnectionTimeouts(
Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_,
Poco::Timespan tcp_keep_alive_timeout_,
Poco::Timespan http_keep_alive_timeout_)
: connection_timeout(connection_timeout_)
, send_timeout(send_timeout_)
, receive_timeout(receive_timeout_)
, tcp_keep_alive_timeout(tcp_keep_alive_timeout_)
, http_keep_alive_timeout(http_keep_alive_timeout_)
, secure_connection_timeout(connection_timeout)
, hedged_connection_timeout(receive_timeout_)
, receive_data_timeout(receive_timeout_)
{
}
ConnectionTimeouts::ConnectionTimeouts(
Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_,
Poco::Timespan tcp_keep_alive_timeout_,
Poco::Timespan http_keep_alive_timeout_,
Poco::Timespan secure_connection_timeout_,
Poco::Timespan receive_hello_timeout_,
Poco::Timespan receive_data_timeout_)
: connection_timeout(connection_timeout_)
, send_timeout(send_timeout_)
, receive_timeout(receive_timeout_)
, tcp_keep_alive_timeout(tcp_keep_alive_timeout_)
, http_keep_alive_timeout(http_keep_alive_timeout_)
, secure_connection_timeout(secure_connection_timeout_)
, hedged_connection_timeout(receive_hello_timeout_)
, receive_data_timeout(receive_data_timeout_)
{
}
Poco::Timespan ConnectionTimeouts::saturate(Poco::Timespan timespan, Poco::Timespan limit)
{
if (limit.totalMicroseconds() == 0)
return timespan;
else
return (timespan > limit) ? limit : timespan;
}
ConnectionTimeouts ConnectionTimeouts::getSaturated(Poco::Timespan limit) const
{
return ConnectionTimeouts(saturate(connection_timeout, limit),
saturate(send_timeout, limit),
saturate(receive_timeout, limit),
saturate(tcp_keep_alive_timeout, limit),
saturate(http_keep_alive_timeout, limit),
saturate(secure_connection_timeout, limit),
saturate(hedged_connection_timeout, limit),
saturate(receive_data_timeout, limit));
}
/// Timeouts for the case when we have just single attempt to connect.
ConnectionTimeouts ConnectionTimeouts::getTCPTimeoutsWithoutFailover(const Settings & settings)
{
return ConnectionTimeouts(settings.connect_timeout, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout);
}
/// Timeouts for the case when we will try many addresses in a loop.
ConnectionTimeouts ConnectionTimeouts::getTCPTimeoutsWithFailover(const Settings & settings)
{
return ConnectionTimeouts(
settings.connect_timeout_with_failover_ms,
settings.send_timeout,
settings.receive_timeout,
settings.tcp_keep_alive_timeout,
0,
settings.connect_timeout_with_failover_secure_ms,
settings.hedged_connection_timeout_ms,
settings.receive_data_timeout_ms);
}
ConnectionTimeouts ConnectionTimeouts::getHTTPTimeouts(const Settings & settings, Poco::Timespan http_keep_alive_timeout)
{
return ConnectionTimeouts(
settings.http_connection_timeout,
settings.http_send_timeout,
settings.http_receive_timeout,
settings.tcp_keep_alive_timeout,
http_keep_alive_timeout);
}
}

View File

@ -30,47 +30,18 @@ struct ConnectionTimeouts
ConnectionTimeouts(Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_)
: connection_timeout(connection_timeout_),
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(0),
http_keep_alive_timeout(0),
secure_connection_timeout(connection_timeout),
hedged_connection_timeout(receive_timeout_),
receive_data_timeout(receive_timeout_)
{
}
Poco::Timespan receive_timeout_);
ConnectionTimeouts(Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_,
Poco::Timespan tcp_keep_alive_timeout_)
: connection_timeout(connection_timeout_),
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(tcp_keep_alive_timeout_),
http_keep_alive_timeout(0),
secure_connection_timeout(connection_timeout),
hedged_connection_timeout(receive_timeout_),
receive_data_timeout(receive_timeout_)
{
}
Poco::Timespan tcp_keep_alive_timeout_);
ConnectionTimeouts(Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
Poco::Timespan receive_timeout_,
Poco::Timespan tcp_keep_alive_timeout_,
Poco::Timespan http_keep_alive_timeout_)
: connection_timeout(connection_timeout_),
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(tcp_keep_alive_timeout_),
http_keep_alive_timeout(http_keep_alive_timeout_),
secure_connection_timeout(connection_timeout),
hedged_connection_timeout(receive_timeout_),
receive_data_timeout(receive_timeout_)
{
}
Poco::Timespan http_keep_alive_timeout_);
ConnectionTimeouts(Poco::Timespan connection_timeout_,
Poco::Timespan send_timeout_,
@ -79,43 +50,17 @@ struct ConnectionTimeouts
Poco::Timespan http_keep_alive_timeout_,
Poco::Timespan secure_connection_timeout_,
Poco::Timespan receive_hello_timeout_,
Poco::Timespan receive_data_timeout_)
: connection_timeout(connection_timeout_),
send_timeout(send_timeout_),
receive_timeout(receive_timeout_),
tcp_keep_alive_timeout(tcp_keep_alive_timeout_),
http_keep_alive_timeout(http_keep_alive_timeout_),
secure_connection_timeout(secure_connection_timeout_),
hedged_connection_timeout(receive_hello_timeout_),
receive_data_timeout(receive_data_timeout_)
{
}
Poco::Timespan receive_data_timeout_);
static Poco::Timespan saturate(Poco::Timespan timespan, Poco::Timespan limit)
{
if (limit.totalMicroseconds() == 0)
return timespan;
else
return (timespan > limit) ? limit : timespan;
}
ConnectionTimeouts getSaturated(Poco::Timespan limit) const
{
return ConnectionTimeouts(saturate(connection_timeout, limit),
saturate(send_timeout, limit),
saturate(receive_timeout, limit),
saturate(tcp_keep_alive_timeout, limit),
saturate(http_keep_alive_timeout, limit),
saturate(secure_connection_timeout, limit),
saturate(hedged_connection_timeout, limit),
saturate(receive_data_timeout, limit));
}
static Poco::Timespan saturate(Poco::Timespan timespan, Poco::Timespan limit);
ConnectionTimeouts getSaturated(Poco::Timespan limit) const;
/// Timeouts for the case when we have just single attempt to connect.
static ConnectionTimeouts getTCPTimeoutsWithoutFailover(const Settings & settings);
/// Timeouts for the case when we will try many addresses in a loop.
static ConnectionTimeouts getTCPTimeoutsWithFailover(const Settings & settings);
static ConnectionTimeouts getHTTPTimeouts(ContextPtr context);
static ConnectionTimeouts getHTTPTimeouts(const Settings & settings, Poco::Timespan http_keep_alive_timeout);
};
}

View File

@ -1,38 +0,0 @@
#pragma once
#include <IO/ConnectionTimeouts.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
namespace DB
{
/// Timeouts for the case when we have just single attempt to connect.
inline ConnectionTimeouts ConnectionTimeouts::getTCPTimeoutsWithoutFailover(const Settings & settings)
{
return ConnectionTimeouts(settings.connect_timeout, settings.send_timeout, settings.receive_timeout, settings.tcp_keep_alive_timeout);
}
/// Timeouts for the case when we will try many addresses in a loop.
inline ConnectionTimeouts ConnectionTimeouts::getTCPTimeoutsWithFailover(const Settings & settings)
{
return ConnectionTimeouts(
settings.connect_timeout_with_failover_ms,
settings.send_timeout,
settings.receive_timeout,
settings.tcp_keep_alive_timeout,
0,
settings.connect_timeout_with_failover_secure_ms,
settings.hedged_connection_timeout_ms,
settings.receive_data_timeout_ms);
}
inline ConnectionTimeouts ConnectionTimeouts::getHTTPTimeouts(ContextPtr context)
{
const auto & settings = context->getSettingsRef();
const auto & config = context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 10), 0};
return ConnectionTimeouts(settings.http_connection_timeout, settings.http_send_timeout, settings.http_receive_timeout, settings.tcp_keep_alive_timeout, http_keep_alive_timeout);
}
}

View File

@ -3,6 +3,7 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Common/ProfileEvents.h>
#include <base/defines.h>
#include <cerrno>
@ -73,7 +74,8 @@ ReadBufferFromFile::~ReadBufferFromFile()
if (fd < 0)
return;
::close(fd);
int err = ::close(fd);
chassert(!err || errno == EINTR);
}

View File

@ -3,6 +3,7 @@
#include <cerrno>
#include <Common/ProfileEvents.h>
#include <base/defines.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
@ -71,8 +72,18 @@ WriteBufferFromFile::WriteBufferFromFile(
WriteBufferFromFile::~WriteBufferFromFile()
{
if (fd < 0)
return;
finalize();
::close(fd);
int err = ::close(fd);
/// Everything except EBADF should be ignored in the dtor, since all of the
/// others (EINTR/EIO/ENOSPC/EDQUOT) can occur while writing to the fd, in
/// which case the write has already failed and the error has been reported
/// to the user/caller.
///
/// Note that for close() on Linux, EINTR should *not* be retried.
chassert(!(err && errno == EBADF));
}
void WriteBufferFromFile::finalizeImpl()

View File

@ -55,7 +55,7 @@ ReadBufferPtr WriteBufferFromTemporaryFile::getReadBufferImpl()
auto res = ReadBufferFromTemporaryWriteBuffer::createFrom(this);
/// invalidate FD to avoid close(fd) in destructor
/// invalidate FD to avoid close() in destructor
setFD(-1);
file_name = {};

View File

@ -7,7 +7,7 @@
#include <Common/ProfileEvents.h>
#include <Common/checkStackSize.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>

View File

@ -886,8 +886,8 @@ void InterpreterSystemQuery::syncReplica()
if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
{
LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for it to become empty");
if (!storage_replicated->waitForShrinkingQueueSize(0, getContext()->getSettingsRef().receive_timeout.totalMilliseconds()))
LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for current last entry to be processed");
if (!storage_replicated->waitForProcessingQueue(getContext()->getSettingsRef().receive_timeout.totalMilliseconds()))
{
LOG_ERROR(log, "SYNC REPLICA {}: Timed out!", table_id.getNameForLogs());
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "SYNC REPLICA {}: command timed out. " \

View File

@ -3,6 +3,7 @@
#if defined(OS_LINUX)
#include <Common/Exception.h>
#include <base/defines.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
@ -31,8 +32,11 @@ PollingQueue::PollingQueue()
PollingQueue::~PollingQueue()
{
close(pipe_fd[0]);
close(pipe_fd[1]);
int err;
err = close(pipe_fd[0]);
chassert(!err || errno == EINTR);
err = close(pipe_fd[1]);
chassert(!err || errno == EINTR);
}
void PollingQueue::addTask(size_t thread_number, void * data, int fd)

View File

@ -117,7 +117,7 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
data->rethrowExceptionIfHas();
bool is_execution_finished
= !data->executor->checkTimeLimitSoft() || lazy_format ? lazy_format->isFinished() : data->is_finished.load();
= !data->executor->checkTimeLimitSoft() || (lazy_format ? lazy_format->isFinished() : data->is_finished.load());
if (is_execution_finished)
{

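For illustration only: the one-line change above is a precedence fix; || binds tighter than the conditional operator, so the unparenthesized expression used the result of || as the ternary condition. A standalone example with made-up flag values (hypothetical variable names):

#include <cassert>

int main()
{
    bool time_limit_reached = true;    /// stands for !data->executor->checkTimeLimitSoft()
    bool has_lazy_format = true;       /// stands for lazy_format being non-null
    bool lazy_finished = false;        /// stands for lazy_format->isFinished()
    bool data_finished = false;        /// stands for data->is_finished.load()

    /// Old parse: (a || b) ? c : d -- the exceeded time limit is effectively ignored.
    bool old_behaviour = (time_limit_reached || has_lazy_format) ? lazy_finished : data_finished;

    /// Intended: a || (b ? c : d) -- an exceeded time limit always finishes execution.
    bool new_behaviour = time_limit_reached || (has_lazy_format ? lazy_finished : data_finished);

    assert(old_behaviour == false);
    assert(new_behaviour == true);
}
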
View File

@ -12,6 +12,7 @@
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Processors/Merges/Algorithms/Graphite.h>
#include <Common/Config/ConfigProcessor.h>
#include <base/defines.h>
#include <base/errnoToString.h>
@ -57,7 +58,9 @@ static ConfigProcessor::LoadedConfig loadConfigurationFromString(std::string & s
{
throw std::runtime_error("unable to write to temp file");
}
close(fd);
int error = close(fd);
chassert(!error);
auto config_path = std::string(tmp_file) + ".xml";
if (std::rename(tmp_file, config_path.c_str()))
{

View File

@ -12,7 +12,6 @@
#include <Processors/Transforms/ReadFromMergeTreeDependencyTransform.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <IO/ConnectionTimeoutsContext.h>
#include "Common/logger_useful.h"
#include <Common/checkStackSize.h>
#include <Core/QueryProcessingStage.h>

View File

@ -21,7 +21,7 @@
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Client/MultiplexedConnections.h>
#include <Client/HedgedConnections.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>

View File

@ -1,6 +1,7 @@
#if defined(OS_LINUX)
#include <QueryPipeline/RemoteQueryExecutorReadContext.h>
#include <base/defines.h>
#include <Common/Exception.h>
#include <Common/NetException.h>
#include <Client/IConnections.h>
@ -219,9 +220,15 @@ RemoteQueryExecutorReadContext::~RemoteQueryExecutorReadContext()
{
/// connection_fd is closed by Poco::Net::Socket or Epoll
if (pipe_fd[0] != -1)
close(pipe_fd[0]);
{
int err = close(pipe_fd[0]);
chassert(!err || errno == EINTR);
}
if (pipe_fd[1] != -1)
close(pipe_fd[1]);
{
int err = close(pipe_fd[1]);
chassert(!err || errno == EINTR);
}
}
}

View File

@ -12,6 +12,7 @@
#include <Common/NetException.h>
#include <Common/setThreadName.h>
#include <Common/logger_useful.h>
#include <base/defines.h>
#include <chrono>
#include <Common/PipeFDs.h>
#include <Poco/Util/AbstractConfiguration.h>
@ -87,14 +88,18 @@ struct SocketInterruptablePollWrapper
socket_event.data.fd = sockfd;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &socket_event) < 0)
{
::close(epollfd);
int err = ::close(epollfd);
chassert(!err || errno == EINTR);
throwFromErrno("Cannot insert socket into epoll queue", ErrorCodes::SYSTEM_ERROR);
}
pipe_event.events = EPOLLIN | EPOLLERR | EPOLLPRI;
pipe_event.data.fd = pipe.fds_rw[0];
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, pipe.fds_rw[0], &pipe_event) < 0)
{
::close(epollfd);
int err = ::close(epollfd);
chassert(!err || errno == EINTR);
throwFromErrno("Cannot insert socket into epoll queue", ErrorCodes::SYSTEM_ERROR);
}
#endif
@ -198,7 +203,8 @@ struct SocketInterruptablePollWrapper
#if defined(POCO_HAVE_FD_EPOLL)
~SocketInterruptablePollWrapper()
{
::close(epollfd);
int err = ::close(epollfd);
chassert(!err || errno == EINTR);
}
#endif
};

View File

@ -20,7 +20,6 @@
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CheckingCompressedReadBuffer.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/Operators.h>
#include <Disks/IDisk.h>
#include <boost/algorithm/string/find_iterator.hpp>

View File

@ -13,7 +13,7 @@
#include <Compression/CompressedWriteBuffer.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Formats/NativeWriter.h>
#include <Processors/Sinks/RemoteSink.h>
#include <Processors/Executors/PushingPipelineExecutor.h>

View File

@ -2,6 +2,7 @@
#include <Storages/FileLog/DirectoryWatcherBase.h>
#include <Storages/FileLog/FileLogDirectoryWatcher.h>
#include <Storages/FileLog/StorageFileLog.h>
#include <base/defines.h>
#include <filesystem>
#include <unistd.h>
@ -129,7 +130,8 @@ void DirectoryWatcherBase::watchFunc()
DirectoryWatcherBase::~DirectoryWatcherBase()
{
stop();
close(fd);
int err = ::close(fd);
chassert(!err || errno == EINTR);
}
void DirectoryWatcherBase::start()

View File

@ -8,7 +8,7 @@
#include <Client/Connection.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeString.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Context.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/SelectQueryOptions.h>

View File

@ -9,7 +9,7 @@ namespace ErrorCodes
}
bool DropPartsRanges::isAffectedByDropRange(const std::string & new_part_name, std::string & postpone_reason) const
bool DropPartsRanges::isAffectedByDropPart(const std::string & new_part_name, std::string & postpone_reason) const
{
if (new_part_name.empty())
return false;
@ -19,7 +19,9 @@ bool DropPartsRanges::isAffectedByDropRange(const std::string & new_part_name, s
{
if (!drop_range.isDisjoint(entry_info))
{
postpone_reason = fmt::format("Has DROP RANGE affecting entry {} producing part {}. Will postpone its execution.", drop_range.getPartNameForLogs(), new_part_name);
postpone_reason = fmt::format("Has DROP_PART affecting entry {} producing part {}. "
"Will postpone it's execution.",
drop_range.getPartNameForLogs(), new_part_name);
return true;
}
}
@ -27,23 +29,24 @@ bool DropPartsRanges::isAffectedByDropRange(const std::string & new_part_name, s
return false;
}
bool DropPartsRanges::isAffectedByDropRange(const ReplicatedMergeTreeLogEntry & entry, std::string & postpone_reason) const
bool DropPartsRanges::isAffectedByDropPart(const ReplicatedMergeTreeLogEntry & entry, std::string & postpone_reason) const
{
return isAffectedByDropRange(entry.new_part_name, postpone_reason);
return isAffectedByDropPart(entry.new_part_name, postpone_reason);
}
void DropPartsRanges::addDropRange(const ReplicatedMergeTreeLogEntryPtr & entry)
void DropPartsRanges::addDropPart(const ReplicatedMergeTreeLogEntryPtr & entry)
{
if (entry->type != ReplicatedMergeTreeLogEntry::DROP_RANGE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to add entry of type {} to drop ranges, expected DROP_RANGE", entry->typeToString());
if (entry->type != ReplicatedMergeTreeLogEntry::DROP_PART)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to add entry of type {} to drop ranges, expected DROP_PART",
entry->typeToString());
MergeTreePartInfo entry_info = MergeTreePartInfo::fromPartName(*entry->getDropRange(format_version), format_version);
drop_ranges.emplace(entry->znode_name, entry_info);
}
void DropPartsRanges::removeDropRange(const ReplicatedMergeTreeLogEntryPtr & entry)
void DropPartsRanges::removeDropPart(const ReplicatedMergeTreeLogEntryPtr & entry)
{
if (entry->type != ReplicatedMergeTreeLogEntry::DROP_RANGE)
if (entry->type != ReplicatedMergeTreeLogEntry::DROP_PART)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Trying to remove entry of type {} from drop ranges, expected DROP_RANGE",
entry->typeToString());
@ -53,7 +56,7 @@ void DropPartsRanges::removeDropRange(const ReplicatedMergeTreeLogEntryPtr & ent
drop_ranges.erase(it);
}
bool DropPartsRanges::hasDropRange(const MergeTreePartInfo & new_drop_range_info, MergeTreePartInfo * out_drop_range_info) const
bool DropPartsRanges::hasDropPart(const MergeTreePartInfo & new_drop_range_info, MergeTreePartInfo * out_drop_range_info) const
{
for (const auto & [_, drop_range] : drop_ranges)
{

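For illustration only: the renamed DropPartsRanges methods still hinge on MergeTreePartInfo::isDisjoint. A toy standalone model (hypothetical struct, reduced to a partition id and a block-number range) of that test, using the part-name convention partition_minblock_maxblock_level seen elsewhere in this commit:

#include <cassert>
#include <cstdint>
#include <string>

struct PartRange
{
    std::string partition_id;
    int64_t min_block;
    int64_t max_block;

    /// Parts are disjoint if they belong to different partitions
    /// or their block-number ranges do not overlap.
    bool isDisjoint(const PartRange & rhs) const
    {
        return partition_id != rhs.partition_id
            || max_block < rhs.min_block
            || rhs.max_block < min_block;
    }
};

int main()
{
    PartRange drop_part{"all", 2, 2};     /// a DROP_PART for all_2_2_0
    PartRange merge_result{"all", 1, 3};  /// an entry producing all_1_3_1

    assert(!drop_part.isDisjoint(merge_result));  /// intersects -> the entry is postponed
    assert(drop_part.isDisjoint({"all", 4, 5}));  /// disjoint -> unaffected
}
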
View File

@ -23,20 +23,20 @@ public:
: format_version(format_version_)
{}
/// Entry is affected by DROP_RANGE and must be postponed
bool isAffectedByDropRange(const ReplicatedMergeTreeLogEntry & entry, std::string & postpone_reason) const;
/// Entry is affected by DROP_PART and must be postponed
bool isAffectedByDropPart(const ReplicatedMergeTreeLogEntry & entry, std::string & postpone_reason) const;
/// Part is affected by DROP_RANGE and must be postponed
bool isAffectedByDropRange(const std::string & new_part_name, std::string & postpone_reason) const;
/// Part is affected by DROP_PART and must be postponed
bool isAffectedByDropPart(const std::string & new_part_name, std::string & postpone_reason) const;
/// Already has equal DROP_RANGE. Don't need to assign new one
bool hasDropRange(const MergeTreePartInfo & new_drop_range_info, MergeTreePartInfo * out_drop_range_info = nullptr) const;
bool hasDropPart(const MergeTreePartInfo & new_drop_range_info, MergeTreePartInfo * out_drop_range_info = nullptr) const;
/// Add DROP_RANGE to map
void addDropRange(const ReplicatedMergeTreeLogEntryPtr & entry);
void addDropPart(const ReplicatedMergeTreeLogEntryPtr & entry);
/// Remove DROP_RANGE from map
void removeDropRange(const ReplicatedMergeTreeLogEntryPtr & entry);
void removeDropPart(const ReplicatedMergeTreeLogEntryPtr & entry);
};

View File

@ -205,11 +205,11 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
}
}
IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
IMergeTreeReader::ColumnPositionLevel IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
{
auto get_offsets_streams = [](const auto & serialization, const auto & name_in_storage)
{
Names offsets_streams;
std::vector<std::pair<String, size_t>> offsets_streams;
serialization->enumerateStreams([&](const auto & subpath)
{
if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
@ -217,7 +217,7 @@ IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const Na
auto subname = ISerialization::getSubcolumnNameForStream(subpath);
auto full_name = Nested::concatenateName(name_in_storage, subname);
offsets_streams.push_back(full_name);
offsets_streams.emplace_back(full_name, ISerialization::getArrayLevel(subpath));
});
return offsets_streams;
@ -227,7 +227,7 @@ IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const Na
auto required_offsets_streams = get_offsets_streams(getSerializationInPart(required_column), required_name_in_storage);
size_t max_matched_streams = 0;
ColumnPosition position;
ColumnPositionLevel position_level;
/// Find column that has maximal number of matching
/// offsets columns with required_column.
@ -238,23 +238,26 @@ IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const Na
continue;
auto offsets_streams = get_offsets_streams(data_part_info_for_read->getSerialization(part_column), name_in_storage);
NameSet offsets_streams_set(offsets_streams.begin(), offsets_streams.end());
NameToIndexMap offsets_streams_map(offsets_streams.begin(), offsets_streams.end());
size_t i = 0;
auto it = offsets_streams_map.end();
for (; i < required_offsets_streams.size(); ++i)
{
if (!offsets_streams_set.contains(required_offsets_streams[i]))
auto current_it = offsets_streams_map.find(required_offsets_streams[i].first);
if (current_it == offsets_streams_map.end())
break;
it = current_it;
}
if (i && (!position || i > max_matched_streams))
if (i && (!position_level || i > max_matched_streams))
{
max_matched_streams = i;
position = data_part_info_for_read->getColumnPosition(part_column.name);
position_level.emplace(*data_part_info_for_read->getColumnPosition(part_column.name), it->second);
}
}
return position;
return position_level;
}
void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const

View File

@ -91,8 +91,12 @@ protected:
StorageMetadataPtr metadata_snapshot;
MarkRanges all_mark_ranges;
using ColumnPosition = std::optional<size_t>;
ColumnPosition findColumnForOffsets(const NameAndTypePair & column) const;
/// Position and level (of nesting).
using ColumnPositionLevel = std::optional<std::pair<size_t, size_t>>;
/// If a part of a Nested column does not exist, its offsets should still be
/// read, but only the offsets suitable for the current column; that is why a
/// pair of size_t (position and level) is returned, not just one value.
ColumnPositionLevel findColumnForOffsets(const NameAndTypePair & column) const;
NameSet partially_read_columns;

View File

@ -141,13 +141,16 @@ void MergeTreeReaderCompact::fillColumnPositions()
{
/// If array of Nested column is missing in part,
/// we have to read its offsets if they exist.
position = findColumnForOffsets(column_to_read);
read_only_offsets[i] = (position != std::nullopt);
auto position_level = findColumnForOffsets(column_to_read);
if (position_level.has_value())
{
column_positions[i].emplace(position_level->first);
read_only_offsets[i].emplace(position_level->second);
partially_read_columns.insert(column_to_read.name);
}
}
column_positions[i] = std::move(position);
if (read_only_offsets[i])
partially_read_columns.insert(column_to_read.name);
else
column_positions[i] = std::move(position);
}
}
@ -217,7 +220,8 @@ size_t MergeTreeReaderCompact::readRows(
void MergeTreeReaderCompact::readData(
const NameAndTypePair & name_and_type, ColumnPtr & column,
size_t from_mark, size_t current_task_last_mark, size_t column_position, size_t rows_to_read, bool only_offsets)
size_t from_mark, size_t current_task_last_mark, size_t column_position, size_t rows_to_read,
std::optional<size_t> only_offsets_level)
{
const auto & [name, type] = name_and_type;
@ -228,9 +232,34 @@ void MergeTreeReaderCompact::readData(
auto buffer_getter = [&](const ISerialization::SubstreamPath & substream_path) -> ReadBuffer *
{
/// An offsets stream from another column may be read when the current
/// column does not exist (see findColumnForOffsets() in
/// MergeTreeReaderCompact::fillColumnPositions())
bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
if (only_offsets && !is_offsets)
return nullptr;
if (only_offsets_level.has_value())
{
if (!is_offsets)
return nullptr;
/// Offset stream can be read only from columns of current level or
/// below (since it is OK to read all parent streams from the
/// alternative).
///
/// Consider the following columns in nested "root":
/// - root.array Array(UInt8) - exists
/// - root.nested_array Array(Array(UInt8)) - does not exist (only_offsets_level=1)
///
/// For root.nested_array it will try to read multiple streams:
/// - offsets (substream_path = {ArraySizes})
/// OK
/// - root.nested_array elements (substream_path = {ArrayElements, ArraySizes})
/// NOT OK - cannot use root.array offsets stream for this
///
/// Here only_offsets_level is the level of the alternative stream,
/// and substream_path.size() is the level of the current stream.
if (only_offsets_level.value() < ISerialization::getArrayLevel(substream_path))
return nullptr;
}
return data_buffer;
};
@ -267,7 +296,7 @@ void MergeTreeReaderCompact::readData(
}
/// The buffer is left in inconsistent state after reading single offsets
if (only_offsets)
if (only_offsets_level.has_value())
last_read_granule.reset();
else
last_read_granule.emplace(from_mark, column_position);

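For illustration only: a standalone restatement (hypothetical function name, arbitrary level numbers) of the filtering rule the buffer_getter comment above describes: when a column is read only for its offsets from a substitute column, only ArraySizes substreams whose nesting level does not exceed the substitute's level may be served.

#include <cassert>
#include <cstddef>
#include <optional>

/// Returns true if a read restricted to offsets of level `only_offsets_level`
/// may serve a substream that is an offsets stream of `stream_level`.
static bool canServeStream(std::optional<size_t> only_offsets_level, bool is_offsets_stream, size_t stream_level)
{
    if (!only_offsets_level)
        return true;                                 /// full column read, no restriction
    if (!is_offsets_stream)
        return false;                                /// only offsets may come from the substitute column
    return *only_offsets_level >= stream_level;      /// deeper offsets are not available
}

int main()
{
    /// Assume root.nested_array is missing and its offsets come from root.array (level 1).
    assert(canServeStream(1, true, 1));              /// outer offsets: served
    assert(!canServeStream(1, true, 2));             /// inner offsets: not available from root.array
    assert(!canServeStream(1, false, 2));            /// element data: never served in offsets-only mode
    assert(canServeStream(std::nullopt, false, 2));  /// normal read: everything served
}
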
View File

@ -50,10 +50,11 @@ private:
MergeTreeMarksLoader marks_loader;
/// Positions of columns in part structure.
using ColumnPositions = std::vector<ColumnPosition>;
using ColumnPositions = std::vector<std::optional<size_t>>;
ColumnPositions column_positions;
/// Should we read the full column or only its offsets
std::vector<bool> read_only_offsets;
/// Should we read the full column or only its offsets.
/// Element of the vector is the level of the alternative stream.
std::vector<std::optional<size_t>> read_only_offsets;
/// For asynchronous reading from remote fs. Same meaning as in MergeTreeReaderStream.
std::optional<size_t> last_right_offset;
@ -64,7 +65,8 @@ private:
void seekToMark(size_t row_index, size_t column_index);
void readData(const NameAndTypePair & name_and_type, ColumnPtr & column, size_t from_mark,
size_t current_task_last_mark, size_t column_position, size_t rows_to_read, bool only_offsets);
size_t current_task_last_mark, size_t column_position, size_t rows_to_read,
std::optional<size_t> only_offsets_level);
/// Returns maximal value of granule size in compressed file from @mark_ranges.
/// This value is used as size of read buffer.

View File

@ -42,7 +42,7 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
{
if (auto offsets_position = findColumnForOffsets(column_to_read))
{
positions_for_offsets[column_to_read.name] = *offsets_position;
positions_for_offsets[column_to_read.name] = offsets_position->first;
partially_read_columns.insert(column_to_read.name);
}
}

View File

@ -211,7 +211,8 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
{
LOG_DEBUG(log, "Log entry was already created, will check the existing one.");
sync_source_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_source_log_entry_str, sync_source_log_entry_stat);
sync_source_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_source_log_entry_str, sync_source_log_entry_stat,
storage.format_version);
}
else
{
@ -267,7 +268,9 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
{
LOG_DEBUG(log, "Log entry was already created, will check the existing one.");
sync_destination_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_destination_log_entry_str, sync_destination_log_entry_stat);
sync_destination_log_entry = *ReplicatedMergeTreeLogEntry::parse(sync_destination_log_entry_str,
sync_destination_log_entry_stat,
storage.format_version);
}
else
{
@ -330,7 +333,8 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
{
LOG_DEBUG(log, "Log entry was already created, will check the existing one.");
fetch_log_entry = *ReplicatedMergeTreeLogEntry::parse(fetch_log_entry_str, fetch_log_entry_stat);
fetch_log_entry = *ReplicatedMergeTreeLogEntry::parse(fetch_log_entry_str, fetch_log_entry_stat,
storage.format_version);
}
else
{
@ -397,11 +401,14 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
{
LOG_DEBUG(log, "Log entry was already created, will check the existing one.");
attach_rollback_log_entry = *ReplicatedMergeTreeLogEntry::parse(attach_rollback_log_entry_str, attach_rollback_log_entry_stat);
attach_rollback_log_entry = *ReplicatedMergeTreeLogEntry::parse(attach_rollback_log_entry_str,
attach_rollback_log_entry_stat,
storage.format_version);
}
else
{
const auto attach_log_entry = ReplicatedMergeTreeLogEntry::parse(attach_log_entry_str, attach_log_entry_stat);
const auto attach_log_entry = ReplicatedMergeTreeLogEntry::parse(attach_log_entry_str, attach_log_entry_stat,
storage.format_version);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCheckRequest(entry.znode_path, entry.version));
@ -495,7 +502,7 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
Coordination::Stat stat;
String log_entry_str = zk->get(attach_log_entry_barrier_path, &stat);
log_entry = *ReplicatedMergeTreeLogEntry::parse(log_entry_str, stat);
log_entry = *ReplicatedMergeTreeLogEntry::parse(log_entry_str, stat, storage.format_version);
}
Strings unwaited = storage.tryWaitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 1);
@ -542,7 +549,8 @@ PartMovesBetweenShardsOrchestrator::Entry PartMovesBetweenShardsOrchestrator::st
{
LOG_DEBUG(log, "Log entry was already created, will check the existing one.");
source_drop_log_entry = *ReplicatedMergeTreeLogEntry::parse(source_drop_log_entry_str, source_drop_log_entry_stat);
source_drop_log_entry = *ReplicatedMergeTreeLogEntry::parse(source_drop_log_entry_str, source_drop_log_entry_stat,
storage.format_version);
}
else
{

View File

@ -99,6 +99,7 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
break;
case DROP_RANGE:
case DROP_PART:
if (detach)
out << "detach\n";
else
@ -180,7 +181,7 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
out << "quorum: " << quorum << '\n';
}
void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in, MergeTreeDataFormatVersion partition_format_version)
{
UInt8 format_version = 0;
String type_str;
@ -278,6 +279,9 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
type = DROP_RANGE;
detach = type_str == "detach";
in >> new_part_name;
auto drop_range_info = MergeTreePartInfo::fromPartName(new_part_name, partition_format_version);
if (!drop_range_info.isFakeDropRangePart())
type = DROP_PART;
}
else if (type_str == "clear_column") /// NOTE: Deprecated.
{
@ -426,11 +430,12 @@ String ReplicatedMergeTreeLogEntryData::toString() const
return out.str();
}
ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String & s, const Coordination::Stat & stat)
ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String & s, const Coordination::Stat & stat,
MergeTreeDataFormatVersion format_version)
{
ReadBufferFromString in(s);
Ptr res = std::make_shared<ReplicatedMergeTreeLogEntry>();
res->readText(in);
res->readText(in, format_version);
assertEOF(in);
if (!res->create_time)
@ -444,6 +449,9 @@ std::optional<String> ReplicatedMergeTreeLogEntryData::getDropRange(MergeTreeDat
if (type == DROP_RANGE)
return new_part_name;
if (type == DROP_PART)
return new_part_name;
if (type == REPLACE_RANGE)
{
auto drop_range_info = MergeTreePartInfo::fromPartName(replace_range_entry->drop_range_part_name, format_version);
@ -457,14 +465,9 @@ std::optional<String> ReplicatedMergeTreeLogEntryData::getDropRange(MergeTreeDat
return {};
}
bool ReplicatedMergeTreeLogEntryData::isDropPart(MergeTreeDataFormatVersion format_version) const
bool ReplicatedMergeTreeLogEntryData::isDropPart(MergeTreeDataFormatVersion) const
{
if (type == DROP_RANGE)
{
auto drop_range_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
return !drop_range_info.isFakeDropRangePart();
}
return false;
return type == DROP_PART;
}
Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormatVersion format_version) const
@ -475,30 +478,7 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat
/// DROP_RANGE does not add a real part, but we must disable merges in that range
if (type == DROP_RANGE)
{
auto drop_range_part_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
/// It's DROP PART and we don't want to add it into virtual parts
/// because it can lead to intersecting parts on stale replicas and this
/// problem is fundamental. So we have very weak guarantees for DROP
/// PART. If any concurrent merge will be assigned then DROP PART will
/// delete nothing and part will be successfully merged into bigger part.
///
/// dropPart is used in the following cases:
/// 1) Remove empty parts after TTL.
/// 2) Remove parts after move between shards.
/// 3) User queries: ALTER TABLE DROP PART 'part_name'.
///
/// In the first case merge of empty part is even better than DROP. In
/// the second case part UUIDs are used to forbid merges for moving parts so
/// there is no problem with concurrent merges. The third case is quite
/// rare and we give very weak guarantee: there will be no active part
/// with this name, but possibly it was merged to some other part.
if (!drop_range_part_info.isFakeDropRangePart())
return {};
return {new_part_name};
}
if (type == REPLACE_RANGE)
{
@ -509,6 +489,25 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat
return res;
}
/// It's DROP PART and we don't want to add it into virtual parts
/// because it can lead to intersecting parts on stale replicas and this
/// problem is fundamental. So we have very weak guarantees for DROP
/// PART. If any concurrent merge will be assigned then DROP PART will
/// delete nothing and part will be successfully merged into bigger part.
///
/// dropPart is used in the following cases:
/// 1) Remove empty parts after TTL.
/// 2) Remove parts after move between shards.
/// 3) User queries: ALTER TABLE DROP PART 'part_name'.
///
/// In the first case merge of empty part is even better than DROP. In
/// the second case part UUIDs are used to forbid merges for moving parts so
/// there is no problem with concurrent merges. The third case is quite
/// rare and we give very weak guarantee: there will be no active part
/// with this name, but possibly it was merged to some other part.
if (type == DROP_PART)
return {};
/// Doesn't produce any part.
if (type == SYNC_PINNED_PART_UUIDS)
return {};

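For illustration only: DROP_PART is introduced above as a virtual entry type that is recomputed when a drop entry is deserialized, never written out explicitly. A rough standalone sketch of that classification, under the assumption (taken from comments elsewhere in this commit) that fake drop ranges are marked with the sentinel level 999999999; the names and the exact rule are illustrative, not the real MergeTreePartInfo API.

#include <cassert>
#include <cstdint>

enum class EntryType { DROP_RANGE, DROP_PART };

/// Illustrative stand-in for MergeTreePartInfo::isFakeDropRangePart().
constexpr uint64_t FAKE_DROP_RANGE_LEVEL = 999999999;

static EntryType classifyDropEntry(uint64_t part_level)
{
    /// Both kinds are serialized identically; the distinction is derived from the part name.
    return part_level == FAKE_DROP_RANGE_LEVEL ? EntryType::DROP_RANGE : EntryType::DROP_PART;
}

int main()
{
    assert(classifyDropEntry(999999999) == EntryType::DROP_RANGE);  /// e.g. all_0_5_999999999
    assert(classifyDropEntry(0) == EntryType::DROP_PART);           /// e.g. all_2_2_0
}
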
View File

@ -45,6 +45,7 @@ struct ReplicatedMergeTreeLogEntryData
ALTER_METADATA, /// Apply alter modification according to global /metadata and /columns paths
SYNC_PINNED_PART_UUIDS, /// Synchronization point for ensuring that all replicas have up to date in-memory state.
CLONE_PART_FROM_SHARD, /// Clone part from another shard.
DROP_PART, /// NOTE: Virtual (has the same (de)serialization format as DROP_RANGE). Deletes the specified part.
};
static String typeToString(Type type)
@ -62,6 +63,7 @@ struct ReplicatedMergeTreeLogEntryData
case ReplicatedMergeTreeLogEntryData::ALTER_METADATA: return "ALTER_METADATA";
case ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS: return "SYNC_PINNED_PART_UUIDS";
case ReplicatedMergeTreeLogEntryData::CLONE_PART_FROM_SHARD: return "CLONE_PART_FROM_SHARD";
case ReplicatedMergeTreeLogEntryData::DROP_PART: return "DROP_PART";
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown log entry type: {}", DB::toString<int>(type));
}
@ -73,7 +75,7 @@ struct ReplicatedMergeTreeLogEntryData
}
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
void readText(ReadBuffer & in, MergeTreeDataFormatVersion partition_format_version);
String toString() const;
String znode_name;
@ -183,7 +185,7 @@ struct ReplicatedMergeTreeLogEntry : public ReplicatedMergeTreeLogEntryData, std
std::condition_variable execution_complete; /// Awake when currently_executing becomes false.
static Ptr parse(const String & s, const Coordination::Stat & stat);
static Ptr parse(const String & s, const Coordination::Stat & stat, MergeTreeDataFormatVersion format_version);
};
using ReplicatedMergeTreeLogEntryPtr = std::shared_ptr<ReplicatedMergeTreeLogEntry>;

View File

@ -29,7 +29,7 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree &
, format_version(storage.format_version)
, current_parts(format_version)
, virtual_parts(format_version)
, drop_ranges(format_version)
, drop_parts(format_version)
{
zookeeper_path = storage.zookeeper_path;
replica_path = storage.replica_path;
@ -95,10 +95,26 @@ bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr &
return !virtual_part_name.empty() && virtual_part_name != data_part->name;
}
bool ReplicatedMergeTreeQueue::hasDropRange(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info) const
bool ReplicatedMergeTreeQueue::isGoingToBeDropped(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info) const
{
std::lock_guard lock(state_mutex);
return drop_ranges.hasDropRange(part_info, out_drop_range_info);
return isGoingToBeDroppedImpl(part_info, out_drop_range_info);
}
bool ReplicatedMergeTreeQueue::isGoingToBeDroppedImpl(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info) const
{
String covering_virtual = virtual_parts.getContainingPart(part_info);
if (!covering_virtual.empty())
{
auto covering_virtual_info = MergeTreePartInfo::fromPartName(covering_virtual, format_version);
if (covering_virtual_info.isFakeDropRangePart())
{
if (out_drop_range_info)
*out_drop_range_info = covering_virtual_info;
return true;
}
}
return drop_parts.hasDropPart(part_info, out_drop_range_info);
}
bool ReplicatedMergeTreeQueue::checkPartInQueueAndGetSourceParts(const String & part_name, Strings & source_parts) const
@ -165,7 +181,7 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
for (size_t i = 0; i < children_num; ++i)
{
auto res = results[i];
LogEntryPtr entry = LogEntry::parse(res.data, res.stat);
LogEntryPtr entry = LogEntry::parse(res.data, res.stat, format_version);
entry->znode_name = children[i];
std::lock_guard lock(state_mutex);
@ -221,41 +237,35 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
for (const String & virtual_part_name : entry_virtual_parts)
{
virtual_parts.add(virtual_part_name, nullptr);
/// Don't add drop range parts to mutations
/// they don't produce any useful parts
if (entry->type == LogEntry::DROP_RANGE)
continue;
/// Note: DROP_PART does not have virtual parts
auto part_info = MergeTreePartInfo::fromPartName(virtual_part_name, format_version);
if (entry->type == LogEntry::REPLACE_RANGE && part_info.isFakeDropRangePart())
if (part_info.isFakeDropRangePart())
continue;
addPartToMutations(virtual_part_name, part_info);
}
/// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted
if (entry->type != LogEntry::DROP_RANGE)
if (entry->type == LogEntry::DROP_PART)
{
queue.push_back(entry);
}
else
{
drop_ranges.addDropRange(entry);
/// DROP PART remove parts, so we remove it from virtual parts to
/// preserve invariant virtual_parts = current_parts + queue.
/// Also remove it from parts_to_do to avoid intersecting parts in parts_to_do
/// if fast replica will execute DROP PART and assign a merge that contains dropped blocks.
if (entry->isDropPart(format_version))
{
String drop_part_name = *entry->getDropRange(format_version);
virtual_parts.removePartAndCoveredParts(drop_part_name);
removeCoveredPartsFromMutations(drop_part_name, /*remove_part = */ true, /*remove_covered_parts = */ true);
}
queue.push_front(entry);
drop_parts.addDropPart(entry);
String drop_part_name = *entry->getDropRange(format_version);
virtual_parts.removePartAndCoveredParts(drop_part_name);
removeCoveredPartsFromMutations(drop_part_name, /*remove_part = */ true, /*remove_covered_parts = */ true);
}
/// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted
if (entry->getDropRange(format_version))
queue.push_front(entry);
else
queue.push_back(entry);
if (entry->type == LogEntry::GET_PART || entry->type == LogEntry::ATTACH_PART)
{
inserts_by_time.insert(entry);
@ -350,36 +360,26 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
if (auto drop_range_part_name = entry->getDropRange(format_version))
{
MergeTreePartInfo drop_range_info = MergeTreePartInfo::fromPartName(*drop_range_part_name, format_version);
/// DROP PART doesn't have virtual parts so remove from current
/// parts all covered parts.
if (entry->isDropPart(format_version))
if (entry->type == LogEntry::DROP_PART)
{
LOG_TEST(log, "Removing drop part from current and virtual parts {}", *drop_range_part_name);
/// DROP PART doesn't have virtual parts so remove from current
/// parts all covered parts.
LOG_TEST(log, "Removing DROP_PART from current parts {}", *drop_range_part_name);
current_parts.removePartAndCoveredParts(*drop_range_part_name);
drop_parts.removeDropPart(entry);
}
else
{
LOG_TEST(log, "Removing drop range from current and virtual parts {}", *drop_range_part_name);
LOG_TEST(log, "Removing DROP_RANGE from current and virtual parts {}", *drop_range_part_name);
current_parts.remove(*drop_range_part_name);
virtual_parts.remove(*drop_range_part_name);
}
/// During inserting to queue (insertUnlocked()) we remove part for
/// DROP_RANGE only for DROP PART but not for DROP PARTITION.
virtual_parts.remove(*drop_range_part_name);
/// NOTE: we don't need to remove part/covered parts from mutations (removeCoveredPartsFromMutations()) here because:
/// - for DROP PART we have this during inserting to queue (see insertUnlocked())
/// - for DROP PARTITION we have this in the loop above (when we add parts to current_parts)
}
if (entry->type == LogEntry::DROP_RANGE)
{
drop_ranges.removeDropRange(entry);
}
if (entry->type == LogEntry::ALTER_METADATA)
{
LOG_TRACE(log, "Finishing metadata alter with version {}", entry->alter_version);
@ -388,9 +388,9 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
}
else
{
if (entry->type == LogEntry::DROP_RANGE)
if (entry->type == LogEntry::DROP_PART)
{
drop_ranges.removeDropRange(entry);
drop_parts.removeDropPart(entry);
}
LOG_TEST(log, "Removing unsuccessful entry {} virtual parts [{}]", entry->znode_name, fmt::join(entry_virtual_parts, ", "));
@ -544,8 +544,7 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep
if (!found && need_remove_from_zk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find {} in the memory queue. It is a bug. Entry: {}",
entry->znode_name, entry->toString());
notifySubscribers(queue_size);
notifySubscribers(queue_size, entry->znode_name);
if (!need_remove_from_zk)
return;
@ -664,7 +663,7 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
{
auto res = get_results[i];
copied_entries.emplace_back(LogEntry::parse(res.data, res.stat));
copied_entries.emplace_back(LogEntry::parse(res.data, res.stat, format_version));
ops.emplace_back(zkutil::makeCreateRequest(
fs::path(replica_path) / "queue/queue-", res.data, zkutil::CreateMode::PersistentSequential));
@ -782,6 +781,7 @@ QueueRepresentation getQueueRepresentation(const std::list<ReplicatedMergeTreeLo
break;
}
case LogEntryType::DROP_RANGE:
case LogEntryType::DROP_PART:
{
result[key].dropped_parts.push_back(entry->new_part_name);
break;
@ -1082,7 +1082,7 @@ bool ReplicatedMergeTreeQueue::checkReplaceRangeCanBeRemoved(const MergeTreePart
return false;
assert(entry_ptr->replace_range_entry);
if (current.type != LogEntry::REPLACE_RANGE && current.type != LogEntry::DROP_RANGE)
if (current.type != LogEntry::REPLACE_RANGE && current.type != LogEntry::DROP_RANGE && current.type != LogEntry::DROP_PART)
return false;
if (entry_ptr->replace_range_entry == current.replace_range_entry) /// same partition, don't want to drop ourselves
@ -1260,14 +1260,15 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa
/// We have found `part_name` on some replica and are going to fetch it instead of covered `entry->new_part_name`.
std::unique_lock lock(state_mutex);
if (virtual_parts.getContainingPart(part_name).empty())
String covering_part = virtual_parts.getContainingPart(part_name);
if (covering_part.empty())
{
/// We should not fetch any parts that are absent in our `virtual_parts` set,
/// because we do not know about such parts according to our replication queue (we know about them from some side-channel).
/// Otherwise, it may break invariants in replication queue reordering, for example:
/// 1. Our queue contains GET_PART all_2_2_0, log contains DROP_RANGE all_2_2_0 and MERGE_PARTS all_1_3_1
/// 2. We execute GET_PART all_2_2_0, but fetch all_1_3_1 instead
/// (drop_ranges.isAffectedByDropRange(...) is false-negative, because DROP_RANGE all_2_2_0 is not pulled yet).
/// (drop_parts.isAffectedByDropPart(...) is false-negative, because DROP_RANGE all_2_2_0 is not pulled yet).
/// It actually means, that MERGE_PARTS all_1_3_1 is executed too, but it's not even pulled yet.
/// 3. Then we pull log, trying to execute DROP_RANGE all_2_2_0
/// and reveal that it was incorrectly reordered with MERGE_PARTS all_1_3_1 (drop range intersects merged part).
@ -1276,8 +1277,8 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa
}
/// FIXME get rid of actual_part_name.
/// If new covering part jumps over DROP_RANGE we should execute drop range first
if (drop_ranges.isAffectedByDropRange(part_name, reject_reason))
/// If new covering part jumps over non-disjoint DROP_PART we should execute DROP_PART first to avoid intersection
if (drop_parts.isAffectedByDropPart(part_name, reject_reason))
return false;
std::vector<LogEntryPtr> covered_entries_to_wait;
@ -1307,8 +1308,26 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
return false;
}
if (entry.type != LogEntry::DROP_RANGE && drop_ranges.isAffectedByDropRange(entry, out_postpone_reason))
return false;
if (entry.type != LogEntry::DROP_RANGE && entry.type != LogEntry::DROP_PART)
{
/// Do not touch any entries that are not disjoint with some DROP_PART to avoid intersecting parts
if (drop_parts.isAffectedByDropPart(entry, out_postpone_reason))
return false;
}
/// Optimization: it does not really make sense to generate parts that are going to be dropped anyway
if (!entry.new_part_name.empty())
{
auto new_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
MergeTreePartInfo drop_info;
if (entry.type != LogEntry::DROP_PART && !new_part_info.isFakeDropRangePart() && isGoingToBeDroppedImpl(new_part_info, &drop_info))
{
out_postpone_reason = fmt::format(
"Not executing {} because it produces part {} that is going to be dropped by {}",
entry.znode_name, entry.new_part_name, drop_info.getPartNameForLogs());
return false;
}
}
/// Check that fetches pool is not overloaded
if ((entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
@ -1461,58 +1480,55 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
}
}
if (entry.type == LogEntry::DROP_RANGE || entry.type == LogEntry::REPLACE_RANGE)
/// DROP_RANGE, DROP_PART and REPLACE_RANGE entries remove other entries, which produce parts in the range.
/// If such part producing operations are currently executing, then DROP/REPLACE RANGE waits for them to finish.
/// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait for each other.
/// But it should not happen if ranges are disjoint.
/// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting.
if (auto drop_range = entry.getDropRange(format_version))
{
/// DROP_RANGE and REPLACE_RANGE entries remove other entries, which produce parts in the range.
/// If such part producing operations are currently executing, then DROP/REPLACE RANGE waits for them to finish.
/// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait for each other.
/// But it should not happen if ranges are disjoint.
/// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting.
if (auto drop_range = entry.getDropRange(format_version))
auto drop_range_info = MergeTreePartInfo::fromPartName(*drop_range, format_version);
for (const auto & info : currently_executing_drop_replace_ranges)
{
auto drop_range_info = MergeTreePartInfo::fromPartName(*drop_range, format_version);
for (const auto & info : currently_executing_drop_replace_ranges)
{
if (drop_range_info.isDisjoint(info))
continue;
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry with not disjoint range {} is currently executing.";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name,
entry.typeToString(),
entry.new_part_name,
info.getPartNameForLogs());
return false;
}
if (drop_range_info.isDisjoint(info))
continue;
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry with not disjoint range {} is currently executing.";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name,
entry.typeToString(),
entry.new_part_name,
info.getPartNameForLogs());
return false;
}
}
if (entry.isDropPart(format_version))
if (entry.type == LogEntry::DROP_PART)
{
/// We should avoid reordering of REPLACE_RANGE and DROP_PART,
/// because if replace_range_entry->new_part_names contains drop_range_entry->new_part_name
/// and we execute DROP PART before REPLACE_RANGE, then DROP PART will be no-op
/// (because part is not created yet, so there is nothing to drop;
/// DROP_RANGE does not cover all parts of REPLACE_RANGE, so removePartProducingOpsInRange(...) will not remove anything too)
/// and part will never be removed. Replicas may diverge due to such reordering.
/// We don't need to do anything for other entry types, because removePartProducingOpsInRange(...) will remove them as expected.
auto drop_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
for (const auto & replace_entry : queue)
{
/// We should avoid reordering of REPLACE_RANGE and DROP PART (DROP_RANGE),
/// because if replace_range_entry->new_part_names contains drop_range_entry->new_part_name
/// and we execute DROP PART before REPLACE_RANGE, then DROP PART will be no-op
/// (because part is not created yet, so there is nothing to drop;
/// DROP_RANGE does not cover all parts of REPLACE_RANGE, so removePartProducingOpsInRange(...) will not remove anything too)
/// and part will never be removed. Replicas may diverge due to such reordering.
/// We don't need to do anything for other entry types, because removePartProducingOpsInRange(...) will remove them as expected.
if (replace_entry->type != LogEntry::REPLACE_RANGE)
continue;
auto drop_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
for (const auto & replace_entry : queue)
for (const auto & new_part_name : replace_entry->replace_range_entry->new_part_names)
{
if (replace_entry->type != LogEntry::REPLACE_RANGE)
continue;
for (const auto & new_part_name : replace_entry->replace_range_entry->new_part_names)
auto new_part_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
if (!new_part_info.isDisjoint(drop_part_info))
{
auto new_part_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
if (!new_part_info.isDisjoint(drop_part_info))
{
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because it probably depends on {} (REPLACE_RANGE).";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(),
entry.new_part_name, replace_entry->znode_name);
return false;
}
constexpr auto fmt_string = "Not executing log entry {} of type {} for part {} "
"because it probably depends on {} (REPLACE_RANGE).";
LOG_TRACE(LogToStr(out_postpone_reason, log), fmt_string, entry.znode_name, entry.typeToString(),
entry.new_part_name, replace_entry->znode_name);
return false;
}
}
}
@ -2450,9 +2466,10 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const std::string & z
}
}
bool ReplicatedMergeTreeMergePredicate::hasDropRange(const MergeTreePartInfo & new_drop_range_info) const
bool ReplicatedMergeTreeMergePredicate::isGoingToBeDropped(const MergeTreePartInfo & new_drop_range_info,
MergeTreePartInfo * out_drop_range_info) const
{
return queue.hasDropRange(new_drop_range_info);
return queue.isGoingToBeDropped(new_drop_range_info, out_drop_range_info);
}
String ReplicatedMergeTreeMergePredicate::getCoveringVirtualPart(const String & part_name) const
@ -2466,12 +2483,17 @@ ReplicatedMergeTreeQueue::SubscriberHandler
ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback)
{
std::lock_guard lock(state_mutex);
std::unordered_set<String> result;
result.reserve(queue.size());
for (const auto & entry : queue)
result.insert(entry->znode_name);
std::lock_guard lock_subscribers(subscribers_mutex);
auto it = subscribers.emplace(subscribers.end(), std::move(callback));
/// Atomically notify about current size
(*it)(queue.size());
/// Notify queue size & log entry ids to avoid waiting for removed entries
(*it)(result.size(), result, std::nullopt);
return SubscriberHandler(it, *this);
}
@ -2482,16 +2504,16 @@ ReplicatedMergeTreeQueue::SubscriberHandler::~SubscriberHandler()
queue.subscribers.erase(it);
}
void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size)
void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size, std::optional<String> removed_log_entry_id)
{
std::lock_guard lock_subscribers(subscribers_mutex);
for (auto & subscriber_callback : subscribers)
subscriber_callback(new_queue_size);
subscriber_callback(new_queue_size, {}, removed_log_entry_id);
}
ReplicatedMergeTreeQueue::~ReplicatedMergeTreeQueue()
{
notifySubscribers(0);
notifySubscribers(0, std::nullopt);
}
String padIndex(Int64 index)

View File

@ -105,8 +105,9 @@ private:
ActiveDataPartSet virtual_parts;
/// Dropped ranges inserted into queue
DropPartsRanges drop_ranges;
/// We do not add DROP_PARTs to virtual_parts because they can intersect,
/// so we store them separately in this structure.
DropPartsRanges drop_parts;
/// A set of mutations loaded from ZooKeeper.
/// mutations_by_partition is an index partition ID -> block ID -> mutation into this set.
@ -163,7 +164,7 @@ private:
/// A subscriber callback is called when a queue entry is deleted
mutable std::mutex subscribers_mutex;
using SubscriberCallBack = std::function<void(size_t /* queue_size */)>;
using SubscriberCallBack = std::function<void(size_t /* queue_size */, std::unordered_set<String> /*wait_for_ids*/, std::optional<String> /* removed_log_entry_id */)>;
using Subscribers = std::list<SubscriberCallBack>;
using SubscriberIterator = Subscribers::iterator;
@ -180,8 +181,8 @@ private:
Subscribers subscribers;
/// Notify subscribers about queue change
void notifySubscribers(size_t new_queue_size);
/// Notify subscribers about queue change (new queue size and entry that was removed)
void notifySubscribers(size_t new_queue_size, std::optional<String> removed_log_entry_id);
/// Check that entry_ptr is REPLACE_RANGE entry and can be removed from queue because current entry covers it
bool checkReplaceRangeCanBeRemoved(
@ -405,8 +406,9 @@ public:
/// Checks that part is already in virtual parts
bool isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const;
/// Returns true if part_info is covered by some DROP_RANGE
bool hasDropRange(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info = nullptr) const;
/// Returns true if part_info is covered by some DROP_RANGE or DROP_PART
bool isGoingToBeDropped(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info = nullptr) const;
bool isGoingToBeDroppedImpl(const MergeTreePartInfo & part_info, MergeTreePartInfo * out_drop_range_info) const;
/// Check that part produced by some entry in queue and get source parts for it.
/// If there are several entries return largest source_parts set. This rarely possible
@ -524,7 +526,7 @@ public:
int32_t getVersion() const { return merges_version; }
/// Returns true if there's a drop range covering new_drop_range_info
bool hasDropRange(const MergeTreePartInfo & new_drop_range_info) const;
bool isGoingToBeDropped(const MergeTreePartInfo & new_drop_range_info, MergeTreePartInfo * out_drop_range_info = nullptr) const;
/// Returns virtual part covering part_name (if any) or empty string
String getCoveringVirtualPart(const String & part_name) const;

View File

@ -81,7 +81,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <memory>
#include <filesystem>

View File

@ -69,7 +69,6 @@
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
@ -233,6 +232,11 @@ zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeperAndAssertNotReadonl
return res;
}
static ConnectionTimeouts getHTTPTimeouts(ContextPtr context)
{
return ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0});
}
static MergeTreePartInfo makeDummyDropRangeForMovePartitionOrAttachPartitionFrom(const String & partition_id)
{
/// NOTE We don't have special log entry type for MOVE PARTITION/ATTACH PARTITION FROM,
@ -1551,7 +1555,7 @@ MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::attachPartHelperFo
bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
{
if (entry.type == LogEntry::DROP_RANGE)
if (entry.type == LogEntry::DROP_RANGE || entry.type == LogEntry::DROP_PART)
{
executeDropRange(entry);
return true;
@ -2356,7 +2360,6 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
return true;
}
void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entry)
{
auto zookeeper = getZooKeeper();
@ -2385,7 +2388,7 @@ void StorageReplicatedMergeTree::executeClonePartFromShard(const LogEntry & entr
auto metadata_snapshot = getInMemoryMetadataPtr();
String source_replica_path = entry.source_shard + "/replicas/" + replica;
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(getContext());
auto timeouts = getHTTPTimeouts(getContext());
auto credentials = getContext()->getInterserverCredentials();
String interserver_scheme = getContext()->getInterserverScheme();
@ -2522,7 +2525,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
info.stat = std::move(res.stat);
try
{
info.parsed_entry = LogEntry::parse(info.data, info.stat);
info.parsed_entry = LogEntry::parse(info.data, info.stat, format_version);
}
catch (...)
{
@ -2535,7 +2538,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
info.parsed_entry->znode_name = source_queue_names[i];
if (info.parsed_entry->type == LogEntry::DROP_RANGE)
if (info.parsed_entry->type == LogEntry::DROP_RANGE || info.parsed_entry->type == LogEntry::DROP_PART)
{
drop_range_set.add(info.parsed_entry->new_part_name);
}
@ -2637,7 +2640,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
String covering_drop_range = drop_range_set.getContainingPart(part_name);
if (!covering_drop_range.empty())
{
LOG_TRACE(log, "{} {}: it's covered by DROP_RANGE {}", log_msg_context, part_name, covering_drop_range);
LOG_TRACE(log, "{} {}: it's covered by drop range {}", log_msg_context, part_name, covering_drop_range);
return true;
}
@ -3555,9 +3558,9 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
/// Because of version check this method will never create FETCH if drop part exists
ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, PartitionIdsHint{broken_part_info.partition_id});
if (merge_pred.hasDropRange(broken_part_info))
if (merge_pred.isGoingToBeDropped(broken_part_info))
{
LOG_INFO(log, "Broken part {} is covered by DROP RANGE, don't need to fetch it", part_name);
LOG_INFO(log, "Broken part {} is covered by drop range, don't need to fetch it", part_name);
return;
}
/// Check that our version of log (and queue) is the most fresh. Otherwise don't create new entry fetch entry.
@ -3614,7 +3617,7 @@ void StorageReplicatedMergeTree::stopBeingLeader()
ConnectionTimeouts StorageReplicatedMergeTree::getFetchPartHTTPTimeouts(ContextPtr local_context)
{
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(local_context);
auto timeouts = getHTTPTimeouts(local_context);
auto settings = getSettings();
if (settings->replicated_fetches_http_connection_timeout.changed)
@ -4261,7 +4264,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
std::function<MutableDataPartPtr()> get_part;
ReplicatedMergeTreeAddress address(zookeeper->get(fs::path(source_replica_path) / "host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(getContext());
auto timeouts = getHTTPTimeouts(getContext());
auto credentials = getContext()->getInterserverCredentials();
String interserver_scheme = getContext()->getInterserverScheme();
@ -5819,7 +5822,7 @@ bool StorageReplicatedMergeTree::tryWaitForReplicaToProcessLogEntry(
String log_entry_str;
Coordination::Stat log_entry_stat;
bool exists = getZooKeeper()->tryGet(fs::path(table_zookeeper_path) / "log" / log_entry_name, log_entry_str, &log_entry_stat);
ReplicatedMergeTreeLogEntryData log_entry = *ReplicatedMergeTreeLogEntry::parse(log_entry_str, log_entry_stat);
ReplicatedMergeTreeLogEntryData log_entry = *ReplicatedMergeTreeLogEntry::parse(log_entry_str, log_entry_stat, format_version);
if (exists && entry.log_entry_id == log_entry.log_entry_id)
{
LOG_DEBUG(log, "Found log entry with id `{}` in the log", entry.log_entry_id);
@ -5886,7 +5889,7 @@ bool StorageReplicatedMergeTree::tryWaitForReplicaToProcessLogEntry(
/// Check if the id matches rather than just contents. This entry
/// might have been written by different ClickHouse versions and
/// it is hard to guarantee same text representation.
ReplicatedMergeTreeLogEntryData queue_entry = *ReplicatedMergeTreeLogEntry::parse(queue_entry_str, queue_entry_stat);
ReplicatedMergeTreeLogEntryData queue_entry = *ReplicatedMergeTreeLogEntry::parse(queue_entry_str, queue_entry_stat, format_version);
if (entry.log_entry_id == queue_entry.log_entry_id)
{
queue_entry_to_wait_for = entry_name;
@ -7549,26 +7552,37 @@ void StorageReplicatedMergeTree::onActionLockRemove(StorageActionBlockType actio
background_moves_assignee.trigger();
}
bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UInt64 max_wait_milliseconds)
bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_milliseconds)
{
Stopwatch watch;
/// Let's fetch new log entries firstly
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
/// This is significant, because the execution of this task could be delayed at BackgroundPool.
/// And we force it to be executed.
background_operations_assignee.trigger();
Poco::Event target_size_event;
auto callback = [&target_size_event, queue_size] (size_t new_queue_size)
std::unordered_set<String> wait_for_ids;
bool set_ids_to_wait = true;
Poco::Event target_entry_event;
auto callback = [&target_entry_event, &wait_for_ids, &set_ids_to_wait](size_t new_queue_size, std::unordered_set<String> log_entry_ids, std::optional<String> removed_log_entry_id)
{
if (new_queue_size <= queue_size)
target_size_event.set();
if (set_ids_to_wait)
{
wait_for_ids = log_entry_ids;
set_ids_to_wait = false;
}
if (removed_log_entry_id.has_value())
wait_for_ids.erase(removed_log_entry_id.value());
if (wait_for_ids.empty() || new_queue_size == 0)
target_entry_event.set();
};
const auto handler = queue.addSubscriber(std::move(callback));
while (!target_size_event.tryWait(50))
while (!target_entry_event.tryWait(50))
{
if (max_wait_milliseconds && watch.elapsedMilliseconds() > max_wait_milliseconds)
return false;
@ -7576,14 +7590,13 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI
if (partial_shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Shutdown is called for table");
}
return true;
}
bool StorageReplicatedMergeTree::dropPartImpl(
zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop)
{
LOG_TRACE(log, "Will try to insert a log entry to DROP_RANGE for part {}", part_name);
LOG_TRACE(log, "Will try to insert a log entry to DROP_PART for part {}", part_name);
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
@ -7600,7 +7613,7 @@ bool StorageReplicatedMergeTree::dropPartImpl(
return false;
}
if (merge_pred.hasDropRange(part->info))
if (merge_pred.isGoingToBeDropped(part->info))
{
if (throw_if_noop)
throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED, "Already has DROP RANGE for part {} in queue.", part_name);
@ -7645,12 +7658,12 @@ bool StorageReplicatedMergeTree::dropPartImpl(
size_t clear_block_ops_size = ops.size();
/// If `part_name` is result of a recent merge and source parts are still available then
/// DROP_RANGE with detach will move this part together with source parts to `detached/` dir.
entry.type = LogEntry::DROP_RANGE;
/// DROP_PART with detach will move this part together with source parts to `detached/` dir.
entry.type = LogEntry::DROP_PART;
entry.source_replica = replica_name;
/// We don't set fake drop level (999999999) for the single part DROP_RANGE.
/// We don't set fake drop level (999999999) for the single part drop range.
/// First of all we don't guarantee anything other than the part will not be
/// active after DROP PART, but covering part (without data of dropped part) can exist.
/// active after DROP_PART, but covering part (without data of dropped part) can exist.
/// If we add a part with 9999999 level then we can break the invariant in virtual_parts of
/// the queue.
entry.new_part_name = getPartNamePossiblyFake(format_version, part->info);
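The comments above about the fake drop level refer to the level component encoded in a part's name. As a rough sketch (assuming the usual <partition_id>_<min_block>_<max_block>_<level> naming and ignoring mutation versions), a fake drop-level range differs from a normal single-part name only in that last component:

#include <cstdint>
#include <string>

/// Hypothetical, simplified part-name builder: <partition>_<min>_<max>_<level>.
/// Real part names can carry extra components (e.g. a mutation version).
std::string makePartName(const std::string & partition_id,
                         int64_t min_block, int64_t max_block, uint32_t level)
{
    return partition_id + "_" + std::to_string(min_block) + "_"
        + std::to_string(max_block) + "_" + std::to_string(level);
}

/// makePartName("all", 5, 5, 0)          -> "all_5_5_0"          (a single dropped part)
/// makePartName("all", 0, 42, 999999999) -> "all_0_42_999999999" (fake drop level for a range)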
@ -7811,12 +7824,11 @@ StorageReplicatedMergeTree::LogEntryPtr StorageReplicatedMergeTree::dropAllParts
void StorageReplicatedMergeTree::enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds)
{
MergeTreePartInfo covering_drop_range;
/// NOTE This check is just an optimization, it's not reliable for two reasons:
/// (1) drop entry could be removed concurrently and (2) it does not take REPLACE_RANGE into account.
/// NOTE This check is just an optimization, it's not reliable because drop entry could be removed concurrently.
/// See also ReplicatedMergeTreePartCheckThread::cancelRemovedPartsCheck
if (queue.hasDropRange(MergeTreePartInfo::fromPartName(part_name, format_version), &covering_drop_range))
if (queue.isGoingToBeDropped(MergeTreePartInfo::fromPartName(part_name, format_version), &covering_drop_range))
{
LOG_WARNING(log, "Do not enqueue part {} for check because it's covered by DROP_RANGE {} and going to be removed",
LOG_WARNING(log, "Do not enqueue part {} for check because it's covered by drop range {} and going to be removed",
part_name, covering_drop_range.getPartNameForLogs());
return;
}
@ -8679,9 +8691,12 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
LOG_WARNING(log, "Will not create empty part instead of lost {}, because there's no covering part in replication queue", lost_part_name);
return false;
}
if (pred.hasDropRange(MergeTreePartInfo::fromPartName(covering_virtual, format_version)))
MergeTreePartInfo drop_info;
if (pred.isGoingToBeDropped(MergeTreePartInfo::fromPartName(lost_part_name, format_version), &drop_info))
{
LOG_WARNING(log, "Will not create empty part instead of lost {}, because it's covered by DROP_RANGE", lost_part_name);
LOG_WARNING(log, "Will not create empty part instead of lost {}, "
"because it's going to be removed (by range {})",
lost_part_name, drop_info.getPartNameForLogs());
return false;
}

View File

@ -178,9 +178,9 @@ public:
void onActionLockRemove(StorageActionBlockType action_type) override;
/// Wait when replication queue size becomes less or equal than queue_size
/// Wait until the replication queue's current last entry is processed or until the queue size becomes 0
/// If the timeout is exceeded, returns false
bool waitForShrinkingQueueSize(size_t queue_size = 0, UInt64 max_wait_milliseconds = 0);
bool waitForProcessingQueue(UInt64 max_wait_milliseconds = 0);
/// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK.
void getStatus(ReplicatedTableStatus & res, bool with_zk_fields = true);

View File

@ -8,7 +8,7 @@
#include "Client/Connection.h"
#include "Core/QueryProcessingStage.h"
#include <DataTypes/DataTypeString.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>

View File

@ -13,7 +13,6 @@
#include <Parsers/ASTIdentifier.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/IOThreadPool.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/WriteBufferFromHTTP.h>
@ -82,6 +81,10 @@ static bool urlWithGlobs(const String & uri)
return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) || uri.find('|') != std::string::npos;
}
static ConnectionTimeouts getHTTPTimeouts(ContextPtr context)
{
return ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), {context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0});
}
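Since ConnectionTimeoutsContext.h is no longer included, each call site now builds timeouts from the query settings plus the server-wide keep_alive_timeout config value (falling back to DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), which is what the small getHTTPTimeouts helper above centralizes for StorageURL. A self-contained sketch of the same idea, with hypothetical stand-in types instead of the real Settings/ConnectionTimeouts:

#include <chrono>
#include <cstdint>

/// Stand-ins for the real types; field names here are illustrative only.
struct HttpSettings
{
    std::chrono::seconds connect_timeout{1};
    std::chrono::seconds send_timeout{300};
    std::chrono::seconds receive_timeout{300};
};

struct HttpTimeouts
{
    std::chrono::seconds connect;
    std::chrono::seconds send;
    std::chrono::seconds receive;
    std::chrono::seconds keep_alive;
};

/// Combine per-query settings with the server-wide keep-alive value, mirroring
/// how the helper above passes context->getSettingsRef() and the
/// "keep_alive_timeout" config key to ConnectionTimeouts::getHTTPTimeouts.
HttpTimeouts makeHTTPTimeouts(const HttpSettings & settings, uint64_t keep_alive_timeout_sec)
{
    return {settings.connect_timeout,
            settings.send_timeout,
            settings.receive_timeout,
            std::chrono::seconds(keep_alive_timeout_sec)};
}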
IStorageURLBase::IStorageURLBase(
const String & uri_,
@ -632,7 +635,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
{},
Poco::Net::HTTPRequest::HTTP_GET,
{},
ConnectionTimeouts::getHTTPTimeouts(context),
getHTTPTimeouts(context),
compression_method,
credentials,
headers,
@ -716,7 +719,7 @@ Pipe IStorageURLBase::read(
local_context,
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
getHTTPTimeouts(local_context),
compression_method,
download_threads,
headers,
@ -740,7 +743,7 @@ Pipe IStorageURLBase::read(
local_context,
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
getHTTPTimeouts(local_context),
compression_method,
max_download_threads,
headers,
@ -775,6 +778,7 @@ Pipe StorageURLWithFailover::read(
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
uri_info->uri_list_to_read.emplace_back(uri_options);
auto pipe = Pipe(std::make_shared<StorageURLSource>(
uri_info,
getReadMethod(),
@ -786,7 +790,7 @@ Pipe StorageURLWithFailover::read(
local_context,
columns_description,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
getHTTPTimeouts(local_context),
compression_method,
local_context->getSettingsRef().max_download_threads,
headers,
@ -815,7 +819,7 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
format_settings,
metadata_snapshot->getSampleBlock(),
context,
ConnectionTimeouts::getHTTPTimeouts(context),
getHTTPTimeouts(context),
compression_method,
http_method);
}
@ -827,7 +831,7 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
format_settings,
metadata_snapshot->getSampleBlock(),
context,
ConnectionTimeouts::getHTTPTimeouts(context),
getHTTPTimeouts(context),
compression_method,
http_method);
}
@ -896,7 +900,7 @@ std::optional<time_t> IStorageURLBase::getLastModificationTime(
Poco::URI(url),
Poco::Net::HTTPRequest::HTTP_GET,
{},
ConnectionTimeouts::getHTTPTimeouts(context),
getHTTPTimeouts(context),
credentials,
settings.max_http_get_redirects,
settings.max_read_buffer_size,

View File

@ -5,7 +5,7 @@
#include <Storages/checkAndGetLiteralArgument.h>
#include <Formats/FormatFactory.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
@ -130,13 +130,16 @@ SinkToStoragePtr StorageXDBC::write(const ASTPtr & /* query */, const StorageMet
request_uri.addQueryParameter("format_name", format_name);
request_uri.addQueryParameter("sample_block", metadata_snapshot->getSampleBlock().getNamesAndTypesList().toString());
return std::make_shared<StorageURLSink>(
request_uri.toString(),
format_name,
getFormatSettings(local_context),
metadata_snapshot->getSampleBlock(),
local_context,
ConnectionTimeouts::getHTTPTimeouts(local_context),
ConnectionTimeouts::getHTTPTimeouts(
local_context->getSettingsRef(),
{local_context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}),
compression_method);
}

View File

@ -2,7 +2,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
@ -76,7 +76,14 @@ ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr contex
columns_info_uri.addQueryParameter("external_table_functions_use_nulls", toString(use_nulls));
Poco::Net::HTTPBasicCredentials credentials{};
ReadWriteBufferFromHTTP buf(columns_info_uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context), credentials);
ReadWriteBufferFromHTTP buf(
columns_info_uri,
Poco::Net::HTTPRequest::HTTP_POST,
{},
ConnectionTimeouts::getHTTPTimeouts(
context->getSettingsRef(),
{context->getConfigRef().getUInt("keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT), 0}),
credentials);
std::string columns_info;
readStringBinary(columns_info, buf);

View File

@ -17,7 +17,6 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
@ -88,7 +87,7 @@ ColumnsDescription TableFunctionFile::getActualTableStructure(ContextPtr context
if (structure == "auto")
{
if (fd >= 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Schema inference is not supported for table function '{}' with file descriptor", getName());
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Schema inference is not supported for table function '{}' with file descriptor", getName());
size_t total_bytes_to_read = 0;
Strings paths = StorageFile::getPathsList(filename, context->getUserFilesPath(), context, total_bytes_to_read);
return StorageFile::getTableStructureFromFile(format, paths, compression_method, std::nullopt, context);

View File

@ -109,6 +109,9 @@ endif()
if (TARGET ch_contrib::jemalloc)
set(USE_JEMALLOC 1)
endif()
if (TARGET ch_contrib::gwp_asan)
set(USE_GWP_ASAN 1)
endif()
if (TARGET ch_contrib::h3)
set(USE_H3 1)
endif()

View File

@ -56,6 +56,13 @@ class Reviews:
logging.info("There aren't reviews for PR #%s", self.pr.number)
return False
logging.info(
"The following users have reviewed the PR:\n %s",
"\n ".join(
f"{user.login}: {review.state}" for user, review in self.reviews.items()
),
)
filtered_reviews = {
user: review
for user, review in self.reviews.items()
@ -125,7 +132,11 @@ class Reviews:
return False
return True
logging.info("The PR #%s is not approved", self.pr.number)
logging.info(
"The PR #%s is not approved by any of %s team member",
self.pr.number,
TEAM_NAME,
)
return False

View File

@ -91,7 +91,7 @@ class HTTPError(Exception):
# Helpers to execute queries via HTTP interface.
def clickhouse_execute_http(
base_args, query, timeout=30, settings=None, default_format=None
base_args, query, timeout=30, settings=None, default_format=None, max_http_retries=5
):
if args.secure:
client = http.client.HTTPSConnection(
@ -120,7 +120,7 @@ def clickhouse_execute_http(
if default_format is not None:
params["default_format"] = default_format
for i in range(MAX_RETRIES):
for i in range(max_http_retries):
try:
client.request(
"POST",
@ -130,7 +130,7 @@ def clickhouse_execute_http(
data = res.read()
break
except Exception as ex:
if i == MAX_RETRIES - 1:
if i == max_http_retries - 1:
raise ex
sleep(i + 1)
@ -140,13 +140,12 @@ def clickhouse_execute_http(
return data
def clickhouse_execute(base_args, query, timeout=30, settings=None):
return clickhouse_execute_http(base_args, query, timeout, settings).strip()
def clickhouse_execute(base_args, query, timeout=30, settings=None, max_http_retries=5):
return clickhouse_execute_http(base_args, query, timeout, settings, max_http_retries=max_http_retries).strip()
def clickhouse_execute_json(base_args, query, timeout=60, settings=None):
data = clickhouse_execute_http(base_args, query, timeout, settings, "JSONEachRow")
def clickhouse_execute_json(base_args, query, timeout=60, settings=None, max_http_retries=5):
data = clickhouse_execute_http(base_args, query, timeout, settings, "JSONEachRow", max_http_retries=max_http_retries)
if not data:
return None
rows = []
@ -641,7 +640,7 @@ class TestCase:
clickhouse_execute(
args,
"CREATE DATABASE " + database + get_db_engine(testcase_args, database),
"CREATE DATABASE IF NOT EXISTS " + database + get_db_engine(testcase_args, database),
settings=get_create_database_settings(args, testcase_args),
)
@ -1139,7 +1138,7 @@ class TestCase:
seconds_left = max(
args.timeout - (datetime.now() - start_time).total_seconds(), 20
)
drop_database_query = "DROP DATABASE " + database
drop_database_query = "DROP DATABASE IF EXISTS " + database
if args.replicated_database:
drop_database_query += " ON CLUSTER test_cluster_database_replicated"
@ -1670,7 +1669,7 @@ def check_server_started(args):
retry_count = args.server_check_retries
while retry_count > 0:
try:
clickhouse_execute(args, "SELECT 1")
clickhouse_execute(args, "SELECT 1", max_http_retries=1)
print(" OK")
sys.stdout.flush()
return True

View File

@ -0,0 +1,3 @@
<clickhouse>
<allow_reverse_dns_query_function>1</allow_reverse_dns_query_function>
</clickhouse>

View File

@ -52,6 +52,7 @@ ln -sf $SRC_PATH/config.d/enable_zero_copy_replication.xml $DEST_SERVER_PATH/con
ln -sf $SRC_PATH/config.d/nlp.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/enable_keeper_map.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/display_name.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/reverse_dns_query_function.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/compressed_marks_and_index.xml $DEST_SERVER_PATH/config.d/
# Not supported with fasttest.

View File

@ -1331,23 +1331,16 @@ def test_tables_dependency():
instance.query("CREATE DATABASE test2")
# For this test we use random names of tables to check that they're created according to their dependencies (not just in alphabetical order).
random_table_names = [f"{chr(ord('A')+i)}" for i in range(0, 10)]
random_table_names = [f"{chr(ord('A')+i)}" for i in range(0, 15)]
random.shuffle(random_table_names)
random_table_names = [
random.choice(["test", "test2"]) + "." + table_name
for table_name in random_table_names
]
print(f"random_table_names={random_table_names}")
t1 = random_table_names[0]
t2 = random_table_names[1]
t3 = random_table_names[2]
t4 = random_table_names[3]
t5 = random_table_names[4]
t6 = random_table_names[5]
t7 = random_table_names[6]
t8 = random_table_names[7]
t9 = random_table_names[8]
t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11, t12, t13, t14, t15 = tuple(
random_table_names
)
# Create a materialized view and a dictionary with a local table as source.
instance.query(
@ -1373,11 +1366,34 @@ def test_tables_dependency():
instance.query(f"CREATE VIEW {t7} AS SELECT sum(x) FROM (SELECT x FROM {t6})")
instance.query(
f"CREATE TABLE {t8} AS {t2} ENGINE = Buffer({t2.split('.')[0]}, {t2.split('.')[1]}, 16, 10, 100, 10000, 1000000, 10000000, 100000000)"
f"CREATE DICTIONARY {t8} (x Int64, y String) PRIMARY KEY x SOURCE(CLICKHOUSE(TABLE '{t1.split('.')[1]}' DB '{t1.split('.')[0]}')) LAYOUT(FLAT()) LIFETIME(9)"
)
instance.query(f"CREATE TABLE {t9}(a Int64) ENGINE=Log")
instance.query(
f"CREATE VIEW {t10}(x Int64, y String) AS SELECT * FROM {t1} WHERE x IN {t9}"
)
instance.query(
f"CREATE DICTIONARY {t9} (x Int64, y String) PRIMARY KEY x SOURCE(CLICKHOUSE(TABLE '{t1.split('.')[1]}' DB '{t1.split('.')[0]}')) LAYOUT(FLAT()) LIFETIME(9)"
f"CREATE VIEW {t11}(x Int64, y String) AS SELECT * FROM {t2} WHERE x NOT IN (SELECT a FROM {t9})"
)
instance.query(
f"CREATE TABLE {t12} AS {t1} ENGINE = Buffer({t2.split('.')[0]}, {t2.split('.')[1]}, 16, 10, 100, 10000, 1000000, 10000000, 100000000)"
)
instance.query(
f"CREATE TABLE {t13} AS {t1} ENGINE = Buffer((SELECT '{t2.split('.')[0]}'), (SELECT '{t2.split('.')[1]}'), 16, 10, 100, 10000, 1000000, 10000000, 100000000)"
)
instance.query(
f"CREATE TABLE {t14} AS {t1} ENGINE = Buffer('', {t2.split('.')[1]}, 16, 10, 100, 10000, 1000000, 10000000, 100000000)",
database=t2.split(".")[0],
)
instance.query(
f"CREATE TABLE {t15} AS {t1} ENGINE = Buffer('', '', 16, 10, 100, 10000, 1000000, 10000000, 100000000)"
)
# Make backup.
@ -1386,8 +1402,14 @@ def test_tables_dependency():
# Drop everything in reverse order.
def drop():
instance.query(f"DROP DICTIONARY {t9}")
instance.query(f"DROP TABLE {t8} NO DELAY")
instance.query(f"DROP TABLE {t15} NO DELAY")
instance.query(f"DROP TABLE {t14} NO DELAY")
instance.query(f"DROP TABLE {t13} NO DELAY")
instance.query(f"DROP TABLE {t12} NO DELAY")
instance.query(f"DROP TABLE {t11} NO DELAY")
instance.query(f"DROP TABLE {t10} NO DELAY")
instance.query(f"DROP TABLE {t9} NO DELAY")
instance.query(f"DROP DICTIONARY {t8}")
instance.query(f"DROP TABLE {t7} NO DELAY")
instance.query(f"DROP TABLE {t6} NO DELAY")
instance.query(f"DROP TABLE {t5} NO DELAY")
@ -1406,7 +1428,7 @@ def test_tables_dependency():
# Check everything is restored.
assert instance.query(
"SELECT concat(database, '.', name) AS c FROM system.tables WHERE database IN ['test', 'test2'] ORDER BY c"
) == TSV(sorted([t1, t2, t3, t4, t5, t6, t7, t8, t9]))
) == TSV(sorted(random_table_names))
# Check logs.
instance.query("SYSTEM FLUSH LOGS")
@ -1421,8 +1443,20 @@ def test_tables_dependency():
f"Table {t5} has 1 dependencies: {t4} (level 2)",
f"Table {t6} has 1 dependencies: {t4} (level 2)",
f"Table {t7} has 1 dependencies: {t6} (level 3)",
f"Table {t8} has 1 dependencies: {t2} (level 1)",
f"Table {t9} has 1 dependencies: {t1} (level 1)",
f"Table {t8} has 1 dependencies: {t1} (level 1)",
f"Table {t9} has no dependencies (level 0)",
(
f"Table {t10} has 2 dependencies: {t1}, {t9} (level 1)",
f"Table {t10} has 2 dependencies: {t9}, {t1} (level 1)",
),
(
f"Table {t11} has 2 dependencies: {t2}, {t9} (level 1)",
f"Table {t11} has 2 dependencies: {t9}, {t2} (level 1)",
),
f"Table {t12} has 1 dependencies: {t2} (level 1)",
f"Table {t13} has 1 dependencies: {t2} (level 1)",
f"Table {t14} has 1 dependencies: {t2} (level 1)",
f"Table {t15} has no dependencies (level 0)",
]
for expect in expect_in_logs:
assert any(

View File

@ -0,0 +1,2 @@
mark_cache_size:
'@from_env': CONFIG_TEST_ENV

View File

@ -271,4 +271,6 @@
<anonymize>false</anonymize>
<endpoint>https://6f33034cfe684dd7a3ab9875e57b1c8d@o388870.ingest.sentry.io/5226277</endpoint>
</send_crash_reports>
<mark_cache_size>123451234</mark_cache_size>
</clickhouse>

View File

@ -21,6 +21,7 @@ def test_extra_yaml_mix():
"configs/config.d/logging_no_rotate.xml",
"configs/config.d/log_to_console.yaml",
"configs/config.d/macros.yaml",
"configs/config.d/mark_cache_size.yaml",
"configs/config.d/metric_log.xml",
"configs/config.d/more_clusters.yaml",
"configs/config.d/part_log.xml",
@ -46,6 +47,7 @@ def test_extra_yaml_mix():
users_config_name="users.yaml",
copy_common_configs=False,
config_root_name="clickhouse",
env_variables={"CONFIG_TEST_ENV": "8956"},
)
try:

View File

@ -167,7 +167,7 @@ def test_smoke():
def test_smoke_parallel():
threads = []
for _ in range(100):
for _ in range(50):
threads.append(SafeThread(target=execute_smoke_query))
for thread in threads:
thread.start()
@ -178,7 +178,7 @@ def test_smoke_parallel():
def test_smoke_parallel_dict_reload():
threads = []
for _ in range(100):
for _ in range(90):
threads.append(SafeThread(target=execute_reload_dictionary_slow_dict_3))
for thread in threads:
thread.start()

View File

@ -251,7 +251,7 @@ def test_restore_another_bucket_path(cluster, db_atomic, zero_copy):
node_another_bucket = cluster.instances["node_another_bucket"]
create_restore_file(node_another_bucket, bucket="root")
node_another_bucket.restart_clickhouse()
node_another_bucket.restart_clickhouse(stop_start_wait_sec=120)
create_table(
node_another_bucket, "test", schema, attach=True, db_atomic=db_atomic, uuid=uuid
)

Some files were not shown because too many files have changed in this diff