Merge remote-tracking branch 'origin/master' into pr-local-plan

Igor Nikonov 2024-07-30 16:27:50 +00:00
commit 7d443c6b97
80 changed files with 3839 additions and 971 deletions

3
.gitmodules vendored

@ -372,3 +372,6 @@
[submodule "contrib/double-conversion"]
path = contrib/double-conversion
url = https://github.com/ClickHouse/double-conversion.git
[submodule "contrib/numactl"]
path = contrib/numactl
url = https://github.com/ClickHouse/numactl.git

base/base/CMakeLists.txt

@ -32,6 +32,7 @@ set (SRCS
StringRef.cpp
safeExit.cpp
throwError.cpp
Numa.cpp
)
add_library (common ${SRCS})
@ -46,6 +47,10 @@ if (TARGET ch_contrib::crc32_s390x)
target_link_libraries(common PUBLIC ch_contrib::crc32_s390x)
endif()
if (TARGET ch_contrib::numactl)
target_link_libraries(common PUBLIC ch_contrib::numactl)
endif()
target_include_directories(common PUBLIC .. "${CMAKE_CURRENT_BINARY_DIR}/..")
target_link_libraries (common

37
base/base/Numa.cpp Normal file

@ -0,0 +1,37 @@
#include <base/Numa.h>
#include "config.h"
#if USE_NUMACTL
# include <numa.h>
#endif
namespace DB
{
std::optional<size_t> getNumaNodesTotalMemory()
{
std::optional<size_t> total_memory;
#if USE_NUMACTL
if (numa_available() != -1)
{
auto * membind = numa_get_membind();
if (!numa_bitmask_equal(membind, numa_all_nodes_ptr))
{
total_memory.emplace(0);
auto max_node = numa_max_node();
for (int i = 0; i <= max_node; ++i)
{
if (numa_bitmask_isbitset(membind, i))
*total_memory += numa_node_size(i, nullptr);
}
}
numa_bitmask_free(membind);
}
#endif
return total_memory;
}
}

12
base/base/Numa.h Normal file

@ -0,0 +1,12 @@
#pragma once
#include <optional>
namespace DB
{
/// Returns the total memory of the NUMA nodes the process is bound to.
/// If NUMA is not supported, or the process can use all nodes, std::nullopt is returned.
std::optional<size_t> getNumaNodesTotalMemory();
}
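A minimal usage sketch of the new API (hypothetical standalone driver, not part of this commit; the real callers are getMemoryAmountOrZero(), Keeper.cpp and Server.cpp below):

```cpp
#include <base/Numa.h>
#include <iostream>

int main()
{
    // std::nullopt means: no NUMA support, or the process may use all nodes.
    if (auto total = DB::getNumaNodesTotalMemory(); total.has_value())
        std::cout << "Process is bound to NUMA nodes with " << *total << " bytes in total\n";
    else
        std::cout << "No NUMA memory binding in effect\n";
}
```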

base/base/getMemoryAmount.cpp

@ -2,15 +2,14 @@
#include <base/cgroupsv2.h>
#include <base/getPageSize.h>
#include <base/Numa.h>
#include <fstream>
#include <stdexcept>
#include <unistd.h>
#include <sys/types.h>
#include <sys/param.h>
namespace
{
@ -63,6 +62,9 @@ uint64_t getMemoryAmountOrZero()
uint64_t memory_amount = num_pages * page_size;
if (auto total_numa_memory = DB::getNumaNodesTotalMemory(); total_numa_memory.has_value())
memory_amount = *total_numa_memory;
/// Respect the memory limit set by cgroups v2.
auto limit_v2 = getCgroupsV2MemoryLimit();
if (limit_v2.has_value() && *limit_v2 < memory_amount)
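The hunk ends just before the clamp body. Restating the assumed precedence as a standalone function (hypothetical helper for illustration, not code from the commit): physical pages first, then the NUMA binding overrides, then the cgroups v2 limit caps the result.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical restatement of the precedence inside getMemoryAmountOrZero().
uint64_t effectiveMemoryAmount(
    uint64_t physical_memory,                    // num_pages * page_size
    std::optional<uint64_t> numa_total,          // DB::getNumaNodesTotalMemory()
    std::optional<uint64_t> cgroups_v2_limit)    // getCgroupsV2MemoryLimit()
{
    uint64_t memory_amount = physical_memory;
    if (numa_total.has_value())
        memory_amount = *numa_total;             // NUMA binding overrides physical RAM
    if (cgroups_v2_limit.has_value() && *cgroups_v2_limit < memory_amount)
        memory_amount = *cgroups_v2_limit;       // assumed clamp; the hunk ends before the body
    return memory_amount;
}
```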

base/poco/Util/include/Poco/Util/Application.h

@ -261,6 +261,11 @@ namespace Util
///
/// Throws a NullPointerException if no Application instance exists.
static Application * instanceRawPtr();
/// Returns a raw pointer to the Application singleton.
///
/// The caller should check whether the result is nullptr.
const Poco::Timestamp & startTime() const;
/// Returns the application start time (UTC).
@ -448,6 +453,12 @@ namespace Util
}
inline Application * Application::instanceRawPtr()
{
return _pInstance;
}
inline const Poco::Timestamp & Application::startTime() const
{
return _startTime;

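Poco's existing Application::instance() throws if no instance exists; the new instanceRawPtr() enables a null-checked probe instead. A hedged sketch of the intended pattern (the real use is interruptSignalHandler() in ClientApplicationBase.cpp below):

```cpp
// Safe from contexts where the singleton may not exist yet (e.g. early startup):
if (auto * app = Poco::Util::Application::instanceRawPtr(); app != nullptr)
{
    // The Application exists; it is safe to dereference `app` here.
}
```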
contrib/CMakeLists.txt

@ -230,6 +230,8 @@ add_contrib (libssh-cmake libssh)
add_contrib (prometheus-protobufs-cmake prometheus-protobufs prometheus-protobufs-gogo)
add_contrib(numactl-cmake numactl)
# Put all targets defined here and in subdirectories under "contrib/<immediate-subdir>" folders in GUI-based IDEs.
# Some of third-party projects may override CMAKE_FOLDER or FOLDER property of their targets, so they would not appear
# in "contrib/..." as originally planned, so we workaround this by fixing FOLDER properties of all targets manually,

1
contrib/numactl vendored Submodule

@ -0,0 +1 @@
Subproject commit 8d13d63a05f0c3cd88bf777cbb61541202b7da08

contrib/numactl-cmake/CMakeLists.txt

@ -0,0 +1,30 @@
if (NOT (
OS_LINUX AND (ARCH_AMD64 OR ARCH_AARCH64 OR ARCH_LOONGARCH64))
)
if (ENABLE_NUMACTL)
message (${RECONFIGURE_MESSAGE_LEVEL}
"numactl is disabled implicitly because the OS or architecture is not supported. Use -DENABLE_NUMACTL=0")
endif ()
set (ENABLE_NUMACTL OFF)
else()
option (ENABLE_NUMACTL "Enable numactl" ${ENABLE_LIBRARIES})
endif()
if (NOT ENABLE_NUMACTL)
message (STATUS "Not using numactl")
return()
endif ()
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/numactl")
set (SRCS
"${LIBRARY_DIR}/libnuma.c"
"${LIBRARY_DIR}/syscall.c"
)
add_library(_numactl ${SRCS})
target_include_directories(_numactl SYSTEM PRIVATE include)
target_include_directories(_numactl SYSTEM PUBLIC "${LIBRARY_DIR}")
add_library(ch_contrib::numactl ALIAS _numactl)

contrib/numactl-cmake/include/config.h

@ -0,0 +1,82 @@
/* config.h. Generated from config.h.in by configure. */
/* config.h.in. Generated from configure.ac by autoheader. */
/* Checking for symver attribute */
#define HAVE_ATTRIBUTE_SYMVER 0
/* Define to 1 if you have the <dlfcn.h> header file. */
#define HAVE_DLFCN_H 1
/* Define to 1 if you have the <inttypes.h> header file. */
#define HAVE_INTTYPES_H 1
/* Define to 1 if you have the <stdint.h> header file. */
#define HAVE_STDINT_H 1
/* Define to 1 if you have the <stdio.h> header file. */
#define HAVE_STDIO_H 1
/* Define to 1 if you have the <stdlib.h> header file. */
#define HAVE_STDLIB_H 1
/* Define to 1 if you have the <strings.h> header file. */
#define HAVE_STRINGS_H 1
/* Define to 1 if you have the <string.h> header file. */
#define HAVE_STRING_H 1
/* Define to 1 if you have the <sys/stat.h> header file. */
#define HAVE_SYS_STAT_H 1
/* Define to 1 if you have the <sys/types.h> header file. */
#define HAVE_SYS_TYPES_H 1
/* Define to 1 if you have the <unistd.h> header file. */
#define HAVE_UNISTD_H 1
/* Define to the sub-directory where libtool stores uninstalled libraries. */
#define LT_OBJDIR ".libs/"
/* Name of package */
#define PACKAGE "numactl"
/* Define to the address where bug reports for this package should be sent. */
#define PACKAGE_BUGREPORT ""
/* Define to the full name of this package. */
#define PACKAGE_NAME "numactl"
/* Define to the full name and version of this package. */
#define PACKAGE_STRING "numactl 2.1"
/* Define to the one symbol short name of this package. */
#define PACKAGE_TARNAME "numactl"
/* Define to the home page for this package. */
#define PACKAGE_URL ""
/* Define to the version of this package. */
#define PACKAGE_VERSION "2.1"
/* Define to 1 if all of the C89 standard headers exist (not just the ones
required in a freestanding environment). This macro is provided for
backward compatibility; new code need not use it. */
#define STDC_HEADERS 1
/* If the compiler supports a TLS storage class define it to that here */
#define TLS __thread
/* Version number of package */
#define VERSION "2.1"
/* Number of bits in a file offset, on hosts where this is settable. */
/* #undef _FILE_OFFSET_BITS */
/* Define to 1 on platforms where this makes off_t a 64-bit type. */
/* #undef _LARGE_FILES */
/* Number of bits in time_t, on hosts where this is settable. */
/* #undef _TIME_BITS */
/* Define to 1 on platforms where this makes time_t a 64-bit type. */
/* #undef __MINGW_USE_VC2005_COMPAT */


@ -13,6 +13,7 @@ entry="/usr/share/clickhouse-test/performance/scripts/entrypoint.sh"
# https://www.kernel.org/doc/Documentation/filesystems/tmpfs.txt
# Double-escaped backslashes are a tribute to the engineering wonder of docker --
# it gives '/bin/sh: 1: [bash,: not found' otherwise.
numactl --hardware
node=$(( RANDOM % $(numactl --hardware | sed -n 's/^.*available:\(.*\)nodes.*$/\1/p') ));
echo Will bind to NUMA node $node;
numactl --cpunodebind=$node --membind=$node $entry

docs/en/operations/settings/settings.md

@ -5608,3 +5608,15 @@ Default value: `10000000`.
Minimal size of a block to compress in CROSS JOIN. A value of zero disables this threshold. The block is compressed when either of the two thresholds (by rows or by bytes) is reached.
Default value: `1GiB`.
## restore_replace_external_engines_to_null
For testing purposes. Replaces all external table engines with Null to avoid initiating external connections.
Default value: `False`.
## restore_replace_external_table_functions_to_null
For testing purposes. Replaces all external table functions with Null to avoid initiating external connections.
Default value: `False`.

docs/en/sql-reference/functions/type-conversion-functions.md

@ -4922,13 +4922,13 @@ This function is the opposite operation of function [formatDateTime](../function
**Syntax**
``` sql
parseDateTime(str, format[, timezone])
parseDateTime(str[, format[, timezone]])
```
**Arguments**
- `str` — the String to be parsed
- `format` — the format string
- `str` — The String to be parsed
- `format` — The format string. Optional. `%Y-%m-%d %H:%i:%s` if not specified.
- `timezone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). Optional.
**Returned value(s)**
@ -4971,13 +4971,13 @@ This function is the opposite operation of function [formatDateTimeInJodaSyntax]
**Syntax**
``` sql
parseDateTimeInJodaSyntax(str, format[, timezone])
parseDateTimeInJodaSyntax(str[, format[, timezone]])
```
**Arguments**
- `str` — the String to be parsed
- `format` — the format string
- `str` — The String to be parsed
- `format` — The format string. Optional. `yyyy-MM-dd HH:mm:ss` if not specified.
- `timezone` — [Timezone](/docs/en/operations/server-configuration-parameters/settings.md/#server_configuration_parameters-timezone). Optional.
**Returned value(s)**

programs/client/Client.h

@ -1,14 +1,16 @@
#pragma once
#include <Client/ClientBase.h>
#include <Client/ClientApplicationBase.h>
namespace DB
{
class Client : public ClientBase
class Client : public ClientApplicationBase
{
public:
using Arguments = ClientApplicationBase::Arguments;
Client() = default;
void initialize(Poco::Util::Application & self) override;

programs/keeper/Keeper.cpp

@ -19,6 +19,7 @@
#include <base/getMemoryAmount.h>
#include <base/scope_guard.h>
#include <base/safeExit.h>
#include <base/Numa.h>
#include <Poco/Net/NetException.h>
#include <Poco/Net/TCPServerParams.h>
#include <Poco/Net/TCPServer.h>
@ -311,6 +312,12 @@ try
MainThreadStatus::getInstance();
if (auto total_numa_memory = getNumaNodesTotalMemory(); total_numa_memory.has_value())
{
LOG_INFO(
log, "Keeper is bound to a subset of NUMA nodes. Total memory of all available nodes: {}", ReadableSize(*total_numa_memory));
}
#if !defined(NDEBUG) || !defined(__OPTIMIZE__)
LOG_WARNING(log, "Keeper was built in debug mode. It will work slowly.");
#endif

programs/local/LocalServer.h

@ -1,6 +1,6 @@
#pragma once
#include <Client/ClientBase.h>
#include <Client/ClientApplicationBase.h>
#include <Client/LocalConnection.h>
#include <Core/ServerSettings.h>
@ -21,7 +21,7 @@ namespace DB
/// Lightweight Application for clickhouse-local
/// No networking, no extra configs and working directories, no pid and status files, no dictionaries, no logging.
/// Quiet mode by default
class LocalServer : public ClientBase, public Loggers
class LocalServer : public ClientApplicationBase, public Loggers
{
public:
LocalServer() = default;

programs/server/Server.cpp

@ -22,6 +22,7 @@
#include <base/coverage.h>
#include <base/getFQDNOrHostName.h>
#include <base/safeExit.h>
#include <base/Numa.h>
#include <Common/PoolId.h>
#include <Common/MemoryTracker.h>
#include <Common/ClickHouseRevision.h>
@ -140,6 +141,7 @@
# include <azure/core/diagnostics/logger.hpp>
#endif
#include <incbin.h>
/// A minimal file used when the server is run without installation
INCBIN(resource_embedded_xml, SOURCE_DIR "/programs/server/embedded.xml");
@ -754,6 +756,12 @@ try
setenv("OPENSSL_CONF", config_dir.c_str(), true); /// NOLINT
}
if (auto total_numa_memory = getNumaNodesTotalMemory(); total_numa_memory.has_value())
{
LOG_INFO(
log, "ClickHouse is bound to a subset of NUMA nodes. Total memory of all available nodes: {}", ReadableSize(*total_numa_memory));
}
registerInterpreters();
registerFunctions();
registerAggregateFunctions();
@ -1582,6 +1590,8 @@ try
global_context->setMacros(std::make_unique<Macros>(*config, "macros", log));
global_context->setExternalAuthenticatorsConfig(*config);
global_context->setDashboardsConfig(config);
if (global_context->isServerCompletelyStarted())
{
/// It does not make sense to reload anything before server has started.

programs/server/config.xml

@ -1312,6 +1312,31 @@
<ttl>event_date + INTERVAL 30 DAY</ttl>
</blob_storage_log>
<!-- Configure system.dashboards for dashboard.html.
Could have any query parameters, for which there will be an input on the page.
For instance, the example in the comments below has the following:
- seconds
- rounding
NOTE: All default dashboards will be overwritten if this is set here. -->
<!-- Here is an example without merge() function, to make it work with readonly user -->
<!--
<dashboards>
<dashboard>
<dashboard>Overview</dashboard>
<title>Queries/second</title>
<query>
SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, avg(ProfileEvent_Query)
FROM system.metric_log
WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32}
GROUP BY t
ORDER BY t WITH FILL STEP {rounding:UInt32}
</query>
</dashboard>
</dashboards>
-->
<!-- <top_level_domains_path>/var/lib/clickhouse/top_level_domains/</top_level_domains_path> -->
<!-- Custom TLD lists.
Format: <name>/path/to/file</name>

src/Client/ClientApplicationBase.cpp Normal file

@ -0,0 +1,395 @@
#include <Client/ClientApplicationBase.h>
#include <base/argsToConfig.h>
#include <base/safeExit.h>
#include <Core/BaseSettingsProgramOptions.h>
#include <Common/clearPasswordFromCommandLine.h>
#include <Common/TerminalSize.h>
#include <Common/Exception.h>
#include <Common/SignalHandlers.h>
#include <Common/config_version.h>
#include "config.h"
#include <unordered_set>
#include <string>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
using namespace std::literals;
namespace CurrentMetrics
{
extern const Metric MemoryTracking;
}
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int CANNOT_SET_SIGNAL_HANDLER;
}
static ClientInfo::QueryKind parseQueryKind(const String & query_kind)
{
if (query_kind == "initial_query")
return ClientInfo::QueryKind::INITIAL_QUERY;
if (query_kind == "secondary_query")
return ClientInfo::QueryKind::SECONDARY_QUERY;
if (query_kind == "no_query")
return ClientInfo::QueryKind::NO_QUERY;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown query kind {}", query_kind);
}
/// This signal handler is set only for SIGINT and SIGQUIT.
void interruptSignalHandler(int signum)
{
/// The signal handler might be called even before the setup is fully finished
/// and the client application has started to process the query.
/// Because of that, we have to check it manually.
if (auto * instance = ClientApplicationBase::instanceRawPtr(); instance)
if (auto * base = dynamic_cast<ClientApplicationBase *>(instance); base)
if (base->tryStopQuery())
safeExit(128 + signum);
}
ClientApplicationBase::~ClientApplicationBase()
{
try
{
writeSignalIDtoSignalPipe(SignalListener::StopThread);
signal_listener_thread.join();
HandledSignals::instance().reset();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
ClientApplicationBase::ClientApplicationBase() : ClientBase(STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO, std::cin, std::cout, std::cerr) {}
ClientApplicationBase & ClientApplicationBase::getInstance()
{
return dynamic_cast<ClientApplicationBase&>(Poco::Util::Application::instance());
}
void ClientApplicationBase::setupSignalHandler()
{
ClientApplicationBase::getInstance().stopQuery();
struct sigaction new_act;
memset(&new_act, 0, sizeof(new_act));
new_act.sa_handler = interruptSignalHandler;
new_act.sa_flags = 0;
#if defined(OS_DARWIN)
sigemptyset(&new_act.sa_mask);
#else
if (sigemptyset(&new_act.sa_mask))
throw ErrnoException(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER, "Cannot set signal handler");
#endif
if (sigaction(SIGINT, &new_act, nullptr))
throw ErrnoException(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER, "Cannot set signal handler");
if (sigaction(SIGQUIT, &new_act, nullptr))
throw ErrnoException(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER, "Cannot set signal handler");
}
void ClientApplicationBase::addMultiquery(std::string_view query, Arguments & common_arguments) const
{
common_arguments.emplace_back("--multiquery");
common_arguments.emplace_back("-q");
common_arguments.emplace_back(query);
}
Poco::Util::LayeredConfiguration & ClientApplicationBase::getClientConfiguration()
{
return config();
}
void ClientApplicationBase::init(int argc, char ** argv)
{
namespace po = boost::program_options;
/// Don't parse options with Poco library, we prefer neat boost::program_options.
stopOptionsProcessing();
stdin_is_a_tty = isatty(STDIN_FILENO);
stdout_is_a_tty = isatty(STDOUT_FILENO);
stderr_is_a_tty = isatty(STDERR_FILENO);
terminal_width = getTerminalWidth();
std::vector<Arguments> external_tables_arguments;
Arguments common_arguments = {""}; /// 0th argument is ignored.
std::vector<Arguments> hosts_and_ports_arguments;
if (argc)
argv0 = argv[0];
readArguments(argc, argv, common_arguments, external_tables_arguments, hosts_and_ports_arguments);
/// Support for Unicode dashes
/// Interpret Unicode dashes as default double-hyphen
for (auto & arg : common_arguments)
{
// replace em-dash(U+2014)
boost::replace_all(arg, "—", "--");
// replace en-dash(U+2013)
boost::replace_all(arg, "–", "--");
// replace mathematical minus(U+2212)
boost::replace_all(arg, "−", "--");
}
OptionsDescription options_description;
options_description.main_description.emplace(createOptionsDescription("Main options", terminal_width));
/// Common options for clickhouse-client and clickhouse-local.
options_description.main_description->add_options()
("help", "print usage summary, combine with --verbose to display all options")
("verbose", "print query and other debugging info")
("version,V", "print version information and exit")
("version-clean", "print version in machine-readable format and exit")
("config-file,C", po::value<std::string>(), "config-file path")
("query,q", po::value<std::vector<std::string>>()->multitoken(), R"(Query. Can be specified multiple times (--query "SELECT 1" --query "SELECT 2") or once with multiple comma-separated queries (--query "SELECT 1; SELECT 2;"). In the latter case, INSERT queries with non-VALUE format must be separated by empty lines.)")
("queries-file", po::value<std::vector<std::string>>()->multitoken(), "file path with queries to execute; multiple files can be specified (--queries-file file1 file2...)")
("multiquery,n", "Obsolete, does nothing")
("multiline,m", "If specified, allow multiline queries (do not send the query on Enter)")
("database,d", po::value<std::string>(), "database")
("query_kind", po::value<std::string>()->default_value("initial_query"), "One of initial_query/secondary_query/no_query")
("query_id", po::value<std::string>(), "query_id")
("history_file", po::value<std::string>(), "path to history file")
("stage", po::value<std::string>()->default_value("complete"), "Request query processing up to specified stage: complete,fetch_columns,with_mergeable_state,with_mergeable_state_after_aggregation,with_mergeable_state_after_aggregation_and_limit")
("progress", po::value<ProgressOption>()->implicit_value(ProgressOption::TTY, "tty")->default_value(ProgressOption::DEFAULT, "default"), "Print progress of queries execution - to TTY: tty|on|1|true|yes; to STDERR non-interactive mode: err; OFF: off|0|false|no; DEFAULT - interactive to TTY, non-interactive is off")
("disable_suggestion,A", "Disable loading suggestion data. Note that suggestion data is loaded asynchronously through a second connection to ClickHouse server. Also it is reasonable to disable suggestion if you want to paste a query with TAB characters. Shorthand option -A is for those who get used to mysql client.")
("wait_for_suggestions_to_load", "Load suggestion data synchonously.")
("time,t", "print query execution time to stderr in non-interactive mode (for benchmarks)")
("memory-usage", po::value<std::string>()->implicit_value("default")->default_value("none"), "print memory usage to stderr in non-interactive mode (for benchmarks). Values: 'none', 'default', 'readable'")
("echo", "in batch mode, print query before execution")
("log-level", po::value<std::string>(), "log level")
("server_logs_file", po::value<std::string>(), "put server logs into specified file")
("suggestion_limit", po::value<int>()->default_value(10000), "Suggestion limit for how many databases, tables and columns to fetch.")
("format,f", po::value<std::string>(), "default output format (and input format for clickhouse-local)")
("output-format", po::value<std::string>(), "default output format (this option has preference over --format)")
("vertical,E", "vertical output format, same as --format=Vertical or FORMAT Vertical or \\G at end of command")
("highlight", po::value<bool>()->default_value(true), "enable or disable basic syntax highlight in interactive command line")
("ignore-error", "do not stop processing when an error occurs")
("stacktrace", "print stack traces of exceptions")
("hardware-utilization", "print hardware utilization information in progress bar")
("print-profile-events", po::value(&profile_events.print)->zero_tokens(), "Printing ProfileEvents packets")
("profile-events-delay-ms", po::value<UInt64>()->default_value(profile_events.delay_ms), "Delay between printing `ProfileEvents` packets (-1 - print only totals, 0 - print every single packet)")
("processed-rows", "print the number of locally processed rows")
("interactive", "Process queries-file or --query query and start interactive mode")
("pager", po::value<std::string>(), "Pipe all output into this command (less or similar)")
("max_memory_usage_in_client", po::value<std::string>(), "Set memory limit in client/local server")
("fuzzer-args", po::value<std::string>(), "Command line arguments for the LLVM's libFuzzer driver. Only relevant if the application is compiled with libFuzzer.")
("client_logs_file", po::value<std::string>(), "Path to a file for writing client logs. Currently we only have fatal logs (when the client crashes)")
;
addOptions(options_description);
OptionsDescription options_description_non_verbose = options_description;
auto getter = [](const auto & op)
{
String op_long_name = op->long_name();
return "--" + String(op_long_name);
};
if (options_description.main_description)
{
const auto & main_options = options_description.main_description->options();
std::transform(main_options.begin(), main_options.end(), std::back_inserter(cmd_options), getter);
}
if (options_description.external_description)
{
const auto & external_options = options_description.external_description->options();
std::transform(external_options.begin(), external_options.end(), std::back_inserter(cmd_options), getter);
}
po::variables_map options;
parseAndCheckOptions(options_description, options, common_arguments);
po::notify(options);
if (options.count("version") || options.count("V"))
{
showClientVersion();
exit(0); // NOLINT(concurrency-mt-unsafe)
}
if (options.count("version-clean"))
{
output_stream << VERSION_STRING;
exit(0); // NOLINT(concurrency-mt-unsafe)
}
if (options.count("verbose"))
getClientConfiguration().setBool("verbose", true);
/// Output of help message.
if (options.count("help")
|| (options.count("host") && options["host"].as<std::string>() == "elp")) /// If user writes -help instead of --help.
{
if (getClientConfiguration().getBool("verbose", false))
printHelpMessage(options_description, true);
else
printHelpMessage(options_description_non_verbose, false);
exit(0); // NOLINT(concurrency-mt-unsafe)
}
/// Common options for clickhouse-client and clickhouse-local.
/// Output execution time to stderr in batch mode.
if (options.count("time"))
getClientConfiguration().setBool("print-time-to-stderr", true);
if (options.count("memory-usage"))
{
const auto & memory_usage_mode = options["memory-usage"].as<std::string>();
if (memory_usage_mode != "none" && memory_usage_mode != "default" && memory_usage_mode != "readable")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown memory-usage mode: {}", memory_usage_mode);
getClientConfiguration().setString("print-memory-to-stderr", memory_usage_mode);
}
if (options.count("query"))
queries = options["query"].as<std::vector<std::string>>();
if (options.count("query_id"))
getClientConfiguration().setString("query_id", options["query_id"].as<std::string>());
if (options.count("database"))
getClientConfiguration().setString("database", options["database"].as<std::string>());
if (options.count("config-file"))
getClientConfiguration().setString("config-file", options["config-file"].as<std::string>());
if (options.count("queries-file"))
queries_files = options["queries-file"].as<std::vector<std::string>>();
if (options.count("multiline"))
getClientConfiguration().setBool("multiline", true);
if (options.count("ignore-error"))
getClientConfiguration().setBool("ignore-error", true);
if (options.count("format"))
getClientConfiguration().setString("format", options["format"].as<std::string>());
if (options.count("output-format"))
getClientConfiguration().setString("output-format", options["output-format"].as<std::string>());
if (options.count("vertical"))
getClientConfiguration().setBool("vertical", true);
if (options.count("stacktrace"))
getClientConfiguration().setBool("stacktrace", true);
if (options.count("print-profile-events"))
getClientConfiguration().setBool("print-profile-events", true);
if (options.count("profile-events-delay-ms"))
getClientConfiguration().setUInt64("profile-events-delay-ms", options["profile-events-delay-ms"].as<UInt64>());
/// Whether to print the number of processed rows at the end of the query.
if (options.count("processed-rows"))
getClientConfiguration().setBool("print-num-processed-rows", true);
if (options.count("progress"))
{
switch (options["progress"].as<ProgressOption>())
{
case DEFAULT:
getClientConfiguration().setString("progress", "default");
break;
case OFF:
getClientConfiguration().setString("progress", "off");
break;
case TTY:
getClientConfiguration().setString("progress", "tty");
break;
case ERR:
getClientConfiguration().setString("progress", "err");
break;
}
}
if (options.count("echo"))
getClientConfiguration().setBool("echo", true);
if (options.count("disable_suggestion"))
getClientConfiguration().setBool("disable_suggestion", true);
if (options.count("wait_for_suggestions_to_load"))
getClientConfiguration().setBool("wait_for_suggestions_to_load", true);
if (options.count("suggestion_limit"))
getClientConfiguration().setInt("suggestion_limit", options["suggestion_limit"].as<int>());
if (options.count("highlight"))
getClientConfiguration().setBool("highlight", options["highlight"].as<bool>());
if (options.count("history_file"))
getClientConfiguration().setString("history_file", options["history_file"].as<std::string>());
if (options.count("interactive"))
getClientConfiguration().setBool("interactive", true);
if (options.count("pager"))
getClientConfiguration().setString("pager", options["pager"].as<std::string>());
if (options.count("log-level"))
Poco::Logger::root().setLevel(options["log-level"].as<std::string>());
if (options.count("server_logs_file"))
server_logs_file = options["server_logs_file"].as<std::string>();
query_processing_stage = QueryProcessingStage::fromString(options["stage"].as<std::string>());
query_kind = parseQueryKind(options["query_kind"].as<std::string>());
profile_events.print = options.count("print-profile-events");
profile_events.delay_ms = options["profile-events-delay-ms"].as<UInt64>();
processOptions(options_description, options, external_tables_arguments, hosts_and_ports_arguments);
{
std::unordered_set<std::string> alias_names;
alias_names.reserve(options_description.main_description->options().size());
for (const auto& option : options_description.main_description->options())
alias_names.insert(option->long_name());
argsToConfig(common_arguments, getClientConfiguration(), 100, &alias_names);
}
clearPasswordFromCommandLine(argc, argv);
/// Limit on total memory usage
std::string max_client_memory_usage = getClientConfiguration().getString("max_memory_usage_in_client", "0" /*default value*/);
if (max_client_memory_usage != "0")
{
UInt64 max_client_memory_usage_int = parseWithSizeSuffix<UInt64>(max_client_memory_usage.c_str(), max_client_memory_usage.length());
total_memory_tracker.setHardLimit(max_client_memory_usage_int);
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
}
/// Print stacktrace in case of crash
HandledSignals::instance().setupTerminateHandler();
HandledSignals::instance().setupCommonDeadlySignalHandlers();
/// We don't set up signal handlers for SIGINT, SIGQUIT, SIGTERM because we don't
/// have an option for the client to shut down gracefully.
fatal_channel_ptr = new Poco::SplitterChannel;
fatal_console_channel_ptr = new Poco::ConsoleChannel;
fatal_channel_ptr->addChannel(fatal_console_channel_ptr);
if (options.count("client_logs_file"))
{
fatal_file_channel_ptr = new Poco::SimpleFileChannel(options["client_logs_file"].as<std::string>());
fatal_channel_ptr->addChannel(fatal_file_channel_ptr);
}
fatal_log = createLogger("ClientBase", fatal_channel_ptr.get(), Poco::Message::PRIO_FATAL);
signal_listener = std::make_unique<SignalListener>(nullptr, fatal_log);
signal_listener_thread.start(*signal_listener);
#if USE_GWP_ASAN
GWPAsan::initFinished();
#endif
}
}

src/Client/ClientApplicationBase.h Normal file

@ -0,0 +1,64 @@
#pragma once
#include <Poco/Util/Application.h>
#include <Client/ClientBase.h>
#include <Client/Suggest.h>
#include <Common/NamePrompter.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/SimpleFileChannel.h>
#include <Poco/SplitterChannel.h>
#include <boost/program_options.hpp>
#include <vector>
namespace po = boost::program_options;
namespace DB
{
void interruptSignalHandler(int signum);
/**
* The base class for client applications such as
* clickhouse-client or clickhouse-local.
* Its main purpose and responsibility is dealing with
* application-specific stuff such as command-line argument parsing
* and setting up signal handlers, so that queries are cancelled when
* Ctrl+C is pressed.
*/
class ClientApplicationBase : public ClientBase, public Poco::Util::Application, public IHints<2>
{
public:
using ClientBase::processOptions;
using Arguments = ClientBase::Arguments;
static ClientApplicationBase & getInstance();
ClientApplicationBase();
~ClientApplicationBase() override;
void init(int argc, char ** argv);
std::vector<String> getAllRegisteredNames() const override { return cmd_options; }
protected:
Poco::Util::LayeredConfiguration & getClientConfiguration() override;
void setupSignalHandler() override;
void addMultiquery(std::string_view query, Arguments & common_arguments) const;
private:
void parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments);
std::vector<String> cmd_options;
LoggerPtr fatal_log;
Poco::AutoPtr<Poco::SplitterChannel> fatal_channel_ptr;
Poco::AutoPtr<Poco::Channel> fatal_console_channel_ptr;
Poco::AutoPtr<Poco::Channel> fatal_file_channel_ptr;
Poco::Thread signal_listener_thread;
std::unique_ptr<Poco::Runnable> signal_listener;
};
}
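For orientation, a hedged sketch of how a concrete client drives this class, mirroring the existing main-entry pattern (hypothetical function name; compiles only inside the ClickHouse tree):

```cpp
int mainEntryToyClient(int argc, char ** argv)
{
    DB::Client client;          /// Client derives from ClientApplicationBase
    client.init(argc, argv);    /// parses argv, installs the SIGINT/SIGQUIT handler
    return client.run();        /// Poco::Util::Application::run() -> initialize() -> main()
}
```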

src/Client/ClientBase.cpp

@ -5,7 +5,6 @@
#include <Client/InternalTextLogs.h>
#include <Client/TestTags.h>
#include <base/argsToConfig.h>
#include <base/safeExit.h>
#include <Core/Block.h>
#include <Core/BaseSettingsProgramOptions.h>
@ -17,7 +16,6 @@
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/typeid_cast.h>
#include <Common/TerminalSize.h>
#include <Common/clearPasswordFromCommandLine.h>
#include <Common/StringUtils.h>
#include <Common/filesystemHelpers.h>
#include <Common/NetException.h>
@ -90,12 +88,6 @@
namespace fs = std::filesystem;
using namespace std::literals;
namespace CurrentMetrics
{
extern const Metric MemoryTracking;
}
namespace DB
{
@ -158,17 +150,6 @@ std::istream& operator>> (std::istream & in, ProgressOption & progress)
return in;
}
static ClientInfo::QueryKind parseQueryKind(const String & query_kind)
{
if (query_kind == "initial_query")
return ClientInfo::QueryKind::INITIAL_QUERY;
if (query_kind == "secondary_query")
return ClientInfo::QueryKind::SECONDARY_QUERY;
if (query_kind == "no_query")
return ClientInfo::QueryKind::NO_QUERY;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown query kind {}", query_kind);
}
static void incrementProfileEventsBlock(Block & dst, const Block & src)
{
if (!dst)
@ -269,36 +250,6 @@ static void incrementProfileEventsBlock(Block & dst, const Block & src)
dst.setColumns(std::move(mutable_columns));
}
std::atomic<Int32> exit_after_signals = 0;
class QueryInterruptHandler : private boost::noncopyable
{
public:
/// Store how many interrupt signals can arrive before stopping the query;
/// by default, stop after the first interrupt signal.
static void start(Int32 signals_before_stop = 1) { exit_after_signals.store(signals_before_stop); }
/// Set the value to 0 or less to mark the query as stopped.
static void stop() { exit_after_signals.store(0); }
/// Return true if the query was stopped.
/// The query was stopped if it received at least "signals_before_stop" interrupt signals.
static bool try_stop() { return exit_after_signals.fetch_sub(1) <= 0; }
static bool cancelled() { return exit_after_signals.load() <= 0; }
/// Return how many interrupt signals remain before stopping.
static Int32 cancelled_status() { return exit_after_signals.load(); }
};
/// This signal handler is set for SIGINT and SIGQUIT.
void interruptSignalHandler(int signum)
{
if (QueryInterruptHandler::try_stop())
safeExit(128 + signum);
}
/// To cancel the query on local format error.
class LocalFormatError : public DB::Exception
{
@ -307,19 +258,7 @@ public:
};
ClientBase::~ClientBase()
{
try
{
writeSignalIDtoSignalPipe(SignalListener::StopThread);
signal_listener_thread.join();
HandledSignals::instance().reset();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
ClientBase::~ClientBase() = default;
ClientBase::ClientBase(
int in_fd_,
@ -345,31 +284,6 @@ ClientBase::ClientBase(
terminal_width = getTerminalWidth(in_fd, err_fd);
}
void ClientBase::setupSignalHandler()
{
QueryInterruptHandler::stop();
struct sigaction new_act;
memset(&new_act, 0, sizeof(new_act));
new_act.sa_handler = interruptSignalHandler;
new_act.sa_flags = 0;
#if defined(OS_DARWIN)
sigemptyset(&new_act.sa_mask);
#else
if (sigemptyset(&new_act.sa_mask))
throw ErrnoException(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER, "Cannot set signal handler");
#endif
if (sigaction(SIGINT, &new_act, nullptr))
throw ErrnoException(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER, "Cannot set signal handler");
if (sigaction(SIGQUIT, &new_act, nullptr))
throw ErrnoException(ErrorCodes::CANNOT_SET_SIGNAL_HANDLER, "Cannot set signal handler");
}
ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, const Settings & settings, bool allow_multi_statements)
{
std::unique_ptr<IParserBase> parser;
@ -1113,8 +1027,8 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
{
try
{
QueryInterruptHandler::start(signals_before_stop);
SCOPE_EXIT({ QueryInterruptHandler::stop(); });
query_interrupt_handler.start(signals_before_stop);
SCOPE_EXIT({ query_interrupt_handler.stop(); });
connection->sendQuery(
connection_parameters.timeouts,
@ -1178,13 +1092,13 @@ void ClientBase::receiveResult(ASTPtr parsed_query, Int32 signals_before_stop, b
/// to avoid losing sync.
if (!cancelled)
{
if (partial_result_on_first_cancel && QueryInterruptHandler::cancelled_status() == signals_before_stop - 1)
if (partial_result_on_first_cancel && query_interrupt_handler.cancelled_status() == signals_before_stop - 1)
{
connection->sendCancel();
/// First cancel reading request was sent. Next requests will only be with a full cancel
partial_result_on_first_cancel = false;
}
else if (QueryInterruptHandler::cancelled())
else if (query_interrupt_handler.cancelled())
{
cancelQuery();
}
@ -1563,8 +1477,8 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
return;
}
QueryInterruptHandler::start();
SCOPE_EXIT({ QueryInterruptHandler::stop(); });
query_interrupt_handler.start();
SCOPE_EXIT({ query_interrupt_handler.stop(); });
connection->sendQuery(
connection_parameters.timeouts,
@ -1775,7 +1689,7 @@ try
Block block;
while (executor.pull(block))
{
if (!cancelled && QueryInterruptHandler::cancelled())
if (!cancelled && query_interrupt_handler.cancelled())
{
cancelQuery();
executor.cancel();
@ -2857,7 +2771,6 @@ void ClientBase::runLibFuzzer()
void ClientBase::runLibFuzzer() {}
#endif
void ClientBase::clearTerminal()
{
/// Clear from cursor until end of screen.
@ -2867,288 +2780,9 @@ void ClientBase::clearTerminal()
output_stream << "\033[0J" "\033[?25h";
}
void ClientBase::showClientVersion()
{
output_stream << VERSION_NAME << " " + getName() + " version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
}
void ClientBase::init(int argc, char ** argv)
{
namespace po = boost::program_options;
/// Don't parse options with Poco library, we prefer neat boost::program_options.
stopOptionsProcessing();
stdin_is_a_tty = isatty(STDIN_FILENO);
stdout_is_a_tty = isatty(STDOUT_FILENO);
stderr_is_a_tty = isatty(STDERR_FILENO);
terminal_width = getTerminalWidth();
std::vector<Arguments> external_tables_arguments;
Arguments common_arguments = {""}; /// 0th argument is ignored.
std::vector<Arguments> hosts_and_ports_arguments;
if (argc)
argv0 = argv[0];
readArguments(argc, argv, common_arguments, external_tables_arguments, hosts_and_ports_arguments);
/// Support for Unicode dashes
/// Interpret Unicode dashes as default double-hyphen
for (auto & arg : common_arguments)
{
// replace em-dash(U+2014)
boost::replace_all(arg, "—", "--");
// replace en-dash(U+2013)
boost::replace_all(arg, "–", "--");
// replace mathematical minus(U+2212)
boost::replace_all(arg, "−", "--");
}
OptionsDescription options_description;
options_description.main_description.emplace(createOptionsDescription("Main options", terminal_width));
/// Common options for clickhouse-client and clickhouse-local.
options_description.main_description->add_options()
("help", "print usage summary, combine with --verbose to display all options")
("verbose", "print query and other debugging info")
("version,V", "print version information and exit")
("version-clean", "print version in machine-readable format and exit")
("config-file,C", po::value<std::string>(), "config-file path")
("query,q", po::value<std::vector<std::string>>()->multitoken(), R"(Query. Can be specified multiple times (--query "SELECT 1" --query "SELECT 2") or once with multiple comma-separated queries (--query "SELECT 1; SELECT 2;"). In the latter case, INSERT queries with non-VALUE format must be separated by empty lines.)")
("queries-file", po::value<std::vector<std::string>>()->multitoken(), "file path with queries to execute; multiple files can be specified (--queries-file file1 file2...)")
("multiquery,n", "Obsolete, does nothing")
("multiline,m", "If specified, allow multiline queries (do not send the query on Enter)")
("database,d", po::value<std::string>(), "database")
("query_kind", po::value<std::string>()->default_value("initial_query"), "One of initial_query/secondary_query/no_query")
("query_id", po::value<std::string>(), "query_id")
("history_file", po::value<std::string>(), "path to history file")
("stage", po::value<std::string>()->default_value("complete"), "Request query processing up to specified stage: complete,fetch_columns,with_mergeable_state,with_mergeable_state_after_aggregation,with_mergeable_state_after_aggregation_and_limit")
("progress", po::value<ProgressOption>()->implicit_value(ProgressOption::TTY, "tty")->default_value(ProgressOption::DEFAULT, "default"), "Print progress of queries execution - to TTY: tty|on|1|true|yes; to STDERR non-interactive mode: err; OFF: off|0|false|no; DEFAULT - interactive to TTY, non-interactive is off")
("disable_suggestion,A", "Disable loading suggestion data. Note that suggestion data is loaded asynchronously through a second connection to ClickHouse server. Also it is reasonable to disable suggestion if you want to paste a query with TAB characters. Shorthand option -A is for those who get used to mysql client.")
("wait_for_suggestions_to_load", "Load suggestion data synchonously.")
("time,t", "print query execution time to stderr in non-interactive mode (for benchmarks)")
("memory-usage", po::value<std::string>()->implicit_value("default")->default_value("none"), "print memory usage to stderr in non-interactive mode (for benchmarks). Values: 'none', 'default', 'readable'")
("echo", "in batch mode, print query before execution")
("log-level", po::value<std::string>(), "log level")
("server_logs_file", po::value<std::string>(), "put server logs into specified file")
("suggestion_limit", po::value<int>()->default_value(10000), "Suggestion limit for how many databases, tables and columns to fetch.")
("format,f", po::value<std::string>(), "default output format (and input format for clickhouse-local)")
("output-format", po::value<std::string>(), "default output format (this option has preference over --format)")
("vertical,E", "vertical output format, same as --format=Vertical or FORMAT Vertical or \\G at end of command")
("highlight", po::value<bool>()->default_value(true), "enable or disable basic syntax highlight in interactive command line")
("ignore-error", "do not stop processing when an error occurs")
("stacktrace", "print stack traces of exceptions")
("hardware-utilization", "print hardware utilization information in progress bar")
("print-profile-events", po::value(&profile_events.print)->zero_tokens(), "Printing ProfileEvents packets")
("profile-events-delay-ms", po::value<UInt64>()->default_value(profile_events.delay_ms), "Delay between printing `ProfileEvents` packets (-1 - print only totals, 0 - print every single packet)")
("processed-rows", "print the number of locally processed rows")
("interactive", "Process queries-file or --query query and start interactive mode")
("pager", po::value<std::string>(), "Pipe all output into this command (less or similar)")
("max_memory_usage_in_client", po::value<std::string>(), "Set memory limit in client/local server")
("fuzzer-args", po::value<std::string>(), "Command line arguments for the LLVM's libFuzzer driver. Only relevant if the application is compiled with libFuzzer.")
("client_logs_file", po::value<std::string>(), "Path to a file for writing client logs. Currently we only have fatal logs (when the client crashes)")
;
addOptions(options_description);
OptionsDescription options_description_non_verbose = options_description;
auto getter = [](const auto & op)
{
String op_long_name = op->long_name();
return "--" + String(op_long_name);
};
if (options_description.main_description)
{
const auto & main_options = options_description.main_description->options();
std::transform(main_options.begin(), main_options.end(), std::back_inserter(cmd_options), getter);
}
if (options_description.external_description)
{
const auto & external_options = options_description.external_description->options();
std::transform(external_options.begin(), external_options.end(), std::back_inserter(cmd_options), getter);
}
po::variables_map options;
parseAndCheckOptions(options_description, options, common_arguments);
po::notify(options);
if (options.count("version") || options.count("V"))
{
showClientVersion();
exit(0); // NOLINT(concurrency-mt-unsafe)
}
if (options.count("version-clean"))
{
output_stream << VERSION_STRING;
exit(0); // NOLINT(concurrency-mt-unsafe)
}
if (options.count("verbose"))
getClientConfiguration().setBool("verbose", true);
/// Output of help message.
if (options.count("help")
|| (options.count("host") && options["host"].as<std::string>() == "elp")) /// If user writes -help instead of --help.
{
if (getClientConfiguration().getBool("verbose", false))
printHelpMessage(options_description, true);
else
printHelpMessage(options_description_non_verbose, false);
exit(0); // NOLINT(concurrency-mt-unsafe)
}
/// Common options for clickhouse-client and clickhouse-local.
/// Output execution time to stderr in batch mode.
if (options.count("time"))
getClientConfiguration().setBool("print-time-to-stderr", true);
if (options.count("memory-usage"))
{
const auto & memory_usage_mode = options["memory-usage"].as<std::string>();
if (memory_usage_mode != "none" && memory_usage_mode != "default" && memory_usage_mode != "readable")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown memory-usage mode: {}", memory_usage_mode);
getClientConfiguration().setString("print-memory-to-stderr", memory_usage_mode);
}
if (options.count("query"))
queries = options["query"].as<std::vector<std::string>>();
if (options.count("query_id"))
getClientConfiguration().setString("query_id", options["query_id"].as<std::string>());
if (options.count("database"))
getClientConfiguration().setString("database", options["database"].as<std::string>());
if (options.count("config-file"))
getClientConfiguration().setString("config-file", options["config-file"].as<std::string>());
if (options.count("queries-file"))
queries_files = options["queries-file"].as<std::vector<std::string>>();
if (options.count("multiline"))
getClientConfiguration().setBool("multiline", true);
if (options.count("ignore-error"))
getClientConfiguration().setBool("ignore-error", true);
if (options.count("format"))
getClientConfiguration().setString("format", options["format"].as<std::string>());
if (options.count("output-format"))
getClientConfiguration().setString("output-format", options["output-format"].as<std::string>());
if (options.count("vertical"))
getClientConfiguration().setBool("vertical", true);
if (options.count("stacktrace"))
getClientConfiguration().setBool("stacktrace", true);
if (options.count("print-profile-events"))
getClientConfiguration().setBool("print-profile-events", true);
if (options.count("profile-events-delay-ms"))
getClientConfiguration().setUInt64("profile-events-delay-ms", options["profile-events-delay-ms"].as<UInt64>());
/// Whether to print the number of processed rows at the end of the query.
if (options.count("processed-rows"))
getClientConfiguration().setBool("print-num-processed-rows", true);
if (options.count("progress"))
{
switch (options["progress"].as<ProgressOption>())
{
case DEFAULT:
getClientConfiguration().setString("progress", "default");
break;
case OFF:
getClientConfiguration().setString("progress", "off");
break;
case TTY:
getClientConfiguration().setString("progress", "tty");
break;
case ERR:
getClientConfiguration().setString("progress", "err");
break;
}
}
if (options.count("echo"))
getClientConfiguration().setBool("echo", true);
if (options.count("disable_suggestion"))
getClientConfiguration().setBool("disable_suggestion", true);
if (options.count("wait_for_suggestions_to_load"))
getClientConfiguration().setBool("wait_for_suggestions_to_load", true);
if (options.count("suggestion_limit"))
getClientConfiguration().setInt("suggestion_limit", options["suggestion_limit"].as<int>());
if (options.count("highlight"))
getClientConfiguration().setBool("highlight", options["highlight"].as<bool>());
if (options.count("history_file"))
getClientConfiguration().setString("history_file", options["history_file"].as<std::string>());
if (options.count("interactive"))
getClientConfiguration().setBool("interactive", true);
if (options.count("pager"))
getClientConfiguration().setString("pager", options["pager"].as<std::string>());
if (options.count("log-level"))
Poco::Logger::root().setLevel(options["log-level"].as<std::string>());
if (options.count("server_logs_file"))
server_logs_file = options["server_logs_file"].as<std::string>();
query_processing_stage = QueryProcessingStage::fromString(options["stage"].as<std::string>());
query_kind = parseQueryKind(options["query_kind"].as<std::string>());
profile_events.print = options.count("print-profile-events");
profile_events.delay_ms = options["profile-events-delay-ms"].as<UInt64>();
processOptions(options_description, options, external_tables_arguments, hosts_and_ports_arguments);
{
std::unordered_set<std::string> alias_names;
alias_names.reserve(options_description.main_description->options().size());
for (const auto& option : options_description.main_description->options())
alias_names.insert(option->long_name());
argsToConfig(common_arguments, getClientConfiguration(), 100, &alias_names);
}
clearPasswordFromCommandLine(argc, argv);
/// Limit on total memory usage
std::string max_client_memory_usage = getClientConfiguration().getString("max_memory_usage_in_client", "0" /*default value*/);
if (max_client_memory_usage != "0")
{
UInt64 max_client_memory_usage_int = parseWithSizeSuffix<UInt64>(max_client_memory_usage.c_str(), max_client_memory_usage.length());
total_memory_tracker.setHardLimit(max_client_memory_usage_int);
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
}
/// Print stacktrace in case of crash
HandledSignals::instance().setupTerminateHandler();
HandledSignals::instance().setupCommonDeadlySignalHandlers();
/// We don't set up signal handlers for SIGINT, SIGQUIT, SIGTERM because we don't
/// have an option for the client to shut down gracefully.
fatal_channel_ptr = new Poco::SplitterChannel;
fatal_console_channel_ptr = new Poco::ConsoleChannel;
fatal_channel_ptr->addChannel(fatal_console_channel_ptr);
if (options.count("client_logs_file"))
{
fatal_file_channel_ptr = new Poco::SimpleFileChannel(options["client_logs_file"].as<std::string>());
fatal_channel_ptr->addChannel(fatal_file_channel_ptr);
}
fatal_log = createLogger("ClientBase", fatal_channel_ptr.get(), Poco::Message::PRIO_FATAL);
signal_listener = std::make_unique<SignalListener>(nullptr, fatal_log);
signal_listener_thread.start(*signal_listener);
#if USE_GWP_ASAN
GWPAsan::initFinished();
#endif
}
}

src/Client/ClientBase.h

@ -1,26 +1,32 @@
#pragma once
#include <string_view>
#include "Common/NamePrompter.h"
#include <Parsers/ASTCreateQuery.h>
#include <Common/ProgressIndication.h>
#include <Client/Suggest.h>
#include <Client/QueryFuzzer.h>
#include <Common/DNSResolver.h>
#include <Common/InterruptListener.h>
#include <Common/ProgressIndication.h>
#include <Common/ShellCommand.h>
#include <Common/Stopwatch.h>
#include <Common/DNSResolver.h>
#include <Core/ExternalTable.h>
#include <Core/Settings.h>
#include <Poco/Util/Application.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/SimpleFileChannel.h>
#include <Poco/SplitterChannel.h>
#include <Interpreters/Context.h>
#include <Client/Suggest.h>
#include <Client/QueryFuzzer.h>
#include <boost/program_options.hpp>
#include <Storages/StorageFile.h>
#include <Storages/SelectQueryInfo.h>
#include <Parsers/ASTCreateQuery.h>
#include <Poco/Util/Application.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageFile.h>
#include <boost/program_options.hpp>
#include <atomic>
#include <optional>
#include <string_view>
#include <string>
namespace po = boost::program_options;
@ -64,9 +70,16 @@ std::istream& operator>> (std::istream & in, ProgressOption & progress);
class InternalTextLogs;
class WriteBufferFromFileDescriptor;
class ClientBase : public Poco::Util::Application, public IHints<2>
/**
* The base class which encapsulates the core functionality of a client.
* Can be used in a standalone application (clickhouse-client or clickhouse-local),
* or be embedded into a server.
* Always keep in mind that there can be several instances of this class within
* a process. Thus, it cannot keep its state in global shared variables or even use them.
* The best examples are std::cin, std::cout and std::cerr.
*/
class ClientBase
{
public:
using Arguments = std::vector<String>;
@ -79,12 +92,11 @@ public:
std::ostream & output_stream_ = std::cout,
std::ostream & error_stream_ = std::cerr
);
virtual ~ClientBase();
~ClientBase() override;
bool tryStopQuery() { return query_interrupt_handler.tryStop(); }
void stopQuery() { query_interrupt_handler.stop(); }
void init(int argc, char ** argv);
std::vector<String> getAllRegisteredNames() const override { return cmd_options; }
ASTPtr parseQuery(const char *& pos, const char * end, const Settings & settings, bool allow_multi_statements);
protected:
@ -114,7 +126,7 @@ protected:
ASTPtr parsed_query, std::optional<bool> echo_query_ = {}, bool report_error = false);
static void adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, uint32_t max_parser_depth, uint32_t max_parser_backtracks);
static void setupSignalHandler();
virtual void setupSignalHandler() = 0;
bool executeMultiQuery(const String & all_queries_text);
MultiQueryProcessingStage analyzeMultiQueryText(
@ -188,7 +200,6 @@ private:
String prompt() const;
void resetOutput();
void parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments);
void updateSuggest(const ASTPtr & ast);
@ -196,6 +207,31 @@ private:
bool addMergeTreeSettings(ASTCreateQuery & ast_create);
protected:
class QueryInterruptHandler : private boost::noncopyable
{
public:
/// Store how many interrupt signals can arrive before stopping the query;
/// by default, stop after the first interrupt signal.
void start(Int32 signals_before_stop = 1) { exit_after_signals.store(signals_before_stop); }
/// Set the value to 0 or less to mark the query as stopped.
void stop() { exit_after_signals.store(0); }
/// Return true if the query was stopped.
/// The query was stopped if it received at least "signals_before_stop" interrupt signals.
bool tryStop() { return exit_after_signals.fetch_sub(1) <= 0; }
bool cancelled() { return exit_after_signals.load() <= 0; }
/// Return how many interrupt signals remain before stopping.
Int32 cancelled_status() { return exit_after_signals.load(); }
private:
std::atomic<Int32> exit_after_signals = 0;
};
QueryInterruptHandler query_interrupt_handler;
static bool isSyncInsertWithData(const ASTInsertQuery & insert_query, const ContextPtr & context);
bool processMultiQueryFromFile(const String & file_name);
@ -219,13 +255,6 @@ protected:
/// Client context is a context used only by the client to parse queries, process query parameters and to connect to clickhouse-server.
ContextMutablePtr client_context;
LoggerPtr fatal_log;
Poco::AutoPtr<Poco::SplitterChannel> fatal_channel_ptr;
Poco::AutoPtr<Poco::Channel> fatal_console_channel_ptr;
Poco::AutoPtr<Poco::Channel> fatal_file_channel_ptr;
Poco::Thread signal_listener_thread;
std::unique_ptr<Poco::Runnable> signal_listener;
bool is_interactive = false; /// Use either interactive line editing interface or batch mode.
bool delayed_interactive = false;
@ -239,7 +268,6 @@ protected:
std::vector<String> queries; /// Queries passed via '--query'
std::vector<String> queries_files; /// If not empty, queries will be read from these files
std::vector<String> interleave_queries_files; /// If not empty, run queries from these files before processing every file from 'queries_files'.
std::vector<String> cmd_options;
bool stdin_is_a_tty = false; /// stdin is a terminal.
bool stdout_is_a_tty = false; /// stdout is a terminal.
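The countdown that moved into QueryInterruptHandler is small enough to model standalone; a runnable sketch of the same fetch_sub pattern (plain C++, no ClickHouse dependencies; free functions stand in for the members):

```cpp
#include <atomic>
#include <cassert>

std::atomic<int> exit_after_signals{0};

void start(int signals_before_stop = 1) { exit_after_signals.store(signals_before_stop); }
bool try_stop() { return exit_after_signals.fetch_sub(1) <= 0; }  // called from the signal handler
bool cancelled() { return exit_after_signals.load() <= 0; }       // polled by the query loop

int main()
{
    start();              // default: react to the first Ctrl+C
    assert(!try_stop());  // 1st signal: the handler does not hard-exit (counter 1 -> 0)
    assert(cancelled());  // the query loop now observes cancellation and cancels the query
    assert(try_stop());   // 2nd signal while already cancelled: hard-exit path (safeExit)
}
```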


@ -1,4 +1,4 @@
#include <Client/ClientBase.h>
#include <Client/ClientApplicationBase.h>
#include <Core/BaseSettingsProgramOptions.h>
namespace DB
@ -80,7 +80,7 @@ private:
}
void ClientBase::parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments)
void ClientApplicationBase::parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments)
{
if (allow_repeated_settings)
addProgramOptionsAsMultitokens(cmd_settings, options_description.main_description.value());

src/Common/SharedMutex.cpp

@ -1,4 +1,5 @@
#include <Common/SharedMutex.h>
#include <base/getThreadId.h>
#ifdef OS_LINUX /// Because of futex
@ -12,6 +13,7 @@ namespace DB
SharedMutex::SharedMutex()
: state(0)
, waiters(0)
, writer_thread_id(0)
{}
void SharedMutex::lock()
@ -29,6 +31,10 @@ void SharedMutex::lock()
break;
}
/// The first step of acquiring the exclusive ownership is finished.
/// Now we just wait until all readers release the shared ownership.
writer_thread_id.store(getThreadId());
value |= writers;
while (value & readers)
futexWaitLowerFetch(state, value);
@ -37,11 +43,15 @@ void SharedMutex::lock()
bool SharedMutex::try_lock()
{
UInt64 value = 0;
return state.compare_exchange_strong(value, writers);
bool success = state.compare_exchange_strong(value, writers);
if (success)
writer_thread_id.store(getThreadId());
return success;
}
void SharedMutex::unlock()
{
writer_thread_id.store(0);
state.store(0);
if (waiters)
futexWakeUpperAll(state);

src/Common/SharedMutex.h

@ -19,6 +19,8 @@ public:
~SharedMutex() = default;
SharedMutex(const SharedMutex &) = delete;
SharedMutex & operator=(const SharedMutex &) = delete;
SharedMutex(SharedMutex &&) = delete;
SharedMutex & operator=(SharedMutex &&) = delete;
// Exclusive ownership
void lock() TSA_ACQUIRE();
@ -36,6 +38,8 @@ private:
alignas(64) std::atomic<UInt64> state;
std::atomic<UInt32> waiters;
/// Set while the lock is held (or is being acquired) in exclusive mode, only to facilitate debugging.
std::atomic<UInt64> writer_thread_id;
};
}
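/// Editorial sketch (assumption, not part of the diff): the same debugging idea
/// on top of std::shared_mutex, assuming <shared_mutex>, <atomic> and
/// base/getThreadId.h are available -- record the exclusive owner so a debugger
/// can inspect it.
class DebuggableSharedMutex
{
public:
void lock()
{
impl.lock();
writer_thread_id.store(getThreadId()); /// visible while held exclusively
}
void unlock()
{
writer_thread_id.store(0); /// cleared before release, as above
impl.unlock();
}
private:
std::shared_mutex impl;
std::atomic<UInt64> writer_thread_id{0}; /// 0 == no exclusive owner
};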

View File

@ -64,6 +64,7 @@
#cmakedefine01 USE_LIBARCHIVE
#cmakedefine01 USE_POCKETFFT
#cmakedefine01 USE_PROMETHEUS_PROTOBUFS
#cmakedefine01 USE_NUMACTL
/// This is needed for .incbin in assembly. For some reason, include paths don't work there in presence of LTO.
/// That's why we use absolute paths.

View File

@ -893,6 +893,8 @@ class IColumn;
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(UInt64, extract_key_value_pairs_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory.", 0) ALIAS(extract_kvp_max_pairs_per_row) \
M(Bool, restore_replace_external_engines_to_null, false, "Replace all external table engines with Null on restore. Useful for testing purposes", 0) \
M(Bool, restore_replace_external_table_functions_to_null, false, "Replace all external table functions with Null on restore. Useful for testing purposes", 0) \
\
\
/* ###################################### */ \

View File

@ -81,7 +81,9 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"ignore_on_cluster_for_replicated_named_collections_queries", false, false, "Ignore ON CLUSTER clause for replicated named collections management queries."},
{"backup_restore_s3_retry_attempts", 1000,1000, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries. It takes place only for backup/restore."},
{"postgresql_connection_attempt_timeout", 2, 2, "Allow to control 'connect_timeout' parameter of PostgreSQL connection."},
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."}
{"postgresql_connection_pool_retries", 2, 2, "Allow to control the number of retries in PostgreSQL connection pool."},
{"restore_replace_external_table_functions_to_null", false, false, "New setting."},
{"restore_replace_external_engines_to_null", false, false, "New setting."}
}},
{"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"},
{"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"},

View File

@ -252,7 +252,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.msgpack.number_of_columns = settings.input_format_msgpack_number_of_columns;
format_settings.msgpack.output_uuid_representation = settings.output_format_msgpack_uuid_representation;
format_settings.max_rows_to_read_for_schema_inference = settings.input_format_max_rows_to_read_for_schema_inference;
format_settings.max_bytes_to_read_for_schema_inference = settings.input_format_max_rows_to_read_for_schema_inference;
format_settings.max_bytes_to_read_for_schema_inference = settings.input_format_max_bytes_to_read_for_schema_inference;
format_settings.column_names_for_schema_inference = settings.column_names_for_schema_inference;
format_settings.schema_inference_hints = settings.schema_inference_hints;
format_settings.schema_inference_make_columns_nullable = settings.schema_inference_make_columns_nullable;

View File

@ -582,11 +582,11 @@ namespace
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
FunctionArgumentDescriptors mandatory_args{
{"time", static_cast<FunctionArgumentDescriptor::TypeValidator>(&isString), nullptr, "String"},
{"format", static_cast<FunctionArgumentDescriptor::TypeValidator>(&isString), nullptr, "String"}
{"time", static_cast<FunctionArgumentDescriptor::TypeValidator>(&isString), nullptr, "String"}
};
FunctionArgumentDescriptors optional_args{
{"format", static_cast<FunctionArgumentDescriptor::TypeValidator>(&isString), nullptr, "String"},
{"timezone", static_cast<FunctionArgumentDescriptor::TypeValidator>(&isString), &isColumnConst, "const String"}
};
@ -2029,14 +2029,24 @@ namespace
String getFormat(const ColumnsWithTypeAndName & arguments) const
{
const auto * format_column = checkAndGetColumnConst<ColumnString>(arguments[1].column.get());
if (!format_column)
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of second ('format') argument of function {}. Must be constant string.",
arguments[1].column->getName(),
getName());
return format_column->getValue<String>();
if (arguments.size() == 1)
{
if constexpr (parse_syntax == ParseSyntax::MySQL)
return "%Y-%m-%d %H:%i:%s";
else
return "yyyy-MM-dd HH:mm:ss";
}
else
{
const auto * col_format = checkAndGetColumnConst<ColumnString>(arguments[1].column.get());
if (!col_format)
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of second ('format') argument of function {}. Must be constant string.",
arguments[1].column->getName(),
getName());
return col_format->getValue<String>();
}
}
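/// Editorial note: given the defaults above, these calls are expected to be
/// equivalent (also exercised by the tests further down):
///   parseDateTime('2021-01-04 23:12:34')              -- uses '%Y-%m-%d %H:%i:%s'
///   parseDateTimeInJodaSyntax('2021-01-04 23:12:34')  -- uses 'yyyy-MM-dd HH:mm:ss'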
const DateLUTImpl & getTimeZone(const ColumnsWithTypeAndName & arguments) const

View File

@ -379,6 +379,10 @@ struct ContextSharedPart : boost::noncopyable
ActionLocksManagerPtr action_locks_manager; /// Set of storages' action lockers
OnceFlag system_logs_initialized;
std::unique_ptr<SystemLogs> system_logs TSA_GUARDED_BY(mutex); /// Used to log queries and operations on parts
mutable std::mutex dashboard_mutex;
std::optional<Context::Dashboards> dashboards;
std::optional<S3SettingsByEndpoint> storage_s3_settings TSA_GUARDED_BY(mutex); /// Settings of S3 storage
std::vector<String> warnings TSA_GUARDED_BY(mutex); /// Store warning messages about server configuration.
@ -4313,6 +4317,51 @@ std::vector<ISystemLog *> Context::getSystemLogs() const
return shared->system_logs->logs;
}
std::optional<Context::Dashboards> Context::getDashboards() const
{
std::lock_guard lock(shared->dashboard_mutex);
if (!shared->dashboards)
return {};
return shared->dashboards;
}
namespace
{
String trim(const String & text)
{
std::string_view view(text);
::trim(view, '\n');
return String(view);
}
}
void Context::setDashboardsConfig(const ConfigurationPtr & config)
{
Poco::Util::AbstractConfiguration::Keys keys;
config->keys("dashboards", keys);
Dashboards dashboards;
for (const auto & key : keys)
{
const auto & prefix = "dashboards." + key + ".";
dashboards.push_back({
{ "dashboard", config->getString(prefix + "dashboard") },
{ "title", config->getString(prefix + "title") },
{ "query", trim(config->getString(prefix + "query")) },
});
}
{
std::lock_guard lock(shared->dashboard_mutex);
if (!dashboards.empty())
shared->dashboards.emplace(std::move(dashboards));
else
shared->dashboards.reset();
}
}
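/// Editorial note: the expected config shape (see the test config below) is
/// <dashboards><dashboard><dashboard>NAME</dashboard><title>...</title><query>...</query></dashboard></dashboards>;
/// an empty <dashboards> section resets the override, so system.dashboards falls
/// back to its built-in defaults.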
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{

View File

@ -1153,6 +1153,10 @@ public:
std::vector<ISystemLog *> getSystemLogs() const;
using Dashboards = std::vector<std::map<String, String>>;
std::optional<Dashboards> getDashboards() const;
void setDashboardsConfig(const ConfigurationPtr & config);
/// Returns an object used to log operations with parts if it possible.
/// Provide table name to make required checks.
std::shared_ptr<PartLog> getPartLog(const String & part_database) const;

View File

@ -959,12 +959,40 @@ namespace
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
void setNullTableEngine(ASTStorage & storage)
{
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = "Null";
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.as_table_function)
{
if (getContext()->getSettingsRef().restore_replace_external_table_functions_to_null)
{
const auto & factory = TableFunctionFactory::instance();
auto properties = factory.tryGetProperties(create.as_table_function->as<ASTFunction>()->name);
if (properties && properties->allow_readonly)
return;
if (!create.storage)
{
auto storage_ast = std::make_shared<ASTStorage>();
create.set(create.storage, storage_ast);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage should not be created yet, it's a bug.");
create.as_table_function = nullptr;
setNullTableEngine(*create.storage);
}
return;
}
if (create.is_dictionary || create.is_ordinary_view || create.is_live_view || create.is_window_view)
return;
@ -1015,6 +1043,13 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
/// Some part of storage definition (such as PARTITION BY) is specified, but ENGINE is not: just set default one.
setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_table_engine.value);
}
/// For external tables with the restore_replace_external_engines_to_null setting we replace external engines with
/// the Null table engine.
else if (getContext()->getSettingsRef().restore_replace_external_engines_to_null)
{
if (StorageFactory::instance().getStorageFeatures(create.storage->engine->name).source_access_type != AccessType::NONE)
setNullTableEngine(*create.storage);
}
return;
}
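/// Editorial note: the net effect (exercised by the integration test below) is
/// that under restore_replace_external_engines_to_null a restored
/// "ENGINE = MySQL(...)" becomes "ENGINE = Null", while internal engines such as
/// MergeTree are left untouched; readonly-capable table functions are kept as-is.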

View File

@ -15,11 +15,8 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
}
JSONAsRowInputFormat::JSONAsRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: JSONAsRowInputFormat(header_, std::make_unique<PeekableReadBuffer>(in_), params_, format_settings_) {}
JSONAsRowInputFormat::JSONAsRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_, const FormatSettings & format_settings_) :
JSONEachRowRowInputFormat(*buf_, header_, std::move(params_), format_settings_, false), buf(std::move(buf_))
JSONAsRowInputFormat::JSONAsRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_) :
JSONEachRowRowInputFormat(in_, header_, std::move(params_), format_settings_, false)
{
if (header_.columns() > 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -27,19 +24,6 @@ JSONAsRowInputFormat::JSONAsRowInputFormat(const Block & header_, std::unique_pt
header_.columns());
}
void JSONAsRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
JSONEachRowRowInputFormat::setReadBuffer(*buf);
}
void JSONAsRowInputFormat::resetReadBuffer()
{
buf.reset();
JSONEachRowRowInputFormat::resetReadBuffer();
}
bool JSONAsRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
assert(columns.size() == 1);
@ -48,35 +32,41 @@ bool JSONAsRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
if (!allow_new_rows)
return false;
skipWhitespaceIfAny(*buf);
if (!buf->eof())
skipWhitespaceIfAny(*in);
if (!in->eof())
{
if (!data_in_square_brackets && *buf->position() == ';')
if (!data_in_square_brackets && *in->position() == ';')
{
/// ';' means the end of query, but it cannot be before ']'.
return allow_new_rows = false;
}
else if (data_in_square_brackets && *buf->position() == ']')
else if (data_in_square_brackets && *in->position() == ']')
{
/// ']' means the end of query.
return allow_new_rows = false;
}
}
if (!buf->eof())
if (!in->eof())
readJSONObject(*columns[0]);
skipWhitespaceIfAny(*buf);
if (!buf->eof() && *buf->position() == ',')
++buf->position();
skipWhitespaceIfAny(*buf);
skipWhitespaceIfAny(*in);
if (!in->eof() && *in->position() == ',')
++in->position();
skipWhitespaceIfAny(*in);
return !buf->eof();
return !in->eof();
}
JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: JSONAsRowInputFormat(header_, in_, params_, format_settings_)
const Block & header_, ReadBuffer & in_, IRowInputFormat::Params params_, const FormatSettings & format_settings_)
: JSONAsStringRowInputFormat(header_, std::make_unique<PeekableReadBuffer>(in_), params_, format_settings_)
{
}
JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(
const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_, const FormatSettings & format_settings_)
: JSONAsRowInputFormat(header_, *buf_, params_, format_settings_), buf(std::move(buf_))
{
if (!isString(removeNullable(removeLowCardinality(header_.getByPosition(0).type))))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
@ -84,6 +74,18 @@ JSONAsStringRowInputFormat::JSONAsStringRowInputFormat(
header_.getByPosition(0).type->getName());
}
void JSONAsStringRowInputFormat::setReadBuffer(ReadBuffer & in_)
{
buf = std::make_unique<PeekableReadBuffer>(in_);
JSONAsRowInputFormat::setReadBuffer(*buf);
}
void JSONAsStringRowInputFormat::resetReadBuffer()
{
buf.reset();
JSONAsRowInputFormat::resetReadBuffer();
}
void JSONAsStringRowInputFormat::readJSONObject(IColumn & column)
{
PeekableReadBufferCheckpoint checkpoint{*buf};
@ -174,7 +176,7 @@ JSONAsObjectRowInputFormat::JSONAsObjectRowInputFormat(
void JSONAsObjectRowInputFormat::readJSONObject(IColumn & column)
{
serializations[0]->deserializeTextJSON(column, *buf, format_settings);
serializations[0]->deserializeTextJSON(column, *in, format_settings);
}
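/// Editorial note: after this refactoring only JSONAsStringRowInputFormat owns a
/// PeekableReadBuffer (it needs checkpoint/rollback, see its readJSONObject above);
/// JSONAsRowInputFormat and JSONAsObjectRowInputFormat read from the plain *in.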
Chunk JSONAsObjectRowInputFormat::getChunkForCount(size_t rows)

View File

@ -18,17 +18,11 @@ class JSONAsRowInputFormat : public JSONEachRowRowInputFormat
public:
JSONAsRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings);
void setReadBuffer(ReadBuffer & in_) override;
void resetReadBuffer() override;
private:
JSONAsRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_, const FormatSettings & format_settings);
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
protected:
virtual void readJSONObject(IColumn & column) = 0;
std::unique_ptr<PeekableReadBuffer> buf;
};
/// Each JSON object is parsed as a whole to string.
@ -36,11 +30,18 @@ protected:
class JSONAsStringRowInputFormat final : public JSONAsRowInputFormat
{
public:
JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings);
JSONAsStringRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_);
String getName() const override { return "JSONAsStringRowInputFormat"; }
void setReadBuffer(ReadBuffer & in_) override;
void resetReadBuffer() override;
private:
JSONAsStringRowInputFormat(const Block & header_, std::unique_ptr<PeekableReadBuffer> buf_, Params params_, const FormatSettings & format_settings_);
void readJSONObject(IColumn & column) override;
std::unique_ptr<PeekableReadBuffer> buf;
};

View File

@ -1,5 +1,4 @@
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <Common/MemoryTrackerBlockerInThread.h>
namespace DB
{
@ -72,21 +71,9 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
Columns IMergeTreeDataPartWriter::releaseIndexColumns()
{
/// The memory for index was allocated without thread memory tracker.
/// We need to deallocate it in shrinkToFit without memory tracker as well.
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
Columns result;
result.reserve(index_columns.size());
for (auto & column : index_columns)
{
column->shrinkToFit();
result.push_back(std::move(column));
}
index_columns.clear();
return result;
return Columns(
std::make_move_iterator(index_columns.begin()),
std::make_move_iterator(index_columns.end()));
}
SerializationPtr IMergeTreeDataPartWriter::getSerialization(const String & column_name) const

View File

@ -255,12 +255,6 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
index_compressor_stream = std::make_unique<CompressedWriteBuffer>(*index_file_hashing_stream, primary_key_compression_codec, settings.primary_key_compress_block_size);
index_source_hashing_stream = std::make_unique<HashingWriteBuffer>(*index_compressor_stream);
}
const auto & primary_key_types = metadata_snapshot->getPrimaryKey().data_types;
index_serializations.reserve(primary_key_types.size());
for (const auto & type : primary_key_types)
index_serializations.push_back(type->getDefaultSerialization());
}
}
@ -306,33 +300,22 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
store = std::make_shared<GinIndexStore>(stream_name, data_part_storage, data_part_storage, storage_settings->max_digestion_size_per_segment);
gin_index_stores[stream_name] = store;
}
skip_indices_aggregators.push_back(skip_index->createIndexAggregatorForPart(store, settings));
skip_index_accumulated_marks.push_back(0);
}
}
void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndexRow(const Block & index_block, size_t row)
{
chassert(index_block.columns() == index_serializations.size());
auto & index_stream = compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream;
for (size_t i = 0; i < index_block.columns(); ++i)
{
const auto & column = index_block.getByPosition(i).column;
index_columns[i]->insertFrom(*column, row);
index_serializations[i]->serializeBinary(*column, row, index_stream, {});
}
}
void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Block & primary_index_block, const Granules & granules_to_write)
{
if (!metadata_snapshot->hasPrimaryKey())
return;
size_t primary_columns_num = primary_index_block.columns();
if (index_columns.empty())
index_columns = primary_index_block.cloneEmptyColumns();
{
index_types = primary_index_block.getDataTypes();
index_columns.resize(primary_columns_num);
last_block_index_columns.resize(primary_columns_num);
for (size_t i = 0; i < primary_columns_num; ++i)
index_columns[i] = primary_index_block.getByPosition(i).column->cloneEmpty();
}
{
/** While filling index (index_columns), disable memory tracker.
@ -346,14 +329,22 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc
/// Write index. The index contains Primary Key value for each `index_granularity` row.
for (const auto & granule : granules_to_write)
{
if (granule.mark_on_start)
calculateAndSerializePrimaryIndexRow(primary_index_block, granule.start_row);
if (metadata_snapshot->hasPrimaryKey() && granule.mark_on_start)
{
for (size_t j = 0; j < primary_columns_num; ++j)
{
const auto & primary_column = primary_index_block.getByPosition(j);
index_columns[j]->insertFrom(*primary_column.column, granule.start_row);
primary_column.type->getDefaultSerialization()->serializeBinary(
*primary_column.column, granule.start_row, compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream, {});
}
}
}
}
/// Store block with last index row to write final mark at the end of column
if (with_final_mark)
last_index_block = primary_index_block;
/// Store the last index row to write the final mark at the end of the column
for (size_t j = 0; j < primary_columns_num; ++j)
last_block_index_columns[j] = primary_index_block.getByPosition(j).column;
}
void MergeTreeDataPartWriterOnDisk::calculateAndSerializeStatistics(const Block & block)
@ -430,11 +421,17 @@ void MergeTreeDataPartWriterOnDisk::fillPrimaryIndexChecksums(MergeTreeData::Dat
if (index_file_hashing_stream)
{
if (write_final_mark && last_index_block)
if (write_final_mark)
{
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
calculateAndSerializePrimaryIndexRow(last_index_block, last_index_block.rows() - 1);
last_index_block.clear();
for (size_t j = 0; j < index_columns.size(); ++j)
{
const auto & column = *last_block_index_columns[j];
size_t last_row_number = column.size() - 1;
index_columns[j]->insertFrom(column, last_row_number);
index_types[j]->getDefaultSerialization()->serializeBinary(
column, last_row_number, compress_primary_key ? *index_source_hashing_stream : *index_file_hashing_stream, {});
}
last_block_index_columns.clear();
}
if (compress_primary_key)

View File

@ -173,10 +173,10 @@ protected:
std::unique_ptr<HashingWriteBuffer> index_source_hashing_stream;
bool compress_primary_key;
/// Last block with index columns.
/// It's written to index file in the `writeSuffixAndFinalizePart` method.
Block last_index_block;
Serializations index_serializations;
DataTypes index_types;
/// Index columns from the last block
/// It's written to index file in the `writeSuffixAndFinalizePart` method
Columns last_block_index_columns;
bool data_written = false;
@ -193,7 +193,6 @@ private:
void initStatistics();
virtual void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) = 0;
void calculateAndSerializePrimaryIndexRow(const Block & index_block, size_t row);
struct ExecutionStatistics
{

View File

@ -1,6 +1,7 @@
#include <string_view>
#include <Storages/System/StorageSystemDashboards.h>
#include <Common/StringUtils.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -22,9 +23,9 @@ String trim(const char * text)
return String(view);
}
void StorageSystemDashboards::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
void StorageSystemDashboards::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const
{
static const std::vector<std::map<String, String>> dashboards
static const std::vector<std::map<String, String>> default_dashboards
{
/// Default dashboard for self-managed ClickHouse
{
@ -371,13 +372,22 @@ ORDER BY t WITH FILL STEP {rounding:UInt32}
}
};
for (const auto & row : dashboards)
auto add_dashboards = [&](const auto & dashboards)
{
size_t i = 0;
res_columns[i++]->insert(row.at("dashboard"));
res_columns[i++]->insert(row.at("title"));
res_columns[i++]->insert(row.at("query"));
}
for (const auto & row : dashboards)
{
size_t i = 0;
res_columns[i++]->insert(row.at("dashboard"));
res_columns[i++]->insert(row.at("title"));
res_columns[i++]->insert(row.at("query"));
}
};
const auto & context_dashboards = context->getDashboards();
if (context_dashboards.has_value())
add_dashboards(*context_dashboards);
else
add_dashboards(default_dashboards);
}
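/// Editorial note: dashboards defined in the server config fully replace the
/// built-in list rather than being appended to it.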
}

View File

@ -22,7 +22,7 @@ public:
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const override;
void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
};
}

View File

@ -173,5 +173,8 @@ endif()
if (TARGET ch_contrib::prometheus_protobufs)
set(USE_PROMETHEUS_PROTOBUFS 1)
endif()
if (TARGET ch_contrib::numactl)
set(USE_NUMACTL 1)
endif()
set(SOURCE_DIR ${PROJECT_SOURCE_DIR})

View File

@ -505,7 +505,7 @@ class Backport:
ReleaseBranch(
(
br
if self._repo_name == "ClickHouse/Clickhouse"
if self._repo_name == "ClickHouse/ClickHouse"
else f"release/{br}"
),
pr,

View File

@ -0,0 +1,15 @@
<clickhouse>
<dashboards>
<dashboard>
<dashboard>Overview</dashboard>
<title>Queries/second</title>
<query>
SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, avg(ProfileEvent_Query)
FROM system.metric_log
WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32}
GROUP BY t
ORDER BY t WITH FILL STEP {rounding:UInt32}
</query>
</dashboard>
</dashboards>
</clickhouse>

View File

@ -0,0 +1,35 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
default = cluster.add_instance("default")
custom = cluster.add_instance("custom", main_configs=["configs/config.d/overrides.xml"])
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_custom_dashboards():
assert int(default.query("select count()>10 from system.dashboards")) == 1
assert int(custom.query("select count() from system.dashboards")) == 1
assert (
default.query(
"select normalizeQuery(query) from system.dashboards where dashboard = 'Overview' and title = 'Queries/second'"
).strip()
== """
SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, avg(ProfileEvent_Query) FROM merge(?..) WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32} GROUP BY t ORDER BY t WITH FILL STEP {rounding:UInt32}
""".strip()
)
assert (
custom.query(
"select normalizeQuery(query) from system.dashboards where dashboard = 'Overview' and title = 'Queries/second'"
).strip()
== """
SELECT toStartOfInterval(event_time, INTERVAL {rounding:UInt32} SECOND)::INT AS t, avg(ProfileEvent_Query) FROM system.metric_log WHERE event_date >= toDate(now() - {seconds:UInt32}) AND event_time >= now() - {seconds:UInt32} GROUP BY t ORDER BY t WITH FILL STEP {rounding:UInt32}
""".strip()
)

View File

@ -0,0 +1,14 @@
<clickhouse>
<storage_configuration>
<disks>
<backups>
<type>local</type>
<path>/backups/</path>
</backups>
</disks>
</storage_configuration>
<backups>
<allowed_disk>backups</allowed_disk>
<allowed_path>/backups/</allowed_path>
</backups>
</clickhouse>

View File

@ -0,0 +1,21 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>replica1</host>
<port>9000</port>
</replica>
<replica>
<host>replica2</host>
<port>9000</port>
</replica>
<replica>
<host>replica3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,218 @@
import pytest
import pymysql.cursors
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
configs = ["configs/remote_servers.xml", "configs/backups_disk.xml"]
node1 = cluster.add_instance(
"replica1",
with_zookeeper=True,
with_mysql8=True,
main_configs=configs,
external_dirs=["/backups/"],
)
node2 = cluster.add_instance(
"replica2",
with_zookeeper=True,
with_mysql8=True,
main_configs=configs,
external_dirs=["/backups/"],
)
node3 = cluster.add_instance(
"replica3",
with_zookeeper=True,
with_mysql8=True,
main_configs=configs,
external_dirs=["/backups/"],
)
nodes = [node1, node2, node3]
backup_id_counter = 0
def new_backup_name():
global backup_id_counter
backup_id_counter += 1
return f"Disk('backups', '{backup_id_counter}/')"
def cleanup_nodes(nodes, dbname):
for node in nodes:
node.query(f"DROP DATABASE IF EXISTS {dbname} SYNC")
def fill_nodes(nodes, dbname):
cleanup_nodes(nodes, dbname)
for node in nodes:
node.query(
f"CREATE DATABASE {dbname} ENGINE = Replicated('/clickhouse/databases/{dbname}', 'default', '{node.name}')"
)
def drop_mysql_table(conn, tableName):
with conn.cursor() as cursor:
cursor.execute(f"DROP TABLE IF EXISTS `clickhouse`.`{tableName}`")
def get_mysql_conn(cluster):
conn = pymysql.connect(
user="root",
password="clickhouse",
host=cluster.mysql8_ip,
port=cluster.mysql8_port,
)
return conn
def fill_tables(cluster, dbname):
fill_nodes(nodes, dbname)
conn = get_mysql_conn(cluster)
with conn.cursor() as cursor:
cursor.execute("DROP DATABASE IF EXISTS clickhouse")
cursor.execute("CREATE DATABASE clickhouse")
cursor.execute("DROP TABLE IF EXISTS clickhouse.inference_table")
cursor.execute(
"CREATE TABLE clickhouse.inference_table (id INT PRIMARY KEY, data BINARY(16) NOT NULL)"
)
cursor.execute(
"INSERT INTO clickhouse.inference_table VALUES (100, X'9fad5e9eefdfb449')"
)
conn.commit()
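# The MySQL column BINARY(16) should be inferred as FixedString(16) by both the
# MySQL table engine and the mysql() table function (asserted below).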
parameters = "'mysql80:3306', 'clickhouse', 'inference_table', 'root', 'clickhouse'"
node1.query(
f"CREATE TABLE {dbname}.mysql_schema_inference_engine ENGINE=MySQL({parameters})"
)
node1.query(
f"CREATE TABLE {dbname}.mysql_schema_inference_function AS mysql({parameters})"
)
node1.query(f"CREATE TABLE {dbname}.merge_tree (id UInt64, b String) ORDER BY id")
node1.query(f"INSERT INTO {dbname}.merge_tree VALUES (100, 'abc')")
expected = "id\tInt32\t\t\t\t\t\ndata\tFixedString(16)\t\t\t\t\t\n"
assert (
node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_engine")
== expected
)
assert (
node1.query(f"DESCRIBE TABLE {dbname}.mysql_schema_inference_function")
== expected
)
assert node1.query(f"SELECT id FROM mysql({parameters})") == "100\n"
assert (
node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_engine") == "100\n"
)
assert (
node1.query(f"SELECT id FROM {dbname}.mysql_schema_inference_function")
== "100\n"
)
assert node1.query(f"SELECT id FROM {dbname}.merge_tree") == "100\n"
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
except Exception as ex:
print(ex)
finally:
cluster.shutdown()
def test_restore_table(start_cluster):
fill_tables(cluster, "replicated")
backup_name = new_backup_name()
node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated")
node2.query(f"BACKUP DATABASE replicated TO {backup_name}")
node2.query("DROP TABLE replicated.mysql_schema_inference_engine")
node2.query("DROP TABLE replicated.mysql_schema_inference_function")
node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated")
assert node3.query("EXISTS replicated.mysql_schema_inference_engine") == "0\n"
assert node3.query("EXISTS replicated.mysql_schema_inference_function") == "0\n"
node3.query(
f"RESTORE DATABASE replicated FROM {backup_name} SETTINGS allow_different_database_def=true"
)
node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated")
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated.mysql_schema_inference_engine"
)
== "1\t100\n"
)
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated.mysql_schema_inference_function"
)
== "1\t100\n"
)
assert (
node1.query("SELECT count(), sum(id) FROM replicated.merge_tree") == "1\t100\n"
)
cleanup_nodes(nodes, "replicated")
def test_restore_table_null(start_cluster):
fill_tables(cluster, "replicated2")
backup_name = new_backup_name()
node2.query(f"SYSTEM SYNC DATABASE REPLICA replicated2")
node2.query(f"BACKUP DATABASE replicated2 TO {backup_name}")
node2.query("DROP TABLE replicated2.mysql_schema_inference_engine")
node2.query("DROP TABLE replicated2.mysql_schema_inference_function")
node3.query(f"SYSTEM SYNC DATABASE REPLICA replicated2")
assert node3.query("EXISTS replicated2.mysql_schema_inference_engine") == "0\n"
assert node3.query("EXISTS replicated2.mysql_schema_inference_function") == "0\n"
node3.query(
f"RESTORE DATABASE replicated2 FROM {backup_name} SETTINGS allow_different_database_def=1, allow_different_table_def=1 SETTINGS restore_replace_external_engines_to_null=1, restore_replace_external_table_functions_to_null=1"
)
node1.query(f"SYSTEM SYNC DATABASE REPLICA replicated2")
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_engine"
)
== "0\t0\n"
)
assert (
node1.query(
"SELECT count(), sum(id) FROM replicated2.mysql_schema_inference_function"
)
== "0\t0\n"
)
assert (
node1.query("SELECT count(), sum(id) FROM replicated2.merge_tree") == "1\t100\n"
)
assert (
node1.query(
"SELECT engine FROM system.tables where database = 'replicated2' and name like '%mysql%'"
)
== "Null\nNull\n"
)
assert (
node1.query(
"SELECT engine FROM system.tables where database = 'replicated2' and name like '%merge_tree%'"
)
== "MergeTree\n"
)
cleanup_nodes(nodes, "replicated2")

View File

@ -12,6 +12,12 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
MAX_PROCESS_WAIT=5
IS_SANITIZER=$($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.warnings WHERE message like '%built with sanitizer%'")
if [ "$IS_SANITIZER" -gt 0 ]; then
# Query may hang for more than 5 seconds, especially in tsan build
MAX_PROCESS_WAIT=15
fi
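# The `timeout` invocations below use $MAX_PROCESS_WAIT, so sanitizer builds
# automatically get the longer 15 second budget.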
# TCP CLIENT: As of today (02/12/21) uses PullingAsyncPipelineExecutor
### Should be cancelled after 1 second and return a 159 exception (timeout)
timeout -s KILL $MAX_PROCESS_WAIT $CLICKHOUSE_CLIENT --max_execution_time 1 -q \

View File

@ -239,7 +239,7 @@ select sTr_To_DaTe('10:04:11 03-07-2019', '%s:%i:%H %d-%m-%Y', 'UTC') = toDateTi
select str_to_date('10:04:11 invalid 03-07-2019', '%s:%i:%H %d-%m-%Y', 'UTC') IS NULL;
1
-- Error handling
select parseDateTime('12 AM'); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
select parseDateTime(); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
select parseDateTime('12 AM', '%h %p', 'UTC', 'a fourth argument'); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
-- Fuzzer crash bug #53715
select parseDateTime('', '', toString(number)) from numbers(13); -- { serverError ILLEGAL_COLUMN }
@ -270,3 +270,7 @@ select parseDateTime('8 13, 2022, 7:58:32', '%c %e, %G, %k:%i:%s', 'UTC');
2022-08-13 07:58:32
select parseDateTime('08 13, 2022, 07:58:32', '%c %e, %G, %k:%i:%s', 'UTC');
2022-08-13 07:58:32
-- The format string argument is optional
set session_timezone = 'UTC'; -- don't randomize the session timezone
select parseDateTime('2021-01-04 23:12:34') = toDateTime('2021-01-04 23:12:34');
1

View File

@ -162,7 +162,7 @@ select sTr_To_DaTe('10:04:11 03-07-2019', '%s:%i:%H %d-%m-%Y', 'UTC') = toDateTi
select str_to_date('10:04:11 invalid 03-07-2019', '%s:%i:%H %d-%m-%Y', 'UTC') IS NULL;
-- Error handling
select parseDateTime('12 AM'); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
select parseDateTime(); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
select parseDateTime('12 AM', '%h %p', 'UTC', 'a fourth argument'); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
-- Fuzzer crash bug #53715
@ -187,4 +187,9 @@ select parseDateTime('08 13, 2022, 07:58:32', '%m %e, %G, %k:%i:%s', 'UTC');
select parseDateTime('8 13, 2022, 7:58:32', '%c %e, %G, %k:%i:%s', 'UTC');
select parseDateTime('08 13, 2022, 07:58:32', '%c %e, %G, %k:%i:%s', 'UTC');
-- The format string argument is optional
set session_timezone = 'UTC'; -- don't randomize the session timezone
select parseDateTime('2021-01-04 23:12:34') = toDateTime('2021-01-04 23:12:34');
-- { echoOff }

View File

@ -354,5 +354,9 @@ select parseDateTimeInJodaSyntaxOrNull('2001 366 2000', 'yyyy D yyyy', 'UTC') =
select parseDateTimeInJodaSyntaxOrNull('2001 invalid 366 2000', 'yyyy D yyyy', 'UTC') IS NULL;
1
-- Error handling
select parseDateTimeInJodaSyntax('12 AM'); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
select parseDateTimeInJodaSyntax(); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
select parseDateTimeInJodaSyntax('12 AM', 'h a', 'UTC', 'a fourth argument'); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
-- The format string argument is optional
set session_timezone = 'UTC'; -- don't randomize the session timezone
select parseDateTimeInJodaSyntax('2021-01-04 23:12:34') = toDateTime('2021-01-04 23:12:34');
1

View File

@ -239,6 +239,11 @@ select parseDateTimeInJodaSyntaxOrNull('2001 366 2000', 'yyyy D yyyy', 'UTC') =
select parseDateTimeInJodaSyntaxOrNull('2001 invalid 366 2000', 'yyyy D yyyy', 'UTC') IS NULL;
-- Error handling
select parseDateTimeInJodaSyntax('12 AM'); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
select parseDateTimeInJodaSyntax(); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
select parseDateTimeInJodaSyntax('12 AM', 'h a', 'UTC', 'a fourth argument'); -- { serverError NUMBER_OF_ARGUMENTS_DOESNT_MATCH }
-- The format string argument is optional
set session_timezone = 'UTC'; -- don't randomize the session timezone
select parseDateTimeInJodaSyntax('2021-01-04 23:12:34') = toDateTime('2021-01-04 23:12:34');
-- { echoOff }

View File

@ -1,44 +0,0 @@
<1: created view> a [] 1
CREATE MATERIALIZED VIEW default.a\nREFRESH AFTER 2 SECOND\n(\n `x` UInt64\n)\nENGINE = Memory\nAS SELECT number AS x\nFROM numbers(2)\nUNION ALL\nSELECT rand64() AS x
<2: refreshed> 3 1 1
<3: time difference at least> 1000
<4: next refresh in> 2
<4.5: altered> Scheduled Finished 2052-01-01 00:00:00
CREATE MATERIALIZED VIEW default.a\nREFRESH EVERY 2 YEAR\n(\n `x` UInt64\n)\nENGINE = Memory\nAS SELECT x * 2 AS x\nFROM default.src
<5: no refresh> 3
<6: refreshed> 2
<7: refreshed> Scheduled Finished 2054-01-01 00:00:00
CREATE MATERIALIZED VIEW default.b\nREFRESH EVERY 2 YEAR DEPENDS ON default.a\n(\n `y` Int32\n)\nENGINE = MergeTree\nORDER BY y\nSETTINGS index_granularity = 8192\nAS SELECT x * 10 AS y\nFROM default.a
<8: refreshed> 20
<9: refreshed> a Scheduled Finished 2054-01-01 00:00:00
<9: refreshed> b Scheduled Finished 2054-01-01 00:00:00
<10: waiting> a Scheduled [] 2054-01-01 00:00:00
<10: waiting> b WaitingForDependencies ['default.a'] 2054-01-01 00:00:00
<11: chain-refreshed a> 4
<12: chain-refreshed b> 40
<13: chain-refreshed> a Scheduled [] Finished 2054-01-01 00:00:01 2056-01-01 00:00:00
<13: chain-refreshed> b Scheduled ['default.a'] Finished 2054-01-24 23:22:21 2056-01-01 00:00:00
<14: waiting for next cycle> a Scheduled [] 2058-01-01 00:00:00
<14: waiting for next cycle> b WaitingForDependencies ['default.a'] 2060-01-01 00:00:00
<15: chain-refreshed a> 6
<16: chain-refreshed b> 60
<17: chain-refreshed> a Scheduled 2062-01-01 00:00:00
<17: chain-refreshed> b Scheduled 2062-01-01 00:00:00
<18: removed dependency> b Scheduled [] 2062-03-03 03:03:03 2064-01-01 00:00:00 5
CREATE MATERIALIZED VIEW default.b\nREFRESH EVERY 2 YEAR\n(\n `y` Int32\n)\nENGINE = MergeTree\nORDER BY y\nSETTINGS index_granularity = 8192\nAS SELECT x * 10 AS y\nFROM default.a
<19: exception> 1
<20: unexception> 1
<21: rename> 1
<22: rename> d Finished
<23: simple refresh> 1
<24: rename during refresh> 1
<25: rename during refresh> f Running
<27: cancelled> f Scheduled
CREATE MATERIALIZED VIEW default.g\nREFRESH EVERY 1 WEEK OFFSET 3 DAY 4 HOUR RANDOMIZE FOR 4 DAY 1 HOUR\n(\n `x` Int64\n)\nENGINE = Memory\nAS SELECT 42
<29: randomize> 1 1
CREATE MATERIALIZED VIEW default.h\nREFRESH EVERY 1 SECOND TO default.dest\n(\n `x` Int64\n)\nAS SELECT x * 10 AS x\nFROM default.src
<30: to existing table> 10
<31: to existing table> 10
<31: to existing table> 20
<32: empty> i Scheduled Unknown
<32: empty> j Scheduled Finished

View File

@ -1,307 +0,0 @@
#!/usr/bin/env bash
# Tags: atomic-database
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# Set session timezone to UTC to make all DateTime formatting and parsing use UTC, because refresh
# scheduling is done in UTC.
CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT" | sed 's/--session_timezone[= ][^ ]*//g'`"
CLICKHOUSE_CLIENT="`echo "$CLICKHOUSE_CLIENT --allow_experimental_refreshable_materialized_view=1 --session_timezone Etc/UTC"`"
$CLICKHOUSE_CLIENT -nq "create view refreshes as select * from system.view_refreshes where database = '$CLICKHOUSE_DATABASE' order by view"
# Basic refreshing.
$CLICKHOUSE_CLIENT -nq "
create materialized view a
refresh after 2 second
engine Memory
empty
as select number as x from numbers(2) union all select rand64() as x"
$CLICKHOUSE_CLIENT -nq "select '<1: created view>', view, remaining_dependencies, exception, last_refresh_result in ('Unknown', 'Finished') from refreshes";
$CLICKHOUSE_CLIENT -nq "show create a"
# Wait for any refresh. (xargs trims the string and turns \t and \n into spaces)
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" == 'Unknown' ]
do
sleep 0.1
done
start_time="`$CLICKHOUSE_CLIENT -nq "select reinterpret(now64(), 'Int64')"`"
# Check table contents.
$CLICKHOUSE_CLIENT -nq "select '<2: refreshed>', count(), sum(x=0), sum(x=1) from a"
# Wait for table contents to change.
res1="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values'`"
while :
do
res2="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values -- $LINENO'`"
[ "$res2" == "$res1" ] || break
sleep 0.1
done
# Wait for another change.
while :
do
res3="`$CLICKHOUSE_CLIENT -nq 'select * from a order by x format Values -- $LINENO'`"
[ "$res3" == "$res2" ] || break
sleep 0.1
done
# Check that the two changes were at least 1 second apart, in particular that we're not refreshing
# like crazy. This is potentially flaky, but we need at least one test that uses non-mocked timer
# to make sure the clock+timer code works at all. If it turns out flaky, increase refresh period above.
$CLICKHOUSE_CLIENT -nq "
select '<3: time difference at least>', min2(reinterpret(now64(), 'Int64') - $start_time, 1000);
select '<4: next refresh in>', next_refresh_time-last_refresh_time from refreshes;"
# Create a source table from which views will read.
$CLICKHOUSE_CLIENT -nq "
create table src (x Int8) engine Memory as select 1"
# Switch to fake clock, change refresh schedule, change query.
$CLICKHOUSE_CLIENT -nq "
system test view a set fake time '2050-01-01 00:00:01';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, last_refresh_time, next_refresh_time from refreshes -- $LINENO" | xargs`" != 'Scheduled 2050-01-01 00:00:01 2050-01-01 00:00:03' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
alter table a modify refresh every 2 year;
alter table a modify query select x*2 as x from src;
select '<4.5: altered>', status, last_refresh_result, next_refresh_time from refreshes;
show create a;"
# Advance time to trigger the refresh.
$CLICKHOUSE_CLIENT -nq "
select '<5: no refresh>', count() from a;
system test view a set fake time '2052-02-03 04:05:06';"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_time from refreshes -- $LINENO" | xargs`" != '2052-02-03 04:05:06' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<6: refreshed>', * from a;
select '<7: refreshed>', status, last_refresh_result, next_refresh_time from refreshes;"
# Create a dependent view, refresh it once.
$CLICKHOUSE_CLIENT -nq "
create materialized view b refresh every 2 year depends on a (y Int32) engine MergeTree order by y empty as select x*10 as y from a;
show create b;
system test view b set fake time '2052-11-11 11:11:11';
system refresh view b;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != '2052-11-11 11:11:11' ]
do
sleep 0.1
done
# Next refresh shouldn't start until the dependency refreshes.
$CLICKHOUSE_CLIENT -nq "
select '<8: refreshed>', * from b;
select '<9: refreshed>', view, status, last_refresh_result, next_refresh_time from refreshes;
system test view b set fake time '2054-01-24 23:22:21';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != 'WaitingForDependencies 2054-01-01 00:00:00' ]
do
sleep 0.1
done
# Update source table (by dropping and re-creating it - to test that tables are looked up by name
# rather than uuid), kick off refresh of the dependency.
$CLICKHOUSE_CLIENT -nq "
select '<10: waiting>', view, status, remaining_dependencies, next_refresh_time from refreshes;
drop table src;
create table src (x Int16) engine Memory as select 2;
system test view a set fake time '2054-01-01 00:00:01';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'b' -- $LINENO" | xargs`" != 'Scheduled' ]
do
sleep 0.1
done
# Both tables should've refreshed.
$CLICKHOUSE_CLIENT -nq "
select '<11: chain-refreshed a>', * from a;
select '<12: chain-refreshed b>', * from b;
select '<13: chain-refreshed>', view, status, remaining_dependencies, last_refresh_result, last_refresh_time, next_refresh_time, exception from refreshes;"
# Make the dependent table run ahead by one refresh cycle, make sure it waits for the dependency to
# catch up to the same cycle.
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2059-01-01 00:00:00';
system refresh view b;"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != '2060-01-01 00:00:00' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2061-01-01 00:00:00';
system test view a set fake time '2057-01-01 00:00:00';"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, next_refresh_time from refreshes -- $LINENO" | xargs`" != 'Scheduled 2058-01-01 00:00:00 WaitingForDependencies 2060-01-01 00:00:00' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<14: waiting for next cycle>', view, status, remaining_dependencies, next_refresh_time from refreshes;
truncate src;
insert into src values (3);
system test view a set fake time '2060-02-02 02:02:02';"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != '2062-01-01 00:00:00' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<15: chain-refreshed a>', * from a;
select '<16: chain-refreshed b>', * from b;
select '<17: chain-refreshed>', view, status, next_refresh_time from refreshes;"
# Get to WaitingForDependencies state and remove the dependency.
$CLICKHOUSE_CLIENT -nq "
system test view b set fake time '2062-03-03 03:03:03'"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'b' -- $LINENO" | xargs`" != 'WaitingForDependencies' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
alter table b modify refresh every 2 year"
while [ "`$CLICKHOUSE_CLIENT -nq "select status, last_refresh_time from refreshes where view = 'b' -- $LINENO" | xargs`" != 'Scheduled 2062-03-03 03:03:03' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<18: removed dependency>', view, status, remaining_dependencies, last_refresh_time,next_refresh_time, refresh_count from refreshes where view = 'b';
show create b;"
# Select from a table that doesn't exist, get an exception.
$CLICKHOUSE_CLIENT -nq "
drop table a;
drop table b;
create materialized view c refresh every 1 second (x Int64) engine Memory empty as select * from src;
drop table src;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes where view = 'c' -- $LINENO" | xargs`" != 'Exception' ]
do
sleep 0.1
done
# Check exception, create src, expect successful refresh.
$CLICKHOUSE_CLIENT -nq "
select '<19: exception>', exception ilike '%UNKNOWN_TABLE%' ? '1' : exception from refreshes where view = 'c';
create table src (x Int64) engine Memory as select 1;
system refresh view c;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ]
do
sleep 0.1
done
# Rename table.
$CLICKHOUSE_CLIENT -nq "
select '<20: unexception>', * from c;
rename table c to d;
select '<21: rename>', * from d;
select '<22: rename>', view, last_refresh_result from refreshes;"
# Do various things during a refresh.
# First make a nonempty view.
$CLICKHOUSE_CLIENT -nq "
drop table d;
truncate src;
insert into src values (1);
create materialized view e refresh every 1 second (x Int64) engine MergeTree order by x empty as select x + sleepEachRow(1) as x from src settings max_block_size = 1;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ]
do
sleep 0.1
done
# Stop refreshes.
$CLICKHOUSE_CLIENT -nq "
select '<23: simple refresh>', * from e;
system stop view e;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes -- $LINENO" | xargs`" != 'Disabled' ]
do
sleep 0.1
done
# Make refreshes slow, wait for a slow refresh to start. (We stopped refreshes first to make sure
# we wait for a slow refresh, not a previous fast one.)
$CLICKHOUSE_CLIENT -nq "
insert into src select * from numbers(1000) settings max_block_size=1;
system start view e;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes -- $LINENO" | xargs`" != 'Running' ]
do
sleep 0.1
done
# Rename.
$CLICKHOUSE_CLIENT -nq "
rename table e to f;
select '<24: rename during refresh>', * from f;
select '<25: rename during refresh>', view, status from refreshes where view = 'f';
alter table f modify refresh after 10 year;"
# Cancel.
$CLICKHOUSE_CLIENT -nq "
system cancel view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes where view = 'f' -- $LINENO" | xargs`" != 'Cancelled' ]
do
sleep 0.1
done
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'f' -- $LINENO" | xargs`" = 'Running' ]
do
sleep 0.1
done
# Check that another refresh doesn't immediately start after the cancelled one.
$CLICKHOUSE_CLIENT -nq "
select '<27: cancelled>', view, status from refreshes where view = 'f';
system refresh view f;"
while [ "`$CLICKHOUSE_CLIENT -nq "select status from refreshes where view = 'f' -- $LINENO" | xargs`" != 'Running' ]
do
sleep 0.1
done
# Drop.
$CLICKHOUSE_CLIENT -nq "
drop table f;
select '<28: drop during refresh>', view, status from refreshes;"
# Try OFFSET and RANDOMIZE FOR.
$CLICKHOUSE_CLIENT -nq "
create materialized view g refresh every 1 week offset 3 day 4 hour randomize for 4 day 1 hour (x Int64) engine Memory empty as select 42;
show create g;
system test view g set fake time '2050-02-03 15:30:13';"
while [ "`$CLICKHOUSE_CLIENT -nq "select next_refresh_time > '2049-01-01' from refreshes -- $LINENO" | xargs`" != '1' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
with '2050-02-10 04:00:00'::DateTime as expected
select '<29: randomize>', abs(next_refresh_time::Int64 - expected::Int64) <= 3600*(24*4+1), next_refresh_time != expected from refreshes;"
# Send data 'TO' an existing table.
$CLICKHOUSE_CLIENT -nq "
drop table g;
create table dest (x Int64) engine MergeTree order by x;
truncate src;
insert into src values (1);
create materialized view h refresh every 1 second to dest empty as select x*10 as x from src;
show create h;"
while [ "`$CLICKHOUSE_CLIENT -nq "select last_refresh_result from refreshes -- $LINENO" | xargs`" != 'Finished' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<30: to existing table>', * from dest;
insert into src values (2);"
while [ "`$CLICKHOUSE_CLIENT -nq "select count() from dest -- $LINENO" | xargs`" != '2' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<31: to existing table>', * from dest;
drop table dest;
drop table src;
drop table h;"
# EMPTY
$CLICKHOUSE_CLIENT -nq "
create materialized view i refresh after 1 year engine Memory empty as select number as x from numbers(2);
create materialized view j refresh after 1 year engine Memory as select number as x from numbers(2)"
while [ "`$CLICKHOUSE_CLIENT -nq "select sum(last_success_time is null) from refreshes -- $LINENO" | xargs`" == '2' ]
do
sleep 0.1
done
$CLICKHOUSE_CLIENT -nq "
select '<32: empty>', view, status, last_refresh_result from refreshes order by view;
drop table i;
drop table j"
$CLICKHOUSE_CLIENT -nq "
drop table refreshes;"

View File

@ -1,4 +1,4 @@
100000000 100000000
100000000 140000000
0 0
1
100000000 100000000

View File

@ -1,4 +1,4 @@
-- Tags: long
-- Tags: long, no-tsan, no-msan, no-ubsan, no-asan
set allow_experimental_variant_type = 1;
set use_variant_as_common_type = 1;

View File

@ -1,4 +1,4 @@
-- Tags: long
-- Tags: long, no-tsan, no-msan, no-ubsan, no-asan
set allow_experimental_variant_type = 1;
set use_variant_as_common_type = 1;

File diff suppressed because it is too large

View File

@ -0,0 +1,43 @@
set allow_experimental_variant_type = 1;
set use_variant_as_common_type = 1;
set allow_experimental_dynamic_type = 1;
drop table if exists test;
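-- The template below runs the same scenario for Memory, compact MergeTree and
-- wide MergeTree parts.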
{% for engine in ['Memory', 'MergeTree order by id settings min_rows_for_wide_part=1000000000, min_bytes_for_wide_part=10000000000', 'MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1'] -%}
create table test (id UInt64, d Dynamic) engine={{ engine }};
insert into test select number, number from numbers(10);
insert into test select number, 'str_' || toString(number) from numbers(10, 10);
insert into test select number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10 + 1)) from numbers(20, 10);
insert into test select number, NULL from numbers(30, 10);
insert into test select number, multiIf(number % 4 == 3, 'str_' || toString(number), number % 4 == 2, NULL, number % 4 == 1, number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10 + 1))) from numbers(40, 40);
insert into test select number, [range((number % 10 + 1)::UInt64)]::Array(Array(Dynamic)) from numbers(10, 10);
select distinct dynamicType(d) as type from test order by type;
select count() from test where dynamicType(d) == 'UInt64';
select count() from test where d.UInt64 is not NULL;
select count() from test where dynamicType(d) == 'String';
select count() from test where d.String is not NULL;
select count() from test where dynamicType(d) == 'Date';
select count() from test where d.Date is not NULL;
select count() from test where dynamicType(d) == 'Array(Variant(String, UInt64))';
select count() from test where not empty(d.`Array(Variant(String, UInt64))`);
select count() from test where dynamicType(d) == 'Array(Array(Dynamic))';
select count() from test where not empty(d.`Array(Array(Dynamic))`);
select count() from test where d is NULL;
select count() from test where not empty(d.`Tuple(a Array(Dynamic))`.a.String);
select d, d.UInt64, d.String, d.`Array(Variant(String, UInt64))` from test order by id, d;
select d.UInt64, d.String, d.`Array(Variant(String, UInt64))` from test order by id, d;
select d.Int8, d.Date, d.`Array(String)` from test order by id, d;
select d, d.UInt64, d.Date, d.`Array(Variant(String, UInt64))`, d.`Array(Variant(String, UInt64))`.size0, d.`Array(Variant(String, UInt64))`.UInt64 from test order by id, d;
select d.UInt64, d.Date, d.`Array(Variant(String, UInt64))`, d.`Array(Variant(String, UInt64))`.size0, d.`Array(Variant(String, UInt64))`.UInt64, d.`Array(Variant(String, UInt64))`.String from test order by id, d;
select d, d.`Tuple(a UInt64, b String)`.a, d.`Array(Dynamic)`.`Variant(String, UInt64)`.UInt64, d.`Array(Variant(String, UInt64))`.UInt64 from test order by id, d;
select d.`Array(Dynamic)`.`Variant(String, UInt64)`.UInt64, d.`Array(Dynamic)`.size0, d.`Array(Variant(String, UInt64))`.UInt64 from test order by id, d;
select d.`Array(Array(Dynamic))`.size1, d.`Array(Array(Dynamic))`.UInt64, d.`Array(Array(Dynamic))`.`Map(String, Tuple(a UInt64))`.values.a from test order by id, d;
drop table test;
{% endfor -%}

View File

@ -1,4 +1,4 @@
-- Tags: long
-- Tags: long, no-tsan, no-msan, no-ubsan, no-asan
set allow_experimental_variant_type = 1;
set use_variant_as_common_type = 1;

View File

@ -1,4 +1,4 @@
-- Tags: long
-- Tags: long, no-tsan, no-msan, no-ubsan, no-asan
set allow_experimental_dynamic_type=1;
drop table if exists test;
@ -30,4 +30,4 @@ system start merges test;
optimize table test final;
select count(), dynamicType(d) from test group by dynamicType(d) order by count(), dynamicType(d);
drop table test;
drop table test;

View File

@ -1,4 +1,4 @@
-- Tags: long
-- Tags: long, no-tsan, no-msan, no-ubsan, no-asan
set allow_experimental_dynamic_type=1;
drop table if exists test;
@ -30,4 +30,4 @@ system start merges test;
optimize table test final;
select count(), dynamicType(d) from test group by dynamicType(d) order by count(), dynamicType(d);
drop table test;
drop table test;

View File

@ -1,4 +1,4 @@
-- Tags: long
-- Tags: long, no-tsan, no-msan, no-ubsan, no-asan
set allow_experimental_dynamic_type=1;
drop table if exists test;
@ -31,4 +31,4 @@ system start merges test;
optimize table test final;
select count(), dynamicType(d) from test group by dynamicType(d) order by count(), dynamicType(d);
drop table test;
drop table test;

View File

@ -1,4 +1,4 @@
-- Tags: long
-- Tags: long, no-tsan, no-msan, no-ubsan, no-asan
set allow_experimental_dynamic_type=1;
drop table if exists test;
@ -30,4 +30,4 @@ system start merges test;
optimize table test final;
select count(), dynamicType(d) from test group by dynamicType(d) order by count(), dynamicType(d);
drop table test;
drop table test;

View File

@ -1,4 +1,4 @@
-- Tags: long
-- Tags: long, no-tsan, no-msan, no-ubsan, no-asan
set allow_experimental_dynamic_type = 1;

View File

@ -1,4 +1,4 @@
-- Tags: long
-- Tags: long, no-tsan, no-msan, no-ubsan, no-asan
set allow_experimental_dynamic_type = 1;

View File

@ -1,4 +1,4 @@
-- Tags: long
-- Tags: long, no-tsan, no-msan, no-ubsan, no-asan
set allow_experimental_dynamic_type = 1;

View File

@ -1,4 +1,4 @@
-- Tags: long
-- Tags: long, no-tsan, no-msan, no-ubsan, no-asan
set allow_experimental_dynamic_type = 1;

View File

@ -0,0 +1,112 @@
5 DateTime
6 Date
7 Array(UInt16)
8 String
10 None
10 UInt64
7 Array(UInt16)
10 None
10 UInt64
19 String
7 Array(UInt16)
10 None
10 UInt64
19 String
20 Map(UInt64, UInt64)
10 None
10 UInt64
20 Map(UInt64, UInt64)
26 String
1 Tuple(UInt64, UInt64)
10 None
10 UInt64
20 Map(UInt64, UInt64)
26 String
10 None
10 UInt64
20 Map(UInt64, UInt64)
27 String
5 DateTime
6 Date
7 Array(UInt16)
8 String
10 None
10 UInt64
7 Array(UInt16)
10 None
10 UInt64
19 String
7 Array(UInt16)
10 None
10 UInt64
19 String
20 Map(UInt64, UInt64)
10 None
10 UInt64
20 Map(UInt64, UInt64)
26 String
1 Tuple(UInt64, UInt64)
10 None
10 UInt64
20 Map(UInt64, UInt64)
26 String
10 None
10 UInt64
20 Map(UInt64, UInt64)
27 String
5 DateTime
6 Date
7 Array(UInt16)
8 String
10 None
10 UInt64
7 Array(UInt16)
10 None
10 UInt64
19 String
7 Array(UInt16)
10 None
10 UInt64
19 String
20 Map(UInt64, UInt64)
10 None
10 UInt64
20 Map(UInt64, UInt64)
26 String
1 Tuple(UInt64, UInt64)
10 None
10 UInt64
20 Map(UInt64, UInt64)
26 String
10 None
10 UInt64
20 Map(UInt64, UInt64)
27 String
5 DateTime
6 Date
7 Array(UInt16)
8 String
10 None
10 UInt64
7 Array(UInt16)
10 None
10 UInt64
19 String
7 Array(UInt16)
10 None
10 UInt64
19 String
20 Map(UInt64, UInt64)
10 None
10 UInt64
20 Map(UInt64, UInt64)
26 String
1 Tuple(UInt64, UInt64)
10 None
10 UInt64
20 Map(UInt64, UInt64)
26 String
10 None
10 UInt64
20 Map(UInt64, UInt64)
27 String

View File

@ -0,0 +1,42 @@
set allow_experimental_variant_type = 1;
set use_variant_as_common_type = 1;
set allow_experimental_dynamic_type = 1;
drop table if exists test;
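-- The engines below cover compact and wide parts, with horizontal and vertical merges; after each optimize,
-- the least frequent types are expected to collapse into String once more than max_types=3 distinct types exist
-- (see the counts in the reference file above).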
{% for engine in ['MergeTree order by id settings min_rows_for_wide_part=1000000000, min_bytes_for_wide_part=10000000000',
'MergeTree order by id settings min_rows_for_wide_part=1000000000, min_bytes_for_wide_part=10000000000, vertical_merge_algorithm_min_rows_to_activate=1, vertical_merge_algorithm_min_columns_to_activate=1',
'MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1',
'MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1, vertical_merge_algorithm_min_rows_to_activate=1, vertical_merge_algorithm_min_columns_to_activate=1'] -%}
create table test (id UInt64, d Dynamic(max_types=3)) engine={{ engine }};
system stop merges test;
insert into test select number, number from numbers(10);
insert into test select number, 'str_' || toString(number) from numbers(8);
insert into test select number, range(number % 10 + 1) from numbers(7);
insert into test select number, toDate(number) from numbers(6);
insert into test select number, toDateTime(number) from numbers(5);
insert into test select number, NULL from numbers(10);
select count(), dynamicType(d) from test group by dynamicType(d) order by count(), dynamicType(d);
system start merges test;
optimize table test final;
select count(), dynamicType(d) from test group by dynamicType(d) order by count(), dynamicType(d);
system stop merges test;
insert into test select number, map(number, number) from numbers(20);
select count(), dynamicType(d) from test group by dynamicType(d) order by count(), dynamicType(d);
system start merges test;
optimize table test final;
select count(), dynamicType(d) from test group by dynamicType(d) order by count(), dynamicType(d);
system stop merges test;
insert into test select number, tuple(number, number) from numbers(1);
select count(), dynamicType(d) from test group by dynamicType(d) order by count(), dynamicType(d);
system start merges test;
optimize table test final;
select count(), dynamicType(d) from test group by dynamicType(d) order by count(), dynamicType(d);
drop table test;
{% endfor -%}

View File

@ -1,8 +1,8 @@
100000000 100000000
100000000 100000000
100000000 100000000
100000000 140000000
100000000 140000000
100000000 140000000
0 0
100000000 100000000
100000000 140000000
0 0
0 0
1

View File

@ -1,4 +1,4 @@
100000000 100000000
100000000 100000000
100000000 140000000
100000000 140000000
0 0
0 0

View File

@ -1,4 +1,4 @@
Memory
--- Memory ---
test
Array(Array(Dynamic))
Array(Variant(String, UInt64))
@ -17,7 +17,7 @@ UInt64
10
20
0
MergeTree compact
--- MergeTree order by id settings min_rows_for_wide_part=1000000000, min_bytes_for_wide_part=10000000000 ---
test
Array(Array(Dynamic))
Array(Variant(String, UInt64))
@ -36,7 +36,7 @@ UInt64
10
20
0
MergeTree wide
--- MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1 ---
test
Array(Array(Dynamic))
Array(Variant(String, UInt64))

View File

@ -1,60 +0,0 @@
#!/usr/bin/env bash
# Tags: long
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
CH_CLIENT="$CLICKHOUSE_CLIENT --allow_experimental_variant_type=1 --use_variant_as_common_type=1 --allow_experimental_dynamic_type=1 --optimize_functions_to_subcolumns=0"
function test()
{
echo "test"
$CH_CLIENT -q "insert into test select number, number from numbers(10) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, 'str_' || toString(number) from numbers(10, 10) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10 + 1)) from numbers(20, 10) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, NULL from numbers(30, 10) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, multiIf(number % 4 == 3, 'str_' || toString(number), number % 4 == 2, NULL, number % 4 == 1, number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10 + 1))) from numbers(40, 40) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "insert into test select number, [range((number % 10 + 1)::UInt64)]::Array(Array(Dynamic)) from numbers(10, 10) settings min_insert_block_size_rows=50000"
$CH_CLIENT -q "select distinct dynamicType(d) as type from test order by type"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'UInt64'"
$CH_CLIENT -q "select count() from test where d.UInt64 is not NULL"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'String'"
$CH_CLIENT -q "select count() from test where d.String is not NULL"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'Date'"
$CH_CLIENT -q "select count() from test where d.Date is not NULL"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'Array(Variant(String, UInt64))'"
$CH_CLIENT -q "select count() from test where not empty(d.\`Array(Variant(String, UInt64))\`)"
$CH_CLIENT -q "select count() from test where dynamicType(d) == 'Array(Array(Dynamic))'"
$CH_CLIENT -q "select count() from test where not empty(d.\`Array(Array(Dynamic))\`)"
$CH_CLIENT -q "select count() from test where d is NULL"
$CH_CLIENT -q "select count() from test where not empty(d.\`Tuple(a Array(Dynamic))\`.a.String)"
$CH_CLIENT -q "select d, d.UInt64.null, d.String.null, d.\`Array(Variant(String, UInt64))\`.null from test format Null"
$CH_CLIENT -q "select d.UInt64.null, d.String.null, d.\`Array(Variant(String, UInt64))\`.null from test format Null"
$CH_CLIENT -q "select d.Int8.null, d.Date.null, d.\`Array(String)\`.null from test format Null"
$CH_CLIENT -q "select d, d.UInt64.null, d.Date.null, d.\`Array(Variant(String, UInt64))\`.null, d.\`Array(Variant(String, UInt64))\`.size0, d.\`Array(Variant(String, UInt64))\`.UInt64.null from test format Null"
$CH_CLIENT -q "select d.UInt64.null, d.Date.null, d.\`Array(Variant(String, UInt64))\`.null, d.\`Array(Variant(String, UInt64))\`.size0, d.\`Array(Variant(String, UInt64))\`.UInt64.null, d.\`Array(Variant(String, UInt64))\`.String.null from test format Null"
$CH_CLIENT -q "select d, d.\`Tuple(a UInt64, b String)\`.a, d.\`Array(Dynamic)\`.\`Variant(String, UInt64)\`.UInt64.null, d.\`Array(Variant(String, UInt64))\`.UInt64.null from test format Null"
$CH_CLIENT -q "select d.\`Array(Dynamic)\`.\`Variant(String, UInt64)\`.UInt64.null, d.\`Array(Dynamic)\`.size0, d.\`Array(Variant(String, UInt64))\`.UInt64.null from test format Null"
$CH_CLIENT -q "select d.\`Array(Array(Dynamic))\`.size1, d.\`Array(Array(Dynamic))\`.UInt64.null, d.\`Array(Array(Dynamic))\`.\`Map(String, Tuple(a UInt64))\`.values.a from test format Null"
}
$CH_CLIENT -q "drop table if exists test;"
echo "Memory"
$CH_CLIENT -q "create table test (id UInt64, d Dynamic) engine=Memory"
test
$CH_CLIENT -q "drop table test;"
echo "MergeTree compact"
$CH_CLIENT -q "create table test (id UInt64, d Dynamic) engine=MergeTree order by id settings min_rows_for_wide_part=1000000000, min_bytes_for_wide_part=10000000000;"
test
$CH_CLIENT -q "drop table test;"
echo "MergeTree wide"
$CH_CLIENT -q "create table test (id UInt64, d Dynamic) engine=MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1;"
test
$CH_CLIENT -q "drop table test;"

View File

@ -0,0 +1,49 @@
-- Tags: long
set allow_experimental_variant_type = 1;
set use_variant_as_common_type = 1;
set allow_experimental_dynamic_type = 1;
set optimize_functions_to_subcolumns = 0;
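-- Reads typed subcolumns (d.UInt64, d.String, nested Array/Tuple/Map paths) of a Dynamic column;
-- the loop below repeats the checks for Memory, compact MergeTree, and wide MergeTree parts.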
drop table if exists test;
{% for engine in ['Memory', 'MergeTree order by id settings min_rows_for_wide_part=1000000000, min_bytes_for_wide_part=10000000000', 'MergeTree order by id settings min_rows_for_wide_part=1, min_bytes_for_wide_part=1'] -%}
SELECT '--- {{ engine }} ---';
create table test (id UInt64, d Dynamic) engine={{ engine }};
select 'test';
insert into test select number, number from numbers(10) settings min_insert_block_size_rows=50000;
insert into test select number, 'str_' || toString(number) from numbers(10, 10) settings min_insert_block_size_rows=50000;
insert into test select number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10 + 1)) from numbers(20, 10) settings min_insert_block_size_rows=50000;
insert into test select number, NULL from numbers(30, 10) settings min_insert_block_size_rows=50000;
insert into test select number, multiIf(number % 4 == 3, 'str_' || toString(number), number % 4 == 2, NULL, number % 4 == 1, number, arrayMap(x -> multiIf(number % 9 == 0, NULL, number % 9 == 3, 'str_' || toString(number), number), range(number % 10 + 1))) from numbers(40, 40) settings min_insert_block_size_rows=50000;
insert into test select number, [range((number % 10 + 1)::UInt64)]::Array(Array(Dynamic)) from numbers(10, 10) settings min_insert_block_size_rows=50000;
select distinct dynamicType(d) as type from test order by type;
select count() from test where dynamicType(d) == 'UInt64';
select count() from test where d.UInt64 is not NULL;
select count() from test where dynamicType(d) == 'String';
select count() from test where d.String is not NULL;
select count() from test where dynamicType(d) == 'Date';
select count() from test where d.Date is not NULL;
select count() from test where dynamicType(d) == 'Array(Variant(String, UInt64))';
select count() from test where not empty(d.`Array(Variant(String, UInt64))`);
select count() from test where dynamicType(d) == 'Array(Array(Dynamic))';
select count() from test where not empty(d.`Array(Array(Dynamic))`);
select count() from test where d is NULL;
select count() from test where not empty(d.`Tuple(a Array(Dynamic))`.a.String);
select d, d.UInt64.null, d.String.null, d.`Array(Variant(String, UInt64))`.null from test format Null;
select d.UInt64.null, d.String.null, d.`Array(Variant(String, UInt64))`.null from test format Null;
select d.Int8.null, d.Date.null, d.`Array(String)`.null from test format Null;
select d, d.UInt64.null, d.Date.null, d.`Array(Variant(String, UInt64))`.null, d.`Array(Variant(String, UInt64))`.size0, d.`Array(Variant(String, UInt64))`.UInt64.null from test format Null;
select d.UInt64.null, d.Date.null, d.`Array(Variant(String, UInt64))`.null, d.`Array(Variant(String, UInt64))`.size0, d.`Array(Variant(String, UInt64))`.UInt64.null, d.`Array(Variant(String, UInt64))`.String.null from test format Null;
select d, d.`Tuple(a UInt64, b String)`.a, d.`Array(Dynamic)`.`Variant(String, UInt64)`.UInt64.null, d.`Array(Variant(String, UInt64))`.UInt64.null from test format Null;
select d.`Array(Dynamic)`.`Variant(String, UInt64)`.UInt64.null, d.`Array(Dynamic)`.size0, d.`Array(Variant(String, UInt64))`.UInt64.null from test format Null;
select d.`Array(Array(Dynamic))`.size1, d.`Array(Array(Dynamic))`.UInt64.null, d.`Array(Array(Dynamic))`.`Map(String, Tuple(a UInt64))`.values.a from test format Null;
drop table test;
{% endfor -%}

View File

@ -0,0 +1,2 @@
x Nullable(Int64)
schema_inference_hints=, max_rows_to_read_for_schema_inference=25000, max_bytes_to_read_for_schema_inference=1000, schema_inference_make_columns_nullable=true, try_infer_integers=true, try_infer_dates=true, try_infer_datetimes=true, try_infer_numbers_from_strings=false, read_bools_as_numbers=true, read_bools_as_strings=true, read_objects_as_strings=true, read_numbers_as_strings=true, read_arrays_as_strings=true, try_infer_objects_as_tuples=true, infer_incomplete_types_as_strings=true, try_infer_objects=false, use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects=false

View File

@ -0,0 +1,13 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
echo '{"x" : 42}' > $CLICKHOUSE_TEST_UNIQUE_NAME.json
$CLICKHOUSE_LOCAL -nm -q "
DESC file('$CLICKHOUSE_TEST_UNIQUE_NAME.json') SETTINGS input_format_max_bytes_to_read_for_schema_inference=1000;
SELECT additional_format_info from system.schema_inference_cache"
rm $CLICKHOUSE_TEST_UNIQUE_NAME.json

View File

@ -318,6 +318,7 @@ std_cerr_cout_excludes=(
src/Interpreters/Context.cpp
# IProcessor::dump()
src/Processors/IProcessor.cpp
src/Client/ClientApplicationBase.cpp
src/Client/ClientBase.cpp
src/Client/LineReader.cpp
src/Client/QueryFuzzer.cpp