Merge remote-tracking branch 'origin/master' into LWDRebuildProj

jsc0218 2024-08-12 14:06:25 +00:00
commit a9d19c7aca
73 changed files with 2096 additions and 837 deletions

View File

@ -187,14 +187,6 @@ else ()
set(NO_WHOLE_ARCHIVE --no-whole-archive)
endif ()
if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE")
# Can be lld or ld-lld or lld-13 or /path/to/lld.
if (LINKER_NAME MATCHES "lld")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--gdb-index")
message (STATUS "Adding .gdb-index via --gdb-index linker option.")
endif ()
endif()
if (NOT (SANITIZE_COVERAGE OR WITH_COVERAGE)
AND (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE"
OR CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO"
@ -402,7 +394,7 @@ if ((NOT OS_LINUX AND NOT OS_ANDROID) OR (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
set(ENABLE_GWP_ASAN OFF)
endif ()
option (ENABLE_FIU "Enable Fiu" ON)
option (ENABLE_LIBFIU "Enable libfiu" ON)
option(WERROR "Enable -Werror compiler option" ON)

View File

@ -179,7 +179,7 @@ else()
message(STATUS "Not using QPL")
endif ()
if (OS_LINUX AND ARCH_AMD64)
if (OS_LINUX AND ARCH_AMD64 AND NOT NO_SSE3_OR_HIGHER)
option (ENABLE_QATLIB "Enable Intel® QuickAssist Technology Library (QATlib)" ${ENABLE_LIBRARIES})
elseif(ENABLE_QATLIB)
message (${RECONFIGURE_MESSAGE_LEVEL} "QATLib is only supported on x86_64")

View File

@ -27,7 +27,7 @@ if (ENABLE_QAT_OUT_OF_TREE_BUILD)
${QAT_AL_INCLUDE_DIR}
${QAT_USDM_INCLUDE_DIR}
${ZSTD_LIBRARY_DIR})
target_compile_definitions(_qatzstd_plugin PRIVATE -DDEBUGLEVEL=0 PUBLIC -DENABLE_ZSTD_QAT_CODEC)
target_compile_definitions(_qatzstd_plugin PRIVATE -DDEBUGLEVEL=0)
add_library (ch_contrib::qatzstd_plugin ALIAS _qatzstd_plugin)
else () # In-tree build
message(STATUS "Intel QATZSTD in-tree build")
@ -78,7 +78,7 @@ else () # In-tree build
${QAT_USDM_INCLUDE_DIR}
${ZSTD_LIBRARY_DIR}
${LIBQAT_HEADER_DIR})
target_compile_definitions(_qatzstd_plugin PRIVATE -DDEBUGLEVEL=0 PUBLIC -DENABLE_ZSTD_QAT_CODEC -DINTREE)
target_compile_definitions(_qatzstd_plugin PRIVATE -DDEBUGLEVEL=0 PUBLIC -DINTREE)
target_include_directories(_qatzstd_plugin SYSTEM PUBLIC $<BUILD_INTERFACE:${QATZSTD_SRC_DIR}> $<INSTALL_INTERFACE:include>)
add_library (ch_contrib::qatzstd_plugin ALIAS _qatzstd_plugin)
endif ()

View File

@ -1,20 +1,21 @@
if (NOT ENABLE_FIU)
message (STATUS "Not using fiu")
if (NOT ENABLE_LIBFIU)
message (STATUS "Not using libfiu")
return ()
endif ()
set(FIU_DIR "${ClickHouse_SOURCE_DIR}/contrib/libfiu/")
set(LIBFIU_DIR "${ClickHouse_SOURCE_DIR}/contrib/libfiu/")
set(FIU_SOURCES
${FIU_DIR}/libfiu/fiu.c
${FIU_DIR}/libfiu/fiu-rc.c
${FIU_DIR}/libfiu/backtrace.c
${FIU_DIR}/libfiu/wtable.c
set(LIBFIU_SOURCES
${LIBFIU_DIR}/libfiu/fiu.c
${LIBFIU_DIR}/libfiu/fiu-rc.c
${LIBFIU_DIR}/libfiu/backtrace.c
${LIBFIU_DIR}/libfiu/wtable.c
)
set(FIU_HEADERS "${FIU_DIR}/libfiu")
set(LIBFIU_HEADERS "${LIBFIU_DIR}/libfiu")
add_library(_fiu ${FIU_SOURCES})
target_compile_definitions(_fiu PUBLIC DUMMY_BACKTRACE)
target_include_directories(_fiu PUBLIC ${FIU_HEADERS})
add_library(ch_contrib::fiu ALIAS _fiu)
add_library(_libfiu ${LIBFIU_SOURCES})
target_compile_definitions(_libfiu PUBLIC DUMMY_BACKTRACE)
target_compile_definitions(_libfiu PUBLIC FIU_ENABLE)
target_include_directories(_libfiu PUBLIC ${LIBFIU_HEADERS})
add_library(ch_contrib::libfiu ALIAS _libfiu)
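
Since FIU_ENABLE is now a PUBLIC compile definition of the _libfiu target, every consumer linking ch_contrib::libfiu gets working failpoints. For context, a minimal standalone sketch of the libfiu API this enables (hypothetical failpoint name, not ClickHouse code; compile with -DFIU_ENABLE):

#include <fiu.h>
#include <fiu-control.h>
#include <cstdio>

static int risky_operation()
{
    fiu_return_on("demo/fail_io", -1); /// returns -1 while the failpoint is enabled
    return 0;
}

int main()
{
    fiu_init(0);                               /// initialize the failpoint machinery
    fiu_enable("demo/fail_io", 1, nullptr, 0); /// arm the failpoint
    std::printf("armed: %d\n", risky_operation());    /// prints -1
    fiu_disable("demo/fail_io");
    std::printf("disarmed: %d\n", risky_operation()); /// prints 0
    return 0;
}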

View File

@ -728,10 +728,6 @@ add_library(_qpl STATIC ${LIB_DEPS})
target_include_directories(_qpl
PUBLIC $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/include/> $<INSTALL_INTERFACE:include>)
target_compile_definitions(_qpl
PUBLIC -DENABLE_QPL_COMPRESSION)
target_link_libraries(_qpl
PRIVATE ch_contrib::accel-config)

View File

@ -1,3 +1,5 @@
# docker build -t clickhouse/cctools .
# This is a hack to significantly reduce the build time of the clickhouse/binary-builder
# It's based on the assumption that we don't care much about the cctools version
# It does not even depend on the clickhouse/fasttest in the `docker/images.json`
@ -30,5 +32,29 @@ RUN git clone https://github.com/tpoechtrager/cctools-port.git \
&& cd ../.. \
&& rm -rf cctools-port
#
# GDB
#
# ld from binutils is 2.38, which has the following error:
#
# DWARF error: invalid or unhandled FORM value: 0x23
#
ENV LD=ld.lld-${LLVM_VERSION}
ARG GDB_VERSION=15.1
RUN apt-get update \
&& apt-get install --yes \
libgmp-dev \
libmpfr-dev \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* /var/cache/debconf /tmp/*
RUN wget https://sourceware.org/pub/gdb/releases/gdb-$GDB_VERSION.tar.gz \
&& tar -xvf gdb-$GDB_VERSION.tar.gz \
&& cd gdb-$GDB_VERSION \
&& ./configure --prefix=/opt/gdb \
&& make -j $(nproc) \
&& make install \
&& rm -fr gdb-$GDB_VERSION gdb-$GDB_VERSION.tar.gz
FROM scratch
COPY --from=builder /cctools /cctools
COPY --from=builder /opt/gdb /opt/gdb

View File

@ -83,7 +83,7 @@ RUN arch=${TARGETARCH:-amd64} \
# Give suid to gdb to grant it attach permissions
# chmod 777 to make the container user independent
RUN chmod u+s /usr/bin/gdb \
RUN chmod u+s /opt/gdb/bin/gdb \
&& mkdir -p /var/lib/clickhouse \
&& chmod 777 /var/lib/clickhouse

View File

@ -11,7 +11,6 @@ RUN apt-get update \
curl \
default-jre \
g++ \
gdb \
iproute2 \
krb5-user \
libicu-dev \
@ -73,3 +72,6 @@ maxClientCnxns=80' > /opt/zookeeper/conf/zoo.cfg && \
ENV TZ=Etc/UTC
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
COPY --from=clickhouse/cctools:0d6b90a7a490 /opt/gdb /opt/gdb
ENV PATH="/opt/gdb/bin:${PATH}"

View File

@ -30,7 +30,6 @@ RUN apt-get update \
luajit \
libssl-dev \
libcurl4-openssl-dev \
gdb \
default-jdk \
software-properties-common \
libkrb5-dev \
@ -87,6 +86,8 @@ COPY modprobe.sh /usr/local/bin/modprobe
COPY dockerd-entrypoint.sh /usr/local/bin/
COPY misc/ /misc/
COPY --from=clickhouse/cctools:0d6b90a7a490 /opt/gdb /opt/gdb
ENV PATH="/opt/gdb/bin:${PATH}"
# Same options as in test/base/Dockerfile
# (in case you need to override them in tests)

View File

@ -9,7 +9,6 @@ RUN apt-get update \
curl \
dmidecode \
g++ \
gdb \
git \
gnuplot \
imagemagick \
@ -42,6 +41,9 @@ RUN pip3 --no-cache-dir install -r requirements.txt
COPY run.sh /
COPY --from=clickhouse/cctools:0d6b90a7a490 /opt/gdb /opt/gdb
ENV PATH="/opt/gdb/bin:${PATH}"
CMD ["bash", "/run.sh"]
# docker run --network=host --volume <workspace>:/workspace --volume=<output>:/output -e PR_TO_TEST=<> -e SHA_TO_TEST=<> clickhouse/performance-comparison

View File

@ -69,8 +69,8 @@ ENV MAX_RUN_TIME=0
# Unrelated to vars in setup_minio.sh, but should be the same there
# to have the same binaries for local running scenario
ARG MINIO_SERVER_VERSION=2022-01-03T18-22-58Z
ARG MINIO_CLIENT_VERSION=2022-01-05T23-52-51Z
ARG MINIO_SERVER_VERSION=2024-08-03T04-33-23Z
ARG MINIO_CLIENT_VERSION=2024-07-31T15-58-33Z
ARG TARGETARCH
# Download Minio-related binaries

View File

@ -59,8 +59,8 @@ find_os() {
download_minio() {
local os
local arch
local minio_server_version=${MINIO_SERVER_VERSION:-2022-09-07T22-25-02Z}
local minio_client_version=${MINIO_CLIENT_VERSION:-2022-08-28T20-08-11Z}
local minio_server_version=${MINIO_SERVER_VERSION:-2024-08-03T04-33-23Z}
local minio_client_version=${MINIO_CLIENT_VERSION:-2024-07-31T15-58-33Z}
os=$(find_os)
arch=$(find_arch)
@ -82,10 +82,10 @@ setup_minio() {
local test_type=$1
./mc alias set clickminio http://localhost:11111 clickhouse clickhouse
./mc admin user add clickminio test testtest
./mc admin policy set clickminio readwrite user=test
./mc admin policy attach clickminio readwrite --user=test
./mc mb --ignore-existing clickminio/test
if [ "$test_type" = "stateless" ]; then
./mc policy set public clickminio/test
./mc anonymous set public clickminio/test
fi
}

View File

@ -44,7 +44,6 @@ RUN apt-get update \
bash \
bsdmainutils \
build-essential \
gdb \
git \
gperf \
moreutils \
@ -58,3 +57,6 @@ RUN apt-get update \
&& rm -rf /var/lib/apt/lists/* /var/cache/debconf /tmp/*
COPY process_functional_tests_result.py /
COPY --from=clickhouse/cctools:0d6b90a7a490 /opt/gdb /opt/gdb
ENV PATH="/opt/gdb/bin:${PATH}"

View File

@ -75,7 +75,7 @@ Data are received by this protocol and written to a [TimeSeries](/en/engines/tab
<my_rule_1>
<url>/write</url>
<handler>
<type>remote_write</type
<type>remote_write</type>
<database>db_name</database>
<table>time_series_table</table>
</handler>
@ -105,7 +105,7 @@ Data are read from a [TimeSeries](/en/engines/table-engines/special/time_series)
<my_rule_1>
<url>/read</url>
<handler>
<type>remote_read</type
<type>remote_read</type>
<database>db_name</database>
<table>time_series_table</table>
</handler>
@ -144,14 +144,14 @@ Multiple protocols can be specified together in one place:
<my_rule_2>
<url>/write</url>
<handler>
<type>remote_write</type
<type>remote_write</type>
<table>db_name.time_series_table</table>
</handler>
</my_rule_2>
<my_rule_3>
<url>/read</url>
<handler>
<type>remote_read</type
<type>remote_read</type>
<table>db_name.time_series_table</table>
</handler>
</my_rule_3>

File diff suppressed because it is too large

View File

@ -143,7 +143,7 @@ void LocalServer::initialize(Poco::Util::Application & self)
if (fs::exists(config_path))
{
ConfigProcessor config_processor(config_path, false, true);
ConfigProcessor config_processor(config_path);
ConfigProcessor::setConfigPath(fs::path(config_path).parent_path());
auto loaded_config = config_processor.loadConfig();
getClientConfiguration().add(loaded_config.configuration.duplicate(), PRIO_DEFAULT, false);

View File

@ -68,13 +68,19 @@ const WithRetries::KeeperSettings & WithRetries::getKeeperSettings() const
WithRetries::FaultyKeeper WithRetries::getFaultyZooKeeper() const
{
/// We need to create new instance of ZooKeeperWithFaultInjection each time a copy a pointer to ZooKeeper client there
zkutil::ZooKeeperPtr current_zookeeper;
{
std::lock_guard lock(zookeeper_mutex);
current_zookeeper = zookeeper;
}
/// We need to create new instance of ZooKeeperWithFaultInjection each time and copy a pointer to ZooKeeper client there
/// The reason is that ZooKeeperWithFaultInjection may reset the underlying pointer and there could be a race condition
/// when the same object is used from multiple threads.
auto faulty_zookeeper = ZooKeeperWithFaultInjection::createInstance(
settings.keeper_fault_injection_probability,
settings.keeper_fault_injection_seed,
zookeeper,
current_zookeeper,
log->name(),
log);
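
The fix is the usual copy-under-lock pattern for a shared pointer that another thread may reset; a reduced sketch, assuming a zookeeper_mutex guarding the member as in the hunk above:

zkutil::ZooKeeperPtr getZooKeeperCopy() const
{
    std::lock_guard lock(zookeeper_mutex); /// take a private copy under the lock...
    return zookeeper;                      /// ...so later use cannot race with a concurrent reset
}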

View File

@ -353,8 +353,8 @@ target_link_libraries(clickhouse_common_io
Poco::Foundation
)
if (TARGET ch_contrib::fiu)
target_link_libraries(clickhouse_common_io PUBLIC ch_contrib::fiu)
if (TARGET ch_contrib::libfiu)
target_link_libraries(clickhouse_common_io PUBLIC ch_contrib::libfiu)
endif()
if (TARGET ch_contrib::cpuid)
@ -556,14 +556,13 @@ target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::lz4)
if (TARGET ch_contrib::qpl)
dbms_target_link_libraries(PUBLIC ch_contrib::qpl)
target_link_libraries (clickhouse_compression PUBLIC ch_contrib::qpl)
target_link_libraries (clickhouse_compression PUBLIC ch_contrib::accel-config)
endif ()
if (TARGET ch_contrib::accel-config)
dbms_target_link_libraries(PUBLIC ch_contrib::accel-config)
endif ()
if (TARGET ch_contrib::qatzstd_plugin)
if (TARGET ch_contrib::accel-config AND TARGET ch_contrib::qatzstd_plugin)
dbms_target_link_libraries(PUBLIC ch_contrib::qatzstd_plugin)
dbms_target_link_libraries(PUBLIC ch_contrib::accel-config)
target_link_libraries(clickhouse_common_io PUBLIC ch_contrib::qatzstd_plugin)
endif ()

View File

@ -7,6 +7,8 @@
#include <condition_variable>
#include <mutex>
#include "config.h"
namespace DB
{
@ -15,7 +17,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
};
#if FIU_ENABLE
#if USE_LIBFIU
static struct InitFiu
{
InitFiu()
@ -135,7 +137,7 @@ void FailPointInjection::pauseFailPoint(const String & fail_point_name)
void FailPointInjection::enableFailPoint(const String & fail_point_name)
{
#if FIU_ENABLE
#if USE_LIBFIU
#define SUB_M(NAME, flags, pause) \
if (fail_point_name == FailPoints::NAME) \
{ \

View File

@ -1,17 +1,16 @@
#pragma once
#include "config.h"
#include <Common/Exception.h>
#include <Core/Types.h>
#include <Poco/Util/AbstractConfiguration.h>
#include "config.h"
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdocumentation"
#pragma clang diagnostic ignored "-Wreserved-macro-identifier"
#include <fiu.h>
#include <fiu-control.h>
# include <fiu.h>
# include <fiu-control.h>
#pragma clang diagnostic pop
#include <unordered_map>

View File

@ -237,7 +237,14 @@ std::unique_ptr<ShellCommand> ShellCommand::executeImpl(
res->write_fds.emplace(fd, fds.fds_rw[1]);
}
LOG_TRACE(getLogger(), "Started shell command '{}' with pid {}", filename, pid);
LOG_TRACE(
getLogger(),
"Started shell command '{}' with pid {} and file descriptors: out {}, err {}",
filename,
pid,
res->out.getFD(),
res->err.getFD());
return res;
}

View File

@ -27,12 +27,14 @@
#include <Common/logger_useful.h>
#include <base/scope_guard.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int ABORTED;
}
ISystemLog::~ISystemLog() = default;
@ -86,32 +88,18 @@ void SystemLogQueue<LogElement>::push(LogElement&& element)
// by one, under exclusive lock, so we will see each message count.
// It is enough to only wake the flushing thread once, after the message
// count increases past half available size.
const uint64_t queue_end = queue_front_index + queue.size();
requested_flush_up_to = std::max(requested_flush_up_to, queue_end);
flush_event.notify_all();
const auto last_log_index = queue_front_index + queue.size();
notifyFlushUnlocked(last_log_index, /* should_prepare_tables_anyway */ false);
}
if (queue.size() >= settings.max_size_rows)
{
chassert(queue.size() == settings.max_size_rows);
// Ignore all further entries until the queue is flushed.
// Log a message about that. Don't spam it -- this might be especially
// problematic in case of trace log. Remember what the front index of the
// queue was when we last logged the message. If it changed, it means the
// queue was flushed, and we can log again.
if (queue_front_index != logged_queue_full_at_index)
{
logged_queue_full_at_index = queue_front_index;
// TextLog sets its logger level to 0, so this log is a noop and
// there is no recursive logging.
lock.unlock();
LOG_ERROR(log, "Queue is full for system log '{}' at {}. max_size_rows {}",
demangle(typeid(*this).name()),
queue_front_index,
settings.max_size_rows);
}
// A message about how many log entries were lost will be added to the next batch
++ignored_logs;
return;
}
@ -127,20 +115,50 @@ template <typename LogElement>
void SystemLogQueue<LogElement>::handleCrash()
{
if (settings.notify_flush_on_crash)
notifyFlush(/* force */ true);
{
notifyFlush(getLastLogIndex(), /* should_prepare_tables_anyway */ true);
}
}
template <typename LogElement>
void SystemLogQueue<LogElement>::waitFlush(uint64_t expected_flushed_up_to)
void SystemLogQueue<LogElement>::notifyFlushUnlocked(Index expected_flushed_index, bool should_prepare_tables_anyway)
{
if (should_prepare_tables_anyway)
requested_prepare_tables = std::max(requested_prepare_tables, expected_flushed_index);
requested_flush_index = std::max(requested_flush_index, expected_flushed_index);
flush_event.notify_all();
}
template <typename LogElement>
void SystemLogQueue<LogElement>::notifyFlush(SystemLogQueue<LogElement>::Index expected_flushed_index, bool should_prepare_tables_anyway)
{
std::lock_guard lock(mutex);
notifyFlushUnlocked(expected_flushed_index, should_prepare_tables_anyway);
}
template <typename LogElement>
void SystemLogQueue<LogElement>::waitFlush(SystemLogQueue<LogElement>::Index expected_flushed_index, bool should_prepare_tables_anyway)
{
LOG_DEBUG(log, "Requested flush up to offset {}", expected_flushed_index);
// Use an arbitrary timeout to avoid endless waiting. 60s proved to be
// too fast for our parallel functional tests, probably because they
// heavily load the disk.
const int timeout_seconds = 180;
std::unique_lock lock(mutex);
bool result = flush_event.wait_for(lock, std::chrono::seconds(timeout_seconds), [&]
// There is no obligation to call notifyFlush before waitFlush, so we have to make sure that flush_event has been triggered before we wait for the result
notifyFlushUnlocked(expected_flushed_index, should_prepare_tables_anyway);
auto result = confirm_event.wait_for(lock, std::chrono::seconds(timeout_seconds), [&]
{
return flushed_up_to >= expected_flushed_up_to && !is_force_prepare_tables;
if (should_prepare_tables_anyway)
return (flushed_index >= expected_flushed_index && prepared_tables >= requested_prepare_tables) || is_shutdown;
else
return (flushed_index >= expected_flushed_index) || is_shutdown;
});
if (!result)
@ -148,67 +166,63 @@ void SystemLogQueue<LogElement>::waitFlush(uint64_t expected_flushed_up_to)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout exceeded ({} s) while flushing system log '{}'.",
toString(timeout_seconds), demangle(typeid(*this).name()));
}
}
template <typename LogElement>
uint64_t SystemLogQueue<LogElement>::notifyFlush(bool should_prepare_tables_anyway)
{
uint64_t this_thread_requested_offset;
if (is_shutdown)
{
std::lock_guard lock(mutex);
if (is_shutdown)
return uint64_t(-1);
this_thread_requested_offset = queue_front_index + queue.size();
// Publish our flush request, taking care not to overwrite the requests
// made by other threads.
is_force_prepare_tables |= should_prepare_tables_anyway;
requested_flush_up_to = std::max(requested_flush_up_to, this_thread_requested_offset);
flush_event.notify_all();
throw Exception(ErrorCodes::ABORTED, "Shutdown has been called while flushing system log '{}'. Aborting.",
demangle(typeid(*this).name()));
}
LOG_DEBUG(log, "Requested flush up to offset {}", this_thread_requested_offset);
return this_thread_requested_offset;
}
template <typename LogElement>
void SystemLogQueue<LogElement>::confirm(uint64_t to_flush_end)
SystemLogQueue<LogElement>::Index SystemLogQueue<LogElement>::getLastLogIndex()
{
std::lock_guard lock(mutex);
flushed_up_to = to_flush_end;
is_force_prepare_tables = false;
flush_event.notify_all();
return queue_front_index + queue.size();
}
template <typename LogElement>
typename SystemLogQueue<LogElement>::Index SystemLogQueue<LogElement>::pop(std::vector<LogElement> & output,
bool & should_prepare_tables_anyway,
bool & exit_this_thread)
void SystemLogQueue<LogElement>::confirm(SystemLogQueue<LogElement>::Index last_flashed_index)
{
/// Call dtors and deallocate strings without holding the global lock
output.resize(0);
std::lock_guard lock(mutex);
prepared_tables = std::max(prepared_tables, last_flashed_index);
flushed_index = std::max(flushed_index, last_flashed_index);
confirm_event.notify_all();
}
std::unique_lock lock(mutex);
flush_event.wait_for(lock,
std::chrono::milliseconds(settings.flush_interval_milliseconds),
[&] ()
template <typename LogElement>
typename SystemLogQueue<LogElement>::PopResult SystemLogQueue<LogElement>::pop()
{
PopResult result;
size_t prev_ignored_logs = 0;
{
std::unique_lock lock(mutex);
flush_event.wait_for(lock, std::chrono::milliseconds(settings.flush_interval_milliseconds), [&] ()
{
return requested_flush_up_to > flushed_up_to || is_shutdown || is_force_prepare_tables;
}
);
return requested_flush_index > flushed_index || requested_prepare_tables > prepared_tables || is_shutdown;
});
queue_front_index += queue.size();
// Swap with existing array from previous flush, to save memory
// allocations.
queue.swap(output);
if (is_shutdown)
return PopResult{.is_shutdown = true};
should_prepare_tables_anyway = is_force_prepare_tables;
queue_front_index += queue.size();
prev_ignored_logs = ignored_logs;
ignored_logs = 0;
exit_this_thread = is_shutdown;
return queue_front_index;
result.last_log_index = queue_front_index;
result.logs.swap(queue);
result.create_table_force = requested_prepare_tables > prepared_tables;
}
if (prev_ignored_logs)
LOG_ERROR(log, "Queue had been full at {}, accepted {} logs, ignored {} logs.",
result.last_log_index - result.logs.size(),
result.logs.size(),
prev_ignored_logs);
return result;
}
template <typename LogElement>
@ -229,13 +243,21 @@ SystemLogBase<LogElement>::SystemLogBase(
}
template <typename LogElement>
void SystemLogBase<LogElement>::flush(bool force)
SystemLogBase<LogElement>::Index SystemLogBase<LogElement>::getLastLogIndex()
{
uint64_t this_thread_requested_offset = queue->notifyFlush(force);
if (this_thread_requested_offset == uint64_t(-1))
return;
return queue->getLastLogIndex();
}
queue->waitFlush(this_thread_requested_offset);
template <typename LogElement>
void SystemLogBase<LogElement>::notifyFlush(Index expected_flushed_index, bool should_prepare_tables_anyway)
{
queue->notifyFlush(expected_flushed_index, should_prepare_tables_anyway);
}
template <typename LogElement>
void SystemLogBase<LogElement>::flush(Index expected_flushed_index, bool should_prepare_tables_anyway)
{
queue->waitFlush(expected_flushed_index, should_prepare_tables_anyway);
}
template <typename LogElement>
@ -257,9 +279,6 @@ void SystemLogBase<LogElement>::add(LogElement element)
queue->push(std::move(element));
}
template <typename LogElement>
void SystemLogBase<LogElement>::notifyFlush(bool force) { queue->notifyFlush(force); }
#define INSTANTIATE_SYSTEM_LOG_BASE(ELEMENT) template class SystemLogBase<ELEMENT>;
SYSTEM_LOG_ELEMENTS(INSTANTIATE_SYSTEM_LOG_BASE)
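
The rewritten waitFlush is the standard wait-with-predicate-and-timeout idiom; a self-contained sketch of the same shape (hypothetical names, simplified from the code above):

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <stdexcept>

std::mutex mutex;
std::condition_variable confirm_event;
int64_t flushed_index = 0;

void waitFlushedUpTo(int64_t expected_index, int timeout_seconds)
{
    std::unique_lock lock(mutex);
    /// wait_for returns false only if the predicate is still false after the timeout
    bool flushed = confirm_event.wait_for(lock, std::chrono::seconds(timeout_seconds),
                                          [&] { return flushed_index >= expected_index; });
    if (!flushed)
        throw std::runtime_error("Timeout exceeded while flushing system log");
}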

View File

@ -1,6 +1,7 @@
#pragma once
#include <condition_variable>
#include <limits>
#include <memory>
#include <vector>
#include <base/types.h>
@ -54,10 +55,19 @@ struct StorageID;
class ISystemLog
{
public:
using Index = int64_t;
virtual String getName() const = 0;
//// force -- force table creation (used for SYSTEM FLUSH LOGS)
virtual void flush(bool force = false) = 0; /// NOLINT
/// Return the index of the latest added log element. That index is no less than the flushed index.
/// The flushed index is the index of the last log element which has been flushed successfully.
/// Thus all the records whose index is less than the flushed index have been flushed already.
virtual Index getLastLogIndex() = 0;
/// Call this method to wake up the flush thread and flush the data in the background. It is a non-blocking call.
virtual void notifyFlush(Index expected_flushed_index, bool should_prepare_tables_anyway) = 0;
/// Call this method to wait until the logs are flushed up to expected_flushed_index. It is a blocking call.
virtual void flush(Index expected_flushed_index, bool should_prepare_tables_anyway) = 0;
virtual void prepareTable() = 0;
/// Start the background thread.
@ -97,26 +107,38 @@ struct SystemLogQueueSettings
template <typename LogElement>
class SystemLogQueue
{
using Index = uint64_t;
public:
using Index = ISystemLog::Index;
explicit SystemLogQueue(const SystemLogQueueSettings & settings_);
void shutdown();
// producer methods
void push(LogElement && element);
Index notifyFlush(bool should_prepare_tables_anyway);
void waitFlush(Index expected_flushed_up_to);
Index getLastLogIndex();
void notifyFlush(Index expected_flushed_index, bool should_prepare_tables_anyway);
void waitFlush(Index expected_flushed_index, bool should_prepare_tables_anyway);
/// Handles crash, flushes log without blocking if notify_flush_on_crash is set
void handleCrash();
struct PopResult
{
Index last_log_index = 0;
std::vector<LogElement> logs = {};
bool create_table_force = false;
bool is_shutdown = false;
};
// consumer methods
Index pop(std::vector<LogElement>& output, bool & should_prepare_tables_anyway, bool & exit_this_thread);
void confirm(Index to_flush_end);
PopResult pop();
void confirm(Index last_flashed_index);
private:
void notifyFlushUnlocked(Index expected_flushed_index, bool should_prepare_tables_anyway);
/// Data shared between callers of add()/flush()/shutdown(), and the saving thread
std::mutex mutex;
@ -124,22 +146,32 @@ private:
// Queue is bounded. But its size is quite large to not block in all normal cases.
std::vector<LogElement> queue;
// An always-incrementing index of the first message currently in the queue.
// We use it to give a global sequential index to every message, so that we
// can wait until a particular message is flushed. This is used to implement
// synchronous log flushing for SYSTEM FLUSH LOGS.
Index queue_front_index = 0;
// A flag that says we must create the tables even if the queue is empty.
bool is_force_prepare_tables = false;
// Requested to flush logs up to this index, exclusive
Index requested_flush_up_to = 0;
Index requested_flush_index = std::numeric_limits<Index>::min();
// Flushed log up to this index, exclusive
Index flushed_up_to = 0;
// Logged overflow message at this queue front index
Index logged_queue_full_at_index = -1;
Index flushed_index = 0;
// The same logic applies to table preparation: if requested_prepare_tables > prepared_tables we need to prepare,
// except that prepared_tables starts at -1. This covers the following difference: when no logs have been written
// yet and a flush is requested, we end up with requested_flush_index = 0 and flushed_index = 0, so there is
// nothing to flush; but if the tables must be prepared anyway, we get requested_prepare_tables = 0 and
// prepared_tables = -1, which wakes the background thread and triggers the preparation.
Index requested_prepare_tables = std::numeric_limits<Index>::min();
Index prepared_tables = -1;
size_t ignored_logs = 0;
bool is_shutdown = false;
std::condition_variable confirm_event;
std::condition_variable flush_event;
const SystemLogQueueSettings settings;
@ -150,6 +182,7 @@ template <typename LogElement>
class SystemLogBase : public ISystemLog
{
public:
using Index = ISystemLog::Index;
using Self = SystemLogBase;
explicit SystemLogBase(
@ -163,15 +196,16 @@ public:
*/
void add(LogElement element);
Index getLastLogIndex() override;
void notifyFlush(Index expected_flushed_index, bool should_prepare_tables_anyway) override;
/// Flush data in the buffer to disk. Block the thread until the data is stored on disk.
void flush(bool force) override;
void flush(Index expected_flushed_index, bool should_prepare_tables_anyway) override;
/// Handles crash, flushes log without blocking if notify_flush_on_crash is set
void handleCrash() override;
/// Non-blocking flush data in the buffer to disk.
void notifyFlush(bool force);
String getName() const override { return LogElement::name(); }
static const char * getDefaultOrderBy() { return "event_date, event_time"; }
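
Taken together, the new interface splits a synchronous flush into three explicit steps; a hedged sketch of a caller (hypothetical helper, mirroring what SystemLogs::flush does further below):

void flushSynchronously(ISystemLog & log)
{
    /// Capture the index of the last element added so far...
    auto expected_index = log.getLastLogIndex();
    /// ...wake the background thread without blocking...
    log.notifyFlush(expected_index, /* should_prepare_tables_anyway */ false);
    /// ...and block until everything up to that index has been flushed.
    log.flush(expected_index, /* should_prepare_tables_anyway */ false);
}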

View File

@ -32,6 +32,8 @@
#cmakedefine01 USE_IDNA
#cmakedefine01 USE_NLP
#cmakedefine01 USE_VECTORSCAN
#cmakedefine01 USE_QPL
#cmakedefine01 USE_QATLIB
#cmakedefine01 USE_LIBURING
#cmakedefine01 USE_AVRO
#cmakedefine01 USE_CAPNP
@ -59,7 +61,7 @@
#cmakedefine01 USE_SKIM
#cmakedefine01 USE_PRQL
#cmakedefine01 USE_ULID
#cmakedefine01 FIU_ENABLE
#cmakedefine01 USE_LIBFIU
#cmakedefine01 USE_BCRYPT
#cmakedefine01 USE_LIBARCHIVE
#cmakedefine01 USE_POCKETFFT

View File

@ -1,7 +1,3 @@
#ifdef ENABLE_QPL_COMPRESSION
#include <cstdio>
#include <thread>
#include <Compression/CompressionCodecDeflateQpl.h>
#include <Compression/CompressionFactory.h>
#include <Compression/CompressionInfo.h>
@ -11,6 +7,10 @@
#include <Common/randomSeed.h>
#include <base/scope_guard.h>
#include <base/getPageSize.h>
#include <cstdio>
#include <thread>
#if USE_QPL
#include "libaccel_config.h"

View File

@ -4,6 +4,11 @@
#include <map>
#include <random>
#include <pcg_random.hpp>
#include "config.h"
#if USE_QPL
#include <qpl/qpl.h>
namespace Poco
@ -117,3 +122,4 @@ private:
};
}
#endif

View File

@ -1,4 +1,6 @@
#ifdef ENABLE_ZSTD_QAT_CODEC
#include "config.h"
#if USE_QATLIB
#include <Common/logger_useful.h>
#include <Compression/CompressionCodecZSTD.h>
@ -6,6 +8,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/IAST.h>
#include <Poco/Logger.h>
#include <qatseqprod.h>
#include <zstd.h>

View File

@ -1,20 +1,20 @@
#include "config.h"
#include <Compression/CompressionFactory.h>
#include <Compression/CompressionCodecMultiple.h>
#include <Compression/CompressionCodecNone.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Poco/String.h>
#include <IO/ReadBuffer.h>
#include <Parsers/queryToString.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Compression/CompressionCodecMultiple.h>
#include <Compression/CompressionCodecNone.h>
#include <IO/WriteHelpers.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Poco/String.h>
#include <boost/algorithm/string/join.hpp>
#include "config.h"
namespace DB
{
@ -175,11 +175,11 @@ void registerCodecNone(CompressionCodecFactory & factory);
void registerCodecLZ4(CompressionCodecFactory & factory);
void registerCodecLZ4HC(CompressionCodecFactory & factory);
void registerCodecZSTD(CompressionCodecFactory & factory);
#ifdef ENABLE_ZSTD_QAT_CODEC
#if USE_QATLIB
void registerCodecZSTDQAT(CompressionCodecFactory & factory);
#endif
void registerCodecMultiple(CompressionCodecFactory & factory);
#ifdef ENABLE_QPL_COMPRESSION
#if USE_QPL
void registerCodecDeflateQpl(CompressionCodecFactory & factory);
#endif
@ -198,7 +198,7 @@ CompressionCodecFactory::CompressionCodecFactory()
registerCodecNone(*this);
registerCodecLZ4(*this);
registerCodecZSTD(*this);
#ifdef ENABLE_ZSTD_QAT_CODEC
#if USE_QATLIB
registerCodecZSTDQAT(*this);
#endif
registerCodecLZ4HC(*this);
@ -209,7 +209,7 @@ CompressionCodecFactory::CompressionCodecFactory()
registerCodecGorilla(*this);
registerCodecEncrypted(*this);
registerCodecFPC(*this);
#ifdef ENABLE_QPL_COMPRESSION
#if USE_QPL
registerCodecDeflateQpl(*this);
#endif
registerCodecGCD(*this);
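
The #ifdef → #if switch matters because config.h uses #cmakedefine01, which always defines the macro, to either 0 or 1; a minimal illustration (hypothetical values):

#define USE_QPL 0 /// what #cmakedefine01 produces when the feature is disabled

#ifdef USE_QPL    /// true even though the feature is off: the macro is defined (to 0)
    /// the codec would be registered by mistake
#endif

#if USE_QPL       /// correctly false when the value is 0
    /// compiled only when the feature is really enabled
#endif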

View File

@ -604,7 +604,7 @@ class IColumn;
M(Bool, optimize_if_chain_to_multiif, false, "Replace if(cond1, then1, if(cond2, ...)) chains to multiIf. Currently it's not beneficial for numeric types.", 0) \
M(Bool, optimize_multiif_to_if, true, "Replace 'multiIf' with only one condition to 'if'.", 0) \
M(Bool, optimize_if_transform_strings_to_enum, false, "Replaces string-type arguments in If and Transform to enum. Disabled by default cause it could make inconsistent change in distributed query that would lead to its fail.", 0) \
M(Bool, optimize_functions_to_subcolumns, false, "Transform functions to subcolumns, if possible, to reduce amount of read data. E.g. 'length(arr)' -> 'arr.size0', 'col IS NULL' -> 'col.null' ", 0) \
M(Bool, optimize_functions_to_subcolumns, true, "Transform functions to subcolumns, if possible, to reduce amount of read data. E.g. 'length(arr)' -> 'arr.size0', 'col IS NULL' -> 'col.null' ", 0) \
M(Bool, optimize_using_constraints, false, "Use constraints for query optimization", 0) \
M(Bool, optimize_substitute_columns, false, "Use constraints for column substitution", 0) \
M(Bool, optimize_append_index, false, "Use constraints in order to append index condition (indexHint)", 0) \

View File

@ -84,6 +84,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"allow_archive_path_syntax", true, true, "Added new setting to allow disabling archive path syntax."},
{"allow_experimental_time_series_table", false, false, "Added new setting to allow the TimeSeries table engine"},
{"enable_analyzer", 1, 1, "Added an alias to a setting `allow_experimental_analyzer`."},
{"optimize_functions_to_subcolumns", false, true, "Enabled settings by default"},
}
},
{"24.7",

View File

@ -305,7 +305,8 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
S3::ListObjectsV2Request request;
request.SetBucket(uri.bucket);
request.SetPrefix(path);
if (path != "/")
request.SetPrefix(path);
if (max_keys)
request.SetMaxKeys(static_cast<int>(max_keys));
else

View File

@ -267,7 +267,12 @@ struct TimeWindowImpl<TUMBLE_START>
{
auto type = WhichDataType(arguments[0].type);
if (type.isTuple())
return std::static_pointer_cast<const DataTypeTuple>(arguments[0].type)->getElement(0);
{
const auto & tuple_elems = std::static_pointer_cast<const DataTypeTuple>(arguments[0].type)->getElements();
if (tuple_elems.empty())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Tuple passed to {} should not be empty", function_name);
return tuple_elems[0];
}
else if (type.isUInt32())
return std::make_shared<DataTypeDateTime>();
else
@ -622,7 +627,12 @@ struct TimeWindowImpl<HOP_START>
{
auto type = WhichDataType(arguments[0].type);
if (type.isTuple())
return std::static_pointer_cast<const DataTypeTuple>(arguments[0].type)->getElement(0);
{
const auto & tuple_elems = std::static_pointer_cast<const DataTypeTuple>(arguments[0].type)->getElements();
if (tuple_elems.empty())
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Tuple passed to {} should not be empty", function_name);
return tuple_elems[0];
}
else if (type.isUInt32())
return std::make_shared<DataTypeDateTime>();
else

View File

@ -99,6 +99,7 @@
#include <Common/logger_useful.h>
#include <Common/RemoteHostFilter.h>
#include <Common/HTTPHeaderFilter.h>
#include <Interpreters/SystemLog.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <Interpreters/DatabaseCatalog.h>
@ -618,7 +619,7 @@ struct ContextSharedPart : boost::noncopyable
/** After system_logs have been shut down it is guaranteed that no system table gets created or written to.
* Note that part changes at shutdown won't be logged to part log.
*/
SHUTDOWN(log, "system logs", system_logs, shutdown());
SHUTDOWN(log, "system logs", system_logs, flushAndShutdown());
LOG_TRACE(log, "Shutting down database catalog");
DatabaseCatalog::shutdown();
@ -4255,7 +4256,7 @@ std::shared_ptr<ObjectStorageQueueLog> Context::getS3QueueLog() const
if (!shared->system_logs)
return {};
return shared->system_logs->s3_queue_log;
return shared->system_logs->s3queue_log;
}
std::shared_ptr<ObjectStorageQueueLog> Context::getAzureQueueLog() const
@ -4312,13 +4313,13 @@ std::shared_ptr<BlobStorageLog> Context::getBlobStorageLog() const
return shared->system_logs->blob_storage_log;
}
std::vector<ISystemLog *> Context::getSystemLogs() const
SystemLogs Context::getSystemLogs() const
{
SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
return shared->system_logs->logs;
return *shared->system_logs;
}
std::optional<Context::Dashboards> Context::getDashboards() const

View File

@ -48,6 +48,8 @@ namespace DB
class ASTSelectQuery;
class SystemLogs;
struct ContextSharedPart;
class ContextAccess;
class ContextAccessWrapper;
@ -1150,7 +1152,7 @@ public:
std::shared_ptr<BackupLog> getBackupLog() const;
std::shared_ptr<BlobStorageLog> getBlobStorageLog() const;
std::vector<ISystemLog *> getSystemLogs() const;
SystemLogs getSystemLogs() const;
using Dashboards = std::vector<std::map<String, String>>;
std::optional<Dashboards> getDashboards() const;

View File

@ -710,14 +710,8 @@ BlockIO InterpreterSystemQuery::execute()
case Type::FLUSH_LOGS:
{
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_LOGS);
auto logs = getContext()->getSystemLogs();
std::vector<std::function<void()>> commands;
commands.reserve(logs.size());
for (auto * system_log : logs)
commands.emplace_back([system_log] { system_log->flush(true); });
executeCommandsAndThrowIfError(commands);
auto system_logs = getContext()->getSystemLogs();
system_logs.flush(true);
break;
}
case Type::STOP_LISTEN:

View File

@ -1,6 +1,7 @@
#include <Interpreters/SystemLog.h>
#include <base/scope_guard.h>
#include <Common/SystemLogBase.h>
#include <Common/logger_useful.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/quoteString.h>
@ -49,6 +50,7 @@
#include <fmt/core.h>
namespace DB
{
@ -282,85 +284,21 @@ ASTPtr getCreateTableQueryClean(const StorageID & table_id, ContextPtr context)
SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConfiguration & config)
{
query_log = createSystemLog<QueryLog>(global_context, "system", "query_log", config, "query_log", "Contains information about executed queries, for example, start time, duration of processing, error messages.");
query_thread_log = createSystemLog<QueryThreadLog>(global_context, "system", "query_thread_log", config, "query_thread_log", "Contains information about threads that execute queries, for example, thread name, thread start time, duration of query processing.");
part_log = createSystemLog<PartLog>(global_context, "system", "part_log", config, "part_log", "This table contains information about events that occurred with data parts in the MergeTree family tables, such as adding or merging data.");
trace_log = createSystemLog<TraceLog>(global_context, "system", "trace_log", config, "trace_log", "Contains stack traces collected by the sampling query profiler.");
crash_log = createSystemLog<CrashLog>(global_context, "system", "crash_log", config, "crash_log", "Contains information about stack traces for fatal errors. The table does not exist in the database by default, it is created only when fatal errors occur.");
text_log = createSystemLog<TextLog>(global_context, "system", "text_log", config, "text_log", "Contains logging entries which are normally written to a log file or to stdout.");
metric_log = createSystemLog<MetricLog>(global_context, "system", "metric_log", config, "metric_log", "Contains history of metrics values from tables system.metrics and system.events, periodically flushed to disk.");
error_log = createSystemLog<ErrorLog>(global_context, "system", "error_log", config, "error_log", "Contains history of error values from table system.errors, periodically flushed to disk.");
filesystem_cache_log = createSystemLog<FilesystemCacheLog>(global_context, "system", "filesystem_cache_log", config, "filesystem_cache_log", "Contains a history of all events occurred with filesystem cache for objects on a remote filesystem.");
filesystem_read_prefetches_log = createSystemLog<FilesystemReadPrefetchesLog>(
global_context, "system", "filesystem_read_prefetches_log", config, "filesystem_read_prefetches_log", "Contains a history of all prefetches done during reading from MergeTables backed by a remote filesystem.");
asynchronous_metric_log = createSystemLog<AsynchronousMetricLog>(
global_context, "system", "asynchronous_metric_log", config,
"asynchronous_metric_log", "Contains the historical values for system.asynchronous_metrics, once per time interval (one second by default).");
opentelemetry_span_log = createSystemLog<OpenTelemetrySpanLog>(
global_context, "system", "opentelemetry_span_log", config,
"opentelemetry_span_log", "Contains information about trace spans for executed queries.");
query_views_log = createSystemLog<QueryViewsLog>(global_context, "system", "query_views_log", config, "query_views_log", "Contains information about the dependent views executed when running a query, for example, the view type or the execution time.");
zookeeper_log = createSystemLog<ZooKeeperLog>(global_context, "system", "zookeeper_log", config, "zookeeper_log", "This table contains information about the parameters of the request to the ZooKeeper server and the response from it.");
session_log = createSystemLog<SessionLog>(global_context, "system", "session_log", config, "session_log", "Contains information about all successful and failed login and logout events.");
transactions_info_log = createSystemLog<TransactionsInfoLog>(
global_context, "system", "transactions_info_log", config, "transactions_info_log", "Contains information about all transactions executed on a current server.");
processors_profile_log = createSystemLog<ProcessorsProfileLog>(global_context, "system", "processors_profile_log", config, "processors_profile_log", "Contains profiling information on processors level (building blocks for a pipeline for query execution.");
asynchronous_insert_log = createSystemLog<AsynchronousInsertLog>(global_context, "system", "asynchronous_insert_log", config, "asynchronous_insert_log", "Contains a history for all asynchronous inserts executed on current server.");
backup_log = createSystemLog<BackupLog>(global_context, "system", "backup_log", config, "backup_log", "Contains logging entries with the information about BACKUP and RESTORE operations.");
s3_queue_log = createSystemLog<ObjectStorageQueueLog>(global_context, "system", "s3queue_log", config, "s3queue_log", "Contains logging entries with the information files processes by S3Queue engine.");
azure_queue_log = createSystemLog<ObjectStorageQueueLog>(global_context, "system", "azure_queue_log", config, "azure_queue_log", "Contains logging entries with the information files processes by S3Queue engine.");
blob_storage_log = createSystemLog<BlobStorageLog>(global_context, "system", "blob_storage_log", config, "blob_storage_log", "Contains logging entries with information about various blob storage operations such as uploads and deletes.");
/// NOLINTBEGIN(bugprone-macro-parentheses)
#define CREATE_PUBLIC_MEMBERS(log_type, member, descr) \
member = createSystemLog<log_type>(global_context, "system", #member, config, #member, descr); \
LIST_OF_ALL_SYSTEM_LOGS(CREATE_PUBLIC_MEMBERS)
#undef CREATE_PUBLIC_MEMBERS
/// NOLINTEND(bugprone-macro-parentheses)
if (query_log)
logs.emplace_back(query_log.get());
if (query_thread_log)
logs.emplace_back(query_thread_log.get());
if (part_log)
logs.emplace_back(part_log.get());
if (trace_log)
logs.emplace_back(trace_log.get());
if (crash_log)
logs.emplace_back(crash_log.get());
if (text_log)
logs.emplace_back(text_log.get());
if (metric_log)
logs.emplace_back(metric_log.get());
if (error_log)
logs.emplace_back(error_log.get());
if (asynchronous_metric_log)
logs.emplace_back(asynchronous_metric_log.get());
if (opentelemetry_span_log)
logs.emplace_back(opentelemetry_span_log.get());
if (query_views_log)
logs.emplace_back(query_views_log.get());
if (zookeeper_log)
logs.emplace_back(zookeeper_log.get());
if (session_log)
{
logs.emplace_back(session_log.get());
global_context->addWarningMessage("Table system.session_log is enabled. It's unreliable and may contain garbage. Do not use it for any kind of security monitoring.");
}
if (transactions_info_log)
logs.emplace_back(transactions_info_log.get());
if (processors_profile_log)
logs.emplace_back(processors_profile_log.get());
if (filesystem_cache_log)
logs.emplace_back(filesystem_cache_log.get());
if (filesystem_read_prefetches_log)
logs.emplace_back(filesystem_read_prefetches_log.get());
if (asynchronous_insert_log)
logs.emplace_back(asynchronous_insert_log.get());
if (backup_log)
logs.emplace_back(backup_log.get());
if (s3_queue_log)
logs.emplace_back(s3_queue_log.get());
if (blob_storage_log)
logs.emplace_back(blob_storage_log.get());
bool should_prepare = global_context->getServerSettings().prepare_system_log_tables_on_startup;
try
{
for (auto & log : logs)
for (auto & log : getAllLogs())
{
log->startup();
if (should_prepare)
@ -394,20 +332,54 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
}
}
SystemLogs::~SystemLogs()
std::vector<ISystemLog *> SystemLogs::getAllLogs() const
{
#define GET_RAW_POINTERS(log_type, member, descr) \
(member).get(), \
std::vector<ISystemLog *> result = {
LIST_OF_ALL_SYSTEM_LOGS(GET_RAW_POINTERS)
};
#undef GET_RAW_POINTERS
auto last_it = std::remove(result.begin(), result.end(), nullptr);
result.erase(last_it, result.end());
return result;
}
void SystemLogs::flush(bool should_prepare_tables_anyway)
{
auto logs = getAllLogs();
std::vector<ISystemLog::Index> logs_indexes(logs.size(), 0);
for (size_t i = 0; i < logs.size(); ++i)
{
auto last_log_index = logs[i]->getLastLogIndex();
logs_indexes[i] = last_log_index;
logs[i]->notifyFlush(last_log_index, should_prepare_tables_anyway);
}
for (size_t i = 0; i < logs.size(); ++i)
logs[i]->flush(logs_indexes[i], should_prepare_tables_anyway);
}
void SystemLogs::flushAndShutdown()
{
flush(/* should_prepare_tables_anyway */ false);
shutdown();
}
void SystemLogs::shutdown()
{
auto logs = getAllLogs();
for (auto & log : logs)
log->shutdown();
}
void SystemLogs::handleCrash()
{
auto logs = getAllLogs();
for (auto & log : logs)
log->handleCrash();
}
@ -462,33 +434,26 @@ void SystemLog<LogElement>::savingThreadFunction()
{
setThreadName("SystemLogFlush");
std::vector<LogElement> to_flush;
bool exit_this_thread = false;
while (!exit_this_thread)
while (true)
{
try
{
// The end index (exclusive, like std end()) of the messages we are
// going to flush.
uint64_t to_flush_end = 0;
// Should we prepare table even if there are no new messages.
bool should_prepare_tables_anyway = false;
auto result = queue->pop();
to_flush_end = queue->pop(to_flush, should_prepare_tables_anyway, exit_this_thread);
if (to_flush.empty())
if (result.is_shutdown)
{
if (should_prepare_tables_anyway)
{
prepareTable();
LOG_TRACE(log, "Table created (force)");
queue->confirm(to_flush_end);
}
LOG_TRACE(log, "Terminating");
return;
}
else
if (!result.logs.empty())
{
flushImpl(to_flush, to_flush_end);
flushImpl(result.logs, result.last_log_index);
}
else if (result.create_table_force)
{
prepareTable();
queue->confirm(result.last_log_index);
}
}
catch (...)
@ -496,7 +461,6 @@ void SystemLog<LogElement>::savingThreadFunction()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
LOG_TRACE(log, "Terminating");
}

View File

@ -5,6 +5,32 @@
#include <Parsers/IAST.h>
#include <boost/noncopyable.hpp>
#include <vector>
#define LIST_OF_ALL_SYSTEM_LOGS(M) \
M(QueryLog, query_log, "Contains information about executed queries, for example, start time, duration of processing, error messages.") \
M(QueryThreadLog, query_thread_log, "Contains information about threads that execute queries, for example, thread name, thread start time, duration of query processing.") \
M(PartLog, part_log, "This table contains information about events that occurred with data parts in the MergeTree family tables, such as adding or merging data.") \
M(TraceLog, trace_log, "Contains stack traces collected by the sampling query profiler.") \
M(CrashLog, crash_log, "Contains information about stack traces for fatal errors. The table does not exist in the database by default, it is created only when fatal errors occur.") \
M(TextLog, text_log, "Contains logging entries which are normally written to a log file or to stdout.") \
M(MetricLog, metric_log, "Contains history of metrics values from tables system.metrics and system.events, periodically flushed to disk.") \
M(ErrorLog, error_log, "Contains history of error values from table system.errors, periodically flushed to disk.") \
M(FilesystemCacheLog, filesystem_cache_log, "Contains a history of all events that occurred with filesystem cache for objects on a remote filesystem.") \
M(FilesystemReadPrefetchesLog, filesystem_read_prefetches_log, "Contains a history of all prefetches done during reading from MergeTree tables backed by a remote filesystem.") \
M(ObjectStorageQueueLog, s3queue_log, "Contains logging entries with information about files processed by the S3Queue engine.") \
M(ObjectStorageQueueLog, azure_queue_log, "Contains logging entries with information about files processed by the AzureQueue engine.") \
M(AsynchronousMetricLog, asynchronous_metric_log, "Contains the historical values for system.asynchronous_metrics, once per time interval (one second by default).") \
M(OpenTelemetrySpanLog, opentelemetry_span_log, "Contains information about trace spans for executed queries.") \
M(QueryViewsLog, query_views_log, "Contains information about the dependent views executed when running a query, for example, the view type or the execution time.") \
M(ZooKeeperLog, zookeeper_log, "This table contains information about the parameters of the request to the ZooKeeper server and the response from it.") \
M(SessionLog, session_log, "Contains information about all successful and failed login and logout events.") \
M(TransactionsInfoLog, transactions_info_log, "Contains information about all transactions executed on a current server.") \
M(ProcessorsProfileLog, processors_profile_log, "Contains profiling information on processors level (building blocks for a pipeline for query execution).") \
M(AsynchronousInsertLog, asynchronous_insert_log, "Contains a history for all asynchronous inserts executed on current server.") \
M(BackupLog, backup_log, "Contains logging entries with the information about BACKUP and RESTORE operations.") \
M(BlobStorageLog, blob_storage_log, "Contains logging entries with information about various blob storage operations such as uploads and deletes.") \
namespace DB
{
@ -34,71 +60,37 @@ namespace DB
};
*/
class QueryLog;
class QueryThreadLog;
class PartLog;
class TextLog;
class TraceLog;
class CrashLog;
class ErrorLog;
class MetricLog;
class AsynchronousMetricLog;
class OpenTelemetrySpanLog;
class QueryViewsLog;
class ZooKeeperLog;
class SessionLog;
class TransactionsInfoLog;
class ProcessorsProfileLog;
class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class AsynchronousInsertLog;
class BackupLog;
class ObjectStorageQueueLog;
class BlobStorageLog;
/// NOLINTBEGIN(bugprone-macro-parentheses)
#define FORWARD_DECLARATION(log_type, member, descr) \
class log_type; \
LIST_OF_ALL_SYSTEM_LOGS(FORWARD_DECLARATION)
#undef FORWARD_DECLARATION
/// NOLINTEND(bugprone-macro-parentheses)
/// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables
struct SystemLogs
class SystemLogs
{
public:
SystemLogs() = default;
SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConfiguration & config);
~SystemLogs();
SystemLogs(const SystemLogs & other) = default;
void flush(bool should_prepare_tables_anyway);
void flushAndShutdown();
void shutdown();
void handleCrash();
std::shared_ptr<QueryLog> query_log; /// Used to log queries.
std::shared_ptr<QueryThreadLog> query_thread_log; /// Used to log query threads.
std::shared_ptr<PartLog> part_log; /// Used to log operations with parts
std::shared_ptr<TraceLog> trace_log; /// Used to log traces from query profiler
std::shared_ptr<CrashLog> crash_log; /// Used to log server crashes.
std::shared_ptr<TextLog> text_log; /// Used to log all text messages.
std::shared_ptr<MetricLog> metric_log; /// Used to log all metrics.
std::shared_ptr<ErrorLog> error_log; /// Used to log errors.
std::shared_ptr<FilesystemCacheLog> filesystem_cache_log;
std::shared_ptr<FilesystemReadPrefetchesLog> filesystem_read_prefetches_log;
std::shared_ptr<ObjectStorageQueueLog> s3_queue_log;
std::shared_ptr<ObjectStorageQueueLog> azure_queue_log;
/// Metrics from system.asynchronous_metrics.
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
/// OpenTelemetry trace spans.
std::shared_ptr<OpenTelemetrySpanLog> opentelemetry_span_log;
/// Used to log queries of materialized and live views
std::shared_ptr<QueryViewsLog> query_views_log;
/// Used to log all actions of ZooKeeper client
std::shared_ptr<ZooKeeperLog> zookeeper_log;
/// Login, LogOut and Login failure events
std::shared_ptr<SessionLog> session_log;
/// Events related to transactions
std::shared_ptr<TransactionsInfoLog> transactions_info_log;
/// Used to log processors profiling
std::shared_ptr<ProcessorsProfileLog> processors_profile_log;
std::shared_ptr<AsynchronousInsertLog> asynchronous_insert_log;
/// Backup and restore events
std::shared_ptr<BackupLog> backup_log;
/// Log blob storage operations
std::shared_ptr<BlobStorageLog> blob_storage_log;
#define DECLARE_PUBLIC_MEMBERS(log_type, member, descr) \
std::shared_ptr<log_type> member; \
std::vector<ISystemLog *> logs;
LIST_OF_ALL_SYSTEM_LOGS(DECLARE_PUBLIC_MEMBERS)
#undef DECLARE_PUBLIC_MEMBERS
private:
std::vector<ISystemLog *> getAllLogs() const;
};
struct SystemLogSettings
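
The declarations above rely on the classic X-macro pattern: LIST_OF_ALL_SYSTEM_LOGS applies a caller-supplied macro to every (type, member, description) triple. A reduced sketch of the expansion, with an illustrative two-entry list:

#include <memory>

class QueryLog;
class PartLog;

#define LIST_OF_LOGS(M) \
    M(QueryLog, query_log, "...") \
    M(PartLog, part_log, "...")

struct Logs
{
#define DECLARE_MEMBER(log_type, member, descr) std::shared_ptr<log_type> member;
    LIST_OF_LOGS(DECLARE_MEMBER) /// expands to the two shared_ptr members
#undef DECLARE_MEMBER
};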

View File

@ -8,6 +8,7 @@
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/OffsetStep.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/QueryPlanVisitor.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
@ -59,9 +60,10 @@ public:
if (typeid_cast<LimitStep *>(current_step)
|| typeid_cast<LimitByStep *>(current_step) /// (1) if there are LIMITs on top of ORDER BY, the ORDER BY is non-removable
|| typeid_cast<FillingStep *>(current_step) /// (2) if ORDER BY is with FILL WITH, it is non-removable
|| typeid_cast<SortingStep *>(current_step) /// (3) ORDER BY will change order of previous sorting
|| typeid_cast<AggregatingStep *>(current_step)) /// (4) aggregation change order
|| typeid_cast<OffsetStep *>(current_step) /// (2) OFFSET on top of ORDER BY, the ORDER BY is non-removable
|| typeid_cast<FillingStep *>(current_step) /// (3) if ORDER BY is with FILL WITH, it is non-removable
|| typeid_cast<SortingStep *>(current_step) /// (4) ORDER BY will change order of previous sorting
|| typeid_cast<AggregatingStep *>(current_step)) /// (5) aggregation change order
{
logStep("nodes_affect_order/push", current_node);
nodes_affect_order.push_back(current_node);

View File

@ -8,13 +8,15 @@
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Interpreters/Context.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/ISimpleTransform.h>
#include <QueryPipeline/Pipe.h>
#include <boost/circular_buffer.hpp>
#include <ranges>
namespace DB
{
@ -68,11 +70,17 @@ static void makeFdBlocking(int fd)
static int pollWithTimeout(pollfd * pfds, size_t num, size_t timeout_milliseconds)
{
auto logger = getLogger("TimeoutReadBufferFromFileDescriptor");
auto describe_fd = [](const auto & pollfd) { return fmt::format("(fd={}, flags={})", pollfd.fd, fcntl(pollfd.fd, F_GETFL)); };
int res;
while (true)
{
Stopwatch watch;
LOG_TEST(logger, "Polling descriptors: {}", fmt::join(std::span(pfds, pfds + num) | std::views::transform(describe_fd), ", "));
res = poll(pfds, static_cast<nfds_t>(num), static_cast<int>(timeout_milliseconds));
if (res < 0)
@ -82,7 +90,10 @@ static int pollWithTimeout(pollfd * pfds, size_t num, size_t timeout_millisecond
const auto elapsed = watch.elapsedMilliseconds();
if (timeout_milliseconds <= elapsed)
{
LOG_TEST(logger, "Timeout exceeded: elapsed={}, timeout={}", elapsed, timeout_milliseconds);
break;
}
timeout_milliseconds -= elapsed;
}
else
@ -91,6 +102,12 @@ static int pollWithTimeout(pollfd * pfds, size_t num, size_t timeout_millisecond
}
}
LOG_TEST(
logger,
"Poll for descriptors: {} returned {}",
fmt::join(std::span(pfds, pfds + num) | std::views::transform(describe_fd), ", "),
res);
return res;
}
@ -200,12 +217,6 @@ public:
return true;
}
void reset() const
{
makeFdBlocking(stdout_fd);
makeFdBlocking(stderr_fd);
}
~TimeoutReadBufferFromFileDescriptor() override
{
tryMakeFdBlocking(stdout_fd);

View File

@ -749,8 +749,16 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
/// Probably there is something wrong with files of this part.
/// So it can be helpful to add to the error message some information about those files.
String files_in_part;
for (auto it = getDataPartStorage().iterate(); it->isValid(); it->next())
files_in_part += fmt::format("{}{} ({} bytes)", (files_in_part.empty() ? "" : ", "), it->name(), getDataPartStorage().getFileSize(it->name()));
{
std::string file_info;
if (!getDataPartStorage().isDirectory(it->name()))
file_info = fmt::format(" ({} bytes)", getDataPartStorage().getFileSize(it->name()));
files_in_part += fmt::format("{}{}{}", (files_in_part.empty() ? "" : ", "), it->name(), file_info);
}
if (!files_in_part.empty())
e->addMessage("Part contains files: {}", files_in_part);
if (isEmpty())
@ -2139,7 +2147,27 @@ void IMergeTreeDataPart::checkConsistencyBase() const
}
}
checksums.checkSizes(getDataPartStorage());
const auto & data_part_storage = getDataPartStorage();
for (const auto & [filename, checksum] : checksums.files)
{
try
{
checksum.checkSize(data_part_storage, filename);
}
catch (const Exception & ex)
{
/// For projection parts check will mark them broken in loadProjections
if (!parent_part && filename.ends_with(".proj"))
{
std::string projection_name = fs::path(filename).stem();
LOG_INFO(storage.log, "Projection {} doesn't exist on start for part {}, marking it as broken", projection_name, name);
if (hasProjection(projection_name))
markProjectionPartAsBroken(projection_name, ex.message(), ex.code());
}
else
throw;
}
}
}
else
{

View File

@ -1146,7 +1146,7 @@ std::optional<UInt64> MergeTreeData::totalRowsByPartitionPredicateImpl(
auto metadata_snapshot = getInMemoryMetadataPtr();
auto virtual_columns_block = getBlockWithVirtualsForFilter(metadata_snapshot, {parts[0]});
auto filter_dag = VirtualColumnUtils::splitFilterDagForAllowedInputs(filter_actions_dag.getOutputs().at(0), nullptr);
auto filter_dag = VirtualColumnUtils::splitFilterDagForAllowedInputs(filter_actions_dag.getOutputs().at(0), nullptr, /*allow_partial_result=*/ false);
if (!filter_dag)
return {};
@ -6932,7 +6932,8 @@ Block MergeTreeData::getMinMaxCountProjectionBlock(
const auto * predicate = filter_dag->getOutputs().at(0);
// Generate valid expressions for filtering
VirtualColumnUtils::filterBlockWithPredicate(predicate, virtual_columns_block, query_context);
VirtualColumnUtils::filterBlockWithPredicate(
predicate, virtual_columns_block, query_context, /*allow_filtering_with_partial_predicate =*/true);
rows = virtual_columns_block.rows();
part_name_column = virtual_columns_block.getByName("_part").column;

View File

@ -100,12 +100,6 @@ void MergeTreeDataPartChecksums::checkEqual(const MergeTreeDataPartChecksums & r
}
}
void MergeTreeDataPartChecksums::checkSizes(const IDataPartStorage & storage) const
{
for (const auto & [name, checksum] : files)
checksum.checkSize(storage, name);
}
UInt64 MergeTreeDataPartChecksums::getTotalSizeOnDisk() const
{
UInt64 res = 0;

View File

@ -65,9 +65,6 @@ struct MergeTreeDataPartChecksums
static bool isBadChecksumsErrorCode(int code);
/// Checks that the directory contains all the needed files of the correct size. Does not check the checksum.
void checkSizes(const IDataPartStorage & storage) const;
/// Returns false if the checksum is too old.
bool read(ReadBuffer & in);
/// Assume that header with version (the first line) is read

View File

@ -21,7 +21,7 @@ const char * auto_config_build[]
"BUILD_COMPILE_DEFINITIONS", "@BUILD_COMPILE_DEFINITIONS@",
"USE_EMBEDDED_COMPILER", "@USE_EMBEDDED_COMPILER@",
"USE_GLIBC_COMPATIBILITY", "@GLIBC_COMPATIBILITY@",
"USE_JEMALLOC", "@ENABLE_JEMALLOC@",
"USE_JEMALLOC", "@USE_JEMALLOC@",
"USE_ICU", "@USE_ICU@",
"USE_H3", "@USE_H3@",
"USE_MYSQL", "@USE_MYSQL@",
@ -36,7 +36,7 @@ const char * auto_config_build[]
"USE_SSL", "@USE_SSL@",
"OPENSSL_VERSION", "@OPENSSL_VERSION@",
"OPENSSL_IS_BORING_SSL", "@OPENSSL_IS_BORING_SSL@",
"USE_VECTORSCAN", "@ENABLE_VECTORSCAN@",
"USE_VECTORSCAN", "@USE_VECTORSCAN@",
"USE_SIMDJSON", "@USE_SIMDJSON@",
"USE_ODBC", "@USE_ODBC@",
"USE_GRPC", "@USE_GRPC@",
@ -62,8 +62,8 @@ const char * auto_config_build[]
"USE_ARROW", "@USE_ARROW@",
"USE_ORC", "@USE_ORC@",
"USE_MSGPACK", "@USE_MSGPACK@",
"USE_QPL", "@ENABLE_QPL@",
"USE_QAT", "@ENABLE_QATLIB@",
"USE_QPL", "@USE_QPL@",
"USE_QATLIB", "@USE_QATLIB@",
"GIT_HASH", "@GIT_HASH@",
"GIT_BRANCH", R"IRjaNsZIL9Yh7FQ4(@GIT_BRANCH@)IRjaNsZIL9Yh7FQ4",
"GIT_DATE", "@GIT_DATE@",

View File

@ -1,51 +1,46 @@
#include <algorithm>
#include <Storages/VirtualColumnUtils.h>
#include <memory>
#include <stack>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnSet.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/FilterDescription.h>
#include <Core/NamesAndTypes.h>
#include <Core/TypeId.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionsLogical.h>
#include <Functions/IFunction.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/indexHint.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ActionsVisitor.h>
#include <Interpreters/Context.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/FilterDescription.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Parsers/makeASTForLogicalFunction.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/VirtualColumnUtils.h>
#include <IO/WriteHelpers.h>
#include <Common/typeid_cast.h>
#include "Functions/FunctionsLogical.h"
#include "Functions/IFunction.h"
#include "Functions/IFunctionAdaptors.h"
#include "Functions/indexHint.h"
#include <Parsers/makeASTForLogicalFunction.h>
#include <Columns/ColumnSet.h>
#include <Functions/FunctionHelpers.h>
#include <Interpreters/ActionsVisitor.h>
namespace DB
@ -281,9 +276,7 @@ bool isDeterministicInScopeOfQuery(const ActionsDAG::Node * node)
}
static const ActionsDAG::Node * splitFilterNodeForAllowedInputs(
const ActionsDAG::Node * node,
const Block * allowed_inputs,
ActionsDAG::Nodes & additional_nodes)
const ActionsDAG::Node * node, const Block * allowed_inputs, ActionsDAG::Nodes & additional_nodes, bool allow_partial_result)
{
if (node->type == ActionsDAG::ActionType::FUNCTION)
{
@ -292,8 +285,15 @@ static const ActionsDAG::Node * splitFilterNodeForAllowedInputs(
auto & node_copy = additional_nodes.emplace_back(*node);
node_copy.children.clear();
for (const auto * child : node->children)
if (const auto * child_copy = splitFilterNodeForAllowedInputs(child, allowed_inputs, additional_nodes))
if (const auto * child_copy
= splitFilterNodeForAllowedInputs(child, allowed_inputs, additional_nodes, allow_partial_result))
node_copy.children.push_back(child_copy);
/// Reducing an expression like (not_allowed AND allowed) to (allowed) is not permitted when
/// allow_partial_result = false. This is important for the trivial count optimization,
/// otherwise we can get incorrect results. For example, if the query is
/// SELECT count() FROM table WHERE _partition_id = '0' AND rowNumberInBlock() = 1, we cannot apply
/// trivial count.
else if (!allow_partial_result)
return nullptr;
if (node_copy.children.empty())
return nullptr;
@ -301,7 +301,7 @@ static const ActionsDAG::Node * splitFilterNodeForAllowedInputs(
if (node_copy.children.size() == 1)
{
const ActionsDAG::Node * res = node_copy.children.front();
/// Expression like (not_allowed AND 256) can't be resuced to (and(256)) because AND requires
/// Expression like (not_allowed AND 256) can't be reduced to (and(256)) because AND requires
/// at least two arguments; also it can't be reduced to (256) because result type is different.
if (!res->result_type->equals(*node->result_type))
{
@ -319,7 +319,7 @@ static const ActionsDAG::Node * splitFilterNodeForAllowedInputs(
{
auto & node_copy = additional_nodes.emplace_back(*node);
for (auto & child : node_copy.children)
if (child = splitFilterNodeForAllowedInputs(child, allowed_inputs, additional_nodes); !child)
if (child = splitFilterNodeForAllowedInputs(child, allowed_inputs, additional_nodes, allow_partial_result); !child)
return nullptr;
return &node_copy;
@ -333,7 +333,8 @@ static const ActionsDAG::Node * splitFilterNodeForAllowedInputs(
auto index_hint_dag = index_hint->getActions().clone();
ActionsDAG::NodeRawConstPtrs atoms;
for (const auto & output : index_hint_dag.getOutputs())
if (const auto * child_copy = splitFilterNodeForAllowedInputs(output, allowed_inputs, additional_nodes))
if (const auto * child_copy
= splitFilterNodeForAllowedInputs(output, allowed_inputs, additional_nodes, allow_partial_result))
atoms.push_back(child_copy);
if (!atoms.empty())
@ -367,22 +368,24 @@ static const ActionsDAG::Node * splitFilterNodeForAllowedInputs(
return node;
}
std::optional<ActionsDAG> splitFilterDagForAllowedInputs(const ActionsDAG::Node * predicate, const Block * allowed_inputs)
std::optional<ActionsDAG>
splitFilterDagForAllowedInputs(const ActionsDAG::Node * predicate, const Block * allowed_inputs, bool allow_partial_result)
{
if (!predicate)
return {};
ActionsDAG::Nodes additional_nodes;
const auto * res = splitFilterNodeForAllowedInputs(predicate, allowed_inputs, additional_nodes);
const auto * res = splitFilterNodeForAllowedInputs(predicate, allowed_inputs, additional_nodes, allow_partial_result);
if (!res)
return {};
return ActionsDAG::cloneSubDAG({res}, true);
}
void filterBlockWithPredicate(const ActionsDAG::Node * predicate, Block & block, ContextPtr context)
void filterBlockWithPredicate(
const ActionsDAG::Node * predicate, Block & block, ContextPtr context, bool allow_filtering_with_partial_predicate)
{
auto dag = splitFilterDagForAllowedInputs(predicate, &block);
auto dag = splitFilterDagForAllowedInputs(predicate, &block, /*allow_partial_result=*/allow_filtering_with_partial_predicate);
if (dag)
filterBlockWithExpression(buildFilterExpression(std::move(*dag), context), block);
}

View File

@ -26,9 +26,13 @@ namespace VirtualColumnUtils
///
/// Otherwise calling filter*() outside applyFilters() will throw "Not-ready Set is passed"
/// if there are subqueries.
///
/// Similar to filterBlockWithExpression(buildFilterExpression(splitFilterDagForAllowedInputs(...))).
/// Similar to filterBlockWithQuery, but uses ActionsDAG as a predicate.
/// Basically it is filterBlockWithDAG(splitFilterDagForAllowedInputs).
/// If allow_filtering_with_partial_predicate is true, then the filtering will be done even if some part of the predicate
/// cannot be evaluated using the columns from the block.
void filterBlockWithPredicate(const ActionsDAG::Node * predicate, Block & block, ContextPtr context, bool allow_filtering_with_partial_predicate = true);
/// Similar to filterBlockWithExpression(buildFilterExpression(splitFilterDagForAllowedInputs(...))).
void filterBlockWithPredicate(const ActionsDAG::Node * predicate, Block & block, ContextPtr context);
/// Just filters block. Block should contain all the required columns.
ExpressionActionsPtr buildFilterExpression(ActionsDAG dag, ContextPtr context);
@ -41,7 +45,15 @@ void buildSetsForDAG(const ActionsDAG & dag, const ContextPtr & context);
bool isDeterministicInScopeOfQuery(const ActionsDAG::Node * node);
/// Extract a part of predicate that can be evaluated using only columns from input_names.
std::optional<ActionsDAG> splitFilterDagForAllowedInputs(const ActionsDAG::Node * predicate, const Block * allowed_inputs);
/// When allow_partial_result is false, the result will be empty if any part of it cannot be evaluated deterministically
/// on the given inputs.
/// allow_partial_result must be false when we are going to use the result to filter parts in
/// MergeTreeData::totalRowsByPartitionPredicateImpl. For example, if the query is
/// `SELECT count() FROM table WHERE _partition_id = '0' AND rowNumberInBlock() = 1`
/// The predicate will be `_partition_id = '0' AND rowNumberInBlock() = 1`, and `rowNumberInBlock()` is
/// non-deterministic. If we still extract the part `_partition_id = '0'` for filtering parts, then trivial
/// count optimization will be mistakenly applied to the query.
std::optional<ActionsDAG> splitFilterDagForAllowedInputs(const ActionsDAG::Node * predicate, const Block * allowed_inputs, bool allow_partial_result = true);
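A rough Python model of this contract (illustrative names, not the ClickHouse API): with allow_partial_result the evaluable conjuncts survive on their own; without it, a single conjunct that cannot be evaluated on the allowed inputs discards the whole predicate, which is what protects the trivial count path.

def split_conjunction(atoms, allowed_columns, allow_partial_result=True):
    # atoms: list of (expression_text, columns_it_needs) pairs.
    kept = []
    for expr, needed in atoms:
        if needed <= allowed_columns:
            kept.append(expr)
        elif not allow_partial_result:
            return None  # one unevaluable conjunct rejects the whole predicate
    return kept or None

atoms = [
    ("_partition_id = '0'", {"_partition_id"}),
    ("rowNumberInBlock() = 1", {"<row order>"}),  # illustrative: not evaluable on these inputs
]
print(split_conjunction(atoms, {"_partition_id"}))         # ["_partition_id = '0'"]
print(split_conjunction(atoms, {"_partition_id"}, False))  # None: no trivial count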
/// Extract from the input stream a set of `name` column values
template <typename T>

View File

@ -135,6 +135,12 @@ endif()
if (TARGET ch_contrib::vectorscan)
set(USE_VECTORSCAN 1)
endif()
if (TARGET ch_contrib::qpl)
set(USE_QPL 1)
endif()
if (TARGET ch_contrib::qatlib)
set(USE_QATLIB 1)
endif()
if (TARGET ch_contrib::avrocpp)
set(USE_AVRO 1)
endif()
@ -161,8 +167,8 @@ endif()
if (TARGET ch_contrib::ssh)
set(USE_SSH 1)
endif()
if (TARGET ch_contrib::fiu)
set(FIU_ENABLE 1)
if (TARGET ch_contrib::libfiu)
set(USE_LIBFIU 1)
endif()
if (TARGET ch_contrib::libarchive)
set(USE_LIBARCHIVE 1)

View File

@ -738,7 +738,7 @@ def create_test_html_report(
if test_results:
rows_part = []
num_fails = 0
has_test_time = False
has_test_time = any(tr.time is not None for tr in test_results)
has_log_urls = False
# Display entries with logs at the top (they correspond to failed tests)
@ -770,12 +770,12 @@ def create_test_html_report(
row.append(f'<td {fail_id}style="{style}">{test_result.status}</td>')
colspan += 1
row.append("<td>")
if test_result.time is not None:
has_test_time = True
row.append(str(test_result.time))
row.append("</td>")
colspan += 1
if has_test_time:
if test_result.time is not None:
row.append(f"<td>{test_result.time}</td>")
else:
row.append("<td></td>")
colspan += 1
if test_result.log_urls is not None:
has_log_urls = True
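The gist of the report fix above, as a standalone sketch (toy data, not the real report code): has_test_time used to be set inside the row loop, so rows rendered before the first timed result could get a different column count than later ones; computing it in one pass up front fixes the table shape before any row is emitted.

class TestResult:
    def __init__(self, name, time=None):
        self.name, self.time = name, time

results = [TestResult("t1", 0.5), TestResult("t2")]

# Decide the table shape in one pass up front...
has_test_time = any(r.time is not None for r in results)

# ...then render every row against that fixed shape, so the column
# count (and colspan) can no longer flip halfway through the loop.
for r in results:
    cells = [r.name]
    if has_test_time:
        cells.append("" if r.time is None else str(r.time))
    print("".join(f"<td>{c}</td>" for c in cells))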

View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<clickhouse>
<merge_tree>
<max_suspicious_broken_parts_bytes>0</max_suspicious_broken_parts_bytes>
</merge_tree>
</clickhouse>

View File

@ -4,6 +4,7 @@ import logging
import string
import random
from helpers.cluster import ClickHouseCluster
from multiprocessing.dummy import Pool
cluster = ClickHouseCluster(__file__)
@ -18,6 +19,12 @@ def cluster():
stay_alive=True,
with_zookeeper=True,
)
cluster.add_instance(
"node_restart",
main_configs=["config.d/dont_start_broken.xml"],
stay_alive=True,
with_zookeeper=True,
)
logging.info("Starting cluster...")
cluster.start()
@ -632,6 +639,49 @@ def test_broken_on_start(cluster):
check(node, table_name, 0)
def test_disappeared_projection_on_start(cluster):
node = cluster.instances["node_restart"]
table_name = "test_disappeared_projection"
create_table(node, table_name, 1)
node.query(f"SYSTEM STOP MERGES {table_name}")
insert(node, table_name, 0, 5)
insert(node, table_name, 5, 5)
insert(node, table_name, 10, 5)
insert(node, table_name, 15, 5)
assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
node, table_name
)
def drop_projection():
node.query(
f"ALTER TABLE {table_name} DROP PROJECTION proj2",
settings={"mutations_sync": "0"},
)
p = Pool(2)
p.apply_async(drop_projection)
for i in range(30):
create_query = node.query(f"SHOW CREATE TABLE {table_name}")
if "proj2" not in create_query:
break
time.sleep(0.5)
assert "proj2" not in create_query
# Remove 'proj2' for part all_2_2_0
break_projection(node, table_name, "proj2", "all_2_2_0", "part")
node.restart_clickhouse()
# proj2 is not broken; it simply no longer exists, which is fine
check(node, table_name, 0, expect_broken_part="proj2", do_check_command=0)
def test_mutation_with_broken_projection(cluster):
node = cluster.instances["node"]

View File

@ -176,7 +176,7 @@ def test_query_is_permanent(transaction, permanent, exclusive_table):
select_handler = node.get_query_request(
f"""
SELECT sleepEachRow(3) FROM {exclusive_table} SETTINGS function_sleep_max_microseconds_per_block = 0;
SELECT sleepEachRow(3) FROM {exclusive_table} SETTINGS function_sleep_max_microseconds_per_block = 0, max_threads=1;
""",
query_id=query_id,
)

View File

@ -4,7 +4,7 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from helpers.test_tools import assert_eq_with_retry, assert_logs_contain_with_retry, TSV
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
@ -12,17 +12,6 @@ node = cluster.add_instance(
stay_alive=True,
)
system_logs = [
# enabled by default
("system.text_log", 1),
("system.query_log", 1),
("system.query_thread_log", 1),
("system.part_log", 1),
("system.trace_log", 1),
("system.metric_log", 1),
("system.error_log", 1),
]
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
@ -33,22 +22,28 @@ def start_cluster():
cluster.shutdown()
@pytest.fixture(scope="function")
def flush_logs():
def test_system_logs_exists():
system_logs = [
("system.text_log", 1),
("system.query_log", 1),
("system.query_thread_log", 1),
("system.part_log", 1),
("system.trace_log", 1),
("system.metric_log", 1),
("system.error_log", 1),
]
node.query("SYSTEM FLUSH LOGS")
@pytest.mark.parametrize("table,exists", system_logs)
def test_system_logs(flush_logs, table, exists):
q = "SELECT * FROM {}".format(table)
if exists:
node.query(q)
else:
response = node.query_and_get_error(q)
assert (
"Table {} does not exist".format(table) in response
or "Unknown table expression identifier '{}'".format(table) in response
)
for table, exists in system_logs:
q = "SELECT * FROM {}".format(table)
if exists:
node.query(q)
else:
response = node.query_and_get_error(q)
assert (
"Table {} does not exist".format(table) in response
or "Unknown table expression identifier '{}'".format(table) in response
)
# The logic is tricky; let's check that there is no hang in case the message queue is not empty
@ -67,14 +62,19 @@ def test_system_logs_non_empty_queue():
def test_system_suspend():
node.query("CREATE TABLE t (x DateTime) ENGINE=Memory;")
node.query("INSERT INTO t VALUES (now());")
node.query("SYSTEM SUSPEND FOR 1 SECOND;")
node.query("INSERT INTO t VALUES (now());")
assert "1\n" == node.query("SELECT max(x) - min(x) >= 1 FROM t;")
try:
node.query("CREATE TABLE t (x DateTime) ENGINE=Memory;")
node.query("INSERT INTO t VALUES (now());")
node.query("SYSTEM SUSPEND FOR 1 SECOND;")
node.query("INSERT INTO t VALUES (now());")
assert "1\n" == node.query("SELECT max(x) - min(x) >= 1 FROM t;")
finally:
node.query("DROP TABLE IF EXISTS t;")
def test_log_max_size(start_cluster):
# we deliberately misconfigure here: buffer_size_rows_flush_threshold > max_size_rows, and flush_interval_milliseconds is huge,
# so neither an automatic flush by size nor one by time has a chance to happen
node.exec_in_container(
[
"bash",
@ -83,6 +83,7 @@ def test_log_max_size(start_cluster):
<clickhouse>
<query_log>
<flush_interval_milliseconds replace=\\"replace\\">1000000</flush_interval_milliseconds>
<buffer_size_rows_flush_threshold replace=\\"replace\\">1000000</buffer_size_rows_flush_threshold>
<max_size_rows replace=\\"replace\\">10</max_size_rows>
<reserved_size_rows replace=\\"replace\\">10</reserved_size_rows>
</query_log>
@ -91,11 +92,24 @@ def test_log_max_size(start_cluster):
""",
]
)
node.restart_clickhouse()
for i in range(10):
node.query(f"select {i}")
assert node.query("select count() >= 10 from system.query_log") == "1\n"
node.query("SYSTEM FLUSH LOGS")
node.query(f"TRUNCATE TABLE IF EXISTS system.query_log")
node.restart_clickhouse()
# all log records above max_size_rows are lost
# the accepted log records are never flushed until we call SYSTEM FLUSH LOGS ourselves
for i in range(21):
node.query(f"select {i}")
node.query("system flush logs")
assert_logs_contain_with_retry(
node, "Queue had been full at 0, accepted 10 logs, ignored 34 logs."
)
assert node.query(
"select count() >= 10, count() < 20 from system.query_log"
) == TSV([[1, 1]])
node.exec_in_container(
["rm", f"/etc/clickhouse-server/config.d/yyy-override-query_log.xml"]
)
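What the misconfigured limits above exercise, as a toy model (names and counts are illustrative; the real test sees 34 ignored records because a single query produces several query_log entries): once the in-memory queue holds max_size_rows and nothing flushes by size or by time, further records are simply dropped until an explicit flush drains it.

class BoundedLogQueue:
    def __init__(self, max_size_rows):
        self.max_size_rows = max_size_rows
        self.rows = []
        self.ignored = 0

    def push(self, row):
        if len(self.rows) >= self.max_size_rows:
            self.ignored += 1  # lost, like the ignored records in the test
        else:
            self.rows.append(row)

    def flush(self):
        flushed, self.rows = self.rows, []
        return flushed

q = BoundedLogQueue(max_size_rows=10)
for i in range(21):
    q.push(i)
print(len(q.flush()), q.ignored)  # 10 accepted, 11 dropped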

View File

@ -33,124 +33,139 @@ def test_system_logs_recreate():
"error_log",
]
node.query("SYSTEM FLUSH LOGS")
for table in system_logs:
assert "ENGINE = MergeTree" in node.query(f"SHOW CREATE TABLE system.{table}")
assert "ENGINE = Null" not in node.query(f"SHOW CREATE TABLE system.{table}")
assert (
len(
node.query(f"SHOW TABLES FROM system LIKE '{table}%'")
.strip()
.split("\n")
try:
node.query("SYSTEM FLUSH LOGS")
for table in system_logs:
assert "ENGINE = MergeTree" in node.query(
f"SHOW CREATE TABLE system.{table}"
)
== 1
)
# NOTE: we use zzz- prefix to make it the last file,
# so that it will be applied last.
for table in system_logs:
node.exec_in_container(
[
"bash",
"-c",
f"""echo "
<clickhouse>
<{table}>
<engine>ENGINE = Null</engine>
<partition_by remove='remove'/>
</{table}>
</clickhouse>
" > /etc/clickhouse-server/config.d/zzz-override-{table}.xml
""",
]
)
node.restart_clickhouse()
node.query("SYSTEM FLUSH LOGS")
for table in system_logs:
assert "ENGINE = MergeTree" not in node.query(
f"SHOW CREATE TABLE system.{table}"
)
assert "ENGINE = Null" in node.query(f"SHOW CREATE TABLE system.{table}")
assert (
len(
node.query(f"SHOW TABLES FROM system LIKE '{table}%'")
.strip()
.split("\n")
assert "ENGINE = Null" not in node.query(
f"SHOW CREATE TABLE system.{table}"
)
== 2
)
# apply only storage_policy for all system tables
for table in system_logs:
node.exec_in_container(
[
"bash",
"-c",
f"""echo "
<clickhouse>
<{table}>
<storage_policy>system_tables</storage_policy>
</{table}>
</clickhouse>
" > /etc/clickhouse-server/config.d/zzz-override-{table}.xml
""",
]
)
node.restart_clickhouse()
node.query("SYSTEM FLUSH LOGS")
import logging
for table in system_logs:
create_table_sql = node.query(f"SHOW CREATE TABLE system.{table} FORMAT TSVRaw")
logging.debug(
"With storage policy, SHOW CREATE TABLE system.%s is: %s",
table,
create_table_sql,
)
assert "ENGINE = MergeTree" in create_table_sql
assert "ENGINE = Null" not in create_table_sql
assert "SETTINGS storage_policy = 'system_tables'" in create_table_sql
assert (
len(
node.query(f"SHOW TABLES FROM system LIKE '{table}%'")
.strip()
.split("\n")
assert (
len(
node.query(f"SHOW TABLES FROM system LIKE '{table}%'")
.strip()
.split("\n")
)
== 1
)
== 3
)
for table in system_logs:
node.exec_in_container(
["rm", f"/etc/clickhouse-server/config.d/zzz-override-{table}.xml"]
)
node.restart_clickhouse()
node.query("SYSTEM FLUSH LOGS")
for table in system_logs:
assert "ENGINE = MergeTree" in node.query(f"SHOW CREATE TABLE system.{table}")
assert "ENGINE = Null" not in node.query(f"SHOW CREATE TABLE system.{table}")
assert (
len(
node.query(f"SHOW TABLES FROM system LIKE '{table}%'")
.strip()
.split("\n")
# NOTE: we use zzz- prefix to make it the last file,
# so that it will be applied last.
for table in system_logs:
node.exec_in_container(
[
"bash",
"-c",
f"""echo "
<clickhouse>
<{table}>
<engine>ENGINE = Null</engine>
<partition_by remove='remove'/>
</{table}>
</clickhouse>
" > /etc/clickhouse-server/config.d/zzz-override-{table}.xml
""",
]
)
== 4
)
node.query("SYSTEM FLUSH LOGS")
# Ensure that there were no superfluous RENAMEs,
# IOW that the table is recreated only when the structure is indeed different.
for table in system_logs:
assert (
len(
node.query(f"SHOW TABLES FROM system LIKE '{table}%'")
.strip()
.split("\n")
node.restart_clickhouse()
node.query("SYSTEM FLUSH LOGS")
for table in system_logs:
assert "ENGINE = MergeTree" not in node.query(
f"SHOW CREATE TABLE system.{table}"
)
== 4
)
assert "ENGINE = Null" in node.query(f"SHOW CREATE TABLE system.{table}")
assert (
len(
node.query(f"SHOW TABLES FROM system LIKE '{table}%'")
.strip()
.split("\n")
)
== 2
)
# apply only storage_policy for all system tables
for table in system_logs:
node.exec_in_container(
[
"bash",
"-c",
f"""echo "
<clickhouse>
<{table}>
<storage_policy>system_tables</storage_policy>
</{table}>
</clickhouse>
" > /etc/clickhouse-server/config.d/zzz-override-{table}.xml
""",
]
)
node.restart_clickhouse()
node.query("SYSTEM FLUSH LOGS")
import logging
for table in system_logs:
create_table_sql = node.query(
f"SHOW CREATE TABLE system.{table} FORMAT TSVRaw"
)
logging.debug(
"With storage policy, SHOW CREATE TABLE system.%s is: %s",
table,
create_table_sql,
)
assert "ENGINE = MergeTree" in create_table_sql
assert "ENGINE = Null" not in create_table_sql
assert "SETTINGS storage_policy = 'system_tables'" in create_table_sql
assert (
len(
node.query(f"SHOW TABLES FROM system LIKE '{table}%'")
.strip()
.split("\n")
)
== 3
)
for table in system_logs:
node.exec_in_container(
["rm", f"/etc/clickhouse-server/config.d/zzz-override-{table}.xml"]
)
node.restart_clickhouse()
node.query("SYSTEM FLUSH LOGS")
for table in system_logs:
assert "ENGINE = MergeTree" in node.query(
f"SHOW CREATE TABLE system.{table}"
)
assert "ENGINE = Null" not in node.query(
f"SHOW CREATE TABLE system.{table}"
)
assert (
len(
node.query(f"SHOW TABLES FROM system LIKE '{table}%'")
.strip()
.split("\n")
)
== 4
)
node.query("SYSTEM FLUSH LOGS")
# Ensure that there were no superfluous RENAMEs,
# IOW that the table is recreated only when the structure is indeed different.
for table in system_logs:
assert (
len(
node.query(f"SHOW TABLES FROM system LIKE '{table}%'")
.strip()
.split("\n")
)
== 4
)
finally:
for table in system_logs:
for suffix in range(3):
node.query(f"DROP TABLE IF EXISTS system.{table}_{suffix} sync")
def test_drop_system_log():
@ -173,11 +188,20 @@ def test_drop_system_log():
node.query("system flush logs")
node.query("select 2")
node.query("system flush logs")
assert node.query("select count() > 0 from system.query_log") == "1\n"
assert node.query("select count() >= 2 from system.query_log") == "1\n"
node.query("drop table system.query_log sync")
node.query("select 3")
node.query("system flush logs")
assert node.query("select count() > 0 from system.query_log") == "1\n"
assert node.query("select count() >= 1 from system.query_log") == "1\n"
node.query("drop table system.query_log sync")
node.restart_clickhouse()
node.query("system flush logs")
assert (
node.query("select count() >= 0 from system.query_log") == "1\n"
) # we just check that query_log exists
node.exec_in_container(
["rm", f"/etc/clickhouse-server/config.d/yyy-override-query_log.xml"]
)

View File

@ -0,0 +1,26 @@
<test>
<settings>
<max_insert_threads>4</max_insert_threads>
</settings>
<create_query>
CREATE TABLE t_subcolumns (a Array(UInt64), s Nullable(String), m Map(String, UInt64)) ENGINE = MergeTree ORDER BY tuple()
</create_query>
<fill_query>
INSERT INTO t_subcolumns SELECT range(number % 20), toString(number), mapFromArrays(range(number % 20), range(number % 20)) FROM numbers_mt(50000000)
</fill_query>
<fill_query>
OPTIMIZE TABLE t_subcolumns FINAL
</fill_query>
<query>SELECT count() FROM t_subcolumns WHERE NOT ignore(length(a))</query>
<query>SELECT count() FROM t_subcolumns WHERE notEmpty(a)</query>
<query>SELECT count() FROM t_subcolumns WHERE NOT ignore(length(m))</query>
<query>SELECT count() FROM t_subcolumns WHERE notEmpty(m)</query>
<query>SELECT count() FROM t_subcolumns WHERE isNotNull(s)</query>
<query>SELECT count(s) FROM t_subcolumns</query>
<drop_query>DROP TABLE t_subcolumns</drop_query>
</test>

View File

@ -67,3 +67,7 @@ SELECT toDateTime(hopEnd(toDateTime('2020-01-09 12:00:01', 'US/Samoa'), INTERVAL
2020-01-10 00:00:00
SELECT hopEnd(hop(toDateTime('2019-01-09 12:00:01', 'US/Samoa'), INTERVAL '1' DAY, INTERVAL '3' DAY, 'US/Samoa'));
2019-01-10 00:00:00
SELECT hopStart(tuple()); -- { serverError ILLEGAL_COLUMN }
SELECT hopEnd(tuple()); -- { serverError ILLEGAL_COLUMN }
SELECT tumbleStart(tuple()); -- { serverError ILLEGAL_COLUMN }
SELECT tumbleEnd(tuple()); -- { serverError ILLEGAL_COLUMN }

View File

@ -36,3 +36,8 @@ SELECT hopEnd(toDateTime('2020-01-09 12:00:01', 'US/Samoa'), INTERVAL '1' DAY, I
SELECT toDateTime(hopEnd(toDateTime('2020-01-09 12:00:01', 'US/Samoa'), INTERVAL '1' DAY, INTERVAL '3' DAY, 'US/Samoa'), 'US/Samoa');
SELECT toDateTime(hopEnd(toDateTime('2020-01-09 12:00:01', 'US/Samoa'), INTERVAL '1' DAY, INTERVAL '3' DAY, 'US/Samoa'), 'US/Samoa');
SELECT hopEnd(hop(toDateTime('2019-01-09 12:00:01', 'US/Samoa'), INTERVAL '1' DAY, INTERVAL '3' DAY, 'US/Samoa'));
SELECT hopStart(tuple()); -- { serverError ILLEGAL_COLUMN }
SELECT hopEnd(tuple()); -- { serverError ILLEGAL_COLUMN }
SELECT tumbleStart(tuple()); -- { serverError ILLEGAL_COLUMN }
SELECT tumbleEnd(tuple()); -- { serverError ILLEGAL_COLUMN }

View File

@ -27,7 +27,7 @@ function wait_until()
function get_buffer_delay()
{
local buffer_insert_id=$1 && shift
query "SYSTEM FLUSH LOGS"
$CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"
query "
WITH
(SELECT event_time_microseconds FROM system.query_log WHERE current_database = '$CLICKHOUSE_DATABASE' AND type = 'QueryStart' AND query_id = '$buffer_insert_id') AS begin_,

View File

@ -1,10 +1,16 @@
#!/usr/bin/env bash
# Tags: long
# Tags: long, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
function query()
{
# NOTE: database_atomic_wait_for_drop_and_detach_synchronously is needed only for the local env; CI has it ON
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&database_atomic_wait_for_drop_and_detach_synchronously=1" -d "$*"
}
# NOTE: database = $CLICKHOUSE_DATABASE is unwanted
verify_sql="SELECT
(SELECT sumIf(value, metric = 'PartsActive'), sumIf(value, metric = 'PartsOutdated') FROM system.metrics)
@ -18,13 +24,13 @@ verify()
{
for i in {1..5000}
do
result=$( $CLICKHOUSE_CLIENT --query="$verify_sql" )
result=$( query "$verify_sql" )
[ "$result" = "1" ] && echo "$result" && break
sleep 0.1
if [[ $i -eq 5000 ]]
then
$CLICKHOUSE_CLIENT "
query "
SELECT sumIf(value, metric = 'PartsActive'), sumIf(value, metric = 'PartsOutdated') FROM system.metrics;
SELECT sum(active), sum(NOT active) FROM system.parts;
SELECT sum(active), sum(NOT active) FROM system.projection_parts;
@ -34,17 +40,17 @@ verify()
done
}
$CLICKHOUSE_CLIENT --database_atomic_wait_for_drop_and_detach_synchronously=1 --query="DROP TABLE IF EXISTS test_table"
$CLICKHOUSE_CLIENT --query="CREATE TABLE test_table (data Date) ENGINE = MergeTree PARTITION BY toYear(data) ORDER BY data;"
query "DROP TABLE IF EXISTS test_table"
query "CREATE TABLE test_table (data Date) ENGINE = MergeTree PARTITION BY toYear(data) ORDER BY data;"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_table VALUES ('1992-01-01')"
query "INSERT INTO test_table VALUES ('1992-01-01')"
verify
$CLICKHOUSE_CLIENT --query="INSERT INTO test_table VALUES ('1992-01-02')"
query "INSERT INTO test_table VALUES ('1992-01-02')"
verify
$CLICKHOUSE_CLIENT --query="OPTIMIZE TABLE test_table FINAL"
query "OPTIMIZE TABLE test_table FINAL"
verify
$CLICKHOUSE_CLIENT --database_atomic_wait_for_drop_and_detach_synchronously=1 --query="DROP TABLE test_table"
query "DROP TABLE test_table"
verify

View File

@ -1,4 +1,5 @@
-- Tags: long
-- Tags: long, no-parallel
-- the no-parallel tag is set to prevent this test from timing out
drop table if exists t;

View File

@ -1,4 +1,6 @@
Code: 159
0
Code: 159
query_duration 1
0
query_duration 1
Code: 159
0

View File

@ -1,27 +1,23 @@
#!/usr/bin/env bash
# Tags: no-debug
# no-debug: Query is canceled by timeout after max_execution_time,
# but sending an exception to the client may hang
# for more than MAX_PROCESS_WAIT seconds in a slow debug build,
# and test will fail.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
MAX_PROCESS_WAIT=5
IS_SANITIZER=$($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.warnings WHERE message like '%built with sanitizer%'")
if [ "$IS_SANITIZER" -gt 0 ]; then
# Query may hang for more than 5 seconds, especially in tsan build
MAX_PROCESS_WAIT=15
TIMEOUT=5
IS_SANITIZER_OR_DEBUG=$($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.warnings WHERE message like '%built with sanitizer%' or message like '%built in debug mode%'")
if [ "$IS_SANITIZER_OR_DEBUG" -gt 0 ]; then
# Increase the timeout because in debug/sanitizer builds:
# - the client is slow
# - stacktrace resolving is slow
TIMEOUT=15
fi
# TCP CLIENT: As of today (02/12/21) uses PullingAsyncPipelineExecutor
### Should be cancelled after 1 second and return a 159 exception (timeout)
timeout -s KILL $MAX_PROCESS_WAIT $CLICKHOUSE_CLIENT --max_execution_time 1 -q \
"SELECT * FROM
query_id=$(random_str 12)
$CLICKHOUSE_CLIENT --query_id "$query_id" --max_execution_time 1 -q "
SELECT * FROM
(
SELECT a.name as n
FROM
@ -34,28 +30,35 @@ timeout -s KILL $MAX_PROCESS_WAIT $CLICKHOUSE_CLIENT --max_execution_time 1 -q \
GROUP BY n
)
LIMIT 20
FORMAT Null" 2>&1 | grep -o "Code: 159" | sort | uniq
FORMAT Null
" 2>&1 | grep -m1 -o "Code: 159"
$CLICKHOUSE_CLIENT -q "system flush logs"
${CLICKHOUSE_CURL} -q -sS "$CLICKHOUSE_URL" -d "select 'query_duration', round(query_duration_ms/1000) from system.query_log where current_database = '$CLICKHOUSE_DATABASE' and query_id = '$query_id' and type != 'QueryStart'"
### Should stop pulling data and return what has been generated already (return code 0)
timeout -s KILL $MAX_PROCESS_WAIT $CLICKHOUSE_CLIENT -q \
"SELECT a.name as n
FROM
(
SELECT 'Name' as name, number FROM system.numbers LIMIT 2000000
) AS a,
(
SELECT 'Name' as name2, number FROM system.numbers LIMIT 2000000
) as b
FORMAT Null
SETTINGS max_execution_time = 1, timeout_overflow_mode = 'break'
"
query_id=$(random_str 12)
$CLICKHOUSE_CLIENT --query_id "$query_id" -q "
SELECT a.name as n
FROM
(
SELECT 'Name' as name, number FROM system.numbers LIMIT 2000000
) AS a,
(
SELECT 'Name' as name2, number FROM system.numbers LIMIT 2000000
) as b
FORMAT Null
SETTINGS max_execution_time = 1, timeout_overflow_mode = 'break'
"
echo $?
$CLICKHOUSE_CLIENT -q "system flush logs"
${CLICKHOUSE_CURL} -q -sS "$CLICKHOUSE_URL" -d "select 'query_duration', round(query_duration_ms/1000) from system.query_log where current_database = '$CLICKHOUSE_DATABASE' and query_id = '$query_id' and type != 'QueryStart'"
# HTTP CLIENT: As of today (02/12/21) uses PullingPipelineExecutor
### Should be cancelled after 1 second and return a 159 exception (timeout)
${CLICKHOUSE_CURL} -q --max-time $MAX_PROCESS_WAIT -sS "$CLICKHOUSE_URL&max_execution_time=1" -d \
"SELECT * FROM
${CLICKHOUSE_CURL} -q --max-time $TIMEOUT -sS "$CLICKHOUSE_URL&max_execution_time=1" -d "
SELECT * FROM
(
SELECT a.name as n
FROM
@ -68,12 +71,13 @@ ${CLICKHOUSE_CURL} -q --max-time $MAX_PROCESS_WAIT -sS "$CLICKHOUSE_URL&max_exec
GROUP BY n
)
LIMIT 20
FORMAT Null" 2>&1 | grep -o "Code: 159" | sort | uniq
FORMAT Null
" 2>&1 | grep -o "Code: 159" | sort | uniq
### Should stop pulling data and return what has been generated already (return code 0)
${CLICKHOUSE_CURL} -q --max-time $MAX_PROCESS_WAIT -sS "$CLICKHOUSE_URL" -d \
"SELECT a.name as n
${CLICKHOUSE_CURL} -q --max-time $TIMEOUT -sS "$CLICKHOUSE_URL" -d "
SELECT a.name as n
FROM
(
SELECT 'Name' as name, number FROM system.numbers LIMIT 2000000
@ -83,5 +87,5 @@ ${CLICKHOUSE_CURL} -q --max-time $MAX_PROCESS_WAIT -sS "$CLICKHOUSE_URL" -d \
) as b
FORMAT Null
SETTINGS max_execution_time = 1, timeout_overflow_mode = 'break'
"
"
echo $?

View File

@ -465,6 +465,37 @@ Expression ((Projection + Before ORDER BY))
ReadFromStorage (SystemOne)
-- execute
Float64 9007199254740994
-- presence of an inner OFFSET retains the ORDER BY
-- query
WITH
t1 AS (
SELECT a, b
FROM
VALUES (
'b UInt32, a Int32',
(1, 1),
(2, 0)
)
)
SELECT
SUM(a)
FROM (
SELECT a, b
FROM t1
ORDER BY 1 DESC, 2
OFFSET 1
) t2
-- explain
Expression ((Projection + Before ORDER BY))
Aggregating
Expression (Before GROUP BY)
Offset
Expression (Projection)
Sorting (Sorting for ORDER BY)
Expression ((Before ORDER BY + (Projection + Before ORDER BY)))
ReadFromStorage (Values)
-- execute
0
-- disable common optimization to avoid functions being lifted up (liftUpFunctions optimization), needed for testing with a stateful function
-- neighbor() as a stateful function prevents removing the inner ORDER BY since its result depends on order
-- query

View File

@ -302,6 +302,27 @@ FROM
)"
run_query "$query"
echo "-- presence of an inner OFFSET retains the ORDER BY"
query="WITH
t1 AS (
SELECT a, b
FROM
VALUES (
'b UInt32, a Int32',
(1, 1),
(2, 0)
)
)
SELECT
SUM(a)
FROM (
SELECT a, b
FROM t1
ORDER BY 1 DESC, 2
OFFSET 1
) t2"
run_query "$query"
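Why the optimizer must keep the inner ORDER BY once an OFFSET is present, reduced to plain Python lists (a sketch of the semantics, not of the planner): OFFSET selects which rows survive, so the row order is observable through the outer aggregate.

# Rows of t1 as (a, b): the VALUES clause above yields a=1,b=1 and a=0,b=2.
t1 = [(1, 1), (0, 2)]

def sum_a(rows, keep_order_by):
    # ORDER BY 1 DESC, 2 on (a, b) means: a descending, then b ascending.
    ordered = sorted(rows, key=lambda r: (-r[0], r[1])) if keep_order_by else rows
    return sum(a for a, _ in ordered[1:])  # OFFSET 1 drops the first row

print(sum_a(t1, keep_order_by=True))                   # 0, matching the reference
print(sum_a(list(reversed(t1)), keep_order_by=True))   # still 0: the sort pins the order
print(sum_a(list(reversed(t1)), keep_order_by=False))  # 1: wrong once the sort is removed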
echo "-- disable common optimization to avoid functions to be lifted up (liftUpFunctions optimization), needed for testing with stateful function"
ENABLE_OPTIMIZATION="SET query_plan_enable_optimizations=0;$ENABLE_OPTIMIZATION"
echo "-- neighbor() as stateful function prevents removing inner ORDER BY since its result depends on order"

View File

@ -464,6 +464,36 @@ Expression ((Project names + Projection))
ReadFromStorage (SystemOne)
-- execute
Float64 9007199254740994
-- presence of an inner OFFSET retains the ORDER BY
-- query
WITH
t1 AS (
SELECT a, b
FROM
VALUES (
'b UInt32, a Int32',
(1, 1),
(2, 0)
)
)
SELECT
SUM(a)
FROM (
SELECT a, b
FROM t1
ORDER BY 1 DESC, 2
OFFSET 1
) t2
-- explain
Expression ((Project names + Projection))
Aggregating
Expression ((Before GROUP BY + (Change column names to column identifiers + Project names)))
Offset
Sorting (Sorting for ORDER BY)
Expression ((Before ORDER BY + (Projection + (Change column names to column identifiers + (Project names + (Projection + Change column names to column identifiers))))))
ReadFromStorage (Values)
-- execute
0
-- disable common optimization to avoid functions being lifted up (liftUpFunctions optimization), needed for testing with a stateful function
-- neighbor() as a stateful function prevents removing the inner ORDER BY since its result depends on order
-- query

View File

@ -6,14 +6,18 @@ CREATE TABLE test_table_1
(
id UInt64,
value String
) ENGINE=MergeTree ORDER BY id;
) ENGINE=MergeTree ORDER BY id
SETTINGS index_granularity = 16 # The number of granules appears in the `EXPLAIN` output in the reference file
;
DROP TABLE IF EXISTS test_table_2;
CREATE TABLE test_table_2
(
id UInt64,
value String
) ENGINE=MergeTree ORDER BY id;
) ENGINE=MergeTree ORDER BY id
SETTINGS index_granularity = 16
;
INSERT INTO test_table_1 VALUES (1, 'Value_1'), (2, 'Value_2');
INSERT INTO test_table_2 VALUES (2, 'Value_2'), (3, 'Value_3');

View File

@ -0,0 +1,4 @@
CREATE TABLE t (p UInt8, x UInt64) Engine = MergeTree PARTITION BY p ORDER BY x;
INSERT INTO t SELECT 0, number FROM numbers(10) SETTINGS max_block_size = 100;
SELECT count() FROM t WHERE p = 0 AND rowNumberInAllBlocks() = 1 SETTINGS allow_experimental_analyzer = 0;
SELECT count() FROM t WHERE p = 0 AND rowNumberInAllBlocks() = 1 SETTINGS allow_experimental_analyzer = 1;

View File

@ -0,0 +1,6 @@
Expression ((Project names + Projection))
Aggregating
Expression (Before GROUP BY)
ReadFromMerge
Filter (( + ( + )))
ReadFromMergeTree (default.test_03217_merge_replica_1)

View File

@ -0,0 +1,16 @@
CREATE TABLE test_03217_merge_replica_1(x UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_03217_merge_replica', 'r1')
ORDER BY x;
CREATE TABLE test_03217_merge_replica_2(x UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_03217_merge_replica', 'r2')
ORDER BY x;
CREATE TABLE test_03217_all_replicas (x UInt32)
ENGINE = Merge(currentDatabase(), 'test_03217_merge_replica_*');
INSERT INTO test_03217_merge_replica_1 SELECT number AS x FROM numbers(10);
SYSTEM SYNC REPLICA test_03217_merge_replica_2;
-- If the filter on _table is not applied, then the plan will show both replicas
EXPLAIN SELECT _table, count() FROM test_03217_all_replicas WHERE _table = 'test_03217_merge_replica_1' AND x >= 0 GROUP BY _table SETTINGS allow_experimental_analyzer=1;

View File

@ -0,0 +1,6 @@
information_schema tables
both default test_03217_system_tables_replica_1 r1
both default test_03217_system_tables_replica_2 r2
default test_03217_system_tables_replica_1 r1
1
1

View File

@ -0,0 +1,30 @@
-- If filtering is not done correctly on databases, then this query reports reading 3 rows, which are: `system.tables`, `information_schema.tables` and `INFORMATION_SCHEMA.tables`
SELECT database, table FROM system.tables WHERE database = 'information_schema' AND table = 'tables';
CREATE TABLE test_03217_system_tables_replica_1(x UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_03217_system_tables_replica', 'r1')
ORDER BY x;
CREATE TABLE test_03217_system_tables_replica_2(x UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_03217_system_tables_replica', 'r2')
ORDER BY x;
-- Make sure we can read both replicas
-- The replica name might be altered because of `_functional_tests_helper_database_replicated_replace_args_macros`,
-- thus we need to use `left`
SELECT 'both', database, table, left(replica_name, 2) FROM system.replicas WHERE database = currentDatabase();
-- If filtering is not done correctly on the database-table column, then this query reports reading 2 rows, which are the above tables
SELECT database, table, left(replica_name, 2) FROM system.replicas WHERE database = currentDatabase() AND table = 'test_03217_system_tables_replica_1' AND replica_name LIKE 'r1%';
SYSTEM FLUSH LOGS;
-- argMax is necessary to make the test repeatable
-- StorageSystemTables
SELECT argMax(read_rows, event_time_microseconds) FROM system.query_log WHERE 1
AND current_database = currentDatabase()
AND query LIKE '%SELECT database, table FROM system.tables WHERE database = \'information_schema\' AND table = \'tables\';'
AND type = 'QueryFinish';
-- StorageSystemReplicas
SELECT argMax(read_rows, event_time_microseconds) FROM system.query_log WHERE 1
AND current_database = currentDatabase()
AND query LIKE '%SELECT database, table, left(replica_name, 2) FROM system.replicas WHERE database = currentDatabase() AND table = \'test_03217_system_tables_replica_1\' AND replica_name LIKE \'r1\%\';'
AND type = 'QueryFinish';
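A toy illustration of what the read_rows assertions above check (plain Python with a hypothetical catalog, not the system-table implementation): pushing the database/table predicate into the scan means only matching entries are ever produced, so read_rows stays at 1 instead of 3.

catalog = [
    ("system", "tables"),
    ("information_schema", "tables"),
    ("INFORMATION_SCHEMA", "tables"),
]

def read_rows(pushdown):
    if pushdown:
        # predicate applied while scanning: only the matching row is read
        rows = [r for r in catalog if r == ("information_schema", "tables")]
    else:
        # predicate applied after a full scan: every row counts as read
        rows = list(catalog)
    return len(rows)

print(read_rows(pushdown=True))   # 1, what the test asserts
print(read_rows(pushdown=False))  # 3, the regression being guarded against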

View File

@ -0,0 +1 @@
[(NULL,'11\01111111\011111','1111')] -2147483648 \N

View File

@ -0,0 +1,23 @@
SET enable_analyzer = 1;
SELECT
materialize([(NULL, '11\01111111\011111', '1111')]) AS t,
(t[1048576]).2,
materialize(-2147483648),
(t[-2147483648]).1
GROUP BY
materialize([(NULL, '1')]),
'',
(materialize((t[1023]).2), (materialize(''), (t[2147483647]).1, materialize(9223372036854775807)), (materialize(''), materialize(NULL, 2147483647, t[65535], 256)), materialize(NULL))
; -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH}
SELECT
materialize([(NULL, '11\01111111\011111', '1111')]) AS t,
(t[1048576]).2,
materialize(-2147483648),
(t[-2147483648]).1
GROUP BY
materialize([(NULL, '1')]),
'',
(materialize((t[1023]).2), (materialize(''), (t[2147483647]).1, materialize(9223372036854775807)), (materialize(''), materialize(NULL), materialize(2147483647), materialize(t[65535]), materialize(256)), materialize(NULL))
;